diff --git a/domain/modelling/optimiser.py b/domain/modelling/optimiser.py index a7ac28ec..beeb630e 100644 --- a/domain/modelling/optimiser.py +++ b/domain/modelling/optimiser.py @@ -21,9 +21,12 @@ from __future__ import annotations import itertools from dataclasses import dataclass -from typing import Optional +from typing import Optional, Protocol, Sequence +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.modelling.package_scorer import Score from domain.modelling.recommendation import MeasureOption +from domain.modelling.simulation import EpcSimulation @dataclass(frozen=True) @@ -72,3 +75,106 @@ def optimise( if (total_gain, -total_cost) > (best_gain, -best_cost): best, best_gain, best_cost = selected, total_gain, total_cost return best + + +class Scorer(Protocol): + """The whole-package scoring primitive — `PackageScorer` satisfies it. + Kept structural so the repair loop is testable with a stub scorer.""" + + def score( + self, baseline: EpcPropertyData, simulations: Sequence[EpcSimulation] + ) -> Score: ... + + +@dataclass(frozen=True) +class OptimisedPackage: + """The package the Optimiser commits to: the selected ScoredOptions and the + **truthful** whole-package re-score (ADR-0016 role 2), after any greedy + repair. The per-Option `sap_gain` on the selections is the approximate + warm-start signal — never the package total, which is `score`.""" + + selected: list[ScoredOption] + score: Score + + +def optimise_package( + *, + groups: list[list[ScoredOption]], + scorer: Scorer, + baseline_epc: EpcPropertyData, + budget: Optional[float], + target_sap: Optional[float], +) -> OptimisedPackage: + """Warm-start with the grouped knapsack (role-1 signal), re-score the chosen + package on the real scorer (role-2 truth), then — while the true SAP + undershoots ``target_sap`` and budget remains — greedy-add the untreated- + group Option with the best marginal SAP-per-£ and re-score, until the target + is met, no positive-marginal Option is affordable, or the budget is spent + (ADR-0016). ``target_sap``/``budget`` of None mean unconstrained.""" + selected: list[ScoredOption] = optimise(groups, budget) + score: Score = _score(scorer, baseline_epc, selected) + if target_sap is None: + return OptimisedPackage(selected=selected, score=score) + + spent: float = sum(_option_cost(s.option) for s in selected) + while score.sap_continuous < target_sap: + remaining: Optional[float] = None if budget is None else budget - spent + candidate = _best_repair_candidate( + groups, selected, scorer, baseline_epc, score, remaining + ) + if candidate is None: + break + selected = [*selected, candidate] + spent += _option_cost(candidate.option) + score = _score(scorer, baseline_epc, selected) + return OptimisedPackage(selected=selected, score=score) + + +def _score( + scorer: Scorer, baseline_epc: EpcPropertyData, selected: list[ScoredOption] +) -> Score: + return scorer.score(baseline_epc, [s.option.overlay for s in selected]) + + +def _used_group_indices( + groups: list[list[ScoredOption]], selected: list[ScoredOption] +) -> set[int]: + """Indices of groups already represented in the selection (≤1 per group), + matched by object identity — the selection holds the very ScoredOptions + from ``groups``.""" + return { + index + for index, group in enumerate(groups) + if any(option is chosen for option in group for chosen in selected) + } + + +def _best_repair_candidate( + groups: list[list[ScoredOption]], + selected: list[ScoredOption], + scorer: Scorer, + baseline_epc: EpcPropertyData, + current: Score, + remaining_budget: Optional[float], +) -> Optional[ScoredOption]: + """The untreated-group Option giving the best **marginal** SAP-per-£ when + added to the current package (re-scored, not the role-1 signal), affordable + within ``remaining_budget`` and strictly improving. None if there is none.""" + used: set[int] = _used_group_indices(groups, selected) + best: Optional[ScoredOption] = None + best_ratio: float = 0.0 + for index, group in enumerate(groups): + if index in used: + continue + for option in group: + cost: float = _option_cost(option.option) + if remaining_budget is not None and cost > remaining_budget: + continue + trial: Score = _score(scorer, baseline_epc, [*selected, option]) + marginal: float = trial.sap_continuous - current.sap_continuous + if marginal <= 0.0: + continue + ratio: float = float("inf") if cost == 0.0 else marginal / cost + if ratio > best_ratio: + best, best_ratio = option, ratio + return best diff --git a/tests/domain/modelling/test_optimiser.py b/tests/domain/modelling/test_optimiser.py index bbbaebb3..77e83680 100644 --- a/tests/domain/modelling/test_optimiser.py +++ b/tests/domain/modelling/test_optimiser.py @@ -8,9 +8,24 @@ selection with synthetic scores and no calculator. from __future__ import annotations -from domain.modelling.optimiser import ScoredOption, optimise +from typing import Sequence + +from datatypes.epc.domain.epc_property_data import ( + BuildingPartIdentifier, + EpcPropertyData, +) +from domain.modelling.optimiser import ( + OptimisedPackage, + ScoredOption, + optimise, + optimise_package, +) +from domain.modelling.package_scorer import Score from domain.modelling.recommendation import Cost, MeasureOption -from domain.modelling.simulation import EpcSimulation +from domain.modelling.simulation import BuildingPartOverlay, EpcSimulation +from tests.domain.sap10_calculator.worksheet._elmhurst_worksheet_000490 import ( + build_epc, +) def _scored(measure_type: str, *, gain: float, cost: float) -> ScoredOption: @@ -25,6 +40,66 @@ def _scored(measure_type: str, *, gain: float, cost: float) -> ScoredOption: ) +# Distinguishable overlays so the stub scorer can attribute a true gain per +# measure (wall / roof / floor) regardless of the role-1 signal. +_WALL_OVERLAY = EpcSimulation( + building_parts={ + BuildingPartIdentifier.MAIN: BuildingPartOverlay(wall_insulation_type=2) + } +) +_ROOF_OVERLAY = EpcSimulation( + building_parts={ + BuildingPartIdentifier.MAIN: BuildingPartOverlay(roof_insulation_thickness=300) + } +) +_FLOOR_OVERLAY = EpcSimulation( + building_parts={ + BuildingPartIdentifier.MAIN: BuildingPartOverlay(floor_insulation_thickness=100) + } +) + + +def _scored_overlay( + measure_type: str, *, gain: float, cost: float, overlay: EpcSimulation +) -> ScoredOption: + return ScoredOption( + option=MeasureOption( + measure_type=measure_type, + description=measure_type, + overlay=overlay, + cost=Cost(total=cost, contingency_rate=0.0), + ), + sap_gain=gain, + ) + + +class _StubScorer: + """A deterministic stand-in for PackageScorer: the package SAP is a base + plus a fixed *true* gain per measure present (by overlay field), decoupled + from the role-1 signal — so the repair loop is exercised without the + calculator (ADR-0016).""" + + def __init__(self, *, base: float, wall: float, roof: float, floor: float) -> None: + self._base = base + self._wall = wall + self._roof = roof + self._floor = floor + + def score( + self, baseline: EpcPropertyData, simulations: Sequence[EpcSimulation] + ) -> Score: + sap = self._base + for sim in simulations: + part = sim.building_parts[BuildingPartIdentifier.MAIN] + if part.wall_insulation_type is not None: + sap += self._wall + if part.roof_insulation_thickness is not None: + sap += self._roof + if part.floor_insulation_thickness is not None: + sap += self._floor + return Score(sap_continuous=sap, co2_kg_per_yr=0.0, primary_energy_kwh_per_yr=0.0) + + def _selected_types(selection: list[ScoredOption]) -> set[str]: return {scored.option.measure_type for scored in selection} @@ -121,3 +196,82 @@ def test_within_budget_partial_selection_prefers_the_higher_gain_option() -> Non # Assert — EWI is unaffordable; loft alone is the best within £2000. assert _selected_types(selection) == {"loft_insulation"} + + +def test_repair_adds_an_untreated_group_option_to_close_the_undershoot() -> None: + # Arrange — role-1 under-counts roof (signal 0 → warm-start skips it), but + # its true re-scored gain (+4) is what closes the target. + groups: list[list[ScoredOption]] = [ + [_scored_overlay("cavity_wall_insulation", gain=10.0, cost=1000.0, overlay=_WALL_OVERLAY)], + [_scored_overlay("loft_insulation", gain=0.0, cost=1000.0, overlay=_ROOF_OVERLAY)], + [_scored_overlay("suspended_floor_insulation", gain=8.0, cost=1000.0, overlay=_FLOOR_OVERLAY)], + ] + scorer = _StubScorer(base=40.0, wall=5.0, roof=4.0, floor=3.0) + + # Act + package: OptimisedPackage = optimise_package( + groups=groups, + scorer=scorer, + baseline_epc=build_epc(), + budget=5000.0, + target_sap=50.0, + ) + + # Assert — warm-start took wall+floor (re-score 48 < 50); repair added the + # roof (true +4) to reach 52, the truthful package total. + types = {scored.option.measure_type for scored in package.selected} + assert "loft_insulation" in types + assert types == { + "cavity_wall_insulation", + "suspended_floor_insulation", + "loft_insulation", + } + assert abs(package.score.sap_continuous - 52.0) <= 1e-9 + + +def test_no_target_returns_the_warm_start_package_without_repair() -> None: + # Arrange + groups: list[list[ScoredOption]] = [ + [_scored_overlay("cavity_wall_insulation", gain=10.0, cost=1000.0, overlay=_WALL_OVERLAY)], + ] + scorer = _StubScorer(base=40.0, wall=5.0, roof=4.0, floor=3.0) + + # Act + package: OptimisedPackage = optimise_package( + groups=groups, + scorer=scorer, + baseline_epc=build_epc(), + budget=None, + target_sap=None, + ) + + # Assert — no target → no repair; warm-start package re-scored as the truth. + assert {s.option.measure_type for s in package.selected} == { + "cavity_wall_insulation" + } + assert abs(package.score.sap_continuous - 45.0) <= 1e-9 + + +def test_repair_stops_when_no_affordable_improving_option_remains() -> None: + # Arrange — the only untreated-group option costs more than the budget left. + groups: list[list[ScoredOption]] = [ + [_scored_overlay("cavity_wall_insulation", gain=10.0, cost=1000.0, overlay=_WALL_OVERLAY)], + [_scored_overlay("loft_insulation", gain=0.0, cost=5000.0, overlay=_ROOF_OVERLAY)], + ] + scorer = _StubScorer(base=40.0, wall=5.0, roof=4.0, floor=3.0) + + # Act + package: OptimisedPackage = optimise_package( + groups=groups, + scorer=scorer, + baseline_epc=build_epc(), + budget=1000.0, + target_sap=50.0, + ) + + # Assert — wall only (re-score 45 < 50); roof unaffordable, so repair stops + # at the best achievable package rather than overspending. + assert {s.option.measure_type for s in package.selected} == { + "cavity_wall_insulation" + } + assert abs(package.score.sap_continuous - 45.0) <= 1e-9