diff --git a/applications/ara_first_run/handler.py b/applications/ara_first_run/handler.py index c0df86a9..f9cb6be7 100644 --- a/applications/ara_first_run/handler.py +++ b/applications/ara_first_run/handler.py @@ -1,8 +1,10 @@ from __future__ import annotations import os -from typing import Any, Protocol +from collections.abc import Callable +from typing import Any, Optional, Protocol +from sqlalchemy import Engine from sqlmodel import Session from applications.ara_first_run.ara_first_run_trigger_body import ( @@ -20,19 +22,24 @@ from orchestration.ingestion_orchestrator import ( ) from orchestration.modelling_orchestrator import ModellingOrchestrator from orchestration.task_orchestrator import TaskOrchestrator -from repositories.baseline.baseline_postgres_repository import ( - BaselinePostgresRepository, -) -from repositories.epc.epc_postgres_repository import EpcPostgresRepository from repositories.geospatial.geospatial_repository import GeospatialRepository from repositories.materials.materials_repository import MaterialsRepository -from repositories.property.property_postgres_repository import ( - PropertyPostgresRepository, -) +from repositories.postgres_unit_of_work import PostgresUnitOfWork from repositories.scenario.scenario_repository import ScenarioRepository -from repositories.solar.solar_postgres_repository import SolarPostgresRepository +from repositories.unit_of_work import UnitOfWork from utilities.aws_lambda.subtask_handler import subtask_handler +# Module-scoped so the connection pool is reused across warm Lambda invocations +# rather than rebuilt per invocation (ADR-0012). +_engine: Optional[Engine] = None + + +def _get_engine() -> Engine: + global _engine + if _engine is None: + _engine = make_engine(PostgresConfig.from_env(dict(os.environ))) + return _engine + class _RunsFirstRun(Protocol): """The slice of FirstRunPipeline the handler delegates to.""" @@ -44,8 +51,7 @@ def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None """Validate the raw event body and hand the command to the pipeline. The handler's entire decision logic — kept as a named seam so it is - exercised without the Lambda runtime. No business logic lives here: validate, - then delegate (issue #1130/#1136). + exercised without the Lambda runtime. No business logic: validate, delegate. """ trigger = AraFirstRunTriggerBody.model_validate(body) pipeline.run(trigger) @@ -53,35 +59,28 @@ def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None def build_first_run_pipeline( *, - session: Session, + unit_of_work: Callable[[], UnitOfWork], epc_fetcher: EpcFetcher, geospatial_repo: GeospatialRepository, solar_fetcher: SolarFetcher, ) -> FirstRunPipeline: - """Compose the real three-stage pipeline over Postgres-backed repos. + """Compose the real three-stage pipeline on a Unit-of-Work factory. - The stages share the session's repos and hand off only ``property_ids`` - through them (ADR-0011). The source clients are passed in rather than built - here because their config is not settled — see ``_source_clients_from_env``. - Modelling is stubbed (#1136); its Scenario / Materials ports are seams. + Each stage opens its own unit(s) and commits per batch (ADR-0012); the + handler no longer holds a session. The source clients are passed in because + their config is not settled — see ``_source_clients_from_env``. Modelling is + stubbed (#1136); its Scenario / Materials ports are seams. """ - epc_repo = EpcPostgresRepository(session) - property_repo = PropertyPostgresRepository(session, epc_repo) - solar_repo = SolarPostgresRepository(session) - baseline_repo = BaselinePostgresRepository(session) return FirstRunPipeline( ingestion=IngestionOrchestrator( - property_repo=property_repo, + unit_of_work=unit_of_work, epc_fetcher=epc_fetcher, geospatial_repo=geospatial_repo, solar_fetcher=solar_fetcher, - epc_repo=epc_repo, - solar_repo=solar_repo, ), baseline=BaselineOrchestrator( - property_repo=property_repo, + unit_of_work=unit_of_work, rebaseliner=StubRebaseliner(), - baseline_repo=baseline_repo, ), modelling=ModellingOrchestrator( scenario_repo=ScenarioRepository(), @@ -108,14 +107,15 @@ def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarF def handler( body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator ) -> None: - engine = make_engine(PostgresConfig.from_env(dict(os.environ))) + engine = _get_engine() + unit_of_work: Callable[[], UnitOfWork] = lambda: PostgresUnitOfWork( + lambda: Session(engine) + ) epc_fetcher, geospatial_repo, solar_fetcher = _source_clients_from_env() - with Session(engine) as session: - pipeline = build_first_run_pipeline( - session=session, - epc_fetcher=epc_fetcher, - geospatial_repo=geospatial_repo, - solar_fetcher=solar_fetcher, - ) - dispatch_first_run(body, pipeline=pipeline) - session.commit() + pipeline = build_first_run_pipeline( + unit_of_work=unit_of_work, + epc_fetcher=epc_fetcher, + geospatial_repo=geospatial_repo, + solar_fetcher=solar_fetcher, + ) + dispatch_first_run(body, pipeline=pipeline) diff --git a/docs/adr/0012-unit-of-work-per-stage-batch-transaction.md b/docs/adr/0012-unit-of-work-per-stage-batch-transaction.md new file mode 100644 index 00000000..c31e6e7c --- /dev/null +++ b/docs/adr/0012-unit-of-work-per-stage-batch-transaction.md @@ -0,0 +1,31 @@ +# Each stage commits its batch once, through a Unit of Work + +**Status: Accepted.** Refines [ADR-0011](0011-composable-stage-orchestrators.md) (composable stage orchestrators, stages communicate through repos) with the persistence/transaction mechanics for batch processing. Decided in a `/grill-with-docs` session (2026-05-31) after the First Run spine (#1136) landed, prompted by reviewing the handler's session lifecycle. + +## Context + +A First Run trigger carries a **batch** of ~30 `property_ids`. The pipeline runs that batch through Ingestion → Baseline → Modelling. The first cut (#1136) wrapped **all three stages in one `Session` and one final `commit()`** in the handler. That has three problems: + +1. **A connection is pinned for the whole long-running pipeline.** SQLAlchemy checks out a pooled connection on the first statement and holds it until commit. Ingestion is the only IO-heavy stage (per property: EPC HTTP, Google-Solar HTTP, geospatial S3), so the connection sits checked-out-but-idle across all that external IO — the RDS-Proxy/pgbouncer "transaction-pinned connection" anti-pattern. +2. **One giant transaction** for the batch: long-held locks, identity-map growth, all-or-nothing across stages. +3. **Cross-stage hand-off through an *uncommitted* transaction.** Baseline reads Ingestion's writes only because they share one open transaction — which contradicts ADR-0011/0003's "stages hand off through *persisted* state." If a stage ever moves to its own lambda, this breaks. + +A tempting fix — commit per property — is **rejected**: per-property commits are a commit storm that has overloaded the database before. The unit of commit must be the **batch**, not the property. + +## Decision + +- **Transaction boundary = one stage = one Unit of Work = one commit.** A batch yields ~3 commits (Ingestion, Baseline, Modelling), never N. No per-property commits. +- **All-or-nothing per batch, fail noisily.** Any property failing aborts that stage's unit (rollback); the exception propagates so `@subtask_handler` marks the subtask FAILED on the task table. Operators debug and re-run the batch. There is no per-property partial success. +- **Re-runs are idempotent.** Because stages commit independently, a re-run after a mid-pipeline failure re-executes already-committed earlier stages. So each stage's batch write **replaces** the rows for the batch's `property_ids` (delete-for-these-ids then bulk insert, or upsert) inside its unit. This is also what the future re-score-on-override path needs (re-baselining overwrites, never duplicates). +- **Bulk reads, load-whole (ADR-0002).** Repos expose `get_many(property_ids) -> Properties` returning fully-hydrated aggregates, implemented as one IN-filtered query per table composed in memory — a handful of round-trips per batch, not 30 × tables. No lean stage-specific read path. +- **Ingestion splits fetch from write.** Phase 1 fetches the whole batch (EPC / coordinates / solar) over HTTP/S3 with **no DB unit open**; phase 2 opens a unit and writes the batch, committing once. The connection is therefore held only for the short batch write, never across external IO. This sharpens the Fetcher-vs-Repo taxonomy of ADR-0011: Fetchers do IO outside any unit; Repos do DB inside the committed unit. +- **Mechanism: a `UnitOfWork`.** A `UnitOfWork` port + a `PostgresUnitOfWork` adapter (built on a module-scoped engine + sessionmaker) owns the session and constructs the DB-backed repos on it (`uow.property`, `uow.epc`, `uow.solar`, `uow.baseline`). It commits on explicit `commit()` and rolls back on any exception. Orchestrators take a `unit_of_work` factory plus their **non-DB** dependencies, injected separately: the EPC/Solar fetchers, the geospatial **S3** repo (reference data — read outside the transaction), and the Rebaseliner. Baseline uses one unit for the batch; Ingestion uses two (read uprns → fetch outside any unit → write batch). + +## Consequences + +- The orchestrators' dependency shape changes from "individual session-bound repos" to "a `unit_of_work` factory + non-DB deps". The #1134 Ingestion and #1135 Baseline orchestrators are refactored accordingly; `FirstRunPipeline` is unchanged (it still composes the three stages and threads only `property_ids`). +- Hard to reverse once every stage depends on the UoW — hence this ADR. +- Atomicity is **stage-level**, not per-property; correctness of the re-run workflow depends on the idempotent batch writes above. +- The engine + sessionmaker move to module scope so the pool is reused across warm Lambda invocations, rather than rebuilt per invocation (the existing `default_orchestrator` has the same per-invocation smell and should follow). +- EPC writes span child tables, so the idempotent "replace for these `property_ids`" must delete child rows too (cascade) before re-insert. +- The Modelling stub is left untouched this slice — its `run` is a no-op that touches no DB, so giving it a `unit_of_work` now would be an unused dependency. It takes a unit when its scoring body is built (the per-service Modelling grills). diff --git a/orchestration/baseline_orchestrator.py b/orchestration/baseline_orchestrator.py index 298e3683..4ae3a480 100644 --- a/orchestration/baseline_orchestrator.py +++ b/orchestration/baseline_orchestrator.py @@ -1,5 +1,7 @@ from __future__ import annotations +from collections.abc import Callable + from datatypes.epc.domain.epc_property_data import ( EpcPropertyData, RenewableHeatIncentive, @@ -7,50 +9,51 @@ from datatypes.epc.domain.epc_property_data import ( from domain.baseline.baseline_performance import BaselinePerformance from domain.baseline.performance import lodged_performance from domain.baseline.rebaseliner import Rebaseliner -from repositories.baseline.baseline_repository import BaselineRepository -from repositories.property.property_repository import PropertyRepository +from repositories.unit_of_work import UnitOfWork class BaselineOrchestrator: """Stage 2: establish each Property's Baseline Performance and persist it. - For each property: hydrate the Property aggregate via PropertyRepo, resolve - its Effective EPC, read Lodged Performance off it, run the Rebaseliner to - produce Effective Performance (equal to Lodged unless a trigger fires), and - persist the pair plus the deterministic kWh. + Runs the whole batch in **one** Unit of Work and commits once (ADR-0012): + for each property it hydrates the Property via the unit's PropertyRepo, + resolves the Effective EPC, reads Lodged Performance off it, runs the + Rebaseliner to produce Effective Performance, and persists the pair plus the + deterministic kWh. Any property raising aborts the batch — the unit is left + uncommitted, so nothing persists and the subtask fails noisily. - Reads only from repos — never a Fetcher or HTTP (ADR-0003). That is what - makes it byte-identical whether Ingestion ran milliseconds ago (First Run) - or last week (single-property review). The injected Rebaseliner is the - re-score-on-override seam: the future single-property flow re-runs the same - step after a Landlord Override changes the Effective EPC (ADR-0011). + Reads only from repos — never a Fetcher or HTTP (ADR-0003) — so it is + byte-identical whether Ingestion ran milliseconds ago (First Run) or last + week. The injected Rebaseliner is the re-score-on-override seam (ADR-0011). """ def __init__( self, *, - property_repo: PropertyRepository, + unit_of_work: Callable[[], UnitOfWork], rebaseliner: Rebaseliner, - baseline_repo: BaselineRepository, ) -> None: - self._property_repo = property_repo + self._unit_of_work = unit_of_work self._rebaseliner = rebaseliner - self._baseline_repo = baseline_repo def run(self, property_ids: list[int]) -> None: - for property_id in property_ids: - effective_epc = self._property_repo.get(property_id).effective_epc - lodged = lodged_performance(effective_epc) - effective, reason = self._rebaseliner.rebaseline(effective_epc, lodged) - rhi = _require_rhi(effective_epc) - baseline = BaselinePerformance( - lodged=lodged, - effective=effective, - rebaseline_reason=reason, - space_heating_kwh=rhi.space_heating_kwh, - water_heating_kwh=rhi.water_heating_kwh, - ) - self._baseline_repo.save(baseline, property_id) + with self._unit_of_work() as uow: + for property_id in property_ids: + effective_epc = uow.property.get(property_id).effective_epc + lodged = lodged_performance(effective_epc) + effective, reason = self._rebaseliner.rebaseline( + effective_epc, lodged + ) + rhi = _require_rhi(effective_epc) + baseline = BaselinePerformance( + lodged=lodged, + effective=effective, + rebaseline_reason=reason, + space_heating_kwh=rhi.space_heating_kwh, + water_heating_kwh=rhi.water_heating_kwh, + ) + uow.baseline.save(baseline, property_id) + uow.commit() def _require_rhi(epc: EpcPropertyData) -> RenewableHeatIncentive: diff --git a/orchestration/ingestion_orchestrator.py b/orchestration/ingestion_orchestrator.py index a3d60d8f..f2bce52b 100644 --- a/orchestration/ingestion_orchestrator.py +++ b/orchestration/ingestion_orchestrator.py @@ -1,12 +1,12 @@ from __future__ import annotations +from collections.abc import Callable +from dataclasses import dataclass from typing import Any, Optional, Protocol from datatypes.epc.domain.epc_property_data import EpcPropertyData -from repositories.epc.epc_repository import EpcRepository from repositories.geospatial.geospatial_repository import GeospatialRepository -from repositories.property.property_repository import PropertyRepository -from repositories.solar.solar_repository import SolarRepository +from repositories.unit_of_work import UnitOfWork class EpcFetcher(Protocol): @@ -23,50 +23,75 @@ class SolarFetcher(Protocol): ) -> dict[str, Any]: ... -class IngestionOrchestrator: - """Stage 1: acquire a Property's external source data and persist it. +@dataclass +class _Fetched: + """One property's externally-fetched source data, awaiting the write phase.""" - For each property: read its UPRN from the property row, fetch its EPC, resolve - its coordinates from the Geospatial reference Repo, thread those into the Solar - fetcher, and persist EPC + solar via repos. The orchestrator is the only place - a Fetcher and a Repo meet, and it threads the coordinate from the Repo into the - Solar Fetcher — Fetchers never call each other (ADR-0011). Coordinates are - reference data (deterministic from UPRN), so they are resolved transiently to - drive the Solar fetch rather than persisted per-property. + property_id: int + epc: Optional[EpcPropertyData] + solar_insights: Optional[dict[str, Any]] + + +class IngestionOrchestrator: + """Stage 1: acquire a batch's external source data and persist it. + + Runs in two phases so a DB connection is never held during external IO + (ADR-0012): **fetch** the whole batch — read each UPRN, fetch its EPC, resolve + coordinates from the Geospatial reference Repo, thread those into the Solar + fetcher — with *no unit open*; then **write** the batch in one Unit of Work + and commit once. Fetchers never call each other (ADR-0011); the orchestrator + threads the coordinate. Coordinates are reference data (deterministic from + UPRN), resolved transiently to drive the Solar fetch, never persisted. + + The geospatial repo reads S3 reference data, not the transactional store, so + it is injected separately rather than taken from the unit. """ def __init__( self, *, - property_repo: PropertyRepository, + unit_of_work: Callable[[], UnitOfWork], epc_fetcher: EpcFetcher, geospatial_repo: GeospatialRepository, solar_fetcher: SolarFetcher, - epc_repo: EpcRepository, - solar_repo: SolarRepository, ) -> None: - self._property_repo = property_repo + self._unit_of_work = unit_of_work self._epc_fetcher = epc_fetcher self._geospatial_repo = geospatial_repo self._solar_fetcher = solar_fetcher - self._epc_repo = epc_repo - self._solar_repo = solar_repo def run(self, property_ids: list[int]) -> None: - for property_id in property_ids: - uprn = self._property_repo.get(property_id).identity.uprn - if uprn is None: - # No UPRN to fetch against (e.g. landlord_property_id-only); a - # later Site-Notes path covers these. - continue + uprns = self._uprns_for(property_ids) + fetched = [self._fetch(property_id, uprn) for property_id, uprn in uprns] + self._persist(fetched) - epc = self._epc_fetcher.get_by_uprn(uprn) - if epc is not None: - self._epc_repo.save(epc, property_id=property_id) + def _uprns_for(self, property_ids: list[int]) -> list[tuple[int, int]]: + # A short read unit; properties with no UPRN (e.g. landlord_property_id + # only) are skipped — a later Site-Notes path covers them. + with self._unit_of_work() as uow: + pairs: list[tuple[int, int]] = [] + for property_id in property_ids: + uprn = uow.property.get(property_id).identity.uprn + if uprn is not None: + pairs.append((property_id, uprn)) + return pairs - coordinates = self._geospatial_repo.coordinates_for(uprn) - if coordinates is not None: - insights = self._solar_fetcher.get_building_insights( - coordinates.longitude, coordinates.latitude - ) - self._solar_repo.save(property_id, insights) + def _fetch(self, property_id: int, uprn: int) -> _Fetched: + # No unit open here — this is the external-IO phase. + epc = self._epc_fetcher.get_by_uprn(uprn) + solar_insights: Optional[dict[str, Any]] = None + coordinates = self._geospatial_repo.coordinates_for(uprn) + if coordinates is not None: + solar_insights = self._solar_fetcher.get_building_insights( + coordinates.longitude, coordinates.latitude + ) + return _Fetched(property_id, epc, solar_insights) + + def _persist(self, fetched: list[_Fetched]) -> None: + with self._unit_of_work() as uow: + for item in fetched: + if item.epc is not None: + uow.epc.save(item.epc, property_id=item.property_id) + if item.solar_insights is not None: + uow.solar.save(item.property_id, item.solar_insights) + uow.commit() diff --git a/tests/orchestration/fakes.py b/tests/orchestration/fakes.py new file mode 100644 index 00000000..5891434a --- /dev/null +++ b/tests/orchestration/fakes.py @@ -0,0 +1,110 @@ +"""In-memory fakes for orchestrator unit tests (no DB, no network). + +A `FakeUnitOfWork` exposes dict-backed fake repos and records commits, so a +test can drive an orchestrator and then assert what was persisted and that the +batch committed exactly once (ADR-0012).""" + +from __future__ import annotations + +from types import TracebackType +from typing import Any, Optional + +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.baseline.baseline_performance import BaselinePerformance +from domain.property.property import Property +from repositories.baseline.baseline_repository import BaselineRepository +from repositories.epc.epc_repository import EpcRepository +from repositories.property.property_repository import PropertyRepository +from repositories.solar.solar_repository import SolarRepository +from repositories.unit_of_work import UnitOfWork + + +class FakePropertyRepo(PropertyRepository): + def __init__(self, by_id: dict[int, Property]) -> None: + self._by_id = by_id + + def get(self, property_id: int) -> Property: + return self._by_id[property_id] + + +class FakeEpcRepo(EpcRepository): + def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None: + self.saved: list[tuple[EpcPropertyData, Optional[int]]] = [] + self._by_property = by_property or {} + + def save( + self, + data: EpcPropertyData, + property_id: Optional[int] = None, + portfolio_id: Optional[int] = None, + ) -> int: + self.saved.append((data, property_id)) + if property_id is not None: + self._by_property[property_id] = data + return len(self.saved) + + def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover + raise NotImplementedError + + def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: + return self._by_property.get(property_id) + + +class FakeSolarRepo(SolarRepository): + def __init__(self) -> None: + self.saved: list[tuple[int, dict[str, Any]]] = [] + + def save(self, property_id: int, insights: dict[str, Any]) -> None: + self.saved.append((property_id, insights)) + + def get(self, property_id: int) -> Optional[dict[str, Any]]: # pragma: no cover + raise NotImplementedError + + +class FakeBaselineRepo(BaselineRepository): + def __init__(self) -> None: + self.saved: list[tuple[BaselinePerformance, int]] = [] + + def save(self, baseline: BaselinePerformance, property_id: int) -> int: + self.saved.append((baseline, property_id)) + return len(self.saved) + + def get_for_property( + self, property_id: int + ) -> Optional[BaselinePerformance]: # pragma: no cover + raise NotImplementedError + + +class FakeUnitOfWork(UnitOfWork): + """A unit that holds in-memory repos and counts commits.""" + + def __init__( + self, + *, + property: FakePropertyRepo, + epc: Optional[FakeEpcRepo] = None, + solar: Optional[FakeSolarRepo] = None, + baseline: Optional[FakeBaselineRepo] = None, + ) -> None: + self.property = property + self.epc = epc or FakeEpcRepo() + self.solar = solar or FakeSolarRepo() + self.baseline = baseline or FakeBaselineRepo() + self.commits = 0 + + def __enter__(self) -> "FakeUnitOfWork": + return self + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + return None + + def commit(self) -> None: + self.commits += 1 + + def rollback(self) -> None: + return None diff --git a/tests/orchestration/test_baseline_orchestrator.py b/tests/orchestration/test_baseline_orchestrator.py index 3958b9b4..a18628ec 100644 --- a/tests/orchestration/test_baseline_orchestrator.py +++ b/tests/orchestration/test_baseline_orchestrator.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Optional - import pytest from datatypes.epc.domain.epc import Epc @@ -14,30 +12,11 @@ from domain.baseline.performance import Performance from domain.baseline.rebaseliner import RebaselineNotImplemented, StubRebaseliner from domain.property.property import Property, PropertyIdentity from orchestration.baseline_orchestrator import BaselineOrchestrator -from repositories.baseline.baseline_repository import BaselineRepository -from repositories.property.property_repository import PropertyRepository - - -class _FakePropertyRepo(PropertyRepository): - def __init__(self, by_id: dict[int, Property]) -> None: - self._by_id = by_id - - def get(self, property_id: int) -> Property: - return self._by_id[property_id] - - -class _FakeBaselineRepo(BaselineRepository): - def __init__(self) -> None: - self.saved: list[tuple[BaselinePerformance, int]] = [] - - def save(self, baseline: BaselinePerformance, property_id: int) -> int: - self.saved.append((baseline, property_id)) - return len(self.saved) - - def get_for_property( - self, property_id: int - ) -> Optional[BaselinePerformance]: # pragma: no cover - raise NotImplementedError +from tests.orchestration.fakes import ( + FakeBaselineRepo, + FakePropertyRepo, + FakeUnitOfWork, +) def _property(*, sap_version: float) -> Property: @@ -58,25 +37,22 @@ def _property(*, sap_version: float) -> Property: ) -def _sap10_property() -> Property: - return _property(sap_version=10.2) - - -def test_run_establishes_and_persists_baseline_performance() -> None: +def test_run_establishes_persists_and_commits_the_batch_once() -> None: # Arrange - property_repo = _FakePropertyRepo({10: _sap10_property()}) - baseline_repo = _FakeBaselineRepo() + baseline_repo = FakeBaselineRepo() + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(sap_version=10.2)}), + baseline=baseline_repo, + ) orchestrator = BaselineOrchestrator( - property_repo=property_repo, - rebaseliner=StubRebaseliner(), - baseline_repo=baseline_repo, + unit_of_work=lambda: uow, rebaseliner=StubRebaseliner() ) # Act orchestrator.run([10]) - # Assert — one Baseline Performance persisted for property 10, both halves - # equal (no rebaselining), kWh read off the RHI. + # Assert — one Baseline Performance persisted (both halves equal, kWh off the + # RHI), and the batch committed exactly once. lodged = Performance( sap_score=72, epc_band=Epc.C, co2_emissions=1.8, primary_energy_intensity=180 ) @@ -92,19 +68,23 @@ def test_run_establishes_and_persists_baseline_performance() -> None: 10, ) ] + assert uow.commits == 1 -def test_run_raises_on_a_pre_sap10_property_and_persists_nothing() -> None: +def test_run_raises_on_a_pre_sap10_property_and_does_not_commit() -> None: # Arrange — a pre-SAP10 cert needs ML rebaselining, which is not wired yet. - property_repo = _FakePropertyRepo({10: _property(sap_version=9.94)}) - baseline_repo = _FakeBaselineRepo() + baseline_repo = FakeBaselineRepo() + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(sap_version=9.94)}), + baseline=baseline_repo, + ) orchestrator = BaselineOrchestrator( - property_repo=property_repo, - rebaseliner=StubRebaseliner(), - baseline_repo=baseline_repo, + unit_of_work=lambda: uow, rebaseliner=StubRebaseliner() ) - # Act / Assert — the raise propagates; no half-baked baseline is written. + # Act / Assert — the raise propagates; the batch is neither persisted nor + # committed (all-or-nothing). with pytest.raises(RebaselineNotImplemented): orchestrator.run([10]) assert baseline_repo.saved == [] + assert uow.commits == 0 diff --git a/tests/orchestration/test_first_run_pipeline_integration.py b/tests/orchestration/test_first_run_pipeline_integration.py index 55ca34ed..d96351c7 100644 --- a/tests/orchestration/test_first_run_pipeline_integration.py +++ b/tests/orchestration/test_first_run_pipeline_integration.py @@ -1,28 +1,43 @@ +"""End-to-end through-repos integration for First Run (ADR-0012, #1138). + +Real PostgresUnitOfWork over an ephemeral DB: Ingestion writes the EPC, Baseline +reads it back *through the repo* (not in memory), and a re-run replaces rather +than duplicates. Stub Modelling. The source clients are faked (no IO).""" + from __future__ import annotations +import dataclasses +import json from dataclasses import dataclass +from pathlib import Path from typing import Any, Optional +from sqlalchemy import Engine +from sqlmodel import Session, select + from datatypes.epc.domain.epc import Epc -from datatypes.epc.domain.epc_property_data import ( - EpcPropertyData, - RenewableHeatIncentive, -) +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from datatypes.epc.domain.mapper import EpcPropertyDataMapper from domain.baseline.rebaseliner import StubRebaseliner from domain.geospatial.coordinates import Coordinates -from domain.property.property import Property, PropertyIdentity +from infrastructure.postgres.baseline_performance_table import ( + BaselinePerformanceModel, +) +from infrastructure.postgres.epc_property_table import EpcPropertyModel +from infrastructure.postgres.property_table import PropertyRow from orchestration.baseline_orchestrator import BaselineOrchestrator from orchestration.first_run_pipeline import FirstRunPipeline from orchestration.ingestion_orchestrator import IngestionOrchestrator from orchestration.modelling_orchestrator import ModellingOrchestrator -from repositories.baseline.baseline_repository import BaselineRepository -from repositories.epc.epc_repository import EpcRepository +from repositories.baseline.baseline_postgres_repository import ( + BaselinePostgresRepository, +) from repositories.geospatial.geospatial_repository import GeospatialRepository from repositories.materials.materials_repository import MaterialsRepository -from repositories.property.property_repository import PropertyRepository +from repositories.postgres_unit_of_work import PostgresUnitOfWork from repositories.scenario.scenario_repository import ScenarioRepository -from repositories.solar.solar_repository import SolarRepository -from domain.baseline.baseline_performance import BaselinePerformance + +_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples" @dataclass @@ -32,48 +47,7 @@ class _FakeCommand: scenario_ids: list[int] -class _SharedEpcRepo(EpcRepository): - """Stands in for the persisted EPC slice both stages talk through.""" - - def __init__(self) -> None: - self._by_property: dict[int, EpcPropertyData] = {} - - def save( - self, - data: EpcPropertyData, - property_id: Optional[int] = None, - portfolio_id: Optional[int] = None, - ) -> int: - assert property_id is not None - self._by_property[property_id] = data - return property_id - - def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover - raise NotImplementedError - - def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: - return self._by_property.get(property_id) - - -class _RepoBackedPropertyRepo(PropertyRepository): - """Composes the Property from its identity row + the EPC slice in the shared - EPC repo — mirroring PropertyPostgresRepository, so the stages genuinely - hand off through repo state, not in memory.""" - - def __init__( - self, identities: dict[int, PropertyIdentity], epc_repo: _SharedEpcRepo - ) -> None: - self._identities = identities - self._epc_repo = epc_repo - - def get(self, property_id: int) -> Property: - return Property( - identity=self._identities[property_id], - epc=self._epc_repo.get_for_property(property_id), - ) - - -class _FakeEpcFetcher: +class _FetcherReturning: def __init__(self, epc: EpcPropertyData) -> None: self._epc = epc @@ -81,103 +55,91 @@ class _FakeEpcFetcher: return self._epc -class _NoCoordinatesGeospatialRepo(GeospatialRepository): +class _NoCoordinates(GeospatialRepository): def coordinates_for(self, uprn: int) -> Optional[Coordinates]: return None # skip the solar leg — not under test here -class _FakeSolarFetcher: +class _UnusedSolarFetcher: def get_building_insights( self, longitude: float, latitude: float ) -> dict[str, Any]: # pragma: no cover return {} -class _FakeSolarRepo(SolarRepository): - def save(self, property_id: int, insights: dict[str, Any]) -> None: # pragma: no cover - return None - - def get(self, property_id: int) -> Optional[dict[str, Any]]: # pragma: no cover - raise NotImplementedError - - -class _CollectingBaselineRepo(BaselineRepository): - def __init__(self) -> None: - self.saved: list[tuple[BaselinePerformance, int]] = [] - - def save(self, baseline: BaselinePerformance, property_id: int) -> int: - self.saved.append((baseline, property_id)) - return len(self.saved) - - def get_for_property( - self, property_id: int - ) -> Optional[BaselinePerformance]: # pragma: no cover - raise NotImplementedError - - -class _FakeScenarioRepo(ScenarioRepository): - pass - - -class _FakeMaterialsRepo(MaterialsRepository): - pass - - -def _ingestible_epc() -> EpcPropertyData: - epc = object.__new__(EpcPropertyData) - epc.energy_rating_current = 72 - epc.current_energy_efficiency_band = Epc.C - epc.co2_emissions_current = 1.8 - epc.energy_consumption_current = 180 - epc.sap_version = 10.2 - epc.renewable_heat_incentive = RenewableHeatIncentive( - space_heating_kwh=5000.0, water_heating_kwh=2000.0 +def _lodged_epc() -> EpcPropertyData: + # A real, persistable EPC (so it round-trips through the EPC repo), with the + # recorded-performance fields the sample leaves blank filled in so Baseline + # can read its Lodged Performance. + raw: dict[str, Any] = json.loads( + (_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text() + ) + epc = EpcPropertyDataMapper.from_api_response(raw) + return dataclasses.replace( + epc, + energy_rating_current=72, + current_energy_efficiency_band=Epc.C, + co2_emissions_current=1.8, + energy_consumption_current=180, ) - return epc -def test_baseline_reads_the_epc_ingestion_persisted_through_repos() -> None: - # Arrange — one property; the EPC the fetcher returns is what Ingestion - # persists and Baseline must then read back through the shared repo. - epc = _ingestible_epc() - epc_repo = _SharedEpcRepo() - identities = { - 10: PropertyIdentity( - portfolio_id=1, postcode="A0 0AA", address="1 Some Street", uprn=123 +def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun( + db_engine: Engine, +) -> None: + # Arrange — a property row to ingest against, and the EPC its fetcher returns. + with Session(db_engine) as session: + session.add( + PropertyRow( + id=10, + portfolio_id=1, + postcode="A0 0AA", + address="1 Some Street", + uprn=12345, + ) ) - } - property_repo = _RepoBackedPropertyRepo(identities, epc_repo) - baseline_repo = _CollectingBaselineRepo() + session.commit() + + def unit_of_work() -> PostgresUnitOfWork: + return PostgresUnitOfWork(lambda: Session(db_engine)) pipeline = FirstRunPipeline( ingestion=IngestionOrchestrator( - property_repo=property_repo, - epc_fetcher=_FakeEpcFetcher(epc), - geospatial_repo=_NoCoordinatesGeospatialRepo(), - solar_fetcher=_FakeSolarFetcher(), - epc_repo=epc_repo, - solar_repo=_FakeSolarRepo(), + unit_of_work=unit_of_work, + epc_fetcher=_FetcherReturning(_lodged_epc()), + geospatial_repo=_NoCoordinates(), + solar_fetcher=_UnusedSolarFetcher(), ), baseline=BaselineOrchestrator( - property_repo=property_repo, - rebaseliner=StubRebaseliner(), - baseline_repo=baseline_repo, + unit_of_work=unit_of_work, rebaseliner=StubRebaseliner() ), modelling=ModellingOrchestrator( - scenario_repo=_FakeScenarioRepo(), - materials_repo=_FakeMaterialsRepo(), + scenario_repo=ScenarioRepository(), + materials_repo=MaterialsRepository(), ), ) + command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7]) - # Act - pipeline.run(_FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7])) + # Act — First Run, then a re-run over the same batch. + pipeline.run(command) + pipeline.run(command) - # Assert — a Baseline Performance landed for property 10, its Lodged half - # read off the very EPC Ingestion persisted. Only property_ids crossed the - # stage boundary; the EPC itself travelled through the repo. - assert len(baseline_repo.saved) == 1 - baseline, property_id = baseline_repo.saved[0] - assert property_id == 10 + # Assert — Baseline read the EPC Ingestion persisted (through the repo, only + # property_ids crossed the stage boundary), and the re-run replaced rather + # than duplicated either row. + with Session(db_engine) as session: + baseline = BaselinePostgresRepository(session).get_for_property(10) + epc_rows = session.exec( + select(EpcPropertyModel).where(EpcPropertyModel.property_id == 10) + ).all() + baseline_rows = session.exec( + select(BaselinePerformanceModel).where( + BaselinePerformanceModel.property_id == 10 + ) + ).all() + + assert baseline is not None assert baseline.lodged.sap_score == 72 - assert baseline.lodged.epc_band == Epc.C - assert baseline.space_heating_kwh == 5000.0 + assert baseline.space_heating_kwh == 13120.0 + assert len(epc_rows) == 1 + assert len(baseline_rows) == 1 diff --git a/tests/orchestration/test_ingestion_orchestrator.py b/tests/orchestration/test_ingestion_orchestrator.py index 1c6a0f89..be2d86b4 100644 --- a/tests/orchestration/test_ingestion_orchestrator.py +++ b/tests/orchestration/test_ingestion_orchestrator.py @@ -1,8 +1,5 @@ -"""IngestionOrchestrator wires fetchers + repos with no real IO (ADR-0011). - -Tested entirely against fakes: it must fetch EPC + solar, thread the -Geospatial-resolved coordinates into the solar fetcher, and persist via repos. -""" +"""IngestionOrchestrator fetches the batch (no DB unit open), then writes it in +one Unit of Work and commits once (ADR-0012). Tested against fakes — no IO.""" from __future__ import annotations @@ -12,18 +9,13 @@ from datatypes.epc.domain.epc_property_data import EpcPropertyData from domain.geospatial.coordinates import Coordinates from domain.property.property import Property, PropertyIdentity from orchestration.ingestion_orchestrator import IngestionOrchestrator -from repositories.epc.epc_repository import EpcRepository from repositories.geospatial.geospatial_repository import GeospatialRepository -from repositories.property.property_repository import PropertyRepository -from repositories.solar.solar_repository import SolarRepository - - -class _FakePropertyRepo(PropertyRepository): - def __init__(self, by_id: dict[int, Property]) -> None: - self._by_id = by_id - - def get(self, property_id: int) -> Property: - return self._by_id[property_id] +from tests.orchestration.fakes import ( + FakeEpcRepo, + FakePropertyRepo, + FakeSolarRepo, + FakeUnitOfWork, +) class _FakeEpcFetcher: @@ -56,39 +48,6 @@ class _FakeSolarFetcher: return self.insights -class _FakeEpcRepo(EpcRepository): - def __init__(self) -> None: - self.saved: list[tuple[EpcPropertyData, Optional[int]]] = [] - - def save( - self, - data: EpcPropertyData, - property_id: Optional[int] = None, - portfolio_id: Optional[int] = None, - ) -> int: - self.saved.append((data, property_id)) - return 1 - - def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover - raise NotImplementedError - - def get_for_property( - self, property_id: int - ) -> Optional[EpcPropertyData]: # pragma: no cover - raise NotImplementedError - - -class _FakeSolarRepo(SolarRepository): - def __init__(self) -> None: - self.saved: list[tuple[int, dict[str, Any]]] = [] - - def save(self, property_id: int, insights: dict[str, Any]) -> None: - self.saved.append((property_id, insights)) - - def get(self, property_id: int) -> Optional[dict[str, Any]]: # pragma: no cover - raise NotImplementedError - - def _property(uprn: Optional[int]) -> Property: return Property( identity=PropertyIdentity( @@ -97,55 +56,59 @@ def _property(uprn: Optional[int]) -> Property: ) -def _epc() -> EpcPropertyData: - # A bare placeholder is enough — the orchestrator treats the EPC opaquely. - return object.__new__(EpcPropertyData) - - def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None: # Arrange - epc = _epc() + epc = object.__new__(EpcPropertyData) insights = {"name": "buildings/X"} - coords = Coordinates(longitude=-0.1278, latitude=51.5074) - epc_repo = _FakeEpcRepo() - solar_repo = _FakeSolarRepo() + epc_repo = FakeEpcRepo() + solar_repo = FakeSolarRepo() solar_fetcher = _FakeSolarFetcher(insights) + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(uprn=12345)}), + epc=epc_repo, + solar=solar_repo, + ) orchestrator = IngestionOrchestrator( - property_repo=_FakePropertyRepo({10: _property(uprn=12345)}), + unit_of_work=lambda: uow, epc_fetcher=_FakeEpcFetcher(epc), - geospatial_repo=_FakeGeospatialRepo(coords), + geospatial_repo=_FakeGeospatialRepo( + Coordinates(longitude=-0.1278, latitude=51.5074) + ), solar_fetcher=solar_fetcher, - epc_repo=epc_repo, - solar_repo=solar_repo, ) # Act orchestrator.run([10]) - # Assert + # Assert — EPC persisted, coords threaded from the repo into the solar + # fetcher, solar persisted, batch committed once. assert epc_repo.saved == [(epc, 10)] - assert solar_fetcher.calls == [(-0.1278, 51.5074)] # coords threaded from repo + assert solar_fetcher.calls == [(-0.1278, 51.5074)] assert solar_repo.saved == [(10, insights)] + assert uow.commits == 1 def test_ingestion_skips_property_without_uprn() -> None: # Arrange - epc_repo = _FakeEpcRepo() - solar_repo = _FakeSolarRepo() + epc_repo = FakeEpcRepo() + solar_repo = FakeSolarRepo() solar_fetcher = _FakeSolarFetcher({}) + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(uprn=None)}), + epc=epc_repo, + solar=solar_repo, + ) orchestrator = IngestionOrchestrator( - property_repo=_FakePropertyRepo({10: _property(uprn=None)}), - epc_fetcher=_FakeEpcFetcher(_epc()), + unit_of_work=lambda: uow, + epc_fetcher=_FakeEpcFetcher(object.__new__(EpcPropertyData)), geospatial_repo=_FakeGeospatialRepo(None), solar_fetcher=solar_fetcher, - epc_repo=epc_repo, - solar_repo=solar_repo, ) # Act orchestrator.run([10]) - # Assert — nothing fetched or persisted for a UPRN-less property + # Assert — nothing fetched or persisted for a UPRN-less property. assert epc_repo.saved == [] assert solar_repo.saved == [] assert solar_fetcher.calls == [] @@ -153,17 +116,20 @@ def test_ingestion_skips_property_without_uprn() -> None: def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None: # Arrange - epc = _epc() - epc_repo = _FakeEpcRepo() - solar_repo = _FakeSolarRepo() + epc = object.__new__(EpcPropertyData) + epc_repo = FakeEpcRepo() + solar_repo = FakeSolarRepo() solar_fetcher = _FakeSolarFetcher({}) + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(uprn=12345)}), + epc=epc_repo, + solar=solar_repo, + ) orchestrator = IngestionOrchestrator( - property_repo=_FakePropertyRepo({10: _property(uprn=12345)}), + unit_of_work=lambda: uow, epc_fetcher=_FakeEpcFetcher(epc), geospatial_repo=_FakeGeospatialRepo(None), solar_fetcher=solar_fetcher, - epc_repo=epc_repo, - solar_repo=solar_repo, ) # Act @@ -171,5 +137,5 @@ def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None: # Assert assert epc_repo.saved == [(epc, 10)] - assert solar_fetcher.calls == [] assert solar_repo.saved == [] + assert solar_fetcher.calls == []