mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Mirror of eval_api_sap_accuracy.py that decomposes each cert's SAP error into per-component energy/cost deltas WITHOUT generating an Elmhurst worksheet. Calibrates the consumer price from the certs we already get right (gas £0.0809/kWh n=291, elec £0.2839/kWh n=326 over |SAP err|<0.4), then for every cert compares our_component_kWh × price to the lodged heating_cost_current / hot_water_cost_current / lighting_cost_current and back-calculates a numeric energy target (lodged_cost / price). Clusters errors by (component × direction). On the 905-cert sample this reveals heat:high (we over-state heating energy → under-rate SAP) as the dominant broken cluster: 332 certs, only 36.7% within 0.5. Output CSV at <cache>/_cost_decomposition.csv. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
282 lines
11 KiB
Python
282 lines
11 KiB
Python
"""Decompose each API cert's SAP error into per-component energy/cost deltas.
|
||
|
||
WHAT THIS IS FOR
|
||
----------------
|
||
`eval_api_sap_accuracy.py` tells us *which* certs are wrong (SAP err vs lodged
|
||
`energy_rating_current`). This script tells us *which component* is wrong and by
|
||
how much — without generating an Elmhurst worksheet.
|
||
|
||
THE METHOD (calibrate-then-compare)
|
||
-----------------------------------
|
||
The API response carries the lodged per-component consumer costs
|
||
(`heating_cost_current`, `hot_water_cost_current`, `lighting_cost_current`).
|
||
Those use the EPC's *consumer* price basis, not SAP Table-12. So:
|
||
|
||
1. Calibrate the effective consumer price empirically on the certs we already
|
||
get right (|SAP err| < CAL_TOL): for gas heating certs
|
||
`gas_price = median(heating_cost / our_heating_kWh)`; for lighting (always
|
||
electric) `elec_price = median(lighting_cost / our_lighting_kWh)`.
|
||
2. For every cert: `predicted_cost = our_component_kWh × calibrated_price`.
|
||
`delta = predicted - lodged`. The component with the biggest |delta| is the
|
||
broken one; the sign gives the direction (predicted > lodged => we
|
||
over-estimate that component's energy => we under-rate SAP).
|
||
`back_calc_kWh = lodged_cost / price` is a numeric energy target to fix to.
|
||
3. Accuracy is ~+-10% — good for component triage + fix targets, NOT 1e-4.
|
||
|
||
OUTPUT
|
||
------
|
||
- Calibrated prices + how many certs fed each calibration.
|
||
- Cluster table: (component x direction) counts + mean |SAP err| + mean delta£.
|
||
- The worst certs per cluster (to pick the next slice).
|
||
- A full per-cert CSV at <cache>/_cost_decomposition.csv.
|
||
|
||
USAGE
|
||
-----
|
||
PYTHONPATH=/workspaces/model python scripts/decompose_api_cost_error.py
|
||
|
||
Reads the same cache as `eval_api_sap_accuracy.py` (default `/tmp/epc_2026_sample`,
|
||
overridable via `EPC_SAMPLE_CACHE`).
|
||
"""
|
||
import os
|
||
import csv
|
||
import json
|
||
import math
|
||
import statistics
|
||
from collections import Counter, defaultdict
|
||
from pathlib import Path
|
||
from typing import Any, Optional, cast
|
||
|
||
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
|
||
from domain.sap10_calculator.calculator import SapResult, calculate_sap_from_inputs
|
||
from domain.sap10_calculator.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES, cert_to_inputs
|
||
from domain.sap10_calculator.tables.table_12 import API_FUEL_TO_TABLE_12
|
||
|
||
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
|
||
|
||
# Certs feed the price calibration only when they are this accurate already.
|
||
CAL_TOL = 0.4
|
||
# A cert is flagged "broken on component X" only when |delta£| clears this floor,
|
||
# so tiny noise certs land in a "balanced" bucket rather than a spurious cluster.
|
||
DELTA_FLOOR_GBP = 40.0
|
||
|
||
# Table-12 fuel-code groups for assigning a calibrated consumer price.
|
||
GAS_CODE = 1 # mains gas
|
||
ELEC_CODES = frozenset({30, 31, 32, 33, 34, 35, 38, 40, 41}) # std/off-peak/HP
|
||
|
||
|
||
def _fuel_kind(code: Optional[int]) -> str:
|
||
"""Classify a fuel code for pricing: gas / elec / other.
|
||
|
||
The calculator stores the *raw API* fuel enum (e.g. 26 = mains gas), so
|
||
translate through `API_FUEL_TO_TABLE_12` first; Table-12 codes (30+) are
|
||
not keys in that map and pass through unchanged.
|
||
"""
|
||
if code is None:
|
||
return "other"
|
||
t12 = API_FUEL_TO_TABLE_12.get(code, code)
|
||
if t12 == GAS_CODE:
|
||
return "gas"
|
||
if t12 in ELEC_CODES:
|
||
return "elec"
|
||
return "other"
|
||
|
||
|
||
def _lodged_cost(doc: dict[str, Any], key: str) -> Optional[float]:
|
||
obj: Any = doc.get(key)
|
||
if isinstance(obj, dict):
|
||
val: Any = cast(dict[str, Any], obj).get("value")
|
||
if isinstance(val, (int, float)):
|
||
return float(val)
|
||
return None
|
||
|
||
|
||
def _heating_kwh(res: SapResult) -> float:
|
||
"""Space-heating delivered fuel across main, main-2 and secondary."""
|
||
return (
|
||
res.main_heating_fuel_kwh_per_yr
|
||
+ res.main_2_heating_fuel_kwh_per_yr
|
||
+ res.secondary_heating_fuel_kwh_per_yr
|
||
)
|
||
|
||
|
||
def main() -> None:
|
||
files = sorted(CACHE.glob("????-????-????-????-????.json"))
|
||
records: list[dict[str, Any]] = []
|
||
cat: Counter[str] = Counter()
|
||
|
||
for f in files:
|
||
cert = f.stem
|
||
try:
|
||
doc: dict[str, Any] = json.loads(f.read_text())
|
||
except Exception:
|
||
cat["bad_json"] += 1
|
||
continue
|
||
lodged_sap = doc.get("energy_rating_current")
|
||
if lodged_sap is None:
|
||
cat["no_lodged_sap"] += 1
|
||
continue
|
||
try:
|
||
epc = EpcPropertyDataMapper.from_api_response(doc)
|
||
except ValueError as e:
|
||
cat["unsupported_schema" if "Unsupported EPC schema" in str(e) else "raise"] += 1
|
||
continue
|
||
except Exception:
|
||
cat["raise"] += 1
|
||
continue
|
||
try:
|
||
res = calculate_sap_from_inputs(cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES))
|
||
except Exception:
|
||
cat["calc_raise"] += 1
|
||
continue
|
||
if not math.isfinite(res.sap_score_continuous):
|
||
cat["non_finite"] += 1
|
||
continue
|
||
cat["computed"] += 1
|
||
records.append({
|
||
"cert": cert,
|
||
"sap_err": res.sap_score_continuous - lodged_sap,
|
||
"heat_kwh": _heating_kwh(res),
|
||
"hw_kwh": res.hot_water_kwh_per_yr,
|
||
"light_kwh": res.lighting_kwh_per_yr,
|
||
"heat_fuel": _fuel_kind(res.main_heating_fuel_code),
|
||
"hw_fuel": _fuel_kind(res.hot_water_fuel_code),
|
||
"lodged_heat": _lodged_cost(doc, "heating_cost_current"),
|
||
"lodged_hw": _lodged_cost(doc, "hot_water_cost_current"),
|
||
"lodged_light": _lodged_cost(doc, "lighting_cost_current"),
|
||
"mains_gas": _mains_gas(doc),
|
||
"roof_construction": _roof_construction(doc),
|
||
})
|
||
|
||
# --- Calibrate consumer prices on the already-accurate certs ------------
|
||
gas_samples: list[float] = []
|
||
elec_samples: list[float] = []
|
||
for r in records:
|
||
if abs(r["sap_err"]) >= CAL_TOL:
|
||
continue
|
||
if r["heat_fuel"] == "gas" and r["lodged_heat"] and r["heat_kwh"] > 0:
|
||
gas_samples.append(r["lodged_heat"] / r["heat_kwh"])
|
||
if r["lodged_light"] and r["light_kwh"] > 0:
|
||
elec_samples.append(r["lodged_light"] / r["light_kwh"])
|
||
|
||
gas_price = statistics.median(gas_samples) if gas_samples else 0.0809
|
||
elec_price = statistics.median(elec_samples) if elec_samples else 0.2839
|
||
price_by_kind = {"gas": gas_price, "elec": elec_price, "other": gas_price}
|
||
|
||
print("=" * 74)
|
||
print("CALIBRATED CONSUMER PRICES (median over |SAP err| < %.2f certs)" % CAL_TOL)
|
||
print(f" gas £{gas_price:.4f}/kWh (n={len(gas_samples)})")
|
||
print(f" elec £{elec_price:.4f}/kWh (n={len(elec_samples)})")
|
||
|
||
# --- Per-cert component deltas ------------------------------------------
|
||
for r in records:
|
||
gp = price_by_kind[r["heat_fuel"]]
|
||
hwp = price_by_kind[r["hw_fuel"]]
|
||
r["pred_heat"] = r["heat_kwh"] * gp
|
||
r["pred_hw"] = r["hw_kwh"] * hwp
|
||
r["pred_light"] = r["light_kwh"] * elec_price
|
||
r["d_heat"] = _delta(r["pred_heat"], r["lodged_heat"])
|
||
r["d_hw"] = _delta(r["pred_hw"], r["lodged_hw"])
|
||
r["d_light"] = _delta(r["pred_light"], r["lodged_light"])
|
||
# back-calculated energy targets (what the lodged cost implies)
|
||
r["tgt_heat_kwh"] = (r["lodged_heat"] / gp) if r["lodged_heat"] else None
|
||
r["tgt_hw_kwh"] = (r["lodged_hw"] / hwp) if r["lodged_hw"] else None
|
||
# dominant broken component
|
||
comp, delta = _dominant(r)
|
||
r["broken"] = comp
|
||
r["broken_delta"] = delta
|
||
r["cluster"] = (
|
||
"balanced" if comp is None
|
||
else f"{comp}:{'high' if delta > 0 else 'low'}"
|
||
)
|
||
|
||
_print_clusters(records)
|
||
_write_csv(records)
|
||
|
||
print("\nCategories:", dict(cat))
|
||
print(f"Full per-cert CSV -> {CACHE / '_cost_decomposition.csv'}")
|
||
|
||
|
||
def _mains_gas(doc: dict[str, Any]) -> Any:
|
||
es: Any = doc.get("sap_energy_source") or {}
|
||
return es.get("mains_gas")
|
||
|
||
|
||
def _roof_construction(doc: dict[str, Any]) -> Optional[int]:
|
||
bps: Any = doc.get("sap_building_parts") or []
|
||
if bps and isinstance(bps[0], dict):
|
||
rc: Any = bps[0].get("roof_construction")
|
||
return rc if isinstance(rc, int) else None
|
||
return None
|
||
|
||
|
||
def _delta(pred: float, lodged: Optional[float]) -> Optional[float]:
|
||
return None if lodged is None else pred - lodged
|
||
|
||
|
||
def _dominant(r: dict[str, Any]) -> tuple[Optional[str], float]:
|
||
"""The component with the largest |delta£| above the floor, with its delta."""
|
||
candidates = [
|
||
("heat", r["d_heat"]),
|
||
("hw", r["d_hw"]),
|
||
("light", r["d_light"]),
|
||
]
|
||
scored = [(c, d) for c, d in candidates if d is not None and abs(d) >= DELTA_FLOOR_GBP]
|
||
if not scored:
|
||
return None, 0.0
|
||
comp, delta = max(scored, key=lambda cd: abs(cd[1]))
|
||
return comp, delta
|
||
|
||
|
||
def _print_clusters(records: list[dict[str, Any]]) -> None:
|
||
by_cluster: dict[str, list[dict[str, Any]]] = defaultdict(list)
|
||
for r in records:
|
||
by_cluster[r["cluster"]].append(r)
|
||
|
||
print("=" * 74)
|
||
print(f"CLUSTERS by (component x direction) [delta floor £{DELTA_FLOOR_GBP:.0f}]")
|
||
print(f" {'cluster':14s} {'n':>4s} {'mean|sapErr|':>12s} {'meanΔ£':>8s} {'within0.5':>9s}")
|
||
order = sorted(by_cluster.items(), key=lambda kv: -len(kv[1]))
|
||
for name, rs in order:
|
||
n = len(rs)
|
||
mean_abs = sum(abs(r["sap_err"]) for r in rs) / n
|
||
mean_delta = sum(r["broken_delta"] for r in rs) / n
|
||
within = 100.0 * sum(1 for r in rs if abs(r["sap_err"]) < 0.5) / n
|
||
print(f" {name:14s} {n:>4d} {mean_abs:>12.2f} {mean_delta:>+8.0f} {within:>8.1f}%")
|
||
|
||
# The fabric/heating clusters are the fix targets — show their worst certs.
|
||
for name in ("heat:high", "heat:low"):
|
||
rs = by_cluster.get(name, [])
|
||
if not rs:
|
||
continue
|
||
print("-" * 74)
|
||
print(f"WORST in {name} (broken_delta = predicted - lodged £):")
|
||
print(f" {'cert':22s} {'sapErr':>7s} {'Δ£':>6s} {'ourkWh':>7s} {'tgtkWh':>7s} roof")
|
||
worst = sorted(rs, key=lambda r: -abs(r["sap_err"]))[:15]
|
||
for r in worst:
|
||
tgt = r["tgt_heat_kwh"]
|
||
print(f" {r['cert']:22s} {r['sap_err']:+7.2f} {r['broken_delta']:+6.0f} "
|
||
f"{r['heat_kwh']:7.0f} {('%7.0f' % tgt) if tgt else ' -'} "
|
||
f"{str(r['roof_construction'])}")
|
||
|
||
|
||
def _write_csv(records: list[dict[str, Any]]) -> None:
|
||
cols = [
|
||
"cert", "cluster", "broken", "broken_delta", "sap_err",
|
||
"heat_kwh", "tgt_heat_kwh", "d_heat", "lodged_heat", "pred_heat",
|
||
"hw_kwh", "tgt_hw_kwh", "d_hw", "lodged_hw", "pred_hw",
|
||
"light_kwh", "d_light", "lodged_light", "pred_light",
|
||
"heat_fuel", "hw_fuel", "mains_gas", "roof_construction",
|
||
]
|
||
with open(CACHE / "_cost_decomposition.csv", "w", newline="") as fh:
|
||
w = csv.DictWriter(fh, fieldnames=cols, extrasaction="ignore")
|
||
w.writeheader()
|
||
for r in sorted(records, key=lambda r: -abs(r["sap_err"])):
|
||
w.writerow({k: _fmt(r.get(k)) for k in cols})
|
||
|
||
|
||
def _fmt(v: Any) -> Any:
|
||
return round(v, 2) if isinstance(v, float) else v
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|