Model/scripts/fetch_epc_dump.py
Khalim Conn-Kowlessar ea3af8d2f4 feat(modelling): CLI to fetch an EPC dump + build the inspection report
run_property_report builds the three-section Markdown+CSV report over a dir of
API-shaped EPC JSON, offline (defaults to the golden 57: 57/57 scorable, MAE
0.54, 6 flagged |Δ|>0.5). fetch_epc_dump pulls raw cert JSON from the live API
by --uprn/--postcode (picking the latest cert per match, skipping existing
files), mirroring fetch_cohort2's proven HTTP shape and reading
OPEN_EPC_API_TOKEN. Report artifacts + epc_dump/ are gitignored.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 11:26:17 +00:00

153 lines
5.3 KiB
Python

"""Fetch a dump of raw EPC API JSON for a property set, to feed the report.
Given UPRNs and/or postcodes, hits the live gov.uk EPC API, resolves each UPRN
to its latest certificate and each postcode to every certificate its search
returns, and writes each cert's raw inner `data` payload — identical in shape
to the committed golden fixtures — to one JSON per cert under a dump dir.
`scripts.run_property_report` then runs that dump offline.
Keeping the raw JSON (not just the mapped EPC) is what the report's calculator-
error section needs: the cert's lodged `energy_rating_current` lives on it.
python -m scripts.fetch_epc_dump --uprn 100023336956 100023336957
python -m scripts.fetch_epc_dump --postcode "SW1A 1AA" --out epc_dump
Reads the Bearer token from `OPEN_EPC_API_TOKEN` (backend/.env). The API rate-
limits (429); `call_with_retry` backs off, and existing files are skipped, so a
re-run resumes a partial dump. Run from the worktree root (import trap).
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Optional
import httpx
from dotenv import load_dotenv
# Directory one level above the folder that holds this script. It is the base
# for the sys.path entry below, the default dump dir, and the backend/.env path.
_REPO_ROOT = Path(__file__).resolve().parents[1]
# Must run before the `infrastructure` imports that follow: it puts _REPO_ROOT
# at the front of the module search path (presumably so those imports resolve
# from this checkout rather than another copy — confirm against the "import
# trap" this project refers to).
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from infrastructure.epc_client._retry import call_with_retry # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from infrastructure.epc_client.exceptions import ( # noqa: E402
EpcApiError,
EpcNotFoundError,
EpcRateLimitError,
)
# Default for the --out option: an `epc_dump` directory directly under _REPO_ROOT.
_DEFAULT_OUT = _REPO_ROOT / "epc_dump"
def _headers(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
def _latest_cert_for_uprn(token: str, uprn: int) -> Optional[str]:
    """Look up the newest certificate number lodged against a UPRN.

    Calls the domestic search endpoint with the UPRN and, among the rows under
    the response's `data` key, picks the one with the greatest
    `registrationDate`.

    Returns:
        That row's `certificateNumber`, or None when the API answers 404 or
        the row list is empty.

    Raises:
        EpcRateLimitError: the API answered 429.
        EpcApiError: any other non-success status.
    """
    search_url = f"{EpcClientService.BASE_URL}/api/domestic/search"
    response = httpx.get(
        search_url,
        params={"uprn": uprn},
        headers=_headers(token),
        timeout=EpcClientService.REQUEST_TIMEOUT,
    )

    status = response.status_code
    if status == 404:
        return None
    if status == 429:
        raise EpcRateLimitError("Rate limited by EPC API")
    if not response.is_success:
        raise EpcApiError(f"EPC API search error {status}: {response.text}")

    matches: list[dict[str, Any]] = response.json().get("data", [])
    if matches:
        newest: dict[str, Any] = max(
            matches, key=lambda match: match["registrationDate"]
        )
        cert_number: str = newest["certificateNumber"]
        return cert_number
    return None
def _fetch_raw(token: str, cert_num: str) -> dict[str, Any]:
    """Download one certificate and return the `data` member of the JSON reply.

    Raises:
        EpcNotFoundError: the API answered 404 for `cert_num`.
        EpcRateLimitError: the API answered 429.
        EpcApiError: any other non-success status.
    """
    certificate_url = f"{EpcClientService.BASE_URL}/api/certificate"
    response = httpx.get(
        certificate_url,
        params={"certificate_number": cert_num},
        headers=_headers(token),
        timeout=EpcClientService.REQUEST_TIMEOUT,
    )

    status = response.status_code
    if status == 404:
        raise EpcNotFoundError(cert_num)
    if status == 429:
        raise EpcRateLimitError("Rate limited by EPC API")
    if not response.is_success:
        raise EpcApiError(f"EPC API error {status}: {response.text}")

    body = response.json()
    inner: dict[str, Any] = body["data"]
    return inner
def _parse_args() -> argparse.Namespace:
    """Parse the process command line.

    Options: `--uprn` (zero or more ints), `--postcode` (zero or more strings)
    and `--out` (a Path, defaulting to `_DEFAULT_OUT`).
    """
    cli = argparse.ArgumentParser(
        description="Fetch raw EPC API JSON into a dump dir.",
    )
    cli.add_argument(
        "--uprn",
        type=int,
        nargs="*",
        default=[],
        help="UPRNs to fetch",
    )
    cli.add_argument(
        "--postcode",
        nargs="*",
        default=[],
        help="postcodes to fetch (all certs)",
    )
    cli.add_argument(
        "--out",
        type=Path,
        default=_DEFAULT_OUT,
        help="dump directory",
    )
    parsed = cli.parse_args()
    return parsed
def main() -> int:
    """Resolve the requested UPRNs/postcodes to certificates and dump each one.

    Steps:
      1. Require at least one `--uprn` / `--postcode`, and a non-empty
         `OPEN_EPC_API_TOKEN` (read after loading `backend/.env`).
      2. Resolve each UPRN to its latest certificate number and each postcode
         to every certificate number its search returns.
      3. For each distinct certificate number without a `<number>.json` already
         in the dump dir, fetch the raw payload and write it there.

    A 404 on an individual certificate is counted as missing and skipped; any
    other exception from the API calls propagates to the caller.

    Returns:
        0 when the run completes, 2 when the arguments or the token are
        missing (the reason is printed to stderr).
    """
    args = _parse_args()
    if not args.uprn and not args.postcode:
        print("give at least one --uprn or --postcode", file=sys.stderr)
        return 2

    load_dotenv(_REPO_ROOT / "backend" / ".env")
    token = os.environ.get("OPEN_EPC_API_TOKEN")
    if not token:
        print(
            "OPEN_EPC_API_TOKEN is not set (backend/.env) — cannot fetch",
            file=sys.stderr,
        )
        return 2

    out: Path = args.out
    out.mkdir(parents=True, exist_ok=True)

    # Certificate numbers to fetch — a UPRN resolves to one cert, a postcode
    # to many.
    cert_nums: list[str] = []
    for uprn in args.uprn:
        # `u=uprn` binds the current value; the lambda is handed to the retry
        # helper and must not see a later loop value.
        cert = call_with_retry(lambda u=uprn: _latest_cert_for_uprn(token, u))
        if cert is None:
            print(f"no cert uprn={uprn}")
            continue
        cert_nums.append(cert)

    if args.postcode:
        # One client serves every postcode search; nothing about it varies
        # per postcode.
        client = EpcClientService(token)
        for postcode in args.postcode:
            results = call_with_retry(
                lambda pc=postcode: client.search_by_postcode(pc)
            )
            cert_nums.extend(result.certificate_number for result in results)

    # The same cert can be reached via both a UPRN and a postcode. Drop
    # repeats (keeping first-seen order) so it is fetched and counted once.
    unique_cert_nums = list(dict.fromkeys(cert_nums))

    fetched = 0
    skipped = 0
    missing = 0
    for cert_num in unique_cert_nums:
        out_path = out / f"{cert_num}.json"
        if out_path.exists():
            print(f"skip {cert_num}")
            skipped += 1
            continue
        try:
            raw = call_with_retry(lambda c=cert_num: _fetch_raw(token, c))
        except EpcNotFoundError:
            print(f"404 {cert_num}")
            missing += 1
            continue
        # Write to a sibling temp file and rename into place: an interrupted
        # run must not leave a truncated `<cert>.json`, because the
        # exists() check above would then skip it on every later run.
        tmp_path = out_path.with_name(f"{out_path.name}.tmp")
        tmp_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")
        tmp_path.replace(out_path)
        print(f"fetch {cert_num}")
        fetched += 1

    print(f"\nfetched={fetched} skipped={skipped} missing={missing} -> {out.resolve()}")
    print(f"now run: python -m scripts.run_property_report {out}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())