refactor(orchestration): wire stages onto the UnitOfWork; per-stage commit (#1138)

Replaces the handler's whole-pipeline Session (one transaction across all
three stages, connection pinned during Ingestion's external IO) with a
Unit-of-Work per stage (ADR-0012, added here). Each stage runs its batch in
one unit and commits once; any property raising aborts the batch and the
subtask fails noisily.

- BaselineOrchestrator(unit_of_work, rebaseliner): one unit for the batch,
  commit once. Raise on a pre-SAP10 property leaves the unit uncommitted.
- IngestionOrchestrator(unit_of_work, epc_fetcher, geospatial_repo,
  solar_fetcher): fetch/write split — phase 1 fetches the whole batch (EPC /
  coords / solar) with NO unit open; phase 2 writes in one unit and commits.
  The connection is never held during external IO. Geospatial S3 repo stays
  injected (reference data, not transactional).
- Handler: module-scoped engine (pool reused across warm invocations) + a UoW
  factory; whole-pipeline `with Session` gone. `build_first_run_pipeline`
  composes on the factory. Source clients still behind the raising seam.
- ADR-0012 records the decision (per-stage boundary, all-or-nothing batch,
  idempotent re-run, fetch/write split, module-scoped engine). Modelling stub
  left untouched (no-op, no DB) per the ADR.

Tests: orchestrators on a shared FakeUnitOfWork (assert persisted batch +
exactly-once commit + no-commit-on-raise). New real-DB E2E integration test:
real PostgresUnitOfWork, Ingestion writes the EPC → Baseline reads it back
through the repo → re-run replaces, not duplicates (1 EPC row, 1 baseline row
after two runs). 121 pass in tests/; pyright strict clean; AAA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-31 09:54:47 +00:00
parent 559ae1b4ec
commit 48a488d1e9
8 changed files with 423 additions and 346 deletions

View file

@ -1,8 +1,10 @@
from __future__ import annotations
import os
from typing import Any, Protocol
from collections.abc import Callable
from typing import Any, Optional, Protocol
from sqlalchemy import Engine
from sqlmodel import Session
from applications.ara_first_run.ara_first_run_trigger_body import (
@ -20,19 +22,24 @@ from orchestration.ingestion_orchestrator import (
)
from orchestration.modelling_orchestrator import ModellingOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.baseline.baseline_postgres_repository import (
BaselinePostgresRepository,
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from repositories.unit_of_work import UnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
# Module-scoped so the connection pool is reused across warm Lambda invocations
# rather than rebuilt per invocation (ADR-0012).
_engine: Optional[Engine] = None
def _get_engine() -> Engine:
global _engine
if _engine is None:
_engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
return _engine
class _RunsFirstRun(Protocol):
"""The slice of FirstRunPipeline the handler delegates to."""
@ -44,8 +51,7 @@ def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None
"""Validate the raw event body and hand the command to the pipeline.
The handler's entire decision logic — kept as a named seam so it is
exercised without the Lambda runtime. No business logic lives here: validate,
then delegate (issue #1130/#1136).
exercised without the Lambda runtime. No business logic: validate, delegate.
"""
trigger = AraFirstRunTriggerBody.model_validate(body)
pipeline.run(trigger)
@ -53,35 +59,28 @@ def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None
def build_first_run_pipeline(
*,
session: Session,
unit_of_work: Callable[[], UnitOfWork],
epc_fetcher: EpcFetcher,
geospatial_repo: GeospatialRepository,
solar_fetcher: SolarFetcher,
) -> FirstRunPipeline:
"""Compose the real three-stage pipeline over Postgres-backed repos.
"""Compose the real three-stage pipeline on a Unit-of-Work factory.
The stages share the session's repos and hand off only ``property_ids``
through them (ADR-0011). The source clients are passed in rather than built
here because their config is not settled — see ``_source_clients_from_env``.
Modelling is stubbed (#1136); its Scenario / Materials ports are seams.
Each stage opens its own unit(s) and commits per batch (ADR-0012); the
handler no longer holds a session. The source clients are passed in because
their config is not settled — see ``_source_clients_from_env``. Modelling is
stubbed (#1136); its Scenario / Materials ports are seams.
"""
epc_repo = EpcPostgresRepository(session)
property_repo = PropertyPostgresRepository(session, epc_repo)
solar_repo = SolarPostgresRepository(session)
baseline_repo = BaselinePostgresRepository(session)
return FirstRunPipeline(
ingestion=IngestionOrchestrator(
property_repo=property_repo,
unit_of_work=unit_of_work,
epc_fetcher=epc_fetcher,
geospatial_repo=geospatial_repo,
solar_fetcher=solar_fetcher,
epc_repo=epc_repo,
solar_repo=solar_repo,
),
baseline=BaselineOrchestrator(
property_repo=property_repo,
unit_of_work=unit_of_work,
rebaseliner=StubRebaseliner(),
baseline_repo=baseline_repo,
),
modelling=ModellingOrchestrator(
scenario_repo=ScenarioRepository(),
@ -108,14 +107,15 @@ def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarF
def handler(
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> None:
engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
engine = _get_engine()
unit_of_work: Callable[[], UnitOfWork] = lambda: PostgresUnitOfWork(
lambda: Session(engine)
)
epc_fetcher, geospatial_repo, solar_fetcher = _source_clients_from_env()
with Session(engine) as session:
pipeline = build_first_run_pipeline(
session=session,
epc_fetcher=epc_fetcher,
geospatial_repo=geospatial_repo,
solar_fetcher=solar_fetcher,
)
dispatch_first_run(body, pipeline=pipeline)
session.commit()
pipeline = build_first_run_pipeline(
unit_of_work=unit_of_work,
epc_fetcher=epc_fetcher,
geospatial_repo=geospatial_repo,
solar_fetcher=solar_fetcher,
)
dispatch_first_run(body, pipeline=pipeline)

View file

@ -0,0 +1,31 @@
# Each stage commits its batch once, through a Unit of Work
**Status: Accepted.** Refines [ADR-0011](0011-composable-stage-orchestrators.md) (composable stage orchestrators, stages communicate through repos) with the persistence/transaction mechanics for batch processing. Decided in a `/grill-with-docs` session (2026-05-31) after the First Run spine (#1136) landed, prompted by reviewing the handler's session lifecycle.
## Context
A First Run trigger carries a **batch** of ~30 `property_ids`. The pipeline runs that batch through Ingestion → Baseline → Modelling. The first cut (#1136) wrapped **all three stages in one `Session` and one final `commit()`** in the handler. That has three problems:
1. **A connection is pinned for the whole long-running pipeline.** SQLAlchemy checks out a pooled connection on the first statement and holds it until commit. Ingestion is the only IO-heavy stage (per property: EPC HTTP, Google-Solar HTTP, geospatial S3), so the connection sits checked-out-but-idle across all that external IO — the RDS-Proxy/pgbouncer "transaction-pinned connection" anti-pattern.
2. **One giant transaction** for the batch: long-held locks, identity-map growth, all-or-nothing across stages.
3. **Cross-stage hand-off through an *uncommitted* transaction.** Baseline reads Ingestion's writes only because they share one open transaction — which contradicts ADR-0011/0003's "stages hand off through *persisted* state." If a stage ever moves to its own lambda, this breaks.
A tempting fix — commit per property — is **rejected**: per-property commits are a commit storm that has overloaded the database before. The unit of commit must be the **batch**, not the property.
## Decision
- **Transaction boundary = one stage = one Unit of Work = one commit.** A batch yields ~3 commits (Ingestion, Baseline, Modelling), never N. No per-property commits.
- **All-or-nothing per batch, fail noisily.** Any property failing aborts that stage's unit (rollback); the exception propagates so `@subtask_handler` marks the subtask FAILED on the task table. Operators debug and re-run the batch. There is no per-property partial success.
- **Re-runs are idempotent.** Because stages commit independently, a re-run after a mid-pipeline failure re-executes already-committed earlier stages. So each stage's batch write **replaces** the rows for the batch's `property_ids` (delete-for-these-ids then bulk insert, or upsert) inside its unit. This is also what the future re-score-on-override path needs (re-baselining overwrites, never duplicates).
- **Bulk reads, load-whole (ADR-0002).** Repos expose `get_many(property_ids) -> Properties` returning fully-hydrated aggregates, implemented as one IN-filtered query per table composed in memory — a handful of round-trips per batch, not 30 × tables. No lean stage-specific read path.
- **Ingestion splits fetch from write.** Phase 1 fetches the whole batch (EPC / coordinates / solar) over HTTP/S3 with **no DB unit open**; phase 2 opens a unit and writes the batch, committing once. The connection is therefore held only for the short batch write, never across external IO. This sharpens the Fetcher-vs-Repo taxonomy of ADR-0011: Fetchers do IO outside any unit; Repos do DB inside the committed unit.
- **Mechanism: a `UnitOfWork`.** A `UnitOfWork` port + a `PostgresUnitOfWork` adapter (built on a module-scoped engine + sessionmaker) owns the session and constructs the DB-backed repos on it (`uow.property`, `uow.epc`, `uow.solar`, `uow.baseline`). It commits on explicit `commit()` and rolls back on any exception. Orchestrators take a `unit_of_work` factory plus their **non-DB** dependencies, injected separately: the EPC/Solar fetchers, the geospatial **S3** repo (reference data — read outside the transaction), and the Rebaseliner. Baseline uses one unit for the batch; Ingestion uses two (read uprns → fetch outside any unit → write batch).
## Consequences
- The orchestrators' dependency shape changes from "individual session-bound repos" to "a `unit_of_work` factory + non-DB deps". The #1134 Ingestion and #1135 Baseline orchestrators are refactored accordingly; `FirstRunPipeline` is unchanged (it still composes the three stages and threads only `property_ids`).
- Hard to reverse once every stage depends on the UoW — hence this ADR.
- Atomicity is **stage-level**, not per-property; correctness of the re-run workflow depends on the idempotent batch writes above.
- The engine + sessionmaker move to module scope so the pool is reused across warm Lambda invocations, rather than rebuilt per invocation (the existing `default_orchestrator` has the same per-invocation smell and should follow).
- EPC writes span child tables, so the idempotent "replace for these `property_ids`" must delete child rows too (cascade) before re-insert.
- The Modelling stub is left untouched in this slice — its `run` is a no-op that touches no DB, so giving it a `unit_of_work` now would be an unused dependency. It takes a unit when its scoring body is built (the per-service Modelling grills).

View file

@ -1,5 +1,7 @@
from __future__ import annotations
from collections.abc import Callable
from datatypes.epc.domain.epc_property_data import (
EpcPropertyData,
RenewableHeatIncentive,
@ -7,50 +9,51 @@ from datatypes.epc.domain.epc_property_data import (
from domain.baseline.baseline_performance import BaselinePerformance
from domain.baseline.performance import lodged_performance
from domain.baseline.rebaseliner import Rebaseliner
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.property.property_repository import PropertyRepository
from repositories.unit_of_work import UnitOfWork
class BaselineOrchestrator:
"""Stage 2: establish each Property's Baseline Performance and persist it.
For each property: hydrate the Property aggregate via PropertyRepo, resolve
its Effective EPC, read Lodged Performance off it, run the Rebaseliner to
produce Effective Performance (equal to Lodged unless a trigger fires), and
persist the pair plus the deterministic kWh.
Runs the whole batch in **one** Unit of Work and commits once (ADR-0012):
for each property it hydrates the Property via the unit's PropertyRepo,
resolves the Effective EPC, reads Lodged Performance off it, runs the
Rebaseliner to produce Effective Performance, and persists the pair plus the
deterministic kWh. Any property raising aborts the batch — the unit is left
uncommitted, so nothing persists and the subtask fails noisily.
Reads only from repos — never a Fetcher or HTTP (ADR-0003). That is what
makes it byte-identical whether Ingestion ran milliseconds ago (First Run)
or last week (single-property review). The injected Rebaseliner is the
re-score-on-override seam: the future single-property flow re-runs the same
step after a Landlord Override changes the Effective EPC (ADR-0011).
Reads only from repos — never a Fetcher or HTTP (ADR-0003) — so it is
byte-identical whether Ingestion ran milliseconds ago (First Run) or last
week. The injected Rebaseliner is the re-score-on-override seam (ADR-0011).
"""
def __init__(
self,
*,
property_repo: PropertyRepository,
unit_of_work: Callable[[], UnitOfWork],
rebaseliner: Rebaseliner,
baseline_repo: BaselineRepository,
) -> None:
self._property_repo = property_repo
self._unit_of_work = unit_of_work
self._rebaseliner = rebaseliner
self._baseline_repo = baseline_repo
def run(self, property_ids: list[int]) -> None:
for property_id in property_ids:
effective_epc = self._property_repo.get(property_id).effective_epc
lodged = lodged_performance(effective_epc)
effective, reason = self._rebaseliner.rebaseline(effective_epc, lodged)
rhi = _require_rhi(effective_epc)
baseline = BaselinePerformance(
lodged=lodged,
effective=effective,
rebaseline_reason=reason,
space_heating_kwh=rhi.space_heating_kwh,
water_heating_kwh=rhi.water_heating_kwh,
)
self._baseline_repo.save(baseline, property_id)
with self._unit_of_work() as uow:
for property_id in property_ids:
effective_epc = uow.property.get(property_id).effective_epc
lodged = lodged_performance(effective_epc)
effective, reason = self._rebaseliner.rebaseline(
effective_epc, lodged
)
rhi = _require_rhi(effective_epc)
baseline = BaselinePerformance(
lodged=lodged,
effective=effective,
rebaseline_reason=reason,
space_heating_kwh=rhi.space_heating_kwh,
water_heating_kwh=rhi.water_heating_kwh,
)
uow.baseline.save(baseline, property_id)
uow.commit()
def _require_rhi(epc: EpcPropertyData) -> RenewableHeatIncentive:

View file

@ -1,12 +1,12 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Optional, Protocol
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from repositories.epc.epc_repository import EpcRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
from repositories.unit_of_work import UnitOfWork
class EpcFetcher(Protocol):
@ -23,50 +23,75 @@ class SolarFetcher(Protocol):
) -> dict[str, Any]: ...
class IngestionOrchestrator:
"""Stage 1: acquire a Property's external source data and persist it.
@dataclass
class _Fetched:
"""One property's externally-fetched source data, awaiting the write phase."""
For each property: read its UPRN from the property row, fetch its EPC, resolve
its coordinates from the Geospatial reference Repo, thread those into the Solar
fetcher, and persist EPC + solar via repos. The orchestrator is the only place
a Fetcher and a Repo meet, and it threads the coordinate from the Repo into the
Solar Fetcher — Fetchers never call each other (ADR-0011). Coordinates are
reference data (deterministic from UPRN), so they are resolved transiently to
drive the Solar fetch rather than persisted per-property.
property_id: int
epc: Optional[EpcPropertyData]
solar_insights: Optional[dict[str, Any]]
class IngestionOrchestrator:
"""Stage 1: acquire a batch's external source data and persist it.
Runs in two phases so a DB connection is never held during external IO
(ADR-0012): **fetch** the whole batch — read each UPRN, fetch its EPC, resolve
coordinates from the Geospatial reference Repo, thread those into the Solar
fetcher — with *no unit open*; then **write** the batch in one Unit of Work
and commit once. Fetchers never call each other (ADR-0011); the orchestrator
threads the coordinate. Coordinates are reference data (deterministic from
UPRN), resolved transiently to drive the Solar fetch, never persisted.
The geospatial repo reads S3 reference data, not the transactional store, so
it is injected separately rather than taken from the unit.
"""
def __init__(
self,
*,
property_repo: PropertyRepository,
unit_of_work: Callable[[], UnitOfWork],
epc_fetcher: EpcFetcher,
geospatial_repo: GeospatialRepository,
solar_fetcher: SolarFetcher,
epc_repo: EpcRepository,
solar_repo: SolarRepository,
) -> None:
self._property_repo = property_repo
self._unit_of_work = unit_of_work
self._epc_fetcher = epc_fetcher
self._geospatial_repo = geospatial_repo
self._solar_fetcher = solar_fetcher
self._epc_repo = epc_repo
self._solar_repo = solar_repo
def run(self, property_ids: list[int]) -> None:
for property_id in property_ids:
uprn = self._property_repo.get(property_id).identity.uprn
if uprn is None:
# No UPRN to fetch against (e.g. landlord_property_id-only); a
# later Site-Notes path covers these.
continue
uprns = self._uprns_for(property_ids)
fetched = [self._fetch(property_id, uprn) for property_id, uprn in uprns]
self._persist(fetched)
epc = self._epc_fetcher.get_by_uprn(uprn)
if epc is not None:
self._epc_repo.save(epc, property_id=property_id)
def _uprns_for(self, property_ids: list[int]) -> list[tuple[int, int]]:
# A short read unit; properties with no UPRN (e.g. landlord_property_id
# only) are skipped — a later Site-Notes path covers them.
with self._unit_of_work() as uow:
pairs: list[tuple[int, int]] = []
for property_id in property_ids:
uprn = uow.property.get(property_id).identity.uprn
if uprn is not None:
pairs.append((property_id, uprn))
return pairs
coordinates = self._geospatial_repo.coordinates_for(uprn)
if coordinates is not None:
insights = self._solar_fetcher.get_building_insights(
coordinates.longitude, coordinates.latitude
)
self._solar_repo.save(property_id, insights)
def _fetch(self, property_id: int, uprn: int) -> _Fetched:
# No unit open here — this is the external-IO phase.
epc = self._epc_fetcher.get_by_uprn(uprn)
solar_insights: Optional[dict[str, Any]] = None
coordinates = self._geospatial_repo.coordinates_for(uprn)
if coordinates is not None:
solar_insights = self._solar_fetcher.get_building_insights(
coordinates.longitude, coordinates.latitude
)
return _Fetched(property_id, epc, solar_insights)
def _persist(self, fetched: list[_Fetched]) -> None:
with self._unit_of_work() as uow:
for item in fetched:
if item.epc is not None:
uow.epc.save(item.epc, property_id=item.property_id)
if item.solar_insights is not None:
uow.solar.save(item.property_id, item.solar_insights)
uow.commit()

View file

@ -0,0 +1,110 @@
"""In-memory fakes for orchestrator unit tests (no DB, no network).
A `FakeUnitOfWork` exposes dict-backed fake repos and records commits, so a
test can drive an orchestrator and then assert what was persisted and that the
batch committed exactly once (ADR-0012)."""
from __future__ import annotations
from types import TracebackType
from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.baseline.baseline_performance import BaselinePerformance
from domain.property.property import Property
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
from repositories.unit_of_work import UnitOfWork
class FakePropertyRepo(PropertyRepository):
    """Property repo fake that serves pre-built ``Property`` objects from a dict."""

    def __init__(self, by_id: dict[int, Property]) -> None:
        # Held by reference, not copied: later changes the test makes to the
        # dict are visible to subsequent ``get`` calls.
        self._by_id = by_id

    def get(self, property_id: int) -> Property:
        """Return the Property stored under ``property_id``.

        Raises:
            KeyError: if no Property was registered for that id.
        """
        found = self._by_id[property_id]
        return found
class FakeEpcRepo(EpcRepository):
    """In-memory EPC repo that logs every save and serves reads by property id.

    ``saved`` is the ordered log of ``(data, property_id)`` pairs handed to
    ``save``. The most recent EPC saved against a property id is what
    ``get_for_property`` returns.
    """

    def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None:
        self.saved: list[tuple[EpcPropertyData, Optional[int]]] = []
        # Test for None, not truthiness: an *empty* dict supplied by the caller
        # must be kept and mutated in place, otherwise writes land in a private
        # dict and the caller's reference never sees them.
        self._by_property: dict[int, EpcPropertyData] = (
            {} if by_property is None else by_property
        )

    def save(
        self,
        data: EpcPropertyData,
        property_id: Optional[int] = None,
        portfolio_id: Optional[int] = None,
    ) -> int:
        """Log the save and return the number of saves recorded so far.

        When ``property_id`` is given, ``data`` also replaces whatever was
        stored for that property. ``portfolio_id`` is accepted but not used.
        """
        self.saved.append((data, property_id))
        if property_id is not None:
            self._by_property[property_id] = data
        return len(self.saved)

    def get(self, epc_property_id: int) -> EpcPropertyData:  # pragma: no cover
        """Not supported by this fake; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
        """Return the EPC stored for ``property_id``, or ``None`` if there is none."""
        return self._by_property.get(property_id)
class FakeSolarRepo(SolarRepository):
    """Solar repo fake that only logs writes; reads are not supported."""

    def __init__(self) -> None:
        # Ordered log of every (property_id, insights) pair passed to ``save``.
        self.saved: list[tuple[int, dict[str, Any]]] = []

    def save(self, property_id: int, insights: dict[str, Any]) -> None:
        """Append the write to ``saved``; nothing else is stored."""
        record = (property_id, insights)
        self.saved.append(record)

    def get(self, property_id: int) -> Optional[dict[str, Any]]:  # pragma: no cover
        """Not supported by this fake; always raises ``NotImplementedError``."""
        raise NotImplementedError
class FakeBaselineRepo(BaselineRepository):
    """Baseline repo fake that only logs writes; reads are not supported."""

    def __init__(self) -> None:
        # Ordered log of every (baseline, property_id) pair passed to ``save``.
        self.saved: list[tuple[BaselinePerformance, int]] = []

    def save(self, baseline: BaselinePerformance, property_id: int) -> int:
        """Append the write to ``saved`` and return how many saves exist so far."""
        record = (baseline, property_id)
        self.saved.append(record)
        return len(self.saved)

    def get_for_property(
        self, property_id: int
    ) -> Optional[BaselinePerformance]:  # pragma: no cover
        """Not supported by this fake; always raises ``NotImplementedError``."""
        raise NotImplementedError
class FakeUnitOfWork(UnitOfWork):
    """A unit that holds in-memory repos and counts commits.

    Exposes ``property``, ``epc``, ``solar`` and ``baseline`` repos plus a
    ``commits`` counter, so a test can assert both what was written and how
    many times the unit was committed. Entering and leaving the ``with`` block
    does nothing by itself: only an explicit ``commit()`` is counted.
    """

    def __init__(
        self,
        *,
        property: FakePropertyRepo,
        epc: Optional[FakeEpcRepo] = None,
        solar: Optional[FakeSolarRepo] = None,
        baseline: Optional[FakeBaselineRepo] = None,
    ) -> None:
        self.property = property
        # Default on ``is None`` rather than truthiness so a repo the test
        # injected is always the one used, whatever its truth value.
        self.epc = epc if epc is not None else FakeEpcRepo()
        self.solar = solar if solar is not None else FakeSolarRepo()
        self.baseline = baseline if baseline is not None else FakeBaselineRepo()
        self.commits = 0

    def __enter__(self) -> "FakeUnitOfWork":
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Returning None (falsy) lets any exception raised inside the ``with``
        # block propagate to the caller instead of being swallowed.
        return None

    def commit(self) -> None:
        """Count the commit; nothing is flushed anywhere."""
        self.commits += 1

    def rollback(self) -> None:
        """No-op: the in-memory repos are not reverted."""
        return None

View file

@ -1,7 +1,5 @@
from __future__ import annotations
from typing import Optional
import pytest
from datatypes.epc.domain.epc import Epc
@ -14,30 +12,11 @@ from domain.baseline.performance import Performance
from domain.baseline.rebaseliner import RebaselineNotImplemented, StubRebaseliner
from domain.property.property import Property, PropertyIdentity
from orchestration.baseline_orchestrator import BaselineOrchestrator
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.property.property_repository import PropertyRepository
class _FakePropertyRepo(PropertyRepository):
def __init__(self, by_id: dict[int, Property]) -> None:
self._by_id = by_id
def get(self, property_id: int) -> Property:
return self._by_id[property_id]
class _FakeBaselineRepo(BaselineRepository):
def __init__(self) -> None:
self.saved: list[tuple[BaselinePerformance, int]] = []
def save(self, baseline: BaselinePerformance, property_id: int) -> int:
self.saved.append((baseline, property_id))
return len(self.saved)
def get_for_property(
self, property_id: int
) -> Optional[BaselinePerformance]: # pragma: no cover
raise NotImplementedError
from tests.orchestration.fakes import (
FakeBaselineRepo,
FakePropertyRepo,
FakeUnitOfWork,
)
def _property(*, sap_version: float) -> Property:
@ -58,25 +37,22 @@ def _property(*, sap_version: float) -> Property:
)
def _sap10_property() -> Property:
return _property(sap_version=10.2)
def test_run_establishes_and_persists_baseline_performance() -> None:
def test_run_establishes_persists_and_commits_the_batch_once() -> None:
# Arrange
property_repo = _FakePropertyRepo({10: _sap10_property()})
baseline_repo = _FakeBaselineRepo()
baseline_repo = FakeBaselineRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(sap_version=10.2)}),
baseline=baseline_repo,
)
orchestrator = BaselineOrchestrator(
property_repo=property_repo,
rebaseliner=StubRebaseliner(),
baseline_repo=baseline_repo,
unit_of_work=lambda: uow, rebaseliner=StubRebaseliner()
)
# Act
orchestrator.run([10])
# Assert — one Baseline Performance persisted for property 10, both halves
# equal (no rebaselining), kWh read off the RHI.
# Assert — one Baseline Performance persisted (both halves equal, kWh off the
# RHI), and the batch committed exactly once.
lodged = Performance(
sap_score=72, epc_band=Epc.C, co2_emissions=1.8, primary_energy_intensity=180
)
@ -92,19 +68,23 @@ def test_run_establishes_and_persists_baseline_performance() -> None:
10,
)
]
assert uow.commits == 1
def test_run_raises_on_a_pre_sap10_property_and_persists_nothing() -> None:
def test_run_raises_on_a_pre_sap10_property_and_does_not_commit() -> None:
# Arrange — a pre-SAP10 cert needs ML rebaselining, which is not wired yet.
property_repo = _FakePropertyRepo({10: _property(sap_version=9.94)})
baseline_repo = _FakeBaselineRepo()
baseline_repo = FakeBaselineRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(sap_version=9.94)}),
baseline=baseline_repo,
)
orchestrator = BaselineOrchestrator(
property_repo=property_repo,
rebaseliner=StubRebaseliner(),
baseline_repo=baseline_repo,
unit_of_work=lambda: uow, rebaseliner=StubRebaseliner()
)
# Act / Assert — the raise propagates; no half-baked baseline is written.
# Act / Assert — the raise propagates; the batch is neither persisted nor
# committed (all-or-nothing).
with pytest.raises(RebaselineNotImplemented):
orchestrator.run([10])
assert baseline_repo.saved == []
assert uow.commits == 0

View file

@ -1,28 +1,43 @@
"""End-to-end through-repos integration for First Run (ADR-0012, #1138).
Real PostgresUnitOfWork over an ephemeral DB: Ingestion writes the EPC, Baseline
reads it back *through the repo* (not in memory), and a re-run replaces rather
than duplicates. Stub Modelling. The source clients are faked (no IO)."""
from __future__ import annotations
import dataclasses
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from sqlalchemy import Engine
from sqlmodel import Session, select
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import (
EpcPropertyData,
RenewableHeatIncentive,
)
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.baseline.rebaseliner import StubRebaseliner
from domain.geospatial.coordinates import Coordinates
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.baseline_performance_table import (
BaselinePerformanceModel,
)
from infrastructure.postgres.epc_property_table import EpcPropertyModel
from infrastructure.postgres.property_table import PropertyRow
from orchestration.baseline_orchestrator import BaselineOrchestrator
from orchestration.first_run_pipeline import FirstRunPipeline
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from orchestration.modelling_orchestrator import ModellingOrchestrator
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.baseline.baseline_postgres_repository import (
BaselinePostgresRepository,
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.property.property_repository import PropertyRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.solar.solar_repository import SolarRepository
from domain.baseline.baseline_performance import BaselinePerformance
_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
@dataclass
@ -32,48 +47,7 @@ class _FakeCommand:
scenario_ids: list[int]
class _SharedEpcRepo(EpcRepository):
"""Stands in for the persisted EPC slice both stages talk through."""
def __init__(self) -> None:
self._by_property: dict[int, EpcPropertyData] = {}
def save(
self,
data: EpcPropertyData,
property_id: Optional[int] = None,
portfolio_id: Optional[int] = None,
) -> int:
assert property_id is not None
self._by_property[property_id] = data
return property_id
def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover
raise NotImplementedError
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
return self._by_property.get(property_id)
class _RepoBackedPropertyRepo(PropertyRepository):
"""Composes the Property from its identity row + the EPC slice in the shared
EPC repo — mirroring PropertyPostgresRepository, so the stages genuinely
hand off through repo state, not in memory."""
def __init__(
self, identities: dict[int, PropertyIdentity], epc_repo: _SharedEpcRepo
) -> None:
self._identities = identities
self._epc_repo = epc_repo
def get(self, property_id: int) -> Property:
return Property(
identity=self._identities[property_id],
epc=self._epc_repo.get_for_property(property_id),
)
class _FakeEpcFetcher:
class _FetcherReturning:
def __init__(self, epc: EpcPropertyData) -> None:
self._epc = epc
@ -81,103 +55,91 @@ class _FakeEpcFetcher:
return self._epc
class _NoCoordinatesGeospatialRepo(GeospatialRepository):
class _NoCoordinates(GeospatialRepository):
def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
return None # skip the solar leg — not under test here
class _FakeSolarFetcher:
class _UnusedSolarFetcher:
def get_building_insights(
self, longitude: float, latitude: float
) -> dict[str, Any]: # pragma: no cover
return {}
class _FakeSolarRepo(SolarRepository):
def save(self, property_id: int, insights: dict[str, Any]) -> None: # pragma: no cover
return None
def get(self, property_id: int) -> Optional[dict[str, Any]]: # pragma: no cover
raise NotImplementedError
class _CollectingBaselineRepo(BaselineRepository):
def __init__(self) -> None:
self.saved: list[tuple[BaselinePerformance, int]] = []
def save(self, baseline: BaselinePerformance, property_id: int) -> int:
self.saved.append((baseline, property_id))
return len(self.saved)
def get_for_property(
self, property_id: int
) -> Optional[BaselinePerformance]: # pragma: no cover
raise NotImplementedError
class _FakeScenarioRepo(ScenarioRepository):
pass
class _FakeMaterialsRepo(MaterialsRepository):
pass
def _ingestible_epc() -> EpcPropertyData:
epc = object.__new__(EpcPropertyData)
epc.energy_rating_current = 72
epc.current_energy_efficiency_band = Epc.C
epc.co2_emissions_current = 1.8
epc.energy_consumption_current = 180
epc.sap_version = 10.2
epc.renewable_heat_incentive = RenewableHeatIncentive(
space_heating_kwh=5000.0, water_heating_kwh=2000.0
def _lodged_epc() -> EpcPropertyData:
    """Build an EPC from the RdSAP 21.0.0 JSON sample with performance filled in.

    A real, persistable EPC (so it round-trips through the EPC repo), with the
    recorded-performance fields the sample leaves blank filled in so Baseline
    can read its Lodged Performance.
    """
    raw: dict[str, Any] = json.loads(
        (_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
    )
    epc = EpcPropertyDataMapper.from_api_response(raw)
    # replace() returns a copy; the mapped instance itself is left untouched.
    return dataclasses.replace(
        epc,
        energy_rating_current=72,
        current_energy_efficiency_band=Epc.C,
        co2_emissions_current=1.8,
        energy_consumption_current=180,
    )
return epc
def test_baseline_reads_the_epc_ingestion_persisted_through_repos() -> None:
# Arrange — one property; the EPC the fetcher returns is what Ingestion
# persists and Baseline must then read back through the shared repo.
epc = _ingestible_epc()
epc_repo = _SharedEpcRepo()
identities = {
10: PropertyIdentity(
portfolio_id=1, postcode="A0 0AA", address="1 Some Street", uprn=123
def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
db_engine: Engine,
) -> None:
# Arrange — a property row to ingest against, and the EPC its fetcher returns.
with Session(db_engine) as session:
session.add(
PropertyRow(
id=10,
portfolio_id=1,
postcode="A0 0AA",
address="1 Some Street",
uprn=12345,
)
)
}
property_repo = _RepoBackedPropertyRepo(identities, epc_repo)
baseline_repo = _CollectingBaselineRepo()
session.commit()
def unit_of_work() -> PostgresUnitOfWork:
return PostgresUnitOfWork(lambda: Session(db_engine))
pipeline = FirstRunPipeline(
ingestion=IngestionOrchestrator(
property_repo=property_repo,
epc_fetcher=_FakeEpcFetcher(epc),
geospatial_repo=_NoCoordinatesGeospatialRepo(),
solar_fetcher=_FakeSolarFetcher(),
epc_repo=epc_repo,
solar_repo=_FakeSolarRepo(),
unit_of_work=unit_of_work,
epc_fetcher=_FetcherReturning(_lodged_epc()),
geospatial_repo=_NoCoordinates(),
solar_fetcher=_UnusedSolarFetcher(),
),
baseline=BaselineOrchestrator(
property_repo=property_repo,
rebaseliner=StubRebaseliner(),
baseline_repo=baseline_repo,
unit_of_work=unit_of_work, rebaseliner=StubRebaseliner()
),
modelling=ModellingOrchestrator(
scenario_repo=_FakeScenarioRepo(),
materials_repo=_FakeMaterialsRepo(),
scenario_repo=ScenarioRepository(),
materials_repo=MaterialsRepository(),
),
)
command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7])
# Act
pipeline.run(_FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7]))
# Act — First Run, then a re-run over the same batch.
pipeline.run(command)
pipeline.run(command)
# Assert — a Baseline Performance landed for property 10, its Lodged half
# read off the very EPC Ingestion persisted. Only property_ids crossed the
# stage boundary; the EPC itself travelled through the repo.
assert len(baseline_repo.saved) == 1
baseline, property_id = baseline_repo.saved[0]
assert property_id == 10
# Assert — Baseline read the EPC Ingestion persisted (through the repo, only
# property_ids crossed the stage boundary), and the re-run replaced rather
# than duplicated either row.
with Session(db_engine) as session:
baseline = BaselinePostgresRepository(session).get_for_property(10)
epc_rows = session.exec(
select(EpcPropertyModel).where(EpcPropertyModel.property_id == 10)
).all()
baseline_rows = session.exec(
select(BaselinePerformanceModel).where(
BaselinePerformanceModel.property_id == 10
)
).all()
assert baseline is not None
assert baseline.lodged.sap_score == 72
assert baseline.lodged.epc_band == Epc.C
assert baseline.space_heating_kwh == 5000.0
assert baseline.space_heating_kwh == 13120.0
assert len(epc_rows) == 1
assert len(baseline_rows) == 1

View file

@ -1,8 +1,5 @@
"""IngestionOrchestrator wires fetchers + repos with no real IO (ADR-0011).
Tested entirely against fakes: it must fetch EPC + solar, thread the
Geospatial-resolved coordinates into the solar fetcher, and persist via repos.
"""
"""IngestionOrchestrator fetches the batch (no DB unit open), then writes it in
one Unit of Work and commits once (ADR-0012). Tested against fakes — no IO."""
from __future__ import annotations
@ -12,18 +9,13 @@ from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.geospatial.coordinates import Coordinates
from domain.property.property import Property, PropertyIdentity
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from repositories.epc.epc_repository import EpcRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
class _FakePropertyRepo(PropertyRepository):
    """In-memory property repo backed by a mapping keyed by property id."""

    def __init__(self, by_id: dict[int, Property]) -> None:
        self._by_id = by_id

    def get(self, property_id: int) -> Property:
        """Return the stored Property; an unknown id raises ``KeyError``."""
        return self._by_id[property_id]
from tests.orchestration.fakes import (
FakeEpcRepo,
FakePropertyRepo,
FakeSolarRepo,
FakeUnitOfWork,
)
class _FakeEpcFetcher:
@ -56,39 +48,6 @@ class _FakeSolarFetcher:
return self.insights
class _FakeEpcRepo(EpcRepository):
    """EPC repo fake that records saves; both read paths are unsupported."""

    def __init__(self) -> None:
        # (data, property_id) pairs, in the order they were saved.
        self.saved: list[tuple[EpcPropertyData, Optional[int]]] = []

    def save(
        self,
        data: EpcPropertyData,
        property_id: Optional[int] = None,
        portfolio_id: Optional[int] = None,
    ) -> int:
        """Record ``(data, property_id)`` and return a fixed id of 1.

        ``portfolio_id`` is accepted but not recorded.
        """
        self.saved.append((data, property_id))
        return 1

    def get(self, epc_property_id: int) -> EpcPropertyData:  # pragma: no cover
        """Not implemented for this fake."""
        raise NotImplementedError

    def get_for_property(
        self, property_id: int
    ) -> Optional[EpcPropertyData]:  # pragma: no cover
        """Not implemented for this fake."""
        raise NotImplementedError
class _FakeSolarRepo(SolarRepository):
    """Solar repo fake that records every save; reads are unsupported."""

    def __init__(self) -> None:
        # (property_id, insights) pairs, in the order they were saved.
        self.saved: list[tuple[int, dict[str, Any]]] = []

    def save(self, property_id: int, insights: dict[str, Any]) -> None:
        """Record the insights against the property id."""
        self.saved.append((property_id, insights))

    def get(self, property_id: int) -> Optional[dict[str, Any]]:  # pragma: no cover
        """Not implemented for this fake."""
        raise NotImplementedError
def _property(uprn: Optional[int]) -> Property:
return Property(
identity=PropertyIdentity(
@ -97,55 +56,59 @@ def _property(uprn: Optional[int]) -> Property:
)
def _epc() -> EpcPropertyData:
    """Return an uninitialised ``EpcPropertyData`` placeholder.

    ``object.__new__`` allocates the instance without running ``__init__``,
    so none of its fields are set.
    """
    # A bare placeholder is enough — the orchestrator treats the EPC opaquely.
    return object.__new__(EpcPropertyData)
def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None:
# Arrange
epc = _epc()
epc = object.__new__(EpcPropertyData)
insights = {"name": "buildings/X"}
coords = Coordinates(longitude=-0.1278, latitude=51.5074)
epc_repo = _FakeEpcRepo()
solar_repo = _FakeSolarRepo()
epc_repo = FakeEpcRepo()
solar_repo = FakeSolarRepo()
solar_fetcher = _FakeSolarFetcher(insights)
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=epc_repo,
solar=solar_repo,
)
orchestrator = IngestionOrchestrator(
property_repo=_FakePropertyRepo({10: _property(uprn=12345)}),
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(epc),
geospatial_repo=_FakeGeospatialRepo(coords),
geospatial_repo=_FakeGeospatialRepo(
Coordinates(longitude=-0.1278, latitude=51.5074)
),
solar_fetcher=solar_fetcher,
epc_repo=epc_repo,
solar_repo=solar_repo,
)
# Act
orchestrator.run([10])
# Assert
# Assert — EPC persisted, coords threaded from the repo into the solar
# fetcher, solar persisted, batch committed once.
assert epc_repo.saved == [(epc, 10)]
assert solar_fetcher.calls == [(-0.1278, 51.5074)] # coords threaded from repo
assert solar_fetcher.calls == [(-0.1278, 51.5074)]
assert solar_repo.saved == [(10, insights)]
assert uow.commits == 1
def test_ingestion_skips_property_without_uprn() -> None:
# Arrange
epc_repo = _FakeEpcRepo()
solar_repo = _FakeSolarRepo()
epc_repo = FakeEpcRepo()
solar_repo = FakeSolarRepo()
solar_fetcher = _FakeSolarFetcher({})
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=None)}),
epc=epc_repo,
solar=solar_repo,
)
orchestrator = IngestionOrchestrator(
property_repo=_FakePropertyRepo({10: _property(uprn=None)}),
epc_fetcher=_FakeEpcFetcher(_epc()),
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(object.__new__(EpcPropertyData)),
geospatial_repo=_FakeGeospatialRepo(None),
solar_fetcher=solar_fetcher,
epc_repo=epc_repo,
solar_repo=solar_repo,
)
# Act
orchestrator.run([10])
# Assert — nothing fetched or persisted for a UPRN-less property
# Assert — nothing fetched or persisted for a UPRN-less property.
assert epc_repo.saved == []
assert solar_repo.saved == []
assert solar_fetcher.calls == []
@ -153,17 +116,20 @@ def test_ingestion_skips_property_without_uprn() -> None:
def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None:
# Arrange
epc = _epc()
epc_repo = _FakeEpcRepo()
solar_repo = _FakeSolarRepo()
epc = object.__new__(EpcPropertyData)
epc_repo = FakeEpcRepo()
solar_repo = FakeSolarRepo()
solar_fetcher = _FakeSolarFetcher({})
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=epc_repo,
solar=solar_repo,
)
orchestrator = IngestionOrchestrator(
property_repo=_FakePropertyRepo({10: _property(uprn=12345)}),
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(epc),
geospatial_repo=_FakeGeospatialRepo(None),
solar_fetcher=solar_fetcher,
epc_repo=epc_repo,
solar_repo=solar_repo,
)
# Act
@ -171,5 +137,5 @@ def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None:
# Assert
assert epc_repo.saved == [(epc, 10)]
assert solar_fetcher.calls == []
assert solar_repo.saved == []
assert solar_fetcher.calls == []