scripts: promote the API SAP-accuracy toolkit from /tmp

Three reusable scripts (each with a purpose/usage docstring) for wide-scale
testing of the calculator's API front-end against the GOV.UK EPB register —
the toolkit behind the 1000-cert study (docs/HANDOVER_API_SAMPLE_ACCURACY.md):

  fetch_2026_epc_sample.py    — sample cert numbers across a date window
                                (random pages) + download full schema-21 JSON
                                to a cache; resumable, 429/5xx backoff.
  eval_api_sap_accuracy.py    — % within 0.5 SAP, error histogram, worst-40,
                                and the mapper/calculator raise breakdown.
  analyse_api_sap_clusters.py — error grouped by property + heating type to
                                locate clusters (electric heating, flats, PV).

Cache dir defaults to /tmp/epc_2026_sample, overridable via EPC_SAMPLE_CACHE.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-04 16:52:09 +00:00
parent 19ed29e13c
commit 3b442f9606
3 changed files with 416 additions and 0 deletions

View file

@ -0,0 +1,102 @@
"""Group API-path SAP error by property + heating type to find clusters.
WHAT THIS IS FOR
----------------
The headline number from `eval_api_sap_accuracy.py` tells you HOW accurate the
API path is; this tells you WHERE the error lives so you can prioritise. It
buckets the cached sample's per-cert SAP error (continuous vs lodged) by:
- property type (house / flat / bungalow / maisonette / park home),
- real PV presence,
- heating identity (main_heating_category + whether a PCDB index is lodged),
and prints n / mean|err| / %<0.5 per group, plus red flags (negative or
extreme-low SAP). The load-bearing cut is heating: e.g. electric storage
heaters (cat 7) and room heaters (cat 10) are the worst clusters, which points
the next worksheet-backed fix at those systems.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/analyse_api_sap_clusters.py
Reads the cache written by `fetch_2026_epc_sample.py` (default
`/tmp/epc_2026_sample`, overridable via `EPC_SAMPLE_CACHE`).
"""
import os
import json
import math
from collections import defaultdict
from pathlib import Path
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.sap10_calculator.calculator import calculate_sap_from_inputs
from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs
# Directory holding the cached cert JSONs; the EPC_SAMPLE_CACHE environment
# variable overrides the /tmp default.
CACHE = Path(os.getenv("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))

# `property_type` code (as a string) -> label shown in the group tables.
PROP = {
    "0": "House",
    "1": "Bungalow",
    "2": "Flat",
    "3": "Maisonette",
    "4": "Park home",
}
def real_pv(doc):
    """Report whether the cert lodges a genuine PV array.

    A `photovoltaic_supply` mapping that holds nothing but
    `none_or_no_details` counts as PV only when that entry carries a truthy
    `percent_roof_area`; a mapping with any other key is taken as a real
    array. A missing or non-dict `photovoltaic_supply` is not PV.
    """
    energy_source = doc.get("sap_energy_source", {}) or {}
    supply = energy_source.get("photovoltaic_supply")
    if not isinstance(supply, dict):
        return False
    if any(key != "none_or_no_details" for key in supply):
        return True
    placeholder = supply.get("none_or_no_details") or {}
    return bool(placeholder.get("percent_roof_area"))
def heat_identity(doc):
    """Return `(index_number, category)` of the cert's first main heating entry.

    Reads `sap_heating.main_heating_details[0]` and returns its
    `main_heating_index_number` and `main_heating_category`. Either element
    is None when the field is absent.

    The caller invokes this outside its try/except, so one oddly shaped
    document would otherwise abort the whole analysis run. A missing or
    empty list, a `main_heating_details` that is not a list, or a first
    entry that is not a dict all yield `(None, None)` instead of raising.
    """
    heating = doc.get("sap_heating") or {}
    details = heating.get("main_heating_details")
    first = details[0] if isinstance(details, list) and details else None
    if not isinstance(first, dict):
        return None, None
    return first.get("main_heating_index_number"), first.get("main_heating_category")
def main():
    """Print SAP-error tables per group, plus red flags, for the cached sample.

    Every cached cert JSON is pushed through the mapper and calculator and
    its absolute error against the lodged `energy_rating_current` recorded.
    Certs that cannot be read or parsed, that raise in the mapper or
    calculator, that carry no lodged rating, or that yield a non-finite
    continuous SAP are left out of the tables; they are tallied and reported
    on a single "skipped" line so the reader knows how much of the sample
    the tables actually cover. Groups with fewer than 5 certs are not
    printed.
    """
    rows = []
    skipped = defaultdict(int)  # reason -> number of certs left out
    for f in sorted(CACHE.glob("????-????-????-????-????.json")):
        try:
            doc = json.loads(f.read_text())
        except (OSError, ValueError):
            # A truncated or corrupt cache file must not abort the whole run.
            skipped["bad_json"] += 1
            continue
        if not isinstance(doc, dict):
            skipped["bad_json"] += 1
            continue
        lodged = doc.get("energy_rating_current")
        try:
            epc = EpcPropertyDataMapper.from_api_response(doc)
            cont = calculate_sap_from_inputs(
                cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)
            ).sap_score_continuous
        except Exception:
            # Deliberately broad: any mapper/calculator failure just removes
            # the cert from the cluster tables; it is counted, not itemised.
            skipped["raised"] += 1
            continue
        if lodged is None or not math.isfinite(cont):
            skipped["no_lodged_or_non_finite"] += 1
            continue
        idx, cat = heat_identity(doc)
        rows.append(dict(
            cert=f.stem, ae=abs(cont - lodged), cont=cont, lodged=lodged,
            prop=PROP.get(str(doc.get("property_type")), str(doc.get("property_type"))),
            pv=real_pv(doc), idx=idx, cat=cat,
            neg=(cont < 0), low_lodged=(lodged <= 20),
        ))
    n = len(rows)

    def grp(keyfn, label):
        """Print n / mean|err| / %<0.5 per group, worst mean error first."""
        g = defaultdict(list)
        for r in rows:
            g[keyfn(r)].append(r["ae"])
        print(f"\n-- mean|err| by {label} (n, mean|err|, %<0.5) --")
        for k, v in sorted(g.items(), key=lambda kv: -sum(kv[1]) / len(kv[1])):
            if len(v) < 5:
                # Too few certs for the mean to say anything.
                continue
            p = 100 * sum(1 for x in v if x < 0.5) / len(v)
            print(f" {str(k):28s} n={len(v):4d} mean={sum(v) / len(v):6.2f} <0.5={p:4.1f}%")

    print(f"computed n={n}")
    if skipped:
        print(" skipped: " + ", ".join(f"{k}={v}" for k, v in sorted(skipped.items())))
    grp(lambda r: r["prop"], "property type")
    grp(lambda r: "PV" if r["pv"] else "no-PV", "real PV presence")
    grp(lambda r: f"cat={r['cat']},idx={'Y' if r['idx'] else '-'}", "heating identity")
    neg = [r for r in rows if r["neg"]]
    loww = [r for r in rows if r["low_lodged"]]
    print(f"\nRED FLAGS: negative continuous SAP: {len(neg)} | lodged<=20 (extreme): {len(loww)}")
    print(" negative-SAP certs:", [r["cert"] for r in neg][:15])


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,169 @@
"""Score the SAP10 calculator's API path against a cached EPC sample.
WHAT THIS IS FOR
----------------
Measures how well the API front-end (`from_api_response` → `cert_to_inputs` →
continuous SAP) reproduces each cert's lodged rounded SAP
(`energy_rating_current`) across the sample built by
`fetch_2026_epc_sample.py`. This is the headline accuracy gauge for raw-API
behaviour on an unbiased population.
Each cert lands in one bucket:
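- computed — ran end-to-end; SAP error recorded.
- unsupported_schema — pre-21 schema the mapper doesn't support (skip).
- raise:<Exc> — mapper raised (UnmappedApiCode etc.); a gap to fix.
- calc_raise:<Exc> — calculator raised (UnmappedSapCode etc.); a gap.
- bad_json — the cached file could not be parsed as JSON.
- no_lodged_sap — the cert has no `energy_rating_current` to compare against.
- non_finite — the calculator returned a non-finite continuous SAP.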
OUTPUT
------
- Category counts + the raise breakdown with example certs (what to fix).
- For computed certs: % within 0.5 / 1 / 2 / 5 SAP, median/mean/p90/p99/max
|err|, the signed mean (over- vs under-rating), abs-err histogram.
- The 40 worst offenders with diagnostic columns (to prioritise).
- A full per-cert CSV at <cache>/_results.csv for ad-hoc slicing.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/eval_api_sap_accuracy.py
Reads the cache written by `fetch_2026_epc_sample.py` (default
`/tmp/epc_2026_sample`, overridable via `EPC_SAMPLE_CACHE`).
"""
import os
import json
import csv
import math
from collections import Counter, defaultdict
from pathlib import Path
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.sap10_calculator.calculator import calculate_sap_from_inputs
from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs
# Directory holding the cached cert JSONs (and where _results.csv is written);
# the EPC_SAMPLE_CACHE environment variable overrides the /tmp default.
CACHE = Path(os.getenv("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
def diag(doc):
    """Pick out a few raw-JSON fields that help explain a cert's error at a glance.

    Returns a dict whose keys become the diagnostic columns of the per-cert
    CSV and of the worst-offenders table. Heating fields come from the first
    entry of `sap_heating.main_heating_details`; `n_bps` is the number of
    entries in `sap_building_parts`. Absent fields come back as None (0 for
    `n_bps`).

    This is called outside any try/except, so a missing, empty or non-list
    `main_heating_details`, or a first entry that is not a dict, is treated
    as "no heating details" rather than allowed to raise.
    """
    energy_source = doc.get("sap_energy_source") or {}
    heating = doc.get("sap_heating") or {}
    details = heating.get("main_heating_details")
    first = details[0] if isinstance(details, list) and details else None
    if not isinstance(first, dict):
        first = {}
    return {
        "schema": doc.get("schema_type"),
        "prop_type": doc.get("property_type"),
        "built_form": doc.get("built_form"),
        "age_band": doc.get("construction_age_band"),
        "mains_gas": energy_source.get("mains_gas"),
        "main_heat_cat": first.get("main_heating_category"),
        "main_heat_idx": first.get("main_heating_index_number"),
        "n_bps": len(doc.get("sap_building_parts") or []),
        "lodged_band": doc.get("current_energy_efficiency_band"),
    }
def _report_computed(rows):
    """Print the accuracy summary, error histogram and 40 worst certs for `rows`.

    `rows` must be non-empty; each row carries `err`, `abs_err`, `our_cont`,
    `lodged` and the diagnostic columns produced by `diag`.
    """
    n = len(rows)
    abs_errs = sorted(r["abs_err"] for r in rows)

    def pct(thr):
        return 100.0 * sum(1 for r in rows if r["abs_err"] < thr) / n

    print("=" * 70)
    print(f"COMPUTED: {n} certs (continuous SAP vs lodged rounded)")
    print(f" % |err| < 0.5 : {pct(0.5):.1f}% <-- headline")
    print(f" % |err| < 1.0 : {pct(1.0):.1f}%")
    print(f" % |err| < 2.0 : {pct(2.0):.1f}%")
    print(f" % |err| < 5.0 : {pct(5.0):.1f}%")
    # Percentiles are read straight off the sorted list (no interpolation);
    # int(n * q) is always < n for q < 1, so the index is in range.
    print(f" median |err| : {abs_errs[n // 2]:.3f}")
    print(f" mean |err| : {sum(abs_errs) / n:.3f}")
    print(f" p90 |err| : {abs_errs[int(n * 0.90)]:.3f}")
    print(f" p99 |err| : {abs_errs[int(n * 0.99)]:.3f}")
    print(f" max |err| : {abs_errs[-1]:.3f}")
    signed = [r["err"] for r in rows]
    print(f" mean signed err: {sum(signed) / n:+.3f} (we - lodged; +ve = we over-rate)")
    print(" abs-err buckets:")
    for lo, hi in [(0, 0.5), (0.5, 1), (1, 2), (2, 5), (5, 10), (10, 1e9)]:
        c = sum(1 for r in rows if lo <= r["abs_err"] < hi)
        print(f" [{lo:>4}, {hi:>4}) : {c:4d} ({100 * c / n:4.1f}%)")
    print("=" * 70)
    print("TOP 40 LARGEST |err| (prioritise these):")
    worst = sorted(rows, key=lambda r: -r["abs_err"])[:40]
    print(f" {'cert':22s} {'err':>7s} {'our':>6s} {'lodg':>4s} prop bf age gas cat/idx bps")
    for r in worst:
        print(f" {r['cert']:22s} {r['err']:+7.2f} {r['our_cont']:6.1f} {r['lodged']:4d} "
              f"{str(r['prop_type']):>4s} {str(r['built_form']):>2s} {str(r['age_band'])[:3]:>3s} "
              f"{str(r['mains_gas']):>3s} {str(r['main_heat_cat']):>3s}/{str(r['main_heat_idx']):>6s} "
              f"{r['n_bps']}")


def main():
    """Score every cached cert and print the accuracy report.

    Each cert is sorted into exactly one bucket (`computed`, `bad_json`,
    `unsupported_schema`, `raise:<Exc>`, `no_lodged_sap`, `calc_raise:<Exc>`
    or `non_finite`). Computed certs are written to `<cache>/_results.csv`
    and summarised; mapper/calculator raises are listed with an example cert.

    The raise breakdown is printed even when no cert computed at all. That
    is the situation in which it matters most, and an early return on an
    empty result set used to hide it.
    """
    files = sorted(CACHE.glob("????-????-????-????-????.json"))
    rows = []
    cat = Counter()
    exc_examples = defaultdict(list)
    for f in files:
        cert = f.stem
        try:
            doc = json.loads(f.read_text())
        except Exception:
            cat["bad_json"] += 1
            continue
        lodged = doc.get("energy_rating_current")
        try:
            epc = EpcPropertyDataMapper.from_api_response(doc)
        except ValueError as e:
            # The mapper signals a pre-21 schema via this message; that is an
            # expected skip rather than a gap to fix.
            if "Unsupported EPC schema" in str(e):
                cat["unsupported_schema"] += 1
            else:
                cat["raise:ValueError"] += 1
                exc_examples["ValueError:" + str(e)[:60]].append(cert)
            continue
        except Exception as e:
            ename = type(e).__name__
            cat[f"raise:{ename}"] += 1
            exc_examples[f"{ename}:{str(e)[:60]}"].append(cert)
            continue
        if lodged is None:
            cat["no_lodged_sap"] += 1
            continue
        try:
            cont = calculate_sap_from_inputs(
                cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)
            ).sap_score_continuous
        except Exception as e:
            ename = type(e).__name__
            cat[f"calc_raise:{ename}"] += 1
            exc_examples[f"calc:{ename}:{str(e)[:50]}"].append(cert)
            continue
        if not math.isfinite(cont):
            cat["non_finite"] += 1
            continue
        err = cont - lodged
        cat["computed"] += 1
        rows.append({
            "cert": cert, "our_cont": round(cont, 4), "lodged": lodged,
            "err": round(err, 4), "abs_err": round(abs(err), 4), **diag(doc),
        })
    if rows:
        keys = list(rows[0].keys())
        with open(CACHE / "_results.csv", "w", newline="") as fh:
            w = csv.DictWriter(fh, fieldnames=keys)
            w.writeheader()
            w.writerows(rows)
    print("=" * 70)
    print(f"SAMPLE: {len(files)} cached certs | categories:")
    for k, v in cat.most_common():
        print(f" {k:28s} {v}")
    if rows:
        _report_computed(rows)
    if exc_examples:
        print("=" * 70)
        print("RAISE/ERROR EXAMPLES (mapper/calculator gaps — also prioritise):")
        for k, v in sorted(exc_examples.items(), key=lambda kv: -len(kv[1]))[:20]:
            print(f" [{len(v):3d}] {k} e.g. {v[0]}")
    if rows:
        # The CSV is only (re)written when at least one cert computed.
        print(f"\nFull per-cert CSV -> {CACHE / '_results.csv'}")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,145 @@
"""Fetch a random sample of domestic EPC JSONs from the GOV.UK EPB register.
WHAT THIS IS FOR
----------------
Wide-scale accuracy testing of the SAP10 calculator's API front-end against
real-world certificates (not the curated golden cohort, which masks raw-API
behaviour). This script builds the *input corpus*: it samples certificate
numbers uniformly at random across a date window, then downloads each cert's
full schema-21 ``data`` payload (the exact shape
``EpcPropertyDataMapper.from_api_response`` consumes) into a local cache.
Pair it with:
- ``eval_api_sap_accuracy.py`` — % within 0.5 SAP, worst offenders, raises.
- ``analyse_api_sap_clusters.py`` — error grouped by heating type / property.
HOW THE SAMPLE IS DRAWN
-----------------------
The register's ``/api/domestic/search`` endpoint is date-windowed and paged
(``date_start``/``date_end``/``current_page``/``page_size``); results are
ordered by registration date, so picking random PAGES across the whole window
gives an unbiased spread over dates, regions and property types. Each chosen
cert number is then resolved to its full JSON via ``/api/certificate``.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/fetch_2026_epc_sample.py
Resumable — re-running skips certs already cached, so it's safe to interrupt.
Token is read from ``backend/.env`` (``OPEN_EPC_API_TOKEN``). NB the register
rejects a ``date_end`` that includes today, so keep the window in the past.
Tune the constants below (window, page count, target size, seed). The cache
dir defaults to ``/tmp/epc_2026_sample`` and can be overridden with the
``EPC_SAMPLE_CACHE`` env var.
"""
import os
import json
import time
import random
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
from dotenv import load_dotenv
# Module-level setup. Order matters: the .env file is loaded first so the
# token lookup below can see it. The path is relative — presumably resolved
# against the current working directory, so run from the repo root (TODO
# confirm).
load_dotenv("backend/.env")
# Raises KeyError at import time if the token is not set.
TOKEN = os.environ["OPEN_EPC_API_TOKEN"]
BASE = "https://api.get-energy-performance-data.communities.gov.uk"
# Headers sent with every register request.
H = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"}
# Cache directory (overridable via EPC_SAMPLE_CACHE); created on import.
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
CACHE.mkdir(parents=True, exist_ok=True)
# Sampling window + size. `date_end` must be strictly before today (the
# register rejects "the date cannot include today"). TOTAL_PAGES is the
# `totalPages` the search returns for this window at page_size=100 — re-probe
# it if you change the window (it only needs to be an upper bound for the
# random page draw; out-of-range pages just return fewer rows).
WINDOW = {"date_start": "2026-01-01", "date_end": "2026-05-31"}
TOTAL_PAGES = 7402
N_PAGES = 14  # random pages to pull → N_PAGES * 100 candidate certs
TARGET = 1200  # cap on how many full JSONs to fetch
random.seed(2026)  # reproducible page draw (seeds the module-global RNG)
def _get(url, params, timeout=20.0, tries=5):
    """GET `url` with retry/backoff on transport errors, 429 and 5xx.

    Up to `tries` attempts are made. After a failed attempt the wait is the
    server's `Retry-After` value when it is a number of seconds, otherwise a
    linear backoff of 1.5s x attempt number. `Retry-After` may also be an
    HTTP-date; that form is not parsed and falls back to the linear backoff
    instead of raising. No sleep follows the final attempt.

    Returns the first response that is neither 429 nor 5xx. If retries run
    out, returns the most recent response received (which may itself be a
    429/5xx), or None when no attempt got a response at all.
    """
    r = None
    for i in range(tries):
        is_last = i == tries - 1
        delay = 1.5 * (i + 1)
        try:
            r = httpx.get(url, params=params, headers=H, timeout=timeout)
        except httpx.HTTPError:
            if not is_last:
                time.sleep(delay)
            continue
        if r.status_code == 429 or r.status_code >= 500:
            retry_after = r.headers.get("Retry-After")
            if retry_after:
                try:
                    delay = max(0.0, float(retry_after))
                except ValueError:
                    # HTTP-date form (or junk): keep the linear backoff.
                    pass
            if not is_last:
                time.sleep(delay)
            continue
        return r
    return r
def sample_cert_numbers():
    """Draw N_PAGES random search pages and collect their certificate numbers.

    Pages are drawn without replacement from 1..TOTAL_PAGES and visited in
    ascending order. A page whose search request fails is reported and
    skipped. Returns a dict mapping each `certificateNumber` to its
    `registrationDate` (None when the row has none).
    """
    chosen_pages = random.sample(range(1, TOTAL_PAGES + 1), N_PAGES)
    chosen_pages.sort()
    search_url = f"{BASE}/api/domestic/search"
    found = {}
    for page_no in chosen_pages:
        query = {**WINDOW, "current_page": page_no, "page_size": 100}
        resp = _get(search_url, query)
        if resp is None or not resp.is_success:
            print(f" search page {page_no} -> {getattr(resp, 'status_code', 'ERR')}")
            continue
        for entry in resp.json().get("data", []):
            found[entry["certificateNumber"]] = entry.get("registrationDate")
        print(f" page {page_no}: cumulative {len(found)} certs")
    return found
# Guards updates to the shared outcome tallies made from worker threads.
_lock = threading.Lock()
# Outcome tallies for this run: cached-or-fetched, not-found, and failed.
_done = dict.fromkeys(("ok", "404", "err"), 0)
def fetch_one(cert):
    """Download one certificate's `data` payload into the cache.

    Updates the shared `_done` tallies under `_lock`:
      - "ok"  : the cert was already cached, or was fetched and written;
      - "404" : the register returned 404 for the cert;
      - "err" : no usable response, an unparseable body, or a write failure.

    The payload is written to a sibling `.part` file and then renamed into
    place. A run interrupted mid-write therefore never leaves a truncated
    `<cert>.json` that a later run would mistake for a finished download;
    resuming depends on "file exists" meaning "file is complete".
    """
    out = CACHE / f"{cert}.json"
    if out.exists():
        with _lock:
            _done["ok"] += 1
        return
    r = _get(f"{BASE}/api/certificate", {"certificate_number": cert})
    if r is not None and r.status_code == 404:
        with _lock:
            _done["404"] += 1
        return
    if r is None or not r.is_success:
        with _lock:
            _done["err"] += 1
        return
    try:
        payload = r.json()["data"]
    except (ValueError, KeyError, TypeError):
        # Body was not JSON, or not an object with a "data" member.
        with _lock:
            _done["err"] += 1
        return
    # The ".part" name does not match the scripts' "*.json" cache glob, so a
    # leftover from a crash is ignored and simply overwritten next run.
    tmp = out.with_name(out.name + ".part")
    try:
        tmp.write_text(json.dumps(payload))
        tmp.replace(out)
    except OSError:
        with _lock:
            _done["err"] += 1
        return
    with _lock:
        _done["ok"] += 1
        if _done["ok"] % 100 == 0:
            print(f" fetched {_done['ok']} (404={_done['404']} err={_done['err']})")
def main():
    """Sample cert numbers, write the manifest, then fetch every cert JSON.

    At most TARGET of the sampled cert numbers are fetched, using 8 worker
    threads. The manifest (`_manifest.json` in the cache dir) records the
    cert list and the sampling window.

    Each worker's outcome is inspected: an exception escaping `fetch_one`
    is reported and counted under "err". Previously it stayed hidden inside
    its future, and the final tallies silently came up short.
    """
    print("sampling cert numbers...")
    certs = sample_cert_numbers()
    cert_list = list(certs)[:TARGET]
    (CACHE / "_manifest.json").write_text(
        json.dumps({"certs": cert_list, "window": WINDOW}, indent=2)
    )
    print(f"fetching {len(cert_list)} cert JSONs into {CACHE} ...")
    t0 = time.monotonic()
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = {ex.submit(fetch_one, c): c for c in cert_list}
        for fut in as_completed(futures):
            exc = fut.exception()
            if exc is not None:
                with _lock:
                    _done["err"] += 1
                print(f" {futures[fut]} -> unexpected {type(exc).__name__}: {exc}")
    print(f"DONE in {time.monotonic() - t0:.0f}s: ok={_done['ok']} 404={_done['404']} err={_done['err']}")
    print(f"cached JSON files: {len(list(CACHE.glob('????-????-????-????-????.json')))}")


if __name__ == "__main__":
    main()