feat(modelling): wire the Optimiser into the orchestrator (#1160)

Slice 3b — closes #1160. ModellingOrchestrator._plan_for now runs the
full ADR-0016 flow instead of a single cavity measure:

  generate wall + roof + floor Recommendations → score each Option
  independently (role 1) into grouped ScoredOptions → optimise_package
  (grouped knapsack within budget + whole-package re-score + greedy
  repair toward the Scenario's SAP target) → attribute the selected set
  via the best-practice marginal cascade (role 3) → persist the Plan
  with its Plan Measures.

The repair target comes from the goal: INCREASING_EPC → the goal_value
band floor via Epc.sap_lower_bound(); other goals carry no SAP target
yet (later slice). Best-practice order walls → roof → floor.

Integration test: an uninsulated cavity wall + suspended floor (000490)
driven directly through the Modelling stage off a repo-seeded EPC
(the calculator fixture has no lodged recorded-performance fields, so
Baseline can't run it) persists a Plan with two attributed, priced Plan
Measures. The existing first-run test keeps full-pipeline coverage and
now exercises real modelling (its sample EPC's uninsulated solid floor
yields a floor measure). Replaces the single-measure cavity integration
test (subsumed). 138 pass; pyright strict clean.

Multi-phase remains descoped (ADR-0005); single-phase optimiser.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-03 13:07:14 +00:00
parent 504f592a27
commit 34d4748a3a
2 changed files with 186 additions and 83 deletions

View file

@ -1,36 +1,66 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Final, Optional
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.floor_recommendation import recommend_floor_insulation
from domain.modelling.optimiser import (
OptimisedPackage,
ScoredOption,
optimise_package,
)
from domain.modelling.package_scorer import PackageScorer, Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import MeasureOption, Recommendation
from domain.modelling.roof_recommendation import recommend_loft_insulation
from domain.modelling.scenario import Scenario
from domain.modelling.scoring import MeasureImpact, marginal_impacts
from domain.modelling.simulation import EpcSimulation
from domain.modelling.scoring import (
MeasureImpact,
independent_option_impacts,
marginal_impacts,
)
from domain.modelling.wall_recommendation import recommend_cavity_wall
from domain.sap10_calculator.calculator import SapCalculator
from repositories.product.product_repository import ProductRepository
from repositories.unit_of_work import UnitOfWork
# The PortfolioGoal value that targets a SAP band (cf.
# backend.app.db.models.portfolio.PortfolioGoal.INCREASING_EPC). Other goals
# (Energy Savings, Reducing CO2 emissions) don't yet set a SAP repair target —
# the optimiser just maximises SAP gain within budget for them (later slice).
_INCREASING_EPC_GOAL: Final[str] = "Increasing EPC"
# Best-practice install sequence for the role-3 attribution cascade (ADR-0016):
# fabric in walls → roof → floor order, per the legacy `Recommendations` class.
_BEST_PRACTICE_ORDER: Final[tuple[str, ...]] = (
"cavity_wall_insulation",
"external_wall_insulation",
"internal_wall_insulation",
"loft_insulation",
"suspended_floor_insulation",
"solid_floor_insulation",
)
class ModellingOrchestrator:
"""Stage 3 — scores each baselined Property against its Scenarios into Plans
and persists them (CONTEXT.md: Modelling; ADR-0011 / ADR-0012 / ADR-0016 /
ADR-0017).
Runs the whole batch in **one** Unit of Work and commits once: for each
Runs the whole batch in **one** Unit of Work and commits once. For each
(Property × Scenario) it reads the Property's Effective EPC and the Scenario
through repos, generates the candidate Recommendation, selects its Option
into a trivial Optimised Package, scores the package (role 2) and attributes
each measure (role-3 marginal cascade), and persists a **Plan** with its
**Plan Measures**. The optimiser, exclusions, and multi-measure generators
land in later slices; this is the single-measure tracer.
through repos, generates the candidate Recommendations (wall / roof /
floor), scores each Option independently (role 1), runs the grouped-knapsack
Optimiser + whole-package re-score + greedy repair toward the Scenario's SAP
target (role 2, ADR-0016), attributes each selected measure via the
best-practice marginal cascade (role 3), and persists a **Plan** with its
**Plan Measures**. Single-phase multi-phase is deferred (ADR-0005).
Reads only through repos and threads only IDs (`property_ids`,
`scenario_ids`, `portfolio_id`) never an in-memory hand-off from Baseline
(ADR-0011). The injected `SapCalculator` is the scoring engine seam.
(ADR-0011). The injected `SapCalculator` is the scoring-engine seam.
"""
def __init__(
@ -52,7 +82,9 @@ class ModellingOrchestrator:
for property_id, prop in zip(property_ids, properties, strict=True):
effective_epc: EpcPropertyData = prop.effective_epc
for scenario in scenarios:
plan = self._plan_for(scorer, effective_epc, uow.product)
plan = self._plan_for(
scorer, effective_epc, uow.product, scenario
)
uow.plan.save(
plan,
property_id=property_id,
@ -67,31 +99,89 @@ class ModellingOrchestrator:
scorer: PackageScorer,
effective_epc: EpcPropertyData,
products: ProductRepository,
scenario: Scenario,
) -> Plan:
"""Generate → select → score → attribute the single-measure package for
one Property + Scenario, and assemble its Plan."""
recommendation: Recommendation | None = recommend_cavity_wall(
effective_epc, products
"""Generate → score → optimise → re-score/repair → attribute → assemble
the Plan for one Property + Scenario."""
groups: list[list[ScoredOption]] = _scored_candidate_groups(
scorer, effective_epc, products
)
selected: list[MeasureOption] = (
[recommendation.options[0]] if recommendation is not None else []
package: OptimisedPackage = optimise_package(
groups=groups,
scorer=scorer,
baseline_epc=effective_epc,
budget=scenario.budget,
target_sap=_target_sap(scenario),
)
overlays: list[EpcSimulation] = [option.overlay for option in selected]
baseline: Score = scorer.score(effective_epc, [])
post_retrofit: Score = scorer.score(effective_epc, overlays)
impacts: list[MeasureImpact] = marginal_impacts(
scorer, effective_epc, overlays
# Role-3 attribution: re-apply the *selected* set in best-practice order
# so each measure's marginal telescopes to the truthful package total.
ordered: list[MeasureOption] = sorted(
(scored.option for scored in package.selected), key=_best_practice_key
)
impacts: list[MeasureImpact] = marginal_impacts(
scorer, effective_epc, [option.overlay for option in ordered]
)
baseline: Score = scorer.score(effective_epc, [])
measures: tuple[PlanMeasure, ...] = tuple(
_plan_measure(option, impact)
for option, impact in zip(selected, impacts, strict=True)
for option, impact in zip(ordered, impacts, strict=True)
)
return Plan(
measures=measures, baseline=baseline, post_retrofit=post_retrofit
measures=measures, baseline=baseline, post_retrofit=package.score
)
def _candidate_recommendations(
effective_epc: EpcPropertyData, products: ProductRepository
) -> list[Recommendation]:
"""Run every fabric Recommendation Generator; keep the ones that apply."""
generators = (
recommend_cavity_wall,
recommend_loft_insulation,
recommend_floor_insulation,
)
found = (generator(effective_epc, products) for generator in generators)
return [recommendation for recommendation in found if recommendation is not None]
def _scored_candidate_groups(
scorer: PackageScorer,
effective_epc: EpcPropertyData,
products: ProductRepository,
) -> list[list[ScoredOption]]:
"""One group per Recommendation: each Option scored independently against
the baseline (role-1 warm-start signal, ADR-0016)."""
groups: list[list[ScoredOption]] = []
for recommendation in _candidate_recommendations(effective_epc, products):
options = list(recommendation.options)
impacts: list[MeasureImpact] = independent_option_impacts(
scorer, effective_epc, options
)
groups.append(
[
ScoredOption(option=option, sap_gain=impact.sap_points)
for option, impact in zip(options, impacts, strict=True)
]
)
return groups
def _target_sap(scenario: Scenario) -> Optional[float]:
"""The SAP rating the Optimiser repairs toward — the floor of the goal
band for an INCREASING_EPC goal, else None (no SAP target)."""
if scenario.goal != _INCREASING_EPC_GOAL:
return None
return float(Epc(scenario.goal_value).sap_lower_bound())
def _best_practice_key(option: MeasureOption) -> int:
try:
return _BEST_PRACTICE_ORDER.index(option.measure_type)
except ValueError:
return len(_BEST_PRACTICE_ORDER)
def _plan_measure(option: MeasureOption, impact: MeasureImpact) -> PlanMeasure:
if option.cost is None:
raise ValueError(

View file

@ -29,10 +29,14 @@ from infrastructure.postgres.epc_property_table import EpcPropertyModel
from infrastructure.postgres.plan_table import PlanRow, RecommendationRow
from infrastructure.postgres.product_table import MaterialRow
from infrastructure.postgres.property_table import PropertyRow
from tests.domain.sap10_calculator.worksheet._elmhurst_worksheet_000490 import (
build_epc as _build_uninsulated_cavity_and_floor_epc,
)
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from orchestration.modelling_orchestrator import ModellingOrchestrator
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.property_baseline.property_baseline_postgres_repository import (
PropertyBaselinePostgresRepository,
)
@ -107,7 +111,19 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
# scenario_ids) through the repo, so the row must exist.
session.add(
ScenarioRow(
id=7, goal="INCREASING_EPC", goal_value="C", is_default=True
id=7, goal="Increasing EPC", goal_value="C", is_default=True
)
)
# The sample EPC's solid floor is uninsulated, so the floor generator
# fires during candidate generation and prices against this Product.
session.add(
MaterialRow(
id=1,
type="solid_floor_insulation",
total_cost=25.0,
cost_unit="gbp_per_m2",
is_active=True,
description="Solid floor insulation",
)
)
session.commit()
@ -159,79 +175,74 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
assert len(baseline_rows) == 1
def _uninsulated_cavity_epc() -> EpcPropertyData:
"""The sample EPC with its MAIN wall flipped to an uninsulated cavity, so
the wall Recommendation Generator fires."""
epc = _lodged_epc()
main = epc.sap_building_parts[0]
uninsulated_main = dataclasses.replace(main, wall_insulation_type=4)
return dataclasses.replace(epc, sap_building_parts=[uninsulated_main])
def test_first_run_persists_a_plan_with_a_cavity_wall_measure(
def test_modelling_optimises_and_persists_a_multi_measure_plan(
db_engine: Engine,
) -> None:
# Arrange — a property to ingest, the Scenario the FE created, and a
# cavity-wall Product so the measure can be priced. (The SAP-numeric
# correctness of the cascade is pinned in test_elmhurst_cascade_pins; here
# we prove the Plan is generated, priced and persisted end-to-end.)
# Arrange — an EPC with an uninsulated cavity wall AND an uninsulated
# suspended floor (loft already at 300mm), so the wall + floor Generators
# both fire and the Optimiser selects from two groups. We drive the
# Modelling stage directly off a repo-seeded EPC rather than the full
# pipeline: this calculator fixture has no lodged recorded-performance
# fields, so the Baseline stage (not under test here) can't run on it.
# SAP-numeric correctness is pinned in test_elmhurst_cascade_pins; here we
# prove the multi-measure Plan is optimised, priced, attributed and
# persisted.
with Session(db_engine) as session:
session.add(
PropertyRow(
id=20,
id=30,
portfolio_id=1,
postcode="A0 0AA",
address="2 Some Street",
uprn=22222,
address="3 Some Street",
uprn=33333,
)
)
session.add(
ScenarioRow(
id=7, goal="INCREASING_EPC", goal_value="C", is_default=True
id=7, goal="Increasing EPC", goal_value="C", is_default=True
)
)
session.add(
MaterialRow(
id=1,
type="cavity_wall_insulation",
total_cost=18.5,
cost_unit="gbp_per_m2",
is_active=True,
description="Cavity wall insulation",
)
session.add_all(
[
MaterialRow(
id=1,
type="cavity_wall_insulation",
total_cost=18.5,
cost_unit="gbp_per_m2",
is_active=True,
description="Cavity wall insulation",
),
MaterialRow(
id=2,
type="suspended_floor_insulation",
total_cost=25.0,
cost_unit="gbp_per_m2",
is_active=True,
description="Suspended floor insulation",
),
]
)
session.commit()
EpcPostgresRepository(session).save(
_build_uninsulated_cavity_and_floor_epc(),
property_id=30,
portfolio_id=1,
)
session.commit()
def unit_of_work() -> PostgresUnitOfWork:
return PostgresUnitOfWork(lambda: Session(db_engine))
pipeline = AraFirstRunPipeline(
ingestion=IngestionOrchestrator(
unit_of_work=unit_of_work,
epc_fetcher=_FetcherReturning(_uninsulated_cavity_epc()),
geospatial_repo=_NoCoordinates(),
solar_fetcher=_UnusedSolarFetcher(),
),
baseline=PropertyBaselineOrchestrator(
unit_of_work=unit_of_work,
rebaseliner=StubRebaseliner(),
fuel_rates=FuelRatesStaticFileRepository(),
),
modelling=ModellingOrchestrator(
unit_of_work=unit_of_work,
calculator=Sap10Calculator(),
),
)
command = _FakeCommand(portfolio_id=1, property_ids=[20], scenario_ids=[7])
# Act
pipeline.run(command)
ModellingOrchestrator(
unit_of_work=unit_of_work, calculator=Sap10Calculator()
).run(property_ids=[30], scenario_ids=[7], portfolio_id=1)
# Assert — one Plan for (property 20, scenario 7) with a single cavity-wall
# Plan Measure linked by plan_id, priced from the Product, figures present.
# Assert — one Plan with two Plan Measures (wall + floor), each priced and
# attributed, linked by plan_id.
with Session(db_engine) as session:
plan = session.exec(
select(PlanRow).where(col(PlanRow.property_id) == 20)
select(PlanRow).where(col(PlanRow.property_id) == 30)
).first()
assert plan is not None
rec_rows = session.exec(
@ -248,12 +259,14 @@ def test_first_run_persists_a_plan_with_a_cavity_wall_measure(
assert plan.cost_of_works is not None
assert plan.cost_of_works > 0.0
assert len(rec_rows) == 1
rec = rec_rows[0]
assert rec.type == "cavity_wall_insulation"
assert rec.default is True
assert rec.already_installed is False
assert rec.sap_points is not None
assert rec.co2_equivalent_savings is not None
assert rec.estimated_cost is not None
assert rec.estimated_cost > 0.0
measure_types = {rec.type for rec in rec_rows}
assert measure_types == {
"cavity_wall_insulation",
"suspended_floor_insulation",
}
for rec in rec_rows:
assert rec.default is True
assert rec.already_installed is False
assert rec.sap_points is not None
assert rec.estimated_cost is not None
assert rec.estimated_cost > 0.0