mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
feat(epc-prediction): postcode-clustered corpus fetch script (ADR-0029)
Builds the frozen validation corpus: samples postcodes from the register, then caches each postcode's full cohort of raw cert payloads (the shape from_api_response consumes), grouped by postcode, resumably. Reads the token from backend/.env; cache dir /tmp/epc_prediction_corpus (EPC_PREDICTION_CORPUS override). IO plumbing, not test-driven. Pairs with the leave-one-out harness. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
008a1b2783
commit
80b525f0f4
1 changed files with 162 additions and 0 deletions
162
scripts/fetch_epc_prediction_corpus.py
Normal file
162
scripts/fetch_epc_prediction_corpus.py
Normal file
|
|
@@ -0,0 +1,162 @@
|
|||
"""Build the frozen postcode-clustered corpus for EPC Prediction validation
|
||||
(ADR-0029).
|
||||
|
||||
WHAT THIS IS FOR
|
||||
----------------
|
||||
EPC Prediction estimates an EPC-less Property's `EpcPropertyData` from its
|
||||
**Comparable Properties** — the other certs in its postcode. Validating that
|
||||
needs *geographic clusters* (many certs per postcode), not random certs, so the
|
||||
leave-one-out harness can drop one cert and predict it from its neighbours.
|
||||
|
||||
This script builds that corpus once, offline-reusable: it samples postcodes
|
||||
from the register (an unbiased spread over dates/regions), then for each
|
||||
postcode downloads **every** domestic cert's full schema payload — the exact
|
||||
shape `EpcPropertyDataMapper.from_api_response` consumes — grouped on disk by
|
||||
postcode. The validation harness then runs entirely against this cache: fast,
|
||||
deterministic, no rate limits.
|
||||
|
||||
Pair it with `validate_epc_prediction.py` (the leave-one-out accuracy harness).
|
||||
|
||||
HOW THE SAMPLE IS DRAWN
|
||||
-----------------------
|
||||
Postcodes are seeded by sampling random PAGES of `/api/domestic/search` across
|
||||
a past date window (the register orders by registration date, so random pages
|
||||
give an unbiased postcode spread). Each seed cert contributes its postcode; we
|
||||
take the first N distinct postcodes and pull each one's *entire* cohort via
|
||||
`search_by_postcode` -> per-cert `/api/certificate`.
|
||||
|
||||
USAGE
|
||||
-----
|
||||
PYTHONPATH=. python scripts/fetch_epc_prediction_corpus.py
|
||||
|
||||
Resumable — re-running skips certs already cached, so it is safe to interrupt.
|
||||
Token is read from `backend/.env` (`OPEN_EPC_API_TOKEN`). The register rejects
|
||||
a `date_end` that includes today, so keep the window in the past.
|
||||
|
||||
Cache dir defaults to `/tmp/epc_prediction_corpus`, overridable via the
|
||||
`EPC_PREDICTION_CORPUS` env var. Layout:
|
||||
<cache>/<POSTCODE_NOSPACE>/<cert_number>.json # raw API `data` payload
|
||||
<cache>/_index.json # {postcode: [cert, ...]}
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# --- Credentials and endpoint ------------------------------------------------
# The token is loaded from backend/.env (path is relative to the working
# directory). A missing OPEN_EPC_API_TOKEN raises KeyError here, at import.
load_dotenv("backend/.env")
TOKEN = os.environ["OPEN_EPC_API_TOKEN"]
BASE = "https://api.get-energy-performance-data.communities.gov.uk"
H = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/json",
}

# --- On-disk cache -----------------------------------------------------------
# Created eagerly so later writes can assume the root directory exists.
CACHE = Path(os.getenv("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus"))
CACHE.mkdir(exist_ok=True, parents=True)

# --- Seed-postcode sampling --------------------------------------------------
# `date_end` must be strictly before today. TOTAL_PAGES is the `totalPages`
# value the search reports for this window at page_size=100; re-probe it if the
# window changes. It only has to be an upper bound for the random page draw,
# since out-of-range pages just return fewer rows.
WINDOW = {
    "date_start": "2026-01-01",
    "date_end": "2026-05-31",
}
TOTAL_PAGES = 7402
# Number of random search pages used as postcode seeds.
SEED_PAGES = 20
# Number of distinct postcodes whose full cohorts are pulled.
N_POSTCODES = 150
# Fixed seed so the page draw is reproducible between runs.
random.seed(2026)
|
||||
|
||||
|
||||
def _retry_after_seconds(value, fallback: float) -> float:
    """Turn a ``Retry-After`` header value into a sleepable number of seconds.

    The header may carry either delay-seconds or an HTTP-date. A missing,
    malformed, or non-finite value yields ``fallback``. Negative delays
    (including dates already in the past) are clamped to zero because
    ``time.sleep`` rejects negative values.
    """
    # Function-scope imports keep this block self-contained without touching
    # the module's import section.
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if not value:
        return fallback
    try:
        seconds = float(value)
    except ValueError:
        # Not a plain number: try the HTTP-date form.
        try:
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return fallback
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        seconds = (when - datetime.now(timezone.utc)).total_seconds()
    if not math.isfinite(seconds):
        return fallback
    return max(0.0, seconds)


def _get(url: str, params: dict[str, object], timeout: float = 20.0, tries: int = 5):
    """GET ``url`` with retry/backoff on transport errors, 429 and 5xx.

    Args:
        url: Absolute URL to request.
        params: Query-string parameters passed through to ``httpx.get``.
        timeout: Per-request timeout in seconds.
        tries: Maximum number of attempts.

    Returns:
        The first response that is neither 429 nor 5xx. If no attempt
        produced one, the most recent 429/5xx response is returned, or
        ``None`` when no attempt produced a response at all. Callers must
        therefore check for ``None`` as well as ``is_success``.
    """
    r = None
    for i in range(tries):
        backoff = 1.5 * (i + 1)  # linear backoff: 1.5s, 3s, 4.5s, ...
        is_last = i == tries - 1  # no point sleeping before giving up
        try:
            resp = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            if not is_last:
                time.sleep(backoff)
            continue
        r = resp
        if resp.status_code == 429 or resp.status_code >= 500:
            if not is_last:
                # Honour Retry-After when usable, otherwise use our backoff.
                time.sleep(
                    _retry_after_seconds(resp.headers.get("Retry-After"), backoff)
                )
            continue
        return resp
    return r
|
||||
|
||||
|
||||
def _normalise_postcode(postcode: str) -> str:
|
||||
return postcode.replace(" ", "").upper()
|
||||
|
||||
|
||||
def sample_postcodes() -> list[str]:
    """Draw up to ``N_POSTCODES`` distinct postcodes from random search pages.

    ``SEED_PAGES`` page numbers are drawn without replacement from
    ``1..TOTAL_PAGES`` and visited in ascending order. Each row's postcode is
    normalised and collected in first-seen order, and sampling stops once
    ``N_POSTCODES`` distinct postcodes have been seen.

    A page that fails (no response, non-2xx status, or a body that is not the
    expected JSON object) is reported and skipped rather than aborting the run.

    Returns:
        At most ``N_POSTCODES`` normalised postcodes, in first-seen order.
    """
    # NOTE(review): pages are visited in ascending order and the loop stops at
    # N_POSTCODES, so the lowest-numbered sampled pages may supply the whole
    # sample. Confirm that is the intended spread.
    pages = sorted(random.sample(range(1, TOTAL_PAGES + 1), SEED_PAGES))
    seen: dict[str, None] = {}  # dict used as an insertion-ordered set
    for p in pages:
        r = _get(
            f"{BASE}/api/domestic/search",
            {**WINDOW, "current_page": p, "page_size": 100},
        )
        if r is None or not r.is_success:
            print(f" seed page {p} -> {getattr(r, 'status_code', 'ERR')}")
            continue
        try:
            # `or []` also covers an explicit `"data": null`.
            rows = r.json().get("data") or []
        except (AttributeError, ValueError):
            # Body was not JSON, or not a JSON object.
            print(f" seed page {p} -> unparseable body")
            continue
        for row in rows:
            pc = row.get("postcode") if isinstance(row, dict) else None
            if pc:
                seen[_normalise_postcode(pc)] = None
        print(f" page {p}: cumulative {len(seen)} distinct postcodes")
        if len(seen) >= N_POSTCODES:
            break
    return list(seen)[:N_POSTCODES]
|
||||
|
||||
|
||||
def cohort_cert_numbers(postcode: str) -> list[str]:
    """Return the certificate numbers the search endpoint lists for ``postcode``.

    Args:
        postcode: Postcode to search for, passed to the API as given.

    Returns:
        The non-empty ``certificateNumber`` value of each returned row, in
        response order. An empty list is returned when the request fails or
        the body is not the expected JSON object.
    """
    # NOTE(review): this is a single request with no paging parameters. If the
    # endpoint paginates postcode searches, large cohorts may be truncated.
    # Confirm against the API's default page size.
    r = _get(f"{BASE}/api/domestic/search", {"postcode": postcode})
    if r is None or not r.is_success:
        return []
    try:
        # `or []` also covers an explicit `"data": null`.
        rows = r.json().get("data") or []
    except (AttributeError, ValueError):
        # Body was not JSON, or not a JSON object: same as any other failure.
        return []
    return [
        row["certificateNumber"]
        for row in rows
        if isinstance(row, dict) and row.get("certificateNumber")
    ]
|
||||
|
||||
|
||||
def fetch_cert(postcode: str, cert: str) -> bool:
    """Fetch and cache one certificate's raw ``data`` payload.

    The payload is stored at ``CACHE/<postcode>/<cert>.json``. An existing
    file is treated as already cached and no request is made.

    Args:
        postcode: Directory name under the cache root for this cohort.
        cert: Certificate number, used for the request and the file name.

    Returns:
        ``True`` if the payload is on disk when the call returns (newly
        written or already cached). ``False`` if the request failed or the
        body had no usable ``data`` member.
    """
    out = CACHE / postcode / f"{cert}.json"
    if out.exists():
        return True
    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is None or not r.is_success:
        return False
    try:
        payload = r.json()["data"]
    except (KeyError, TypeError, ValueError):
        # Not JSON, not a JSON object, or no "data" member.
        return False
    out.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling temp file and rename, so an interrupted run never
    # leaves a truncated <cert>.json that the exists() check above would
    # later mistake for a complete cache entry.
    tmp = out.with_name(f"{out.name}.part")
    tmp.write_text(json.dumps(payload), encoding="utf-8")
    tmp.replace(out)
    return True
|
||||
|
||||
|
||||
def main() -> None:
    """Sample seed postcodes, cache every cohort, then write the index.

    For each sampled postcode this looks up its certificate numbers, fetches
    each certificate (already-cached ones count as fetched), and records the
    successfully cached numbers. After all postcodes are processed the
    postcode-to-certificates mapping is written to ``CACHE/_index.json``.
    Progress and a final summary are printed to stdout.
    """
    print("sampling seed postcodes ...")
    postcodes = sample_postcodes()
    print(f"pulling full cohorts for {len(postcodes)} postcodes into {CACHE} ...")

    index: dict[str, list[str]] = {}
    t0 = time.time()
    total_certs = 0
    for i, pc in enumerate(postcodes, start=1):
        certs = cohort_cert_numbers(pc)
        # Keep only the certificates that ended up on disk.
        fetched = []
        for cert_number in certs:
            if fetch_cert(pc, cert_number):
                fetched.append(cert_number)
        index[pc] = fetched
        total_certs = total_certs + len(fetched)
        print(f" [{i}/{len(postcodes)}] {pc}: {len(fetched)}/{len(certs)} certs")

    index_path = CACHE / "_index.json"
    index_path.write_text(json.dumps(index, indent=2))

    print(
        f"DONE in {time.time() - t0:.0f}s: {len(postcodes)} postcodes, "
        f"{total_certs} certs cached under {CACHE}"
    )


# Run the fetch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
Loading…
Add table
Reference in a new issue