feat(ingestion): cache the spatial reference per UPRN

Slice 3c.4. Ingestion now resolves the whole spatial reference in one lookup
(`spatial_for`) — the coordinates drive the Solar fetch as before, and the
reference (coordinates + planning protections) is persisted per-UPRN via
`uow.spatial` in the same write batch, so Modelling can read the protections
back off the Property (ADR-0020). `_Fetched` carries the UPRN and the reference
into the write phase.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-04 17:24:03 +00:00
parent 234c4ae947
commit af5dfdf8e2
2 changed files with 68 additions and 14 deletions

View file

@ -5,6 +5,7 @@ from dataclasses import dataclass
from typing import Any, Optional, Protocol
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.geospatial.spatial_reference import SpatialReference
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.unit_of_work import UnitOfWork
@ -28,20 +29,24 @@ class _Fetched:
"""One property's externally-fetched source data, awaiting the write phase."""
property_id: int
uprn: int
epc: Optional[EpcPropertyData]
solar_insights: Optional[dict[str, Any]]
spatial: Optional[SpatialReference]
class IngestionOrchestrator:
"""Stage 1: acquire a batch's external source data and persist it.
Runs in two phases so a DB connection is never held during external IO
(ADR-0012): **fetch** the whole batch read each UPRN, fetch its EPC, resolve
coordinates from the Geospatial reference Repo, thread those into the Solar
fetcher with *no unit open*; then **write** the batch in one Unit of Work
and commit once. Fetchers never call each other (ADR-0011); the orchestrator
threads the coordinate. Coordinates are reference data (deterministic from
UPRN), resolved transiently to drive the Solar fetch, never persisted.
(ADR-0012): **fetch** the whole batch read each UPRN, fetch its EPC,
resolve its spatial reference (coordinates + planning protections) from the
Geospatial reference Repo, thread the coordinates into the Solar fetcher
with *no unit open*; then **write** the batch in one Unit of Work and commit
once. Fetchers never call each other (ADR-0011); the orchestrator threads
the coordinate. The coordinates drive the Solar fetch transiently; the whole
spatial reference is cached per-UPRN in the transactional store so Modelling
reads the planning protections back off the Property (ADR-0020).
The geospatial repo reads S3 reference data, not the transactional store, so
it is injected separately rather than taken from the unit.
@ -77,15 +82,17 @@ class IngestionOrchestrator:
]
def _fetch(self, property_id: int, uprn: int) -> _Fetched:
# No unit open here — this is the external-IO phase.
# No unit open here — this is the external-IO phase. One spatial
# reference lookup yields the coordinates (which drive the Solar fetch)
# and the planning protections (cached for Modelling, ADR-0020).
epc = self._epc_fetcher.get_by_uprn(uprn)
solar_insights: Optional[dict[str, Any]] = None
coordinates = self._geospatial_repo.coordinates_for(uprn)
if coordinates is not None:
spatial: Optional[SpatialReference] = self._geospatial_repo.spatial_for(uprn)
if spatial is not None and spatial.coordinates is not None:
solar_insights = self._solar_fetcher.get_building_insights(
coordinates.longitude, coordinates.latitude
spatial.coordinates.longitude, spatial.coordinates.latitude
)
return _Fetched(property_id, epc, solar_insights)
return _Fetched(property_id, uprn, epc, solar_insights, spatial)
def _persist(self, fetched: list[_Fetched]) -> None:
with self._unit_of_work() as uow:
@ -94,4 +101,6 @@ class IngestionOrchestrator:
uow.epc.save(item.epc, property_id=item.property_id)
if item.solar_insights is not None:
uow.solar.save(item.property_id, item.solar_insights)
if item.spatial is not None:
uow.spatial.save(item.uprn, item.spatial)
uow.commit()

View file

@ -7,6 +7,8 @@ from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from domain.property.property import Property, PropertyIdentity
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from repositories.geospatial.geospatial_repository import GeospatialRepository
@ -14,6 +16,7 @@ from tests.orchestration.fakes import (
FakeEpcRepo,
FakePropertyRepo,
FakeSolarRepo,
FakeSpatialRepo,
FakeUnitOfWork,
)
@ -29,11 +32,22 @@ class _FakeEpcFetcher:
class _FakeGeospatialRepo(GeospatialRepository):
def __init__(self, coordinates: Optional[Coordinates]) -> None:
self._coordinates = coordinates
def __init__(
self,
coordinates: Optional[Coordinates],
restrictions: PlanningRestrictions = PlanningRestrictions(),
) -> None:
self._reference: Optional[SpatialReference] = (
SpatialReference(coordinates=coordinates, restrictions=restrictions)
if coordinates is not None
else None
)
def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
return self._coordinates
return self._reference.coordinates if self._reference is not None else None
def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
return self._reference
class _FakeSolarFetcher:
@ -88,6 +102,37 @@ def test_ingestion_persists_epc_and_threads_coords_into_solar() -> None:
assert uow.commits == 1
def test_ingestion_caches_the_spatial_reference_by_uprn() -> None:
# Arrange — the geospatial repo resolves a listed-building UPRN.
epc = object.__new__(EpcPropertyData)
reference = SpatialReference(
coordinates=Coordinates(longitude=-0.1278, latitude=51.5074),
restrictions=PlanningRestrictions(is_listed=True),
)
spatial_repo = FakeSpatialRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=FakeEpcRepo(),
spatial=spatial_repo,
)
orchestrator = IngestionOrchestrator(
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(epc),
geospatial_repo=_FakeGeospatialRepo(
reference.coordinates, restrictions=reference.restrictions
),
solar_fetcher=_FakeSolarFetcher({}),
)
# Act
orchestrator.run([10])
# Assert — the resolved reference is cached against the UPRN so Modelling
# reads the protections back (ADR-0020), and the batch commits once.
assert spatial_repo.saved == [(12345, reference)]
assert uow.commits == 1
def test_ingestion_skips_property_without_uprn() -> None:
# Arrange
epc_repo = FakeEpcRepo()