diff --git a/scripts/profile_api_error.py b/scripts/profile_api_error.py new file mode 100644 index 00000000..f071cbd2 --- /dev/null +++ b/scripts/profile_api_error.py @@ -0,0 +1,188 @@ +"""Profile API-path SAP error against RAW API-response characteristics. + +WHAT THIS IS FOR +---------------- +`eval_api_sap_accuracy.py` tells us HOW big the error is; this tells us +WHICH raw-API characteristics the error correlates with — so we can find +systematic "API-path handling" gaps (a field dropped/mis-mapped on the +`from_api_response` → `cert_to_inputs` path) rather than per-cert noise. + +It deliberately profiles against the RAW JSON (`/tmp/epc_2026_sample/ +.json`), NOT the mapped `EpcPropertyData`, so a feature that the +mapper silently drops still shows up here as an error-correlated bucket. + +METHOD +------ +1. Read `/_results.csv` (written by eval) → cert -> signed err. +2. For each computed cert, extract a rich feature set from its raw JSON. +3. For every (feature, value) bucket: n, % within 0.5, mean signed, + mean |err|. Rank buckets by "wasted accuracy" = n_outside_0.5 × + mean|err| so the biggest systematic levers float to the top. +4. Also dump the worst |err| certs with their full raw feature profile. + +USAGE +----- + PYTHONPATH=/workspaces/model python scripts/profile_api_error.py + PYTHONPATH=/workspaces/model python scripts/profile_api_error.py --min-n 12 +""" +from __future__ import annotations + +import csv +import json +import os +import statistics as stats +import sys +from collections import defaultdict +from pathlib import Path +from typing import Any, Optional + +CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample")) + + +def _g(d: dict[str, Any], *path: str) -> Any: + """Nested-get; returns None on any missing link.""" + cur: Any = d + for k in path: + if not isinstance(cur, dict): + return None + cur = cur.get(k) + return cur + + +def features(doc: dict[str, Any]) -> dict[str, Any]: + """Extract raw-API characteristics worth profiling against. Each value + is bucketed verbatim (stringified) so unmapped / unusual codes surface + as their own bucket rather than being normalised away.""" + h = doc.get("sap_heating") or {} + es = doc.get("sap_energy_source") or {} + mh_list = h.get("main_heating_details") or [{}] + mh = mh_list[0] if mh_list else {} + bps = doc.get("sap_building_parts") or [] + bp0 = bps[0] if bps else {} + pv = es.get("photovoltaic_supply") + has_pv = bool(pv.get("pv_arrays")) if isinstance(pv, dict) else bool(pv) + showers = h.get("shower_outlets") or [] + if isinstance(showers, dict): + showers = [showers] + shower_types = sorted({ + (s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_outlet_type") + for s in showers + } - {None}) + # any building part lodging a non-ground floor_heat_loss + floor_codes = sorted({bp.get("floor_heat_loss") for bp in bps} - {None}) + roof_codes = sorted({bp.get("roof_construction") for bp in bps} - {None}) + + return { + "dwelling_type": doc.get("dwelling_type"), + "property_type": doc.get("property_type"), + "built_form": doc.get("built_form"), + "age_band": doc.get("construction_age_band"), + "mains_gas": es.get("mains_gas"), + "meter_type": es.get("meter_type"), + "main_heat_cat": mh.get("main_heating_category"), + "main_sap_code": mh.get("sap_main_heating_code"), + "main_control": mh.get("main_heating_control"), + "main_data_source": mh.get("main_heating_data_source"), + "has_pcdb_main": mh.get("main_heating_index_number") is not None, + "main_fuel": mh.get("main_fuel_type"), + "has_secondary": (doc.get("secondary_heating") or {}).get("description") not in (None, "None"), + "whc": h.get("water_heating_code"), + "water_fuel": h.get("water_heating_fuel"), + "has_cylinder": doc.get("has_hot_water_cylinder"), + "immersion_type": h.get("immersion_heating_type"), + "n_building_parts": len(bps), + "floor_codes": ",".join(str(c) for c in floor_codes), + "roof_codes": ",".join(str(c) for c in roof_codes), + "wall_construction": bp0.get("wall_construction"), + "wall_insulation_type": bp0.get("wall_insulation_type"), + "roof_insulation_thickness": bp0.get("roof_insulation_thickness"), + "has_pv": has_pv, + "has_wwhrs": any( + (s.get("shower_outlet", s) if isinstance(s, dict) else {}).get("shower_wwhrs") not in (None, 1) + for s in showers + ), + "shower_types": ",".join(str(t) for t in shower_types), + "conservatory": doc.get("conservatory_type"), + "mech_vent": doc.get("mechanical_ventilation"), + "is_flat": doc.get("property_type") == 2, + } + + +def main() -> None: + min_n = 10 + if "--min-n" in sys.argv: + min_n = int(sys.argv[sys.argv.index("--min-n") + 1]) + + results_path = CACHE / "_results.csv" + if not results_path.exists(): + sys.exit(f"no {results_path}; run eval_api_sap_accuracy.py first") + errs: dict[str, float] = {} + for r in csv.DictReader(results_path.open()): + errs[r["cert"]] = float(r["err"]) + + # cert -> features + rows: list[tuple[str, float, dict[str, Any]]] = [] + for cert, err in errs.items(): + f = CACHE / f"{cert}.json" + if not f.exists(): + continue + try: + doc = json.loads(f.read_text()) + except Exception: + continue + rows.append((cert, err, features(doc))) + + n_all = len(rows) + base_within = sum(1 for _, e, _ in rows if abs(e) < 0.5) / n_all * 100 + print(f"profiled {n_all} computed certs | overall within-0.5 = {base_within:.1f}% " + f"| mean signed {stats.mean(e for _, e, _ in rows):+.3f} " + f"| mean|err| {stats.mean(abs(e) for _, e, _ in rows):.3f}") + print("=" * 100) + + # per-feature bucket analysis + feat_names = list(rows[0][2].keys()) + bucket_lines: list[tuple[float, str]] = [] + for fn in feat_names: + groups: dict[str, list[float]] = defaultdict(list) + for _, err, feats in rows: + groups[str(feats.get(fn))].append(err) + for val, es in groups.items(): + n = len(es) + if n < min_n: + continue + w05 = sum(1 for e in es if abs(e) < 0.5) + within = w05 / n * 100 + signed = stats.mean(es) + mabs = stats.mean(abs(e) for e in es) + n_out = n - w05 + waste = n_out * mabs # ranking: how much total error this bucket carries + line = (f" {fn:22s}={val:<22.22s} n={n:4d} within0.5={within:4.0f}% " + f"signed={signed:+6.2f} mean|err|={mabs:5.2f} [waste={waste:6.0f}]") + bucket_lines.append((waste, line)) + + print("TOP ERROR-CARRYING BUCKETS (ranked by n_outside_0.5 × mean|err|; min-n=" + f"{min_n}):") + for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:45]: + print(line) + + print("=" * 100) + print("MOST BIASED BUCKETS (|mean signed| — systematic over/under-rate, min-n=" + f"{min_n}):") + biased: list[tuple[float, str]] = [] + for fn in feat_names: + groups2: dict[str, list[float]] = defaultdict(list) + for _, err, feats in rows: + groups2[str(feats.get(fn))].append(err) + for val, es in groups2.items(): + if len(es) < min_n: + continue + signed = stats.mean(es) + biased.append((abs(signed), + f" {fn:22s}={val:<22.22s} n={len(es):4d} signed={signed:+6.2f} " + f"mean|err|={stats.mean(abs(e) for e in es):5.2f}")) + for _, line in sorted(biased, key=lambda x: -x[0])[:25]: + print(line) + + +if __name__ == "__main__": + main()