Model/domain/modelling/optimiser.py
Khalim Conn-Kowlessar 49e86344d2 feat(modelling): whole-package re-score + greedy repair (#1160)
Slice 2 of #1160 — the ADR-0016 truth step on top of the warm-start
knapsack. optimise_package(groups, scorer, baseline_epc, budget,
target_sap) -> OptimisedPackage:

  warm-start optimise() (role-1 signal) → re-score the chosen package on
  the real scorer (role-2 truth) → while the true SAP undershoots
  target_sap and budget remains, greedy-add the untreated-group Option
  with the best *marginal* SAP-per-£ (re-scored, not the role-1 signal),
  re-score, repeat until the target is met, nothing positive-marginal is
  affordable, or the budget is spent.

`Scorer` is a structural Protocol (PackageScorer satisfies it) so the
repair loop is tested with a stub scorer — no calculator, runs on ARM.
The key case: role-1 under-counts roof so the warm-start skips it, the
re-score undershoots, and repair adds roof back to hit the target. 3
repair tests + the 6 core tests; pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 12:45:05 +00:00

180 lines
7.2 KiB
Python

"""The Optimiser core — a grouped (multiple-choice) knapsack over per-Option
role-1 scores (ADR-0016).
Recycles the formulation of the legacy ``GainOptimiser`` / ``CostOptimiser``
(``recommendations/optimiser/``): pick **at most one** Option per Recommendation
(disjoint groups, no cross-group exclusion constraints — the Recommendation
partition makes selected overlays collision-free), maximising total SAP gain
subject to the Scenario budget. The legacy classes solve this as a `mip` MILP;
here it is an exact pure-Python multiple-choice knapsack — no native solver
dependency, so it runs everywhere and is deterministically testable.
This is the warm-start **signal** only: per ADR-0016 the role-1 per-Option
scores are approximate (independent-vs-baseline), so the truthful figure comes
from the whole-package re-score + greedy repair, not from this selection. Exact
enumeration is therefore more than adequate, and at retrofit scale (a handful
of Recommendations, a few Options each) the candidate space — ``Π(|group|+1)``
— is tiny.
"""
from __future__ import annotations
import itertools
from dataclasses import dataclass
from typing import Optional, Protocol, Sequence
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.package_scorer import Score
from domain.modelling.recommendation import MeasureOption
from domain.modelling.simulation import EpcSimulation
@dataclass(frozen=True)
class ScoredOption:
"""A candidate Measure Option paired with its role-1 (independent-vs-
baseline) SAP gain — the optimiser's input signal. Cost is read from the
Option; the gain is supplied by scoring."""
option: MeasureOption
sap_gain: float
def _option_cost(option: MeasureOption) -> float:
if option.cost is None:
raise ValueError(
f"measure option {option.measure_type!r} has no cost; cannot optimise"
)
return option.cost.total
def optimise(
groups: list[list[ScoredOption]], budget: Optional[float]
) -> list[ScoredOption]:
"""Select at most one ScoredOption per group to maximise total SAP gain
subject to ``budget`` (None = unconstrained). Exact: enumerates every
pick-one-or-skip-per-group package, keeps the affordable one with the
greatest gain, breaking ties toward lower cost. Returns the selected
ScoredOptions (empty if nothing affordable beats selecting none)."""
# Each group offers: skip it (None) or take exactly one of its Options.
choices_per_group: list[list[Optional[ScoredOption]]] = [
[None, *group] for group in groups
]
best: list[ScoredOption] = []
best_gain: float = -1.0
best_cost: float = 0.0
for combo in itertools.product(*choices_per_group):
selected: list[ScoredOption] = [
choice for choice in combo if choice is not None
]
total_cost: float = sum(_option_cost(s.option) for s in selected)
if budget is not None and total_cost > budget:
continue
total_gain: float = sum(s.sap_gain for s in selected)
# Maximise gain; on a tie prefer the cheaper package.
if (total_gain, -total_cost) > (best_gain, -best_cost):
best, best_gain, best_cost = selected, total_gain, total_cost
return best
class Scorer(Protocol):
"""The whole-package scoring primitive — `PackageScorer` satisfies it.
Kept structural so the repair loop is testable with a stub scorer."""
def score(
self, baseline: EpcPropertyData, simulations: Sequence[EpcSimulation]
) -> Score: ...
@dataclass(frozen=True)
class OptimisedPackage:
"""The package the Optimiser commits to: the selected ScoredOptions and the
**truthful** whole-package re-score (ADR-0016 role 2), after any greedy
repair. The per-Option `sap_gain` on the selections is the approximate
warm-start signal — never the package total, which is `score`."""
selected: list[ScoredOption]
score: Score
def optimise_package(
*,
groups: list[list[ScoredOption]],
scorer: Scorer,
baseline_epc: EpcPropertyData,
budget: Optional[float],
target_sap: Optional[float],
) -> OptimisedPackage:
"""Warm-start with the grouped knapsack (role-1 signal), re-score the chosen
package on the real scorer (role-2 truth), then — while the true SAP
undershoots ``target_sap`` and budget remains — greedy-add the untreated-
group Option with the best marginal SAP-per-£ and re-score, until the target
is met, no positive-marginal Option is affordable, or the budget is spent
(ADR-0016). ``target_sap``/``budget`` of None mean unconstrained."""
selected: list[ScoredOption] = optimise(groups, budget)
score: Score = _score(scorer, baseline_epc, selected)
if target_sap is None:
return OptimisedPackage(selected=selected, score=score)
spent: float = sum(_option_cost(s.option) for s in selected)
while score.sap_continuous < target_sap:
remaining: Optional[float] = None if budget is None else budget - spent
candidate = _best_repair_candidate(
groups, selected, scorer, baseline_epc, score, remaining
)
if candidate is None:
break
selected = [*selected, candidate]
spent += _option_cost(candidate.option)
score = _score(scorer, baseline_epc, selected)
return OptimisedPackage(selected=selected, score=score)
def _score(
scorer: Scorer, baseline_epc: EpcPropertyData, selected: list[ScoredOption]
) -> Score:
return scorer.score(baseline_epc, [s.option.overlay for s in selected])
def _used_group_indices(
groups: list[list[ScoredOption]], selected: list[ScoredOption]
) -> set[int]:
"""Indices of groups already represented in the selection (≤1 per group),
matched by object identity — the selection holds the very ScoredOptions
from ``groups``."""
return {
index
for index, group in enumerate(groups)
if any(option is chosen for option in group for chosen in selected)
}
def _best_repair_candidate(
groups: list[list[ScoredOption]],
selected: list[ScoredOption],
scorer: Scorer,
baseline_epc: EpcPropertyData,
current: Score,
remaining_budget: Optional[float],
) -> Optional[ScoredOption]:
"""The untreated-group Option giving the best **marginal** SAP-per-£ when
added to the current package (re-scored, not the role-1 signal), affordable
within ``remaining_budget`` and strictly improving. None if there is none."""
used: set[int] = _used_group_indices(groups, selected)
best: Optional[ScoredOption] = None
best_ratio: float = 0.0
for index, group in enumerate(groups):
if index in used:
continue
for option in group:
cost: float = _option_cost(option.option)
if remaining_budget is not None and cost > remaining_budget:
continue
trial: Score = _score(scorer, baseline_epc, [*selected, option])
marginal: float = trial.sap_continuous - current.sap_continuous
if marginal <= 0.0:
continue
ratio: float = float("inf") if cost == 0.0 else marginal / cost
if ratio > best_ratio:
best, best_ratio = option, ratio
return best