From 3b442f96067326ae2ccea80ac67d29ecbd80f9f0 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 4 Jun 2026 16:52:09 +0000 Subject: [PATCH] scripts: promote the API SAP-accuracy toolkit from /tmp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three reusable scripts (each with a purpose/usage docstring) for wide-scale testing of the calculator's API front-end against the GOV.UK EPB register — the toolkit behind the 1000-cert study (docs/HANDOVER_API_SAMPLE_ACCURACY.md): fetch_2026_epc_sample.py — sample cert numbers across a date window (random pages) + download full schema-21 JSON to a cache; resumable, 429/5xx backoff. eval_api_sap_accuracy.py — % within 0.5 SAP, error histogram, worst-40, and the mapper/calculator raise breakdown. analyse_api_sap_clusters.py — error grouped by property + heating type to locate clusters (electric heating, flats, PV). Cache dir defaults to /tmp/epc_2026_sample, overridable via EPC_SAMPLE_CACHE. Co-Authored-By: Claude Opus 4.8 --- scripts/analyse_api_sap_clusters.py | 102 +++++++++++++++++ scripts/eval_api_sap_accuracy.py | 169 ++++++++++++++++++++++++++++ scripts/fetch_2026_epc_sample.py | 145 ++++++++++++++++++++++++ 3 files changed, 416 insertions(+) create mode 100644 scripts/analyse_api_sap_clusters.py create mode 100644 scripts/eval_api_sap_accuracy.py create mode 100644 scripts/fetch_2026_epc_sample.py diff --git a/scripts/analyse_api_sap_clusters.py b/scripts/analyse_api_sap_clusters.py new file mode 100644 index 00000000..ab8f0516 --- /dev/null +++ b/scripts/analyse_api_sap_clusters.py @@ -0,0 +1,102 @@ +"""Group API-path SAP error by property + heating type to find clusters. + +WHAT THIS IS FOR +---------------- +The headline number from `eval_api_sap_accuracy.py` tells you HOW accurate the +API path is; this tells you WHERE the error lives so you can prioritise. 
# Buckets the cached sample's per-cert SAP error (continuous vs lodged) by
# property type, real PV presence and heating identity
# (main_heating_category + whether a PCDB index is lodged), and prints
# n / mean|err| / %<0.5 per group, plus red flags (negative or extreme-low SAP).
#
# Usage: PYTHONPATH=/workspaces/model python scripts/analyse_api_sap_clusters.py
# Reads the cache written by fetch_2026_epc_sample.py (default
# /tmp/epc_2026_sample, overridable via EPC_SAMPLE_CACHE).
import json
import math
import os
from collections import defaultdict
from pathlib import Path

CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
PROP = {"0": "House", "1": "Bungalow", "2": "Flat", "3": "Maisonette", "4": "Park home"}
# Groups smaller than this are too noisy to rank and are left out of the tables.
MIN_GROUP_SIZE = 5


def _is_nonzero(value):
    """True when *value* is a non-zero number; "0" / "0.0" strings count as zero."""
    if isinstance(value, str):
        try:
            return float(value) != 0
        except ValueError:
            # Not a number at all: keep the original "any non-empty value" rule.
            return bool(value.strip())
    return bool(value)


def real_pv(doc):
    """True only for a genuine PV array — `none_or_no_details` / 0% is not PV.

    A `photovoltaic_supply` block that holds nothing but `none_or_no_details`
    counts as PV only when its `percent_roof_area` is non-zero (a "0" string is
    treated as zero). Missing or malformed blocks yield False instead of raising.
    """
    es = doc.get("sap_energy_source")
    pv = es.get("photovoltaic_supply") if isinstance(es, dict) else None
    if not isinstance(pv, dict):
        return False
    if set(pv) - {"none_or_no_details"}:
        # Any key beyond the placeholder means array details were lodged.
        return True
    nod = pv.get("none_or_no_details")
    if not isinstance(nod, dict):
        return False
    return _is_nonzero(nod.get("percent_roof_area"))


def heat_identity(doc):
    """Return `(main_heating_index_number, main_heating_category)` of the first
    `main_heating_details` entry, or `(None, None)` when there is none."""
    heating = doc.get("sap_heating")
    details = heating.get("main_heating_details") if isinstance(heating, dict) else None
    first = details[0] if isinstance(details, list) and details else None
    if not isinstance(first, dict):
        return None, None
    return first.get("main_heating_index_number"), first.get("main_heating_category")


def _group_stats(rows, keyfn, min_size=MIN_GROUP_SIZE):
    """Return `[(key, n, mean_abs_err, pct_within_0.5)]`, worst mean first,
    dropping groups with fewer than *min_size* rows."""
    groups = defaultdict(list)
    for row in rows:
        groups[keyfn(row)].append(row["ae"])
    stats = []
    for key, errs in groups.items():
        if len(errs) < min_size:
            continue
        within = 100 * sum(1 for x in errs if x < 0.5) / len(errs)
        stats.append((key, len(errs), sum(errs) / len(errs), within))
    stats.sort(key=lambda s: -s[2])
    return stats


def _print_group_table(rows, keyfn, label):
    """Print one n / mean|err| / %<0.5 table for the grouping given by *keyfn*."""
    print(f"\n-- mean|err| by {label} (n, mean|err|, %<0.5) --")
    for key, count, mean, within in _group_stats(rows, keyfn):
        print(f" {str(key):28s} n={count:4d} mean={mean:6.2f} <0.5={within:4.1f}%")


def _load_rows():
    """Score every cached cert; return `(rows, skipped)`.

    `rows` holds one dict per cert that computed to a finite SAP; `skipped`
    maps a reason to the number of certs left out for it, so nothing is
    dropped silently.
    """
    # Imported here so the pure helpers above stay importable without the
    # calculator packages on PYTHONPATH.
    from datatypes.epc.domain.mapper import EpcPropertyDataMapper
    from domain.sap10_calculator.calculator import calculate_sap_from_inputs
    from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs

    rows = []
    skipped = defaultdict(int)
    for path in sorted(CACHE.glob("????-????-????-????-????.json")):
        try:
            doc = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # One truncated/corrupt cache file must not abort the whole run.
            skipped["bad_json"] += 1
            continue
        if not isinstance(doc, dict):
            skipped["bad_json"] += 1
            continue
        lodged = doc.get("energy_rating_current")
        # Checked before the (expensive) calculator run: without a numeric
        # lodged SAP there is no error to record.
        if isinstance(lodged, bool) or not isinstance(lodged, (int, float)):
            skipped["no_numeric_lodged_sap"] += 1
            continue
        try:
            epc = EpcPropertyDataMapper.from_api_response(doc)
            cont = calculate_sap_from_inputs(
                cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)
            ).sap_score_continuous
        except Exception as exc:  # best-effort scan: count the gap and move on
            skipped[f"raise:{type(exc).__name__}"] += 1
            continue
        if not isinstance(cont, (int, float)) or not math.isfinite(cont):
            skipped["non_finite"] += 1
            continue
        idx, cat = heat_identity(doc)
        rows.append(dict(
            cert=path.stem, ae=abs(cont - lodged), cont=cont, lodged=lodged,
            prop=PROP.get(str(doc.get("property_type")), str(doc.get("property_type"))),
            pv=real_pv(doc), idx=idx, cat=cat,
            neg=(cont < 0), low_lodged=(lodged <= 20),
        ))
    return rows, skipped


def main():
    """Print the per-group error tables and red flags for the cached sample."""
    rows, skipped = _load_rows()
    print(f"computed n={len(rows)}")
    if skipped:
        print("skipped: " + ", ".join(f"{k}={v}" for k, v in sorted(skipped.items())))
    _print_group_table(rows, lambda r: r["prop"], "property type")
    _print_group_table(rows, lambda r: "PV" if r["pv"] else "no-PV", "real PV presence")
    _print_group_table(
        rows, lambda r: f"cat={r['cat']},idx={'Y' if r['idx'] else '-'}", "heating identity"
    )

    neg = [r for r in rows if r["neg"]]
    loww = [r for r in rows if r["low_lodged"]]
    print(f"\nRED FLAGS: negative continuous SAP: {len(neg)} | lodged<=20 (extreme): {len(loww)}")
    print(" negative-SAP certs:", [r["cert"] for r in neg][:15])


if __name__ == "__main__":
    main()
+ +WHAT THIS IS FOR +---------------- +Measures how well the API front-end (`from_api_response` → `cert_to_inputs` +→ continuous SAP) reproduces each cert's lodged rounded SAP +(`energy_rating_current`) across the sample built by +`fetch_2026_epc_sample.py`. This is the headline accuracy gauge for raw-API +behaviour on an unbiased population. + +Each cert lands in one bucket: + - computed — ran end-to-end; SAP error recorded. + - unsupported_schema — pre-21 schema the mapper doesn't support (skip). + - raise:ExcName — mapper raised (UnmappedApiCode etc.) — a gap to fix. + - calc_raise:ExcName — calculator raised (UnmappedSapCode etc.) — a gap. + +OUTPUT +------ + - Category counts + the raise breakdown with example certs (what to fix). + - For computed certs: % within 0.5 / 1 / 2 / 5 SAP, median/mean/p90/p99/max + |err|, the signed mean (over- vs under-rating), abs-err histogram. + - The 40 worst offenders with diagnostic columns (to prioritise). + - A full per-cert CSV at `_results.csv` inside the cache dir for ad-hoc slicing. + +USAGE +----- + PYTHONPATH=/workspaces/model python scripts/eval_api_sap_accuracy.py + +Reads the cache written by `fetch_2026_epc_sample.py` (default +`/tmp/epc_2026_sample`, overridable via `EPC_SAMPLE_CACHE`). 
import csv
import json
import math
import os
from collections import Counter, defaultdict
from pathlib import Path

CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))


def diag(doc):
    """A few raw-JSON fields that help explain a cert's error at a glance.

    Returns a flat dict (insertion order is the CSV column order). Missing or
    malformed `sap_energy_source` / `sap_heating` sections yield `None` fields
    rather than raising.
    """
    es = doc.get("sap_energy_source")
    if not isinstance(es, dict):
        es = {}
    heating = doc.get("sap_heating")
    details = heating.get("main_heating_details") if isinstance(heating, dict) else None
    mh0 = details[0] if isinstance(details, list) and details else None
    if not isinstance(mh0, dict):
        mh0 = {}
    return {
        "schema": doc.get("schema_type"),
        "prop_type": doc.get("property_type"),
        "built_form": doc.get("built_form"),
        "age_band": doc.get("construction_age_band"),
        "mains_gas": es.get("mains_gas"),
        "main_heat_cat": mh0.get("main_heating_category"),
        "main_heat_idx": mh0.get("main_heating_index_number"),
        "n_bps": len(doc.get("sap_building_parts") or []),
        "lodged_band": doc.get("current_energy_efficiency_band"),
    }


def _print_accuracy(rows):
    """Print the headline accuracy stats, histogram and 40 worst certs for *rows*
    (must be non-empty)."""
    n = len(rows)
    abs_errs = sorted(r["abs_err"] for r in rows)

    def pct(thr):
        return 100.0 * sum(1 for r in rows if r["abs_err"] < thr) / n

    print("=" * 70)
    print(f"COMPUTED: {n} certs (continuous SAP vs lodged rounded)")
    print(f" % |err| < 0.5 : {pct(0.5):.1f}% <-- headline")
    print(f" % |err| < 1.0 : {pct(1.0):.1f}%")
    print(f" % |err| < 2.0 : {pct(2.0):.1f}%")
    print(f" % |err| < 5.0 : {pct(5.0):.1f}%")
    # Nearest-rank picks: int(n * q) is always < n for q < 1, so no clamp needed.
    print(f" median |err| : {abs_errs[n // 2]:.3f}")
    print(f" mean |err| : {sum(abs_errs) / n:.3f}")
    print(f" p90 |err| : {abs_errs[int(n * 0.90)]:.3f}")
    print(f" p99 |err| : {abs_errs[int(n * 0.99)]:.3f}")
    print(f" max |err| : {abs_errs[-1]:.3f}")
    signed = [r["err"] for r in rows]
    print(f" mean signed err: {sum(signed) / n:+.3f} (we - lodged; +ve = we over-rate)")
    print(" abs-err buckets:")
    for lo, hi in [(0, 0.5), (0.5, 1), (1, 2), (2, 5), (5, 10), (10, 1e9)]:
        c = sum(1 for r in rows if lo <= r["abs_err"] < hi)
        print(f" [{lo:>4}, {hi:>4}) : {c:4d} ({100 * c / n:4.1f}%)")
    print("=" * 70)
    print("TOP 40 LARGEST |err| (prioritise these):")
    worst = sorted(rows, key=lambda r: -r["abs_err"])[:40]
    print(f" {'cert':22s} {'err':>7s} {'our':>6s} {'lodg':>4s} prop bf age gas cat/idx bps")
    for r in worst:
        # `g` (not `d`) so a lodged SAP that arrives as 72.0 still formats.
        print(f" {r['cert']:22s} {r['err']:+7.2f} {r['our_cont']:6.1f} {r['lodged']:4g} "
              f"{str(r['prop_type']):>4s} {str(r['built_form']):>2s} {str(r['age_band'])[:3]:>3s} "
              f"{str(r['mains_gas']):>3s} {str(r['main_heat_cat']):>3s}/{str(r['main_heat_idx']):>6s} "
              f"{r['n_bps']}")


def _print_raise_examples(exc_examples):
    """Print the 20 most frequent mapper/calculator failures with one example cert each."""
    if not exc_examples:
        return
    print("=" * 70)
    print("RAISE/ERROR EXAMPLES (mapper/calculator gaps — also prioritise):")
    for k, v in sorted(exc_examples.items(), key=lambda kv: -len(kv[1]))[:20]:
        print(f" [{len(v):3d}] {k} e.g. {v[0]}")


def main():
    """Score every cached cert, write `_results.csv`, and print the report.

    The raise breakdown is printed even when no cert computed: that is the
    case where it is the only actionable output.
    """
    # Imported here so `diag` stays importable without the calculator
    # packages on PYTHONPATH.
    from datatypes.epc.domain.mapper import EpcPropertyDataMapper
    from domain.sap10_calculator.calculator import calculate_sap_from_inputs
    from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs

    files = sorted(CACHE.glob("????-????-????-????-????.json"))
    rows = []
    cat = Counter()
    exc_examples = defaultdict(list)
    for f in files:
        cert = f.stem
        try:
            doc = json.loads(f.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            cat["bad_json"] += 1
            continue
        if not isinstance(doc, dict):
            cat["bad_json"] += 1
            continue
        lodged = doc.get("energy_rating_current")
        try:
            epc = EpcPropertyDataMapper.from_api_response(doc)
        except ValueError as e:
            if "Unsupported EPC schema" in str(e):
                cat["unsupported_schema"] += 1
            else:
                cat["raise:ValueError"] += 1
                exc_examples["ValueError:" + str(e)[:60]].append(cert)
            continue
        except Exception as e:
            ename = type(e).__name__
            cat[f"raise:{ename}"] += 1
            exc_examples[f"{ename}:{str(e)[:60]}"].append(cert)
            continue
        if lodged is None:
            cat["no_lodged_sap"] += 1
            continue
        if isinstance(lodged, bool) or not isinstance(lodged, (int, float)):
            # A non-numeric lodged value cannot be subtracted or formatted.
            cat["non_numeric_lodged_sap"] += 1
            continue
        try:
            cont = calculate_sap_from_inputs(
                cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)
            ).sap_score_continuous
        except Exception as e:
            ename = type(e).__name__
            cat[f"calc_raise:{ename}"] += 1
            exc_examples[f"calc:{ename}:{str(e)[:50]}"].append(cert)
            continue
        if not isinstance(cont, (int, float)) or not math.isfinite(cont):
            cat["non_finite"] += 1
            continue
        err = cont - lodged
        cat["computed"] += 1
        rows.append({
            "cert": cert, "our_cont": round(cont, 4), "lodged": lodged,
            "err": round(err, 4), "abs_err": round(abs(err), 4), **diag(doc),
        })

    results_csv = CACHE / "_results.csv"
    if rows:
        with open(results_csv, "w", newline="", encoding="utf-8") as fh:
            w = csv.DictWriter(fh, fieldnames=list(rows[0].keys()))
            w.writeheader()
            w.writerows(rows)

    print("=" * 70)
    print(f"SAMPLE: {len(files)} cached certs | categories:")
    for k, v in cat.most_common():
        print(f" {k:28s} {v}")
    if rows:
        _print_accuracy(rows)
    _print_raise_examples(exc_examples)
    if rows:
        print(f"\nFull per-cert CSV -> {results_csv}")


if __name__ == "__main__":
    main()
Each chosen +cert number is then resolved to its full JSON via ``/api/certificate``. + +USAGE +----- + PYTHONPATH=/workspaces/model python scripts/fetch_2026_epc_sample.py + +Resumable — re-running skips certs already cached, so it's safe to interrupt. +Token is read from ``backend/.env`` (``OPEN_EPC_API_TOKEN``). NB the register +rejects a ``date_end`` that includes today, so keep the window in the past. + +Tune the constants below (window, page count, target size, seed). The cache +dir defaults to ``/tmp/epc_2026_sample`` and can be overridden with the +``EPC_SAMPLE_CACHE`` env var. +""" +import os +import json +import time +import random +import threading +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + +import httpx +from dotenv import load_dotenv + +load_dotenv("backend/.env") +TOKEN = os.environ["OPEN_EPC_API_TOKEN"] +BASE = "https://api.get-energy-performance-data.communities.gov.uk" +H = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"} +CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample")) +CACHE.mkdir(parents=True, exist_ok=True) + +# Sampling window + size. `date_end` must be strictly before today (the +# register rejects "the date cannot include today"). TOTAL_PAGES is the +# `totalPages` the search returns for this window at page_size=100 — re-probe +# it if you change the window (it only needs to be an upper bound for the +# random page draw; out-of-range pages just return fewer rows). 
WINDOW = {"date_start": "2026-01-01", "date_end": "2026-05-31"}
TOTAL_PAGES = 7402
N_PAGES = 14  # random pages to pull → N_PAGES * 100 candidate certs
TARGET = 1200  # cap on how many full JSONs to fetch
random.seed(2026)  # reproducible page draw


def retry_delay(retry_after, attempt):
    """Seconds to wait before retry number *attempt* + 1.

    Uses the `Retry-After` header when it is a usable number of seconds;
    otherwise (absent, HTTP-date form, negative, NaN/inf) falls back to the
    linear backoff `1.5 * (attempt + 1)`.
    """
    fallback = 1.5 * (attempt + 1)
    if not retry_after:
        return fallback
    try:
        secs = float(retry_after)
    except (TypeError, ValueError):
        # Retry-After may also be an HTTP-date; that form is not parsed here.
        return fallback
    if secs != secs or secs < 0 or secs == float("inf"):
        return fallback
    return secs


def _get(url, params, timeout=20.0, tries=5):
    """GET with retry/backoff on 429 + 5xx (honours Retry-After).

    Returns the last response received (which may still be a 429/5xx once the
    attempts are exhausted) or None if no attempt produced a response.
    """
    r = None
    for i in range(tries):
        is_last = i == tries - 1
        try:
            r = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            if not is_last:  # nothing follows the last attempt, so don't wait
                time.sleep(retry_delay(None, i))
            continue
        if r.status_code == 429 or r.status_code >= 500:
            if not is_last:
                time.sleep(retry_delay(r.headers.get("Retry-After"), i))
            continue
        return r
    return r


def sample_cert_numbers():
    """Pull N_PAGES random search pages and return `{cert_number: registrationDate}`.

    Pages that fail or return an unparseable body are reported and skipped.
    """
    pages = sorted(random.sample(range(1, TOTAL_PAGES + 1), min(N_PAGES, TOTAL_PAGES)))
    certs = {}
    for p in pages:
        r = _get(f"{BASE}/api/domestic/search", {**WINDOW, "current_page": p, "page_size": 100})
        if r is None or not r.is_success:
            print(f" search page {p} -> {getattr(r, 'status_code', 'ERR')}")
            continue
        try:
            data = r.json().get("data", [])
        except (ValueError, AttributeError):
            print(f" search page {p} -> unparseable body")
            continue
        for row in data or []:
            number = row.get("certificateNumber") if isinstance(row, dict) else None
            if number:
                certs[number] = row.get("registrationDate")
        print(f" page {p}: cumulative {len(certs)} certs")
    return certs


_lock = threading.Lock()
_done = {"ok": 0, "404": 0, "err": 0}


def _bump(key):
    """Increment one `_done` counter under the lock and return a snapshot of all of them."""
    with _lock:
        _done[key] += 1
        return dict(_done)


def write_json_atomic(path, payload):
    """Write *payload* as JSON to *path* via a sibling temp file + rename.

    An interrupted run therefore never leaves a truncated `<cert>.json` that a
    later run would mistake for a completed download.
    """
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_text(json.dumps(payload), encoding="utf-8")
        tmp.replace(path)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise


def fetch_one(cert):
    """Download one certificate's `data` payload into the cache (no-op if cached)."""
    out = CACHE / f"{cert}.json"
    if out.exists():
        _bump("ok")
        return
    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is not None and r.status_code == 404:
        _bump("404")
        return
    if r is None or not r.is_success:
        _bump("err")
        return
    try:
        payload = r.json()["data"]
        write_json_atomic(out, payload)
    except (ValueError, KeyError, TypeError, OSError):
        _bump("err")
        return
    # Snapshot taken under the lock so the progress line is not racy.
    snap = _bump("ok")
    if snap["ok"] % 100 == 0:
        print(f" fetched {snap['ok']} (404={snap['404']} err={snap['err']})")


def main():
    """Sample cert numbers, record the manifest, and fetch every cert concurrently."""
    print("sampling cert numbers...")
    certs = sample_cert_numbers()
    cert_list = list(certs)[:TARGET]
    (CACHE / "_manifest.json").write_text(
        json.dumps({"certs": cert_list, "window": WINDOW}, indent=2)
    )
    print(f"fetching {len(cert_list)} cert JSONs into {CACHE} ...")
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = [ex.submit(fetch_one, c) for c in cert_list]
        for fut in as_completed(futures):
            try:
                fut.result()  # surface worker exceptions instead of dropping them
            except Exception as exc:
                _bump("err")
                print(f" worker failed: {type(exc).__name__}: {exc}")
    print(f"DONE in {time.time() - t0:.0f}s: ok={_done['ok']} 404={_done['404']} err={_done['err']}")
    print(f"cached JSON files: {len(list(CACHE.glob('????-????-????-????-????.json')))}")


if __name__ == "__main__":
    main()