mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
The warm-start (and max-gain fallback) now price each forced Measure Dependency the candidate triggers, not just inject it afterwards: optimise/optimise_min_cost fold dependencies into each candidate's cost+gain via _augmented_cost_gain, and optimise_package scores each dependency's true role-1 signal (_with_role1_signals) instead of the 0.0 placeholder. This stops the min-cost objective (i) ignoring the ~£900 a wall drags in (a wall-free package reaching target can be cheaper) and (ii) picking a small-gain wall whose mandatory ventilation (down to -5 SAP) makes it net-negative, which repair cannot un-pick. Budget is now a hard envelope: the constraint applies to the augmented (measure + its ventilation) cost, so a wall that fits alone but whose ventilation would bust the budget is DROPPED rather than forced over budget. This reverses the earlier 'forced regardless of budget' call (which made sense when selection was ventilation-blind). Safety invariant intact — presence still injected on every path; we just never recommend a wall we can't afford to ventilate. ADR-0016 amendment updated. 94 modelling+orchestration tests pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
375 lines
16 KiB
Python
375 lines
16 KiB
Python
"""The Optimiser core — a grouped (multiple-choice) knapsack over per-Option
|
|
role-1 scores (ADR-0016).
|
|
|
|
Recycles the formulation of the legacy ``GainOptimiser`` / ``CostOptimiser``
|
|
(``recommendations/optimiser/``): pick **at most one** Option per Recommendation
|
|
(disjoint groups, no cross-group exclusion constraints — the Recommendation
|
|
partition makes selected overlays collision-free), maximising total SAP gain
|
|
subject to the Scenario budget. The legacy classes solve this as a `mip` MILP;
|
|
here it is an exact pure-Python multiple-choice knapsack — no native solver
|
|
dependency, so it runs everywhere and is deterministically testable.
|
|
|
|
This is the warm-start **signal** only: per ADR-0016 the role-1 per-Option
|
|
scores are approximate (independent-vs-baseline), so the truthful figure comes
|
|
from the whole-package re-score + greedy repair, not from this selection. Exact
|
|
enumeration is therefore more than adequate, and at retrofit scale (a handful
|
|
of Recommendations, a few Options each) the candidate space — ``Π(|group|+1)``
|
|
— is tiny.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import itertools
|
|
from dataclasses import dataclass
|
|
from typing import Optional, Protocol, Sequence
|
|
|
|
from datatypes.epc.domain.epc_property_data import EpcPropertyData
|
|
from domain.modelling.scoring.package_scorer import Score
|
|
from domain.modelling.recommendation import MeasureOption
|
|
from domain.modelling.simulation import EpcSimulation
|
|
|
|
|
|
@dataclass(frozen=True)
class ScoredOption:
    """One candidate Measure Option together with the SAP gain attributed to
    it when scored on its own against the baseline (the "role-1" signal).

    This pair is what the selectors consume: the gain is carried here, while
    the price is looked up on the Option itself. Instances are immutable."""

    # The candidate measure; its cost, overlay and measure_type are read by
    # the functions in this module.
    option: MeasureOption
    # Approximate independent-vs-baseline SAP gain for `option`.
    sap_gain: float
|
|
|
|
|
|
@dataclass(frozen=True)
class MeasureDependency:
    """A forced "A requires B" edge (ADR-0016 Measure Dependency).

    Whenever a selected Option's `measure_type` appears in `triggers`, the
    `required` ScoredOption is added to the package ahead of the whole-package
    re-score. It is never one of the optimiser's own choices, yet its cost and
    its SAP effect are part of the package that gets scored, repaired and
    returned. The edge is plain data, so widening the trigger set is a data
    change rather than a change to control flow."""

    # Measure types whose selection forces `required` into the package.
    triggers: frozenset[str]
    # The option that must accompany any triggering measure.
    required: ScoredOption
|
|
|
|
|
|
def _option_cost(option: MeasureOption) -> float:
    """Return the total cost recorded on ``option``.

    Raises ``ValueError`` when the option carries no cost at all, because an
    unpriced option cannot take part in a cost-constrained selection."""
    cost = option.cost
    if cost is not None:
        return cost.total
    raise ValueError(
        f"measure option {option.measure_type!r} has no cost; cannot optimise"
    )
|
|
|
|
|
|
def optimise(
    groups: list[list[ScoredOption]],
    budget: Optional[float],
    dependencies: Sequence[MeasureDependency] = (),
) -> list[ScoredOption]:
    """Max-gain selection: take at most one ScoredOption from each group so
    that the summed SAP gain is as large as possible while the summed cost
    stays within ``budget`` (``None`` disables the budget check).

    The search is exhaustive — every "one option or nothing" combination
    across the groups is visited. Equal-gain candidates are separated by
    cost, cheaper winning; among fully tied candidates the first one
    enumerated is kept. An empty list comes back when no affordable
    combination improves on choosing nothing.

    Each candidate's cost and gain include whatever forced ``dependencies``
    it triggers (ADR-0016 amendment — ventilation-aware), i.e. the figures
    the package will have after injection. Only the per-group picks are
    returned; injecting the dependencies is left to the caller."""
    # Each group contributes a "skip" choice (None) ahead of its options.
    pools = [(None, *group) for group in groups]

    winner: list[ScoredOption] = []
    # Ordering key is (gain, -cost): higher gain first, then lower cost. The
    # starting gain of -1.0 lets the empty package (gain 0) take the lead.
    winner_key: tuple[float, float] = (-1.0, -0.0)
    for picks in itertools.product(*pools):
        package: list[ScoredOption] = [
            pick for pick in picks if pick is not None
        ]
        cost, gain = _augmented_cost_gain(package, dependencies)
        if budget is not None and cost > budget:
            continue
        key = (gain, -cost)
        if key > winner_key:
            winner, winner_key = package, key
    return winner
|
|
|
|
|
|
def _augmented_cost_gain(
    selected: list[ScoredOption], dependencies: Sequence[MeasureDependency]
) -> tuple[float, float]:
    """Return ``(total cost, total role-1 gain)`` for a candidate once the
    forced dependencies it triggers have been added to it.

    This is how the selectors "price" a dependency: its cost and its
    ``sap_gain`` are summed together with those of the candidate's own
    options, so the totals describe the package as it will be after
    injection rather than the bare selection."""
    package: list[ScoredOption] = _inject(selected, dependencies)
    cost: float = _package_cost(package)
    gain: float = sum(entry.sap_gain for entry in package)
    return cost, gain
|
|
|
|
|
|
def optimise_min_cost(
    groups: list[list[ScoredOption]],
    budget: Optional[float],
    target_gain: float,
    dependencies: Sequence[MeasureDependency] = (),
) -> Optional[list[ScoredOption]]:
    """Least-cost-to-target selection (ADR-0016 amendment): take at most one
    ScoredOption from each group so that total cost is **as small as
    possible**, subject to total SAP gain ``>= target_gain`` and total cost
    ``<= budget`` (``None`` disables the budget check).

    Every "one option or nothing" combination is enumerated. Among equally
    cheap qualifying packages the higher-gain one wins ("recommend more").
    ``None`` is returned when nothing within budget reaches the target, which
    lets the caller fall back to max-gain. With a non-positive
    ``target_gain`` the empty package already qualifies.

    Cost and gain are measured with triggered ``dependencies`` folded in
    (ventilation-aware), so an option whose forced dependency wipes out its
    gain does not look like a cheap route to the target. Only the per-group
    picks are returned, never the dependencies themselves."""
    # Each group contributes a "skip" choice (None) ahead of its options.
    pools = [(None, *group) for group in groups]

    winner: Optional[list[ScoredOption]] = None
    # Ordering key is (-cost, gain): lower cost first, then higher gain.
    winner_key: tuple[float, float] = (-0.0, 0.0)
    for picks in itertools.product(*pools):
        package: list[ScoredOption] = [
            pick for pick in picks if pick is not None
        ]
        cost, gain = _augmented_cost_gain(package, dependencies)
        over_budget = budget is not None and cost > budget
        if over_budget or gain < target_gain:
            continue
        key = (-cost, gain)
        if winner is None or key > winner_key:
            winner, winner_key = package, key
    return winner
|
|
|
|
|
|
class Scorer(Protocol):
    """Structural interface for whole-package scoring; `PackageScorer` is the
    implementation that satisfies it.

    Declaring it as a Protocol means anything with a matching ``score``
    method can be passed in, so the repair loop can be exercised with a stub
    scorer in tests."""

    def score(
        self,
        baseline: EpcPropertyData,
        simulations: Sequence[EpcSimulation],
    ) -> Score: ...
|
|
|
|
|
|
@dataclass(frozen=True)
class OptimisedPackage:
    """The Optimiser's final answer: the ScoredOptions it settled on plus the
    **truthful** whole-package re-score (ADR-0016 role 2), taken after any
    greedy repair.

    The `sap_gain` carried by each selection remains the approximate
    warm-start signal; the figure for the package as a whole is `score`."""

    # The options making up the package.
    selected: list[ScoredOption]
    # Whole-package score for `selected`.
    score: Score
|
|
|
|
|
|
def optimise_package(
    *,
    groups: list[list[ScoredOption]],
    scorer: Scorer,
    baseline_epc: EpcPropertyData,
    budget: Optional[float],
    target_sap: Optional[float],
    dependencies: Sequence[MeasureDependency] = (),
) -> OptimisedPackage:
    """Choose the Optimised Package for one Property + Scenario (ADR-0016 and
    its amendment).

    When ``target_sap`` is given (an Increasing EPC goal) the objective is
    **least-cost-to-target**:

    1. warm-start from the cheapest package whose role-1 signal reaches the
       required gain within budget (`optimise_min_cost`);
    2. inject forced Measure Dependencies, re-score the whole package, and
       greedily repair toward ``target_sap`` while the true score is short;
    3. if there is no feasible warm-start, or the repaired package still
       undershoots on the true score, fall back to the **largest improvement
       the budget buys** (`optimise`).

    The min-cost objective stops once the target is met — it does not spend
    surplus budget pushing into a higher band.

    When ``target_sap`` is ``None`` (other goals) the objective is simply
    max-gain-within-budget. On every path forced dependencies are injected,
    their cost counts toward the spend, and they appear in the returned
    `selected`. A ``budget`` of ``None`` means unconstrained."""
    baseline_sap: float = _score(scorer, baseline_epc, []).sap_continuous
    # Give every forced dependency its own independent (role-1) signal so the
    # selectors can price it alongside the measure that triggers it.
    priced: list[MeasureDependency] = _with_role1_signals(
        dependencies, scorer, baseline_epc, baseline_sap
    )

    if target_sap is not None:
        warm_start: Optional[list[ScoredOption]] = optimise_min_cost(
            groups, budget, target_sap - baseline_sap, priced
        )
        if warm_start is not None:
            repaired: OptimisedPackage = _repair_to_target(
                warm_start,
                groups,
                priced,
                scorer,
                baseline_epc,
                budget,
                target_sap,
            )
            if repaired.score.sap_continuous >= target_sap:
                return repaired

    # Reached with no target at all, or with a target that proved unreachable
    # within budget: return the most improvement the budget buys.
    return _max_gain_package(groups, scorer, baseline_epc, budget, priced)
|
|
|
|
|
|
def _with_role1_signals(
    dependencies: Sequence[MeasureDependency],
    scorer: Scorer,
    baseline_epc: EpcPropertyData,
    baseline_sap: float,
) -> list[MeasureDependency]:
    """Return copies of ``dependencies`` whose required option carries its
    real independent-vs-baseline SAP impact instead of the signal it arrived
    with (ADR-0016 amendment).

    Each required option is scored alone against ``baseline_epc``; the
    difference from ``baseline_sap`` becomes its new ``sap_gain``. Triggers
    and the underlying option are carried over unchanged, and the input
    dependencies are not modified."""

    def _rescored(dependency: MeasureDependency) -> MeasureDependency:
        # Score the required option on its own to get its stand-alone impact.
        alone: Score = scorer.score(
            baseline_epc, [dependency.required.option.overlay]
        )
        signal: float = alone.sap_continuous - baseline_sap
        return MeasureDependency(
            triggers=dependency.triggers,
            required=ScoredOption(
                option=dependency.required.option, sap_gain=signal
            ),
        )

    return [_rescored(dependency) for dependency in dependencies]
|
|
|
|
|
|
def _max_gain_package(
    groups: list[list[ScoredOption]],
    scorer: Scorer,
    baseline_epc: EpcPropertyData,
    budget: Optional[float],
    dependencies: Sequence[MeasureDependency],
) -> OptimisedPackage:
    """Build the max-gain-within-budget package.

    The selection is made with dependencies priced in (`optimise`), the
    triggered dependencies are then injected, and the resulting package is
    re-scored as a whole. Used both when there is no target and as the
    fallback when a target cannot be reached."""
    picks: list[ScoredOption] = optimise(groups, budget, dependencies)
    package: list[ScoredOption] = _inject(picks, dependencies)
    truth: Score = _score(scorer, baseline_epc, package)
    return OptimisedPackage(selected=package, score=truth)
|
|
|
|
|
|
def _repair_to_target(
    chosen: list[ScoredOption],
    groups: list[list[ScoredOption]],
    dependencies: Sequence[MeasureDependency],
    scorer: Scorer,
    baseline_epc: EpcPropertyData,
    budget: Optional[float],
    target_sap: float,
) -> OptimisedPackage:
    """Turn the warm-start into a scored package and greedily top it up.

    Dependencies are injected onto ``chosen`` and the package is re-scored.
    While the true SAP is below ``target_sap``, the best repair candidate
    (see `_best_repair_candidate`) is appended and the package is injected
    and re-scored again. The loop stops as soon as the target is met or no
    candidate is available. The caller's ``chosen`` list is not mutated."""
    picks: list[ScoredOption] = chosen
    while True:
        selected: list[ScoredOption] = _inject(picks, dependencies)
        score: Score = _score(scorer, baseline_epc, selected)
        if not score.sap_continuous < target_sap:
            break
        candidate = _best_repair_candidate(
            groups, picks, dependencies, scorer, baseline_epc, score, budget
        )
        if candidate is None:
            break
        # Build a new list rather than appending, leaving the input untouched.
        picks = [*picks, candidate]
    return OptimisedPackage(selected=selected, score=score)
|
|
|
|
|
|
def _inject(
    chosen: list[ScoredOption], dependencies: Sequence[MeasureDependency]
) -> list[ScoredOption]:
    """Return a new list: ``chosen`` followed by the required option of every
    dependency that at least one chosen measure-type triggers.

    A required measure-type is added at most once — it is skipped when it is
    already among the chosen measure-types or was added by an earlier
    dependency. Only the measure-types in ``chosen`` act as triggers; an
    injected option does not trigger further dependencies. ``chosen`` itself
    is left unmodified."""
    triggering: set[str] = {entry.option.measure_type for entry in chosen}
    seen: set[str] = set(triggering)
    package: list[ScoredOption] = [*chosen]
    for dependency in dependencies:
        needed: str = dependency.required.option.measure_type
        if not (dependency.triggers & triggering) or needed in seen:
            continue
        package.append(dependency.required)
        seen.add(needed)
    return package
|
|
|
|
|
|
def _package_cost(selected: list[ScoredOption]) -> float:
    """Sum the cost of every option in ``selected``; an option with no cost
    makes `_option_cost` raise ``ValueError``."""
    costs = (_option_cost(entry.option) for entry in selected)
    return sum(costs)
|
|
|
|
|
|
def _score(
    scorer: Scorer, baseline_epc: EpcPropertyData, selected: list[ScoredOption]
) -> Score:
    """Score ``selected`` as one package: collect each option's overlay, in
    order, and hand them with the baseline to ``scorer.score``."""
    overlays = [entry.option.overlay for entry in selected]
    return scorer.score(baseline_epc, overlays)
|
|
|
|
|
|
def _used_group_indices(
    groups: list[list[ScoredOption]], selected: list[ScoredOption]
) -> set[int]:
    """Return the indices of the groups that already have a member in
    ``selected``.

    Membership is decided by object identity, not equality: the selection is
    expected to hold the very ScoredOption objects that live in ``groups``,
    so an equal-but-distinct object does not count as a match."""
    picked_ids = {id(entry) for entry in selected}
    return {
        position
        for position, group in enumerate(groups)
        if any(id(option) in picked_ids for option in group)
    }
|
|
|
|
|
|
def _best_repair_candidate(
    groups: list[list[ScoredOption]],
    chosen: list[ScoredOption],
    dependencies: Sequence[MeasureDependency],
    scorer: Scorer,
    baseline_epc: EpcPropertyData,
    current: Score,
    budget: Optional[float],
) -> Optional[ScoredOption]:
    """Pick the Option, from a group not yet represented in ``chosen``, that
    offers the best **marginal** SAP-per-£ when added to the current package.

    Every candidate is judged on a real re-score of the trial package (not
    its role-1 signal), with any dependency it newly triggers injected, so
    both the SAP uplift and the extra cost reflect the whole package. A
    candidate qualifies only if the trial package's total cost fits within
    ``budget`` (when one is set) and its SAP is strictly above ``current``.
    A qualifying candidate that adds no cost is given an infinite ratio.
    Returns ``None`` when no candidate qualifies; on equal ratios the first
    candidate encountered is kept."""
    treated: set[int] = _used_group_indices(groups, chosen)
    spent: float = _package_cost(_inject(chosen, dependencies))

    # All options from untreated groups, in group order then option order.
    candidates = (
        option
        for position, group in enumerate(groups)
        if position not in treated
        for option in group
    )

    winner: Optional[ScoredOption] = None
    winner_ratio: float = 0.0
    for option in candidates:
        trial_package: list[ScoredOption] = _inject(
            [*chosen, option], dependencies
        )
        trial_cost: float = _package_cost(trial_package)
        if budget is not None and trial_cost > budget:
            continue
        trial: Score = _score(scorer, baseline_epc, trial_package)
        uplift: float = trial.sap_continuous - current.sap_continuous
        if uplift <= 0.0:
            continue
        extra: float = trial_cost - spent
        if extra <= 0.0:
            # An improvement at no extra cost: treat as unbeatable value.
            ratio: float = float("inf")
        else:
            ratio = uplift / extra
        if ratio > winner_ratio:
            winner, winner_ratio = option, ratio
    return winner
|