mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
chore(scripts): add corpus SAP-accuracy profiler + per-cert dive tools
profile_corpus_error.py buckets signed SAP error by raw-API feature and lists worst over/under-raters with the PE/CO2-vs-cost split (COST-side vs DEMAND-side triage). dive_cert.py dumps one cert's lodged-vs-ours SAP/CO2/PE + full intermediate line refs + mapped inputs. Both run on the committed RdSAP-21.0.1 corpus (no /tmp sample needed). Used to find the stone-wall, per-part-roof, ground-floor-flat and HP-water fixes this session. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
72ef0f0e7b
commit
26106505be
2 changed files with 320 additions and 0 deletions
103
scripts/dive_cert.py
Normal file
103
scripts/dive_cert.py
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
"""Deep-dive a single corpus cert: lodged vs computed SAP/CO2/PE + the full
|
||||
intermediate line-ref dump + the mapped fabric/heat-loss inputs, so the
|
||||
diverging line is visible WITHOUT an Elmhurst worksheet.
|
||||
|
||||
USAGE
|
||||
PYTHONPATH=/workspaces/model python scripts/dive_cert.py <cert_number_or_substr>
|
||||
PYTHONPATH=/workspaces/model python scripts/dive_cert.py --filter wall_insulation_type=3 [--n 8]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
|
||||
from domain.sap10_calculator.calculator import calculate_sap_from_inputs
|
||||
from domain.sap10_calculator.rdsap.cert_to_inputs import (
|
||||
SAP_10_2_SPEC_PRICES,
|
||||
cert_to_inputs,
|
||||
)
|
||||
from scripts.profile_api_error import features
|
||||
|
||||
_CORPUS = Path("backend/epc_api/json_samples/RdSAP-Schema-21.0.1/corpus.jsonl")
|
||||
|
||||
|
||||
def _cert_id(doc: dict[str, Any]) -> str:
|
||||
return str(
|
||||
doc.get("certificate_number")
|
||||
or doc.get("lmk_key")
|
||||
or doc.get("uprn")
|
||||
or "?"
|
||||
)
|
||||
|
||||
|
||||
def _dump(doc: dict[str, Any]) -> None:
|
||||
cert = _cert_id(doc)
|
||||
lodged_sap = doc.get("energy_rating_current")
|
||||
lodged_co2 = doc.get("co2_emissions_current")
|
||||
lodged_pe = doc.get("energy_consumption_current")
|
||||
epc = EpcPropertyDataMapper.from_api_response(doc)
|
||||
r = calculate_sap_from_inputs(cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES))
|
||||
print("=" * 90)
|
||||
print(f"CERT {cert}")
|
||||
print(
|
||||
f" SAP lodged={lodged_sap} ours={r.sap_score_continuous:.2f} "
|
||||
f"d={r.sap_score_continuous - (lodged_sap or 0):+.2f}"
|
||||
)
|
||||
if lodged_co2 is not None:
|
||||
print(
|
||||
f" CO2 lodged={lodged_co2:.3f} ours={r.co2_kg_per_yr / 1000:.3f} t "
|
||||
f"d={r.co2_kg_per_yr / 1000 - lodged_co2:+.3f}"
|
||||
)
|
||||
if lodged_pe is not None:
|
||||
print(
|
||||
f" PE lodged={lodged_pe:.1f} ours={r.primary_energy_kwh_per_m2:.1f} "
|
||||
f"d={r.primary_energy_kwh_per_m2 - lodged_pe:+.1f} kWh/m2"
|
||||
)
|
||||
print(
|
||||
f" energy kWh/yr: spaceheat={r.space_heating_kwh_per_yr:.0f} "
|
||||
f"main={r.main_heating_fuel_kwh_per_yr:.0f} "
|
||||
f"sec={r.secondary_heating_fuel_kwh_per_yr:.0f} "
|
||||
f"hw={r.hot_water_kwh_per_yr:.0f} light={r.lighting_kwh_per_yr:.0f} "
|
||||
f"pumpfan={r.pumps_fans_kwh_per_yr:.0f}"
|
||||
)
|
||||
d = epc.__dict__
|
||||
print(" --- key mapped inputs ---")
|
||||
f = features(doc)
|
||||
for k in (
|
||||
"property_type", "built_form", "age_band", "main_sap_code",
|
||||
"main_heat_cat", "main_fuel", "has_pcdb_main", "main_data_source",
|
||||
"wall_construction", "wall_insulation_type", "roof_codes",
|
||||
"roof_insulation_thickness", "whc", "water_fuel", "immersion_type",
|
||||
"has_cylinder", "has_secondary", "has_pv", "mains_gas", "n_building_parts",
|
||||
):
|
||||
print(f" {k:26s}= {f.get(k)}")
|
||||
print(" --- intermediate line refs ---")
|
||||
inter = r.intermediate or {}
|
||||
for k in sorted(inter):
|
||||
print(f" {k:34s}= {inter[k]:.4f}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
docs = [json.loads(l) for l in _CORPUS.read_text().splitlines() if l.strip()]
|
||||
if "--filter" in sys.argv:
|
||||
spec = sys.argv[sys.argv.index("--filter") + 1]
|
||||
key, _, val = spec.partition("=")
|
||||
n = int(sys.argv[sys.argv.index("--n") + 1]) if "--n" in sys.argv else 6
|
||||
hits = [d for d in docs if str(features(d).get(key)) == val]
|
||||
print(f"{len(hits)} certs match {spec}; dumping first {n}")
|
||||
for d in hits[:n]:
|
||||
_dump(d)
|
||||
return
|
||||
target = sys.argv[1]
|
||||
for d in docs:
|
||||
if target in _cert_id(d):
|
||||
_dump(d)
|
||||
return
|
||||
print(f"no cert matching {target}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
217
scripts/profile_corpus_error.py
Normal file
217
scripts/profile_corpus_error.py
Normal file
|
|
@ -0,0 +1,217 @@
|
|||
"""Profile API-path SAP/CO2/PE error over the COMMITTED corpus (no /tmp cache).
|
||||
|
||||
WHAT THIS IS FOR
|
||||
----------------
|
||||
The accuracy thesis: the gov-API response carries the full SAP input set and our
|
||||
calculator is deterministic, so EVERY cert should reproduce the lodged
|
||||
SAP/CO2/PE. Any divergence is an input-handling bug, not irreducible noise.
|
||||
|
||||
This is the per-cert microscope for that loop. It runs the in-repo corpus
|
||||
(``backend/epc_api/json_samples/RdSAP-Schema-21.0.1/corpus.jsonl``) through the
|
||||
real ``from_api_response`` -> ``cert_to_inputs`` -> ``calculate_sap_from_inputs``
|
||||
path, then:
|
||||
1. buckets the signed SAP error by raw-API feature (reusing
|
||||
``profile_api_error.features``) ranked by wasted accuracy, so a
|
||||
dropped/mis-mapped field surfaces as a biased bucket;
|
||||
2. for the worst over- and under-raters, prints the PE/CO2-vs-cost split so
|
||||
each can be triaged WITHOUT a worksheet:
|
||||
- PE & CO2 both ~match lodged but SAP off -> COST-side bug
|
||||
(tariff / PV export / standing charge / secondary fuel);
|
||||
- PE/CO2 also off -> DEMAND-side bug
|
||||
(fabric / ventilation / gains / heating demand).
|
||||
|
||||
USAGE
|
||||
-----
|
||||
PYTHONPATH=/workspaces/model python scripts/profile_corpus_error.py
|
||||
PYTHONPATH=/workspaces/model python scripts/profile_corpus_error.py --min-n 15 --worst 40
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import statistics as stats
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
|
||||
from domain.sap10_calculator.calculator import calculate_sap_from_inputs
|
||||
from domain.sap10_calculator.rdsap.cert_to_inputs import (
|
||||
SAP_10_2_SPEC_PRICES,
|
||||
cert_to_inputs,
|
||||
)
|
||||
from scripts.profile_api_error import features
|
||||
|
||||
_CORPUS = Path("backend/epc_api/json_samples/RdSAP-Schema-21.0.1/corpus.jsonl")
|
||||
|
||||
|
||||
class Row:
|
||||
__slots__ = (
|
||||
"cert", "sap_err", "co2_err_t", "pe_err", "lodged_sap",
|
||||
"our_sap", "lodged_pe", "our_pe", "feats",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cert: str,
|
||||
sap_err: float,
|
||||
co2_err_t: Optional[float],
|
||||
pe_err: Optional[float],
|
||||
lodged_sap: float,
|
||||
our_sap: float,
|
||||
lodged_pe: Optional[float],
|
||||
our_pe: float,
|
||||
feats: dict[str, Any],
|
||||
) -> None:
|
||||
self.cert = cert
|
||||
self.sap_err = sap_err
|
||||
self.co2_err_t = co2_err_t
|
||||
self.pe_err = pe_err
|
||||
self.lodged_sap = lodged_sap
|
||||
self.our_sap = our_sap
|
||||
self.lodged_pe = lodged_pe
|
||||
self.our_pe = our_pe
|
||||
self.feats = feats
|
||||
|
||||
|
||||
def _load() -> list[dict[str, Any]]:
|
||||
return [
|
||||
json.loads(line)
|
||||
for line in _CORPUS.read_text().splitlines()
|
||||
if line.strip()
|
||||
]
|
||||
|
||||
|
||||
def _compute(corpus: list[dict[str, Any]]) -> tuple[list[Row], int, int]:
|
||||
rows: list[Row] = []
|
||||
skipped = 0
|
||||
raised = 0
|
||||
for doc in corpus:
|
||||
lodged_sap = doc.get("energy_rating_current")
|
||||
if lodged_sap is None:
|
||||
skipped += 1
|
||||
continue
|
||||
try:
|
||||
epc = EpcPropertyDataMapper.from_api_response(doc)
|
||||
result = calculate_sap_from_inputs(
|
||||
cert_to_inputs(epc, prices=SAP_10_2_SPEC_PRICES)
|
||||
)
|
||||
except Exception:
|
||||
raised += 1
|
||||
continue
|
||||
cert = str(
|
||||
doc.get("certificate_number")
|
||||
or doc.get("lmk_key")
|
||||
or doc.get("uprn")
|
||||
or len(rows)
|
||||
)
|
||||
lodged_co2_t = doc.get("co2_emissions_current")
|
||||
lodged_pe = doc.get("energy_consumption_current")
|
||||
rows.append(Row(
|
||||
cert=cert,
|
||||
sap_err=result.sap_score_continuous - lodged_sap,
|
||||
co2_err_t=(result.co2_kg_per_yr / 1000.0 - lodged_co2_t)
|
||||
if lodged_co2_t is not None else None,
|
||||
pe_err=(result.primary_energy_kwh_per_m2 - lodged_pe)
|
||||
if lodged_pe is not None else None,
|
||||
lodged_sap=lodged_sap,
|
||||
our_sap=result.sap_score_continuous,
|
||||
lodged_pe=lodged_pe,
|
||||
our_pe=result.primary_energy_kwh_per_m2,
|
||||
feats=features(doc),
|
||||
))
|
||||
return rows, skipped, raised
|
||||
|
||||
|
||||
def _triage(r: Row) -> str:
|
||||
"""Cost vs demand label from the PE/CO2 split (~tolerant)."""
|
||||
if r.pe_err is None or r.co2_err_t is None:
|
||||
return "?"
|
||||
pe_ok = abs(r.pe_err) < 5.0 # kWh/m2/yr
|
||||
co2_ok = abs(r.co2_err_t) < 0.10 # t/yr
|
||||
if pe_ok and co2_ok:
|
||||
return "COST" # demand reproduces, cost-side off
|
||||
return "DEMAND"
|
||||
|
||||
|
||||
def main() -> None:
|
||||
min_n = 12
|
||||
n_worst = 30
|
||||
if "--min-n" in sys.argv:
|
||||
min_n = int(sys.argv[sys.argv.index("--min-n") + 1])
|
||||
if "--worst" in sys.argv:
|
||||
n_worst = int(sys.argv[sys.argv.index("--worst") + 1])
|
||||
|
||||
rows, skipped, raised = _compute(_load())
|
||||
n = len(rows)
|
||||
within = sum(1 for r in rows if abs(r.sap_err) < 0.5) / n * 100
|
||||
print(
|
||||
f"profiled {n} certs ({skipped} no-lodged-SAP, {raised} raised) | "
|
||||
f"within-0.5 = {within:.1f}% | "
|
||||
f"signed {stats.mean(r.sap_err for r in rows):+.3f} | "
|
||||
f"MAE {stats.mean(abs(r.sap_err) for r in rows):.3f}"
|
||||
)
|
||||
out = [r for r in rows if abs(r.sap_err) >= 0.5]
|
||||
cost_n = sum(1 for r in out if _triage(r) == "COST")
|
||||
dem_n = sum(1 for r in out if _triage(r) == "DEMAND")
|
||||
print(
|
||||
f"of {len(out)} outside-0.5: {dem_n} DEMAND-side (PE/CO2 also off), "
|
||||
f"{cost_n} COST-side (PE/CO2 match), {len(out) - cost_n - dem_n} unknown"
|
||||
)
|
||||
print("=" * 104)
|
||||
|
||||
feat_names = list(rows[0].feats.keys())
|
||||
bucket_lines: list[tuple[float, str]] = []
|
||||
for fn in feat_names:
|
||||
groups: dict[str, list[float]] = defaultdict(list)
|
||||
for r in rows:
|
||||
groups[str(r.feats.get(fn))].append(r.sap_err)
|
||||
for val, es in groups.items():
|
||||
cnt = len(es)
|
||||
if cnt < min_n:
|
||||
continue
|
||||
w05 = sum(1 for e in es if abs(e) < 0.5)
|
||||
mabs = stats.mean(abs(e) for e in es)
|
||||
waste = (cnt - w05) * mabs
|
||||
bucket_lines.append((waste, (
|
||||
f" {fn:22s}={val:<20.20s} n={cnt:4d} "
|
||||
f"within0.5={w05 / cnt * 100:4.0f}% "
|
||||
f"signed={stats.mean(es):+6.2f} mean|err|={mabs:5.2f} "
|
||||
f"[waste={waste:6.0f}]"
|
||||
)))
|
||||
print(f"TOP ERROR-CARRYING BUCKETS (n_out x mean|err|; min-n={min_n}):")
|
||||
for _, line in sorted(bucket_lines, key=lambda x: -x[0])[:40]:
|
||||
print(line)
|
||||
|
||||
print("=" * 104)
|
||||
print(f"WORST {n_worst} OVER-RATERS (our SAP too high -> we under-count loss/cost):")
|
||||
_dump_worst(sorted(rows, key=lambda r: -r.sap_err)[:n_worst])
|
||||
print("-" * 104)
|
||||
print(f"WORST {n_worst} UNDER-RATERS (our SAP too low -> we over-count loss/cost):")
|
||||
_dump_worst(sorted(rows, key=lambda r: r.sap_err)[:n_worst])
|
||||
|
||||
|
||||
def _dump_worst(rows: list[Row]) -> None:
|
||||
print(
|
||||
f" {'cert':>16s} {'lodgSAP':>7s} {'ourSAP':>7s} {'dSAP':>6s} "
|
||||
f"{'dPE':>6s} {'dCO2t':>6s} {'split':>6s} "
|
||||
f"heat/prop/wall/roof/fuel"
|
||||
)
|
||||
for r in rows:
|
||||
f = r.feats
|
||||
sig = (
|
||||
f"{f.get('main_sap_code')}/{f.get('property_type')}/"
|
||||
f"{f.get('wall_construction')}/{f.get('roof_codes')}/"
|
||||
f"{f.get('main_fuel')} pcdb={f.get('has_pcdb_main')} "
|
||||
f"2nd={f.get('has_secondary')} pv={f.get('has_pv')}"
|
||||
)
|
||||
pe = f"{r.pe_err:+6.1f}" if r.pe_err is not None else " ?"
|
||||
co2 = f"{r.co2_err_t:+6.2f}" if r.co2_err_t is not None else " ?"
|
||||
print(
|
||||
f" {r.cert:>16.16s} {r.lodged_sap:7.1f} {r.our_sap:7.2f} "
|
||||
f"{r.sap_err:+6.2f} {pe} {co2} {_triage(r):>6s} {sig}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Reference in a new issue