Model/scripts/fetch_2026_epc_sample.py
Khalim Conn-Kowlessar 3b442f9606 scripts: promote the API SAP-accuracy toolkit from /tmp
Three reusable scripts (each with a purpose/usage docstring) for wide-scale
testing of the calculator's API front-end against the GOV.UK EPB register —
the toolkit behind the 1000-cert study (docs/HANDOVER_API_SAMPLE_ACCURACY.md):

  fetch_2026_epc_sample.py    — sample cert numbers across a date window
                                (random pages) + download full schema-21 JSON
                                to a cache; resumable, 429/5xx backoff.
  eval_api_sap_accuracy.py    — % within 0.5 SAP, error histogram, worst-40,
                                and the mapper/calculator raise breakdown.
  analyse_api_sap_clusters.py — error grouped by property + heating type to
                                locate clusters (electric heating, flats, PV).

Cache dir defaults to /tmp/epc_2026_sample, overridable via EPC_SAMPLE_CACHE.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 16:52:09 +00:00
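
The "random pages" draw mentioned in the commit message can be sketched in isolation as below. This is an illustrative sketch only: `draw_pages` is a hypothetical helper, and it uses a private `random.Random` instance rather than a module-level seed, an assumption made here so the draw stays reproducible even if other code touches the global generator.

```python
import random


def draw_pages(total_pages, n_pages, seed=2026):
    """Return n_pages distinct 1-based page numbers, sorted, for a given seed.

    Hypothetical helper, not part of the toolkit.
    """
    rng = random.Random(seed)  # private generator: same seed -> same pages
    return sorted(rng.sample(range(1, total_pages + 1), n_pages))


# Example values taken from the script's constants (7402 pages, 14 draws).
pages = draw_pages(total_pages=7402, n_pages=14)
print(pages)
```

Because each page holds a run of consecutive search results, drawing pages rather than individual certs keeps the number of search requests small at the cost of some clustering within each page.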


"""Fetch a random sample of domestic EPC JSONs from the GOV.UK EPB register.

WHAT THIS IS FOR
----------------
Wide-scale accuracy testing of the SAP10 calculator's API front-end against
real-world certificates (not the curated golden cohort, which masks raw-API
behaviour). This script builds the *input corpus*: it samples certificate
numbers by drawing random result pages across a date window, then downloads
each cert's full schema-21 ``data`` payload (the exact shape
``EpcPropertyDataMapper.from_api_response`` consumes) into a local cache.

Pair it with:

- ``eval_api_sap_accuracy.py``: % within 0.5 SAP, worst offenders, raises.
- ``analyse_api_sap_clusters.py``: error grouped by heating type / property.

HOW THE SAMPLE IS DRAWN
-----------------------
The register's ``/api/domestic/search`` endpoint is date-windowed and paged
(``date_start``/``date_end``/``current_page``/``page_size``); results are
ordered by registration date, so picking random PAGES across the whole window
gives an unbiased spread over dates, regions and property types. Each chosen
cert number is then resolved to its full JSON via ``/api/certificate``.

USAGE
-----
    PYTHONPATH=/workspaces/model python scripts/fetch_2026_epc_sample.py

Resumable: re-running skips certs already cached, so it's safe to interrupt.
Token is read from ``backend/.env`` (``OPEN_EPC_API_TOKEN``). NB the register
rejects a ``date_end`` that includes today, so keep the window in the past.

Tune the constants below (window, page count, target size, seed). The cache
dir defaults to ``/tmp/epc_2026_sample`` and can be overridden with the
``EPC_SAMPLE_CACHE`` env var.
"""

import os
import json
import time
import random
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
from dotenv import load_dotenv

load_dotenv("backend/.env")
TOKEN = os.environ["OPEN_EPC_API_TOKEN"]
BASE = "https://api.get-energy-performance-data.communities.gov.uk"
H = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"}
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
CACHE.mkdir(parents=True, exist_ok=True)

# Sampling window + size. `date_end` must be strictly before today (the
# register rejects it otherwise: "the date cannot include today"). TOTAL_PAGES
# is the `totalPages` the search returns for this window at page_size=100;
# re-probe it if you change the window (it only needs to be an upper bound for
# the random page draw; out-of-range pages just return fewer rows).
WINDOW = {"date_start": "2026-01-01", "date_end": "2026-05-31"}
TOTAL_PAGES = 7402
N_PAGES = 14        # random pages to pull → N_PAGES * 100 candidate certs
TARGET = 1200       # cap on how many full JSONs to fetch
random.seed(2026)   # reproducible page draw
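

def _get(url, params, timeout=20.0, tries=5):
    """GET with retry/backoff on 429 + 5xx (honours Retry-After).

    Returns the last response received (possibly still a 429/5xx once the
    retries are exhausted), or None if no attempt got a response at all.
    """
    r = None
    for i in range(tries):
        backoff = 1.5 * (i + 1)  # linear backoff: 1.5s, 3s, 4.5s, ...
        try:
            r = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            time.sleep(backoff)
            continue
        if r.status_code == 429 or r.status_code >= 500:
            ra = r.headers.get("Retry-After")
            try:
                # Retry-After may be an HTTP-date rather than delta-seconds;
                # fall back to the linear backoff if it is not a number.
                delay = float(ra) if ra else backoff
            except ValueError:
                delay = backoff
            time.sleep(delay)
            continue
        return r
    return r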


def sample_cert_numbers():
    """Draw N_PAGES random search pages and collect their cert numbers.

    Returns a dict mapping certificate number -> registration date.
    """
    pages = sorted(random.sample(range(1, TOTAL_PAGES + 1), N_PAGES))
    certs = {}
    for p in pages:
        r = _get(f"{BASE}/api/domestic/search", {**WINDOW, "current_page": p, "page_size": 100})
        if r is None or not r.is_success:
            print(f" search page {p} -> {getattr(r, 'status_code', 'ERR')}")
            continue
        for row in r.json().get("data", []):
            certs[row["certificateNumber"]] = row.get("registrationDate")
        print(f" page {p}: cumulative {len(certs)} certs")
    return certs
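

# Shared outcome counters, updated from worker threads under _lock.
_lock = threading.Lock()
_done = {"ok": 0, "404": 0, "err": 0}


def fetch_one(cert):
    """Download one cert's ``data`` payload into the cache (skips cached certs)."""
    out = CACHE / f"{cert}.json"
    if out.exists():
        with _lock:
            _done["ok"] += 1
        return
    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is not None and r.status_code == 404:
        with _lock:
            _done["404"] += 1
        return
    if r is None or not r.is_success:
        with _lock:
            _done["err"] += 1
        return
    try:
        payload = r.json()["data"]
    except Exception:
        with _lock:
            _done["err"] += 1
        return
    # Write via a temp file + rename so an interrupted run never leaves a
    # truncated JSON behind that a resumed run would treat as cached.
    tmp = out.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(payload))
    tmp.replace(out)
    with _lock:
        _done["ok"] += 1
        if _done["ok"] % 100 == 0:
            print(f" fetched {_done['ok']} (404={_done['404']} err={_done['err']})")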


def main():
    print("sampling cert numbers...")
    certs = sample_cert_numbers()
    # Pages are visited in ascending order, so the TARGET cap trims certs
    # from the highest-numbered pages drawn.
    cert_list = list(certs)[:TARGET]
    (CACHE / "_manifest.json").write_text(
        json.dumps({"certs": cert_list, "window": WINDOW}, indent=2)
    )
    print(f"fetching {len(cert_list)} cert JSONs into {CACHE} ...")
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=8) as ex:
        # Drain the futures; fetch_one records its own outcome in _done.
        list(as_completed([ex.submit(fetch_one, c) for c in cert_list]))
    print(f"DONE in {time.time() - t0:.0f}s: ok={_done['ok']} 404={_done['404']} err={_done['err']}")
    print(f"cached JSON files: {len(list(CACHE.glob('????-????-????-????-????.json')))}")


if __name__ == "__main__":
    main()
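
A downstream script (for example an evaluation pass) might read the cache back roughly as follows. This is a minimal sketch, not part of the toolkit: `iter_cached_certs` is a hypothetical helper, and it assumes only what the script above writes, namely one `<cert>.json` file per certificate plus a `_manifest.json` holding `certs` and `window`.

```python
import json
import os
from pathlib import Path


def iter_cached_certs(cache_dir=None):
    """Yield (cert_number, payload) for every cert JSON found in the cache.

    Hypothetical helper: follows the manifest order when a manifest exists
    and skips certs that were never downloaded (404s / errors).
    """
    cache = Path(cache_dir or os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
    manifest = cache / "_manifest.json"
    if manifest.exists():
        certs = json.loads(manifest.read_text())["certs"]
    else:
        # No manifest: fall back to whatever cert files are present.
        certs = sorted(p.stem for p in cache.glob("*.json") if not p.name.startswith("_"))
    for cert in certs:
        path = cache / f"{cert}.json"
        if path.exists():
            yield cert, json.loads(path.read_text())
```

Iterating lazily keeps memory flat for large samples; a caller that needs the whole corpus at once can wrap the generator in `dict(...)`.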