feat(scripts): DB-catalogue local run + optional --persist for run_modelling_e2e

Slice 5 (local run sources the DB, read-only) + slice 6 (optional persist),
landing together as one script rewrite (the persist path is interleaved with
the compute path).

The same local computation now runs whether or not the result is stored:
- Both modes price against the live `material` catalogue (read-only
  ProductPostgresRepository over one shared Session) and model against a real
  Scenario read from the DB (--scenario-id; its goal_value drives the band,
  rejected if null) — so the inspected recommendations are exactly what gets
  stored. The JSON sample catalogue is no longer used by this script.
- --measures restricts the run to a comma-separated considered_measures
  allowlist (e.g. high_heat_retention_storage_heaters,solar_pv).
- --persist writes the inputs (EPC + spatial + solar) and the *same* computed
  Plan via the production repos in one PostgresUnitOfWork, then commits
  (idempotent: PlanPostgresRepository replaces by (property_id, scenario_id)).
  Gated: --persist requires --scenario-id and --portfolio-id. Default is
  inspect-only — no DB writes.

harness.console.run_modelling gains `products` and `scenario` overrides (the
seam the script drives); defaults unchanged, so existing callers are
unaffected. Suite 257 pass + 3 xfail; pyright clean; --help/guard/measure
parsing verified. Not yet executed against the DB (awaiting property_ids +
write-confirm).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-08 20:45:50 +00:00
parent 62e1d4b813
commit 0f6077a830
2 changed files with 183 additions and 46 deletions

View file

@ -39,6 +39,7 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import (
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.product.product_json_repository import ProductJsonRepository
from repositories.product.product_repository import ProductRepository
from tests.orchestration.fakes import (
FakeEpcRepo,
FakePlanRepository,
@ -173,6 +174,8 @@ def run_modelling(
planning_restrictions: PlanningRestrictions = PlanningRestrictions(),
solar_insights: Optional[dict[str, Any]] = None,
considered_measures: Optional[frozenset[MeasureType]] = None,
products: Optional[ProductRepository] = None,
scenario: Optional[Scenario] = None,
print_table: bool = True,
) -> Plan:
"""Run ONLY the Modelling stage over ``epc`` with no database — skipping
@ -184,7 +187,22 @@ def run_modelling(
``solar_insights`` is the Property's raw Google Solar ``buildingInsights``
JSON (as persisted by ``SolarRepository``); when given, the solar
Recommendation Generator sees the dwelling's potential and can offer Solar
PV Options (ADR-0026)."""
PV Options (ADR-0026).
``products`` overrides the Product catalogue source (default: the JSON
sample catalogue) — pass a read-only ``ProductPostgresRepository`` to price
against the live ``material`` table. ``scenario`` overrides the default
Increasing-EPC-to-``goal_band`` Scenario — pass a Scenario read from the DB
so the run targets a real ``scenario_id`` (its ``goal_value``/budget drive
the Optimiser); the computed Plan is then keyed by that Scenario's id."""
scenario_obj = scenario or Scenario(
id=_SCENARIO_ID,
goal="Increasing EPC",
goal_value=goal_band,
budget=None,
is_default=True,
)
scenario_id = scenario_obj.id
plan_repo = FakePlanRepository()
property_repo = FakePropertyRepo(
{
@ -208,18 +226,8 @@ def run_modelling(
if solar_insights is not None
else None
),
scenario=FakeScenarioRepository(
{
_SCENARIO_ID: Scenario(
id=_SCENARIO_ID,
goal="Increasing EPC",
goal_value=goal_band,
budget=None,
is_default=True,
)
}
),
product=ProductJsonRepository(catalogue_path),
scenario=FakeScenarioRepository({scenario_id: scenario_obj}),
product=products or ProductJsonRepository(catalogue_path),
plan=plan_repo,
)
@ -229,12 +237,12 @@ def run_modelling(
fuel_rates=FuelRatesStaticFileRepository(),
).run(
property_ids=[_PROPERTY_ID],
scenario_ids=[_SCENARIO_ID],
scenario_ids=[scenario_id],
portfolio_id=_PORTFOLIO_ID,
considered_measures=considered_measures,
)
plan = plan_repo.saved[(_PROPERTY_ID, _SCENARIO_ID)]
plan = plan_repo.saved[(_PROPERTY_ID, scenario_id)]
if print_table:
print("\n" + format_plan_table(plan))
return plan

View file

@ -3,30 +3,41 @@ print the recommendations for inspection.
The local DB's Properties have no linked, ingested EPC yet (Ingestion's source
clients are still stubbed — #1136), so this script does the ingestion step
inline for inspection: it reads each Property's UPRN from the DB, fetches the
latest EPC **live** from the gov EPC API by UPRN, then runs the Modelling stage
in memory (every Recommendation Generator → the Optimiser → a costed, attributed
Plan). It is read-only on the DB (just the UPRN lookup) and persists nothing —
purely for inspecting recommendations. Prints a per-Property plan table and
writes a Markdown + CSV summary.
inline: it reads each Property's UPRN from the DB, fetches the latest EPC
**live** from the gov EPC API by UPRN, resolves the UPRN's spatial reference
from S3, and fetches Google Solar — then runs the Modelling stage (every
Recommendation Generator → the Optimiser → a costed, attributed Plan). The same
local computation runs whether or not you store the result: by default it
persists **nothing** (the run is for inspecting recommendations); pass
`--persist` to write the inputs + the Plan to the DB.
To keep the inspected recommendations identical to what gets stored, **both
modes price against the live ``material`` catalogue (read-only)** and model
against a real **Scenario** read from the DB — not the JSON sample catalogue.
Pass `--scenario-id` to target a real Scenario (its ``goal_value`` drives the
band); without it the run synthesises an Increasing-EPC-to-``--goal`` Scenario.
``--measures`` restricts the run to a comma-separated set of measure types
(mirroring the legacy `inclusions`) — e.g. only HHRSH + Solar PV.
Config: loads `backend/.env` for the DB creds (`DB_*`), the EPC API token
(`EPC_AUTH_TOKEN`), the Google Solar key (`GOOGLE_SOLAR_API_KEY`) and the S3
reference bucket (`DATA_BUCKET`) — the agent never sees the secrets. AWS creds
come from the ambient `~/.aws` profile. Run from the worktree root so imports
resolve to this checkout:
come from the ambient `~/.aws` profile. Run from the worktree root:
python -m scripts.run_modelling_e2e 115 116 117 # goal band C (default)
python -m scripts.run_modelling_e2e --goal B 115 116 117 # a different target band
python -m scripts.run_modelling_e2e --no-solar 115 116 # skip the Google Solar leg
# inspect only (no DB writes), HHRSH + Solar PV, against Scenario 1263:
python -m scripts.run_modelling_e2e --scenario-id 1263 \
--measures high_heat_retention_storage_heaters,solar_pv 115 116 117
# same run, but persist the Plans (needs --portfolio-id):
python -m scripts.run_modelling_e2e --scenario-id 1263 --portfolio-id 4 \
--measures high_heat_retention_storage_heaters,solar_pv --persist 115 116 117
python -m scripts.run_modelling_e2e --no-solar 115 116 # skip the Google leg
Per Property the script resolves the UPRN's spatial reference from the Ordnance
Survey Open-UPRN parquet in S3 (`GeospatialS3Repository`): the planning
protections (conservation/listed/heritage) gate the wall + solar measures, and
the coordinates drive a live Google Solar `buildingInsights` fetch so the Solar
PV Options can fire (ADR-0026). Buildings S3 doesn't cover, or that Google has
no solar coverage for, fall back to unrestricted / no-solar and are still
modelled. Pass `--no-solar` to skip the Google leg entirely.
Per Property the spatial reference (S3 Open-UPRN parquet) gives the planning
protections (conservation/listed/heritage gate the wall + solar measures) and
the coordinates that drive the Google Solar fetch (ADR-0026). Buildings S3
doesn't cover, or that Google has no solar coverage for, fall back to
unrestricted / no-solar and are still modelled. Pass `--no-solar` to skip the
Google leg.
"""
from __future__ import annotations
@ -47,8 +58,10 @@ sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import
from datatypes.epc.domain.epc_property_data import EpcPropertyData # noqa: E402
from domain.geospatial.planning_restrictions import PlanningRestrictions # noqa: E402
from domain.geospatial.spatial_reference import SpatialReference # noqa: E402
from domain.modelling.measure_type import MeasureType # noqa: E402
from domain.modelling.plan import Plan, PlanMeasure # noqa: E402
from harness.console import DEFAULT_CATALOGUE, run_modelling # noqa: E402
from domain.modelling.scenario import Scenario # noqa: E402
from harness.console import run_modelling # noqa: E402
from harness.plan_table import format_plan_table # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from infrastructure.solar.google_solar_api_client import ( # noqa: E402
@ -59,7 +72,15 @@ from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
ParquetReader,
)
from sqlalchemy import create_engine, text # noqa: E402
from repositories.product.product_postgres_repository import ( # noqa: E402
ProductPostgresRepository,
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402
from repositories.scenario.scenario_postgres_repository import ( # noqa: E402
ScenarioPostgresRepository,
)
from sqlalchemy import Engine, create_engine, text # noqa: E402
from sqlmodel import Session # noqa: E402
_ENV_PATH = _REPO_ROOT / "backend" / ".env"
_MARKDOWN_PATH = Path("modelling_e2e.md")
@ -130,11 +151,15 @@ def _solar_insights_for(
return None # no Google solar coverage at this point — model without it
def _uprns_for(property_ids: list[int]) -> dict[int, Optional[int]]:
"""Read each Property's UPRN from the DB (read-only)."""
engine = create_engine(
def _engine() -> Engine:
    """Build the SQLAlchemy engine for DevAssessmentModelDB from the DB_* creds.

    Connections are pre-pinged before use (``pool_pre_ping=True``) and the
    connect attempt is given a 10 second ``connect_timeout``.
    """
    connect_args = {"connect_timeout": 10}
    return create_engine(_db_url(), pool_pre_ping=True, connect_args=connect_args)
def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[int]]:
"""Read each Property's UPRN from the DB (read-only)."""
with engine.connect() as conn:
rows = conn.execute(
text("SELECT id, uprn FROM property WHERE id = ANY(:ids)"),
@ -143,6 +168,26 @@ def _uprns_for(property_ids: list[int]) -> dict[int, Optional[int]]:
return {int(pid): (int(uprn) if uprn is not None else None) for pid, uprn in rows}
def _scenario_for(session: Session, scenario_id: int) -> Scenario:
    """Read the Scenario the run targets (read-only).

    An Increasing-EPC Scenario must carry a ``goal_value`` (band) — the old
    null-band rows were a fixed bug and crash the Optimiser's target — so one
    without it is rejected.

    Args:
        session: the open DB session the Scenario repository reads through.
        scenario_id: id of the Scenario row to load.

    Returns:
        The Scenario read from the DB.

    Raises:
        ValueError: if no Scenario comes back for ``scenario_id``, or an
            "Increasing EPC" Scenario has no ``goal_value``.
    """
    # NOTE(review): assumes get_many returns a (possibly empty) sequence for an
    # unknown id; if it raises by itself, this guard is simply never reached.
    scenarios = ScenarioPostgresRepository(session).get_many([scenario_id])
    if not scenarios:
        # Fail with a message naming the id instead of a bare IndexError.
        raise ValueError(f"scenario {scenario_id} not found in the DB")
    scenario: Scenario = scenarios[0]
    if scenario.goal == "Increasing EPC" and not scenario.goal_value:
        raise ValueError(
            f"scenario {scenario_id} has no goal_value (band); pick a recent one"
        )
    return scenario
def _parse_measures(raw: Optional[str]) -> Optional[frozenset[MeasureType]]:
"""Parse `--measures a,b,c` into a `considered_measures` allowlist, or None
(consider every modelled measure) when unset. Raises on an unknown type."""
if raw is None:
return None
return frozenset(MeasureType(token.strip()) for token in raw.split(",") if token.strip())
def _context_summary(
spatial: Optional[SpatialReference], solar_insights: Optional[dict[str, Any]]
) -> str:
@ -173,10 +218,57 @@ def _measure_summary(measure: PlanMeasure) -> str:
)
def _persist(
    engine: Engine,
    *,
    property_id: int,
    uprn: int,
    portfolio_id: int,
    scenario: Scenario,
    epc: EpcPropertyData,
    spatial: Optional[SpatialReference],
    solar_insights: Optional[dict[str, Any]],
    plan: Plan,
) -> None:
    """Store one Property's run in the DB: its inputs, then the computed Plan.

    Everything goes through a single ``PostgresUnitOfWork`` that is committed
    once at the end. The EPC is always saved; the spatial reference and the
    solar insights are saved only when they are not None. The Plan is saved
    under ``(property_id, scenario.id)``; ``PlanPostgresRepository`` is
    documented as replacing any existing Plan for that key, which makes a
    re-run idempotent.
    """

    def _open_session() -> Session:
        # Session factory handed to the Unit of Work.
        return Session(engine)

    with PostgresUnitOfWork(_open_session) as unit:
        # Inputs first, in the same order as before: EPC, spatial, solar.
        unit.epc.save(epc, property_id=property_id, portfolio_id=portfolio_id)
        if spatial is not None:
            unit.spatial.save(uprn, spatial)
        if solar_insights is not None:
            unit.solar.save(property_id, solar_insights)

        plan_keys = {
            "property_id": property_id,
            "scenario_id": scenario.id,
            "portfolio_id": portfolio_id,
            "is_default": scenario.is_default,
        }
        unit.plan.save(plan, **plan_keys)
        unit.commit()
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("property_ids", type=int, nargs="+", help="Property ids to model")
parser.add_argument("--goal", default="C", help="target EPC band (default C)")
parser.add_argument("--goal", default="C", help="target band when no --scenario-id (default C)")
parser.add_argument(
"--scenario-id", type=int, default=None, help="model against this DB Scenario"
)
parser.add_argument(
"--measures",
default=None,
help="comma-separated measure types to consider (default: all)",
)
parser.add_argument(
"--portfolio-id", type=int, default=None, help="portfolio id (required for --persist)"
)
parser.add_argument(
"--persist",
action="store_true",
help="WRITE the inputs + Plan to the DB (default: inspect only, no writes)",
)
parser.add_argument(
"--no-solar",
action="store_true",
@ -184,18 +276,39 @@ def main() -> None:
)
args = parser.parse_args()
if args.persist and (args.scenario_id is None or args.portfolio_id is None):
parser.error("--persist requires --scenario-id and --portfolio-id")
_load_env(_ENV_PATH)
epc_client = EpcClientService(os.environ["EPC_AUTH_TOKEN"])
geospatial = GeospatialS3Repository(_s3_parquet_reader(os.environ["DATA_BUCKET"]))
solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])
uprns = _uprns_for(args.property_ids)
print(
f"modelling {len(args.property_ids)} propertie(s) (goal band {args.goal}); "
f"EPCs fetched live by UPRN, modelled in memory — no DB writes...\n"
engine = _engine()
considered = _parse_measures(args.measures)
uprns = _uprns_for(engine, args.property_ids)
# One read-only session for the live `material` catalogue, reused across the
# batch so both store and no-store runs price against the same DB rows.
catalogue_session = Session(engine)
products = ProductPostgresRepository(catalogue_session)
scenario: Optional[Scenario] = (
_scenario_for(catalogue_session, args.scenario_id)
if args.scenario_id is not None
else None
)
md_lines: list[str] = [f"# Modelling recommendations (goal band {args.goal})\n"]
target = (
f"scenario {scenario.id} (band {scenario.goal_value})"
if scenario is not None
else f"synthesised Increasing-EPC band {args.goal}"
)
measures_note = ",".join(sorted(considered)) if considered else "all measures"
mode = "PERSISTING to DB" if args.persist else "no DB writes"
print(
f"modelling {len(args.property_ids)} propertie(s) · {target} · {measures_note} · "
f"{mode} (DB material catalogue, live EPC/solar)...\n"
)
md_lines: list[str] = [f"# Modelling recommendations ({target}, {measures_note})\n"]
csv_rows: list[str] = [
"property_id,uprn,baseline_sap,post_sap,measures,measure_types,cost_of_works"
]
@ -218,11 +331,26 @@ def main() -> None:
plan: Plan = run_modelling(
epc,
goal_band=args.goal,
catalogue_path=DEFAULT_CATALOGUE,
planning_restrictions=restrictions,
solar_insights=solar_insights,
considered_measures=considered,
products=products,
scenario=scenario,
print_table=False,
)
if args.persist:
assert scenario is not None # guaranteed by the --persist guard
_persist(
engine,
property_id=property_id,
uprn=uprn,
portfolio_id=args.portfolio_id,
scenario=scenario,
epc=epc,
spatial=spatial,
solar_insights=solar_insights,
plan=plan,
)
except Exception as error: # noqa: BLE001 — one bad property must not stop the run
line = f"property {property_id} (uprn {uprn}): ERROR — {type(error).__name__}: {error}"
print(line + "\n")
@ -255,6 +383,7 @@ def main() -> None:
f"{'|'.join(measure_types)},{plan.cost_of_works:.0f}"
)
catalogue_session.close()
_MARKDOWN_PATH.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
_CSV_PATH.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")
print(f"wrote {_MARKDOWN_PATH.resolve()}")