From ea3af8d2f4688a78d7db8723de36a709f5990bc1 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Thu, 4 Jun 2026 11:26:17 +0000
Subject: [PATCH] feat(modelling): CLI to fetch an EPC dump + build the inspection report
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

run_property_report builds the three-section Markdown+CSV report over a
dir of API-shaped EPC JSON, offline (defaults to the golden 57: 57/57
scorable, MAE 0.54, 6 flagged |Δ|>0.5). fetch_epc_dump pulls raw cert
JSON from the live API by --uprn/--postcode (the latest cert per UPRN,
every cert a postcode search returns; existing files are skipped and
each file is written via a temp file + rename), mirroring
fetch_cohort2's proven HTTP shape and reading OPEN_EPC_API_TOKEN.
Report artifacts + epc_dump/ are gitignored.

Co-Authored-By: Claude Opus 4.8
---
 .gitignore                     |   3 +
 scripts/fetch_epc_dump.py      | 194 +++++++++++++++++++++++++++++++++
 scripts/run_property_report.py |  76 +++++++++++++
 3 files changed, 273 insertions(+)
 create mode 100644 scripts/fetch_epc_dump.py
 create mode 100644 scripts/run_property_report.py

diff --git a/.gitignore b/.gitignore
index a1bd9c0b..a48af48a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -283,6 +283,9 @@ cache/
 *.csv
 # Tracked reference CSV: SAP enum codes (gov api /api/codes) co-located with EpcPropertyData.
 !datatypes/epc/domain/epc_codes.csv
+# Generated property-inspection report artifacts (and any fetched EPC dump).
+property_report.md
+/epc_dump/
 *.xlsx
 # *.pdf
 **/Chunks/
diff --git a/scripts/fetch_epc_dump.py b/scripts/fetch_epc_dump.py
new file mode 100644
index 00000000..bc22b35d
--- /dev/null
+++ b/scripts/fetch_epc_dump.py
@@ -0,0 +1,194 @@
+"""Fetch a dump of raw EPC API JSON for a property set, to feed the report.
+
+Given UPRNs and/or postcodes, hits the live gov.uk EPC API and selects the
+latest certificate per UPRN plus every certificate a postcode search returns.
+Each cert's raw inner `data` payload — identical in shape to the committed
+golden fixtures — is written to one JSON per cert under a dump dir.
+`scripts.run_property_report` then runs that dump offline.
+
+Keeping the raw JSON (not just the mapped EPC) is what the report's calculator-
+error section needs: the cert's lodged `energy_rating_current` lives on it.
+
+    python -m scripts.fetch_epc_dump --uprn 100023336956 100023336957
+    python -m scripts.fetch_epc_dump --postcode "SW1A 1AA" --out epc_dump
+
+Reads the Bearer token from `OPEN_EPC_API_TOKEN` (backend/.env). The API rate-
+limits (429); `call_with_retry` backs off, and existing files are skipped, so a
+re-run resumes a partial dump. Each file is written via a temp file + rename so
+an interrupted run cannot leave a truncated JSON that a re-run would skip. Run
+from the worktree root (import trap).
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+from pathlib import Path
+from typing import Any, Optional
+
+import httpx
+from dotenv import load_dotenv
+
+_REPO_ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(_REPO_ROOT))  # worktree root first — avoid the import trap
+
+from infrastructure.epc_client._retry import call_with_retry  # noqa: E402
+from infrastructure.epc_client.epc_client_service import EpcClientService  # noqa: E402
+from infrastructure.epc_client.exceptions import (  # noqa: E402
+    EpcApiError,
+    EpcNotFoundError,
+    EpcRateLimitError,
+)
+
+_DEFAULT_OUT = _REPO_ROOT / "epc_dump"
+
+
+def _headers(token: str) -> dict[str, str]:
+    """Build the request headers: Bearer auth plus a JSON `Accept`."""
+    return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
+
+
+def _latest_cert_for_uprn(token: str, uprn: int) -> Optional[str]:
+    """Search the API and return the most-recent certificate number for the
+    UPRN (by registration date), or None when nothing is lodged.
+
+    Raises EpcRateLimitError on a 429 and EpcApiError on any other non-success
+    status; a 404 is treated as "nothing lodged".
+    """
+    resp = httpx.get(
+        f"{EpcClientService.BASE_URL}/api/domestic/search",
+        params={"uprn": uprn},
+        headers=_headers(token),
+        timeout=EpcClientService.REQUEST_TIMEOUT,
+    )
+    if resp.status_code == 404:
+        return None
+    if resp.status_code == 429:
+        raise EpcRateLimitError("Rate limited by EPC API")
+    if not resp.is_success:
+        raise EpcApiError(f"EPC API search error {resp.status_code}: {resp.text}")
+
+    rows: list[dict[str, Any]] = resp.json().get("data", [])
+    if not rows:
+        return None
+    # NOTE(review): compares `registrationDate` values as returned — assumes they
+    # sort chronologically (e.g. ISO-8601 strings); confirm against the API.
+    latest: dict[str, Any] = max(rows, key=lambda row: row["registrationDate"])
+    cert: str = latest["certificateNumber"]
+    return cert
+
+
+def _fetch_raw(token: str, cert_num: str) -> dict[str, Any]:
+    """Fetch one certificate and return the inner `data` object of the response.
+
+    Raises EpcNotFoundError on a 404, EpcRateLimitError on a 429 and EpcApiError
+    on any other non-success status.
+    """
+    resp = httpx.get(
+        f"{EpcClientService.BASE_URL}/api/certificate",
+        params={"certificate_number": cert_num},
+        headers=_headers(token),
+        timeout=EpcClientService.REQUEST_TIMEOUT,
+    )
+    if resp.status_code == 404:
+        raise EpcNotFoundError(cert_num)
+    if resp.status_code == 429:
+        raise EpcRateLimitError("Rate limited by EPC API")
+    if not resp.is_success:
+        raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
+    payload: dict[str, Any] = resp.json()["data"]
+    return payload
+
+
+def _write_json_atomic(path: Path, payload: dict[str, Any]) -> None:
+    """Write `payload` as indented JSON to `path` via a sibling temp file.
+
+    `main` skips any cert JSON that already exists, so a file truncated by an
+    interrupted run would never be re-fetched. Writing to `NAME.tmp` first and
+    renaming it into place means the final name only ever holds a full file.
+    """
+    tmp_path = path.with_name(f"{path.name}.tmp")
+    tmp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+    tmp_path.replace(path)
+
+
+def _parse_args() -> argparse.Namespace:
+    """Parse the CLI: `--uprn` (ints), `--postcode` (strings) and `--out` (dir)."""
+    parser = argparse.ArgumentParser(description="Fetch raw EPC API JSON into a dump dir.")
+    parser.add_argument("--uprn", nargs="*", type=int, default=[], help="UPRNs to fetch")
+    parser.add_argument(
+        "--postcode", nargs="*", default=[], help="postcodes to fetch (all certs)"
+    )
+    parser.add_argument("--out", type=Path, default=_DEFAULT_OUT, help="dump directory")
+    return parser.parse_args()
+
+
+def main() -> int:
+    """Resolve the requested UPRNs/postcodes to cert numbers and dump each one.
+
+    Returns the process exit code: 0 after a completed run, 2 when no lookup key
+    was given or `OPEN_EPC_API_TOKEN` is unset.
+    """
+    args = _parse_args()
+    if not args.uprn and not args.postcode:
+        print("give at least one --uprn or --postcode", file=sys.stderr)
+        return 2
+
+    load_dotenv(_REPO_ROOT / "backend" / ".env")
+    token = os.environ.get("OPEN_EPC_API_TOKEN")
+    if not token:
+        print(
+            "OPEN_EPC_API_TOKEN is not set (backend/.env) — cannot fetch",
+            file=sys.stderr,
+        )
+        return 2
+
+    out: Path = args.out
+    out.mkdir(parents=True, exist_ok=True)
+
+    # Work-list of cert numbers — a UPRN resolves to one cert (its latest), a
+    # postcode to every cert the search returns.
+    cert_nums: list[str] = []
+    for uprn in args.uprn:
+        cert = call_with_retry(lambda u=uprn: _latest_cert_for_uprn(token, u))
+        if cert is None:
+            print(f"no cert uprn={uprn}")
+            continue
+        cert_nums.append(cert)
+    if args.postcode:
+        client = EpcClientService(token)  # one client serves every postcode
+        for postcode in args.postcode:
+            results = call_with_retry(lambda pc=postcode: client.search_by_postcode(pc))
+            cert_nums.extend(result.certificate_number for result in results)
+    # A UPRN and a postcode can resolve to the same cert: drop repeats, keeping
+    # first-seen order, so each cert is counted once in the summary.
+    cert_nums = list(dict.fromkeys(cert_nums))
+
+    fetched = 0
+    skipped = 0
+    missing = 0
+    for cert_num in cert_nums:
+        out_path = out / f"{cert_num}.json"
+        if out_path.exists():
+            print(f"skip {cert_num}")
+            skipped += 1
+            continue
+        try:
+            raw = call_with_retry(lambda c=cert_num: _fetch_raw(token, c))
+        except EpcNotFoundError:
+            print(f"404 {cert_num}")
+            missing += 1
+            continue
+        _write_json_atomic(out_path, raw)
+        print(f"fetch {cert_num}")
+        fetched += 1
+
+    print(f"\nfetched={fetched} skipped={skipped} missing={missing} -> {out.resolve()}")
+    print(f"now run: python -m scripts.run_property_report {out}")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/scripts/run_property_report.py b/scripts/run_property_report.py
new file mode 100644
index 00000000..c40e7420
--- /dev/null
+++ b/scripts/run_property_report.py
@@ -0,0 +1,76 @@
+"""Build the per-property inspection report over an EPC-JSON dump, offline.
+
+Reads a directory of API-shaped EPC JSON (identical to the EPC API response —
+what `from_api_response` parses), runs each cert through the Modelling harness,
+and writes the three-section report (calculator error vs lodged SAP, Plans +
+costings, recommended measures + their triggers) as Markdown and CSV. No
+database, no network — run it against a cached dump fetched by
+`scripts.fetch_epc_dump`. Run from the worktree root so imports resolve to this
+checkout, not /workspaces/model.
+
+    # no args -> the committed golden cohort (57 real API certs)
+    python -m scripts.run_property_report
+
+    # your fetched dump, optional goal band (default C)
+    python -m scripts.run_property_report epc_dump C
+"""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+_REPO_ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(_REPO_ROOT))  # worktree root first — avoid the import trap
+
+from harness.report import (  # noqa: E402
+    build_property_reports,
+    format_report_csv,
+    format_report_markdown,
+    parity_report_for,
+)
+
+_DEFAULT_DIR = _REPO_ROOT / "tests/domain/sap10_calculator/rdsap/fixtures/golden"
+_MARKDOWN_PATH = Path("property_report.md")
+_CSV_PATH = Path("property_report.csv")
+
+
+def main() -> None:
+    """Build the report for a dump directory and write it as Markdown + CSV.
+
+    Positional CLI args: the dump directory (default: the golden fixtures) and
+    the goal band (default "C"). Exits with status 1 when the directory holds no
+    `*.json` files. Output goes to `property_report.md` / `.csv` in the cwd.
+    """
+    args = sys.argv[1:]
+    directory = Path(args[0]) if args else _DEFAULT_DIR
+    goal_band = args[1] if len(args) > 1 else "C"
+    paths = sorted(directory.glob("*.json"))
+    if not paths:
+        print(f"no *.json files under {directory}", file=sys.stderr)
+        raise SystemExit(1)
+
+    print(
+        f"building inspection report over {len(paths)} EPC JSON(s) from "
+        f"{directory} (goal band {goal_band}), offline — no database...\n"
+    )
+    reports = build_property_reports(paths, goal_band=goal_band)
+
+    parity = parity_report_for(reports)
+    flagged = sum(1 for report in reports if report.sap_error_exceeds_threshold)
+    errored = sum(1 for report in reports if report.calculator_error is not None)
+    print(
+        f"calculator parity: {parity.case_count} scorable · "
+        f"MAE {parity.global_mae:.2f} · bias {parity.global_bias:+.2f}\n"
+        f"flagged |Δ|>0.5 : {flagged}\n"
+        f"calculator errors: {errored}"
+    )
+
+    _MARKDOWN_PATH.write_text(format_report_markdown(reports), encoding="utf-8")
+    _CSV_PATH.write_text(format_report_csv(reports) + "\n", encoding="utf-8")
+    print(f"\nwrote {_MARKDOWN_PATH.resolve()}")
+    print(f"wrote {_CSV_PATH.resolve()}")
+
+
+if __name__ == "__main__":
+    main()