feat(diag): per-component cost decomposition for API SAP errors

Mirror of eval_api_sap_accuracy.py that decomposes each cert's SAP error
into per-component energy/cost deltas WITHOUT generating an Elmhurst
worksheet. Calibrates the consumer price from the certs we already get
right (gas £0.0809/kWh n=291, elec £0.2839/kWh n=326 over |SAP err|<0.4),
then for every cert compares our_component_kWh × price to the lodged
heating_cost_current / hot_water_cost_current / lighting_cost_current and
back-calculates a numeric energy target (lodged_cost / price).

Clusters errors by (component × direction). On the 905-cert sample this
reveals heat:high (we over-state heating energy → under-rate SAP) as the
dominant broken cluster: 332 certs, only 36.7% within 0.5. Output CSV at
<cache>/_cost_decomposition.csv.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-06 17:37:05 +00:00
parent 27375d93a4
commit 98f71d2554

View file

@ -0,0 +1,282 @@
"""Decompose each API cert's SAP error into per-component energy/cost deltas.
WHAT THIS IS FOR
----------------
`eval_api_sap_accuracy.py` tells us *which* certs are wrong (SAP err vs lodged
`energy_rating_current`). This script tells us *which component* is wrong and by
how much without generating an Elmhurst worksheet.
THE METHOD (calibrate-then-compare)
-----------------------------------
The API response carries the lodged per-component consumer costs
(`heating_cost_current`, `hot_water_cost_current`, `lighting_cost_current`).
Those use the EPC's *consumer* price basis, not SAP Table-12. So:
1. Calibrate the effective consumer price empirically on the certs we already
get right (|SAP err| < CAL_TOL): for gas heating certs
`gas_price = median(heating_cost / our_heating_kWh)`; for lighting (always
electric) `elec_price = median(lighting_cost / our_lighting_kWh)`.
2. For every cert: `predicted_cost = our_component_kWh × calibrated_price`.
`delta = predicted - lodged`. The component with the biggest |delta| is the
broken one; the sign gives the direction (predicted > lodged => we
over-estimate that component's energy => we under-rate SAP).
`back_calc_kWh = lodged_cost / price` is a numeric energy target to fix to.
3. Accuracy is ~+-10% good for component triage + fix targets, NOT 1e-4.
OUTPUT
------
- Calibrated prices + how many certs fed each calibration.
- Cluster table: (component x direction) counts + mean |SAP err| + mean delta£.
- The worst certs per cluster (to pick the next slice).
- A full per-cert CSV at <cache>/_cost_decomposition.csv.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/decompose_api_cost_error.py
Reads the same cache as `eval_api_sap_accuracy.py` (default `/tmp/epc_2026_sample`,
overridable via `EPC_SAMPLE_CACHE`).
"""
import os
import csv
import json
import math
import statistics
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Optional, cast
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.sap10_calculator.calculator import SapResult, calculate_sap_from_inputs
from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs
from domain.sap10_calculator.tables.table_12 import API_FUEL_TO_TABLE_12
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
# Certs feed the price calibration only when they are this accurate already.
CAL_TOL = 0.4
# A cert is flagged "broken on component X" only when |delta£| clears this floor,
# so tiny noise certs land in a "balanced" bucket rather than a spurious cluster.
DELTA_FLOOR_GBP = 40.0
# Table-12 fuel-code groups for assigning a calibrated consumer price.
GAS_CODE = 1 # mains gas
ELEC_CODES = frozenset({30, 31, 32, 33, 34, 35, 38, 40, 41}) # std/off-peak/HP
def _fuel_kind(code: Optional[int]) -> str:
"""Classify a fuel code for pricing: gas / elec / other.
The calculator stores the *raw API* fuel enum (e.g. 26 = mains gas), so
translate through `API_FUEL_TO_TABLE_12` first; Table-12 codes (30+) are
not keys in that map and pass through unchanged.
"""
if code is None:
return "other"
t12 = API_FUEL_TO_TABLE_12.get(code, code)
if t12 == GAS_CODE:
return "gas"
if t12 in ELEC_CODES:
return "elec"
return "other"
def _lodged_cost(doc: dict[str, Any], key: str) -> Optional[float]:
obj: Any = doc.get(key)
if isinstance(obj, dict):
val: Any = cast(dict[str, Any], obj).get("value")
if isinstance(val, (int, float)):
return float(val)
return None
def _heating_kwh(res: SapResult) -> float:
"""Space-heating delivered fuel across main, main-2 and secondary."""
return (
res.main_heating_fuel_kwh_per_yr
+ res.main_2_heating_fuel_kwh_per_yr
+ res.secondary_heating_fuel_kwh_per_yr
)
def main() -> None:
files = sorted(CACHE.glob("????-????-????-????-????.json"))
records: list[dict[str, Any]] = []
cat: Counter[str] = Counter()
for f in files:
cert = f.stem
try:
doc: dict[str, Any] = json.loads(f.read_text())
except Exception:
cat["bad_json"] += 1
continue
lodged_sap = doc.get("energy_rating_current")
if lodged_sap is None:
cat["no_lodged_sap"] += 1
continue
try:
epc = EpcPropertyDataMapper.from_api_response(doc)
except ValueError as e:
cat["unsupported_schema" if "Unsupported EPC schema" in str(e) else "raise"] += 1
continue
except Exception:
cat["raise"] += 1
continue
try:
res = calculate_sap_from_inputs(cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES))
except Exception:
cat["calc_raise"] += 1
continue
if not math.isfinite(res.sap_score_continuous):
cat["non_finite"] += 1
continue
cat["computed"] += 1
records.append({
"cert": cert,
"sap_err": res.sap_score_continuous - lodged_sap,
"heat_kwh": _heating_kwh(res),
"hw_kwh": res.hot_water_kwh_per_yr,
"light_kwh": res.lighting_kwh_per_yr,
"heat_fuel": _fuel_kind(res.main_heating_fuel_code),
"hw_fuel": _fuel_kind(res.hot_water_fuel_code),
"lodged_heat": _lodged_cost(doc, "heating_cost_current"),
"lodged_hw": _lodged_cost(doc, "hot_water_cost_current"),
"lodged_light": _lodged_cost(doc, "lighting_cost_current"),
"mains_gas": _mains_gas(doc),
"roof_construction": _roof_construction(doc),
})
# --- Calibrate consumer prices on the already-accurate certs ------------
gas_samples: list[float] = []
elec_samples: list[float] = []
for r in records:
if abs(r["sap_err"]) >= CAL_TOL:
continue
if r["heat_fuel"] == "gas" and r["lodged_heat"] and r["heat_kwh"] > 0:
gas_samples.append(r["lodged_heat"] / r["heat_kwh"])
if r["lodged_light"] and r["light_kwh"] > 0:
elec_samples.append(r["lodged_light"] / r["light_kwh"])
gas_price = statistics.median(gas_samples) if gas_samples else 0.0809
elec_price = statistics.median(elec_samples) if elec_samples else 0.2839
price_by_kind = {"gas": gas_price, "elec": elec_price, "other": gas_price}
print("=" * 74)
print("CALIBRATED CONSUMER PRICES (median over |SAP err| < %.2f certs)" % CAL_TOL)
print(f" gas £{gas_price:.4f}/kWh (n={len(gas_samples)})")
print(f" elec £{elec_price:.4f}/kWh (n={len(elec_samples)})")
# --- Per-cert component deltas ------------------------------------------
for r in records:
gp = price_by_kind[r["heat_fuel"]]
hwp = price_by_kind[r["hw_fuel"]]
r["pred_heat"] = r["heat_kwh"] * gp
r["pred_hw"] = r["hw_kwh"] * hwp
r["pred_light"] = r["light_kwh"] * elec_price
r["d_heat"] = _delta(r["pred_heat"], r["lodged_heat"])
r["d_hw"] = _delta(r["pred_hw"], r["lodged_hw"])
r["d_light"] = _delta(r["pred_light"], r["lodged_light"])
# back-calculated energy targets (what the lodged cost implies)
r["tgt_heat_kwh"] = (r["lodged_heat"] / gp) if r["lodged_heat"] else None
r["tgt_hw_kwh"] = (r["lodged_hw"] / hwp) if r["lodged_hw"] else None
# dominant broken component
comp, delta = _dominant(r)
r["broken"] = comp
r["broken_delta"] = delta
r["cluster"] = (
"balanced" if comp is None
else f"{comp}:{'high' if delta > 0 else 'low'}"
)
_print_clusters(records)
_write_csv(records)
print("\nCategories:", dict(cat))
print(f"Full per-cert CSV -> {CACHE / '_cost_decomposition.csv'}")
def _mains_gas(doc: dict[str, Any]) -> Any:
es: Any = doc.get("sap_energy_source") or {}
return es.get("mains_gas")
def _roof_construction(doc: dict[str, Any]) -> Optional[int]:
bps: Any = doc.get("sap_building_parts") or []
if bps and isinstance(bps[0], dict):
rc: Any = bps[0].get("roof_construction")
return rc if isinstance(rc, int) else None
return None
def _delta(pred: float, lodged: Optional[float]) -> Optional[float]:
return None if lodged is None else pred - lodged
def _dominant(r: dict[str, Any]) -> tuple[Optional[str], float]:
"""The component with the largest |delta£| above the floor, with its delta."""
candidates = [
("heat", r["d_heat"]),
("hw", r["d_hw"]),
("light", r["d_light"]),
]
scored = [(c, d) for c, d in candidates if d is not None and abs(d) >= DELTA_FLOOR_GBP]
if not scored:
return None, 0.0
comp, delta = max(scored, key=lambda cd: abs(cd[1]))
return comp, delta
def _print_clusters(records: list[dict[str, Any]]) -> None:
by_cluster: dict[str, list[dict[str, Any]]] = defaultdict(list)
for r in records:
by_cluster[r["cluster"]].append(r)
print("=" * 74)
print(f"CLUSTERS by (component x direction) [delta floor £{DELTA_FLOOR_GBP:.0f}]")
print(f" {'cluster':14s} {'n':>4s} {'mean|sapErr|':>12s} {'meanΔ£':>8s} {'within0.5':>9s}")
order = sorted(by_cluster.items(), key=lambda kv: -len(kv[1]))
for name, rs in order:
n = len(rs)
mean_abs = sum(abs(r["sap_err"]) for r in rs) / n
mean_delta = sum(r["broken_delta"] for r in rs) / n
within = 100.0 * sum(1 for r in rs if abs(r["sap_err"]) < 0.5) / n
print(f" {name:14s} {n:>4d} {mean_abs:>12.2f} {mean_delta:>+8.0f} {within:>8.1f}%")
# The fabric/heating clusters are the fix targets — show their worst certs.
for name in ("heat:high", "heat:low"):
rs = by_cluster.get(name, [])
if not rs:
continue
print("-" * 74)
print(f"WORST in {name} (broken_delta = predicted - lodged £):")
print(f" {'cert':22s} {'sapErr':>7s} {'Δ£':>6s} {'ourkWh':>7s} {'tgtkWh':>7s} roof")
worst = sorted(rs, key=lambda r: -abs(r["sap_err"]))[:15]
for r in worst:
tgt = r["tgt_heat_kwh"]
print(f" {r['cert']:22s} {r['sap_err']:+7.2f} {r['broken_delta']:+6.0f} "
f"{r['heat_kwh']:7.0f} {('%7.0f' % tgt) if tgt else ' -'} "
f"{str(r['roof_construction'])}")
def _write_csv(records: list[dict[str, Any]]) -> None:
cols = [
"cert", "cluster", "broken", "broken_delta", "sap_err",
"heat_kwh", "tgt_heat_kwh", "d_heat", "lodged_heat", "pred_heat",
"hw_kwh", "tgt_hw_kwh", "d_hw", "lodged_hw", "pred_hw",
"light_kwh", "d_light", "lodged_light", "pred_light",
"heat_fuel", "hw_fuel", "mains_gas", "roof_construction",
]
with open(CACHE / "_cost_decomposition.csv", "w", newline="") as fh:
w = csv.DictWriter(fh, fieldnames=cols, extrasaction="ignore")
w.writeheader()
for r in sorted(records, key=lambda r: -abs(r["sap_err"])):
w.writerow({k: _fmt(r.get(k)) for k in cols})
def _fmt(v: Any) -> Any:
return round(v, 2) if isinstance(v, float) else v
if __name__ == "__main__":
main()