from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Optional, Protocol

from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.epc_prediction.comparable_properties import (
    ComparableProperty,
    select_comparables,
)
from domain.epc_prediction.epc_prediction import EpcPrediction
from domain.epc_prediction.prediction_target import (
    PredictionTargetAttributes,
    build_prediction_target,
)
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.spatial_reference import SpatialReference
from domain.property.property import PropertyIdentity
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.unit_of_work import UnitOfWork


class EpcFetcher(Protocol):
    """Port onto the New-EPC-API client, limited to what Ingestion uses.

    Satisfied structurally by any object exposing a matching ``get_by_uprn``
    (e.g. EpcClientService).
    """

    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
        """Return the EPC for ``uprn``, or None when no EPC is available."""


class ComparablesRepo(Protocol):
    """Port supplying the candidate cohort for EPC Prediction.

    Example implementation: EpcComparablePropertiesRepository.
    """

    def candidates_for(self, postcode: str) -> list[ComparableProperty]:
        """Return the candidate comparables for ``postcode`` (unfiltered cohort)."""


class PredictionAttributesReader(Protocol):
    """Port resolving an EPC-less Property's prediction attributes.

    Attributes come from Landlord Overrides (e.g. the property_overrides read
    adapter).
    """

    def attributes_for(self, property_id: int) -> PredictionTargetAttributes:
        """Return the prediction attributes recorded for ``property_id``."""


class SolarFetcher(Protocol):
    """Port onto the Google Solar client, limited to what Ingestion uses.

    Example implementation: GoogleSolarApiClient.
    """

    def get_building_insights(
        self, longitude: float, latitude: float
    ) -> dict[str, Any]:
        """Return the building-insights payload for the given point.

        Note the argument order: longitude first, then latitude.
        """
@dataclass class _Prep: """A property's transactional inputs read in the unit phase, before external IO: its identity (postcode + uprn) and, when the predictor is wired, its resolved prediction attributes (so the no-unit fetch phase can predict).""" property_id: int identity: PropertyIdentity attributes: Optional[PredictionTargetAttributes] @dataclass class _Fetched: """One property's externally-fetched source data, awaiting the write phase.""" property_id: int uprn: int epc: Optional[EpcPropertyData] predicted_epc: Optional[EpcPropertyData] solar_insights: Optional[dict[str, Any]] spatial: Optional[SpatialReference] class IngestionOrchestrator: """Stage 1: acquire a batch's external source data and persist it. Runs in two phases so a DB connection is never held during external IO (ADR-0012): **fetch** the whole batch — read each UPRN, fetch its EPC, resolve its spatial reference (coordinates + planning protections) from the Geospatial reference Repo, thread the coordinates into the Solar fetcher — with *no unit open*; then **write** the batch in one Unit of Work and commit once. Fetchers never call each other (ADR-0011); the orchestrator threads the coordinate. The coordinates drive the Solar fetch transiently; the whole spatial reference is cached per-UPRN in the transactional store so Modelling reads the planning protections back off the Property (ADR-0020). The geospatial repo reads S3 reference data, not the transactional store, so it is injected separately rather than taken from the unit. 
""" def __init__( self, *, unit_of_work: Callable[[], UnitOfWork], epc_fetcher: EpcFetcher, geospatial_repo: GeospatialRepository, solar_fetcher: SolarFetcher, comparables_repo: Optional[ComparablesRepo] = None, prediction_attributes_reader: Optional[PredictionAttributesReader] = None, epc_prediction: Optional[EpcPrediction] = None, ) -> None: self._unit_of_work = unit_of_work self._epc_fetcher = epc_fetcher self._geospatial_repo = geospatial_repo self._solar_fetcher = solar_fetcher # EPC Prediction gap-fill (ADR-0031): when all three are wired, an EPC-less # Property is predicted from its postcode cohort and persisted to the # predicted slot. When any is absent, prediction is simply off and # ingestion behaves exactly as before. self._comparables_repo = comparables_repo self._prediction_attributes_reader = prediction_attributes_reader self._epc_prediction = epc_prediction def run(self, property_ids: list[int]) -> None: preps = self._prepare(property_ids) fetched = [self._fetch(prep) for prep in preps] self._persist(fetched) def _prepare(self, property_ids: list[int]) -> list[_Prep]: # A short read unit; properties with no UPRN (e.g. landlord_property_id # only) are skipped — a later Site-Notes path covers them. Prediction # attributes (Landlord Overrides) are resolved here, in-unit, so the # no-unit fetch phase holds everything it needs to predict. with self._unit_of_work() as uow: properties = uow.property.get_many(property_ids) preps: list[_Prep] = [] for property_id, prop in zip(property_ids, properties, strict=True): if prop.identity.uprn is None: continue attributes = ( self._prediction_attributes_reader.attributes_for(property_id) if self._prediction_attributes_reader is not None else None ) preps.append(_Prep(property_id, prop.identity, attributes)) return preps def _fetch(self, prep: _Prep) -> _Fetched: # No unit open here — this is the external-IO phase. 
One spatial # reference lookup yields the coordinates (which drive the Solar fetch) # and the planning protections (cached for Modelling, ADR-0020). uprn = prep.identity.uprn assert uprn is not None # _prepare drops UPRN-less properties epc = self._epc_fetcher.get_by_uprn(uprn) solar_insights: Optional[dict[str, Any]] = None spatial: Optional[SpatialReference] = self._geospatial_repo.spatial_for(uprn) coordinates = spatial.coordinates if spatial is not None else None if coordinates is not None: solar_insights = self._solar_fetcher.get_building_insights( coordinates.longitude, coordinates.latitude ) predicted_epc = ( self._predict(prep.identity, coordinates, prep.attributes) if epc is None else None ) return _Fetched( prep.property_id, uprn, epc, predicted_epc, solar_insights, spatial ) def _predict( self, identity: PropertyIdentity, coordinates: Optional[Coordinates], attributes: Optional[PredictionTargetAttributes], ) -> Optional[EpcPropertyData]: """Synthesise the EPC-less Property's picture from its postcode cohort, or None when the predictor is unwired, the Property is gated out (unknown property type), or no comparables survive selection (ADR-0031).""" if ( self._comparables_repo is None or self._epc_prediction is None or attributes is None ): return None target = build_prediction_target(identity, coordinates, attributes) if target is None: return None candidates = self._comparables_repo.candidates_for(identity.postcode) comparables = select_comparables(target, candidates) if not comparables.members: return None return self._epc_prediction.predict(target, comparables) def _persist(self, fetched: list[_Fetched]) -> None: with self._unit_of_work() as uow: for item in fetched: if item.epc is not None: uow.epc.save(item.epc, property_id=item.property_id) elif item.predicted_epc is not None: uow.epc.save( item.predicted_epc, property_id=item.property_id, source="predicted", ) # The live `solar` table is keyed by UPRN and needs the fetch's # coordinates; insights 
are only set when those coordinates were # resolved, so spatial.coordinates is non-None alongside them. if ( item.solar_insights is not None and item.spatial is not None and item.spatial.coordinates is not None ): uow.solar.save( item.uprn, longitude=item.spatial.coordinates.longitude, latitude=item.spatial.coordinates.latitude, insights=item.solar_insights, ) if item.spatial is not None: uow.spatial.save(item.uprn, item.spatial) uow.commit()