feat(ingestion): IngestionOrchestrator end-to-end (#1134)

Stage 1 of the pipeline: per property, read its UPRN from the property row,
fetch its EPC, resolve coordinates from the Geospatial reference repo, thread
those into the Solar fetcher, and persist EPC + solar via repos. Fetchers never
call each other — the orchestrator threads the coordinate (ADR-0011). Coordinates
are reference data (deterministic from UPRN), resolved transiently to drive the
solar fetch rather than persisted per-property.

Depends on thin EpcFetcher/SolarFetcher Protocols (EpcClientService and
GoogleSolarApiClient satisfy them structurally). Unit-tested against fakes — no
DB, gov API, or network: persists EPC, threads coords into solar, skips
UPRN-less properties and skips solar when coordinates are absent. pyright clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-30 19:58:21 +00:00
parent 3998ef586c
commit 1696cccba6
2 changed files with 247 additions and 0 deletions

View file

@ -0,0 +1,72 @@
from __future__ import annotations
from typing import Any, Optional, Protocol
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from repositories.epc.epc_repository import EpcRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
class EpcFetcher(Protocol):
    """Structural interface for the EPC lookup that Ingestion depends on.

    Only the slice of the New-EPC-API client Ingestion needs is declared here;
    any object exposing a matching ``get_by_uprn`` satisfies it
    (e.g. EpcClientService).
    """

    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
        """Look up EPC data for ``uprn``; the result may be ``None``."""
        ...
class SolarFetcher(Protocol):
    """Structural interface for the solar lookup that Ingestion depends on.

    Only the slice of the Google Solar client Ingestion needs is declared here;
    any object exposing a matching ``get_building_insights`` satisfies it
    (e.g. GoogleSolarApiClient).
    """

    def get_building_insights(
        self, longitude: float, latitude: float
    ) -> dict[str, Any]:
        """Return the building-insights payload for a coordinate.

        Note the argument order: longitude first, then latitude.
        """
        ...
class IngestionOrchestrator:
    """Stage 1 of the pipeline: pull a Property's external source data and store it.

    Per property the flow is: take the UPRN off the property row, fetch the
    EPC, look up coordinates in the Geospatial reference Repo, hand those
    coordinates to the Solar fetcher, then persist EPC + solar through their
    repos. This is the only place where a Fetcher and a Repo meet; the
    coordinate travels from the Repo to the Solar Fetcher via the orchestrator
    — Fetchers never call each other (ADR-0011). Coordinates are reference data
    (deterministic from UPRN), so they are resolved transiently to drive the
    Solar fetch rather than persisted per-property.
    """

    def __init__(
        self,
        *,
        property_repo: PropertyRepository,
        epc_fetcher: EpcFetcher,
        geospatial_repo: GeospatialRepository,
        solar_fetcher: SolarFetcher,
        epc_repo: EpcRepository,
        solar_repo: SolarRepository,
    ) -> None:
        """Keep references to the collaborators; every argument is keyword-only."""
        # Fetchers (external sources).
        self._epc_fetcher = epc_fetcher
        self._solar_fetcher = solar_fetcher
        # Repos (reads of reference/property data and writes of fetched data).
        self._property_repo = property_repo
        self._geospatial_repo = geospatial_repo
        self._epc_repo = epc_repo
        self._solar_repo = solar_repo

    def run(self, property_ids: list[int]) -> None:
        """Ingest EPC and solar data for each property id, in the order given.

        Properties whose UPRN is ``None`` are skipped entirely: there is nothing
        to fetch against (e.g. landlord_property_id-only); a later Site-Notes
        path covers these.
        """
        for property_id in property_ids:
            uprn = self._property_repo.get(property_id).identity.uprn
            if uprn is not None:
                # EPC first, then solar — each step is independent of the other.
                self._ingest_epc(property_id, uprn)
                self._ingest_solar(property_id, uprn)

    def _ingest_epc(self, property_id: int, uprn: int) -> None:
        """Fetch the EPC for ``uprn`` and save it unless the fetcher returned ``None``."""
        fetched = self._epc_fetcher.get_by_uprn(uprn)
        if fetched is None:
            return
        self._epc_repo.save(fetched, property_id=property_id)

    def _ingest_solar(self, property_id: int, uprn: int) -> None:
        """Resolve coordinates for ``uprn`` and, when present, fetch and save solar."""
        location = self._geospatial_repo.coordinates_for(uprn)
        if location is None:
            # No coordinates means no solar fetch and nothing to persist.
            return
        payload = self._solar_fetcher.get_building_insights(
            location.longitude, location.latitude
        )
        self._solar_repo.save(property_id, payload)

View file

@ -0,0 +1,175 @@
"""IngestionOrchestrator wires fetchers + repos with no real IO (ADR-0011).
Tested entirely against fakes: it must fetch EPC + solar, thread the
Geospatial-resolved coordinates into the solar fetcher, and persist via repos.
"""
from __future__ import annotations
from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.geospatial.coordinates import Coordinates
from domain.property.property import Property, PropertyIdentity
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from repositories.epc.epc_repository import EpcRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
class _FakePropertyRepo(PropertyRepository):
    """In-memory PropertyRepository backed by a ``{property_id: Property}`` dict."""

    def __init__(self, by_id: dict[int, Property]) -> None:
        self._properties = by_id

    def get(self, property_id: int) -> Property:
        """Return the stored Property; an unknown id raises ``KeyError``."""
        found = self._properties[property_id]
        return found
class _FakeEpcFetcher:
    """Canned EPC fetcher: always answers with ``epc`` and records each UPRN asked for."""

    def __init__(self, epc: Optional[EpcPropertyData]) -> None:
        # Every UPRN passed to get_by_uprn, in call order.
        self.uprns: list[int] = []
        # The fixed answer handed back on every lookup (may be None).
        self.epc = epc

    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
        """Record ``uprn`` and return the canned EPC."""
        self.uprns += [uprn]
        return self.epc
class _FakeGeospatialRepo(GeospatialRepository):
    """GeospatialRepository stub that answers every UPRN with the same coordinates."""

    def __init__(self, coordinates: Optional[Coordinates]) -> None:
        self._fixed = coordinates

    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
        """Return the canned coordinates (possibly ``None``); ``uprn`` is ignored."""
        return self._fixed
class _FakeSolarFetcher:
    """Canned solar fetcher: returns ``insights`` and records each coordinate pair."""

    def __init__(self, insights: dict[str, Any]) -> None:
        # (longitude, latitude) tuples, one per call, in call order.
        self.calls: list[tuple[float, float]] = []
        # The fixed payload handed back on every call.
        self.insights = insights

    def get_building_insights(
        self, longitude: float, latitude: float
    ) -> dict[str, Any]:
        """Record the requested coordinate and return the canned payload."""
        requested = (longitude, latitude)
        self.calls.append(requested)
        return self.insights
class _FakeEpcRepo(EpcRepository):
    """EpcRepository stub that only records ``save`` calls; reads are unsupported."""

    def __init__(self) -> None:
        # (data, property_id) per save call, in call order.
        self.saved: list[tuple[EpcPropertyData, Optional[int]]] = []

    def save(
        self,
        data: EpcPropertyData,
        property_id: Optional[int] = None,
        portfolio_id: Optional[int] = None,
    ) -> int:
        """Record the save and return a fixed id of 1; ``portfolio_id`` is not recorded."""
        entry = (data, property_id)
        self.saved.append(entry)
        return 1

    def get(self, epc_property_id: int) -> EpcPropertyData:  # pragma: no cover
        """Not needed by these tests."""
        raise NotImplementedError

    def get_for_property(
        self, property_id: int
    ) -> Optional[EpcPropertyData]:  # pragma: no cover
        """Not needed by these tests."""
        raise NotImplementedError
class _FakeSolarRepo(SolarRepository):
    """SolarRepository stub that only records ``save`` calls; reads are unsupported."""

    def __init__(self) -> None:
        # (property_id, insights) per save call, in call order.
        self.saved: list[tuple[int, dict[str, Any]]] = []

    def save(self, property_id: int, insights: dict[str, Any]) -> None:
        """Record what would have been persisted."""
        entry = (property_id, insights)
        self.saved.append(entry)

    def get(self, property_id: int) -> Optional[dict[str, Any]]:  # pragma: no cover
        """Not needed by these tests."""
        raise NotImplementedError
def _property(uprn: Optional[int]) -> Property:
    """Build a Property with fixed placeholder identity fields and the given UPRN."""
    identity = PropertyIdentity(
        portfolio_id=1,
        postcode="A0 0AA",
        address="1 Some Street",
        uprn=uprn,
    )
    return Property(identity=identity)
def _epc() -> EpcPropertyData:
    """Return an uninitialised EpcPropertyData instance to use as a placeholder."""
    # The orchestrator treats the EPC opaquely, so a bare instance created
    # without running __init__ is all these tests need.
    placeholder = object.__new__(EpcPropertyData)
    return placeholder
def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None:
    """Happy path: UPRN drives the EPC fetch, coords drive solar, both are saved."""
    # Arrange
    epc = _epc()
    insights = {"name": "buildings/X"}
    coords = Coordinates(longitude=-0.1278, latitude=51.5074)
    epc_repo = _FakeEpcRepo()
    solar_repo = _FakeSolarRepo()
    # Keep a handle on the EPC fetcher so the UPRN it was asked for can be checked.
    epc_fetcher = _FakeEpcFetcher(epc)
    solar_fetcher = _FakeSolarFetcher(insights)
    orchestrator = IngestionOrchestrator(
        property_repo=_FakePropertyRepo({10: _property(uprn=12345)}),
        epc_fetcher=epc_fetcher,
        geospatial_repo=_FakeGeospatialRepo(coords),
        solar_fetcher=solar_fetcher,
        epc_repo=epc_repo,
        solar_repo=solar_repo,
    )
    # Act
    orchestrator.run([10])
    # Assert
    assert epc_fetcher.uprns == [12345]  # UPRN read from the property row
    assert epc_repo.saved == [(epc, 10)]
    assert solar_fetcher.calls == [(-0.1278, 51.5074)]  # coords threaded from repo
    assert solar_repo.saved == [(10, insights)]
def test_ingestion_skips_property_without_uprn() -> None:
    """A property with no UPRN triggers no fetches and no persistence at all."""
    # Arrange
    epc_repo = _FakeEpcRepo()
    solar_repo = _FakeSolarRepo()
    # Keep a handle on the EPC fetcher so "nothing fetched" is actually verified,
    # not just implied by the absence of a save.
    epc_fetcher = _FakeEpcFetcher(_epc())
    solar_fetcher = _FakeSolarFetcher({})
    orchestrator = IngestionOrchestrator(
        property_repo=_FakePropertyRepo({10: _property(uprn=None)}),
        epc_fetcher=epc_fetcher,
        geospatial_repo=_FakeGeospatialRepo(None),
        solar_fetcher=solar_fetcher,
        epc_repo=epc_repo,
        solar_repo=solar_repo,
    )
    # Act
    orchestrator.run([10])
    # Assert — nothing fetched or persisted for a UPRN-less property
    assert epc_fetcher.uprns == []
    assert epc_repo.saved == []
    assert solar_repo.saved == []
    assert solar_fetcher.calls == []
def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None:
    """With a UPRN but no resolvable coordinates, EPC is saved and solar is skipped."""
    # Arrange
    certificate = _epc()
    solar_client = _FakeSolarFetcher({})
    solar_store = _FakeSolarRepo()
    epc_store = _FakeEpcRepo()
    properties = _FakePropertyRepo({10: _property(uprn=12345)})
    no_coordinates = _FakeGeospatialRepo(None)
    orchestrator = IngestionOrchestrator(
        property_repo=properties,
        epc_fetcher=_FakeEpcFetcher(certificate),
        geospatial_repo=no_coordinates,
        solar_fetcher=solar_client,
        epc_repo=epc_store,
        solar_repo=solar_store,
    )
    # Act
    orchestrator.run([10])
    # Assert
    assert epc_store.saved == [(certificate, 10)]
    assert solar_client.calls == []
    assert solar_store.saved == []