feat(scripts): run_modelling_e2e — inspect recommendations per property_id

Revives the local recommendation-inspection flow for specific Properties.
`scripts/run_modelling_e2e.py` reads each Property's UPRN from the DB
(read-only), fetches the latest EPC live from the gov EPC API by UPRN, runs the
Modelling stage in memory (all Generators → Optimiser → costed, attributed
Plan), and prints a per-Property plan table + writes a Markdown/CSV summary.
Persists nothing — purely for inspection.

The local DB's Properties have no linked ingested EPC (epc_property.property_id
is NULL for all rows; Ingestion's source clients are stubbed, #1136), so the
EPC must be fetched inline rather than read back. Builds the connection from the
`DB_*` env vars in backend/.env and the EPC token from `EPC_AUTH_TOKEN`.

Threads optional solar insights through harness `run_modelling` (so Solar PV
Options can fire once coordinates are wired) and adds the `solar_pv` catalogue
row. Solar + planning restrictions + DB persistence are noted follow-ups.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-08 14:25:33 +00:00
parent 24492aa4ba
commit 0918dd37ec
4 changed files with 179 additions and 2 deletions

1
.gitignore vendored
View file

@ -285,6 +285,7 @@ cache/
!datatypes/epc/domain/epc_codes.csv
# Generated property-inspection report artifacts (and any fetched EPC dump).
property_report.md
modelling_e2e.md
epc_dump*/
*.xlsx
# *.pdf

View file

@ -43,6 +43,7 @@ from tests.orchestration.fakes import (
FakePlanRepository,
FakePropertyRepo,
FakeScenarioRepository,
FakeSolarRepo,
FakeUnitOfWork,
)
@ -169,13 +170,19 @@ def run_modelling(
catalogue_path: Path = DEFAULT_CATALOGUE,
current_market_value: Optional[float] = None,
planning_restrictions: PlanningRestrictions = PlanningRestrictions(),
solar_insights: Optional[dict[str, Any]] = None,
print_table: bool = True,
) -> Plan:
"""Run ONLY the Modelling stage over ``epc`` with no database — skipping
Ingestion and Baseline. Modelling re-scores the EPC itself, so unlike
`run_one` this needs no lodged recorded-performance / RHI: it runs on any
EPC the calculator can score, which is what you want for inspecting
recommendations across an arbitrary EPC dump offline."""
recommendations across an arbitrary EPC dump offline.
``solar_insights`` is the Property's raw Google Solar ``buildingInsights``
JSON (as persisted by ``SolarRepository``); when given, the solar
Recommendation Generator sees the dwelling's potential and can offer Solar
PV Options (ADR-0026)."""
plan_repo = FakePlanRepository()
property_repo = FakePropertyRepo(
{
@ -194,6 +201,11 @@ def run_modelling(
)
unit = FakeUnitOfWork(
property=property_repo,
solar=FakeSolarRepo(
by_property={_PROPERTY_ID: solar_insights}
if solar_insights is not None
else None
),
scenario=FakeScenarioRepository(
{
_SCENARIO_ID: Scenario(

View file

@ -12,5 +12,6 @@
"secondary_glazing": { "unit_cost_per_m2": 510.0 },
"low_energy_lighting": { "unit_cost_per_m2": 8.0 },
"high_heat_retention_storage_heaters": { "unit_cost_per_m2": 3500.0 },
"air_source_heat_pump": { "unit_cost_per_m2": 12000.0 }
"air_source_heat_pump": { "unit_cost_per_m2": 12000.0 },
"solar_pv": { "unit_cost_per_m2": 0.0 }
}

View file

@ -0,0 +1,163 @@
"""Run Modelling end-to-end for specific Properties (by ``property_id``) and
print the recommendations for inspection.
The local DB's Properties have no linked, ingested EPC yet (Ingestion's source
clients are still stubbed — #1136), so this script does the ingestion step
inline for inspection: it reads each Property's UPRN from the DB, fetches the
latest EPC **live** from the gov EPC API by UPRN, then runs the Modelling stage
in memory (every Recommendation Generator → the Optimiser → a costed, attributed
Plan). It is read-only on the DB (just the UPRN lookup) and persists nothing —
purely for inspecting recommendations. Prints a per-Property plan table and
writes a Markdown + CSV summary.
Config: loads `backend/.env` for the DB creds (`DB_*`) and the EPC API token
(`EPC_AUTH_TOKEN`) — the agent never sees the secrets. Run from the worktree
root so imports resolve to this checkout:
python -m scripts.run_modelling_e2e 115 116 117 # goal band C (default)
python -m scripts.run_modelling_e2e --goal B 115 116 117 # a different target band
Not yet wired (follow-ups): Google Solar potential (needs the Property's
coordinates from the geospatial/S3 layer, absent on the `property` row) so the
Solar PV Options don't fire here yet; planning restrictions default to
unrestricted (the conservation/listed gates aren't read).
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Optional
# Worktree root: two levels up from this file's resolved path (the parent of
# the directory that contains this script).
_REPO_ROOT = Path(__file__).resolve().parents[1]
# Must run before the project imports below so they resolve to this checkout.
sys.path.insert(0, str(_REPO_ROOT))  # worktree root first — avoid the import trap
from datatypes.epc.domain.epc_property_data import EpcPropertyData # noqa: E402
from domain.modelling.plan import Plan, PlanMeasure # noqa: E402
from harness.console import DEFAULT_CATALOGUE, run_modelling # noqa: E402
from harness.plan_table import format_plan_table # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from sqlalchemy import create_engine, text # noqa: E402
# Dotenv file read by `_load_env` for the `DB_*` creds and `EPC_AUTH_TOKEN`.
_ENV_PATH = _REPO_ROOT / "backend" / ".env"
# Report outputs; relative paths, so they are written to the current working
# directory of the process.
_MARKDOWN_PATH = Path("modelling_e2e.md")
_CSV_PATH = Path("modelling_e2e.csv")
def _load_env(path: Path) -> None:
    """Load `KEY=value` lines from a dotenv file into ``os.environ``.

    Values already present in the environment are never overridden, so anything
    exported in the shell wins over the file. A missing file is a no-op.

    Parsing rules:
      * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
      * an optional shell-style ``export `` prefix on the key is tolerated;
      * lines with an empty key (e.g. ``=value``) are skipped rather than
        handed to ``os.environ`` with an empty name;
      * exactly one pair of *matching* surrounding quotes is removed from the
        value, so a stray quote inside or at one end of an unquoted value
        (e.g. a password ending in ``'``) is preserved.

    Args:
        path: Location of the dotenv file (normally ``backend/.env``).
    """
    if not path.exists():
        return
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            # An empty variable name is not a valid environment key.
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        os.environ.setdefault(key, value)
def _db_url() -> str:
    """Build the SQLAlchemy connection string from the `DB_*` env vars.

    Reads ``DB_USERNAME``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and
    ``DB_NAME`` from ``os.environ``. The username and password are
    percent-encoded so that reserved URL characters in the credentials
    (``@``, ``/``, ``:``, spaces, ...) cannot corrupt the URL; credentials
    made only of unreserved characters come out unchanged.

    Returns:
        A ``postgresql+psycopg2://user:password@host:port/name`` URL.

    Raises:
        KeyError: if any of the `DB_*` variables is not set.
    """
    from urllib.parse import quote  # local import: only this function needs it

    env = os.environ
    # safe="" so "/" is encoded too; %20 (not "+") is used for spaces so the
    # value decodes correctly whether the consumer uses unquote or unquote_plus.
    username = quote(env["DB_USERNAME"], safe="")
    password = quote(env["DB_PASSWORD"], safe="")
    return (
        f"postgresql+psycopg2://{username}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )
def _uprns_for(property_ids: list[int]) -> dict[int, Optional[int]]:
    """Read each Property's UPRN from the DB (read-only).

    Issues a single ``SELECT id, uprn FROM property`` for the requested ids and
    releases the engine's connection pool afterwards, so nothing is left open
    once the lookup returns.

    Args:
        property_ids: Primary keys of the `property` rows to look up.

    Returns:
        A mapping of property id to UPRN. The UPRN is ``None`` when the column
        is NULL; ids with no matching row are absent from the mapping.
    """
    if not property_ids:
        # Nothing to look up — skip opening a connection at all.
        return {}
    engine = create_engine(
        _db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10}
    )
    try:
        with engine.connect() as conn:
            rows = conn.execute(
                text("SELECT id, uprn FROM property WHERE id = ANY(:ids)"),
                {"ids": property_ids},
            ).fetchall()
    finally:
        # The engine is created per call, so dispose it to close pooled
        # connections instead of leaving them to garbage collection.
        engine.dispose()
    return {int(pid): (int(uprn) if uprn is not None else None) for pid, uprn in rows}
def _measure_summary(measure: PlanMeasure) -> str:
    """Format one Plan measure as a single Markdown bullet line.

    The line shows the measure type, its SAP-point impact with an explicit
    sign, its total cost in whole pounds, and its description. The sign comes
    from the ``+`` format flag, so a negative impact renders as ``-1.50``
    instead of the malformed ``+-1.50`` a hard-coded ``+`` prefix would give.

    Args:
        measure: The measure to summarise; reads ``measure_type``,
            ``impact.sap_points``, ``cost.total`` and ``description``.

    Returns:
        The formatted bullet line (no trailing newline).
    """
    return (
        f" - {measure.measure_type}: "
        f"{measure.impact.sap_points:+.2f} SAP · £{measure.cost.total:,.0f} "
        f"{measure.description}"
    )
def main() -> None:
    """Model each requested Property and report its recommendations.

    Steps:
      1. Parse the property ids and the optional ``--goal`` band.
      2. Load `backend/.env`, then look up every Property's UPRN in the DB.
      3. For each Property, fetch its EPC by UPRN and run ``run_modelling``
         on it with table printing disabled.
      4. Print a header and plan table per Property, then write the Markdown
         and CSV summaries to the current working directory.

    A failure for one Property (no UPRN, no EPC, or any modelling error) is
    reported in the console, Markdown and CSV outputs and the run continues
    with the next Property.

    Exits via ``parser.error`` (status 2) when ``EPC_AUTH_TOKEN`` is not set,
    instead of surfacing a bare ``KeyError`` traceback.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("property_ids", type=int, nargs="+", help="Property ids to model")
    parser.add_argument("--goal", default="C", help="target EPC band (default C)")
    args = parser.parse_args()

    _load_env(_ENV_PATH)
    epc_token = os.environ.get("EPC_AUTH_TOKEN")
    if not epc_token:
        parser.error(
            f"EPC_AUTH_TOKEN is not set (export it or add it to {_ENV_PATH})"
        )
    epc_client = EpcClientService(epc_token)
    uprns = _uprns_for(args.property_ids)

    print(
        f"modelling {len(args.property_ids)} propertie(s) (goal band {args.goal}); "
        f"EPCs fetched live by UPRN, modelled in memory — no DB writes...\n"
    )
    md_lines: list[str] = [f"# Modelling recommendations (goal band {args.goal})\n"]
    csv_rows: list[str] = [
        "property_id,uprn,baseline_sap,post_sap,measures,measure_types,cost_of_works"
    ]

    for property_id in args.property_ids:
        uprn = uprns.get(property_id)
        try:
            if uprn is None:
                raise ValueError("no UPRN on the property row")
            epc: Optional[EpcPropertyData] = epc_client.get_by_uprn(uprn)
            if epc is None:
                raise ValueError(f"no EPC found for UPRN {uprn}")
            plan: Plan = run_modelling(
                epc,
                goal_band=args.goal,
                catalogue_path=DEFAULT_CATALOGUE,
                print_table=False,
            )
        except Exception as error:  # noqa: BLE001 — one bad property must not stop the run
            line = f"property {property_id} (uprn {uprn}): ERROR — {type(error).__name__}: {error}"
            print(line + "\n")
            md_lines.append(f"## Property {property_id}\n\n`{line}`\n")
            # Same column count as the header; only the measure_types column is filled.
            csv_rows.append(f"{property_id},{uprn or ''},,,,ERROR,")
            continue

        measure_types = [m.measure_type for m in plan.measures]
        header = (
            f"=== Property {property_id} (uprn {uprn}) === "
            f"SAP {plan.baseline.sap_continuous:.1f} -> {plan.post_sap_continuous:.1f} "
            f"· {len(plan.measures)} measure(s) · £{plan.cost_of_works:,.0f}"
        )
        print(header)
        print(format_plan_table(plan))
        print()

        md_lines.append(f"## Property {property_id} (uprn {uprn})\n")
        md_lines.append(
            # Separator between baseline and post-works SAP, matching the
            # console header; without it the two numbers run together.
            f"SAP {plan.baseline.sap_continuous:.1f} -> {plan.post_sap_continuous:.1f} "
            f"· {len(plan.measures)} measure(s) · cost £{plan.cost_of_works:,.0f}\n"
        )
        md_lines.extend(_measure_summary(m) for m in plan.measures)
        md_lines.append("")
        csv_rows.append(
            f"{property_id},{uprn},{plan.baseline.sap_continuous:.2f},"
            f"{plan.post_sap_continuous:.2f},{len(plan.measures)},"
            f"{'|'.join(measure_types)},{plan.cost_of_works:.0f}"
        )

    _MARKDOWN_PATH.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
    _CSV_PATH.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")
    print(f"wrote {_MARKDOWN_PATH.resolve()}")
    print(f"wrote {_CSV_PATH.resolve()}")
# Run the CLI only when executed as a script / via `python -m`, not on import.
if __name__ == "__main__":
    main()