Model/orchestration/modelling_orchestrator.py
Khalim Conn-Kowlessar 5c19737fc5 feat(modelling): gate generation by the considered-measures allowlist
`restrict_to_considered_measures` filtered candidates only *after* every
generator had run, so an excluded measure still queried the catalogue.
That crashed properties with a lodged secondary heater: the live
`material.type` enum has no `secondary_heating_removal` value, so the
query raised a psycopg2 `InvalidTextRepresentation` before the allowlist
could drop it.

`_candidate_recommendations` now pairs each generator with the measure
types it can emit and runs it only when the allowlist admits one of them
(None = all), so an excluded measure never reaches the catalogue.
`restrict_to_considered_measures` still trims disallowed Options off the
multi-Option survivors. Add `--exclude-measures` to run_modelling_e2e
(allowlist minus the excluded set) for excluding one measure without
enumerating the rest.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 14:56:09 +00:00

421 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from collections.abc import Callable
from typing import Final, Optional
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.billing.bill import Bill, EnergyBreakdown
from domain.billing.bill_derivation import BillDerivation
from domain.modelling.considered_measures import restrict_to_considered_measures
from domain.modelling.generators.floor_recommendation import recommend_floor_insulation
from domain.modelling.measure_type import MeasureType
from domain.modelling.optimisation.measure_dependency import ventilation_dependency
from domain.modelling.optimisation.optimiser import (
MeasureDependency,
OptimisedPackage,
ScoredOption,
optimise_package,
)
from domain.modelling.scoring.package_scorer import PackageScorer, Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import MeasureOption, Recommendation
from domain.modelling.generators.roof_recommendation import recommend_roof_insulation
from domain.modelling.scenario import Scenario
from domain.modelling.scoring.scoring import (
MeasureImpact,
cascade_scores,
independent_option_impacts,
marginals_from_scores,
)
from domain.modelling.generators.wall_recommendation import recommend_cavity_wall
from domain.modelling.generators.solid_wall_recommendation import recommend_solid_wall
from domain.modelling.generators.glazing_recommendation import recommend_glazing
from domain.modelling.generators.lighting_recommendation import recommend_lighting
from domain.modelling.generators.heating_recommendation import recommend_heating
from domain.modelling.generators.secondary_heating_recommendation import (
recommend_secondary_heating_removal,
)
from domain.modelling.generators.solar_recommendation import recommend_solar
from domain.modelling.solar_potential import SolarPotential
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.sap10_calculator.calculator import SapCalculator
from repositories.fuel_rates.fuel_rates_repository import FuelRatesRepository
from repositories.product.product_repository import ProductRepository
from repositories.solar.solar_repository import SolarRepository
from repositories.unit_of_work import UnitOfWork
# The PortfolioGoal value that targets a SAP band (cf.
# backend.app.db.models.portfolio.PortfolioGoal.INCREASING_EPC). `_target_sap`
# compares `Scenario.goal` against this string to decide whether the Optimiser
# gets a SAP repair target. Other goals (Energy Savings, Reducing CO2
# emissions) don't yet set a SAP repair target — the optimiser just maximises
# SAP gain within budget for them (later slice).
_INCREASING_EPC_GOAL: Final[str] = "Increasing EPC"
# Best-practice install sequence for the role-3 attribution cascade (ADR-0016):
# walls → roof → ventilation → floor, per the legacy `Recommendations` class.
# Ventilation sits after the fabric that triggers it so its (negative) marginal
# is attributed against the insulated envelope.
# `_best_practice_key` looks a measure type up in this tuple by position; any
# type that is not listed sorts after every listed one.
# NOTE(review): the only roof entry is `loft_insulation`, while the roof
# generator is also paired with sloping-ceiling and flat-roof insulation types;
# those (and all non-fabric measures) therefore land after the floor entries —
# confirm that is intended.
_BEST_PRACTICE_ORDER: Final[tuple[str, ...]] = (
    "cavity_wall_insulation",
    "external_wall_insulation",
    "internal_wall_insulation",
    "loft_insulation",
    "mechanical_ventilation",
    "suspended_floor_insulation",
    "solid_floor_insulation",
)
class ModellingOrchestrator:
    """Stage 3 — models every baselined Property against every Scenario and
    persists the resulting Plans (CONTEXT.md: Modelling; ADR-0011 / ADR-0012 /
    ADR-0016 / ADR-0017).

    The whole batch shares **one** Unit of Work and is committed once. For each
    (Property × Scenario) pair the orchestrator:

    1. reads the Property's Effective EPC and the Scenario through repos;
    2. generates the candidate Recommendations and scores each Option
       independently (role 1);
    3. runs the grouped-knapsack Optimiser + whole-package re-score + greedy
       repair toward the Scenario's SAP target (role 2, ADR-0016);
    4. attributes each selected measure via the best-practice marginal cascade
       (role 3);
    5. persists a **Plan** with its **Plan Measures**.

    Single-phase — multi-phase is deferred (ADR-0005). Data is read only
    through repos and only IDs (`property_ids`, `scenario_ids`,
    `portfolio_id`) are threaded in — never an in-memory hand-off from Baseline
    (ADR-0011). The injected `SapCalculator` is the scoring-engine seam.
    """

    def __init__(
        self,
        *,
        unit_of_work: Callable[[], UnitOfWork],
        calculator: SapCalculator,
        fuel_rates: FuelRatesRepository,
    ) -> None:
        """Store the collaborators: a Unit of Work factory, the SAP calculator
        used for scoring, and the Fuel Rates repository used for billing."""
        self._fuel_rates = fuel_rates
        self._calculator = calculator
        self._unit_of_work = unit_of_work

    def run(
        self,
        property_ids: list[int],
        scenario_ids: list[int],
        portfolio_id: int,
        *,
        considered_measures: Optional[frozenset[MeasureType]] = None,
    ) -> None:
        """Model the batch and persist one Plan per (Property × Scenario).

        ``considered_measures`` limits the run to those measure types
        (mirroring the legacy `inclusions`); None means every modelled measure
        is considered.
        """
        scorer = PackageScorer(self._calculator)
        # Fuel Rates are resolved a single time; the one BillDerivation is then
        # shared by the whole batch so every baseline/post bill is priced at
        # the same snapshot (ADR-0014).
        current_rates = self._fuel_rates.get_current()
        bill_derivation = BillDerivation(current_rates)

        with self._unit_of_work() as work:
            loaded_properties = work.property.get_many(property_ids)
            scenarios: list[Scenario] = work.scenario.get_many(scenario_ids)

            # strict=True: a length mismatch between the requested IDs and the
            # loaded Properties raises instead of silently truncating.
            id_property_pairs = zip(property_ids, loaded_properties, strict=True)
            for property_id, modelled in id_property_pairs:
                effective_epc: EpcPropertyData = modelled.effective_epc
                # Google Solar potential (raw buildingInsights JSON persisted
                # by Ingestion) is projected once per Property and handed to
                # the solar Generator (ADR-0026). It is None when no solar
                # data was fetched — the Generator then offers nothing.
                solar_potential: Optional[SolarPotential] = _solar_potential_for(
                    work.solar, modelled.identity.uprn
                )

                for scenario in scenarios:
                    plan = self._plan_for(
                        scorer,
                        bill_derivation,
                        effective_epc,
                        work.product,
                        scenario,
                        current_market_value=modelled.current_market_value,
                        planning_restrictions=modelled.planning_restrictions,
                        solar_potential=solar_potential,
                        considered_measures=considered_measures,
                    )
                    work.plan.save(
                        plan,
                        property_id=property_id,
                        scenario_id=scenario.id,
                        portfolio_id=portfolio_id,
                        is_default=scenario.is_default,
                    )

            # Single commit for the whole batch.
            work.commit()

    def _plan_for(
        self,
        scorer: PackageScorer,
        bill_derivation: BillDerivation,
        effective_epc: EpcPropertyData,
        products: ProductRepository,
        scenario: Scenario,
        *,
        current_market_value: Optional[float],
        planning_restrictions: PlanningRestrictions,
        solar_potential: Optional[SolarPotential],
        considered_measures: Optional[frozenset[MeasureType]],
    ) -> Plan:
        """Build the Plan for one Property + Scenario: generate → score →
        optimise → re-score/repair → attribute → bill → assemble."""
        candidate_groups: list[list[ScoredOption]] = _scored_candidate_groups(
            scorer,
            effective_epc,
            products,
            planning_restrictions,
            solar_potential,
            considered_measures,
        )
        # Forced Measure Dependencies (ventilation) stay out of the candidate
        # pool; they are injected into the package before the re-score
        # (ADR-0016).
        forced: list[MeasureDependency] = _measure_dependencies(
            effective_epc, products, considered_measures
        )
        package: OptimisedPackage = optimise_package(
            groups=candidate_groups,
            scorer=scorer,
            baseline_epc=effective_epc,
            budget=scenario.budget,
            target_sap=_target_sap(scenario),
            dependencies=forced,
        )

        # Role-3 attribution: the *selected* Options are re-applied in
        # best-practice order so each marginal telescopes to the truthful
        # package total. list.sort is stable, like sorted().
        sequenced: list[MeasureOption] = [
            scored.option for scored in package.selected
        ]
        sequenced.sort(key=_best_practice_key)

        # One Score per cumulative prefix: cascade[0] is the baseline and
        # cascade[-1] the whole package. The same Scores feed both the
        # marginal attribution and the per-measure bill cascade.
        overlays = [option.overlay for option in sequenced]
        cascade: list[Score] = cascade_scores(scorer, effective_epc, overlays)
        impacts: list[MeasureImpact] = marginals_from_scores(cascade)

        # Every prefix is billed at one Fuel Rates snapshot. Consecutive Bill
        # deltas are each measure's marginal energy/cost saving — negative for
        # ventilation — and telescope exactly to the Plan totals (ADR-0014).
        # The Plan's baseline/post Bills are the cascade endpoints, so the
        # per-measure and headline savings share one source.
        bills: list[Bill] = [
            _bill_for(bill_derivation, prefix_score) for prefix_score in cascade
        ]
        bills_before = bills[:-1]
        bills_after = bills[1:]
        measures: tuple[PlanMeasure, ...] = tuple(
            _plan_measure(option, impact, before, after)
            for option, impact, before, after in zip(
                sequenced, impacts, bills_before, bills_after, strict=True
            )
        )

        return Plan(
            measures=measures,
            baseline=cascade[0],
            post_retrofit=package.score,
            baseline_bill=bills[0],
            post_bill=bills[-1],
            current_market_value=current_market_value,
        )
def _bill_for(bill_derivation: BillDerivation, score: Score) -> Bill:
    """Price a scored end-state into its annual Bill.

    The delivered energy is taken from the Score's SapResult. The real
    PackageScorer always attaches one, so a Score without it signals a wiring
    error and raises ValueError rather than being billed at a default
    (ADR-0014).
    """
    sap_result = score.sap_result
    if sap_result is not None:
        breakdown = EnergyBreakdown.from_sap_result(sap_result)
        return bill_derivation.derive(breakdown)
    raise ValueError("cannot derive a bill: the Score carries no SapResult to price")
def _solar_potential_for(
    solar_repo: SolarRepository, uprn: Optional[int]
) -> Optional[SolarPotential]:
    """Return the typed `SolarPotential` for a UPRN, or None.

    The persisted Google Solar `buildingInsights` JSON is looked up by UPRN
    (matching the live ``solar`` table) and projected into a `SolarPotential`
    (ADR-0026). None is returned when there is no UPRN, when nothing was
    fetched, or when the stored payload has no `solarPotential` block (an
    error payload).
    """
    if uprn is None:
        return None

    insights = solar_repo.get(uprn)
    has_potential = bool(insights) and "solarPotential" in insights
    if has_potential:
        return SolarPotential.from_building_insights(insights)
    return None
def _candidate_recommendations(
    effective_epc: EpcPropertyData,
    products: ProductRepository,
    planning_restrictions: PlanningRestrictions,
    solar_potential: Optional[SolarPotential],
    considered_measures: Optional[frozenset[MeasureType]],
) -> list[Recommendation]:
    """Run the admitted Recommendation Generators and keep those that apply.

    Solid-wall insulation, glazing, heating and solar are additionally gated by
    the Property's planning protections (ADR-0019 / ADR-0022 / ADR-0024 /
    ADR-0026); solar also needs the Property's Google solar potential.

    ``considered_measures`` gates generation *up front*: a generator is invoked
    only when the allowlist admits at least one measure type it can emit
    (None = every measure). An excluded measure therefore never reaches the
    catalogue — which matters when the live ``material.type`` enum cannot even
    represent it (e.g. ``secondary_heating_removal``).
    ``restrict_to_considered_measures`` then trims any disallowed Options off
    the multi-Option survivors.
    """

    def is_admitted(emitted: tuple[MeasureType, ...]) -> bool:
        # No allowlist means every generator runs.
        if considered_measures is None:
            return True
        for measure in emitted:
            if measure in considered_measures:
                return True
        return False

    # Each entry pairs the measure types a generator can emit with a thunk that
    # invokes it, so a generator whose every type is excluded is never called.
    generators: tuple[
        tuple[tuple[MeasureType, ...], Callable[[], Optional[Recommendation]]], ...
    ] = (
        (
            (MeasureType.CAVITY_WALL_INSULATION,),
            lambda: recommend_cavity_wall(effective_epc, products),
        ),
        (
            (
                MeasureType.INTERNAL_WALL_INSULATION,
                MeasureType.EXTERNAL_WALL_INSULATION,
            ),
            lambda: recommend_solid_wall(
                effective_epc, products, planning_restrictions
            ),
        ),
        (
            (
                MeasureType.LOFT_INSULATION,
                MeasureType.SLOPING_CEILING_INSULATION,
                MeasureType.FLAT_ROOF_INSULATION,
            ),
            lambda: recommend_roof_insulation(effective_epc, products),
        ),
        (
            (
                MeasureType.SUSPENDED_FLOOR_INSULATION,
                MeasureType.SOLID_FLOOR_INSULATION,
            ),
            lambda: recommend_floor_insulation(effective_epc, products),
        ),
        (
            (MeasureType.DOUBLE_GLAZING, MeasureType.SECONDARY_GLAZING),
            lambda: recommend_glazing(effective_epc, products, planning_restrictions),
        ),
        (
            (MeasureType.LOW_ENERGY_LIGHTING,),
            lambda: recommend_lighting(effective_epc, products),
        ),
        (
            (
                MeasureType.HIGH_HEAT_RETENTION_STORAGE_HEATERS,
                MeasureType.AIR_SOURCE_HEAT_PUMP,
                MeasureType.GAS_BOILER_UPGRADE,
                MeasureType.SYSTEM_TUNE_UP,
                MeasureType.SYSTEM_TUNE_UP_ZONED,
            ),
            lambda: recommend_heating(effective_epc, products, planning_restrictions),
        ),
        (
            (MeasureType.SECONDARY_HEATING_REMOVAL,),
            lambda: recommend_secondary_heating_removal(effective_epc, products),
        ),
        (
            (MeasureType.SOLAR_PV,),
            lambda: recommend_solar(
                effective_epc, products, solar_potential, planning_restrictions
            ),
        ),
    )

    applicable: list[Recommendation] = []
    for emitted, generate in generators:
        if not is_admitted(emitted):
            continue
        recommendation = generate()
        # A generator returns None when it has nothing to offer.
        if recommendation is not None:
            applicable.append(recommendation)

    return restrict_to_considered_measures(applicable, considered_measures)
def _measure_dependencies(
    effective_epc: EpcPropertyData,
    products: ProductRepository,
    considered_measures: Optional[frozenset[MeasureType]],
) -> list[MeasureDependency]:
    """Return the forced Measure Dependencies for this Property.

    Currently that is just ventilation, which is suppressed when the dwelling
    is already mechanically ventilated (ADR-0016). A dependency whose required
    measure falls outside the run's allowlist is suppressed too, so a
    restricted run forces nothing it is not considering.
    """
    # NOTE(review): `ventilation_dependency` receives `products` before the
    # allowlist is consulted — if it queries the catalogue, an excluded
    # ventilation measure would still reach it; confirm.
    dependency: Optional[MeasureDependency] = ventilation_dependency(
        effective_epc, products
    )
    if dependency is None:
        return []
    if considered_measures is None:
        return [dependency]

    required_type = dependency.required.option.measure_type
    return [dependency] if required_type in considered_measures else []
def _scored_candidate_groups(
    scorer: PackageScorer,
    effective_epc: EpcPropertyData,
    products: ProductRepository,
    planning_restrictions: PlanningRestrictions,
    solar_potential: Optional[SolarPotential],
    considered_measures: Optional[frozenset[MeasureType]],
) -> list[list[ScoredOption]]:
    """Build one group of ScoredOptions per candidate Recommendation, with each
    Option scored independently against the baseline (role-1 warm-start
    signal, ADR-0016)."""
    recommendations = _candidate_recommendations(
        effective_epc,
        products,
        planning_restrictions,
        solar_potential,
        considered_measures,
    )

    groups: list[list[ScoredOption]] = []
    for recommendation in recommendations:
        options = list(recommendation.options)
        impacts: list[MeasureImpact] = independent_option_impacts(
            scorer, effective_epc, options
        )
        group: list[ScoredOption] = []
        # strict=True: exactly one impact is expected per Option.
        for option, impact in zip(options, impacts, strict=True):
            group.append(ScoredOption(option=option, sap_gain=impact.sap_points))
        groups.append(group)
    return groups
def _target_sap(scenario: Scenario) -> Optional[float]:
    """Return the SAP rating the Optimiser repairs toward.

    For an INCREASING_EPC goal this is the lower SAP bound of the goal band,
    as a float; for any other goal there is no SAP target and None is returned.
    """
    if scenario.goal == _INCREASING_EPC_GOAL:
        goal_band = Epc(scenario.goal_value)
        return float(goal_band.sap_lower_bound())
    return None
def _best_practice_key(option: MeasureOption) -> int:
    """Sort key placing an Option at its position in `_BEST_PRACTICE_ORDER`;
    a measure type that is not listed sorts after every listed one."""
    measure_type = option.measure_type
    if measure_type in _BEST_PRACTICE_ORDER:
        return _BEST_PRACTICE_ORDER.index(measure_type)
    return len(_BEST_PRACTICE_ORDER)
def _plan_measure(
    option: MeasureOption, impact: MeasureImpact, before: Bill, after: Bill
) -> PlanMeasure:
    """Assemble the Plan Measure for one selected Option.

    The measure's marginal bill saving is the difference between the running
    package Bill before and after it (delivered kWh and £). The sign convention
    is that positive is a saving; ventilation comes out negative.

    Raises ValueError when the Option has no cost, because such a measure
    cannot be persisted.
    """
    cost = option.cost
    if cost is None:
        raise ValueError(
            f"measure option {option.measure_type!r} has no cost; cannot persist"
        )

    kwh_saved = before.total_consumption_kwh - after.total_consumption_kwh
    gbp_saved = before.total_gbp - after.total_gbp
    return PlanMeasure(
        measure_type=option.measure_type,
        description=option.description,
        cost=cost,
        impact=impact,
        kwh_savings=kwh_saved,
        energy_cost_savings=gbp_saved,
        material_id=option.material_id,
    )