Model/tests/orchestration/test_ingestion_orchestrator.py
Khalim Conn-Kowlessar 48a488d1e9 refactor(orchestration): wire stages onto the UnitOfWork; per-stage commit (#1138)
Replaces the handler's whole-pipeline Session (one transaction across all
three stages, connection pinned during Ingestion's external IO) with a
Unit-of-Work per stage (ADR-0012, added here). Each stage runs its batch in
one unit and commits once; any property raising aborts the batch and the
subtask fails noisily.

- BaselineOrchestrator(unit_of_work, rebaseliner): one unit for the batch,
  commit once. Raise on a pre-SAP10 property leaves the unit uncommitted.
- IngestionOrchestrator(unit_of_work, epc_fetcher, geospatial_repo,
  solar_fetcher): fetch/write split — phase 1 fetches the whole batch (EPC /
  coords / solar) with NO unit open; phase 2 writes in one unit and commits.
  The connection is never held during external IO. Geospatial S3 repo stays
  injected (reference data, not transactional).
- Handler: module-scoped engine (pool reused across warm invocations) + a UoW
  factory; whole-pipeline `with Session` gone. `build_first_run_pipeline`
  composes on the factory. Source clients still behind the raising seam.
- ADR-0012 records the decision (per-stage boundary, all-or-nothing batch,
  idempotent re-run, fetch/write split, module-scoped engine). Modelling stub
  left untouched (no-op, no DB) per the ADR.

Tests: orchestrators on a shared FakeUnitOfWork (assert persisted batch +
exactly-once commit + no-commit-on-raise). New real-DB E2E integration test:
real PostgresUnitOfWork, Ingestion writes the EPC → Baseline reads it back
through the repo → re-run replaces, not duplicates (1 EPC row, 1 baseline row
after two runs). 121 pass in tests/; pyright strict clean; AAA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 09:54:47 +00:00

141 lines
4.3 KiB
Python

"""IngestionOrchestrator fetches the batch (no DB unit open), then writes it in
one Unit of Work and commits once (ADR-0012). Tested against fakes — no IO."""
from __future__ import annotations
from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.geospatial.coordinates import Coordinates
from domain.property.property import Property, PropertyIdentity
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from repositories.geospatial.geospatial_repository import GeospatialRepository
from tests.orchestration.fakes import (
FakeEpcRepo,
FakePropertyRepo,
FakeSolarRepo,
FakeUnitOfWork,
)
class _FakeEpcFetcher:
def __init__(self, epc: Optional[EpcPropertyData]) -> None:
self.epc = epc
self.uprns: list[int] = []
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
self.uprns.append(uprn)
return self.epc
class _FakeGeospatialRepo(GeospatialRepository):
    """Geospatial repository double that answers every lookup with one fixed value."""

    def __init__(self, coordinates: Optional[Coordinates]) -> None:
        # Only the canned answer is stored; the base-class initialiser is not
        # invoked from here.
        self._coordinates = coordinates

    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
        """Return the canned coordinates; ``uprn`` is accepted but not consulted."""
        canned = self._coordinates
        return canned
class _FakeSolarFetcher:
def __init__(self, insights: dict[str, Any]) -> None:
self.insights = insights
self.calls: list[tuple[float, float]] = []
def get_building_insights(
self, longitude: float, latitude: float
) -> dict[str, Any]:
self.calls.append((longitude, latitude))
return self.insights
def _property(uprn: Optional[int]) -> Property:
    """Build a ``Property`` with fixed placeholder identity fields and the given UPRN.

    ``uprn`` may be ``None`` to model a property that has no UPRN.
    """
    identity = PropertyIdentity(
        portfolio_id=1,
        postcode="A0 0AA",
        address="1 Some Street",
        uprn=uprn,
    )
    return Property(identity=identity)
def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None:
    """With a UPRN and known coordinates, both the EPC and the solar payload are saved."""
    # Arrange
    # Bare instance created without running ``__init__``; it is only compared
    # against what ends up in the repo.
    fetched_epc = object.__new__(EpcPropertyData)
    building_insights = {"name": "buildings/X"}
    epc_store = FakeEpcRepo()
    solar_store = FakeSolarRepo()
    solar_source = _FakeSolarFetcher(building_insights)
    unit = FakeUnitOfWork(
        property=FakePropertyRepo({10: _property(uprn=12345)}),
        epc=epc_store,
        solar=solar_store,
    )
    location = Coordinates(longitude=-0.1278, latitude=51.5074)
    orchestrator = IngestionOrchestrator(
        unit_of_work=lambda: unit,
        epc_fetcher=_FakeEpcFetcher(fetched_epc),
        geospatial_repo=_FakeGeospatialRepo(location),
        solar_fetcher=solar_source,
    )

    # Act
    orchestrator.run([10])

    # Assert — the EPC is saved against property 10, the repo's coordinates
    # reach the solar fetcher, the solar payload is saved, and the unit is
    # committed exactly once.
    assert epc_store.saved == [(fetched_epc, 10)]
    assert solar_source.calls == [(-0.1278, 51.5074)]
    assert solar_store.saved == [(10, building_insights)]
    assert unit.commits == 1
def test_ingestion_skips_property_without_uprn() -> None:
    """A property with no UPRN triggers no fetches and no writes."""
    # Arrange
    epc_repo = FakeEpcRepo()
    solar_repo = FakeSolarRepo()
    # Keep a handle on the EPC fetcher so its lookup log can be inspected; the
    # canned EPC is a bare instance created without running ``__init__``.
    epc_fetcher = _FakeEpcFetcher(object.__new__(EpcPropertyData))
    solar_fetcher = _FakeSolarFetcher({})
    uow = FakeUnitOfWork(
        property=FakePropertyRepo({10: _property(uprn=None)}),
        epc=epc_repo,
        solar=solar_repo,
    )
    orchestrator = IngestionOrchestrator(
        unit_of_work=lambda: uow,
        epc_fetcher=epc_fetcher,
        geospatial_repo=_FakeGeospatialRepo(None),
        solar_fetcher=solar_fetcher,
    )

    # Act
    orchestrator.run([10])

    # Assert — nothing fetched or persisted for a UPRN-less property. Both
    # fetchers are checked, so an EPC lookup cannot slip through unnoticed.
    assert epc_fetcher.uprns == []
    assert solar_fetcher.calls == []
    assert epc_repo.saved == []
    assert solar_repo.saved == []
def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None:
    """With a UPRN but no coordinates, the EPC is saved and solar is left alone."""
    # Arrange
    # Bare instance created without running ``__init__``; it is only compared
    # against what ends up in the repo.
    fetched_epc = object.__new__(EpcPropertyData)
    epc_store = FakeEpcRepo()
    solar_store = FakeSolarRepo()
    solar_source = _FakeSolarFetcher({})
    unit = FakeUnitOfWork(
        property=FakePropertyRepo({10: _property(uprn=12345)}),
        epc=epc_store,
        solar=solar_store,
    )
    # The geospatial double answers ``None``, i.e. no coordinates for any UPRN.
    no_coordinates = _FakeGeospatialRepo(None)
    orchestrator = IngestionOrchestrator(
        unit_of_work=lambda: unit,
        epc_fetcher=_FakeEpcFetcher(fetched_epc),
        geospatial_repo=no_coordinates,
        solar_fetcher=solar_source,
    )

    # Act
    orchestrator.run([10])

    # Assert — the EPC is saved; the solar fetcher is never called and no
    # solar payload is saved.
    assert epc_store.saved == [(fetched_epc, 10)]
    assert solar_store.saved == []
    assert solar_source.calls == []