From 80b525f0f40ef5c18c8febc3eb71fcc3a82f85d8 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sat, 13 Jun 2026 23:36:19 +0000 Subject: [PATCH] feat(epc-prediction): postcode-clustered corpus fetch script (ADR-0029) Builds the frozen validation corpus: samples postcodes from the register, then caches each postcode's full cohort of raw cert payloads (the shape from_api_response consumes), grouped by postcode, resumably. Reads the token from backend/.env; cache dir /tmp/epc_prediction_corpus (EPC_PREDICTION_CORPUS override). IO plumbing, not test-driven. Pairs with the leave-one-out harness. Co-Authored-By: Claude Opus 4.8 --- scripts/fetch_epc_prediction_corpus.py | 162 +++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 scripts/fetch_epc_prediction_corpus.py diff --git a/scripts/fetch_epc_prediction_corpus.py b/scripts/fetch_epc_prediction_corpus.py new file mode 100644 index 00000000..2e69ee6c --- /dev/null +++ b/scripts/fetch_epc_prediction_corpus.py @@ -0,0 +1,162 @@ +"""Build the frozen postcode-clustered corpus for EPC Prediction validation +(ADR-0029). + +WHAT THIS IS FOR +---------------- +EPC Prediction estimates an EPC-less Property's `EpcPropertyData` from its +**Comparable Properties** — the other certs in its postcode. Validating that +needs *geographic clusters* (many certs per postcode), not random certs, so the +leave-one-out harness can drop one cert and predict it from its neighbours. + +This script builds that corpus once, offline-reusable: it samples postcodes +from the register (an unbiased spread over dates/regions), then for each +postcode downloads **every** domestic cert's full schema payload — the exact +shape `EpcPropertyDataMapper.from_api_response` consumes — grouped on disk by +postcode. The validation harness then runs entirely against this cache: fast, +deterministic, no rate limits. + +Pair it with `validate_epc_prediction.py` (the leave-one-out accuracy harness). 
+ +HOW THE SAMPLE IS DRAWN +----------------------- +Postcodes are seeded by sampling random PAGES of `/api/domestic/search` across +a past date window (the register orders by registration date, so random pages +give an unbiased postcode spread). Each seed cert contributes its postcode; we +take the first N distinct postcodes and pull each one's *entire* cohort via +`search_by_postcode` -> per-cert `/api/certificate`. + +USAGE +----- + PYTHONPATH=. python scripts/fetch_epc_prediction_corpus.py + +Resumable — re-running skips certs already cached, so it is safe to interrupt. +Token is read from `backend/.env` (`OPEN_EPC_API_TOKEN`). The register rejects +a `date_end` that includes today, so keep the window in the past. + +Cache dir defaults to `/tmp/epc_prediction_corpus`, overridable via the +`EPC_PREDICTION_CORPUS` env var. Layout: + {cache_dir}/{postcode}/{cert}.json # raw API `data` payload + {cache_dir}/_index.json # {postcode: [cert, ...]} +""" + +import json +import os +import random +import time +from pathlib import Path + +import httpx +from dotenv import load_dotenv + +load_dotenv("backend/.env") +TOKEN = os.environ["OPEN_EPC_API_TOKEN"] +BASE = "https://api.get-energy-performance-data.communities.gov.uk" +H = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"} +CACHE = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus")) +CACHE.mkdir(parents=True, exist_ok=True) + +# Seed-postcode sampling. `date_end` must be strictly before today. TOTAL_PAGES +# is the `totalPages` the search returns for this window at page_size=100 — +# re-probe if you change the window (it only needs to be an upper bound for the +# random page draw; out-of-range pages just return fewer rows). 
# Date window the seed search is restricted to; `date_end` must be in the past.
WINDOW = {"date_start": "2026-01-01", "date_end": "2026-05-31"}
# Upper bound for the random page draw at page_size=100; re-probe the search's
# `totalPages` whenever WINDOW changes.
TOTAL_PAGES = 7402
SEED_PAGES = 20  # random search pages → postcode seeds
N_POSTCODES = 150  # distinct postcodes to pull full cohorts for
random.seed(2026)  # reproducible draw


def _retry_delay(retry_after: "str | None", attempt: int) -> float:
    """Return how many seconds to wait before retry number `attempt + 1`.

    A numeric, finite, non-negative `Retry-After` value is honoured. Anything
    else (header absent, empty, an HTTP-date, negative, inf/nan) falls back to
    a linear backoff of 1.5s per attempt so the sleep can never raise.
    """
    fallback = 1.5 * (attempt + 1)
    if not retry_after:
        return fallback
    try:
        delay = float(retry_after)
    except ValueError:
        # Retry-After may legally be an HTTP-date rather than delay-seconds.
        return fallback
    # The chained comparison is False for nan and excludes +inf.
    return delay if 0 <= delay < float("inf") else fallback


def _get(url: str, params: dict[str, object], timeout: float = 20.0, tries: int = 5) -> "httpx.Response | None":
    """GET with retry/backoff on transport errors, 429 and 5xx.

    Args:
        url: Absolute URL to request.
        params: Query-string parameters.
        timeout: Per-request timeout in seconds.
        tries: Maximum number of attempts.

    Returns:
        The first response that is neither 429 nor 5xx. If every attempt
        fails, the last response received (possibly a 429/5xx), or None when
        no attempt produced a response at all. Callers check `is_success`.
    """
    response = None
    for attempt in range(tries):
        is_last = attempt == tries - 1
        try:
            response = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            if not is_last:  # no point sleeping when there is no retry left
                time.sleep(_retry_delay(None, attempt))
            continue
        if response.status_code != 429 and response.status_code < 500:
            return response
        if not is_last:
            time.sleep(_retry_delay(response.headers.get("Retry-After"), attempt))
    return response


def _normalise_postcode(postcode: str) -> str:
    """Return `postcode` upper-cased with all spaces removed."""
    return postcode.replace(" ", "").upper()


def _search_rows(response) -> list[dict]:
    """Return the dict rows under `data` in a search response, or [] if malformed.

    A non-JSON body, a non-object body, or a missing/non-list `data` all yield
    an empty list, so one bad response cannot abort the whole corpus build.
    """
    try:
        body = response.json()
    except ValueError:
        return []
    rows = body.get("data") if isinstance(body, dict) else None
    if not isinstance(rows, list):
        return []
    return [row for row in rows if isinstance(row, dict)]


def _write_json_atomic(path: Path, payload: object, indent: "int | None" = None) -> None:
    """Serialise `payload` as JSON to `path` via a temp file + rename.

    The rename means an interrupted run can never leave a truncated file at
    `path`, which the resume logic would otherwise mistake for a cached cert.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(payload, indent=indent))
    tmp.replace(path)


def sample_postcodes() -> list[str]:
    """Draw distinct postcodes from random search pages across the window.

    Returns:
        Up to N_POSTCODES normalised postcodes, in first-seen order. Pages that
        fail (after retries) are reported and skipped.
    """
    pages = sorted(random.sample(range(1, TOTAL_PAGES + 1), SEED_PAGES))
    # dict used as an insertion-ordered set so the result is reproducible.
    seen: dict[str, None] = {}
    for p in pages:
        r = _get(
            f"{BASE}/api/domestic/search",
            {**WINDOW, "current_page": p, "page_size": 100},
        )
        if r is None or not r.is_success:
            print(f" seed page {p} -> {getattr(r, 'status_code', 'ERR')}")
            continue
        for row in _search_rows(r):
            pc = row.get("postcode")
            if pc:
                seen[_normalise_postcode(pc)] = None
        print(f" page {p}: cumulative {len(seen)} distinct postcodes")
        if len(seen) >= N_POSTCODES:
            break
    return list(seen)[:N_POSTCODES]


def cohort_cert_numbers(postcode: str) -> list[str]:
    """Return the certificate numbers the search lists for `postcode`.

    Returns an empty list when the request fails or the body is malformed.
    """
    # NOTE(review): only a single search page is read here. If the API
    # paginates postcode searches, larger cohorts are silently truncated —
    # confirm the default page size or add `current_page` paging.
    r = _get(f"{BASE}/api/domestic/search", {"postcode": postcode})
    if r is None or not r.is_success:
        return []
    return [
        row["certificateNumber"]
        for row in _search_rows(r)
        if row.get("certificateNumber")
    ]


def fetch_cert(postcode: str, cert: str) -> bool:
    """Fetch + cache one cert's raw `data` payload.

    Args:
        postcode: Normalised postcode; used as the cache sub-directory name.
        cert: Certificate number; used as the cache file stem.

    Returns:
        True on success (or already-cached), False when the request failed or
        the response carried no usable `data` payload.
    """
    out = CACHE / postcode / f"{cert}.json"
    if out.exists():
        return True
    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is None or not r.is_success:
        return False
    try:
        payload = r.json()["data"]
    except (KeyError, TypeError, ValueError):
        # TypeError: the body was valid JSON but not an object.
        return False
    _write_json_atomic(out, payload)
    return True


def main() -> None:
    """Sample seed postcodes, cache every cohort cert, then write the index."""
    print("sampling seed postcodes ...")
    postcodes = sample_postcodes()
    print(f"pulling full cohorts for {len(postcodes)} postcodes into {CACHE} ...")
    index: dict[str, list[str]] = {}
    t0 = time.monotonic()  # monotonic: elapsed time is immune to clock changes
    total_certs = 0
    for i, pc in enumerate(postcodes, 1):
        certs = cohort_cert_numbers(pc)
        fetched = [c for c in certs if fetch_cert(pc, c)]
        index[pc] = fetched
        total_certs += len(fetched)
        print(f" [{i}/{len(postcodes)}] {pc}: {len(fetched)}/{len(certs)} certs")
    _write_json_atomic(CACHE / "_index.json", index, indent=2)
    print(
        f"DONE in {time.monotonic() - t0:.0f}s: {len(postcodes)} postcodes, "
        f"{total_certs} certs cached under {CACHE}"
    )


if __name__ == "__main__":
    main()