From 1696cccba60e63a33ffe8cb3d743781667c08879 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sat, 30 May 2026 19:58:21 +0000 Subject: [PATCH] feat(ingestion): IngestionOrchestrator end-to-end (#1134) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 1 of the pipeline: per property, read its UPRN from the property row, fetch its EPC, resolve coordinates from the Geospatial reference repo, thread those into the Solar fetcher, and persist EPC + solar via repos. Fetchers never call each other — the orchestrator threads the coordinate (ADR-0011). Coordinates are reference data (deterministic from UPRN), resolved transiently to drive the solar fetch rather than persisted per-property. Depends on thin EpcFetcher/SolarFetcher Protocols (EpcClientService and GoogleSolarApiClient satisfy them structurally). Unit-tested against fakes — no DB, gov API, or network: persists EPC, threads coords into solar, skips UPRN-less properties and skips solar when coordinates are absent. pyright clean. 
Co-Authored-By: Claude Opus 4.8
---
 orchestration/ingestion_orchestrator.py       |  80 +++++++
 .../test_ingestion_orchestrator.py            | 199 ++++++++++++++++++
 2 files changed, 279 insertions(+)
 create mode 100644 orchestration/ingestion_orchestrator.py
 create mode 100644 tests/orchestration/test_ingestion_orchestrator.py

diff --git a/orchestration/ingestion_orchestrator.py b/orchestration/ingestion_orchestrator.py
new file mode 100644
index 00000000..a3d60d8f
--- /dev/null
+++ b/orchestration/ingestion_orchestrator.py
@@ -0,0 +1,80 @@
+from __future__ import annotations
+
+from typing import Any, Optional, Protocol
+
+from datatypes.epc.domain.epc_property_data import EpcPropertyData
+from repositories.epc.epc_repository import EpcRepository
+from repositories.geospatial.geospatial_repository import GeospatialRepository
+from repositories.property.property_repository import PropertyRepository
+from repositories.solar.solar_repository import SolarRepository
+
+
+class EpcFetcher(Protocol):
+    """The slice of the New-EPC-API client Ingestion needs (e.g. EpcClientService)."""
+
+    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: ...
+
+
+class SolarFetcher(Protocol):
+    """The slice of the Google Solar client Ingestion needs (e.g. GoogleSolarApiClient)."""
+
+    def get_building_insights(
+        self, longitude: float, latitude: float
+    ) -> dict[str, Any]: ...
+
+
+class IngestionOrchestrator:
+    """Stage 1: acquire a Property's external source data and persist it.
+
+    For each property: read its UPRN from the property row, fetch its EPC, resolve
+    its coordinates from the Geospatial reference Repo, thread those into the Solar
+    fetcher, and persist EPC + solar via repos. The orchestrator is the only place
+    a Fetcher and a Repo meet, and it threads the coordinate from the Repo into the
+    Solar Fetcher — Fetchers never call each other (ADR-0011). Coordinates are
+    reference data (deterministic from UPRN), so they are resolved transiently to
+    drive the Solar fetch rather than persisted per-property.
+    """
+
+    def __init__(
+        self,
+        *,
+        property_repo: PropertyRepository,
+        epc_fetcher: EpcFetcher,
+        geospatial_repo: GeospatialRepository,
+        solar_fetcher: SolarFetcher,
+        epc_repo: EpcRepository,
+        solar_repo: SolarRepository,
+    ) -> None:
+        """Store the collaborators that ``run`` uses."""
+        self._property_repo = property_repo
+        self._epc_fetcher = epc_fetcher
+        self._geospatial_repo = geospatial_repo
+        self._solar_fetcher = solar_fetcher
+        self._epc_repo = epc_repo
+        self._solar_repo = solar_repo
+
+    def run(self, property_ids: list[int]) -> None:
+        """Ingest each property in ``property_ids``, in the order given.
+
+        A property whose UPRN is ``None`` is skipped entirely. Otherwise its
+        EPC is saved when the fetcher returns one, and solar insights are
+        fetched and saved only when the Geospatial repo returns coordinates.
+        Nothing is caught here, so any collaborator exception ends the run.
+        """
+        for property_id in property_ids:
+            uprn = self._property_repo.get(property_id).identity.uprn
+            if uprn is None:
+                # No UPRN to fetch against (e.g. landlord_property_id-only); a
+                # later Site-Notes path covers these.
+                continue
+
+            epc = self._epc_fetcher.get_by_uprn(uprn)
+            if epc is not None:
+                self._epc_repo.save(epc, property_id=property_id)
+
+            coordinates = self._geospatial_repo.coordinates_for(uprn)
+            if coordinates is not None:
+                insights = self._solar_fetcher.get_building_insights(
+                    coordinates.longitude, coordinates.latitude
+                )
+                self._solar_repo.save(property_id, insights)
diff --git a/tests/orchestration/test_ingestion_orchestrator.py b/tests/orchestration/test_ingestion_orchestrator.py
new file mode 100644
index 00000000..1c6a0f89
--- /dev/null
+++ b/tests/orchestration/test_ingestion_orchestrator.py
@@ -0,0 +1,199 @@
+"""IngestionOrchestrator wires fetchers + repos with no real IO (ADR-0011).
+
+Tested entirely against fakes: it must fetch EPC + solar, thread the
+Geospatial-resolved coordinates into the solar fetcher, and persist via repos.
+"""
+
+from __future__ import annotations
+
+from typing import Any, Optional
+
+from datatypes.epc.domain.epc_property_data import EpcPropertyData
+from domain.geospatial.coordinates import Coordinates
+from domain.property.property import Property, PropertyIdentity
+from orchestration.ingestion_orchestrator import IngestionOrchestrator
+from repositories.epc.epc_repository import EpcRepository
+from repositories.geospatial.geospatial_repository import GeospatialRepository
+from repositories.property.property_repository import PropertyRepository
+from repositories.solar.solar_repository import SolarRepository
+
+
+class _FakePropertyRepo(PropertyRepository):
+    """Serves properties from an in-memory dict keyed by property id."""
+
+    def __init__(self, by_id: dict[int, Property]) -> None:
+        self._by_id = by_id
+
+    def get(self, property_id: int) -> Property:
+        return self._by_id[property_id]
+
+
+class _FakeEpcFetcher:
+    """Returns a canned EPC and records every UPRN it was asked for."""
+
+    def __init__(self, epc: Optional[EpcPropertyData]) -> None:
+        self.epc = epc
+        self.uprns: list[int] = []
+
+    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
+        self.uprns.append(uprn)
+        return self.epc
+
+
+class _FakeGeospatialRepo(GeospatialRepository):
+    """Returns canned coordinates and records every UPRN it was asked for."""
+
+    def __init__(self, coordinates: Optional[Coordinates]) -> None:
+        self._coordinates = coordinates
+        self.uprns: list[int] = []
+
+    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
+        self.uprns.append(uprn)
+        return self._coordinates
+
+
+class _FakeSolarFetcher:
+    """Returns canned insights and records each (longitude, latitude) call."""
+
+    def __init__(self, insights: dict[str, Any]) -> None:
+        self.insights = insights
+        self.calls: list[tuple[float, float]] = []
+
+    def get_building_insights(
+        self, longitude: float, latitude: float
+    ) -> dict[str, Any]:
+        self.calls.append((longitude, latitude))
+        return self.insights
+
+
+class _FakeEpcRepo(EpcRepository):
+    """Records each saved (EPC, property_id) pair; reads are not implemented."""
+
+    def __init__(self) -> None:
+        self.saved: list[tuple[EpcPropertyData, Optional[int]]] = []
+
+    def save(
+        self,
+        data: EpcPropertyData,
+        property_id: Optional[int] = None,
+        portfolio_id: Optional[int] = None,
+    ) -> int:
+        self.saved.append((data, property_id))
+        return 1
+
+    def get(self, epc_property_id: int) -> EpcPropertyData:  # pragma: no cover
+        raise NotImplementedError
+
+    def get_for_property(
+        self, property_id: int
+    ) -> Optional[EpcPropertyData]:  # pragma: no cover
+        raise NotImplementedError
+
+
+class _FakeSolarRepo(SolarRepository):
+    """Records each saved (property_id, insights) pair; reads are not implemented."""
+
+    def __init__(self) -> None:
+        self.saved: list[tuple[int, dict[str, Any]]] = []
+
+    def save(self, property_id: int, insights: dict[str, Any]) -> None:
+        self.saved.append((property_id, insights))
+
+    def get(self, property_id: int) -> Optional[dict[str, Any]]:  # pragma: no cover
+        raise NotImplementedError
+
+
+def _property(uprn: Optional[int]) -> Property:
+    """Build a Property with fixed identity fields and the given UPRN."""
+    return Property(
+        identity=PropertyIdentity(
+            portfolio_id=1, postcode="A0 0AA", address="1 Some Street", uprn=uprn
+        )
+    )
+
+
+def _epc() -> EpcPropertyData:
+    # A bare placeholder is enough — the orchestrator treats the EPC opaquely.
+    return object.__new__(EpcPropertyData)
+
+
+def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None:
+    # Arrange
+    epc = _epc()
+    insights = {"name": "buildings/X"}
+    coords = Coordinates(longitude=-0.1278, latitude=51.5074)
+    epc_fetcher = _FakeEpcFetcher(epc)
+    geospatial_repo = _FakeGeospatialRepo(coords)
+    epc_repo = _FakeEpcRepo()
+    solar_repo = _FakeSolarRepo()
+    solar_fetcher = _FakeSolarFetcher(insights)
+    orchestrator = IngestionOrchestrator(
+        property_repo=_FakePropertyRepo({10: _property(uprn=12345)}),
+        epc_fetcher=epc_fetcher,
+        geospatial_repo=geospatial_repo,
+        solar_fetcher=solar_fetcher,
+        epc_repo=epc_repo,
+        solar_repo=solar_repo,
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert
+    # The UPRN from the property row (not the property id) drives both lookups.
+    assert epc_fetcher.uprns == [12345]
+    assert geospatial_repo.uprns == [12345]
+    assert epc_repo.saved == [(epc, 10)]
+    assert solar_fetcher.calls == [(-0.1278, 51.5074)]  # coords threaded from repo
+    assert solar_repo.saved == [(10, insights)]
+
+
+def test_ingestion_skips_property_without_uprn() -> None:
+    # Arrange
+    epc_fetcher = _FakeEpcFetcher(_epc())
+    geospatial_repo = _FakeGeospatialRepo(None)
+    epc_repo = _FakeEpcRepo()
+    solar_repo = _FakeSolarRepo()
+    solar_fetcher = _FakeSolarFetcher({})
+    orchestrator = IngestionOrchestrator(
+        property_repo=_FakePropertyRepo({10: _property(uprn=None)}),
+        epc_fetcher=epc_fetcher,
+        geospatial_repo=geospatial_repo,
+        solar_fetcher=solar_fetcher,
+        epc_repo=epc_repo,
+        solar_repo=solar_repo,
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert — nothing fetched or persisted for a UPRN-less property
+    assert epc_fetcher.uprns == []
+    assert geospatial_repo.uprns == []
+    assert epc_repo.saved == []
+    assert solar_repo.saved == []
+    assert solar_fetcher.calls == []
+
+
+def test_ingestion_persists_epc_but_skips_solar_when_no_coordinates() -> None:
+    # Arrange
+    epc = _epc()
+    epc_repo = _FakeEpcRepo()
+    solar_repo = _FakeSolarRepo()
+    solar_fetcher = _FakeSolarFetcher({})
+    orchestrator = IngestionOrchestrator(
+        property_repo=_FakePropertyRepo({10: _property(uprn=12345)}),
+        epc_fetcher=_FakeEpcFetcher(epc),
+        geospatial_repo=_FakeGeospatialRepo(None),
+        solar_fetcher=solar_fetcher,
+        epc_repo=epc_repo,
+        solar_repo=solar_repo,
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert
+    assert epc_repo.saved == [(epc, 10)]
+    assert solar_fetcher.calls == []
+    assert solar_repo.saved == []