Model/scripts/profile_api_error.py
Khalim Conn-Kowlessar 28b1da1e06 feat(diag): profile API SAP error against raw-API characteristics
Joins each computed cert's signed error (eval _results.csv) with a rich
feature set extracted from its RAW API JSON (not the mapped
EpcPropertyData), then ranks (feature, value) buckets by error carried
and by |mean signed| bias. Surfaces systematic API-path handling gaps —
a field the mapper silently drops still shows as an error-correlated
bucket. Companion to eval_api_sap_accuracy.py / decompose_api_cost_error.py.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 20:38:19 +00:00

188 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Profile API-path SAP error against RAW API-response characteristics.
WHAT THIS IS FOR
----------------
`eval_api_sap_accuracy.py` tells us HOW big the error is; this tells us
WHICH raw-API characteristics the error correlates with — so we can find
systematic "API-path handling" gaps (a field dropped/mis-mapped on the
`from_api_response` → `cert_to_inputs` path) rather than per-cert noise.
It deliberately profiles against the RAW JSON (`/tmp/epc_2026_sample/
<cert>.json`), NOT the mapped `EpcPropertyData`, so a feature that the
mapper silently drops still shows up here as an error-correlated bucket.
METHOD
------
1. Read `<cache>/_results.csv` (written by eval) → cert -> signed err.
2. For each computed cert, extract a rich feature set from its raw JSON.
3. For every (feature, value) bucket: n, % within 0.5, mean signed,
mean |err|. Rank buckets by "wasted accuracy" = n_outside_0.5 ×
mean|err| so the biggest systematic levers float to the top.
4. TODO (not implemented in this script yet): also dump the worst |err|
   certs with their full raw feature profile.
USAGE
-----
PYTHONPATH=/workspaces/model python scripts/profile_api_error.py
PYTHONPATH=/workspaces/model python scripts/profile_api_error.py --min-n 12
"""
from __future__ import annotations
import csv
import json
import os
import statistics as stats
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional
# Directory holding eval's `_results.csv` and the per-cert raw API JSON files
# (`<cert>.json`); override the default with the EPC_SAMPLE_CACHE env var.
CACHE = Path(os.environ.get("EPC_SAMPLE_CACHE", "/tmp/epc_2026_sample"))
def _g(d: dict[str, Any], *path: str) -> Any:
"""Nested-get; returns None on any missing link."""
cur: Any = d
for k in path:
if not isinstance(cur, dict):
return None
cur = cur.get(k)
return cur
def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* unchanged when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _dict_items(value: Any) -> list[dict[str, Any]]:
    """Coerce a raw field into a list of dicts.

    A lone dict becomes a one-element list, a list keeps only its dict
    entries, and anything else (``None``, scalars, strings) becomes ``[]``.
    """
    if isinstance(value, dict):
        return [value]
    if isinstance(value, list):
        return [item for item in value if isinstance(item, dict)]
    return []


def _shower_outlets(raw: Any) -> list[dict[str, Any]]:
    """Flatten the raw ``shower_outlets`` field into a list of outlet dicts.

    Each entry may be the outlet itself or wrap it under a ``shower_outlet``
    key. A null / non-dict wrapper contributes nothing instead of raising.
    """
    outlets: list[dict[str, Any]] = []
    for entry in _dict_items(raw):
        outlets.extend(_dict_items(entry.get("shower_outlet", entry)))
    return outlets


def _join_codes(values: Any) -> str:
    """Comma-join the distinct non-``None`` *values* in sorted order.

    Codes sort naturally when they are mutually comparable; a mix of types
    (e.g. int and str codes) falls back to sorting by string form so an
    unusual cert cannot abort the run.
    """
    distinct: set[Any] = set()
    for value in values:
        if value is None:
            continue
        try:
            distinct.add(value)
        except TypeError:
            # Unhashable (list/dict) code: bucket it by its string form.
            distinct.add(str(value))
    try:
        ordered = sorted(distinct)
    except TypeError:
        ordered = sorted(distinct, key=str)
    return ",".join(str(code) for code in ordered)


def features(doc: dict[str, Any]) -> dict[str, Any]:
    """Extract raw-API characteristics worth profiling against.

    Each value is returned verbatim (the caller stringifies it into a bucket
    label) so unmapped / unusual codes surface as their own bucket rather
    than being normalised away.

    The extraction is tolerant of shape variations in the raw JSON: list
    fields may arrive as a lone dict, and null or non-dict entries are
    ignored rather than raising, so one odd cert does not abort a whole
    profiling run.

    Args:
        doc: One cert's raw API JSON document (already parsed to a dict).

    Returns:
        Mapping of feature name -> raw value. The key order is fixed and
        identical for every cert.
    """
    h = _as_dict(doc.get("sap_heating"))
    es = _as_dict(doc.get("sap_energy_source"))

    # Only the first main-heating entry and first building part are profiled
    # for the per-system / per-part fields below.
    heating_details = _dict_items(h.get("main_heating_details"))
    mh = heating_details[0] if heating_details else {}
    bps = _dict_items(doc.get("sap_building_parts"))
    bp0 = bps[0] if bps else {}

    pv = es.get("photovoltaic_supply")
    has_pv = bool(pv.get("pv_arrays")) if isinstance(pv, dict) else bool(pv)

    outlets = _shower_outlets(h.get("shower_outlets"))

    return {
        "dwelling_type": doc.get("dwelling_type"),
        "property_type": doc.get("property_type"),
        "built_form": doc.get("built_form"),
        "age_band": doc.get("construction_age_band"),
        "mains_gas": es.get("mains_gas"),
        "meter_type": es.get("meter_type"),
        "main_heat_cat": mh.get("main_heating_category"),
        "main_sap_code": mh.get("sap_main_heating_code"),
        "main_control": mh.get("main_heating_control"),
        "main_data_source": mh.get("main_heating_data_source"),
        "has_pcdb_main": mh.get("main_heating_index_number") is not None,
        "main_fuel": mh.get("main_fuel_type"),
        "has_secondary": (doc.get("secondary_heating") or {}).get("description") not in (None, "None"),
        "whc": h.get("water_heating_code"),
        "water_fuel": h.get("water_heating_fuel"),
        "has_cylinder": doc.get("has_hot_water_cylinder"),
        "immersion_type": h.get("immersion_heating_type"),
        "n_building_parts": len(bps),
        # Distinct codes lodged across ALL building parts, not just the first.
        "floor_codes": _join_codes(bp.get("floor_heat_loss") for bp in bps),
        "roof_codes": _join_codes(bp.get("roof_construction") for bp in bps),
        "wall_construction": bp0.get("wall_construction"),
        "wall_insulation_type": bp0.get("wall_insulation_type"),
        "roof_insulation_thickness": bp0.get("roof_insulation_thickness"),
        "has_pv": has_pv,
        # NOTE(review): treats shower_wwhrs code 1 as "no WWHRS" — confirm
        # against the API schema.
        "has_wwhrs": any(
            outlet.get("shower_wwhrs") not in (None, 1) for outlet in outlets
        ),
        "shower_types": _join_codes(
            outlet.get("shower_outlet_type") for outlet in outlets
        ),
        "conservatory": doc.get("conservatory_type"),
        "mech_vent": doc.get("mechanical_ventilation"),
        # NOTE(review): property_type 2 presumably denotes a flat — confirm.
        "is_flat": doc.get("property_type") == 2,
    }
def _parse_min_n(argv: list[str], default: int = 10) -> int:
    """Return the integer following ``--min-n`` in *argv*, else *default*.

    Exits with a short message (instead of a traceback) when the flag is
    given without a valid integer value.
    """
    if "--min-n" not in argv:
        return default
    try:
        return int(argv[argv.index("--min-n") + 1])
    except (IndexError, ValueError):
        sys.exit("--min-n requires an integer value")


def _bucket_errors(
    rows: list[tuple[str, float, dict[str, Any]]],
) -> list[tuple[str, str, list[float]]]:
    """Group signed errors into (feature, stringified value, errors) buckets.

    Buckets come back in feature order (taken from the first row's feature
    dict) and, within a feature, in first-seen value order, so the stable
    sorts applied later break ties the same way on every run.
    """
    buckets: list[tuple[str, str, list[float]]] = []
    for fn in rows[0][2]:
        groups: dict[str, list[float]] = defaultdict(list)
        for _, err, feats in rows:
            groups[str(feats.get(fn))].append(err)
        buckets.extend((fn, val, es) for val, es in groups.items())
    return buckets


def main() -> None:
    """Profile signed SAP error against raw-API feature buckets and print it.

    Reads ``<CACHE>/_results.csv`` (columns ``cert`` and ``err``), loads each
    cert's raw JSON from ``<CACHE>/<cert>.json``, and prints to stdout:

    * an overall summary line,
    * the top buckets ranked by ``n_outside_0.5 x mean|err|``,
    * the top buckets ranked by ``|mean signed error|``.

    Buckets with fewer than ``--min-n`` certs (default 10) are ignored.
    Certs with no cached JSON are skipped silently; unreadable or non-object
    JSON is skipped with a note on stderr.

    Exits via ``sys.exit`` with a message when the results file is missing,
    when ``--min-n`` has no valid value, or when no cert could be profiled.
    """
    top_waste_rows = 45
    top_biased_rows = 25

    min_n = _parse_min_n(sys.argv)
    results_path = CACHE / "_results.csv"
    if not results_path.exists():
        sys.exit(f"no {results_path}; run eval_api_sap_accuracy.py first")

    errs: dict[str, float] = {}
    # newline="" is what the csv module asks for; `with` closes the handle.
    with results_path.open(newline="") as fh:
        for r in csv.DictReader(fh):
            errs[r["cert"]] = float(r["err"])

    # cert -> features
    rows: list[tuple[str, float, dict[str, Any]]] = []
    for cert, err in errs.items():
        raw_path = CACHE / f"{cert}.json"
        if not raw_path.exists():
            continue
        try:
            doc = json.loads(raw_path.read_text())
        except (OSError, ValueError) as exc:
            # Still best-effort, but say so rather than dropping it silently.
            print(f"skipping {cert}: unreadable raw JSON ({exc})", file=sys.stderr)
            continue
        if not isinstance(doc, dict):
            print(f"skipping {cert}: raw JSON is not an object", file=sys.stderr)
            continue
        rows.append((cert, err, features(doc)))

    if not rows:
        # Guards the divisions / rows[0] lookups below.
        sys.exit(f"no computed certs with raw JSON under {CACHE}; nothing to profile")

    n_all = len(rows)
    all_errs = [e for _, e, _ in rows]
    base_within = sum(1 for e in all_errs if abs(e) < 0.5) / n_all * 100
    print(f"profiled {n_all} computed certs | overall within-0.5 = {base_within:.1f}% "
          f"| mean signed {stats.mean(all_errs):+.3f} "
          f"| mean|err| {stats.mean(abs(e) for e in all_errs):.3f}")
    print("=" * 100)

    # Per-feature bucket analysis: group once, feed both rankings.
    bucket_lines: list[tuple[float, str]] = []
    biased: list[tuple[float, str]] = []
    for fn, val, es in _bucket_errors(rows):
        n = len(es)
        if n < min_n:
            continue
        w05 = sum(1 for e in es if abs(e) < 0.5)
        within = w05 / n * 100
        signed = stats.mean(es)
        mabs = stats.mean(abs(e) for e in es)
        n_out = n - w05
        waste = n_out * mabs  # ranking: how much total error this bucket carries
        bucket_lines.append((
            waste,
            f" {fn:22s}={val:<22.22s} n={n:4d} within0.5={within:4.0f}% "
            f"signed={signed:+6.2f} mean|err|={mabs:5.2f} [waste={waste:6.0f}]",
        ))
        biased.append((
            abs(signed),
            f" {fn:22s}={val:<22.22s} n={n:4d} signed={signed:+6.2f} "
            f"mean|err|={mabs:5.2f}",
        ))

    print("TOP ERROR-CARRYING BUCKETS (ranked by n_outside_0.5 × mean|err|; min-n="
          f"{min_n}):")
    for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:top_waste_rows]:
        print(line)
    print("=" * 100)
    print("MOST BIASED BUCKETS (|mean signed| — systematic over/under-rate, min-n="
          f"{min_n}):")
    for _, line in sorted(biased, key=lambda x: -x[0])[:top_biased_rows]:
        print(line)
if __name__ == "__main__":
    # Run the profiler only when executed as a script, not on import.
    main()