feat(scripts): wire S3 geospatial + Google Solar into run_modelling_e2e

Per Property the inspection script now resolves the UPRN's spatial
reference from the Ordnance Survey Open-UPRN parquet in S3
(GeospatialS3Repository over a boto3 ParquetReader) and threads both
levers into run_modelling:

- planning_restrictions: the conservation/listed/heritage flags that gate
  the wall + solar measures (ADR-0019/0020).
- solar_insights: a live Google Solar buildingInsights fetch keyed on the
  reference coordinates, so the Solar PV Options can fire (ADR-0026).

Mirrors IngestionOrchestrator._fetch's coords->solar flow. Degrades
gracefully per Property: a UPRN S3 doesn't cover -> unrestricted/no-solar;
a point Google has no coverage for (BuildingInsightsNotFoundError) ->
no-solar; both still modelled. --no-solar skips the Google leg. A context
note (restrictions; solar) is printed and written to the md/csv summary.

Verified live: spatial_for + solar fetch round-trip on real UPRNs (S3 via
ambient ~/.aws creds, pyarrow reads parquet bytes). pyright clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-08 14:55:33 +00:00
parent 0918dd37ec
commit 1b4806f8e4

View file

@ -10,35 +10,55 @@ Plan). It is read-only on the DB (just the UPRN lookup) and persists nothing —
purely for inspecting recommendations. Prints a per-Property plan table and
writes a Markdown + CSV summary.
Config: loads `backend/.env` for the DB creds (`DB_*`) and the EPC API token
(`EPC_AUTH_TOKEN`) — the agent never sees the secrets. Run from the worktree
root so imports resolve to this checkout:
Config: loads `backend/.env` for the DB creds (`DB_*`), the EPC API token
(`EPC_AUTH_TOKEN`), the Google Solar key (`GOOGLE_SOLAR_API_KEY`) and the S3
reference bucket (`DATA_BUCKET`) — the agent never sees the secrets. AWS creds
come from the ambient `~/.aws` profile. Run from the worktree root so imports
resolve to this checkout:
python -m scripts.run_modelling_e2e 115 116 117 # goal band C (default)
python -m scripts.run_modelling_e2e --goal B 115 116 117 # a different target band
python -m scripts.run_modelling_e2e --no-solar 115 116 # skip the Google Solar leg
Not yet wired (follow-ups): Google Solar potential (needs the Property's
coordinates from the geospatial/S3 layer, absent on the `property` row) so the
Solar PV Options don't fire here yet; planning restrictions default to
unrestricted (the conservation/listed gates aren't read).
Per Property the script resolves the UPRN's spatial reference from the Ordnance
Survey Open-UPRN parquet in S3 (`GeospatialS3Repository`): the planning
protections (conservation/listed/heritage) gate the wall + solar measures, and
the coordinates drive a live Google Solar `buildingInsights` fetch so the Solar
PV Options can fire (ADR-0026). Buildings S3 doesn't cover, or that Google has
no solar coverage for, fall back to unrestricted / no-solar and are still
modelled. Pass `--no-solar` to skip the Google leg entirely.
"""
from __future__ import annotations
import argparse
import io
import os
import sys
from pathlib import Path
from typing import Optional
from typing import Any, Optional, cast
import boto3
import pandas as pd
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from datatypes.epc.domain.epc_property_data import EpcPropertyData # noqa: E402
from domain.geospatial.planning_restrictions import PlanningRestrictions # noqa: E402
from domain.geospatial.spatial_reference import SpatialReference # noqa: E402
from domain.modelling.plan import Plan, PlanMeasure # noqa: E402
from harness.console import DEFAULT_CATALOGUE, run_modelling # noqa: E402
from harness.plan_table import format_plan_table # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from infrastructure.solar.google_solar_api_client import ( # noqa: E402
BuildingInsightsNotFoundError,
GoogleSolarApiClient,
)
from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
ParquetReader,
)
from sqlalchemy import create_engine, text # noqa: E402
_ENV_PATH = _REPO_ROOT / "backend" / ".env"
@ -68,6 +88,48 @@ def _db_url() -> str:
)
def _s3_parquet_reader(bucket: str) -> ParquetReader:
    """Build the key -> DataFrame reader handed to `GeospatialS3Repository`.

    The returned callable fetches the object stored under a key in `bucket`,
    reads its whole body into memory and parses those bytes with
    `pd.read_parquet`. AWS creds come from the ambient `~/.aws` profile;
    pyarrow reads the parquet bytes (s3fs is not installed here).
    """
    # boto3 ships only partial type stubs, so the client is an untyped boundary.
    s3 = cast(Any, boto3.client("s3"))  # pyright: ignore[reportUnknownMemberType]

    def read(key: str) -> pd.DataFrame:
        # Download the full object first, then parse it from an in-memory buffer.
        response = s3.get_object(Bucket=bucket, Key=key)
        payload = cast(bytes, response["Body"].read())
        buffer = io.BytesIO(payload)
        return pd.read_parquet(buffer)

    return read
def _spatial_for(
    repo: GeospatialS3Repository, uprn: int
) -> Optional[SpatialReference]:
    """Look up the spatial reference for `uprn` without ever raising.

    Returns whatever `repo.spatial_for(uprn)` returns. If that call raises,
    the failure is reported on stdout and None is returned instead, so a
    missing or unreadable reference does not abort the run — the caller
    treats None as unrestricted / no solar.
    """
    reference: Optional[SpatialReference] = None
    try:
        reference = repo.spatial_for(uprn)
    except Exception as exc:  # noqa: BLE001 — S3/parquet hiccup is non-fatal
        kind = type(exc).__name__
        print(f" spatial lookup failed for uprn {uprn}: {kind}: {exc}")
    return reference
def _solar_insights_for(
    solar_client: GoogleSolarApiClient, spatial: Optional[SpatialReference]
) -> Optional[dict[str, Any]]:
    """Fetch the raw Google Solar `buildingInsights` for a spatial reference.

    Returns None when there is no reference, when the reference carries no
    coordinates, or when the client raises `BuildingInsightsNotFoundError`.
    Any other error raised by the client propagates to the caller.
    """
    coordinates = spatial.coordinates if spatial is not None else None
    if coordinates is None:
        return None
    try:
        insights = solar_client.get_building_insights(
            coordinates.longitude, coordinates.latitude
        )
    except BuildingInsightsNotFoundError:
        # no Google solar coverage at this point — model without it
        return None
    return insights
def _uprns_for(property_ids: list[int]) -> dict[int, Optional[int]]:
"""Read each Property's UPRN from the DB (read-only)."""
engine = create_engine(
@ -81,6 +143,28 @@ def _uprns_for(property_ids: list[int]) -> dict[int, Optional[int]]:
return {int(pid): (int(uprn) if uprn is not None else None) for pid, uprn in rows}
def _context_summary(
    spatial: Optional[SpatialReference], solar_insights: Optional[dict[str, Any]]
) -> str:
    """Summarise the geospatial leg as a single `restrictions; solar` note.

    The restrictions half lists whichever of the conservation / listed /
    heritage flags are set on `spatial.restrictions`, reads "unrestricted"
    when none are, and "no spatial reference" when `spatial` is None. The
    solar half reports only whether `solar_insights` is present (not None).
    """
    solar_note = "no solar" if solar_insights is None else "solar ✓"
    if spatial is None:
        return f"no spatial reference; {solar_note}"

    restrictions = spatial.restrictions
    labelled_flags = (
        ("conservation", restrictions.in_conservation_area),
        ("listed", restrictions.is_listed),
        ("heritage", restrictions.is_heritage),
    )
    active: list[str] = []
    for label, is_set in labelled_flags:
        if is_set:
            active.append(label)
    # An empty join is falsy, which selects the "unrestricted" fallback.
    restrictions_note = ", ".join(active) or "unrestricted"
    return f"{restrictions_note}; {solar_note}"
def _measure_summary(measure: PlanMeasure) -> str:
return (
f" - {measure.measure_type}: "
@ -93,10 +177,17 @@ def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("property_ids", type=int, nargs="+", help="Property ids to model")
parser.add_argument("--goal", default="C", help="target EPC band (default C)")
parser.add_argument(
"--no-solar",
action="store_true",
help="skip the live Google Solar fetch (no Solar PV Options)",
)
args = parser.parse_args()
_load_env(_ENV_PATH)
epc_client = EpcClientService(os.environ["EPC_AUTH_TOKEN"])
geospatial = GeospatialS3Repository(_s3_parquet_reader(os.environ["DATA_BUCKET"]))
solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])
uprns = _uprns_for(args.property_ids)
print(
@ -117,10 +208,19 @@ def main() -> None:
epc: Optional[EpcPropertyData] = epc_client.get_by_uprn(uprn)
if epc is None:
raise ValueError(f"no EPC found for UPRN {uprn}")
spatial: Optional[SpatialReference] = _spatial_for(geospatial, uprn)
restrictions: PlanningRestrictions = (
spatial.restrictions if spatial is not None else PlanningRestrictions()
)
solar_insights: Optional[dict[str, Any]] = (
None if args.no_solar else _solar_insights_for(solar_client, spatial)
)
plan: Plan = run_modelling(
epc,
goal_band=args.goal,
catalogue_path=DEFAULT_CATALOGUE,
planning_restrictions=restrictions,
solar_insights=solar_insights,
print_table=False,
)
except Exception as error: # noqa: BLE001 — one bad property must not stop the run
@ -131,10 +231,11 @@ def main() -> None:
continue
measure_types = [m.measure_type for m in plan.measures]
context = _context_summary(spatial, solar_insights)
header = (
f"=== Property {property_id} (uprn {uprn}) === "
f"SAP {plan.baseline.sap_continuous:.1f} -> {plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · £{plan.cost_of_works:,.0f}"
f"· {len(plan.measures)} measure(s) · £{plan.cost_of_works:,.0f} · {context}"
)
print(header)
print(format_plan_table(plan))
@ -143,7 +244,8 @@ def main() -> None:
md_lines.append(f"## Property {property_id} (uprn {uprn})\n")
md_lines.append(
f"SAP {plan.baseline.sap_continuous:.1f}{plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · cost £{plan.cost_of_works:,.0f}\n"
f"· {len(plan.measures)} measure(s) · cost £{plan.cost_of_works:,.0f} "
f"· {context}\n"
)
md_lines.extend(_measure_summary(m) for m in plan.measures)
md_lines.append("")