diff --git a/applications/ara_first_run/handler.py b/applications/ara_first_run/handler.py index b944227b..c0df86a9 100644 --- a/applications/ara_first_run/handler.py +++ b/applications/ara_first_run/handler.py @@ -1,12 +1,36 @@ from __future__ import annotations +import os from typing import Any, Protocol +from sqlmodel import Session + from applications.ara_first_run.ara_first_run_trigger_body import ( AraFirstRunTriggerBody, ) +from domain.baseline.rebaseliner import StubRebaseliner +from infrastructure.postgres.config import PostgresConfig +from infrastructure.postgres.engine import make_engine +from orchestration.baseline_orchestrator import BaselineOrchestrator from orchestration.first_run_pipeline import FirstRunPipeline +from orchestration.ingestion_orchestrator import ( + EpcFetcher, + IngestionOrchestrator, + SolarFetcher, +) +from orchestration.modelling_orchestrator import ModellingOrchestrator from orchestration.task_orchestrator import TaskOrchestrator +from repositories.baseline.baseline_postgres_repository import ( + BaselinePostgresRepository, +) +from repositories.epc.epc_postgres_repository import EpcPostgresRepository +from repositories.geospatial.geospatial_repository import GeospatialRepository +from repositories.materials.materials_repository import MaterialsRepository +from repositories.property.property_postgres_repository import ( + PropertyPostgresRepository, +) +from repositories.scenario.scenario_repository import ScenarioRepository +from repositories.solar.solar_postgres_repository import SolarPostgresRepository from utilities.aws_lambda.subtask_handler import subtask_handler @@ -19,16 +43,79 @@ class _RunsFirstRun(Protocol): def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None: """Validate the raw event body and hand the command to the pipeline. - The handler's entire job — kept as a named seam so it is exercised without - the Lambda runtime. No business logic lives here: validate, then delegate - (issue #1130). + The handler's entire decision logic — kept as a named seam so it is + exercised without the Lambda runtime. No business logic lives here: validate, + then delegate (issue #1130/#1136). """ trigger = AraFirstRunTriggerBody.model_validate(body) pipeline.run(trigger) +def build_first_run_pipeline( + *, + session: Session, + epc_fetcher: EpcFetcher, + geospatial_repo: GeospatialRepository, + solar_fetcher: SolarFetcher, +) -> FirstRunPipeline: + """Compose the real three-stage pipeline over Postgres-backed repos. + + The stages share the session's repos and hand off only ``property_ids`` + through them (ADR-0011). The source clients are passed in rather than built + here because their config is not settled — see ``_source_clients_from_env``. + Modelling is stubbed (#1136); its Scenario / Materials ports are seams. + """ + epc_repo = EpcPostgresRepository(session) + property_repo = PropertyPostgresRepository(session, epc_repo) + solar_repo = SolarPostgresRepository(session) + baseline_repo = BaselinePostgresRepository(session) + return FirstRunPipeline( + ingestion=IngestionOrchestrator( + property_repo=property_repo, + epc_fetcher=epc_fetcher, + geospatial_repo=geospatial_repo, + solar_fetcher=solar_fetcher, + epc_repo=epc_repo, + solar_repo=solar_repo, + ), + baseline=BaselineOrchestrator( + property_repo=property_repo, + rebaseliner=StubRebaseliner(), + baseline_repo=baseline_repo, + ), + modelling=ModellingOrchestrator( + scenario_repo=ScenarioRepository(), + materials_repo=MaterialsRepository(), + ), + ) + + +def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarFetcher]: + """The Ingestion source clients — EPC API, Google Solar, geospatial S3. + + TODO(deploy): their config (EPC auth token, Google Solar API key, geospatial + S3 parquet reader), env-var names, and the pandas/s3fs runtime deps are not + settled — that wiring is a separate Terraform piece, out of scope for #1136. + Raises until then so the lambda fails loudly rather than half-running. + """ + raise NotImplementedError( + "ara_first_run source-client wiring (EPC / Google Solar / geospatial) " + "is pending the deploy/Terraform piece; see #1136." + ) + + @subtask_handler() def handler( body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator ) -> None: - dispatch_first_run(body, pipeline=FirstRunPipeline()) + engine = make_engine(PostgresConfig.from_env(dict(os.environ))) + epc_fetcher, geospatial_repo, solar_fetcher = _source_clients_from_env() + with Session(engine) as session: + pipeline = build_first_run_pipeline( + session=session, + epc_fetcher=epc_fetcher, + geospatial_repo=geospatial_repo, + solar_fetcher=solar_fetcher, + ) + dispatch_first_run(body, pipeline=pipeline) + session.commit() diff --git a/orchestration/first_run_pipeline.py b/orchestration/first_run_pipeline.py index 1fd8839b..3d642d9e 100644 --- a/orchestration/first_run_pipeline.py +++ b/orchestration/first_run_pipeline.py @@ -23,14 +23,48 @@ class FirstRunCommand(Protocol): def scenario_ids(self) -> list[int]: ... -class FirstRunPipeline: - """Composes the First Run stages end-to-end (Ingestion -> Baseline -> - Modelling), threading only ``property_ids`` between them through repos - (ADR-0011). +class IngestionStage(Protocol): + """Stage 1 — acquires and persists each Property's external source data.""" - Stub at this stage (#1130): ``run`` simply receives the validated command. - The real three-stage composition lands in #1136. + def run(self, property_ids: list[int]) -> None: ... + + +class BaselineStage(Protocol): + """Stage 2 — establishes each Property's Baseline Performance.""" + + def run(self, property_ids: list[int]) -> None: ... + + +class ModellingStage(Protocol): + """Stage 3 — scores each Property against its Scenarios into Plans.""" + + def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: ... + + +class FirstRunPipeline: + """Composes the First Run stages end-to-end: Ingestion -> Baseline -> + Modelling. + + Threads **only** ``property_ids`` between stages (and ``scenario_ids`` into + Modelling, off the command — not a prior stage). The stages communicate + through repos, never via in-memory hand-off, which is what makes each stage + independently runnable for the single-property review flow (ADR-0011, + ADR-0003). Stage orchestrators are injected so the handler owns wiring and + tests substitute fakes. """ + def __init__( + self, + *, + ingestion: IngestionStage, + baseline: BaselineStage, + modelling: ModellingStage, + ) -> None: + self._ingestion = ingestion + self._baseline = baseline + self._modelling = modelling + def run(self, command: FirstRunCommand) -> None: - return None + self._ingestion.run(command.property_ids) + self._baseline.run(command.property_ids) + self._modelling.run(command.property_ids, command.scenario_ids) diff --git a/orchestration/modelling_orchestrator.py b/orchestration/modelling_orchestrator.py new file mode 100644 index 00000000..48f70b19 --- /dev/null +++ b/orchestration/modelling_orchestrator.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from repositories.materials.materials_repository import MaterialsRepository +from repositories.scenario.scenario_repository import ScenarioRepository + + +class ModellingOrchestrator: + """Stage 3 — scores each baselined Property against its Scenarios, producing + Recommendations -> an Optimised Package per Scenario Phase -> Plans + (CONTEXT.md: Modelling). + + Stub at this stage (#1136): ``run`` reads its inputs through repos (it takes + only ``property_ids`` + ``scenario_ids``, never an in-memory hand-off from + Baseline) but does no scoring yet. Full Modelling lands via later TDD slices + + per-service grills. The Scenario / Materials repos are injected now so the + composition and wiring are real even while the body is empty. + """ + + def __init__( + self, + *, + scenario_repo: ScenarioRepository, + materials_repo: MaterialsRepository, + ) -> None: + self._scenario_repo = scenario_repo + self._materials_repo = materials_repo + + def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: + return None diff --git a/repositories/materials/__init__.py b/repositories/materials/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/repositories/materials/materials_repository.py b/repositories/materials/materials_repository.py new file mode 100644 index 00000000..5d94f166 --- /dev/null +++ b/repositories/materials/materials_repository.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from abc import ABC + + +class MaterialsRepository(ABC): + """Loads the retrofit Materials catalogue the Modelling stage draws measures + and costs from. + + Seam only at this stage (#1136): the method shape is deferred to the + Modelling per-service grill. Declared now so the pipeline can be composed + end-to-end with Modelling stubbed. + """ diff --git a/repositories/scenario/__init__.py b/repositories/scenario/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/repositories/scenario/scenario_repository.py b/repositories/scenario/scenario_repository.py new file mode 100644 index 00000000..f560db14 --- /dev/null +++ b/repositories/scenario/scenario_repository.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from abc import ABC + + +class ScenarioRepository(ABC): + """Loads the Scenarios (and Scenario Snapshots) the Modelling stage scores + a Property against. + + Seam only at this stage (#1136): the method shape is deferred to the + Modelling per-service grill, where Scenario / Scenario Phase / Scenario + Snapshot are designed (CONTEXT.md). Declared now so the pipeline can be + composed end-to-end with Modelling stubbed. + """ diff --git a/tests/orchestration/test_first_run_pipeline.py b/tests/orchestration/test_first_run_pipeline.py index 4b685bb2..705282ee 100644 --- a/tests/orchestration/test_first_run_pipeline.py +++ b/tests/orchestration/test_first_run_pipeline.py @@ -14,16 +14,51 @@ class _FakeCommand: scenario_ids: list[int] -def test_run_accepts_the_validated_command() -> None: +class _SpyIngestion: + def __init__(self, log: list[tuple[object, ...]]) -> None: + self._log = log + + def run(self, property_ids: list[int]) -> None: + self._log.append(("ingestion", property_ids)) + + +class _SpyBaseline: + def __init__(self, log: list[tuple[object, ...]]) -> None: + self._log = log + + def run(self, property_ids: list[int]) -> None: + self._log.append(("baseline", property_ids)) + + +class _SpyModelling: + def __init__(self, log: list[tuple[object, ...]]) -> None: + self._log = log + + def run(self, property_ids: list[int], scenario_ids: list[int]) -> None: + self._log.append(("modelling", property_ids, scenario_ids)) + + +def test_run_sequences_the_three_stages_threading_only_property_ids() -> None: # Arrange + log: list[tuple[object, ...]] = [] command: FirstRunCommand = _FakeCommand( - portfolio_id=42, property_ids=[101, 102], scenario_ids=[7] + portfolio_id=1, property_ids=[10, 11], scenario_ids=[7] + ) + pipeline = FirstRunPipeline( + ingestion=_SpyIngestion(log), + baseline=_SpyBaseline(log), + modelling=_SpyModelling(log), ) - pipeline = FirstRunPipeline() # Act - result = pipeline.run(command) + pipeline.run(command) - # Assert — the stub simply receives the command; full Ingestion -> Baseline - # -> Modelling composition lands in #1136. - assert result is None + # Assert — Ingestion -> Baseline -> Modelling, in order. Ingestion and + # Baseline receive only property_ids; Modelling additionally gets the + # scenario_ids (off the command, not a prior stage). Nothing else is + # threaded between stages — they communicate through repos (ADR-0011). + assert log == [ + ("ingestion", [10, 11]), + ("baseline", [10, 11]), + ("modelling", [10, 11], [7]), + ] diff --git a/tests/orchestration/test_first_run_pipeline_integration.py b/tests/orchestration/test_first_run_pipeline_integration.py new file mode 100644 index 00000000..55ca34ed --- /dev/null +++ b/tests/orchestration/test_first_run_pipeline_integration.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Optional + +from datatypes.epc.domain.epc import Epc +from datatypes.epc.domain.epc_property_data import ( + EpcPropertyData, + RenewableHeatIncentive, +) +from domain.baseline.rebaseliner import StubRebaseliner +from domain.geospatial.coordinates import Coordinates +from domain.property.property import Property, PropertyIdentity +from orchestration.baseline_orchestrator import BaselineOrchestrator +from orchestration.first_run_pipeline import FirstRunPipeline +from orchestration.ingestion_orchestrator import IngestionOrchestrator +from orchestration.modelling_orchestrator import ModellingOrchestrator +from repositories.baseline.baseline_repository import BaselineRepository +from repositories.epc.epc_repository import EpcRepository +from repositories.geospatial.geospatial_repository import GeospatialRepository +from repositories.materials.materials_repository import MaterialsRepository +from repositories.property.property_repository import PropertyRepository +from repositories.scenario.scenario_repository import ScenarioRepository +from repositories.solar.solar_repository import SolarRepository +from domain.baseline.baseline_performance import BaselinePerformance + + +@dataclass +class _FakeCommand: + portfolio_id: int + property_ids: list[int] + scenario_ids: list[int] + + +class _SharedEpcRepo(EpcRepository): + """Stands in for the persisted EPC slice both stages talk through.""" + + def __init__(self) -> None: + self._by_property: dict[int, EpcPropertyData] = {} + + def save( + self, + data: EpcPropertyData, + property_id: Optional[int] = None, + portfolio_id: Optional[int] = None, + ) -> int: + assert property_id is not None + self._by_property[property_id] = data + return property_id + + def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover + raise NotImplementedError + + def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: + return self._by_property.get(property_id) + + +class _RepoBackedPropertyRepo(PropertyRepository): + """Composes the Property from its identity row + the EPC slice in the shared + EPC repo — mirroring PropertyPostgresRepository, so the stages genuinely + hand off through repo state, not in memory.""" + + def __init__( + self, identities: dict[int, PropertyIdentity], epc_repo: _SharedEpcRepo + ) -> None: + self._identities = identities + self._epc_repo = epc_repo + + def get(self, property_id: int) -> Property: + return Property( + identity=self._identities[property_id], + epc=self._epc_repo.get_for_property(property_id), + ) + + +class _FakeEpcFetcher: + def __init__(self, epc: EpcPropertyData) -> None: + self._epc = epc + + def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: + return self._epc + + +class _NoCoordinatesGeospatialRepo(GeospatialRepository): + def coordinates_for(self, uprn: int) -> Optional[Coordinates]: + return None # skip the solar leg — not under test here + + +class _FakeSolarFetcher: + def get_building_insights( + self, longitude: float, latitude: float + ) -> dict[str, Any]: # pragma: no cover + return {} + + +class _FakeSolarRepo(SolarRepository): + def save(self, property_id: int, insights: dict[str, Any]) -> None: # pragma: no cover + return None + + def get(self, property_id: int) -> Optional[dict[str, Any]]: # pragma: no cover + raise NotImplementedError + + +class _CollectingBaselineRepo(BaselineRepository): + def __init__(self) -> None: + self.saved: list[tuple[BaselinePerformance, int]] = [] + + def save(self, baseline: BaselinePerformance, property_id: int) -> int: + self.saved.append((baseline, property_id)) + return len(self.saved) + + def get_for_property( + self, property_id: int + ) -> Optional[BaselinePerformance]: # pragma: no cover + raise NotImplementedError + + +class _FakeScenarioRepo(ScenarioRepository): + pass + + +class _FakeMaterialsRepo(MaterialsRepository): + pass + + +def _ingestible_epc() -> EpcPropertyData: + epc = object.__new__(EpcPropertyData) + epc.energy_rating_current = 72 + epc.current_energy_efficiency_band = Epc.C + epc.co2_emissions_current = 1.8 + epc.energy_consumption_current = 180 + epc.sap_version = 10.2 + epc.renewable_heat_incentive = RenewableHeatIncentive( + space_heating_kwh=5000.0, water_heating_kwh=2000.0 + ) + return epc + + +def test_baseline_reads_the_epc_ingestion_persisted_through_repos() -> None: + # Arrange — one property; the EPC the fetcher returns is what Ingestion + # persists and Baseline must then read back through the shared repo. + epc = _ingestible_epc() + epc_repo = _SharedEpcRepo() + identities = { + 10: PropertyIdentity( + portfolio_id=1, postcode="A0 0AA", address="1 Some Street", uprn=123 + ) + } + property_repo = _RepoBackedPropertyRepo(identities, epc_repo) + baseline_repo = _CollectingBaselineRepo() + + pipeline = FirstRunPipeline( + ingestion=IngestionOrchestrator( + property_repo=property_repo, + epc_fetcher=_FakeEpcFetcher(epc), + geospatial_repo=_NoCoordinatesGeospatialRepo(), + solar_fetcher=_FakeSolarFetcher(), + epc_repo=epc_repo, + solar_repo=_FakeSolarRepo(), + ), + baseline=BaselineOrchestrator( + property_repo=property_repo, + rebaseliner=StubRebaseliner(), + baseline_repo=baseline_repo, + ), + modelling=ModellingOrchestrator( + scenario_repo=_FakeScenarioRepo(), + materials_repo=_FakeMaterialsRepo(), + ), + ) + + # Act + pipeline.run(_FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7])) + + # Assert — a Baseline Performance landed for property 10, its Lodged half + # read off the very EPC Ingestion persisted. Only property_ids crossed the + # stage boundary; the EPC itself travelled through the repo. + assert len(baseline_repo.saved) == 1 + baseline, property_id = baseline_repo.saved[0] + assert property_id == 10 + assert baseline.lodged.sap_score == 72 + assert baseline.lodged.epc_band == Epc.C + assert baseline.space_heating_kwh == 5000.0