From 027ee1fba31250b0b803418e747dfab7c6cf890b Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sun, 14 Jun 2026 09:12:08 +0000 Subject: [PATCH] refactor(epc-prediction): extract shared leave-one-out scorer + corpus loader (ADR-0030) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit "One scorer, two harnesses" (ADR-0030): the committed gate, the local script, and the future battle-test must run the *same* scoring. Extract it: - domain/epc_prediction/validation.py — `iter_predictions` (the single leave-one-out orchestration: latest-per-address hold-out, SAP-10.2 target filter, all-vintage source) + `evaluate_component_accuracy` (calculator-free ComponentAccuracy aggregation, the primary signal). Unit-tested. - harness/epc_prediction_corpus.py — `load_corpus(dir)` IO: corpus dir -> Comparable cohorts (maps payloads, carries address + registration_date). validate_epc_prediction.py now just loads + calls the scorer for the component section and iterates iter_predictions for the calculator-floored end-to-end. Identical numbers (181 targets, SAP MAE 6.34) — behaviour-preserving. Co-Authored-By: Claude Opus 4.8 --- domain/epc_prediction/validation.py | 159 +++++++++++++ harness/epc_prediction_corpus.py | 71 ++++++ scripts/validate_epc_prediction.py | 221 ++++-------------- .../domain/epc_prediction/test_validation.py | 123 ++++++++++ 4 files changed, 398 insertions(+), 176 deletions(-) create mode 100644 domain/epc_prediction/validation.py create mode 100644 harness/epc_prediction_corpus.py create mode 100644 tests/domain/epc_prediction/test_validation.py diff --git a/domain/epc_prediction/validation.py b/domain/epc_prediction/validation.py new file mode 100644 index 00000000..d778246e --- /dev/null +++ b/domain/epc_prediction/validation.py @@ -0,0 +1,159 @@ +"""Component Accuracy aggregation for EPC Prediction (ADR-0030). 
+ +The leave-one-out scorer, calculator-FREE on purpose: it holds out each SAP 10.2 +target, predicts it from its (all-vintage) Comparable Properties, and aggregates +the per-component classification hits + geometry residuals from +`compare_prediction`. This is the *primary*, calculator-independent signal — the +end-to-end SAP / carbon / PE check (which needs the calculator) is layered on top +by the runner. The same function backs both the committed ratcheting gate and the +offline national battle-test (one scorer, two harnesses). + +Pure given the loaded cohorts: corpus IO (reading + mapping cert payloads) is the +caller's job, so this is directly unit-testable. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import date +from typing import Iterable, Iterator, Optional, Sequence + +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.epc_prediction.comparable_properties import ( + Comparable, + PredictionTarget, + select_comparables, +) +from domain.epc_prediction.epc_prediction import EpcPrediction +from domain.epc_prediction.prediction_comparison import compare_prediction + +# Only SAP 10.2 certs are valid held-out targets (ADR-0030) — the only vintage +# with full-fidelity lodged components. The source cohort keeps all vintages. +_SAP_10_2: float = 10.2 + + +def _empty_classification() -> dict[str, list[int]]: + return {} + + +def _empty_residuals() -> dict[str, list[float]]: + return {} + + +@dataclass +class ComponentAccuracy: + """Aggregated leave-one-out Component Accuracy over a corpus. + + `classification` maps a component name to [hits, applicable-total] (a + not-applicable `None` hit is excluded from the total); `residuals` maps a + numeric component to its signed (predicted − actual) values. `targets` counts + the held-out SAP 10.2 properties scored. 
+ """ + + classification: dict[str, list[int]] = field( + default_factory=_empty_classification + ) + residuals: dict[str, list[float]] = field(default_factory=_empty_residuals) + targets: int = 0 + + def rate(self, component: str) -> Optional[float]: + """The classification hit-rate for a component, or None when nothing was + applicable.""" + hits, total = self.classification.get(component, [0, 0]) + return hits / total if total else None + + def mean_abs_residual(self, component: str) -> Optional[float]: + """Mean absolute residual for a numeric component, or None when empty.""" + values = self.residuals.get(component, []) + return sum(abs(v) for v in values) / len(values) if values else None + + +def _recency_key(comparable: Comparable) -> tuple[date, str]: + return ( + comparable.registration_date or date.min, + comparable.certificate_number, + ) + + +def _latest_per_address(cohort: Sequence[Comparable]) -> list[Comparable]: + """One held-out property per address — the latest cert, the best ground + truth. Comparables with no address each stand alone.""" + latest: dict[str, Comparable] = {} + standalone: list[Comparable] = [] + for c in cohort: + if c.address is None: + standalone.append(c) + elif c.address not in latest or _recency_key(c) > _recency_key( + latest[c.address] + ): + latest[c.address] = c + return list(latest.values()) + standalone + + +def iter_predictions( + cohorts: Iterable[Sequence[Comparable]], + *, + target_sap_version: float = _SAP_10_2, +) -> Iterator[tuple[EpcPropertyData, EpcPropertyData]]: + """Yield `(predicted, actual)` for every SAP-`target_sap_version` held-out + target across the cohorts — the single leave-one-out orchestration the + Component Accuracy scorer and the runner's calculator end-to-end both consume + (ADR-0030: one scorer, two harnesses). 
A target is held out by whole address + (so a re-lodgement can't leak) and predicted from its all-vintage cohort.""" + predictor = EpcPrediction() + for cohort in cohorts: + for held_out in _latest_per_address(cohort): + if held_out.epc.sap_version != target_sap_version: + continue + others = [ + c + for c in cohort + if c.address is None or c.address != held_out.address + ] + actual = held_out.epc + target = PredictionTarget( + postcode=actual.postcode, + property_type=actual.property_type or "", + built_form=actual.built_form, + ) + comparables = select_comparables(target, others) + if not comparables.members: + continue + yield predictor.predict(target, comparables), actual + + +def evaluate_component_accuracy( + cohorts: Iterable[Sequence[Comparable]], + *, + target_sap_version: float = _SAP_10_2, +) -> ComponentAccuracy: + """Score Component Accuracy by leave-one-out over each postcode cohort — + aggregating the `compare_prediction` hits + residuals across every held-out + SAP-`target_sap_version` target. 
Calculator-free (the primary signal).""" + accuracy = ComponentAccuracy() + for predicted, actual in iter_predictions( + cohorts, target_sap_version=target_sap_version + ): + comparison = compare_prediction(predicted, actual) + accuracy.targets += 1 + for name, hit in comparison.categorical_hits.items(): + counter = accuracy.classification.setdefault(name, [0, 0]) + if hit is not None: + counter[1] += 1 + counter[0] += int(hit) + accuracy.residuals.setdefault("floor_area", []).append( + comparison.floor_area_residual + ) + accuracy.residuals.setdefault("window_count", []).append( + float(comparison.window_count_residual) + ) + accuracy.residuals.setdefault("total_window_area", []).append( + comparison.total_window_area_residual + ) + accuracy.residuals.setdefault("building_parts", []).append( + float(comparison.building_parts_residual) + ) + accuracy.residuals.setdefault("door_count", []).append( + float(comparison.door_count_residual) + ) + return accuracy diff --git a/harness/epc_prediction_corpus.py b/harness/epc_prediction_corpus.py new file mode 100644 index 00000000..e52311e7 --- /dev/null +++ b/harness/epc_prediction_corpus.py @@ -0,0 +1,71 @@ +"""Load a postcode-clustered EPC corpus into Comparable cohorts (ADR-0030). + +The IO half of the EPC Prediction validation: read each postcode's cached cert +payloads, map them through `EpcPropertyDataMapper.from_api_response`, and build +`Comparable`s carrying the register metadata (address + registration date) the +leave-one-out scorer needs to dedupe re-lodgements and hold out a whole address. +A cert the mapper rejects (unsupported schema, malformed) is skipped, never fatal. + +Shared by the committed-fixture gate, the local validation script, and the +offline national battle-test — the corpus directory differs, the loading does +not. Layout: `<corpus_dir>/<postcode>/<cert>.json` + `<corpus_dir>/_index.json`. 
+""" + +from __future__ import annotations + +import json +from datetime import date +from pathlib import Path +from typing import Any, Optional + +from datatypes.epc.domain.mapper import EpcPropertyDataMapper +from domain.epc_prediction.comparable_properties import Comparable + + +def load_corpus(corpus_dir: Path) -> list[list[Comparable]]: + """Load every postcode cohort under `corpus_dir`. Returns one list of + Comparables per postcode (the unit the leave-one-out scorer iterates).""" + index_path = corpus_dir / "_index.json" + if not index_path.exists(): + raise FileNotFoundError( + f"no corpus index at {index_path} — run a corpus fetch first" + ) + index: dict[str, list[str]] = json.loads(index_path.read_text()) + return [ + _load_cohort(corpus_dir, postcode, certs) + for postcode, certs in index.items() + ] + + +def _load_cohort( + corpus_dir: Path, postcode: str, certs: list[str] +) -> list[Comparable]: + cohort: list[Comparable] = [] + for cert in certs: + path = corpus_dir / postcode / f"{cert}.json" + if not path.exists(): + continue + raw: dict[str, Any] = json.loads(path.read_text()) + try: + epc = EpcPropertyDataMapper.from_api_response(raw) + except Exception: # noqa: BLE001 — a bad cert must not abort the sweep + continue + cohort.append( + Comparable( + epc=epc, + certificate_number=cert, + address=_address(raw), + registration_date=_registration_date(raw), + ) + ) + return cohort + + +def _address(raw: dict[str, Any]) -> Optional[str]: + value = raw.get("address_line_1") + return str(value).strip().upper() if value else None + + +def _registration_date(raw: dict[str, Any]) -> Optional[date]: + value = raw.get("registration_date") + return date.fromisoformat(str(value)) if value else None diff --git a/scripts/validate_epc_prediction.py b/scripts/validate_epc_prediction.py index 18ee4bbb..797389d2 100644 --- a/scripts/validate_epc_prediction.py +++ b/scripts/validate_epc_prediction.py @@ -23,90 +23,24 @@ Corpus dir: $EPC_PREDICTION_CORPUS (default 
/tmp/epc_prediction_corpus). from __future__ import annotations -import json import os import statistics -from datetime import date from pathlib import Path from typing import Optional from datatypes.epc.domain.epc_property_data import EpcPropertyData -from datatypes.epc.domain.mapper import EpcPropertyDataMapper -from domain.epc_prediction.comparable_properties import ( - Comparable, - PredictionTarget, - select_comparables, +from domain.epc_prediction.validation import ( + evaluate_component_accuracy, + iter_predictions, ) -from domain.epc_prediction.epc_prediction import EpcPrediction -from domain.epc_prediction.prediction_comparison import compare_prediction from domain.sap10_calculator.calculator import Sap10Calculator, SapResult +from harness.epc_prediction_corpus import load_corpus -# Target-cert spec gate: only SAP 10.2 certs (schema 21.0.x) carry full-fidelity -# lodged components + a same-spec lodged figure to check against (ADR-0030). The -# source cohort keeps all vintages — components are methodology-agnostic. -_SAP_10_2: float = 10.2 _KG_PER_TONNE: float = 1000.0 CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus")) -def _load_cohort(postcode: str, certs: list[str]) -> list[Comparable]: - """Map a postcode's cached cert payloads to Comparables, skipping any the - mapper rejects (unsupported schema, malformed). 
Address + registration date - come straight off the cached payload (the register metadata) so the harness - can dedupe re-lodgements and hold out a whole address.""" - cohort: list[Comparable] = [] - for cert in certs: - path = CORPUS / postcode / f"{cert}.json" - if not path.exists(): - continue - raw = json.loads(path.read_text()) - try: - epc = EpcPropertyDataMapper.from_api_response(raw) - except Exception: # noqa: BLE001 — a bad cert must not abort the sweep - continue - cohort.append( - Comparable( - epc=epc, - certificate_number=cert, - address=_address(raw), - registration_date=_registration_date(raw), - ) - ) - return cohort - - -def _address(raw: dict[str, object]) -> Optional[str]: - value = raw.get("address_line_1") - return str(value).strip().upper() if value else None - - -def _registration_date(raw: dict[str, object]) -> Optional[date]: - value = raw.get("registration_date") - return date.fromisoformat(str(value)) if value else None - - -def _ground_truth_properties(cohort: list[Comparable]) -> list[Comparable]: - """Collapse a postcode's certs to one held-out property per address — the - latest cert, the best ground truth. 
Comparables with no address each stand - alone.""" - latest: dict[str, Comparable] = {} - standalone: list[Comparable] = [] - for c in cohort: - if c.address is None: - standalone.append(c) - elif c.address not in latest or _recency(c) > _recency(latest[c.address]): - latest[c.address] = c - return list(latest.values()) + standalone - - -def _recency(comparable: Comparable) -> tuple[date, str]: - return ( - comparable.registration_date or date.min, - comparable.certificate_number, - ) - - def _result( calculator: Sap10Calculator, epc: EpcPropertyData ) -> Optional[SapResult]: @@ -123,111 +57,55 @@ def _co2_tonnes(result: SapResult) -> float: def main() -> None: - index_path = CORPUS / "_index.json" - if not index_path.exists(): - raise SystemExit(f"no corpus at {CORPUS} — run fetch_epc_prediction_corpus.py") - index: dict[str, list[str]] = json.loads(index_path.read_text()) - + cohorts = load_corpus(CORPUS) calculator = Sap10Calculator() - predictor = EpcPrediction() - # Classification: name -> [hits, applicable-total], populated from whatever - # components compare_prediction reports (insertion order preserved). A None - # hit (the actual lodges no value) is excluded from the denominator. - categoricals: dict[str, list[int]] = {} - floor_res: list[float] = [] - window_count_res: list[int] = [] - window_area_res: list[float] = [] - parts_res: list[int] = [] - door_res: list[int] = [] - # End-to-end (calculator-FLOORED) vs API-lodged — secondary guard, ADR-0030. - sap_vs_lodged: list[float] = [] - co2_vs_lodged: list[float] = [] - pei_vs_lodged: list[float] = [] - # Attribution readout: how far the calculator alone is from lodged on the - # ACTUAL components — the floor the end-to-end numbers can reach. 
- sap_calc_actual_vs_lodged: list[float] = [] - predicted_n = skipped_non_102 = skipped_no_cohort = 0 - - for postcode, certs in index.items(): - cohort = _load_cohort(postcode, certs) - targets = _ground_truth_properties(cohort) - if len(targets) < 2: - skipped_no_cohort += len(targets) - continue - for held_out in targets: - # Only SAP 10.2 certs are valid validation targets (ADR-0030); the - # source cohort (`others`) keeps every vintage. - if held_out.epc.sap_version != _SAP_10_2: - skipped_non_102 += 1 - continue - # Exclude every cert of the held-out address (not just the held cert) - # so a re-lodgement of the same property cannot leak into the cohort. - others = [ - c - for c in cohort - if c.address is None or c.address != held_out.address - ] - actual = held_out.epc - target = PredictionTarget( - postcode=postcode, - property_type=actual.property_type or "", - built_form=actual.built_form, - ) - comparables = select_comparables(target, others) - if not comparables.members: - continue - predicted = predictor.predict(target, comparables) - predicted_n += 1 - - cmp = compare_prediction(predicted, actual) - for name, hit in cmp.categorical_hits.items(): - _tally(categoricals.setdefault(name, [0, 0]), hit) - floor_res.append(cmp.floor_area_residual) - window_count_res.append(cmp.window_count_residual) - window_area_res.append(cmp.total_window_area_residual) - parts_res.append(cmp.building_parts_residual) - door_res.append(cmp.door_count_residual) - - pred_result = _result(calculator, predicted) - actual_result = _result(calculator, actual) - lodged_sap = actual.energy_rating_current - lodged_co2 = actual.co2_emissions_current - lodged_pei = actual.energy_consumption_current - if pred_result is not None: - if lodged_sap is not None: - sap_vs_lodged.append( - abs(pred_result.sap_score_continuous - lodged_sap) - ) - if lodged_co2 is not None: - co2_vs_lodged.append( - abs(_co2_tonnes(pred_result) - lodged_co2) - ) - if lodged_pei is not None: - 
pei_vs_lodged.append( - abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei) - ) - if actual_result is not None and lodged_sap is not None: - sap_calc_actual_vs_lodged.append( - abs(actual_result.sap_score_continuous - lodged_sap) - ) + # PRIMARY signal — Component Accuracy, calculator-free (the shared scorer). + accuracy = evaluate_component_accuracy(cohorts) print(f"corpus: {CORPUS}") - print( - f"predicted {predicted_n} SAP-10.2 held-out targets " - f"({skipped_non_102} non-10.2 targets skipped, " - f"{skipped_no_cohort} had no cohort)\n" - ) + print(f"predicted {accuracy.targets} SAP-10.2 held-out targets\n") print("--- Component Accuracy (PRIMARY, calculator-independent) ---") - for name, (hits, total) in categoricals.items(): + for name, (hits, total) in accuracy.classification.items(): if total: print(f"CLASSIFICATION {name}: {hits}/{total} = {hits / total:.1%}") print() - _residual("floor_area (m2)", floor_res) - _residual("window_count", [float(x) for x in window_count_res]) - _residual("total_window_area (m2)", window_area_res) - _residual("building_parts", [float(x) for x in parts_res]) - _residual("door_count", [float(x) for x in door_res]) + _residual("floor_area (m2)", accuracy.residuals.get("floor_area", [])) + _residual("window_count", accuracy.residuals.get("window_count", [])) + _residual( + "total_window_area (m2)", accuracy.residuals.get("total_window_area", []) + ) + _residual("building_parts", accuracy.residuals.get("building_parts", [])) + _residual("door_count", accuracy.residuals.get("door_count", [])) + + # SECONDARY guard — end-to-end vs API-lodged, calculator-FLOORED. Re-walks the + # same held-out targets (one orchestration via iter_predictions). 
+ sap_vs_lodged: list[float] = [] + co2_vs_lodged: list[float] = [] + pei_vs_lodged: list[float] = [] + sap_calc_actual_vs_lodged: list[float] = [] # the floor the end-to-end reaches + for predicted, actual in iter_predictions(cohorts): + pred_result = _result(calculator, predicted) + actual_result = _result(calculator, actual) + lodged_sap = actual.energy_rating_current + lodged_co2 = actual.co2_emissions_current + lodged_pei = actual.energy_consumption_current + if pred_result is not None: + if lodged_sap is not None: + sap_vs_lodged.append( + abs(pred_result.sap_score_continuous - lodged_sap) + ) + if lodged_co2 is not None: + co2_vs_lodged.append(abs(_co2_tonnes(pred_result) - lodged_co2)) + if lodged_pei is not None: + pei_vs_lodged.append( + abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei) + ) + if actual_result is not None and lodged_sap is not None: + sap_calc_actual_vs_lodged.append( + abs(actual_result.sap_score_continuous - lodged_sap) + ) + print() print("--- End-to-end vs API-lodged (SECONDARY, calculator-FLOORED) ---") _sap_line("SAP |pred − lodged|", sap_vs_lodged) @@ -236,15 +114,6 @@ def main() -> None: _sap_line(" floor: SAP |calc(actual) − lodged|", sap_calc_actual_vs_lodged) -def _tally(counter: list[int], hit: Optional[bool]) -> None: - """Record one classification outcome: a None hit (actual absent) is not - applicable and skipped; else increment the applicable total and the hits.""" - if hit is None: - return - counter[1] += 1 - counter[0] += int(hit) - - def _residual(label: str, values: list[float]) -> None: if not values: print(f"RESIDUAL {label}: (none)") diff --git a/tests/domain/epc_prediction/test_validation.py b/tests/domain/epc_prediction/test_validation.py new file mode 100644 index 00000000..acf49a47 --- /dev/null +++ b/tests/domain/epc_prediction/test_validation.py @@ -0,0 +1,123 @@ +"""Behaviour of the Component Accuracy leave-one-out scorer (ADR-0030): given +loaded postcode cohorts, hold out each SAP 10.2 target, predict 
it from its +all-vintage neighbours, and aggregate the per-component hits + residuals. Pure +(no IO, no calculator) — corpus loading is the caller's job. +""" + +from datetime import date +from typing import Optional, Union + +from datatypes.epc.domain.epc_property_data import ( + EpcPropertyData, + MainHeatingDetail, + SapBuildingPart, + SapEnergySource, + SapFloorDimension, + SapHeating, +) +from domain.epc_prediction.comparable_properties import Comparable +from domain.epc_prediction.validation import evaluate_component_accuracy + + +def _comparable( + *, + certificate_number: str, + address: str, + sap_version: float, + wall_construction: Union[int, str] = 1, + registration_date: Optional[date] = None, +) -> Comparable: + """A Comparable carrying a fully-populated opaque EpcPropertyData — every + field the predictor + comparison read (the partial-instance idiom).""" + epc: EpcPropertyData = object.__new__(EpcPropertyData) + epc.sap_version = sap_version + epc.postcode = "LS6 1AA" + epc.property_type = "2" + epc.built_form = "4" + epc.total_floor_area_m2 = 80.0 + epc.door_count = 2 + epc.solar_water_heating = False + epc.has_hot_water_cylinder = True + part: SapBuildingPart = object.__new__(SapBuildingPart) + part.wall_construction = wall_construction + part.wall_insulation_type = 1 + part.construction_age_band = "K" + part.roof_construction = 1 + part.roof_insulation_thickness = 100 + part.sap_room_in_roof = None + floor_dim: SapFloorDimension = object.__new__(SapFloorDimension) + floor_dim.floor_construction = 1 + floor_dim.floor_insulation = 1 + part.sap_floor_dimensions = [floor_dim] + epc.sap_building_parts = [part] + epc.sap_windows = [] + detail: MainHeatingDetail = object.__new__(MainHeatingDetail) + detail.main_fuel_type = 20 + detail.main_heating_category = 2 + detail.main_heating_control = 2100 + heating: SapHeating = object.__new__(SapHeating) + heating.main_heating_details = [detail] + heating.water_heating_fuel = 20 + heating.water_heating_code = 
901 + heating.cylinder_insulation_type = 1 + heating.secondary_heating_type = None + epc.sap_heating = heating + energy: SapEnergySource = object.__new__(SapEnergySource) + energy.photovoltaic_supply = None + energy.photovoltaic_arrays = None + epc.sap_energy_source = energy + return Comparable( + epc=epc, + certificate_number=certificate_number, + address=address, + registration_date=registration_date, + ) + + +def test_scores_only_sap_10_2_targets() -> None: + # Arrange — a cohort of two distinct addresses: one SAP 10.2, one older + # (SAP 9.94). Only the 10.2 cert is a valid held-out target; the older one + # is kept as source evidence (its components are still valid). + cohort = [ + _comparable( + certificate_number="A", address="1 THE ROW", sap_version=10.2 + ), + _comparable( + certificate_number="B", address="2 THE ROW", sap_version=9.94 + ), + ] + + # Act + accuracy = evaluate_component_accuracy([cohort]) + + # Assert — exactly one target scored (the 10.2 cert), predicted from the + # older neighbour; the older cert was never held out. + assert accuracy.targets == 1 + assert accuracy.rate("wall_construction") == 1.0 + + +def test_aggregates_a_wall_classification_miss() -> None: + # Arrange — the 10.2 target is solid brick (2); its only neighbour (the + # source) is cavity (1), so the predicted mode misses the wall. + cohort = [ + _comparable( + certificate_number="A", + address="1 THE ROW", + sap_version=10.2, + wall_construction=2, + ), + _comparable( + certificate_number="B", + address="2 THE ROW", + sap_version=10.2, + wall_construction=1, + ), + ] + + # Act + accuracy = evaluate_component_accuracy([cohort]) + + # Assert — both are 10.2 targets, and each is predicted from the other (the + # opposite wall), so wall_construction is missed both times. + assert accuracy.targets == 2 + assert accuracy.rate("wall_construction") == 0.0