Model/domain/modelling/optimisation/optimiser.py
Khalim Conn-Kowlessar 2bf42d046e feat(modelling): optimise_package targets least-cost, falls back to max-gain
Rewire the objective per the ADR-0016 amendment. With a target_sap (Increasing
EPC): warm-start optimise_min_cost (cheapest package reaching target_gain =
target_sap - baseline within budget) -> inject dependencies -> re-score ->
repair toward target; if the warm-start is infeasible or the repaired package
still falls short on the true score, fall back to max-gain-within-budget (best
effort). Without a target_sap: max-gain (unchanged). The min-cost objective
stops at the target without overshooting into a higher band; surplus budget is
left unspent. Extracted _max_gain_package (no-target path + fallback) and
_repair_to_target (inject + re-score + greedy repair). Dependency injection and
the repair loop are preserved; all prior optimiser + dependency tests pass
unchanged. Ventilation-aware *selection* is the next slice; injection is still
post-warm-start here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 15:43:06 +00:00

316 lines
13 KiB
Python

"""The Optimiser core — a grouped (multiple-choice) knapsack over per-Option
role-1 scores (ADR-0016).
Recycles the formulation of the legacy ``GainOptimiser`` / ``CostOptimiser``
(``recommendations/optimiser/``): pick **at most one** Option per Recommendation
(disjoint groups, no cross-group exclusion constraints — the Recommendation
partition makes selected overlays collision-free), maximising total SAP gain
subject to the Scenario budget. The legacy classes solve this as a `mip` MILP;
here it is an exact pure-Python multiple-choice knapsack — no native solver
dependency, so it runs everywhere and is deterministically testable.
This is the warm-start **signal** only: per ADR-0016 the role-1 per-Option
scores are approximate (independent-vs-baseline), so the truthful figure comes
from the whole-package re-score + greedy repair, not from this selection. Exact
enumeration is therefore more than adequate, and at retrofit scale (a handful
of Recommendations, a few Options each) the candidate space — ``Π(|group|+1)``
— is tiny.
"""
from __future__ import annotations
import itertools
from dataclasses import dataclass
from typing import Optional, Protocol, Sequence
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.scoring.package_scorer import Score
from domain.modelling.recommendation import MeasureOption
from domain.modelling.simulation import EpcSimulation
@dataclass(frozen=True)
class ScoredOption:
"""A candidate Measure Option paired with its role-1 (independent-vs-
baseline) SAP gain — the optimiser's input signal. Cost is read from the
Option; the gain is supplied by scoring."""
option: MeasureOption
sap_gain: float
@dataclass(frozen=True)
class MeasureDependency:
"""A forced "A requires B" edge (ADR-0016 Measure Dependency): when any
selected Option's `measure_type` is in `triggers`, `required` is injected
into the package **before** the whole-package re-score — never competing in
the optimiser pool, but its (negative) SAP and its cost land in the truthful
figure, the repair decision, and the persisted package. Held as data so
extending the triggers is a data edit, not control flow."""
triggers: frozenset[str]
required: ScoredOption
def _option_cost(option: MeasureOption) -> float:
if option.cost is None:
raise ValueError(
f"measure option {option.measure_type!r} has no cost; cannot optimise"
)
return option.cost.total
def optimise(
groups: list[list[ScoredOption]], budget: Optional[float]
) -> list[ScoredOption]:
"""Select at most one ScoredOption per group to maximise total SAP gain
subject to ``budget`` (None = unconstrained). Exact: enumerates every
pick-one-or-skip-per-group package, keeps the affordable one with the
greatest gain, breaking ties toward lower cost. Returns the selected
ScoredOptions (empty if nothing affordable beats selecting none)."""
# Each group offers: skip it (None) or take exactly one of its Options.
choices_per_group: list[list[Optional[ScoredOption]]] = [
[None, *group] for group in groups
]
best: list[ScoredOption] = []
best_gain: float = -1.0
best_cost: float = 0.0
for combo in itertools.product(*choices_per_group):
selected: list[ScoredOption] = [
choice for choice in combo if choice is not None
]
total_cost: float = sum(_option_cost(s.option) for s in selected)
if budget is not None and total_cost > budget:
continue
total_gain: float = sum(s.sap_gain for s in selected)
# Maximise gain; on a tie prefer the cheaper package.
if (total_gain, -total_cost) > (best_gain, -best_cost):
best, best_gain, best_cost = selected, total_gain, total_cost
return best
def optimise_min_cost(
groups: list[list[ScoredOption]],
budget: Optional[float],
target_gain: float,
) -> Optional[list[ScoredOption]]:
"""Select at most one ScoredOption per group to **minimise total cost**
subject to total SAP gain ``>= target_gain`` and total cost ``<= budget``
(None = unconstrained) — the least-cost-to-target objective (ADR-0016
amendment). Exact enumeration over every pick-one-or-skip-per-group package.
Returns the cheapest target-reaching package (ties broken toward the higher
gain — "recommend more"), or ``None`` when no package within budget reaches
the target (the caller falls back to max-gain). A non-positive
``target_gain`` is met by the empty package."""
choices_per_group: list[list[Optional[ScoredOption]]] = [
[None, *group] for group in groups
]
best: Optional[list[ScoredOption]] = None
best_cost: float = 0.0
best_gain: float = 0.0
for combo in itertools.product(*choices_per_group):
selected: list[ScoredOption] = [
choice for choice in combo if choice is not None
]
total_cost: float = sum(_option_cost(s.option) for s in selected)
if budget is not None and total_cost > budget:
continue
total_gain: float = sum(s.sap_gain for s in selected)
if total_gain < target_gain:
continue
# Minimise cost; on a tie prefer the higher-gain package.
if best is None or (-total_cost, total_gain) > (-best_cost, best_gain):
best, best_cost, best_gain = selected, total_cost, total_gain
return best
class Scorer(Protocol):
"""The whole-package scoring primitive — `PackageScorer` satisfies it.
Kept structural so the repair loop is testable with a stub scorer."""
def score(
self, baseline: EpcPropertyData, simulations: Sequence[EpcSimulation]
) -> Score: ...
@dataclass(frozen=True)
class OptimisedPackage:
"""The package the Optimiser commits to: the selected ScoredOptions and the
**truthful** whole-package re-score (ADR-0016 role 2), after any greedy
repair. The per-Option `sap_gain` on the selections is the approximate
warm-start signal — never the package total, which is `score`."""
selected: list[ScoredOption]
score: Score
def optimise_package(
*,
groups: list[list[ScoredOption]],
scorer: Scorer,
baseline_epc: EpcPropertyData,
budget: Optional[float],
target_sap: Optional[float],
dependencies: Sequence[MeasureDependency] = (),
) -> OptimisedPackage:
"""Select the Optimised Package for one Property + Scenario (ADR-0016 +
its amendment).
With a ``target_sap`` (an Increasing EPC goal) the objective is
**least-cost-to-target**: warm-start with the cheapest package whose role-1
signal reaches the target gain within budget (`optimise_min_cost`), inject
any forced Measure Dependencies, re-score the whole package for the truth,
and greedy-repair toward ``target_sap`` while it undershoots. If the target
is unreachable within budget — the warm-start is infeasible, or the repaired
package still falls short on the true score — fall back to the **maximum
improvement the budget buys** (`optimise`). The min-cost objective stops at
the target and does not overshoot into a higher band; surplus budget is left
unspent.
Without a ``target_sap`` (other goals) it is max-gain-within-budget. Either
way forced dependencies are injected on every path and their cost counts
toward the spend; the returned `selected` includes them. ``budget`` of None
means unconstrained."""
if target_sap is None:
return _max_gain_package(groups, scorer, baseline_epc, budget, dependencies)
baseline_sap: float = _score(scorer, baseline_epc, []).sap_continuous
target_gain: float = target_sap - baseline_sap
chosen: Optional[list[ScoredOption]] = optimise_min_cost(
groups, budget, target_gain
)
if chosen is not None:
package: OptimisedPackage = _repair_to_target(
chosen, groups, dependencies, scorer, baseline_epc, budget, target_sap
)
if package.score.sap_continuous >= target_sap:
return package
# Target unreachable within budget (warm-start infeasible, or the repaired
# package still falls short) → best effort: the most improvement budget buys.
return _max_gain_package(groups, scorer, baseline_epc, budget, dependencies)
def _max_gain_package(
groups: list[list[ScoredOption]],
scorer: Scorer,
baseline_epc: EpcPropertyData,
budget: Optional[float],
dependencies: Sequence[MeasureDependency],
) -> OptimisedPackage:
"""Max-gain-within-budget, dependencies injected and re-scored — the
no-target objective and the unreachable-target fallback."""
chosen: list[ScoredOption] = optimise(groups, budget)
selected: list[ScoredOption] = _inject(chosen, dependencies)
return OptimisedPackage(
selected=selected, score=_score(scorer, baseline_epc, selected)
)
def _repair_to_target(
chosen: list[ScoredOption],
groups: list[list[ScoredOption]],
dependencies: Sequence[MeasureDependency],
scorer: Scorer,
baseline_epc: EpcPropertyData,
budget: Optional[float],
target_sap: float,
) -> OptimisedPackage:
"""Inject dependencies onto the warm-start, re-score for the truth, then
greedy-add the untreated-group Option with the best marginal SAP-per-£ (its
own dependency folded in) until the true SAP clears ``target_sap`` or no
affordable improving Option remains."""
selected: list[ScoredOption] = _inject(chosen, dependencies)
score: Score = _score(scorer, baseline_epc, selected)
while score.sap_continuous < target_sap:
candidate = _best_repair_candidate(
groups, chosen, dependencies, scorer, baseline_epc, score, budget
)
if candidate is None:
break
chosen = [*chosen, candidate]
selected = _inject(chosen, dependencies)
score = _score(scorer, baseline_epc, selected)
return OptimisedPackage(selected=selected, score=score)
def _inject(
chosen: list[ScoredOption], dependencies: Sequence[MeasureDependency]
) -> list[ScoredOption]:
"""``chosen`` plus every forced dependency whose triggers intersect the
chosen measure-types, de-duplicated by required measure-type (a dependency
several measures trigger is injected once)."""
chosen_types: set[str] = {s.option.measure_type for s in chosen}
injected: list[ScoredOption] = list(chosen)
present: set[str] = set(chosen_types)
for dependency in dependencies:
required_type: str = dependency.required.option.measure_type
if dependency.triggers & chosen_types and required_type not in present:
injected.append(dependency.required)
present.add(required_type)
return injected
def _package_cost(selected: list[ScoredOption]) -> float:
return sum(_option_cost(s.option) for s in selected)
def _score(
scorer: Scorer, baseline_epc: EpcPropertyData, selected: list[ScoredOption]
) -> Score:
return scorer.score(baseline_epc, [s.option.overlay for s in selected])
def _used_group_indices(
groups: list[list[ScoredOption]], selected: list[ScoredOption]
) -> set[int]:
"""Indices of groups already represented in the selection (≤1 per group),
matched by object identity — the selection holds the very ScoredOptions
from ``groups``."""
return {
index
for index, group in enumerate(groups)
if any(option is chosen for option in group for chosen in selected)
}
def _best_repair_candidate(
groups: list[list[ScoredOption]],
chosen: list[ScoredOption],
dependencies: Sequence[MeasureDependency],
scorer: Scorer,
baseline_epc: EpcPropertyData,
current: Score,
budget: Optional[float],
) -> Optional[ScoredOption]:
"""The untreated-group Option giving the best **marginal** SAP-per-£ when
added to the current package — re-scored (not the role-1 signal) with any
ventilation dependency it newly triggers folded in, so both its SAP and its
incremental cost are truthful. Affordable when the resulting whole-package
cost is within ``budget`` and strictly improving. None if there is none."""
used: set[int] = _used_group_indices(groups, chosen)
base_cost: float = _package_cost(_inject(chosen, dependencies))
best: Optional[ScoredOption] = None
best_ratio: float = 0.0
for index, group in enumerate(groups):
if index in used:
continue
for option in group:
trial_selected: list[ScoredOption] = _inject(
[*chosen, option], dependencies
)
package_cost: float = _package_cost(trial_selected)
if budget is not None and package_cost > budget:
continue
trial: Score = _score(scorer, baseline_epc, trial_selected)
marginal: float = trial.sap_continuous - current.sap_continuous
if marginal <= 0.0:
continue
incremental: float = package_cost - base_cost
ratio: float = (
float("inf") if incremental <= 0.0 else marginal / incremental
)
if ratio > best_ratio:
best, best_ratio = option, ratio
return best