mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Joins each computed cert's signed error (eval _results.csv) with a rich feature set extracted from its RAW API JSON (not the mapped EpcPropertyData), then ranks (feature, value) buckets by error carried and by |mean signed| bias. Surfaces systematic API-path handling gaps — a field the mapper silently drops still shows as an error-correlated bucket. Companion to eval_api_sap_accuracy.py / decompose_api_cost_error.py. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
188 lines
7.5 KiB
Python
188 lines
7.5 KiB
Python
"""Profile API-path SAP error against RAW API-response characteristics.

WHAT THIS IS FOR
----------------
`eval_api_sap_accuracy.py` tells us HOW big the error is; this tells us
WHICH raw-API characteristics the error correlates with — so we can find
systematic "API-path handling" gaps (a field dropped/mis-mapped on the
`from_api_response` → `cert_to_inputs` path) rather than per-cert noise.

It deliberately profiles against the RAW JSON
(`/tmp/epc_2026_sample/<cert>.json`), NOT the mapped `EpcPropertyData`,
so a feature that the mapper silently drops still shows up here as an
error-correlated bucket.

METHOD
------
1. Read `<cache>/_results.csv` (written by eval) → cert -> signed err.
2. For each computed cert, extract a rich feature set from its raw JSON.
3. For every (feature, value) bucket: n, % within 0.5, mean signed,
   mean |err|. Rank buckets by "wasted accuracy" = n_outside_0.5 ×
   mean|err| so the biggest systematic levers float to the top.
4. Also dump the worst |err| certs with their full raw feature profile.

USAGE
-----
    PYTHONPATH=/workspaces/model python scripts/profile_api_error.py
    PYTHONPATH=/workspaces/model python scripts/profile_api_error.py --min-n 12
"""
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import json
|
||
import os
|
||
import statistics as stats
|
||
import sys
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
|
||
# Directory holding `_results.csv` and the per-cert raw `<cert>.json` files;
# overridable through the EPC_SAMPLE_CACHE environment variable.
CACHE = Path(os.getenv("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
|
||
|
||
|
||
def _g(d: dict[str, Any], *path: str) -> Any:
|
||
"""Nested-get; returns None on any missing link."""
|
||
cur: Any = d
|
||
for k in path:
|
||
if not isinstance(cur, dict):
|
||
return None
|
||
cur = cur.get(k)
|
||
return cur
|
||
|
||
|
||
def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list[Any]:
    """Normalise a raw JSON node to a list (a lone non-empty dict becomes one item)."""
    if isinstance(value, list):
        return value
    if isinstance(value, dict) and value:
        return [value]
    return []


def _sorted_distinct(values: Any) -> list[Any]:
    """Return the distinct non-None values in a stable, sortable order."""
    # Containers are unhashable, so bucket them by their string form.
    distinct = {
        str(v) if isinstance(v, (dict, list)) else v
        for v in values
        if v is not None
    }
    try:
        return sorted(distinct)
    except TypeError:
        # Mixed types (e.g. int and str codes) cannot be compared directly.
        return sorted(distinct, key=str)


def _shower_outlet(entry: Any) -> dict[str, Any]:
    """Unwrap one ``shower_outlets`` entry to the dict carrying its fields."""
    if not isinstance(entry, dict):
        return {}
    # Entries are either wrapped as {"shower_outlet": {...}} or lodged flat.
    return _as_dict(entry.get("shower_outlet", entry))


def features(doc: dict[str, Any]) -> dict[str, Any]:
    """Extract raw-API characteristics worth profiling against.

    Each value is bucketed verbatim (it is stringified by the caller) so
    unmapped / unusual codes surface as their own bucket rather than being
    normalised away.

    The extraction is tolerant of unexpected shapes: a section that is
    missing or is not a dict is treated as empty, a dict lodged where a list
    is expected is treated as a one-item list, and non-dict list entries
    contribute no values. Multi-valued fields (``floor_codes``,
    ``roof_codes``, ``shower_types``) are the comma-joined distinct values,
    sorted naturally when comparable and by string form otherwise.

    Args:
        doc: One cert's raw API response, parsed from JSON.

    Returns:
        A dict of feature name -> raw value. The key set and key order are
        the same for every input.
    """
    doc = _as_dict(doc)
    h = _as_dict(doc.get("sap_heating"))
    es = _as_dict(doc.get("sap_energy_source"))

    # Only the first main-heating entry is profiled.
    mh_list = _as_list(h.get("main_heating_details"))
    mh = _as_dict(mh_list[0]) if mh_list else {}

    bps = _as_list(doc.get("sap_building_parts"))
    parts = [bp for bp in bps if isinstance(bp, dict)]
    # Wall / roof-insulation fields are read from the first building part only.
    bp0 = _as_dict(bps[0]) if bps else {}

    pv = es.get("photovoltaic_supply")
    has_pv = bool(pv.get("pv_arrays")) if isinstance(pv, dict) else bool(pv)

    outlets = [_shower_outlet(s) for s in _as_list(h.get("shower_outlets"))]
    shower_types = _sorted_distinct(o.get("shower_outlet_type") for o in outlets)

    # Distinct codes lodged across ALL building parts.
    floor_codes = _sorted_distinct(bp.get("floor_heat_loss") for bp in parts)
    roof_codes = _sorted_distinct(bp.get("roof_construction") for bp in parts)

    secondary = _as_dict(doc.get("secondary_heating"))

    return {
        "dwelling_type": doc.get("dwelling_type"),
        "property_type": doc.get("property_type"),
        "built_form": doc.get("built_form"),
        "age_band": doc.get("construction_age_band"),
        "mains_gas": es.get("mains_gas"),
        "meter_type": es.get("meter_type"),
        "main_heat_cat": mh.get("main_heating_category"),
        "main_sap_code": mh.get("sap_main_heating_code"),
        "main_control": mh.get("main_heating_control"),
        "main_data_source": mh.get("main_heating_data_source"),
        "has_pcdb_main": mh.get("main_heating_index_number") is not None,
        "main_fuel": mh.get("main_fuel_type"),
        "has_secondary": secondary.get("description") not in (None, "None"),
        "whc": h.get("water_heating_code"),
        "water_fuel": h.get("water_heating_fuel"),
        "has_cylinder": doc.get("has_hot_water_cylinder"),
        "immersion_type": h.get("immersion_heating_type"),
        "n_building_parts": len(bps),
        "floor_codes": ",".join(str(c) for c in floor_codes),
        "roof_codes": ",".join(str(c) for c in roof_codes),
        "wall_construction": bp0.get("wall_construction"),
        "wall_insulation_type": bp0.get("wall_insulation_type"),
        "roof_insulation_thickness": bp0.get("roof_insulation_thickness"),
        "has_pv": has_pv,
        # True when any outlet lodges a shower_wwhrs value other than None/1.
        # NOTE(review): presumably 1 is the "no WWHRS" code — confirm.
        "has_wwhrs": any(
            o.get("shower_wwhrs") not in (None, 1) for o in outlets
        ),
        "shower_types": ",".join(str(t) for t in shower_types),
        "conservatory": doc.get("conservatory_type"),
        "mech_vent": doc.get("mechanical_ventilation"),
        # NOTE(review): assumes property_type code 2 denotes a flat — confirm.
        "is_flat": doc.get("property_type") == 2,
    }
|
||
|
||
|
||
def _bucket_stats(
    rows: list[tuple[str, float, dict[str, Any]]],
    feat_names: list[str],
    min_n: int,
) -> list[tuple[str, str, int, int, float, float]]:
    """Summarise signed error per (feature, stringified value) bucket.

    Buckets holding fewer than ``min_n`` certs are dropped. Each result is
    ``(feature, value, n, n_within_0.5, mean_signed, mean_abs)``, ordered by
    feature and then by first appearance of the value in ``rows``.
    """
    summary = []
    for fn in feat_names:
        groups: dict[str, list[float]] = defaultdict(list)
        for _, err, feats in rows:
            groups[str(feats.get(fn))].append(err)
        for val, bucket in groups.items():
            n = len(bucket)
            if n < min_n:
                continue
            n_within = sum(1 for e in bucket if abs(e) < 0.5)
            summary.append((
                fn,
                val,
                n,
                n_within,
                stats.mean(bucket),
                stats.mean(abs(e) for e in bucket),
            ))
    return summary


def main() -> None:
    """Print the error-carrying and most-biased raw-API feature buckets.

    Reads ``<CACHE>/_results.csv`` (columns ``cert`` and ``err``), pairs each
    cert with the features of its ``<CACHE>/<cert>.json``, then prints:

    * an overall summary line,
    * the top 45 buckets ranked by n_outside_0.5 × mean|err|,
    * the top 25 buckets ranked by |mean signed error|.

    Command line: ``--min-n N`` sets the minimum bucket size (default 10);
    unrecognised arguments are ignored.

    Rows whose ``err`` is blank, non-numeric or non-finite are skipped, as
    are certs without a cached JSON. Certs whose JSON cannot be read or
    parsed, or is not a JSON object, are skipped and counted in a warning
    on stderr.

    Exits with a message when the results file is missing or when no cert
    could be profiled.

    NOTE(review): the module docstring lists a step 4 (dump of the worst
    |err| certs); that step is not implemented in this function.
    """
    import argparse
    import math

    parser = argparse.ArgumentParser(
        description="Profile API-path SAP error against raw API features.")
    parser.add_argument("--min-n", type=int, default=10,
                        help="minimum certs per bucket (default: 10)")
    # parse_known_args keeps the historical tolerance of unrelated arguments.
    args, _ = parser.parse_known_args()
    min_n = args.min_n

    results_path = CACHE / "_results.csv"
    if not results_path.exists():
        sys.exit(f"no {results_path}; run eval_api_sap_accuracy.py first")

    errs: dict[str, float] = {}
    # newline="" is what the csv module expects for files it parses.
    with results_path.open(newline="") as fh:
        for rec in csv.DictReader(fh):
            try:
                err = float(rec["err"])
            except (TypeError, ValueError):
                continue  # blank or non-numeric error: nothing to profile
            if math.isfinite(err):
                errs[rec["cert"]] = err

    # cert -> features
    rows: list[tuple[str, float, dict[str, Any]]] = []
    skipped = 0
    for cert, err in errs.items():
        try:
            # Bytes input lets json detect the UTF-8/16/32 encoding itself
            # instead of relying on the locale's default text encoding.
            doc = json.loads((CACHE / f"{cert}.json").read_bytes())
        except FileNotFoundError:
            continue  # raw response was never cached for this cert
        except (OSError, ValueError):
            skipped += 1
            continue
        if not isinstance(doc, dict):
            skipped += 1
            continue
        rows.append((cert, err, features(doc)))

    if skipped:
        print(f"warning: skipped {skipped} cert(s) with unreadable or "
              "non-object raw JSON", file=sys.stderr)
    if not rows:
        sys.exit(f"no certs from {results_path} have a usable raw JSON in "
                 f"{CACHE}; nothing to profile")

    all_errs = [e for _, e, _ in rows]
    n_all = len(rows)
    base_within = sum(1 for e in all_errs if abs(e) < 0.5) / n_all * 100
    print(f"profiled {n_all} computed certs | overall within-0.5 = {base_within:.1f}% "
          f"| mean signed {stats.mean(all_errs):+.3f} "
          f"| mean|err| {stats.mean(abs(e) for e in all_errs):.3f}")
    print("=" * 100)

    # per-feature bucket analysis (one grouping pass feeds both rankings)
    feat_names = list(rows[0][2].keys())
    bucket_lines: list[tuple[float, str]] = []
    biased: list[tuple[float, str]] = []
    for fn, val, n, n_within, signed, mabs in _bucket_stats(rows, feat_names, min_n):
        within = n_within / n * 100
        waste = (n - n_within) * mabs  # ranking: how much total error this bucket carries
        bucket_lines.append((
            waste,
            f" {fn:22s}={val:<22.22s} n={n:4d} within0.5={within:4.0f}% "
            f"signed={signed:+6.2f} mean|err|={mabs:5.2f} [waste={waste:6.0f}]",
        ))
        biased.append((
            abs(signed),
            f" {fn:22s}={val:<22.22s} n={n:4d} signed={signed:+6.2f} "
            f"mean|err|={mabs:5.2f}",
        ))

    print("TOP ERROR-CARRYING BUCKETS (ranked by n_outside_0.5 × mean|err|; min-n="
          f"{min_n}):")
    for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:45]:
        print(line)

    print("=" * 100)
    print("MOST BIASED BUCKETS (|mean signed| — systematic over/under-rate, min-n="
          f"{min_n}):")
    for _, line in sorted(biased, key=lambda x: -x[0])[:25]:
        print(line)
|
||
|
||
|
||
# Run the profiler only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|