feat(diag): profile API SAP error against raw-API characteristics

Joins each computed cert's signed error (eval _results.csv) with a rich
feature set extracted from its RAW API JSON (not the mapped
EpcPropertyData), then ranks (feature, value) buckets by error carried
and by |mean signed| bias. Surfaces systematic API-path handling gaps —
a field the mapper silently drops still shows as an error-correlated
bucket. Companion to eval_api_sap_accuracy.py / decompose_api_cost_error.py.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-07 20:38:19 +00:00
parent a8e5563ace
commit 28b1da1e06

View file

@ -0,0 +1,188 @@
"""Profile API-path SAP error against RAW API-response characteristics.
WHAT THIS IS FOR
----------------
`eval_api_sap_accuracy.py` tells us HOW big the error is; this tells us
WHICH raw-API characteristics the error correlates with so we can find
systematic "API-path handling" gaps (a field dropped/mis-mapped on the
`from_api_response` `cert_to_inputs` path) rather than per-cert noise.
It deliberately profiles against the RAW JSON (`/tmp/epc_2026_sample/
<cert>.json`), NOT the mapped `EpcPropertyData`, so a feature that the
mapper silently drops still shows up here as an error-correlated bucket.
METHOD
------
1. Read `<cache>/_results.csv` (written by eval) cert -> signed err.
2. For each computed cert, extract a rich feature set from its raw JSON.
3. For every (feature, value) bucket: n, % within 0.5, mean signed,
mean |err|. Rank buckets by "wasted accuracy" = n_outside_0.5 ×
mean|err| so the biggest systematic levers float to the top.
4. Also dump the worst |err| certs with their full raw feature profile.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/profile_api_error.py
PYTHONPATH=/workspaces/model python scripts/profile_api_error.py --min-n 12
"""
from __future__ import annotations
import csv
import json
import os
import statistics as stats
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
def _g(d: dict[str, Any], *path: str) -> Any:
"""Nested-get; returns None on any missing link."""
cur: Any = d
for k in path:
if not isinstance(cur, dict):
return None
cur = cur.get(k)
return cur
def features(doc: dict[str, Any]) -> dict[str, Any]:
"""Extract raw-API characteristics worth profiling against. Each value
is bucketed verbatim (stringified) so unmapped / unusual codes surface
as their own bucket rather than being normalised away."""
h = doc.get("sap_heating") or {}
es = doc.get("sap_energy_source") or {}
mh_list = h.get("main_heating_details") or [{}]
mh = mh_list[0] if mh_list else {}
bps = doc.get("sap_building_parts") or []
bp0 = bps[0] if bps else {}
pv = es.get("photovoltaic_supply")
has_pv = bool(pv.get("pv_arrays")) if isinstance(pv, dict) else bool(pv)
showers = h.get("shower_outlets") or []
if isinstance(showers, dict):
showers = [showers]
shower_types = sorted({
(s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_outlet_type")
for s in showers
} - {None})
# any building part lodging a non-ground floor_heat_loss
floor_codes = sorted({bp.get("floor_heat_loss") for bp in bps} - {None})
roof_codes = sorted({bp.get("roof_construction") for bp in bps} - {None})
return {
"dwelling_type": doc.get("dwelling_type"),
"property_type": doc.get("property_type"),
"built_form": doc.get("built_form"),
"age_band": doc.get("construction_age_band"),
"mains_gas": es.get("mains_gas"),
"meter_type": es.get("meter_type"),
"main_heat_cat": mh.get("main_heating_category"),
"main_sap_code": mh.get("sap_main_heating_code"),
"main_control": mh.get("main_heating_control"),
"main_data_source": mh.get("main_heating_data_source"),
"has_pcdb_main": mh.get("main_heating_index_number") is not None,
"main_fuel": mh.get("main_fuel_type"),
"has_secondary": (doc.get("secondary_heating") or {}).get("description") not in (None, "None"),
"whc": h.get("water_heating_code"),
"water_fuel": h.get("water_heating_fuel"),
"has_cylinder": doc.get("has_hot_water_cylinder"),
"immersion_type": h.get("immersion_heating_type"),
"n_building_parts": len(bps),
"floor_codes": ",".join(str(c) for c in floor_codes),
"roof_codes": ",".join(str(c) for c in roof_codes),
"wall_construction": bp0.get("wall_construction"),
"wall_insulation_type": bp0.get("wall_insulation_type"),
"roof_insulation_thickness": bp0.get("roof_insulation_thickness"),
"has_pv": has_pv,
"has_wwhrs": any(
(s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_wwhrs") not in (None, 1)
for s in showers
),
"shower_types": ",".join(str(t) for t in shower_types),
"conservatory": doc.get("conservatory_type"),
"mech_vent": doc.get("mechanical_ventilation"),
"is_flat": doc.get("property_type") == 2,
}
def main() -> None:
min_n = 10
if "--min-n" in sys.argv:
min_n = int(sys.argv[sys.argv.index("--min-n") + 1])
results_path = CACHE / "_results.csv"
if not results_path.exists():
sys.exit(f"no {results_path}; run eval_api_sap_accuracy.py first")
errs: dict[str, float] = {}
for r in csv.DictReader(results_path.open()):
errs[r["cert"]] = float(r["err"])
# cert -> features
rows: list[tuple[str, float, dict[str, Any]]] = []
for cert, err in errs.items():
f = CACHE / f"{cert}.json"
if not f.exists():
continue
try:
doc = json.loads(f.read_text())
except Exception:
continue
rows.append((cert, err, features(doc)))
n_all = len(rows)
base_within = sum(1 for _, e, _ in rows if abs(e) < 0.5) / n_all * 100
print(f"profiled {n_all} computed certs | overall within-0.5 = {base_within:.1f}% "
f"| mean signed {stats.mean(e for _, e, _ in rows):+.3f} "
f"| mean|err| {stats.mean(abs(e) for _, e, _ in rows):.3f}")
print("=" * 100)
# per-feature bucket analysis
feat_names = list(rows[0][2].keys())
bucket_lines: list[tuple[float, str]] = []
for fn in feat_names:
groups: dict[str, list[float]] = defaultdict(list)
for _, err, feats in rows:
groups[str(feats.get(fn))].append(err)
for val, es in groups.items():
n = len(es)
if n < min_n:
continue
w05 = sum(1 for e in es if abs(e) < 0.5)
within = w05 / n * 100
signed = stats.mean(es)
mabs = stats.mean(abs(e) for e in es)
n_out = n - w05
waste = n_out * mabs # ranking: how much total error this bucket carries
line = (f" {fn:22s}={val:<22.22s} n={n:4d} within0.5={within:4.0f}% "
f"signed={signed:+6.2f} mean|err|={mabs:5.2f} [waste={waste:6.0f}]")
bucket_lines.append((waste, line))
print("TOP ERROR-CARRYING BUCKETS (ranked by n_outside_0.5 × mean|err|; min-n="
f"{min_n}):")
for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:45]:
print(line)
print("=" * 100)
print("MOST BIASED BUCKETS (|mean signed| — systematic over/under-rate, min-n="
f"{min_n}):")
biased: list[tuple[float, str]] = []
for fn in feat_names:
groups2: dict[str, list[float]] = defaultdict(list)
for _, err, feats in rows:
groups2[str(feats.get(fn))].append(err)
for val, es in groups2.items():
if len(es) < min_n:
continue
signed = stats.mean(es)
biased.append((abs(signed),
f" {fn:22s}={val:<22.22s} n={len(es):4d} signed={signed:+6.2f} "
f"mean|err|={stats.mean(abs(e) for e in es):5.2f}"))
for _, line in sorted(biased, key=lambda x: -x[0])[:25]:
print(line)
if __name__ == "__main__":
main()