"""Decompose each API cert's SAP error into per-component energy/cost deltas. WHAT THIS IS FOR ---------------- `eval_api_sap_accuracy.py` tells us *which* certs are wrong (SAP err vs lodged `energy_rating_current`). This script tells us *which component* is wrong and by how much — without generating an Elmhurst worksheet. THE METHOD (calibrate-then-compare) ----------------------------------- The API response carries the lodged per-component consumer costs (`heating_cost_current`, `hot_water_cost_current`, `lighting_cost_current`). Those use the EPC's *consumer* price basis, not SAP Table-12. So: 1. Calibrate the effective consumer price empirically on the certs we already get right (|SAP err| < CAL_TOL): for gas heating certs `gas_price = median(heating_cost / our_heating_kWh)`; for lighting (always electric) `elec_price = median(lighting_cost / our_lighting_kWh)`. 2. For every cert: `predicted_cost = our_component_kWh × calibrated_price`. `delta = predicted - lodged`. The component with the biggest |delta| is the broken one; the sign gives the direction (predicted > lodged => we over-estimate that component's energy => we under-rate SAP). `back_calc_kWh = lodged_cost / price` is a numeric energy target to fix to. 3. Accuracy is ~+-10% — good for component triage + fix targets, NOT 1e-4. OUTPUT ------ - Calibrated prices + how many certs fed each calibration. - Cluster table: (component x direction) counts + mean |SAP err| + mean delta£. - The worst certs per cluster (to pick the next slice). - A full per-cert CSV at /_cost_decomposition.csv. USAGE ----- PYTHONPATH=/workspaces/model python scripts/decompose_api_cost_error.py Reads the same cache as `eval_api_sap_accuracy.py` (default `/tmp/epc_2026_sample`, overridable via `EPC_SAMPLE_CACHE`). """ import os import csv import json import math import statistics from collections import Counter, defaultdict from pathlib import Path from typing import Any, Optional, cast from datatypes.epc.domain.mapper import EpcPropertyDataMapper from domain.sap10_calculator.calculator import SapResult, calculate_sap_from_inputs from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs from domain.sap10_calculator.tables.table_12 import API_FUEL_TO_TABLE_12 CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample")) # Certs feed the price calibration only when they are this accurate already. CAL_TOL = 0.4 # A cert is flagged "broken on component X" only when |delta£| clears this floor, # so tiny noise certs land in a "balanced" bucket rather than a spurious cluster. DELTA_FLOOR_GBP = 40.0 # Table-12 fuel-code groups for assigning a calibrated consumer price. GAS_CODE = 1 # mains gas ELEC_CODES = frozenset({30, 31, 32, 33, 34, 35, 38, 40, 41}) # std/off-peak/HP def _fuel_kind(code: Optional[int]) -> str: """Classify a fuel code for pricing: gas / elec / other. The calculator stores the *raw API* fuel enum (e.g. 26 = mains gas), so translate through `API_FUEL_TO_TABLE_12` first; Table-12 codes (30+) are not keys in that map and pass through unchanged. """ if code is None: return "other" t12 = API_FUEL_TO_TABLE_12.get(code, code) if t12 == GAS_CODE: return "gas" if t12 in ELEC_CODES: return "elec" return "other" def _lodged_cost(doc: dict[str, Any], key: str) -> Optional[float]: obj: Any = doc.get(key) if isinstance(obj, dict): val: Any = cast(dict[str, Any], obj).get("value") if isinstance(val, (int, float)): return float(val) return None def _heating_kwh(res: SapResult) -> float: """Space-heating delivered fuel across main, main-2 and secondary.""" return ( res.main_heating_fuel_kwh_per_yr + res.main_2_heating_fuel_kwh_per_yr + res.secondary_heating_fuel_kwh_per_yr ) def main() -> None: files = sorted(CACHE.glob("????-????-????-????-????.json")) records: list[dict[str, Any]] = [] cat: Counter[str] = Counter() for f in files: cert = f.stem try: doc: dict[str, Any] = json.loads(f.read_text()) except Exception: cat["bad_json"] += 1 continue lodged_sap = doc.get("energy_rating_current") if lodged_sap is None: cat["no_lodged_sap"] += 1 continue try: epc = EpcPropertyDataMapper.from_api_response(doc) except ValueError as e: cat["unsupported_schema" if "Unsupported EPC schema" in str(e) else "raise"] += 1 continue except Exception: cat["raise"] += 1 continue try: res = calculate_sap_from_inputs(cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)) except Exception: cat["calc_raise"] += 1 continue if not math.isfinite(res.sap_score_continuous): cat["non_finite"] += 1 continue cat["computed"] += 1 records.append({ "cert": cert, "sap_err": res.sap_score_continuous - lodged_sap, "heat_kwh": _heating_kwh(res), "hw_kwh": res.hot_water_kwh_per_yr, "light_kwh": res.lighting_kwh_per_yr, "heat_fuel": _fuel_kind(res.main_heating_fuel_code), "hw_fuel": _fuel_kind(res.hot_water_fuel_code), "lodged_heat": _lodged_cost(doc, "heating_cost_current"), "lodged_hw": _lodged_cost(doc, "hot_water_cost_current"), "lodged_light": _lodged_cost(doc, "lighting_cost_current"), "mains_gas": _mains_gas(doc), "roof_construction": _roof_construction(doc), }) # --- Calibrate consumer prices on the already-accurate certs ------------ gas_samples: list[float] = [] elec_samples: list[float] = [] for r in records: if abs(r["sap_err"]) >= CAL_TOL: continue if r["heat_fuel"] == "gas" and r["lodged_heat"] and r["heat_kwh"] > 0: gas_samples.append(r["lodged_heat"] / r["heat_kwh"]) if r["lodged_light"] and r["light_kwh"] > 0: elec_samples.append(r["lodged_light"] / r["light_kwh"]) gas_price = statistics.median(gas_samples) if gas_samples else 0.0809 elec_price = statistics.median(elec_samples) if elec_samples else 0.2839 price_by_kind = {"gas": gas_price, "elec": elec_price, "other": gas_price} print("=" * 74) print("CALIBRATED CONSUMER PRICES (median over |SAP err| < %.2f certs)" % CAL_TOL) print(f" gas £{gas_price:.4f}/kWh (n={len(gas_samples)})") print(f" elec £{elec_price:.4f}/kWh (n={len(elec_samples)})") # --- Per-cert component deltas ------------------------------------------ for r in records: gp = price_by_kind[r["heat_fuel"]] hwp = price_by_kind[r["hw_fuel"]] r["pred_heat"] = r["heat_kwh"] * gp r["pred_hw"] = r["hw_kwh"] * hwp r["pred_light"] = r["light_kwh"] * elec_price r["d_heat"] = _delta(r["pred_heat"], r["lodged_heat"]) r["d_hw"] = _delta(r["pred_hw"], r["lodged_hw"]) r["d_light"] = _delta(r["pred_light"], r["lodged_light"]) # back-calculated energy targets (what the lodged cost implies) r["tgt_heat_kwh"] = (r["lodged_heat"] / gp) if r["lodged_heat"] else None r["tgt_hw_kwh"] = (r["lodged_hw"] / hwp) if r["lodged_hw"] else None # dominant broken component comp, delta = _dominant(r) r["broken"] = comp r["broken_delta"] = delta r["cluster"] = ( "balanced" if comp is None else f"{comp}:{'high' if delta > 0 else 'low'}" ) _print_clusters(records) _write_csv(records) print("\nCategories:", dict(cat)) print(f"Full per-cert CSV -> {CACHE / '_cost_decomposition.csv'}") def _mains_gas(doc: dict[str, Any]) -> Any: es: Any = doc.get("sap_energy_source") or {} return es.get("mains_gas") def _roof_construction(doc: dict[str, Any]) -> Optional[int]: bps: Any = doc.get("sap_building_parts") or [] if bps and isinstance(bps[0], dict): rc: Any = bps[0].get("roof_construction") return rc if isinstance(rc, int) else None return None def _delta(pred: float, lodged: Optional[float]) -> Optional[float]: return None if lodged is None else pred - lodged def _dominant(r: dict[str, Any]) -> tuple[Optional[str], float]: """The component with the largest |delta£| above the floor, with its delta.""" candidates = [ ("heat", r["d_heat"]), ("hw", r["d_hw"]), ("light", r["d_light"]), ] scored = [(c, d) for c, d in candidates if d is not None and abs(d) >= DELTA_FLOOR_GBP] if not scored: return None, 0.0 comp, delta = max(scored, key=lambda cd: abs(cd[1])) return comp, delta def _print_clusters(records: list[dict[str, Any]]) -> None: by_cluster: dict[str, list[dict[str, Any]]] = defaultdict(list) for r in records: by_cluster[r["cluster"]].append(r) print("=" * 74) print(f"CLUSTERS by (component x direction) [delta floor £{DELTA_FLOOR_GBP:.0f}]") print(f" {'cluster':14s} {'n':>4s} {'mean|sapErr|':>12s} {'meanΔ£':>8s} {'within0.5':>9s}") order = sorted(by_cluster.items(), key=lambda kv: -len(kv[1])) for name, rs in order: n = len(rs) mean_abs = sum(abs(r["sap_err"]) for r in rs) / n mean_delta = sum(r["broken_delta"] for r in rs) / n within = 100.0 * sum(1 for r in rs if abs(r["sap_err"]) < 0.5) / n print(f" {name:14s} {n:>4d} {mean_abs:>12.2f} {mean_delta:>+8.0f} {within:>8.1f}%") # The fabric/heating clusters are the fix targets — show their worst certs. for name in ("heat:high", "heat:low"): rs = by_cluster.get(name, []) if not rs: continue print("-" * 74) print(f"WORST in {name} (broken_delta = predicted - lodged £):") print(f" {'cert':22s} {'sapErr':>7s} {'Δ£':>6s} {'ourkWh':>7s} {'tgtkWh':>7s} roof") worst = sorted(rs, key=lambda r: -abs(r["sap_err"]))[:15] for r in worst: tgt = r["tgt_heat_kwh"] print(f" {r['cert']:22s} {r['sap_err']:+7.2f} {r['broken_delta']:+6.0f} " f"{r['heat_kwh']:7.0f} {('%7.0f' % tgt) if tgt else ' -'} " f"{str(r['roof_construction'])}") def _write_csv(records: list[dict[str, Any]]) -> None: cols = [ "cert", "cluster", "broken", "broken_delta", "sap_err", "heat_kwh", "tgt_heat_kwh", "d_heat", "lodged_heat", "pred_heat", "hw_kwh", "tgt_hw_kwh", "d_hw", "lodged_hw", "pred_hw", "light_kwh", "d_light", "lodged_light", "pred_light", "heat_fuel", "hw_fuel", "mains_gas", "roof_construction", ] with open(CACHE / "_cost_decomposition.csv", "w", newline="") as fh: w = csv.DictWriter(fh, fieldnames=cols, extrasaction="ignore") w.writeheader() for r in sorted(records, key=lambda r: -abs(r["sap_err"])): w.writerow({k: _fmt(r.get(k)) for k in cols}) def _fmt(v: Any) -> Any: return round(v, 2) if isinstance(v, float) else v if __name__ == "__main__": main()