From af5dfdf8e26b91ee6c222a7199417c51a471f101 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 4 Jun 2026 17:24:03 +0000 Subject: [PATCH] feat(ingestion): cache the spatial reference per UPRN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice 3c.4. Ingestion now resolves the whole spatial reference in one lookup (`spatial_for`) — the coordinates drive the Solar fetch as before, and the reference (coordinates + planning protections) is persisted per-UPRN via `uow.spatial` in the same write batch, so Modelling can read the protections back off the Property (ADR-0020). `_Fetched` carries the UPRN and the reference into the write phase. Co-Authored-By: Claude Opus 4.8 --- orchestration/ingestion_orchestrator.py | 31 +++++++---- .../test_ingestion_orchestrator.py | 51 +++++++++++++++++-- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/orchestration/ingestion_orchestrator.py b/orchestration/ingestion_orchestrator.py index 1662ecf9..6263af0b 100644 --- a/orchestration/ingestion_orchestrator.py +++ b/orchestration/ingestion_orchestrator.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from typing import Any, Optional, Protocol from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.geospatial.spatial_reference import SpatialReference from repositories.geospatial.geospatial_repository import GeospatialRepository from repositories.unit_of_work import UnitOfWork @@ -28,20 +29,24 @@ class _Fetched: """One property's externally-fetched source data, awaiting the write phase.""" property_id: int + uprn: int epc: Optional[EpcPropertyData] solar_insights: Optional[dict[str, Any]] + spatial: Optional[SpatialReference] class IngestionOrchestrator: """Stage 1: acquire a batch's external source data and persist it. Runs in two phases so a DB connection is never held during external IO - (ADR-0012): **fetch** the whole batch — read each UPRN, fetch its EPC, resolve - coordinates from the Geospatial reference Repo, thread those into the Solar - fetcher — with *no unit open*; then **write** the batch in one Unit of Work - and commit once. Fetchers never call each other (ADR-0011); the orchestrator - threads the coordinate. Coordinates are reference data (deterministic from - UPRN), resolved transiently to drive the Solar fetch, never persisted. + (ADR-0012): **fetch** the whole batch — read each UPRN, fetch its EPC, + resolve its spatial reference (coordinates + planning protections) from the + Geospatial reference Repo, thread the coordinates into the Solar fetcher — + with *no unit open*; then **write** the batch in one Unit of Work and commit + once. Fetchers never call each other (ADR-0011); the orchestrator threads + the coordinate. The coordinates drive the Solar fetch transiently; the whole + spatial reference is cached per-UPRN in the transactional store so Modelling + reads the planning protections back off the Property (ADR-0020). The geospatial repo reads S3 reference data, not the transactional store, so it is injected separately rather than taken from the unit. @@ -77,15 +82,17 @@ class IngestionOrchestrator: ] def _fetch(self, property_id: int, uprn: int) -> _Fetched: - # No unit open here — this is the external-IO phase. + # No unit open here — this is the external-IO phase. One spatial + # reference lookup yields the coordinates (which drive the Solar fetch) + # and the planning protections (cached for Modelling, ADR-0020). epc = self._epc_fetcher.get_by_uprn(uprn) solar_insights: Optional[dict[str, Any]] = None - coordinates = self._geospatial_repo.coordinates_for(uprn) - if coordinates is not None: + spatial: Optional[SpatialReference] = self._geospatial_repo.spatial_for(uprn) + if spatial is not None and spatial.coordinates is not None: solar_insights = self._solar_fetcher.get_building_insights( - coordinates.longitude, coordinates.latitude + spatial.coordinates.longitude, spatial.coordinates.latitude ) - return _Fetched(property_id, epc, solar_insights) + return _Fetched(property_id, uprn, epc, solar_insights, spatial) def _persist(self, fetched: list[_Fetched]) -> None: with self._unit_of_work() as uow: @@ -94,4 +101,6 @@ class IngestionOrchestrator: uow.epc.save(item.epc, property_id=item.property_id) if item.solar_insights is not None: uow.solar.save(item.property_id, item.solar_insights) + if item.spatial is not None: + uow.spatial.save(item.uprn, item.spatial) uow.commit() diff --git a/tests/orchestration/test_ingestion_orchestrator.py b/tests/orchestration/test_ingestion_orchestrator.py index be2d86b4..0c178185 100644 --- a/tests/orchestration/test_ingestion_orchestrator.py +++ b/tests/orchestration/test_ingestion_orchestrator.py @@ -7,6 +7,8 @@ from typing import Any, Optional from datatypes.epc.domain.epc_property_data import EpcPropertyData from domain.geospatial.coordinates import Coordinates +from domain.geospatial.planning_restrictions import PlanningRestrictions +from domain.geospatial.spatial_reference import SpatialReference from domain.property.property import Property, PropertyIdentity from orchestration.ingestion_orchestrator import IngestionOrchestrator from repositories.geospatial.geospatial_repository import GeospatialRepository @@ -14,6 +16,7 @@ from tests.orchestration.fakes import ( FakeEpcRepo, FakePropertyRepo, FakeSolarRepo, + FakeSpatialRepo, FakeUnitOfWork, ) @@ -29,11 +32,22 @@ class _FakeEpcFetcher: class _FakeGeospatialRepo(GeospatialRepository): - def __init__(self, coordinates: Optional[Coordinates]) -> None: - self._coordinates = coordinates + def __init__( + self, + coordinates: Optional[Coordinates], + restrictions: PlanningRestrictions = PlanningRestrictions(), + ) -> None: + self._reference: Optional[SpatialReference] = ( + SpatialReference(coordinates=coordinates, restrictions=restrictions) + if coordinates is not None + else None + ) def coordinates_for(self, uprn: int) -> Optional[Coordinates]: - return self._coordinates + return self._reference.coordinates if self._reference is not None else None + + def spatial_for(self, uprn: int) -> Optional[SpatialReference]: + return self._reference class _FakeSolarFetcher: @@ -88,6 +102,37 @@ def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None: assert uow.commits == 1 +def test_ingestion_caches_the_spatial_reference_by_uprn() -> None: + # Arrange — the geospatial repo resolves a listed-building UPRN. + epc = object.__new__(EpcPropertyData) + reference = SpatialReference( + coordinates=Coordinates(longitude=-0.1278, latitude=51.5074), + restrictions=PlanningRestrictions(is_listed=True), + ) + spatial_repo = FakeSpatialRepo() + uow = FakeUnitOfWork( + property=FakePropertyRepo({10: _property(uprn=12345)}), + epc=FakeEpcRepo(), + spatial=spatial_repo, + ) + orchestrator = IngestionOrchestrator( + unit_of_work=lambda: uow, + epc_fetcher=_FakeEpcFetcher(epc), + geospatial_repo=_FakeGeospatialRepo( + reference.coordinates, restrictions=reference.restrictions + ), + solar_fetcher=_FakeSolarFetcher({}), + ) + + # Act + orchestrator.run([10]) + + # Assert — the resolved reference is cached against the UPRN so Modelling + # reads the protections back (ADR-0020), and the batch commits once. + assert spatial_repo.saved == [(12345, reference)] + assert uow.commits == 1 + + def test_ingestion_skips_property_without_uprn() -> None: # Arrange epc_repo = FakeEpcRepo()