From c7e2aa37550ca350e2b68f99ef44a998cdb8aa30 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Wed, 3 Jun 2026 12:08:32 +0000 Subject: [PATCH] feat(modelling): ModellingOrchestrator persists a Plan end-to-end (#1157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice 4b — closes the #1157 tracer. ModellingOrchestrator.run(property_ids, scenario_ids, portfolio_id) now does real work in one Unit of Work, committed once (ADR-0011/0012/0016/0017): read Property (effective EPC) + Scenario via repos → recommend_cavity_wall → select its Option → PackageScorer.score (role-2 package total) + marginal_impacts (role-3 attribution) → build Plan/PlanMeasure → uow.plan.save → commit. - AraFirstRunPipeline / ModellingStage thread portfolio_id from the trigger body (one source of truth); handler builds the real orchestrator (unit_of_work + Sap10Calculator), dropping the Scenario/Materials stubs. - ScenarioRepository.get_many promoted to @abstractmethod now the bare-stub instantiations are gone. - New ara_first_run-style integration test: a property with an uninsulated cavity wall yields a persisted Plan + one cavity_wall_insulation Plan Measure (priced from the Product, figures present, linked by plan_id). Numeric SAP correctness is pinned separately in test_elmhurst_cascade_pins. - Existing pipeline integration test updated: seeds scenario 7 and runs the real Modelling stage (its already-insulated sample wall yields an empty package — no crash). 121 pass across repositories/modelling/orchestration/app; pyright strict clean. Co-Authored-By: Claude Opus 4.8 --- applications/ara_first_run/handler.py | 9 +- orchestration/ara_first_run_pipeline.py | 8 +- orchestration/modelling_orchestrator.py | 108 +++++++++++++--- repositories/scenario/scenario_repository.py | 19 +-- .../test_ara_first_run_pipeline.py | 8 +- ...test_ara_first_run_pipeline_integration.py | 119 +++++++++++++++++- 6 files changed, 230 insertions(+), 41 deletions(-) diff --git a/applications/ara_first_run/handler.py b/applications/ara_first_run/handler.py index a546d0f4..837730b6 100644 --- a/applications/ara_first_run/handler.py +++ b/applications/ara_first_run/handler.py @@ -27,9 +27,7 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import ( FuelRatesStaticFileRepository, ) from repositories.geospatial.geospatial_repository import GeospatialRepository -from repositories.materials.materials_repository import MaterialsRepository from repositories.postgres_unit_of_work import PostgresUnitOfWork -from repositories.scenario.scenario_repository import ScenarioRepository from repositories.unit_of_work import UnitOfWork from utilities.aws_lambda.subtask_handler import subtask_handler @@ -72,8 +70,7 @@ def build_first_run_pipeline( Each stage opens its own unit(s) and commits per batch (ADR-0012); the handler no longer holds a session. The source clients are passed in because - their config is not settled — see ``_source_clients_from_env``. Modelling is - stubbed (#1136); its Scenario / Materials ports are seams. + their config is not settled — see ``_source_clients_from_env``. """ return AraFirstRunPipeline( ingestion=IngestionOrchestrator( @@ -91,8 +88,8 @@ def build_first_run_pipeline( fuel_rates=FuelRatesStaticFileRepository(), ), modelling=ModellingOrchestrator( - scenario_repo=ScenarioRepository(), - materials_repo=MaterialsRepository(), + unit_of_work=unit_of_work, + calculator=Sap10Calculator(), ), ) diff --git a/orchestration/ara_first_run_pipeline.py b/orchestration/ara_first_run_pipeline.py index ed507d6e..c17f88be 100644 --- a/orchestration/ara_first_run_pipeline.py +++ b/orchestration/ara_first_run_pipeline.py @@ -38,7 +38,9 @@ class PropertyBaselineStage(Protocol): class ModellingStage(Protocol): """Stage 3 — scores each Property against its Scenarios into Plans.""" - def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: ... + def run( + self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int + ) -> None: ... class AraFirstRunPipeline: @@ -67,4 +69,6 @@ class AraFirstRunPipeline: def run(self, command: AraFirstRunCommand) -> None: self._ingestion.run(command.property_ids) self._baseline.run(command.property_ids) - self._modelling.run(command.property_ids, command.scenario_ids) + self._modelling.run( + command.property_ids, command.scenario_ids, command.portfolio_id + ) diff --git a/orchestration/modelling_orchestrator.py b/orchestration/modelling_orchestrator.py index 48f70b19..97c2bbe9 100644 --- a/orchestration/modelling_orchestrator.py +++ b/orchestration/modelling_orchestrator.py @@ -1,29 +1,105 @@ from __future__ import annotations -from repositories.materials.materials_repository import MaterialsRepository -from repositories.scenario.scenario_repository import ScenarioRepository +from collections.abc import Callable + +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.modelling.package_scorer import PackageScorer, Score +from domain.modelling.plan import Plan, PlanMeasure +from domain.modelling.recommendation import MeasureOption, Recommendation +from domain.modelling.scenario import Scenario +from domain.modelling.scoring import MeasureImpact, marginal_impacts +from domain.modelling.simulation import EpcSimulation +from domain.modelling.wall_recommendation import recommend_cavity_wall +from domain.sap10_calculator.calculator import SapCalculator +from repositories.product.product_repository import ProductRepository +from repositories.unit_of_work import UnitOfWork class ModellingOrchestrator: - """Stage 3 — scores each baselined Property against its Scenarios, producing - Recommendations -> an Optimised Package per Scenario Phase -> Plans - (CONTEXT.md: Modelling). + """Stage 3 — scores each baselined Property against its Scenarios into Plans + and persists them (CONTEXT.md: Modelling; ADR-0011 / ADR-0012 / ADR-0016 / + ADR-0017). - Stub at this stage (#1136): ``run`` reads its inputs through repos (it takes - only ``property_ids`` + ``scenario_ids``, never an in-memory hand-off from - Baseline) but does no scoring yet. Full Modelling lands via later TDD slices - + per-service grills. The Scenario / Materials repos are injected now so the - composition and wiring are real even while the body is empty. + Runs the whole batch in **one** Unit of Work and commits once: for each + (Property × Scenario) it reads the Property's Effective EPC and the Scenario + through repos, generates the candidate Recommendation, selects its Option + into a trivial Optimised Package, scores the package (role 2) and attributes + each measure (role-3 marginal cascade), and persists a **Plan** with its + **Plan Measures**. The optimiser, exclusions, and multi-measure generators + land in later slices; this is the single-measure tracer. + + Reads only through repos and threads only IDs (`property_ids`, + `scenario_ids`, `portfolio_id`) — never an in-memory hand-off from Baseline + (ADR-0011). The injected `SapCalculator` is the scoring engine seam. """ def __init__( self, *, - scenario_repo: ScenarioRepository, - materials_repo: MaterialsRepository, + unit_of_work: Callable[[], UnitOfWork], + calculator: SapCalculator, ) -> None: - self._scenario_repo = scenario_repo - self._materials_repo = materials_repo + self._unit_of_work = unit_of_work + self._calculator = calculator - def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: - return None + def run( + self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int + ) -> None: + scorer = PackageScorer(self._calculator) + with self._unit_of_work() as uow: + properties = uow.property.get_many(property_ids) + scenarios: list[Scenario] = uow.scenario.get_many(scenario_ids) + for property_id, prop in zip(property_ids, properties, strict=True): + effective_epc: EpcPropertyData = prop.effective_epc + for scenario in scenarios: + plan = self._plan_for(scorer, effective_epc, uow.product) + uow.plan.save( + plan, + property_id=property_id, + scenario_id=scenario.id, + portfolio_id=portfolio_id, + is_default=scenario.is_default, + ) + uow.commit() + + def _plan_for( + self, + scorer: PackageScorer, + effective_epc: EpcPropertyData, + products: ProductRepository, + ) -> Plan: + """Generate → select → score → attribute the single-measure package for + one Property + Scenario, and assemble its Plan.""" + recommendation: Recommendation | None = recommend_cavity_wall( + effective_epc, products + ) + selected: list[MeasureOption] = ( + [recommendation.options[0]] if recommendation is not None else [] + ) + overlays: list[EpcSimulation] = [option.overlay for option in selected] + + baseline: Score = scorer.score(effective_epc, []) + post_retrofit: Score = scorer.score(effective_epc, overlays) + impacts: list[MeasureImpact] = marginal_impacts( + scorer, effective_epc, overlays + ) + measures: tuple[PlanMeasure, ...] = tuple( + _plan_measure(option, impact) + for option, impact in zip(selected, impacts, strict=True) + ) + return Plan( + measures=measures, baseline=baseline, post_retrofit=post_retrofit + ) + + +def _plan_measure(option: MeasureOption, impact: MeasureImpact) -> PlanMeasure: + if option.cost is None: + raise ValueError( + f"measure option {option.measure_type!r} has no cost; cannot persist" + ) + return PlanMeasure( + measure_type=option.measure_type, + description=option.description, + cost=option.cost, + impact=impact, + ) diff --git a/repositories/scenario/scenario_repository.py b/repositories/scenario/scenario_repository.py index f92d30d0..f5d0c252 100644 --- a/repositories/scenario/scenario_repository.py +++ b/repositories/scenario/scenario_repository.py @@ -1,6 +1,8 @@ from __future__ import annotations -from abc import ABC +from abc import ABC, abstractmethod + +from domain.modelling.scenario import Scenario class ScenarioRepository(ABC): @@ -8,12 +10,11 @@ class ScenarioRepository(ABC): The FE creates a Scenario in the scenario-builder and passes only its id to the pipeline (#1130); the orchestrator reads it back through this port - at modelling time. - - The concrete method shape is ``get_many(scenario_ids) -> list[Scenario]`` - (bulk read by id, load-whole per ADR-0012), implemented by - ``ScenarioPostgresRepository``. It is promoted to an ``@abstractmethod`` - here when the real ``ModellingOrchestrator`` is wired and the bare-stub - instantiations are retired (#1157 orchestrator slice) — until then the port - stays instantiable so the stubbed Modelling wiring composes. + at modelling time. Bulk read by id, load-whole per ADR-0012. """ + + @abstractmethod + def get_many(self, scenario_ids: list[int]) -> list[Scenario]: + """Return the Scenarios for ``scenario_ids``, in the same order, + raising if any id has no row.""" + ... diff --git a/tests/orchestration/test_ara_first_run_pipeline.py b/tests/orchestration/test_ara_first_run_pipeline.py index 8d78ff2c..bb0399ab 100644 --- a/tests/orchestration/test_ara_first_run_pipeline.py +++ b/tests/orchestration/test_ara_first_run_pipeline.py @@ -34,8 +34,10 @@ class _SpyModelling: def __init__(self, log: list[tuple[object, ...]]) -> None: self._log = log - def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: - self._log.append(("modelling", property_ids, scenario_ids)) + def run( + self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int + ) -> None: + self._log.append(("modelling", property_ids, scenario_ids, portfolio_id)) def test_run_sequences_the_three_stages_threading_only_property_ids() -> None: @@ -60,5 +62,5 @@ def test_run_sequences_the_three_stages_threading_only_property_ids() -> None: assert log == [ ("ingestion", [10, 11]), ("baseline", [10, 11]), - ("modelling", [10, 11], [7]), + ("modelling", [10, 11], [7], 1), ] diff --git a/tests/orchestration/test_ara_first_run_pipeline_integration.py b/tests/orchestration/test_ara_first_run_pipeline_integration.py index 3d6aeb4a..1fe4dc2d 100644 --- a/tests/orchestration/test_ara_first_run_pipeline_integration.py +++ b/tests/orchestration/test_ara_first_run_pipeline_integration.py @@ -13,17 +13,21 @@ from pathlib import Path from typing import Any, Optional from sqlalchemy import Engine -from sqlmodel import Session, select +from sqlmodel import Session, col, select from datatypes.epc.domain.epc import Epc from datatypes.epc.domain.epc_property_data import EpcPropertyData from datatypes.epc.domain.mapper import EpcPropertyDataMapper from domain.property_baseline.rebaseliner import StubRebaseliner +from domain.sap10_calculator.calculator import Sap10Calculator +from infrastructure.postgres.scenario_table import ScenarioRow from domain.geospatial.coordinates import Coordinates from infrastructure.postgres.property_baseline_performance_table import ( PropertyBaselinePerformanceModel, ) from infrastructure.postgres.epc_property_table import EpcPropertyModel +from infrastructure.postgres.plan_table import PlanRow, RecommendationRow +from infrastructure.postgres.product_table import MaterialRow from infrastructure.postgres.property_table import PropertyRow from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator from orchestration.ara_first_run_pipeline import AraFirstRunPipeline @@ -36,9 +40,7 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import ( FuelRatesStaticFileRepository, ) from repositories.geospatial.geospatial_repository import GeospatialRepository -from repositories.materials.materials_repository import MaterialsRepository from repositories.postgres_unit_of_work import PostgresUnitOfWork -from repositories.scenario.scenario_repository import ScenarioRepository _JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples" @@ -101,6 +103,13 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun( uprn=12345, ) ) + # Modelling now runs for real: it reads scenario 7 (the command's + # scenario_ids) through the repo, so the row must exist. + session.add( + ScenarioRow( + id=7, goal="INCREASING_EPC", goal_value="C", is_default=True + ) + ) session.commit() def unit_of_work() -> PostgresUnitOfWork: @@ -119,8 +128,8 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun( fuel_rates=FuelRatesStaticFileRepository(), ), modelling=ModellingOrchestrator( - scenario_repo=ScenarioRepository(), - materials_repo=MaterialsRepository(), + unit_of_work=unit_of_work, + calculator=Sap10Calculator(), ), ) command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7]) @@ -148,3 +157,103 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun( assert baseline.space_heating_kwh == 13120.0 assert len(epc_rows) == 1 assert len(baseline_rows) == 1 + + +def _uninsulated_cavity_epc() -> EpcPropertyData: + """The sample EPC with its MAIN wall flipped to an uninsulated cavity, so + the wall Recommendation Generator fires.""" + epc = _lodged_epc() + main = epc.sap_building_parts[0] + uninsulated_main = dataclasses.replace(main, wall_insulation_type=4) + return dataclasses.replace(epc, sap_building_parts=[uninsulated_main]) + + +def test_first_run_persists_a_plan_with_a_cavity_wall_measure( + db_engine: Engine, +) -> None: + # Arrange — a property to ingest, the Scenario the FE created, and a + # cavity-wall Product so the measure can be priced. (The SAP-numeric + # correctness of the cascade is pinned in test_elmhurst_cascade_pins; here + # we prove the Plan is generated, priced and persisted end-to-end.) + with Session(db_engine) as session: + session.add( + PropertyRow( + id=20, + portfolio_id=1, + postcode="A0 0AA", + address="2 Some Street", + uprn=22222, + ) + ) + session.add( + ScenarioRow( + id=7, goal="INCREASING_EPC", goal_value="C", is_default=True + ) + ) + session.add( + MaterialRow( + id=1, + type="cavity_wall_insulation", + total_cost=18.5, + cost_unit="gbp_per_m2", + is_active=True, + description="Cavity wall insulation", + ) + ) + session.commit() + + def unit_of_work() -> PostgresUnitOfWork: + return PostgresUnitOfWork(lambda: Session(db_engine)) + + pipeline = AraFirstRunPipeline( + ingestion=IngestionOrchestrator( + unit_of_work=unit_of_work, + epc_fetcher=_FetcherReturning(_uninsulated_cavity_epc()), + geospatial_repo=_NoCoordinates(), + solar_fetcher=_UnusedSolarFetcher(), + ), + baseline=PropertyBaselineOrchestrator( + unit_of_work=unit_of_work, + rebaseliner=StubRebaseliner(), + fuel_rates=FuelRatesStaticFileRepository(), + ), + modelling=ModellingOrchestrator( + unit_of_work=unit_of_work, + calculator=Sap10Calculator(), + ), + ) + command = _FakeCommand(portfolio_id=1, property_ids=[20], scenario_ids=[7]) + + # Act + pipeline.run(command) + + # Assert — one Plan for (property 20, scenario 7) with a single cavity-wall + # Plan Measure linked by plan_id, priced from the Product, figures present. + with Session(db_engine) as session: + plan = session.exec( + select(PlanRow).where(col(PlanRow.property_id) == 20) + ).first() + assert plan is not None + rec_rows = session.exec( + select(RecommendationRow).where( + col(RecommendationRow.plan_id) == plan.id + ) + ).all() + + assert plan.scenario_id == 7 + assert plan.portfolio_id == 1 + assert plan.is_default is True + assert plan.post_sap_points is not None + assert plan.post_epc_rating is not None + assert plan.cost_of_works is not None + assert plan.cost_of_works > 0.0 + + assert len(rec_rows) == 1 + rec = rec_rows[0] + assert rec.type == "cavity_wall_insulation" + assert rec.default is True + assert rec.already_installed is False + assert rec.sap_points is not None + assert rec.co2_equivalent_savings is not None + assert rec.estimated_cost is not None + assert rec.estimated_cost > 0.0