feat(modelling): ModellingOrchestrator persists a Plan end-to-end (#1157)

Slice 4b — closes the #1157 tracer. ModellingOrchestrator.run(property_ids,
scenario_ids, portfolio_id) now does real work in one Unit of Work,
committed once (ADR-0011/0012/0016/0017):

  read Property (effective EPC) + Scenario via repos → recommend_cavity_wall
  → select its Option → PackageScorer.score (role-2 package total) +
  marginal_impacts (role-3 attribution) → build Plan/PlanMeasure →
  uow.plan.save → commit.

- AraFirstRunPipeline / ModellingStage thread portfolio_id from the trigger
  body (one source of truth); handler builds the real orchestrator
  (unit_of_work + Sap10Calculator), dropping the Scenario/Materials stubs.
- ScenarioRepository.get_many promoted to @abstractmethod now the bare-stub
  instantiations are gone.
- New ara_first_run-style integration test: a property with an uninsulated
  cavity wall yields a persisted Plan + one cavity_wall_insulation Plan
  Measure (priced from the Product, figures present, linked by plan_id).
  Numeric SAP correctness is pinned separately in test_elmhurst_cascade_pins.
- Existing pipeline integration test updated: seeds scenario 7 and runs the
  real Modelling stage (its already-insulated sample wall yields an empty
  package — no crash).

121 pass across repositories/modelling/orchestration/app; pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-03 12:08:32 +00:00
parent e778d1fb97
commit c7e2aa3755
6 changed files with 230 additions and 41 deletions

View file

@ -27,9 +27,7 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import (
FuelRatesStaticFileRepository,
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.unit_of_work import UnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
@ -72,8 +70,7 @@ def build_first_run_pipeline(
Each stage opens its own unit(s) and commits per batch (ADR-0012); the
handler no longer holds a session. The source clients are passed in because
their config is not settled see ``_source_clients_from_env``. Modelling is
stubbed (#1136); its Scenario / Materials ports are seams.
their config is not settled see ``_source_clients_from_env``.
"""
return AraFirstRunPipeline(
ingestion=IngestionOrchestrator(
@ -91,8 +88,8 @@ def build_first_run_pipeline(
fuel_rates=FuelRatesStaticFileRepository(),
),
modelling=ModellingOrchestrator(
scenario_repo=ScenarioRepository(),
materials_repo=MaterialsRepository(),
unit_of_work=unit_of_work,
calculator=Sap10Calculator(),
),
)

View file

@ -38,7 +38,9 @@ class PropertyBaselineStage(Protocol):
class ModellingStage(Protocol):
"""Stage 3 — scores each Property against its Scenarios into Plans."""
def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: ...
def run(
self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int
) -> None: ...
class AraFirstRunPipeline:
@ -67,4 +69,6 @@ class AraFirstRunPipeline:
def run(self, command: AraFirstRunCommand) -> None:
self._ingestion.run(command.property_ids)
self._baseline.run(command.property_ids)
self._modelling.run(command.property_ids, command.scenario_ids)
self._modelling.run(
command.property_ids, command.scenario_ids, command.portfolio_id
)

View file

@ -1,29 +1,105 @@
from __future__ import annotations
from repositories.materials.materials_repository import MaterialsRepository
from repositories.scenario.scenario_repository import ScenarioRepository
from collections.abc import Callable
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.package_scorer import PackageScorer, Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import MeasureOption, Recommendation
from domain.modelling.scenario import Scenario
from domain.modelling.scoring import MeasureImpact, marginal_impacts
from domain.modelling.simulation import EpcSimulation
from domain.modelling.wall_recommendation import recommend_cavity_wall
from domain.sap10_calculator.calculator import SapCalculator
from repositories.product.product_repository import ProductRepository
from repositories.unit_of_work import UnitOfWork
class ModellingOrchestrator:
"""Stage 3 — scores each baselined Property against its Scenarios, producing
Recommendations -> an Optimised Package per Scenario Phase -> Plans
(CONTEXT.md: Modelling).
"""Stage 3 — scores each baselined Property against its Scenarios into Plans
and persists them (CONTEXT.md: Modelling; ADR-0011 / ADR-0012 / ADR-0016 /
ADR-0017).
Stub at this stage (#1136): ``run`` reads its inputs through repos (it takes
only ``property_ids`` + ``scenario_ids``, never an in-memory hand-off from
Baseline) but does no scoring yet. Full Modelling lands via later TDD slices
+ per-service grills. The Scenario / Materials repos are injected now so the
composition and wiring are real even while the body is empty.
Runs the whole batch in **one** Unit of Work and commits once: for each
(Property × Scenario) it reads the Property's Effective EPC and the Scenario
through repos, generates the candidate Recommendation, selects its Option
into a trivial Optimised Package, scores the package (role 2) and attributes
each measure (role-3 marginal cascade), and persists a **Plan** with its
**Plan Measures**. The optimiser, exclusions, and multi-measure generators
land in later slices; this is the single-measure tracer.
Reads only through repos and threads only IDs (`property_ids`,
`scenario_ids`, `portfolio_id`) never an in-memory hand-off from Baseline
(ADR-0011). The injected `SapCalculator` is the scoring engine seam.
"""
def __init__(
self,
*,
scenario_repo: ScenarioRepository,
materials_repo: MaterialsRepository,
unit_of_work: Callable[[], UnitOfWork],
calculator: SapCalculator,
) -> None:
self._scenario_repo = scenario_repo
self._materials_repo = materials_repo
self._unit_of_work = unit_of_work
self._calculator = calculator
def run(self, property_ids: list[int], scenario_ids: list[int]) -> None:
return None
def run(
self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int
) -> None:
scorer = PackageScorer(self._calculator)
with self._unit_of_work() as uow:
properties = uow.property.get_many(property_ids)
scenarios: list[Scenario] = uow.scenario.get_many(scenario_ids)
for property_id, prop in zip(property_ids, properties, strict=True):
effective_epc: EpcPropertyData = prop.effective_epc
for scenario in scenarios:
plan = self._plan_for(scorer, effective_epc, uow.product)
uow.plan.save(
plan,
property_id=property_id,
scenario_id=scenario.id,
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
uow.commit()
def _plan_for(
self,
scorer: PackageScorer,
effective_epc: EpcPropertyData,
products: ProductRepository,
) -> Plan:
"""Generate → select → score → attribute the single-measure package for
one Property + Scenario, and assemble its Plan."""
recommendation: Recommendation | None = recommend_cavity_wall(
effective_epc, products
)
selected: list[MeasureOption] = (
[recommendation.options[0]] if recommendation is not None else []
)
overlays: list[EpcSimulation] = [option.overlay for option in selected]
baseline: Score = scorer.score(effective_epc, [])
post_retrofit: Score = scorer.score(effective_epc, overlays)
impacts: list[MeasureImpact] = marginal_impacts(
scorer, effective_epc, overlays
)
measures: tuple[PlanMeasure, ...] = tuple(
_plan_measure(option, impact)
for option, impact in zip(selected, impacts, strict=True)
)
return Plan(
measures=measures, baseline=baseline, post_retrofit=post_retrofit
)
def _plan_measure(option: MeasureOption, impact: MeasureImpact) -> PlanMeasure:
if option.cost is None:
raise ValueError(
f"measure option {option.measure_type!r} has no cost; cannot persist"
)
return PlanMeasure(
measure_type=option.measure_type,
description=option.description,
cost=option.cost,
impact=impact,
)

View file

@ -1,6 +1,8 @@
from __future__ import annotations
from abc import ABC
from abc import ABC, abstractmethod
from domain.modelling.scenario import Scenario
class ScenarioRepository(ABC):
@ -8,12 +10,11 @@ class ScenarioRepository(ABC):
The FE creates a Scenario in the scenario-builder and passes only its id
to the pipeline (#1130); the orchestrator reads it back through this port
at modelling time.
The concrete method shape is ``get_many(scenario_ids) -> list[Scenario]``
(bulk read by id, load-whole per ADR-0012), implemented by
``ScenarioPostgresRepository``. It is promoted to an ``@abstractmethod``
here when the real ``ModellingOrchestrator`` is wired and the bare-stub
instantiations are retired (#1157 orchestrator slice) — until then the port
stays instantiable so the stubbed Modelling wiring composes.
at modelling time. Bulk read by id, load-whole per ADR-0012.
"""
@abstractmethod
def get_many(self, scenario_ids: list[int]) -> list[Scenario]:
"""Return the Scenarios for ``scenario_ids``, in the same order,
raising if any id has no row."""
...

View file

@ -34,8 +34,10 @@ class _SpyModelling:
def __init__(self, log: list[tuple[object, ...]]) -> None:
self._log = log
def run(self, property_ids: list[int], scenario_ids: list[int]) -> None:
self._log.append(("modelling", property_ids, scenario_ids))
def run(
self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int
) -> None:
self._log.append(("modelling", property_ids, scenario_ids, portfolio_id))
def test_run_sequences_the_three_stages_threading_only_property_ids() -> None:
@ -60,5 +62,5 @@ def test_run_sequences_the_three_stages_threading_only_property_ids() -> None:
assert log == [
("ingestion", [10, 11]),
("baseline", [10, 11]),
("modelling", [10, 11], [7]),
("modelling", [10, 11], [7], 1),
]

View file

@ -13,17 +13,21 @@ from pathlib import Path
from typing import Any, Optional
from sqlalchemy import Engine
from sqlmodel import Session, select
from sqlmodel import Session, col, select
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.property_baseline.rebaseliner import StubRebaseliner
from domain.sap10_calculator.calculator import Sap10Calculator
from infrastructure.postgres.scenario_table import ScenarioRow
from domain.geospatial.coordinates import Coordinates
from infrastructure.postgres.property_baseline_performance_table import (
PropertyBaselinePerformanceModel,
)
from infrastructure.postgres.epc_property_table import EpcPropertyModel
from infrastructure.postgres.plan_table import PlanRow, RecommendationRow
from infrastructure.postgres.product_table import MaterialRow
from infrastructure.postgres.property_table import PropertyRow
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
@ -36,9 +40,7 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import (
FuelRatesStaticFileRepository,
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
@ -101,6 +103,13 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
uprn=12345,
)
)
# Modelling now runs for real: it reads scenario 7 (the command's
# scenario_ids) through the repo, so the row must exist.
session.add(
ScenarioRow(
id=7, goal="INCREASING_EPC", goal_value="C", is_default=True
)
)
session.commit()
def unit_of_work() -> PostgresUnitOfWork:
@ -119,8 +128,8 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
fuel_rates=FuelRatesStaticFileRepository(),
),
modelling=ModellingOrchestrator(
scenario_repo=ScenarioRepository(),
materials_repo=MaterialsRepository(),
unit_of_work=unit_of_work,
calculator=Sap10Calculator(),
),
)
command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7])
@ -148,3 +157,103 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
assert baseline.space_heating_kwh == 13120.0
assert len(epc_rows) == 1
assert len(baseline_rows) == 1
def _uninsulated_cavity_epc() -> EpcPropertyData:
"""The sample EPC with its MAIN wall flipped to an uninsulated cavity, so
the wall Recommendation Generator fires."""
epc = _lodged_epc()
main = epc.sap_building_parts[0]
uninsulated_main = dataclasses.replace(main, wall_insulation_type=4)
return dataclasses.replace(epc, sap_building_parts=[uninsulated_main])
def test_first_run_persists_a_plan_with_a_cavity_wall_measure(
db_engine: Engine,
) -> None:
# Arrange — a property to ingest, the Scenario the FE created, and a
# cavity-wall Product so the measure can be priced. (The SAP-numeric
# correctness of the cascade is pinned in test_elmhurst_cascade_pins; here
# we prove the Plan is generated, priced and persisted end-to-end.)
with Session(db_engine) as session:
session.add(
PropertyRow(
id=20,
portfolio_id=1,
postcode="A0 0AA",
address="2 Some Street",
uprn=22222,
)
)
session.add(
ScenarioRow(
id=7, goal="INCREASING_EPC", goal_value="C", is_default=True
)
)
session.add(
MaterialRow(
id=1,
type="cavity_wall_insulation",
total_cost=18.5,
cost_unit="gbp_per_m2",
is_active=True,
description="Cavity wall insulation",
)
)
session.commit()
def unit_of_work() -> PostgresUnitOfWork:
return PostgresUnitOfWork(lambda: Session(db_engine))
pipeline = AraFirstRunPipeline(
ingestion=IngestionOrchestrator(
unit_of_work=unit_of_work,
epc_fetcher=_FetcherReturning(_uninsulated_cavity_epc()),
geospatial_repo=_NoCoordinates(),
solar_fetcher=_UnusedSolarFetcher(),
),
baseline=PropertyBaselineOrchestrator(
unit_of_work=unit_of_work,
rebaseliner=StubRebaseliner(),
fuel_rates=FuelRatesStaticFileRepository(),
),
modelling=ModellingOrchestrator(
unit_of_work=unit_of_work,
calculator=Sap10Calculator(),
),
)
command = _FakeCommand(portfolio_id=1, property_ids=[20], scenario_ids=[7])
# Act
pipeline.run(command)
# Assert — one Plan for (property 20, scenario 7) with a single cavity-wall
# Plan Measure linked by plan_id, priced from the Product, figures present.
with Session(db_engine) as session:
plan = session.exec(
select(PlanRow).where(col(PlanRow.property_id) == 20)
).first()
assert plan is not None
rec_rows = session.exec(
select(RecommendationRow).where(
col(RecommendationRow.plan_id) == plan.id
)
).all()
assert plan.scenario_id == 7
assert plan.portfolio_id == 1
assert plan.is_default is True
assert plan.post_sap_points is not None
assert plan.post_epc_rating is not None
assert plan.cost_of_works is not None
assert plan.cost_of_works > 0.0
assert len(rec_rows) == 1
rec = rec_rows[0]
assert rec.type == "cavity_wall_insulation"
assert rec.default is True
assert rec.already_installed is False
assert rec.sap_points is not None
assert rec.co2_equivalent_savings is not None
assert rec.estimated_cost is not None
assert rec.estimated_cost > 0.0