"""Fetch a random sample of domestic EPC JSONs from the GOV.UK EPB register.

WHAT THIS IS FOR
----------------
Wide-scale accuracy testing of the SAP10 calculator's API front-end against
real-world certificates (not the curated golden cohort, which masks raw-API
behaviour). This script builds the *input corpus*: it samples certificate
numbers uniformly at random across a date window, then downloads each cert's
full schema-21 ``data`` payload (the exact shape
``EpcPropertyDataMapper.from_api_response`` consumes) into a local cache.

Pair it with:
  - ``eval_api_sap_accuracy.py``     — % within 0.5 SAP, worst offenders, raises.
  - ``analyse_api_sap_clusters.py``  — error grouped by heating type / property.

HOW THE SAMPLE IS DRAWN
-----------------------
The register's ``/api/domestic/search`` endpoint is date-windowed and paged
(``date_start``/``date_end``/``current_page``/``page_size``); results are
ordered by registration date, so picking random PAGES across the whole window
gives an unbiased spread over dates, regions and property types. Each chosen
cert number is then resolved to its full JSON via ``/api/certificate``.
If the pages yield more than ``TARGET`` certs, a (seeded) random subset is
kept, so the cap does not systematically drop the latest pages.

USAGE
-----
    PYTHONPATH=/workspaces/model python scripts/fetch_2026_epc_sample.py

Resumable — re-running skips certs already cached, so it's safe to interrupt
(each JSON is written to a temp file and renamed into place, so a partial
write is never mistaken for a cached cert). Token is read from
``backend/.env`` (``OPEN_EPC_API_TOKEN``). NB the register rejects a
``date_end`` that includes today, so keep the window in the past.

Tune the constants below (window, page count, target size, seed). The cache
dir defaults to ``/tmp/epc_2026_sample`` and can be overridden with the
``EPC_SAMPLE_CACHE`` env var.
"""

import os
import json
import time
import random
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
from dotenv import load_dotenv

load_dotenv("backend/.env")
TOKEN = os.environ["OPEN_EPC_API_TOKEN"]
BASE = "https://api.get-energy-performance-data.communities.gov.uk"
H = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"}
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
CACHE.mkdir(parents=True, exist_ok=True)

# Sampling window + size. `date_end` must be strictly before today (the
# register rejects "the date cannot include today"). TOTAL_PAGES is the
# `totalPages` the search returns for this window at page_size=100 — re-probe
# it if you change the window (it only needs to be an upper bound for the
# random page draw; out-of-range pages just return fewer rows).
WINDOW = {"date_start": "2026-01-01", "date_end": "2026-05-31"}
TOTAL_PAGES = 7402
N_PAGES = 14    # random pages to pull → N_PAGES * 100 candidate certs
TARGET = 1200   # cap on how many full JSONs to fetch
random.seed(2026)  # reproducible page draw


def _retry_delay(response, attempt):
    """Return the seconds to wait after failed attempt number ``attempt``.

    Uses the response's ``Retry-After`` header when it is a finite,
    non-negative number of seconds; otherwise falls back to the linear
    backoff ``1.5 * (attempt + 1)``.
    """
    fallback = 1.5 * (attempt + 1)
    retry_after = response.headers.get("Retry-After")
    if not retry_after:
        return fallback
    try:
        delay = float(retry_after)
    except ValueError:
        # Retry-After may also be an HTTP-date; don't crash on it, just use
        # the regular backoff.
        return fallback
    # The chained comparison is False for NaN, negatives and infinity.
    return delay if 0 <= delay < float("inf") else fallback


def _get(url, params, timeout: float = 20.0, tries: int = 5):
    """GET with retry/backoff on 429 + 5xx (honours Retry-After).

    Args:
        url: Absolute URL to request.
        params: Query-string parameters.
        timeout: Per-request timeout in seconds.
        tries: Maximum number of attempts.

    Returns:
        The first response that is neither 429 nor 5xx. If every attempt
        fails, the most recent response received (a 429/5xx), or ``None`` if
        no attempt ever produced a response.
    """
    response = None
    for attempt in range(tries):
        try:
            response = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            delay = 1.5 * (attempt + 1)
        else:
            if response.status_code != 429 and response.status_code < 500:
                return response
            delay = _retry_delay(response, attempt)
        # No point sleeping after the final attempt: nothing follows it.
        if attempt < tries - 1:
            time.sleep(delay)
    return response


def sample_cert_numbers():
    """Draw random search pages and collect the certificate numbers on them.

    Returns:
        Dict mapping each ``certificateNumber`` seen to that row's
        ``registrationDate`` (``None`` when the row has none), in ascending
        page order. Pages that fail to load or parse are reported and skipped.
    """
    # Clamp so a small TOTAL_PAGES cannot make random.sample raise.
    n_pages = min(N_PAGES, TOTAL_PAGES)
    pages = sorted(random.sample(range(1, TOTAL_PAGES + 1), n_pages))
    certs = {}
    for p in pages:
        r = _get(
            f"{BASE}/api/domestic/search",
            {**WINDOW, "current_page": p, "page_size": 100},
        )
        if r is None or not r.is_success:
            print(f" search page {p} -> {getattr(r, 'status_code', 'ERR')}")
            continue
        try:
            rows = r.json().get("data") or []
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object: treat as a failed page.
            print(f" search page {p} -> ERR")
            continue
        for row in rows:
            certs[row["certificateNumber"]] = row.get("registrationDate")
        print(f" page {p}: cumulative {len(certs)} certs")
    return certs


_lock = threading.Lock()
_done = {"ok": 0, "404": 0, "err": 0}


def _bump(key):
    """Increment ``_done[key]`` under the lock and return a snapshot copy."""
    with _lock:
        _done[key] += 1
        return dict(_done)


def fetch_one(cert) -> None:
    """Download one certificate's ``data`` payload into the cache.

    Updates the shared ``_done`` counters: ``ok`` for a cached or freshly
    written cert, ``404`` when the register does not know the cert, ``err``
    for any other failed request or unparseable body.
    """
    out = CACHE / f"{cert}.json"
    if out.exists():
        _bump("ok")
        return

    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is not None and r.status_code == 404:
        _bump("404")
        return
    if r is None or not r.is_success:
        _bump("err")
        return

    try:
        payload = r.json()["data"]
    except (ValueError, KeyError, TypeError):
        _bump("err")
        return

    # Write to a sibling temp file and rename into place, so an interrupted
    # run never leaves a truncated ``<cert>.json`` that the resume check
    # above would treat as already cached.
    tmp = out.with_name(out.name + ".tmp")
    tmp.write_text(json.dumps(payload), encoding="utf-8")
    os.replace(tmp, out)

    # Report from a snapshot taken under the lock so concurrent workers
    # cannot print the same milestone twice or skip it.
    snap = _bump("ok")
    if snap["ok"] % 100 == 0:
        print(f" fetched {snap['ok']} (404={snap['404']} err={snap['err']})")


def main() -> None:
    """Sample cert numbers, write the manifest, and fetch every cert JSON."""
    print("sampling cert numbers...")
    certs = sample_cert_numbers()
    cert_list = list(certs)
    if len(cert_list) > TARGET:
        # A plain [:TARGET] slice would always discard the last (latest-dated)
        # pages; a random subset keeps the spread across the window. Still
        # reproducible because the global RNG is seeded above.
        cert_list = random.sample(cert_list, TARGET)
    (CACHE / "_manifest.json").write_text(
        json.dumps({"certs": cert_list, "window": WINDOW}, indent=2)
    )
    print(f"fetching {len(cert_list)} cert JSONs into {CACHE} ...")
    t0 = time.monotonic()
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = {ex.submit(fetch_one, c): c for c in cert_list}
        for fut in as_completed(futures):
            # Surface worker crashes instead of silently dropping them.
            exc = fut.exception()
            if exc is not None:
                _bump("err")
                print(f" {futures[fut]} -> {type(exc).__name__}: {exc}")
    print(f"DONE in {time.monotonic() - t0:.0f}s: ok={_done['ok']} 404={_done['404']} err={_done['err']}")
    print(f"cached JSON files: {len(list(CACHE.glob('????-????-????-????-????.json')))}")


if __name__ == "__main__":
    main()