Model/orchestration/modelling_orchestrator.py
Khalim Conn-Kowlessar 84ec6da032 refactor(modelling): group domain/modelling into generators/scoring/optimisation
domain/modelling/ had grown to 15 flat modules. Group the behavioural ones into
subpackages — generators/ (wall/roof/floor Recommendation Generators), scoring/
(overlay applicator, package scorer, role-1/3 scoring), optimisation/ (optimiser
+ measure dependency) — and leave the shared value-object vocabulary
(recommendation, plan, scenario, product, contingencies, simulation) flat at the
top, since it is imported everywhere. Pure move + import-path rewrite across 89
import sites; no behaviour change. 136 pass, pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 13:48:36 +00:00

218 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from collections.abc import Callable
from typing import Final, Optional
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.generators.floor_recommendation import recommend_floor_insulation
from domain.modelling.optimisation.measure_dependency import ventilation_dependency
from domain.modelling.optimisation.optimiser import (
MeasureDependency,
OptimisedPackage,
ScoredOption,
optimise_package,
)
from domain.modelling.scoring.package_scorer import PackageScorer, Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import MeasureOption, Recommendation
from domain.modelling.generators.roof_recommendation import recommend_loft_insulation
from domain.modelling.scenario import Scenario
from domain.modelling.scoring.scoring import (
MeasureImpact,
independent_option_impacts,
marginal_impacts,
)
from domain.modelling.generators.wall_recommendation import recommend_cavity_wall
from domain.sap10_calculator.calculator import SapCalculator
from repositories.product.product_repository import ProductRepository
from repositories.unit_of_work import UnitOfWork
# The PortfolioGoal value that targets a SAP band (cf.
# backend.app.db.models.portfolio.PortfolioGoal.INCREASING_EPC). Other goals
# (Energy Savings, Reducing CO2 emissions) don't yet set a SAP repair target —
# the optimiser just maximises SAP gain within budget for them (later slice).
# `_target_sap` compares `Scenario.goal` against this exact string, so it must
# stay in step with the stored goal value.
_INCREASING_EPC_GOAL: Final[str] = "Increasing EPC"
# Best-practice install sequence for the role-3 attribution cascade (ADR-0016):
# walls → roof → ventilation → floor, per the legacy `Recommendations` class.
# Ventilation sits after the fabric that triggers it so its (negative) marginal
# is attributed against the insulated envelope.
# `_best_practice_key` ranks a MeasureOption by the position of its
# `measure_type` in this tuple; a type that is not listed ranks after every
# listed one.
_BEST_PRACTICE_ORDER: Final[tuple[str, ...]] = (
"cavity_wall_insulation",
"external_wall_insulation",
"internal_wall_insulation",
"loft_insulation",
"mechanical_ventilation",
"suspended_floor_insulation",
"solid_floor_insulation",
)
class ModellingOrchestrator:
    """Stage 3 of the pipeline: score baselined Properties against Scenarios
    and persist the resulting Plans (CONTEXT.md: Modelling; ADR-0011 /
    ADR-0012 / ADR-0016 / ADR-0017).

    The whole batch runs inside **one** Unit of Work with a single commit at
    the end. For every (Property × Scenario) pair the orchestrator:

    1. reads the Property's Effective EPC and the Scenario through repos;
    2. generates the candidate Recommendations (wall / roof / floor);
    3. scores each Option independently (role 1);
    4. runs the grouped-knapsack Optimiser, whole-package re-score and greedy
       repair toward the Scenario's SAP target (role 2, ADR-0016);
    5. attributes each selected measure via the best-practice marginal
       cascade (role 3);
    6. persists a **Plan** with its **Plan Measures**.

    Single-phase only — multi-phase is deferred (ADR-0005). Data is read only
    through repos and only IDs (`property_ids`, `scenario_ids`,
    `portfolio_id`) are threaded in — never an in-memory hand-off from
    Baseline (ADR-0011). The injected `SapCalculator` is the scoring-engine
    seam.
    """

    def __init__(
        self,
        *,
        unit_of_work: Callable[[], UnitOfWork],
        calculator: SapCalculator,
    ) -> None:
        """Store the collaborators; both arguments are keyword-only.

        Args:
            unit_of_work: Zero-argument factory; `run` calls it once and uses
                the result as a context manager.
            calculator: SAP engine handed to the `PackageScorer` built in
                `run`.
        """
        self._calculator = calculator
        self._unit_of_work = unit_of_work

    def run(
        self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int
    ) -> None:
        """Build and save a Plan for every (Property, Scenario) pair, then
        commit once.

        Args:
            property_ids: IDs of the Properties to model.
            scenario_ids: IDs of the Scenarios each Property is scored against.
            portfolio_id: Portfolio the saved Plans are recorded under.

        Raises:
            ValueError: From the strict `zip` if the property repository
                returns a different number of Properties than IDs requested.
        """
        package_scorer = PackageScorer(self._calculator)
        with self._unit_of_work() as uow:
            loaded_properties = uow.property.get_many(property_ids)
            loaded_scenarios: list[Scenario] = uow.scenario.get_many(scenario_ids)
            # NOTE(review): IDs and loaded Properties are paired positionally,
            # which presumes `get_many` preserves the order of `property_ids`
            # — confirm against the repository implementation.
            id_property_pairs = zip(property_ids, loaded_properties, strict=True)
            for current_id, current_property in id_property_pairs:
                epc: EpcPropertyData = current_property.effective_epc
                for current_scenario in loaded_scenarios:
                    built_plan = self._plan_for(
                        package_scorer, epc, uow.product, current_scenario
                    )
                    uow.plan.save(
                        built_plan,
                        property_id=current_id,
                        scenario_id=current_scenario.id,
                        portfolio_id=portfolio_id,
                        is_default=current_scenario.is_default,
                    )
            # Single commit for the whole batch.
            uow.commit()

    def _plan_for(
        self,
        scorer: PackageScorer,
        effective_epc: EpcPropertyData,
        products: ProductRepository,
        scenario: Scenario,
    ) -> Plan:
        """Assemble the Plan for one Property + Scenario: generate → score →
        optimise → re-score/repair → attribute.

        Args:
            scorer: Package scorer shared across the batch.
            effective_epc: The Property's Effective EPC (the baseline).
            products: Product repository passed to the generators and the
                dependency lookup.
            scenario: Supplies the budget and, via `_target_sap`, the optional
                SAP target.

        Returns:
            A Plan holding the attributed measures, the baseline score and the
            optimised package's score.
        """
        # NOTE(review): the candidate groups, the dependencies and the
        # baseline score below are derived from the Property inputs only, so
        # they are recomputed for every Scenario. Hoisting looks possible if
        # `optimise_package` does not mutate its inputs — confirm first.
        candidate_groups: list[list[ScoredOption]] = _scored_candidate_groups(
            scorer, effective_epc, products
        )
        # Forced Measure Dependencies (ventilation) stay out of the option
        # pool; they are injected into the package before the re-score
        # (ADR-0016).
        forced: list[MeasureDependency] = _measure_dependencies(
            effective_epc, products
        )
        optimised: OptimisedPackage = optimise_package(
            groups=candidate_groups,
            scorer=scorer,
            baseline_epc=effective_epc,
            budget=scenario.budget,
            target_sap=_target_sap(scenario),
            dependencies=forced,
        )

        # Role-3 attribution: replay only the *selected* options in
        # best-practice order so the per-measure marginals telescope to the
        # truthful package total.
        in_install_order: list[MeasureOption] = [
            scored.option for scored in optimised.selected
        ]
        in_install_order.sort(key=_best_practice_key)
        overlays = [chosen.overlay for chosen in in_install_order]
        per_measure: list[MeasureImpact] = marginal_impacts(
            scorer, effective_epc, overlays
        )

        # Scoring with no overlays gives the baseline the Plan reports.
        baseline_score: Score = scorer.score(effective_epc, [])
        plan_measures: tuple[PlanMeasure, ...] = tuple(
            _plan_measure(chosen, impact)
            for chosen, impact in zip(in_install_order, per_measure, strict=True)
        )
        return Plan(
            measures=plan_measures,
            baseline=baseline_score,
            post_retrofit=optimised.score,
        )
def _candidate_recommendations(
    effective_epc: EpcPropertyData, products: ProductRepository
) -> list[Recommendation]:
    """Collect the Recommendations that apply to this Property.

    Each fabric Recommendation Generator (cavity wall, loft, floor) is called
    in that order with the same arguments; a generator that returns ``None``
    contributes nothing.

    Args:
        effective_epc: The Property's Effective EPC.
        products: Product repository handed to every generator.

    Returns:
        The non-``None`` Recommendations, in generator order.
    """
    applicable: list[Recommendation] = []
    for generate in (
        recommend_cavity_wall,
        recommend_loft_insulation,
        recommend_floor_insulation,
    ):
        candidate = generate(effective_epc, products)
        if candidate is not None:
            applicable.append(candidate)
    return applicable
def _measure_dependencies(
    effective_epc: EpcPropertyData, products: ProductRepository
) -> list[MeasureDependency]:
    """Return the forced Measure Dependencies for this Property.

    Ventilation is currently the only one; per ADR-0016 it is suppressed when
    the dwelling is already mechanically ventilated, in which case
    `ventilation_dependency` yields ``None`` and the list is empty.

    Args:
        effective_epc: The Property's Effective EPC.
        products: Product repository passed to the dependency lookup.

    Returns:
        A one-element list holding the ventilation dependency, or ``[]``.
    """
    forced: Optional[MeasureDependency] = ventilation_dependency(
        effective_epc, products
    )
    if forced is None:
        return []
    return [forced]
def _scored_candidate_groups(
    scorer: PackageScorer,
    effective_epc: EpcPropertyData,
    products: ProductRepository,
) -> list[list[ScoredOption]]:
    """Build one group of ScoredOptions per candidate Recommendation.

    Every Option is scored on its own against the baseline; the resulting SAP
    points become the Option's `sap_gain` (role-1 warm-start signal,
    ADR-0016).

    Args:
        scorer: Package scorer used for the independent scoring.
        effective_epc: The Property's Effective EPC (the baseline).
        products: Product repository passed to the generators.

    Returns:
        A list with one inner list per Recommendation, Options kept in the
        Recommendation's own order.
    """

    def score_group(recommendation: Recommendation) -> list[ScoredOption]:
        # Materialise once so the options and their impacts can be paired
        # positionally below.
        candidates = list(recommendation.options)
        impacts: list[MeasureImpact] = independent_option_impacts(
            scorer, effective_epc, candidates
        )
        return [
            ScoredOption(option=candidate, sap_gain=impact.sap_points)
            for candidate, impact in zip(candidates, impacts, strict=True)
        ]

    return [
        score_group(recommendation)
        for recommendation in _candidate_recommendations(effective_epc, products)
    ]
def _target_sap(scenario: Scenario) -> Optional[float]:
    """Return the SAP rating the Optimiser should repair toward, if any.

    For an INCREASING_EPC goal this is the lower bound of the goal band taken
    from `scenario.goal_value`; every other goal has no SAP target.

    Args:
        scenario: The Scenario whose `goal` and `goal_value` are inspected.

    Returns:
        The band's SAP lower bound as a float, or ``None`` when the goal is
        not INCREASING_EPC.
    """
    if scenario.goal == _INCREASING_EPC_GOAL:
        band_floor = Epc(scenario.goal_value).sap_lower_bound()
        return float(band_floor)
    return None
def _best_practice_key(option: MeasureOption) -> int:
    """Sort key placing an Option at its best-practice install position.

    Args:
        option: The Option whose `measure_type` is looked up.

    Returns:
        The index of the measure type in `_BEST_PRACTICE_ORDER`, or the
        tuple's length when the type is not listed, so unlisted types sort
        after every listed one.
    """
    wanted = option.measure_type
    return next(
        (
            position
            for position, known in enumerate(_BEST_PRACTICE_ORDER)
            if known == wanted
        ),
        len(_BEST_PRACTICE_ORDER),
    )
def _plan_measure(option: MeasureOption, impact: MeasureImpact) -> PlanMeasure:
    """Combine a selected Option and its attributed impact into a PlanMeasure.

    Args:
        option: The selected Option; supplies measure type, description and
            cost.
        impact: The marginal impact attributed to this Option.

    Returns:
        The PlanMeasure built from the Option's fields and `impact`.

    Raises:
        ValueError: If the Option has no cost, since an uncosted measure
            cannot be persisted.
    """
    price = option.cost
    if price is not None:
        return PlanMeasure(
            measure_type=option.measure_type,
            description=option.description,
            cost=price,
            impact=impact,
        )
    raise ValueError(
        f"measure option {option.measure_type!r} has no cost; cannot persist"
    )