refactor(epc-prediction): extract shared leave-one-out scorer + corpus loader (ADR-0030)

"One scorer, two harnesses" (ADR-0030): the committed gate, the local script,
and the future battle-test must run the *same* scoring. Extract it:

- domain/epc_prediction/validation.py — `iter_predictions` (the single
  leave-one-out orchestration: latest-per-address hold-out, SAP-10.2 target
  filter, all-vintage source) + `evaluate_component_accuracy` (calculator-free
  ComponentAccuracy aggregation, the primary signal). Unit-tested.
- harness/epc_prediction_corpus.py — `load_corpus(dir)` IO: corpus dir ->
  Comparable cohorts (maps payloads, carries address + registration_date).

validate_epc_prediction.py now just loads + calls the scorer for the component
section and iterates iter_predictions for the calculator-floored end-to-end.
Identical numbers (181 targets, SAP MAE 6.34) — behaviour-preserving.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-14 09:12:08 +00:00
parent 65cb094abe
commit 027ee1fba3
4 changed files with 398 additions and 176 deletions

View file

@ -0,0 +1,159 @@
"""Component Accuracy aggregation for EPC Prediction (ADR-0030).
The leave-one-out scorer, calculator-FREE on purpose: it holds out each SAP 10.2
target, predicts it from its (all-vintage) Comparable Properties, and aggregates
the per-component classification hits + geometry residuals from
`compare_prediction`. This is the *primary*, calculator-independent signal — the
end-to-end SAP / carbon / PE check (which needs the calculator) is layered on top
by the runner. The same function backs both the committed ratcheting gate and the
offline national battle-test (one scorer, two harnesses).
Pure given the loaded cohorts: corpus IO (reading + mapping cert payloads) is the
caller's job, so this is directly unit-testable.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date
from typing import Iterable, Iterator, Optional, Sequence
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.epc_prediction.comparable_properties import (
Comparable,
PredictionTarget,
select_comparables,
)
from domain.epc_prediction.epc_prediction import EpcPrediction
from domain.epc_prediction.prediction_comparison import compare_prediction
# Only SAP 10.2 certs are valid held-out targets (ADR-0030) — the only vintage
# with full-fidelity lodged components. The source cohort keeps all vintages.
# This is the default `target_sap_version` of the scorer entry points, where it
# is matched against `epc.sap_version` by exact (in)equality.
_SAP_10_2: float = 10.2
def _empty_classification() -> dict[str, list[int]]:
return {}
def _empty_residuals() -> dict[str, list[float]]:
return {}
@dataclass
class ComponentAccuracy:
    """Leave-one-out Component Accuracy, aggregated over a corpus.

    The three fields are plain accumulators filled in by the scorer; the two
    methods are read-only summaries over them.
    """

    # Component name -> [hits, applicable-total]. A not-applicable (`None`)
    # hit is left out of the total.
    classification: dict[str, list[int]] = field(
        default_factory=_empty_classification
    )
    # Numeric component name -> signed (predicted - actual) residuals.
    residuals: dict[str, list[float]] = field(default_factory=_empty_residuals)
    # Number of held-out SAP 10.2 properties scored.
    targets: int = 0

    def rate(self, component: str) -> Optional[float]:
        """Return the hit-rate of `component`, or None when it has no
        applicable observations (including when it was never seen)."""
        counter = self.classification.get(component)
        if counter is None:
            return None
        hits, total = counter
        if not total:
            return None
        return hits / total

    def mean_abs_residual(self, component: str) -> Optional[float]:
        """Return the mean of |residual| for `component`, or None when no
        residuals were recorded for it."""
        values = self.residuals.get(component)
        if not values:
            return None
        return sum(map(abs, values)) / len(values)
def _recency_key(comparable: Comparable) -> tuple[date, str]:
return (
comparable.registration_date or date.min,
comparable.certificate_number,
)
def _latest_per_address(cohort: Sequence[Comparable]) -> list[Comparable]:
    """Collapse a cohort to one candidate target per address.

    For each distinct address the most recent cert (by `_recency_key`) is kept;
    certs without an address cannot be grouped, so each is kept on its own.
    Addressed winners come first (in first-seen address order), followed by the
    address-less certs in cohort order.
    """
    by_address: dict[str, Comparable] = {}
    unaddressed: list[Comparable] = []
    for candidate in cohort:
        key = candidate.address
        if key is None:
            unaddressed.append(candidate)
            continue
        incumbent = by_address.get(key)
        if incumbent is None or _recency_key(candidate) > _recency_key(
            incumbent
        ):
            by_address[key] = candidate
    return [*by_address.values(), *unaddressed]
def iter_predictions(
    cohorts: Iterable[Sequence[Comparable]],
    *,
    target_sap_version: float = _SAP_10_2,
) -> Iterator[tuple[EpcPropertyData, EpcPropertyData]]:
    """Yield `(predicted, actual)` for every SAP-`target_sap_version` held-out
    target across the cohorts — the single leave-one-out orchestration the
    Component Accuracy scorer and the runner's calculator end-to-end both consume
    (ADR-0030: one scorer, two harnesses).

    A target is held out by whole address (so a re-lodgement can't leak) and
    predicted from the rest of its all-vintage cohort. A target whose cohort
    yields no comparables is skipped rather than yielded.

    Args:
        cohorts: One sequence of Comparables per cohort; each cohort is
            iterated more than once, hence `Sequence`.
        target_sap_version: Only certs whose `epc.sap_version` equals this are
            held out; other vintages still serve as source evidence.

    Yields:
        `(predicted, actual)` EpcPropertyData pairs, one per scored target.
    """
    predictor = EpcPrediction()
    for cohort in cohorts:
        for held_out in _latest_per_address(cohort):
            if held_out.epc.sap_version != target_sap_version:
                continue
            # Drop every cert lodged at the held-out address, and always the
            # held-out cert itself. The identity check matters for address-less
            # certs: they pass the address test unconditionally, so without it
            # an address-less target would remain in its own source cohort.
            others = [
                c
                for c in cohort
                if c is not held_out
                and (c.address is None or c.address != held_out.address)
            ]
            actual = held_out.epc
            target = PredictionTarget(
                postcode=actual.postcode,
                property_type=actual.property_type or "",
                built_form=actual.built_form,
            )
            comparables = select_comparables(target, others)
            if not comparables.members:
                # Nothing to predict from — not a scoreable target.
                continue
            yield predictor.predict(target, comparables), actual
def evaluate_component_accuracy(
    cohorts: Iterable[Sequence[Comparable]],
    *,
    target_sap_version: float = _SAP_10_2,
) -> ComponentAccuracy:
    """Aggregate leave-one-out Component Accuracy over the given cohorts.

    Every `(predicted, actual)` pair from `iter_predictions` is passed through
    `compare_prediction`; its categorical hits are tallied per component and
    its five numeric residuals are appended per component. No calculator is
    involved (the primary signal).
    """
    result = ComponentAccuracy()
    pairs = iter_predictions(cohorts, target_sap_version=target_sap_version)
    for predicted, actual in pairs:
        scored = compare_prediction(predicted, actual)
        result.targets += 1

        for component, hit in scored.categorical_hits.items():
            # The component is registered even when this hit is not applicable.
            tally = result.classification.setdefault(component, [0, 0])
            if hit is None:
                continue
            tally[0] += int(hit)
            tally[1] += 1

        # Count-valued residuals are widened to float to match the store.
        numeric: dict[str, float] = {
            "floor_area": scored.floor_area_residual,
            "window_count": float(scored.window_count_residual),
            "total_window_area": scored.total_window_area_residual,
            "building_parts": float(scored.building_parts_residual),
            "door_count": float(scored.door_count_residual),
        }
        for component, value in numeric.items():
            result.residuals.setdefault(component, []).append(value)
    return result

View file

@ -0,0 +1,71 @@
"""Load a postcode-clustered EPC corpus into Comparable cohorts (ADR-0030).
The IO half of the EPC Prediction validation: read each postcode's cached cert
payloads, map them through `EpcPropertyDataMapper.from_api_response`, and build
`Comparable`s carrying the register metadata (address + registration date) the
leave-one-out scorer needs to dedupe re-lodgements and hold out a whole address.
A cert the mapper rejects (unsupported schema, malformed) is skipped, never fatal.
Shared by the committed-fixture gate, the local validation script, and the
offline national battle-test — the corpus directory differs, the loading does
not. Layout: `<dir>/<POSTCODE>/<cert>.json` + `<dir>/_index.json`.
"""
from __future__ import annotations
import json
from datetime import date
from pathlib import Path
from typing import Any, Optional
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.epc_prediction.comparable_properties import Comparable
def load_corpus(corpus_dir: Path) -> list[list[Comparable]]:
    """Load every postcode cohort under `corpus_dir`.

    Reads `<corpus_dir>/_index.json` (a postcode -> certificate-numbers
    mapping) and loads one cohort per entry, in index order.

    Args:
        corpus_dir: Root of the corpus.

    Returns:
        One list of Comparables per postcode (the unit the leave-one-out
        scorer iterates).

    Raises:
        FileNotFoundError: If the corpus index is missing.
    """
    index_path = corpus_dir / "_index.json"
    if not index_path.exists():
        raise FileNotFoundError(
            f"no corpus index at {index_path} — run a corpus fetch first"
        )
    # Decode explicitly as UTF-8 (the JSON interchange encoding) instead of
    # depending on the platform's locale default.
    index: dict[str, list[str]] = json.loads(
        index_path.read_text(encoding="utf-8")
    )
    return [
        _load_cohort(corpus_dir, postcode, certs)
        for postcode, certs in index.items()
    ]
def _load_cohort(
corpus_dir: Path, postcode: str, certs: list[str]
) -> list[Comparable]:
cohort: list[Comparable] = []
for cert in certs:
path = corpus_dir / postcode / f"{cert}.json"
if not path.exists():
continue
raw: dict[str, Any] = json.loads(path.read_text())
try:
epc = EpcPropertyDataMapper.from_api_response(raw)
except Exception: # noqa: BLE001 — a bad cert must not abort the sweep
continue
cohort.append(
Comparable(
epc=epc,
certificate_number=cert,
address=_address(raw),
registration_date=_registration_date(raw),
)
)
return cohort
def _address(raw: dict[str, Any]) -> Optional[str]:
value = raw.get("address_line_1")
return str(value).strip().upper() if value else None
def _registration_date(raw: dict[str, Any]) -> Optional[date]:
value = raw.get("registration_date")
return date.fromisoformat(str(value)) if value else None

View file

@ -23,90 +23,24 @@ Corpus dir: $EPC_PREDICTION_CORPUS (default /tmp/epc_prediction_corpus).
from __future__ import annotations
import json
import os
import statistics
from datetime import date
from pathlib import Path
from typing import Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.epc_prediction.comparable_properties import (
Comparable,
PredictionTarget,
select_comparables,
from domain.epc_prediction.validation import (
evaluate_component_accuracy,
iter_predictions,
)
from domain.epc_prediction.epc_prediction import EpcPrediction
from domain.epc_prediction.prediction_comparison import compare_prediction
from domain.sap10_calculator.calculator import Sap10Calculator, SapResult
from harness.epc_prediction_corpus import load_corpus
# Target-cert spec gate: only SAP 10.2 certs (schema 21.0.x) carry full-fidelity
# lodged components + a same-spec lodged figure to check against (ADR-0030). The
# source cohort keeps all vintages — components are methodology-agnostic.
_SAP_10_2: float = 10.2
_KG_PER_TONNE: float = 1000.0
CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus"))
def _load_cohort(postcode: str, certs: list[str]) -> list[Comparable]:
"""Map a postcode's cached cert payloads to Comparables, skipping any the
mapper rejects (unsupported schema, malformed). Address + registration date
come straight off the cached payload (the register metadata) so the harness
can dedupe re-lodgements and hold out a whole address."""
cohort: list[Comparable] = []
for cert in certs:
path = CORPUS / postcode / f"{cert}.json"
if not path.exists():
continue
raw = json.loads(path.read_text())
try:
epc = EpcPropertyDataMapper.from_api_response(raw)
except Exception: # noqa: BLE001 — a bad cert must not abort the sweep
continue
cohort.append(
Comparable(
epc=epc,
certificate_number=cert,
address=_address(raw),
registration_date=_registration_date(raw),
)
)
return cohort
def _address(raw: dict[str, object]) -> Optional[str]:
value = raw.get("address_line_1")
return str(value).strip().upper() if value else None
def _registration_date(raw: dict[str, object]) -> Optional[date]:
value = raw.get("registration_date")
return date.fromisoformat(str(value)) if value else None
def _ground_truth_properties(cohort: list[Comparable]) -> list[Comparable]:
"""Collapse a postcode's certs to one held-out property per address — the
latest cert, the best ground truth. Comparables with no address each stand
alone."""
latest: dict[str, Comparable] = {}
standalone: list[Comparable] = []
for c in cohort:
if c.address is None:
standalone.append(c)
elif c.address not in latest or _recency(c) > _recency(latest[c.address]):
latest[c.address] = c
return list(latest.values()) + standalone
def _recency(comparable: Comparable) -> tuple[date, str]:
return (
comparable.registration_date or date.min,
comparable.certificate_number,
)
def _result(
calculator: Sap10Calculator, epc: EpcPropertyData
) -> Optional[SapResult]:
@ -123,111 +57,55 @@ def _co2_tonnes(result: SapResult) -> float:
def main() -> None:
index_path = CORPUS / "_index.json"
if not index_path.exists():
raise SystemExit(f"no corpus at {CORPUS} — run fetch_epc_prediction_corpus.py")
index: dict[str, list[str]] = json.loads(index_path.read_text())
cohorts = load_corpus(CORPUS)
calculator = Sap10Calculator()
predictor = EpcPrediction()
# Classification: name -> [hits, applicable-total], populated from whatever
# components compare_prediction reports (insertion order preserved). A None
# hit (the actual lodges no value) is excluded from the denominator.
categoricals: dict[str, list[int]] = {}
floor_res: list[float] = []
window_count_res: list[int] = []
window_area_res: list[float] = []
parts_res: list[int] = []
door_res: list[int] = []
# End-to-end (calculator-FLOORED) vs API-lodged — secondary guard, ADR-0030.
sap_vs_lodged: list[float] = []
co2_vs_lodged: list[float] = []
pei_vs_lodged: list[float] = []
# Attribution readout: how far the calculator alone is from lodged on the
# ACTUAL components — the floor the end-to-end numbers can reach.
sap_calc_actual_vs_lodged: list[float] = []
predicted_n = skipped_non_102 = skipped_no_cohort = 0
for postcode, certs in index.items():
cohort = _load_cohort(postcode, certs)
targets = _ground_truth_properties(cohort)
if len(targets) < 2:
skipped_no_cohort += len(targets)
continue
for held_out in targets:
# Only SAP 10.2 certs are valid validation targets (ADR-0030); the
# source cohort (`others`) keeps every vintage.
if held_out.epc.sap_version != _SAP_10_2:
skipped_non_102 += 1
continue
# Exclude every cert of the held-out address (not just the held cert)
# so a re-lodgement of the same property cannot leak into the cohort.
others = [
c
for c in cohort
if c.address is None or c.address != held_out.address
]
actual = held_out.epc
target = PredictionTarget(
postcode=postcode,
property_type=actual.property_type or "",
built_form=actual.built_form,
)
comparables = select_comparables(target, others)
if not comparables.members:
continue
predicted = predictor.predict(target, comparables)
predicted_n += 1
cmp = compare_prediction(predicted, actual)
for name, hit in cmp.categorical_hits.items():
_tally(categoricals.setdefault(name, [0, 0]), hit)
floor_res.append(cmp.floor_area_residual)
window_count_res.append(cmp.window_count_residual)
window_area_res.append(cmp.total_window_area_residual)
parts_res.append(cmp.building_parts_residual)
door_res.append(cmp.door_count_residual)
pred_result = _result(calculator, predicted)
actual_result = _result(calculator, actual)
lodged_sap = actual.energy_rating_current
lodged_co2 = actual.co2_emissions_current
lodged_pei = actual.energy_consumption_current
if pred_result is not None:
if lodged_sap is not None:
sap_vs_lodged.append(
abs(pred_result.sap_score_continuous - lodged_sap)
)
if lodged_co2 is not None:
co2_vs_lodged.append(
abs(_co2_tonnes(pred_result) - lodged_co2)
)
if lodged_pei is not None:
pei_vs_lodged.append(
abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei)
)
if actual_result is not None and lodged_sap is not None:
sap_calc_actual_vs_lodged.append(
abs(actual_result.sap_score_continuous - lodged_sap)
)
# PRIMARY signal — Component Accuracy, calculator-free (the shared scorer).
accuracy = evaluate_component_accuracy(cohorts)
print(f"corpus: {CORPUS}")
print(
f"predicted {predicted_n} SAP-10.2 held-out targets "
f"({skipped_non_102} non-10.2 targets skipped, "
f"{skipped_no_cohort} had no cohort)\n"
)
print(f"predicted {accuracy.targets} SAP-10.2 held-out targets\n")
print("--- Component Accuracy (PRIMARY, calculator-independent) ---")
for name, (hits, total) in categoricals.items():
for name, (hits, total) in accuracy.classification.items():
if total:
print(f"CLASSIFICATION {name}: {hits}/{total} = {hits / total:.1%}")
print()
_residual("floor_area (m2)", floor_res)
_residual("window_count", [float(x) for x in window_count_res])
_residual("total_window_area (m2)", window_area_res)
_residual("building_parts", [float(x) for x in parts_res])
_residual("door_count", [float(x) for x in door_res])
_residual("floor_area (m2)", accuracy.residuals.get("floor_area", []))
_residual("window_count", accuracy.residuals.get("window_count", []))
_residual(
"total_window_area (m2)", accuracy.residuals.get("total_window_area", [])
)
_residual("building_parts", accuracy.residuals.get("building_parts", []))
_residual("door_count", accuracy.residuals.get("door_count", []))
# SECONDARY guard — end-to-end vs API-lodged, calculator-FLOORED. Re-walks the
# same held-out targets (one orchestration via iter_predictions).
sap_vs_lodged: list[float] = []
co2_vs_lodged: list[float] = []
pei_vs_lodged: list[float] = []
sap_calc_actual_vs_lodged: list[float] = [] # the floor the end-to-end reaches
for predicted, actual in iter_predictions(cohorts):
pred_result = _result(calculator, predicted)
actual_result = _result(calculator, actual)
lodged_sap = actual.energy_rating_current
lodged_co2 = actual.co2_emissions_current
lodged_pei = actual.energy_consumption_current
if pred_result is not None:
if lodged_sap is not None:
sap_vs_lodged.append(
abs(pred_result.sap_score_continuous - lodged_sap)
)
if lodged_co2 is not None:
co2_vs_lodged.append(abs(_co2_tonnes(pred_result) - lodged_co2))
if lodged_pei is not None:
pei_vs_lodged.append(
abs(pred_result.primary_energy_kwh_per_m2 - lodged_pei)
)
if actual_result is not None and lodged_sap is not None:
sap_calc_actual_vs_lodged.append(
abs(actual_result.sap_score_continuous - lodged_sap)
)
print()
print("--- End-to-end vs API-lodged (SECONDARY, calculator-FLOORED) ---")
_sap_line("SAP |pred lodged|", sap_vs_lodged)
@ -236,15 +114,6 @@ def main() -> None:
_sap_line(" floor: SAP |calc(actual) lodged|", sap_calc_actual_vs_lodged)
def _tally(counter: list[int], hit: Optional[bool]) -> None:
"""Record one classification outcome: a None hit (actual absent) is not
applicable and skipped; else increment the applicable total and the hits."""
if hit is None:
return
counter[1] += 1
counter[0] += int(hit)
def _residual(label: str, values: list[float]) -> None:
if not values:
print(f"RESIDUAL {label}: (none)")

View file

@ -0,0 +1,123 @@
"""Behaviour of the Component Accuracy leave-one-out scorer (ADR-0030): given
loaded postcode cohorts, hold out each SAP 10.2 target, predict it from its
all-vintage neighbours, and aggregate the per-component hits + residuals. Pure
(no IO, no calculator) — corpus loading is the caller's job.
"""
from datetime import date
from typing import Optional, Union
from datatypes.epc.domain.epc_property_data import (
EpcPropertyData,
MainHeatingDetail,
SapBuildingPart,
SapEnergySource,
SapFloorDimension,
SapHeating,
)
from domain.epc_prediction.comparable_properties import Comparable
from domain.epc_prediction.validation import evaluate_component_accuracy
def _comparable(
    *,
    certificate_number: str,
    address: str,
    sap_version: float,
    wall_construction: Union[int, str] = 1,
    registration_date: Optional[date] = None,
) -> Comparable:
    """Build a Comparable around a hand-populated EpcPropertyData.

    Every object is created with `object.__new__` and then given, attribute by
    attribute, the fields the predictor + comparison read (the
    partial-instance idiom). Only the SAP version, wall construction and
    register metadata vary between fixtures; everything else is fixed.
    """
    # Fabric: a single building part with one floor dimension.
    floor: SapFloorDimension = object.__new__(SapFloorDimension)
    floor.floor_construction = 1
    floor.floor_insulation = 1

    building_part: SapBuildingPart = object.__new__(SapBuildingPart)
    building_part.wall_construction = wall_construction
    building_part.wall_insulation_type = 1
    building_part.construction_age_band = "K"
    building_part.roof_construction = 1
    building_part.roof_insulation_thickness = 100
    building_part.sap_room_in_roof = None
    building_part.sap_floor_dimensions = [floor]

    # Heating: one main system, no secondary heating.
    main_detail: MainHeatingDetail = object.__new__(MainHeatingDetail)
    main_detail.main_fuel_type = 20
    main_detail.main_heating_category = 2
    main_detail.main_heating_control = 2100

    sap_heating: SapHeating = object.__new__(SapHeating)
    sap_heating.main_heating_details = [main_detail]
    sap_heating.water_heating_fuel = 20
    sap_heating.water_heating_code = 901
    sap_heating.cylinder_insulation_type = 1
    sap_heating.secondary_heating_type = None

    # Energy source: no photovoltaics.
    energy_source: SapEnergySource = object.__new__(SapEnergySource)
    energy_source.photovoltaic_supply = None
    energy_source.photovoltaic_arrays = None

    # The property itself, assembled from the parts above.
    property_data: EpcPropertyData = object.__new__(EpcPropertyData)
    property_data.sap_version = sap_version
    property_data.postcode = "LS6 1AA"
    property_data.property_type = "2"
    property_data.built_form = "4"
    property_data.total_floor_area_m2 = 80.0
    property_data.door_count = 2
    property_data.solar_water_heating = False
    property_data.has_hot_water_cylinder = True
    property_data.sap_building_parts = [building_part]
    property_data.sap_windows = []
    property_data.sap_heating = sap_heating
    property_data.sap_energy_source = energy_source

    return Comparable(
        epc=property_data,
        certificate_number=certificate_number,
        address=address,
        registration_date=registration_date,
    )
def test_scores_only_sap_10_2_targets() -> None:
    # Arrange — two different addresses in one cohort: a SAP 10.2 cert and an
    # older SAP 9.94 cert. Only the former may be held out; the latter still
    # counts as source evidence.
    current = _comparable(
        certificate_number="A", address="1 THE ROW", sap_version=10.2
    )
    legacy = _comparable(
        certificate_number="B", address="2 THE ROW", sap_version=9.94
    )

    # Act
    result = evaluate_component_accuracy([[current, legacy]])

    # Assert — a single target (the 10.2 cert) was scored, predicted from the
    # older neighbour whose wall matches; the older cert was never a target.
    assert result.targets == 1
    assert result.rate("wall_construction") == 1.0
def test_aggregates_a_wall_classification_miss() -> None:
    # Arrange — two SAP 10.2 certs at different addresses whose walls differ
    # (2 vs 1), so each one's only neighbour carries the other wall value.
    first = _comparable(
        certificate_number="A",
        address="1 THE ROW",
        sap_version=10.2,
        wall_construction=2,
    )
    second = _comparable(
        certificate_number="B",
        address="2 THE ROW",
        sap_version=10.2,
        wall_construction=1,
    )

    # Act
    result = evaluate_component_accuracy([[first, second]])

    # Assert — both certs are held out in turn and each is predicted from the
    # other, so the wall classification misses on both targets.
    assert result.targets == 2
    assert result.rate("wall_construction") == 0.0