"""Profile API-path SAP error against RAW API-response characteristics. WHAT THIS IS FOR ---------------- `eval_api_sap_accuracy.py` tells us HOW big the error is; this tells us WHICH raw-API characteristics the error correlates with — so we can find systematic "API-path handling" gaps (a field dropped/mis-mapped on the `from_api_response` → `cert_to_inputs` path) rather than per-cert noise. It deliberately profiles against the RAW JSON (`/tmp/epc_2026_sample/ .json`), NOT the mapped `EpcPropertyData`, so a feature that the mapper silently drops still shows up here as an error-correlated bucket. METHOD ------ 1. Read `/_results.csv` (written by eval) → cert -> signed err. 2. For each computed cert, extract a rich feature set from its raw JSON. 3. For every (feature, value) bucket: n, % within 0.5, mean signed, mean |err|. Rank buckets by "wasted accuracy" = n_outside_0.5 × mean|err| so the biggest systematic levers float to the top. 4. Also dump the worst |err| certs with their full raw feature profile. USAGE ----- PYTHONPATH=/workspaces/model python scripts/profile_api_error.py PYTHONPATH=/workspaces/model python scripts/profile_api_error.py --min-n 12 """ from __future__ import annotations import csv import json import os import statistics as stats import sys from collections import defaultdict from pathlib import Path from typing import Any, Optional CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample")) def _g(d: dict[str, Any], *path: str) -> Any: """Nested-get; returns None on any missing link.""" cur: Any = d for k in path: if not isinstance(cur, dict): return None cur = cur.get(k) return cur def features(doc: dict[str, Any]) -> dict[str, Any]: """Extract raw-API characteristics worth profiling against. Each value is bucketed verbatim (stringified) so unmapped / unusual codes surface as their own bucket rather than being normalised away.""" h = doc.get("sap_heating") or {} es = doc.get("sap_energy_source") or {} mh_list = h.get("main_heating_details") or [{}] mh = mh_list[0] if mh_list else {} bps = doc.get("sap_building_parts") or [] bp0 = bps[0] if bps else {} pv = es.get("photovoltaic_supply") has_pv = bool(pv.get("pv_arrays")) if isinstance(pv, dict) else bool(pv) showers = h.get("shower_outlets") or [] if isinstance(showers, dict): showers = [showers] shower_types = sorted({ (s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_outlet_type") for s in showers } - {None}) # any building part lodging a non-ground floor_heat_loss floor_codes = sorted({bp.get("floor_heat_loss") for bp in bps} - {None}) roof_codes = sorted({bp.get("roof_construction") for bp in bps} - {None}) return { "dwelling_type": doc.get("dwelling_type"), "property_type": doc.get("property_type"), "built_form": doc.get("built_form"), "age_band": doc.get("construction_age_band"), "mains_gas": es.get("mains_gas"), "meter_type": es.get("meter_type"), "main_heat_cat": mh.get("main_heating_category"), "main_sap_code": mh.get("sap_main_heating_code"), "main_control": mh.get("main_heating_control"), "main_data_source": mh.get("main_heating_data_source"), "has_pcdb_main": mh.get("main_heating_index_number") is not None, "main_fuel": mh.get("main_fuel_type"), "has_secondary": (doc.get("secondary_heating") or {}).get("description") not in (None, "None"), "whc": h.get("water_heating_code"), "water_fuel": h.get("water_heating_fuel"), "has_cylinder": doc.get("has_hot_water_cylinder"), "immersion_type": h.get("immersion_heating_type"), "n_building_parts": len(bps), "floor_codes": ",".join(str(c) for c in floor_codes), "roof_codes": ",".join(str(c) for c in roof_codes), "wall_construction": bp0.get("wall_construction"), "wall_insulation_type": bp0.get("wall_insulation_type"), "roof_insulation_thickness": bp0.get("roof_insulation_thickness"), "has_pv": has_pv, "has_wwhrs": any( (s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_wwhrs") not in (None, 1) for s in showers ), "shower_types": ",".join(str(t) for t in shower_types), "conservatory": doc.get("conservatory_type"), "mech_vent": doc.get("mechanical_ventilation"), "is_flat": doc.get("property_type") == 2, } def main() -> None: min_n = 10 if "--min-n" in sys.argv: min_n = int(sys.argv[sys.argv.index("--min-n") + 1]) results_path = CACHE / "_results.csv" if not results_path.exists(): sys.exit(f"no {results_path}; run eval_api_sap_accuracy.py first") errs: dict[str, float] = {} for r in csv.DictReader(results_path.open()): errs[r["cert"]] = float(r["err"]) # cert -> features rows: list[tuple[str, float, dict[str, Any]]] = [] for cert, err in errs.items(): f = CACHE / f"{cert}.json" if not f.exists(): continue try: doc = json.loads(f.read_text()) except Exception: continue rows.append((cert, err, features(doc))) n_all = len(rows) base_within = sum(1 for _, e, _ in rows if abs(e) < 0.5) / n_all * 100 print(f"profiled {n_all} computed certs | overall within-0.5 = {base_within:.1f}% " f"| mean signed {stats.mean(e for _, e, _ in rows):+.3f} " f"| mean|err| {stats.mean(abs(e) for _, e, _ in rows):.3f}") print("=" * 100) # per-feature bucket analysis feat_names = list(rows[0][2].keys()) bucket_lines: list[tuple[float, str]] = [] for fn in feat_names: groups: dict[str, list[float]] = defaultdict(list) for _, err, feats in rows: groups[str(feats.get(fn))].append(err) for val, es in groups.items(): n = len(es) if n < min_n: continue w05 = sum(1 for e in es if abs(e) < 0.5) within = w05 / n * 100 signed = stats.mean(es) mabs = stats.mean(abs(e) for e in es) n_out = n - w05 waste = n_out * mabs # ranking: how much total error this bucket carries line = (f" {fn:22s}={val:<22.22s} n={n:4d} within0.5={within:4.0f}% " f"signed={signed:+6.2f} mean|err|={mabs:5.2f} [waste={waste:6.0f}]") bucket_lines.append((waste, line)) print("TOP ERROR-CARRYING BUCKETS (ranked by n_outside_0.5 × mean|err|; min-n=" f"{min_n}):") for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:45]: print(line) print("=" * 100) print("MOST BIASED BUCKETS (|mean signed| — systematic over/under-rate, min-n=" f"{min_n}):") biased: list[tuple[float, str]] = [] for fn in feat_names: groups2: dict[str, list[float]] = defaultdict(list) for _, err, feats in rows: groups2[str(feats.get(fn))].append(err) for val, es in groups2.items(): if len(es) < min_n: continue signed = stats.mean(es) biased.append((abs(signed), f" {fn:22s}={val:<22.22s} n={len(es):4d} signed={signed:+6.2f} " f"mean|err|={stats.mean(abs(e) for e in es):5.2f}")) for _, line in sorted(biased, key=lambda x: -x[0])[:25]: print(line) if __name__ == "__main__": main()