From 98f71d25543292b6a433a85e78637ccc7096ba36 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sat, 6 Jun 2026 17:37:05 +0000 Subject: [PATCH] feat(diag): per-component cost decomposition for API SAP errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror of eval_api_sap_accuracy.py that decomposes each cert's SAP error into per-component energy/cost deltas WITHOUT generating an Elmhurst worksheet. Calibrates the consumer price from the certs we already get right (gas £0.0809/kWh n=291, elec £0.2839/kWh n=326 over |SAP err|<0.4), then for every cert compares our_component_kWh × price to the lodged heating_cost_current / hot_water_cost_current / lighting_cost_current and back-calculates a numeric energy target (lodged_cost / price). Clusters errors by (component × direction). On the 905-cert sample this reveals heat:high (we over-state heating energy → under-rate SAP) as the dominant broken cluster: 332 certs, only 36.7% within 0.5. Output CSV at /_cost_decomposition.csv. Co-Authored-By: Claude Opus 4.8 --- scripts/decompose_api_cost_error.py | 282 ++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 scripts/decompose_api_cost_error.py diff --git a/scripts/decompose_api_cost_error.py b/scripts/decompose_api_cost_error.py new file mode 100644 index 00000000..2cd8d6fa --- /dev/null +++ b/scripts/decompose_api_cost_error.py @@ -0,0 +1,282 @@ +"""Decompose each API cert's SAP error into per-component energy/cost deltas. + +WHAT THIS IS FOR +---------------- +`eval_api_sap_accuracy.py` tells us *which* certs are wrong (SAP err vs lodged +`energy_rating_current`). This script tells us *which component* is wrong and by +how much — without generating an Elmhurst worksheet. + +THE METHOD (calibrate-then-compare) +----------------------------------- +The API response carries the lodged per-component consumer costs +(`heating_cost_current`, `hot_water_cost_current`, `lighting_cost_current`). +Those use the EPC's *consumer* price basis, not SAP Table-12. So: + + 1. Calibrate the effective consumer price empirically on the certs we already + get right (|SAP err| < CAL_TOL): for gas heating certs + `gas_price = median(heating_cost / our_heating_kWh)`; for lighting (always + electric) `elec_price = median(lighting_cost / our_lighting_kWh)`. + 2. For every cert: `predicted_cost = our_component_kWh × calibrated_price`. + `delta = predicted - lodged`. The component with the biggest |delta| is the + broken one; the sign gives the direction (predicted > lodged => we + over-estimate that component's energy => we under-rate SAP). + `back_calc_kWh = lodged_cost / price` is a numeric energy target to fix to. + 3. Accuracy is ~+-10% — good for component triage + fix targets, NOT 1e-4. + +OUTPUT +------ + - Calibrated prices + how many certs fed each calibration. + - Cluster table: (component x direction) counts + mean |SAP err| + mean delta£. + - The worst certs per cluster (to pick the next slice). + - A full per-cert CSV at /_cost_decomposition.csv. + +USAGE +----- + PYTHONPATH=/workspaces/model python scripts/decompose_api_cost_error.py + +Reads the same cache as `eval_api_sap_accuracy.py` (default `/tmp/epc_2026_sample`, +overridable via `EPC_SAMPLE_CACHE`). +""" +import os +import csv +import json +import math +import statistics +from collections import Counter, defaultdict +from pathlib import Path +from typing import Any, Optional, cast + +from datatypes.epc.domain.mapper import EpcPropertyDataMapper +from domain.sap10_calculator.calculator import SapResult, calculate_sap_from_inputs +from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs +from domain.sap10_calculator.tables.table_12 import API_FUEL_TO_TABLE_12 + +CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample")) + +# Certs feed the price calibration only when they are this accurate already. +CAL_TOL = 0.4 +# A cert is flagged "broken on component X" only when |delta£| clears this floor, +# so tiny noise certs land in a "balanced" bucket rather than a spurious cluster. +DELTA_FLOOR_GBP = 40.0 + +# Table-12 fuel-code groups for assigning a calibrated consumer price. +GAS_CODE = 1 # mains gas +ELEC_CODES = frozenset({30, 31, 32, 33, 34, 35, 38, 40, 41}) # std/off-peak/HP + + +def _fuel_kind(code: Optional[int]) -> str: + """Classify a fuel code for pricing: gas / elec / other. + + The calculator stores the *raw API* fuel enum (e.g. 26 = mains gas), so + translate through `API_FUEL_TO_TABLE_12` first; Table-12 codes (30+) are + not keys in that map and pass through unchanged. + """ + if code is None: + return "other" + t12 = API_FUEL_TO_TABLE_12.get(code, code) + if t12 == GAS_CODE: + return "gas" + if t12 in ELEC_CODES: + return "elec" + return "other" + + +def _lodged_cost(doc: dict[str, Any], key: str) -> Optional[float]: + obj: Any = doc.get(key) + if isinstance(obj, dict): + val: Any = cast(dict[str, Any], obj).get("value") + if isinstance(val, (int, float)): + return float(val) + return None + + +def _heating_kwh(res: SapResult) -> float: + """Space-heating delivered fuel across main, main-2 and secondary.""" + return ( + res.main_heating_fuel_kwh_per_yr + + res.main_2_heating_fuel_kwh_per_yr + + res.secondary_heating_fuel_kwh_per_yr + ) + + +def main() -> None: + files = sorted(CACHE.glob("????-????-????-????-????.json")) + records: list[dict[str, Any]] = [] + cat: Counter[str] = Counter() + + for f in files: + cert = f.stem + try: + doc: dict[str, Any] = json.loads(f.read_text()) + except Exception: + cat["bad_json"] += 1 + continue + lodged_sap = doc.get("energy_rating_current") + if lodged_sap is None: + cat["no_lodged_sap"] += 1 + continue + try: + epc = EpcPropertyDataMapper.from_api_response(doc) + except ValueError as e: + cat["unsupported_schema" if "Unsupported EPC schema" in str(e) else "raise"] += 1 + continue + except Exception: + cat["raise"] += 1 + continue + try: + res = calculate_sap_from_inputs(cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)) + except Exception: + cat["calc_raise"] += 1 + continue + if not math.isfinite(res.sap_score_continuous): + cat["non_finite"] += 1 + continue + cat["computed"] += 1 + records.append({ + "cert": cert, + "sap_err": res.sap_score_continuous - lodged_sap, + "heat_kwh": _heating_kwh(res), + "hw_kwh": res.hot_water_kwh_per_yr, + "light_kwh": res.lighting_kwh_per_yr, + "heat_fuel": _fuel_kind(res.main_heating_fuel_code), + "hw_fuel": _fuel_kind(res.hot_water_fuel_code), + "lodged_heat": _lodged_cost(doc, "heating_cost_current"), + "lodged_hw": _lodged_cost(doc, "hot_water_cost_current"), + "lodged_light": _lodged_cost(doc, "lighting_cost_current"), + "mains_gas": _mains_gas(doc), + "roof_construction": _roof_construction(doc), + }) + + # --- Calibrate consumer prices on the already-accurate certs ------------ + gas_samples: list[float] = [] + elec_samples: list[float] = [] + for r in records: + if abs(r["sap_err"]) >= CAL_TOL: + continue + if r["heat_fuel"] == "gas" and r["lodged_heat"] and r["heat_kwh"] > 0: + gas_samples.append(r["lodged_heat"] / r["heat_kwh"]) + if r["lodged_light"] and r["light_kwh"] > 0: + elec_samples.append(r["lodged_light"] / r["light_kwh"]) + + gas_price = statistics.median(gas_samples) if gas_samples else 0.0809 + elec_price = statistics.median(elec_samples) if elec_samples else 0.2839 + price_by_kind = {"gas": gas_price, "elec": elec_price, "other": gas_price} + + print("=" * 74) + print("CALIBRATED CONSUMER PRICES (median over |SAP err| < %.2f certs)" % CAL_TOL) + print(f" gas £{gas_price:.4f}/kWh (n={len(gas_samples)})") + print(f" elec £{elec_price:.4f}/kWh (n={len(elec_samples)})") + + # --- Per-cert component deltas ------------------------------------------ + for r in records: + gp = price_by_kind[r["heat_fuel"]] + hwp = price_by_kind[r["hw_fuel"]] + r["pred_heat"] = r["heat_kwh"] * gp + r["pred_hw"] = r["hw_kwh"] * hwp + r["pred_light"] = r["light_kwh"] * elec_price + r["d_heat"] = _delta(r["pred_heat"], r["lodged_heat"]) + r["d_hw"] = _delta(r["pred_hw"], r["lodged_hw"]) + r["d_light"] = _delta(r["pred_light"], r["lodged_light"]) + # back-calculated energy targets (what the lodged cost implies) + r["tgt_heat_kwh"] = (r["lodged_heat"] / gp) if r["lodged_heat"] else None + r["tgt_hw_kwh"] = (r["lodged_hw"] / hwp) if r["lodged_hw"] else None + # dominant broken component + comp, delta = _dominant(r) + r["broken"] = comp + r["broken_delta"] = delta + r["cluster"] = ( + "balanced" if comp is None + else f"{comp}:{'high' if delta > 0 else 'low'}" + ) + + _print_clusters(records) + _write_csv(records) + + print("\nCategories:", dict(cat)) + print(f"Full per-cert CSV -> {CACHE / '_cost_decomposition.csv'}") + + +def _mains_gas(doc: dict[str, Any]) -> Any: + es: Any = doc.get("sap_energy_source") or {} + return es.get("mains_gas") + + +def _roof_construction(doc: dict[str, Any]) -> Optional[int]: + bps: Any = doc.get("sap_building_parts") or [] + if bps and isinstance(bps[0], dict): + rc: Any = bps[0].get("roof_construction") + return rc if isinstance(rc, int) else None + return None + + +def _delta(pred: float, lodged: Optional[float]) -> Optional[float]: + return None if lodged is None else pred - lodged + + +def _dominant(r: dict[str, Any]) -> tuple[Optional[str], float]: + """The component with the largest |delta£| above the floor, with its delta.""" + candidates = [ + ("heat", r["d_heat"]), + ("hw", r["d_hw"]), + ("light", r["d_light"]), + ] + scored = [(c, d) for c, d in candidates if d is not None and abs(d) >= DELTA_FLOOR_GBP] + if not scored: + return None, 0.0 + comp, delta = max(scored, key=lambda cd: abs(cd[1])) + return comp, delta + + +def _print_clusters(records: list[dict[str, Any]]) -> None: + by_cluster: dict[str, list[dict[str, Any]]] = defaultdict(list) + for r in records: + by_cluster[r["cluster"]].append(r) + + print("=" * 74) + print(f"CLUSTERS by (component x direction) [delta floor £{DELTA_FLOOR_GBP:.0f}]") + print(f" {'cluster':14s} {'n':>4s} {'mean|sapErr|':>12s} {'meanΔ£':>8s} {'within0.5':>9s}") + order = sorted(by_cluster.items(), key=lambda kv: -len(kv[1])) + for name, rs in order: + n = len(rs) + mean_abs = sum(abs(r["sap_err"]) for r in rs) / n + mean_delta = sum(r["broken_delta"] for r in rs) / n + within = 100.0 * sum(1 for r in rs if abs(r["sap_err"]) < 0.5) / n + print(f" {name:14s} {n:>4d} {mean_abs:>12.2f} {mean_delta:>+8.0f} {within:>8.1f}%") + + # The fabric/heating clusters are the fix targets — show their worst certs. + for name in ("heat:high", "heat:low"): + rs = by_cluster.get(name, []) + if not rs: + continue + print("-" * 74) + print(f"WORST in {name} (broken_delta = predicted - lodged £):") + print(f" {'cert':22s} {'sapErr':>7s} {'Δ£':>6s} {'ourkWh':>7s} {'tgtkWh':>7s} roof") + worst = sorted(rs, key=lambda r: -abs(r["sap_err"]))[:15] + for r in worst: + tgt = r["tgt_heat_kwh"] + print(f" {r['cert']:22s} {r['sap_err']:+7.2f} {r['broken_delta']:+6.0f} " + f"{r['heat_kwh']:7.0f} {('%7.0f' % tgt) if tgt else ' -'} " + f"{str(r['roof_construction'])}") + + +def _write_csv(records: list[dict[str, Any]]) -> None: + cols = [ + "cert", "cluster", "broken", "broken_delta", "sap_err", + "heat_kwh", "tgt_heat_kwh", "d_heat", "lodged_heat", "pred_heat", + "hw_kwh", "tgt_hw_kwh", "d_hw", "lodged_hw", "pred_hw", + "light_kwh", "d_light", "lodged_light", "pred_light", + "heat_fuel", "hw_fuel", "mains_gas", "roof_construction", + ] + with open(CACHE / "_cost_decomposition.csv", "w", newline="") as fh: + w = csv.DictWriter(fh, fieldnames=cols, extrasaction="ignore") + w.writeheader() + for r in sorted(records, key=lambda r: -abs(r["sap_err"])): + w.writerow({k: _fmt(r.get(k)) for k in cols}) + + +def _fmt(v: Any) -> Any: + return round(v, 2) if isinstance(v, float) else v + + +if __name__ == "__main__": + main()