diff --git a/domain/epc_prediction/validation.py b/domain/epc_prediction/validation.py
new file mode 100644
index 00000000..d778246e
--- /dev/null
+++ b/domain/epc_prediction/validation.py
@@ -0,0 +1,159 @@
+"""Component Accuracy aggregation for EPC Prediction (ADR-0030).
+
+The leave-one-out scorer, calculator-FREE on purpose: it holds out each SAP 10.2
+target, predicts it from its (all-vintage) Comparable Properties, and aggregates
+the per-component classification hits + geometry residuals from
+`compare_prediction`. This is the *primary*, calculator-independent signal — the
+end-to-end SAP / carbon / PE check (which needs the calculator) is layered on top
+by the runner. The same function backs both the committed ratcheting gate and the
+offline national battle-test (one scorer, two harnesses).
+
+Pure given the loaded cohorts: corpus IO (reading + mapping cert payloads) is the
+caller's job, so this is directly unit-testable.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import date
+from typing import Iterable, Iterator, Optional, Sequence
+
+from datatypes.epc.domain.epc_property_data import EpcPropertyData
+from domain.epc_prediction.comparable_properties import (
+ Comparable,
+ PredictionTarget,
+ select_comparables,
+)
+from domain.epc_prediction.epc_prediction import EpcPrediction
+from domain.epc_prediction.prediction_comparison import compare_prediction
+
+# Only SAP 10.2 certs are valid held-out targets (ADR-0030) — the only vintage
+# with full-fidelity lodged components. The source cohort keeps all vintages.
+_SAP_10_2: float = 10.2
+
+
+def _empty_classification() -> dict[str, list[int]]:
+    """Default factory for `ComponentAccuracy.classification`: a new empty map."""
+    fresh: dict[str, list[int]] = {}
+    return fresh
+
+
+def _empty_residuals() -> dict[str, list[float]]:
+    """Default factory for `ComponentAccuracy.residuals`: a new empty map."""
+    fresh: dict[str, list[float]] = {}
+    return fresh
+
+
+@dataclass
+class ComponentAccuracy:
+    """Leave-one-out Component Accuracy totals accumulated over a corpus.
+
+    Attributes:
+        classification: component name -> `[hits, applicable-total]`; a
+            not-applicable (`None`) hit is left out of the total.
+        residuals: numeric component name -> signed (predicted − actual)
+            values.
+        targets: how many held-out targets were scored (SAP 10.2 by default).
+    """
+
+    classification: dict[str, list[int]] = field(
+        default_factory=_empty_classification
+    )
+    residuals: dict[str, list[float]] = field(default_factory=_empty_residuals)
+    targets: int = 0
+
+    def rate(self, component: str) -> Optional[float]:
+        """Return hits / applicable-total for `component`, or None when the
+        component is unknown or nothing was applicable."""
+        counter = self.classification.get(component)
+        if counter is None:
+            return None
+        hits, total = counter
+        if not total:
+            return None
+        return hits / total
+
+    def mean_abs_residual(self, component: str) -> Optional[float]:
+        """Return the mean of |residual| for `component`, or None when it has
+        no recorded values."""
+        samples = self.residuals.get(component)
+        if not samples:
+            return None
+        return sum(map(abs, samples)) / len(samples)
+
+
+def _recency_key(comparable: Comparable) -> tuple[date, str]:
+    """Ordering key for "which cert is newer": registration date first (a
+    missing date sorts as `date.min`), certificate number as the tie-break."""
+    registered = comparable.registration_date or date.min
+    return registered, comparable.certificate_number
+
+
+def _latest_per_address(cohort: Sequence[Comparable]) -> list[Comparable]:
+    """Reduce a cohort to one held-out candidate per address.
+
+    For each distinct address the cert with the greatest `_recency_key` wins
+    (the latest cert, the best ground truth). Comparables without an address
+    cannot be grouped, so each is kept and appended after the addressed ones.
+    """
+    newest_by_address: dict[str, Comparable] = {}
+    unaddressed: list[Comparable] = []
+    for candidate in cohort:
+        key = candidate.address
+        if key is None:
+            unaddressed.append(candidate)
+            continue
+        incumbent = newest_by_address.get(key)
+        if incumbent is None or _recency_key(candidate) > _recency_key(incumbent):
+            newest_by_address[key] = candidate
+    return [*newest_by_address.values(), *unaddressed]
+
+
+def iter_predictions(
+    cohorts: Iterable[Sequence[Comparable]],
+    *,
+    target_sap_version: float = _SAP_10_2,
+) -> Iterator[tuple[EpcPropertyData, EpcPropertyData]]:
+    """Yield `(predicted, actual)` for every held-out target across the cohorts.
+
+    The single leave-one-out orchestration shared by the Component Accuracy
+    scorer and the runner's calculator end-to-end check (ADR-0030: one scorer,
+    two harnesses).
+
+    Args:
+        cohorts: one sequence of Comparables per cohort; each is reduced to one
+            candidate per address (its latest cert) before holding out.
+        target_sap_version: only candidates whose `epc.sap_version` equals this
+            are held out; every vintage still serves as source evidence.
+
+    Yields:
+        The prediction built from the target's comparables, paired with the
+        target's own lodged `EpcPropertyData`. Targets left with no comparable
+        members are skipped.
+    """
+    predictor = EpcPrediction()
+    for cohort in cohorts:
+        for held_out in _latest_per_address(cohort):
+            if held_out.epc.sap_version != target_sap_version:
+                continue
+            # Hold out by whole address so a re-lodgement cannot leak. The
+            # identity check also removes an address-less target itself, which
+            # the address comparison alone would let back into its own cohort.
+            others = [
+                c
+                for c in cohort
+                if c is not held_out
+                and (c.address is None or c.address != held_out.address)
+            ]
+            actual = held_out.epc
+            target = PredictionTarget(
+                postcode=actual.postcode,
+                property_type=actual.property_type or "",
+                built_form=actual.built_form,
+            )
+            comparables = select_comparables(target, others)
+            if not comparables.members:
+                continue
+            yield predictor.predict(target, comparables), actual
+
+
+def evaluate_component_accuracy(
+    cohorts: Iterable[Sequence[Comparable]],
+    *,
+    target_sap_version: float = _SAP_10_2,
+) -> ComponentAccuracy:
+    """Aggregate `compare_prediction` results over every held-out target.
+
+    Walks `iter_predictions` and, per target, bumps the target count, tallies
+    each categorical hit as `[hits, applicable-total]` (a `None` hit registers
+    the component but counts toward neither), and appends the five geometry
+    residuals. Calculator-free — the primary signal.
+    """
+    accuracy = ComponentAccuracy()
+    predictions = iter_predictions(cohorts, target_sap_version=target_sap_version)
+    for predicted, actual in predictions:
+        comparison = compare_prediction(predicted, actual)
+        accuracy.targets += 1
+
+        for name, hit in comparison.categorical_hits.items():
+            tally = accuracy.classification.setdefault(name, [0, 0])
+            if hit is None:
+                continue
+            tally[0] += int(hit)
+            tally[1] += 1
+
+        # Count-valued residuals are widened to float to match the declared
+        # `dict[str, list[float]]`; the key order is the reporting order.
+        residuals = (
+            ("floor_area", comparison.floor_area_residual),
+            ("window_count", float(comparison.window_count_residual)),
+            ("total_window_area", comparison.total_window_area_residual),
+            ("building_parts", float(comparison.building_parts_residual)),
+            ("door_count", float(comparison.door_count_residual)),
+        )
+        for name, value in residuals:
+            accuracy.residuals.setdefault(name, []).append(value)
+    return accuracy
diff --git a/harness/epc_prediction_corpus.py b/harness/epc_prediction_corpus.py
new file mode 100644
index 00000000..e52311e7
--- /dev/null
+++ b/harness/epc_prediction_corpus.py
@@ -0,0 +1,71 @@
+"""Load a postcode-clustered EPC corpus into Comparable cohorts (ADR-0030).
+
+The IO half of the EPC Prediction validation: read each postcode's cached cert
+payloads, map them through `EpcPropertyDataMapper.from_api_response`, and build
+`Comparable`s carrying the register metadata (address + registration date) the
+leave-one-out scorer needs to dedupe re-lodgements and hold out a whole address.
+A cert the mapper rejects (unsupported schema, malformed) is skipped, never fatal.
+
+Shared by the committed-fixture gate, the local validation script, and the
+offline national battle-test — the corpus directory differs, the loading does
+not. Layout: `<corpus_dir>/<postcode>/<cert>.json` + `<corpus_dir>/_index.json`.
+"""
+
+from __future__ import annotations
+
+import json
+from datetime import date
+from pathlib import Path
+from typing import Any, Optional
+
+from datatypes.epc.domain.mapper import EpcPropertyDataMapper
+from domain.epc_prediction.comparable_properties import Comparable
+
+
+def load_corpus(corpus_dir: Path) -> list[list[Comparable]]:
+    """Load every postcode cohort listed in `<corpus_dir>/_index.json`.
+
+    The index maps each postcode to its certificate numbers. The result holds
+    one list of Comparables per postcode, in index order — the unit the
+    leave-one-out scorer iterates.
+
+    Raises:
+        FileNotFoundError: when the corpus has no `_index.json`.
+    """
+    index_path = corpus_dir / "_index.json"
+    if not index_path.exists():
+        raise FileNotFoundError(
+            f"no corpus index at {index_path} — run a corpus fetch first"
+        )
+    certs_by_postcode: dict[str, list[str]] = json.loads(index_path.read_text())
+    return [
+        _load_cohort(corpus_dir, postcode, cert_numbers)
+        for postcode, cert_numbers in certs_by_postcode.items()
+    ]
+
+
+def _load_cohort(
+    corpus_dir: Path, postcode: str, certs: list[str]
+) -> list[Comparable]:
+    """Build one postcode's cohort from its cached cert payloads.
+
+    Reads `<corpus_dir>/<postcode>/<cert>.json` for each cert number, maps the
+    payload with `EpcPropertyDataMapper.from_api_response`, and wraps it in a
+    `Comparable` carrying the address and registration date taken off the
+    payload. A cert with no cached file is skipped, as is any cert whose
+    payload cannot be parsed, mapped, or have its metadata read — one bad cert
+    never aborts the sweep.
+    """
+    cohort: list[Comparable] = []
+    for cert in certs:
+        path = corpus_dir / postcode / f"{cert}.json"
+        if not path.exists():
+            continue
+        try:
+            # Parsing and metadata extraction sit inside the guard too: a
+            # truncated JSON file or an unparseable registration date is just
+            # as much a "bad cert" as one the mapper rejects.
+            raw: dict[str, Any] = json.loads(path.read_text())
+            epc = EpcPropertyDataMapper.from_api_response(raw)
+            address = _address(raw)
+            registered = _registration_date(raw)
+        except Exception:  # noqa: BLE001 — a bad cert must not abort the sweep
+            continue
+        cohort.append(
+            Comparable(
+                epc=epc,
+                certificate_number=cert,
+                address=address,
+                registration_date=registered,
+            )
+        )
+    return cohort
+
+
+def _address(raw: dict[str, Any]) -> Optional[str]:
+    """Return the payload's `address_line_1`, stripped and upper-cased.
+
+    Returns None when the field is missing, empty, or only whitespace, so a
+    blank address is never mistaken for one shared address.
+    """
+    value = raw.get("address_line_1")
+    if not value:
+        return None
+    normalised = str(value).strip().upper()
+    return normalised or None
+
+
+def _registration_date(raw: dict[str, Any]) -> Optional[date]:
+    """Return the payload's `registration_date` parsed as an ISO date, or None
+    when the field is missing or empty.
+
+    Raises:
+        ValueError: when the value is present but `date.fromisoformat` rejects it.
+    """
+    lodged = raw.get("registration_date")
+    if not lodged:
+        return None
+    return date.fromisoformat(str(lodged))
diff --git a/scripts/validate_epc_prediction.py b/scripts/validate_epc_prediction.py
index 18ee4bbb..797389d2 100644
--- a/scripts/validate_epc_prediction.py
+++ b/scripts/validate_epc_prediction.py
@@ -23,90 +23,24 @@ Corpus dir: $EPC_PREDICTION_CORPUS (default /tmp/epc_prediction_corpus).
from __future__ import annotations
-import json
import os
import statistics
-from datetime import date
from pathlib import Path
from typing import Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
-from datatypes.epc.domain.mapper import EpcPropertyDataMapper
-from domain.epc_prediction.comparable_properties import (
- Comparable,
- PredictionTarget,
- select_comparables,
+from domain.epc_prediction.validation import (
+ evaluate_component_accuracy,
+ iter_predictions,
)
-from domain.epc_prediction.epc_prediction import EpcPrediction
-from domain.epc_prediction.prediction_comparison import compare_prediction
from domain.sap10_calculator.calculator import Sap10Calculator, SapResult
+from harness.epc_prediction_corpus import load_corpus
-# Target-cert spec gate: only SAP 10.2 certs (schema 21.0.x) carry full-fidelity
-# lodged components + a same-spec lodged figure to check against (ADR-0030). The
-# source cohort keeps all vintages — components are methodology-agnostic.
-_SAP_10_2: float = 10.2
_KG_PER_TONNE: float = 1000.0
CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus"))
-def _load_cohort(postcode: str, certs: list[str]) -> list[Comparable]:
- """Map a postcode's cached cert payloads to Comparables, skipping any the
- mapper rejects (unsupported schema, malformed). Address + registration date
- come straight off the cached payload (the register metadata) so the harness
- can dedupe re-lodgements and hold out a whole address."""
- cohort: list[Comparable] = []
- for cert in certs:
- path = CORPUS / postcode / f"{cert}.json"
- if not path.exists():
- continue
- raw = json.loads(path.read_text())
- try:
- epc = EpcPropertyDataMapper.from_api_response(raw)
- except Exception: # noqa: BLE001 — a bad cert must not abort the sweep
- continue
- cohort.append(
- Comparable(
- epc=epc,
- certificate_number=cert,
- address=_address(raw),
- registration_date=_registration_date(raw),
- )
- )
- return cohort
-
-
-def _address(raw: dict[str, object]) -> Optional[str]:
- value = raw.get("address_line_1")
- return str(value).strip().upper() if value else None
-
-
-def _registration_date(raw: dict[str, object]) -> Optional[date]:
- value = raw.get("registration_date")
- return date.fromisoformat(str(value)) if value else None
-
-
-def _ground_truth_properties(cohort: list[Comparable]) -> list[Comparable]:
- """Collapse a postcode's certs to one held-out property per address — the
- latest cert, the best ground truth. Comparables with no address each stand
- alone."""
- latest: dict[str, Comparable] = {}
- standalone: list[Comparable] = []
- for c in cohort:
- if c.address is None:
- standalone.append(c)
- elif c.address not in latest or _recency(c) > _recency(latest[c.address]):
- latest[c.address] = c
- return list(latest.values()) + standalone
-
-
-def _recency(comparable: Comparable) -> tuple[date, str]:
- return (
- comparable.registration_date or date.min,
- comparable.certificate_number,
- )
-
-
def _result(
calculator: Sap10Calculator, epc: EpcPropertyData
) -> Optional[SapResult]:
@@ -123,111 +57,55 @@ def _co2_tonnes(result: SapResult) -> float:
def main() -> None:
- index_path = CORPUS / "_index.json"
- if not index_path.exists():
- raise SystemExit(f"no corpus at {CORPUS} — run fetch_epc_prediction_corpus.py")
- index: dict[str, list[str]] = json.loads(index_path.read_text())
-
+ cohorts = load_corpus(CORPUS)
calculator = Sap10Calculator()
- predictor = EpcPrediction()
- # Classification: name -> [hits, applicable-total], populated from whatever
- # components compare_prediction reports (insertion order preserved). A None
- # hit (the actual lodges no value) is excluded from the denominator.
- categoricals: dict[str, list[int]] = {}
- floor_res: list[float] = []
- window_count_res: list[int] = []
- window_area_res: list[float] = []
- parts_res: list[int] = []
- door_res: list[int] = []
- # End-to-end (calculator-FLOORED) vs API-lodged — secondary guard, ADR-0030.
- sap_vs_lodged: list[float] = []
- co2_vs_lodged: list[float] = []
- pei_vs_lodged: list[float] = []
- # Attribution readout: how far the calculator alone is from lodged on the
- # ACTUAL components — the floor the end-to-end numbers can reach.
- sap_calc_actual_vs_lodged: list[float] = []
- predicted_n = skipped_non_102 = skipped_no_cohort = 0
-
- for postcode, certs in index.items():
- cohort = _load_cohort(postcode, certs)
- targets = _ground_truth_properties(cohort)
- if len(targets) < 2:
- skipped_no_cohort += len(targets)
- continue
- for held_out in targets:
- # Only SAP 10.2 certs are valid validation targets (ADR-0030); the
- # source cohort (`others`) keeps every vintage.
- if held_out.epc.sap_version != _SAP_10_2:
- skipped_non_102 += 1
- continue
- # Exclude every cert of the held-out address (not just the held cert)
- # so a re-lodgement of the same property cannot leak into the cohort.
- others = [
- c
- for c in cohort
- if c.address is None or c.address != held_out.address
- ]
- actual = held_out.epc
- target = PredictionTarget(
- postcode=postcode,
- property_type=actual.property_type or "",
- built_form=actual.built_form,
- )
- comparables = select_comparables(target, others)
- if not comparables.members:
- continue
- predicted = predictor.predict(target, comparables)
- predicted_n += 1
-
- cmp = compare_prediction(predicted, actual)
- for name, hit in cmp.categorical_hits.items():
- _tally(categoricals.setdefault(name, [0, 0]), hit)
- floor_res.append(cmp.floor_area_residual)
- window_count_res.append(cmp.window_count_residual)
- window_area_res.append(cmp.total_window_area_residual)
- parts_res.append(cmp.building_parts_residual)
- door_res.append(cmp.door_count_residual)
-
- pred_result = _result(calculator, predicted)
- actual_result = _result(calculator, actual)
- lodged_sap = actual.energy_rating_current
- lodged_co2 = actual.co2_emissions_current
- lodged_pei = actual.energy_consumption_current
- if pred_result is not None:
- if lodged_sap is not None:
- sap_vs_lodged.append(
- abs(pred_result.sap_score_continuous - lodged_sap)
- )
- if lodged_co2 is not None:
- co2_vs_lodged.append(
- abs(_co2_tonnes(pred_result) - lodged_co2)
- )
- if lodged_pei is not None:
- pei_vs_lodged.append(
- abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei)
- )
- if actual_result is not None and lodged_sap is not None:
- sap_calc_actual_vs_lodged.append(
- abs(actual_result.sap_score_continuous - lodged_sap)
- )
+ # PRIMARY signal — Component Accuracy, calculator-free (the shared scorer).
+ accuracy = evaluate_component_accuracy(cohorts)
print(f"corpus: {CORPUS}")
- print(
- f"predicted {predicted_n} SAP-10.2 held-out targets "
- f"({skipped_non_102} non-10.2 targets skipped, "
- f"{skipped_no_cohort} had no cohort)\n"
- )
+ print(f"predicted {accuracy.targets} SAP-10.2 held-out targets\n")
print("--- Component Accuracy (PRIMARY, calculator-independent) ---")
- for name, (hits, total) in categoricals.items():
+ for name, (hits, total) in accuracy.classification.items():
if total:
print(f"CLASSIFICATION {name}: {hits}/{total} = {hits / total:.1%}")
print()
- _residual("floor_area (m2)", floor_res)
- _residual("window_count", [float(x) for x in window_count_res])
- _residual("total_window_area (m2)", window_area_res)
- _residual("building_parts", [float(x) for x in parts_res])
- _residual("door_count", [float(x) for x in door_res])
+ _residual("floor_area (m2)", accuracy.residuals.get("floor_area", []))
+ _residual("window_count", accuracy.residuals.get("window_count", []))
+ _residual(
+ "total_window_area (m2)", accuracy.residuals.get("total_window_area", [])
+ )
+ _residual("building_parts", accuracy.residuals.get("building_parts", []))
+ _residual("door_count", accuracy.residuals.get("door_count", []))
+
+ # SECONDARY guard — end-to-end vs API-lodged, calculator-FLOORED. Re-walks the
+ # same held-out targets (one orchestration via iter_predictions).
+ sap_vs_lodged: list[float] = []
+ co2_vs_lodged: list[float] = []
+ pei_vs_lodged: list[float] = []
+ sap_calc_actual_vs_lodged: list[float] = [] # the floor the end-to-end reaches
+ for predicted, actual in iter_predictions(cohorts):
+ pred_result = _result(calculator, predicted)
+ actual_result = _result(calculator, actual)
+ lodged_sap = actual.energy_rating_current
+ lodged_co2 = actual.co2_emissions_current
+ lodged_pei = actual.energy_consumption_current
+ if pred_result is not None:
+ if lodged_sap is not None:
+ sap_vs_lodged.append(
+ abs(pred_result.sap_score_continuous - lodged_sap)
+ )
+ if lodged_co2 is not None:
+ co2_vs_lodged.append(abs(_co2_tonnes(pred_result) - lodged_co2))
+ if lodged_pei is not None:
+ pei_vs_lodged.append(
+ abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei)
+ )
+ if actual_result is not None and lodged_sap is not None:
+ sap_calc_actual_vs_lodged.append(
+ abs(actual_result.sap_score_continuous - lodged_sap)
+ )
+
print()
print("--- End-to-end vs API-lodged (SECONDARY, calculator-FLOORED) ---")
_sap_line("SAP |pred − lodged|", sap_vs_lodged)
@@ -236,15 +114,6 @@ def main() -> None:
_sap_line(" floor: SAP |calc(actual) − lodged|", sap_calc_actual_vs_lodged)
-def _tally(counter: list[int], hit: Optional[bool]) -> None:
- """Record one classification outcome: a None hit (actual absent) is not
- applicable and skipped; else increment the applicable total and the hits."""
- if hit is None:
- return
- counter[1] += 1
- counter[0] += int(hit)
-
-
def _residual(label: str, values: list[float]) -> None:
if not values:
print(f"RESIDUAL {label}: (none)")
diff --git a/tests/domain/epc_prediction/test_validation.py b/tests/domain/epc_prediction/test_validation.py
new file mode 100644
index 00000000..acf49a47
--- /dev/null
+++ b/tests/domain/epc_prediction/test_validation.py
@@ -0,0 +1,123 @@
+"""Behaviour of the Component Accuracy leave-one-out scorer (ADR-0030): given
+loaded postcode cohorts, hold out each SAP 10.2 target, predict it from its
+all-vintage neighbours, and aggregate the per-component hits + residuals. Pure
+(no IO, no calculator) — corpus loading is the caller's job.
+"""
+
+from datetime import date
+from typing import Optional, Union
+
+from datatypes.epc.domain.epc_property_data import (
+ EpcPropertyData,
+ MainHeatingDetail,
+ SapBuildingPart,
+ SapEnergySource,
+ SapFloorDimension,
+ SapHeating,
+)
+from domain.epc_prediction.comparable_properties import Comparable
+from domain.epc_prediction.validation import evaluate_component_accuracy
+
+
+def _comparable(
+    *,
+    certificate_number: str,
+    address: str,
+    sap_version: float,
+    wall_construction: Union[int, str] = 1,
+    registration_date: Optional[date] = None,
+) -> Comparable:
+    """Build a Comparable around a hand-populated EpcPropertyData.
+
+    Every object is created with `object.__new__` (no `__init__`) and given
+    just the attributes set below — the partial-instance idiom. Only the SAP
+    version and the wall construction vary between callers; everything else is
+    a fixed fixture value.
+    """
+    # Leaves first, then the containers that reference them.
+    floor: SapFloorDimension = object.__new__(SapFloorDimension)
+    floor.floor_construction = 1
+    floor.floor_insulation = 1
+
+    building_part: SapBuildingPart = object.__new__(SapBuildingPart)
+    building_part.wall_construction = wall_construction
+    building_part.wall_insulation_type = 1
+    building_part.construction_age_band = "K"
+    building_part.roof_construction = 1
+    building_part.roof_insulation_thickness = 100
+    building_part.sap_room_in_roof = None
+    building_part.sap_floor_dimensions = [floor]
+
+    main_heating: MainHeatingDetail = object.__new__(MainHeatingDetail)
+    main_heating.main_fuel_type = 20
+    main_heating.main_heating_category = 2
+    main_heating.main_heating_control = 2100
+
+    heating: SapHeating = object.__new__(SapHeating)
+    heating.main_heating_details = [main_heating]
+    heating.water_heating_fuel = 20
+    heating.water_heating_code = 901
+    heating.cylinder_insulation_type = 1
+    heating.secondary_heating_type = None
+
+    energy_source: SapEnergySource = object.__new__(SapEnergySource)
+    energy_source.photovoltaic_supply = None
+    energy_source.photovoltaic_arrays = None
+
+    epc: EpcPropertyData = object.__new__(EpcPropertyData)
+    epc.sap_version = sap_version
+    epc.postcode = "LS6 1AA"
+    epc.property_type = "2"
+    epc.built_form = "4"
+    epc.total_floor_area_m2 = 80.0
+    epc.door_count = 2
+    epc.solar_water_heating = False
+    epc.has_hot_water_cylinder = True
+    epc.sap_building_parts = [building_part]
+    epc.sap_windows = []
+    epc.sap_heating = heating
+    epc.sap_energy_source = energy_source
+
+    return Comparable(
+        epc=epc,
+        certificate_number=certificate_number,
+        address=address,
+        registration_date=registration_date,
+    )
+
+
+def test_scores_only_sap_10_2_targets() -> None:
+    # Arrange — two distinct addresses in one cohort: a SAP 10.2 cert and an
+    # older SAP 9.94 cert. Only the 10.2 cert may be held out; the older cert
+    # still serves as source evidence (its components are still valid).
+    current = _comparable(
+        certificate_number="A", address="1 THE ROW", sap_version=10.2
+    )
+    older = _comparable(
+        certificate_number="B", address="2 THE ROW", sap_version=9.94
+    )
+
+    # Act
+    accuracy = evaluate_component_accuracy([[current, older]])
+
+    # Assert — one target scored (the 10.2 cert, predicted from the older
+    # neighbour); the older cert was never held out.
+    assert accuracy.targets == 1
+    assert accuracy.rate("wall_construction") == 1.0
+
+
+def test_aggregates_a_wall_classification_miss() -> None:
+    # Arrange — two SAP 10.2 certs at different addresses whose walls differ:
+    # solid brick (2) at one, cavity (1) at the other. Each is the other's only
+    # source, so the predicted mode is always the opposite wall.
+    solid_brick = _comparable(
+        certificate_number="A",
+        address="1 THE ROW",
+        sap_version=10.2,
+        wall_construction=2,
+    )
+    cavity = _comparable(
+        certificate_number="B",
+        address="2 THE ROW",
+        sap_version=10.2,
+        wall_construction=1,
+    )
+
+    # Act
+    accuracy = evaluate_component_accuracy([[solid_brick, cavity]])
+
+    # Assert — both certs are held out in turn and each prediction misses the
+    # wall, so the hit-rate is zero over two targets.
+    assert accuracy.targets == 2
+    assert accuracy.rate("wall_construction") == 0.0