feat(modelling): CLI to fetch an EPC dump + build the inspection report

run_property_report builds the three-section Markdown+CSV report over a dir of
API-shaped EPC JSON, offline (defaults to the golden 57: 57/57 scorable, MAE
0.54, 6 flagged |Δ|>0.5). fetch_epc_dump pulls raw cert JSON from the live API
by --uprn/--postcode (picking the latest cert per match, skipping existing
files), mirroring fetch_cohort2's proven HTTP shape and reading
OPEN_EPC_API_TOKEN. Report artifacts + epc_dump/ are gitignored.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-04 11:26:17 +00:00
parent ae267070b1
commit ea3af8d2f4
3 changed files with 226 additions and 0 deletions

3
.gitignore vendored
View file

@ -283,6 +283,9 @@ cache/
*.csv
# Tracked reference CSV: SAP enum codes (gov api /api/codes) co-located with EpcPropertyData.
!datatypes/epc/domain/epc_codes.csv
# Generated property-inspection report artifacts (and any fetched EPC dump).
property_report.md
/epc_dump/
*.xlsx
# *.pdf
**/Chunks/

153
scripts/fetch_epc_dump.py Normal file
View file

@ -0,0 +1,153 @@
"""Fetch a dump of raw EPC API JSON for a property set, to feed the report.
Given UPRNs and/or postcodes, hits the live gov.uk EPC API, picks the latest
certificate per match, and writes its raw inner `data` payload identical in
shape to the committed golden fixtures to one JSON per cert under a dump dir.
`scripts.run_property_report` then runs that dump offline.
Keeping the raw JSON (not just the mapped EPC) is what the report's calculator-
error section needs: the cert's lodged `energy_rating_current` lives on it.
python -m scripts.fetch_epc_dump --uprn 100023336956 100023336957
python -m scripts.fetch_epc_dump --postcode "SW1A 1AA" --out epc_dump
Reads the Bearer token from `OPEN_EPC_API_TOKEN` (backend/.env). The API rate-
limits (429); `call_with_retry` backs off, and existing files are skipped, so a
re-run resumes a partial dump. Run from the worktree root (import trap).
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Optional
import httpx
from dotenv import load_dotenv
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from infrastructure.epc_client._retry import call_with_retry # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from infrastructure.epc_client.exceptions import ( # noqa: E402
EpcApiError,
EpcNotFoundError,
EpcRateLimitError,
)
_DEFAULT_OUT = _REPO_ROOT / "epc_dump"
def _headers(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
def _latest_cert_for_uprn(token: str, uprn: int) -> Optional[str]:
"""Search the API and return the most-recent certificate number for the
UPRN (by registration date), or None when nothing is lodged."""
resp = httpx.get(
f"{EpcClientService.BASE_URL}/api/domestic/search",
params={"uprn": uprn},
headers=_headers(token),
timeout=EpcClientService.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
return None
if resp.status_code == 429:
raise EpcRateLimitError("Rate limited by EPC API")
if not resp.is_success:
raise EpcApiError(f"EPC API search error {resp.status_code}: {resp.text}")
rows: list[dict[str, Any]] = resp.json().get("data", [])
if not rows:
return None
latest: dict[str, Any] = max(rows, key=lambda row: row["registrationDate"])
cert: str = latest["certificateNumber"]
return cert
def _fetch_raw(token: str, cert_num: str) -> dict[str, Any]:
resp = httpx.get(
f"{EpcClientService.BASE_URL}/api/certificate",
params={"certificate_number": cert_num},
headers=_headers(token),
timeout=EpcClientService.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
raise EpcNotFoundError(cert_num)
if resp.status_code == 429:
raise EpcRateLimitError("Rate limited by EPC API")
if not resp.is_success:
raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
payload: dict[str, Any] = resp.json()["data"]
return payload
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Fetch raw EPC API JSON into a dump dir.")
parser.add_argument("--uprn", nargs="*", type=int, default=[], help="UPRNs to fetch")
parser.add_argument(
"--postcode", nargs="*", default=[], help="postcodes to fetch (all certs)"
)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT, help="dump directory")
return parser.parse_args()
def main() -> int:
args = _parse_args()
if not args.uprn and not args.postcode:
print("give at least one --uprn or --postcode")
return 2
load_dotenv(_REPO_ROOT / "backend" / ".env")
token = os.environ.get("OPEN_EPC_API_TOKEN")
if not token:
print("OPEN_EPC_API_TOKEN is not set (backend/.env) — cannot fetch")
return 2
out: Path = args.out
out.mkdir(parents=True, exist_ok=True)
# (kind, value) work-list — UPRNs resolve to one cert, postcodes to many.
cert_nums: list[str] = []
for uprn in args.uprn:
cert = call_with_retry(lambda u=uprn: _latest_cert_for_uprn(token, u))
if cert is None:
print(f"no cert uprn={uprn}")
continue
cert_nums.append(cert)
for postcode in args.postcode:
client = EpcClientService(token)
results = call_with_retry(lambda pc=postcode: client.search_by_postcode(pc))
cert_nums.extend(result.certificate_number for result in results)
fetched = 0
skipped = 0
missing = 0
for cert_num in cert_nums:
out_path = out / f"{cert_num}.json"
if out_path.exists():
print(f"skip {cert_num}")
skipped += 1
continue
try:
raw = call_with_retry(lambda c=cert_num: _fetch_raw(token, c))
except EpcNotFoundError:
print(f"404 {cert_num}")
missing += 1
continue
out_path.write_text(json.dumps(raw, indent=2))
print(f"fetch {cert_num}")
fetched += 1
print(f"\nfetched={fetched} skipped={skipped} missing={missing} -> {out.resolve()}")
print(f"now run: python -m scripts.run_property_report {out}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,70 @@
"""Build the per-property inspection report over an EPC-JSON dump, offline.
Reads a directory of API-shaped EPC JSON (identical to the EPC API response
what `from_api_response` parses), runs each cert through the Modelling harness,
and writes the three-section report (calculator error vs lodged SAP, Plans +
costings, recommended measures + their triggers) as Markdown and CSV. No
database, no network run it against a cached dump fetched by
`scripts.fetch_epc_dump`. Run from the worktree root so imports resolve to this
checkout, not /workspaces/model.
# no args -> the committed golden cohort (57 real API certs)
python -m scripts.run_property_report
# your fetched dump, optional goal band (default C)
python -m scripts.run_property_report epc_dump C
"""
from __future__ import annotations
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from harness.report import ( # noqa: E402
build_property_reports,
format_report_csv,
format_report_markdown,
parity_report_for,
)
_DEFAULT_DIR = _REPO_ROOT / "tests/domain/sap10_calculator/rdsap/fixtures/golden"
_MARKDOWN_PATH = Path("property_report.md")
_CSV_PATH = Path("property_report.csv")
def main() -> None:
args = sys.argv[1:]
directory = Path(args[0]) if args else _DEFAULT_DIR
goal_band = args[1] if len(args) > 1 else "C"
paths = sorted(directory.glob("*.json"))
if not paths:
print(f"no *.json files under {directory}")
raise SystemExit(1)
print(
f"building inspection report over {len(paths)} EPC JSON(s) from "
f"{directory} (goal band {goal_band}), offline — no database...\n"
)
reports = build_property_reports(paths, goal_band=goal_band)
parity = parity_report_for(reports)
flagged = sum(1 for report in reports if report.sap_error_exceeds_threshold)
errored = sum(1 for report in reports if report.calculator_error is not None)
print(
f"calculator parity: {parity.case_count} scorable · "
f"MAE {parity.global_mae:.2f} · bias {parity.global_bias:+.2f}\n"
f"flagged |Δ|>0.5 : {flagged}\n"
f"calculator errors: {errored}"
)
_MARKDOWN_PATH.write_text(format_report_markdown(reports), encoding="utf-8")
_CSV_PATH.write_text(format_report_csv(reports) + "\n", encoding="utf-8")
print(f"\nwrote {_MARKDOWN_PATH.resolve()}")
print(f"wrote {_CSV_PATH.resolve()}")
if __name__ == "__main__":
main()