Model/harness/report.py
Khalim Conn-Kowlessar ae267070b1 feat(modelling): flat per-property CSV for the inspection report
format_report_csv emits one comma-safe row per property: the calculator-error
fields (lodged/calculated/Δ/flag), the Plan headline figures (baseline+post
SAP/band, measures, cost+contingency, bill & CO2 savings, valuation %), the
flattened measure triggers, and any captured error — sortable in a spreadsheet
for a large dump.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 11:16:52 +00:00

394 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Per-property inspection report over a dump of API-shaped EPC JSONs.
Builds, for each cert, the three things an inspection wants:
1. **Calculator error** — the lodged SAP on the cert (`energy_rating_current`)
versus our deterministic calculator's un-rounded SAP, flagging divergence
beyond half a SAP point. This is the Validation Cohort / shadow-validation
idea (ADR-0010/0013): the calculator runs alongside the lodged figure and
logs where they disagree.
2. **Plan + costings** — the optimised Plan (measures, cost, SAP/band jump,
bill & CO₂ savings, valuation uplift). Carried on `PropertyReport.plan`.
3. **Measures + their triggers** — each fired measure and the EPC attribute(s)
that caused its generator to recommend it.
The calculator can raise on an un-mapped cert (UnmappedSapCode / UnmappedApiCode)
and modelling can raise independently; both are captured per-cert so one bad
cert never aborts the report. Run from the worktree root (import trap).
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Final, Iterable, Optional
from datatypes.epc.domain.epc_property_data import (
BuildingPartIdentifier,
EpcPropertyData,
SapBuildingPart,
)
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.modelling.plan import Plan
from domain.sap10_calculator.calculator import Sap10Calculator
from domain.sap10_calculator.validation.parity_report import (
ParityCase,
ParityReport,
build_parity_report,
)
from harness.console import DEFAULT_CATALOGUE, run_modelling
# A lodged-vs-calculated SAP gap beyond this many points is flagged for
# investigation (the ADR-0010/0013 shadow-validation design target).
SAP_ERROR_THRESHOLD: Final[float] = 0.5
@dataclass(frozen=True)
class MeasureTrigger:
"""One fired measure and the EPC attribute(s) that triggered its generator
— the "why" behind the recommendation (e.g. cavity fill fired because
`wall_construction == 4` and `wall_insulation_type == 4`)."""
measure_type: str
triggers: dict[str, Any]
@dataclass(frozen=True)
class PropertyReport:
"""One property's inspection result. `calculator_error` records a raise
from mapping or scoring the cert (then the SAP figures are None);
`plan_error` records a raise from the Modelling stage (then `plan` is None
and no triggers are surfaced)."""
name: str
lodged_sap: Optional[int]
calculated_sap: Optional[float]
calculator_error: Optional[str] = None
plan: Optional[Plan] = None
plan_error: Optional[str] = None
measure_triggers: tuple[MeasureTrigger, ...] = ()
@property
def sap_error(self) -> Optional[float]:
"""Lodged calculated (positive = the cert rates higher than us).
None when either figure is missing."""
if self.lodged_sap is None or self.calculated_sap is None:
return None
return self.lodged_sap - self.calculated_sap
@property
def sap_error_exceeds_threshold(self) -> bool:
"""True when |lodged calculated| > 0.5 — the shadow-validation flag."""
error: Optional[float] = self.sap_error
return error is not None and abs(error) > SAP_ERROR_THRESHOLD
def _main_part(epc: EpcPropertyData) -> SapBuildingPart:
"""The MAIN building part the fabric generators read."""
return next(
part
for part in epc.sap_building_parts
if part.identifier is BuildingPartIdentifier.MAIN
)
def _triggers_for(epc: EpcPropertyData, measure_type: str) -> dict[str, Any]:
"""The EPC attribute(s) that caused `measure_type`'s generator to fire.
Mirrors each generator's guard so the report can explain the "why":
- cavity_wall_insulation : wall_recommendation.py (wall_construction == 4
and wall_insulation_type == 4)
- loft_insulation : roof_recommendation.py (roof_insulation_thickness == 0)
- {solid,suspended}_floor_insulation : floor_recommendation.py
(uninsulated floor_insulation_thickness + floor_construction_type)
- mechanical_ventilation : ventilation_recommendation.py (no lodged kind)
"""
main: SapBuildingPart = _main_part(epc)
if measure_type == "cavity_wall_insulation":
return {
"wall_construction": main.wall_construction,
"wall_insulation_type": main.wall_insulation_type,
}
if measure_type == "loft_insulation":
return {"roof_insulation_thickness": main.roof_insulation_thickness}
if measure_type in ("solid_floor_insulation", "suspended_floor_insulation"):
return {
"floor_insulation_thickness": main.floor_insulation_thickness,
"floor_construction_type": main.floor_construction_type,
}
if measure_type == "mechanical_ventilation":
kind: Optional[str] = (
None
if epc.sap_ventilation is None
else epc.sap_ventilation.mechanical_ventilation_kind
)
return {"mechanical_ventilation_kind": kind}
return {}
def build_property_report(
path: Path,
*,
goal_band: str = "C",
catalogue_path: Path = DEFAULT_CATALOGUE,
) -> PropertyReport:
"""Build one `PropertyReport` from an API-shaped EPC JSON file: the
lodged-vs-calculated SAP comparison, the optimised Plan, and each fired
measure's trigger attributes. A mapping/scoring raise is captured as
`calculator_error`; a Modelling raise as `plan_error`; neither propagates."""
name: str = path.stem
try:
epc = EpcPropertyDataMapper.from_api_response(json.loads(path.read_text()))
lodged_sap: Optional[int] = epc.energy_rating_current
calculated_sap: float = Sap10Calculator().calculate(epc).sap_score_continuous
except Exception as error: # noqa: BLE001 — one bad cert must not abort the report
return PropertyReport(
name=name,
lodged_sap=None,
calculated_sap=None,
calculator_error=f"{type(error).__name__}: {error}",
)
plan: Optional[Plan] = None
plan_error: Optional[str] = None
measure_triggers: tuple[MeasureTrigger, ...] = ()
try:
plan = run_modelling(
epc,
goal_band=goal_band,
catalogue_path=catalogue_path,
print_table=False,
)
measure_triggers = tuple(
MeasureTrigger(
measure_type=measure.measure_type,
triggers=_triggers_for(epc, measure.measure_type),
)
for measure in plan.measures
)
except Exception as error: # noqa: BLE001 — modelling raise must not abort the report
plan_error = f"{type(error).__name__}: {error}"
return PropertyReport(
name=name,
lodged_sap=lodged_sap,
calculated_sap=calculated_sap,
plan=plan,
plan_error=plan_error,
measure_triggers=measure_triggers,
)
def build_property_reports(
paths: Iterable[Path],
*,
goal_band: str = "C",
catalogue_path: Path = DEFAULT_CATALOGUE,
) -> list[PropertyReport]:
"""Build one `PropertyReport` per path, in order. Errors are captured on
each report, never raised, so one bad cert never aborts the cohort."""
return [
build_property_report(path, goal_band=goal_band, catalogue_path=catalogue_path)
for path in paths
]
def parity_report_for(reports: Iterable[PropertyReport]) -> ParityReport:
"""Aggregate the cohort's lodged-vs-calculated SAP into a `ParityReport`
(MAE / RMSE / bias / worst-N) for the cohort-level calculator-error view.
Certs that failed to map or score (no lodged or calculated SAP) are
excluded — they have no parity case to compare. The residual convention is
the calculator's own (predicted actual = calculated lodged), the
negative of each report's `sap_error`."""
cases: list[ParityCase] = [
ParityCase(
certificate_number=report.name,
actual_sap=report.lodged_sap,
predicted_sap=report.calculated_sap,
is_typical=True,
)
for report in reports
if report.lodged_sap is not None and report.calculated_sap is not None
]
return build_parity_report(cases)
def _fmt_money(value: Optional[float]) -> str:
return "n/a" if value is None else f"£{value:,.0f}"
def _fmt_triggers(triggers: dict[str, Any]) -> str:
"""Render trigger fields as `field=value, field=value` for the "why" line."""
return ", ".join(f"{field}={value}" for field, value in triggers.items())
def _calculator_error_section(reports: list[PropertyReport]) -> list[str]:
"""Section 1 — the cohort parity stats plus a per-property lodged-vs-
calculated table with the |Δ| > 0.5 flag (and any scoring errors)."""
parity: ParityReport = parity_report_for(reports)
flagged: int = sum(1 for report in reports if report.sap_error_exceeds_threshold)
worst: str = (
f" · worst Δ {abs(parity.worst_cases[0].predicted_sap - parity.worst_cases[0].actual_sap):.2f}"
if parity.worst_cases
else ""
)
lines: list[str] = [
"## 1. Calculator error — lodged vs calculated SAP",
"",
f"Cohort parity ({parity.case_count} scorable certs): "
f"MAE {parity.global_mae:.2f} · RMSE {parity.global_rmse:.2f} · "
f"bias {parity.global_bias:+.2f}{worst}",
f"Flagged (|Δ| > {SAP_ERROR_THRESHOLD}): {flagged} of {len(reports)}",
"",
"| Cert | Lodged | Calculated | Δ (lodgedcalc) | Flag |",
"| --- | --- | --- | --- | --- |",
]
for report in reports:
if report.calculator_error is not None:
lines.append(
f"| {report.name} | — | — | — | error: {report.calculator_error} |"
)
continue
lodged: str = "" if report.lodged_sap is None else str(report.lodged_sap)
calculated: str = (
"" if report.calculated_sap is None else f"{report.calculated_sap:.2f}"
)
delta: str = "" if report.sap_error is None else f"{report.sap_error:+.2f}"
flag: str = "⚠ FLAG" if report.sap_error_exceeds_threshold else ""
lines.append(
f"| {report.name} | {lodged} | {calculated} | {delta} | {flag} |"
)
return lines
def _plan_costings_section(reports: list[PropertyReport]) -> list[str]:
"""Section 2 — the optimised Plan and its costings, per property."""
lines: list[str] = ["## 2. Plans + costings", ""]
for report in reports:
if report.plan is None:
note: str = report.plan_error or report.calculator_error or "not modelled"
lines.extend([f"### {report.name}", f"- No Plan — {note}", ""])
continue
plan: Plan = report.plan
measure_types: str = (
", ".join(measure.measure_type for measure in plan.measures)
if plan.measures
else "none (already efficient)"
)
lines.extend(
[
f"### {report.name}",
f"- SAP: {plan.baseline.sap_continuous:.1f}"
f"{plan.post_sap_continuous:.1f} "
f"(band {plan.baseline_epc_rating.value}{plan.post_epc_rating.value})",
f"- Measures: {len(plan.measures)}{measure_types}",
f"- Cost of works: {_fmt_money(plan.cost_of_works)} "
f"(+ {_fmt_money(plan.contingency_cost)} contingency)",
f"- Bill savings: {_fmt_money(plan.energy_bill_savings)}/yr · "
f"CO₂ savings: {plan.co2_savings_kg_per_yr:,.0f} kg/yr",
f"- Valuation uplift: {plan.valuation.average_pct * 100:+.1f}%",
"",
]
)
return lines
def _measures_triggers_section(reports: list[PropertyReport]) -> list[str]:
"""Section 3 — each fired measure and the EPC attribute(s) behind it."""
lines: list[str] = ["## 3. Recommended measures + their triggers", ""]
for report in reports:
if not report.measure_triggers:
continue
lines.append(f"### {report.name}")
lines.extend(
f"- **{trigger.measure_type}** — fired because "
f"{_fmt_triggers(trigger.triggers)}"
for trigger in report.measure_triggers
)
lines.append("")
return lines
def format_report_markdown(reports: list[PropertyReport]) -> str:
"""Render the three-section property inspection report as Markdown:
(1) calculator error vs lodged SAP, (2) Plans + costings, (3) recommended
measures and the attributes that triggered them."""
modelled: int = sum(1 for report in reports if report.plan is not None)
errored: int = sum(1 for report in reports if report.calculator_error is not None)
header: list[str] = [
"# Property inspection report",
"",
f"{len(reports)} properties · {modelled} modelled · "
f"{errored} calculator errors",
"",
]
sections: list[str] = [
*header,
*_calculator_error_section(reports),
"",
*_plan_costings_section(reports),
*_measures_triggers_section(reports),
]
return "\n".join(sections).rstrip() + "\n"
_CSV_HEADER: Final[str] = (
"cert,lodged_sap,calculated_sap,sap_error,sap_error_flag,"
"baseline_sap,post_sap,baseline_band,post_band,measures,measure_types,"
"cost_of_works,contingency,bill_savings,co2_savings,valuation_pct,"
"triggers,error"
)
def _csv_cell(value: object) -> str:
"""Render a CSV cell, rounding floats and keeping the row comma-safe
(commas in any value become ';' so the column count never changes)."""
if value is None:
return ""
if isinstance(value, float):
return f"{value:.2f}"
return str(value).replace(",", ";")
def _csv_triggers(report: PropertyReport) -> str:
"""Flatten the fired measures and their triggers into one comma-safe cell:
`type(field=value;field=value)|type(field=value)`."""
return "|".join(
f"{trigger.measure_type}("
+ ";".join(f"{field}={value}" for field, value in trigger.triggers.items())
+ ")"
for trigger in report.measure_triggers
)
def format_report_csv(reports: list[PropertyReport]) -> str:
"""Render the report as a flat CSV — one row per property, browsable and
sortable in a spreadsheet for a large dump. The calculator-error fields, the
Plan headline figures, and the flattened triggers all share one row."""
rows: list[str] = [_CSV_HEADER]
for report in reports:
plan: Optional[Plan] = report.plan
cells: list[object] = [
report.name,
report.lodged_sap,
report.calculated_sap,
report.sap_error,
1 if report.sap_error_exceeds_threshold else 0,
None if plan is None else plan.baseline.sap_continuous,
None if plan is None else plan.post_sap_continuous,
None if plan is None else plan.baseline_epc_rating.value,
None if plan is None else plan.post_epc_rating.value,
None if plan is None else len(plan.measures),
None
if plan is None
else ";".join(measure.measure_type for measure in plan.measures),
None if plan is None else plan.cost_of_works,
None if plan is None else plan.contingency_cost,
None if plan is None else plan.energy_bill_savings,
None if plan is None else plan.co2_savings_kg_per_yr,
None if plan is None else plan.valuation.average_pct * 100,
_csv_triggers(report),
report.calculator_error or report.plan_error,
]
rows.append(",".join(_csv_cell(cell) for cell in cells))
return "\n".join(rows)