diff --git a/orchestration/ingestion_orchestrator.py b/orchestration/ingestion_orchestrator.py
index f88f4771..0d4a5cff 100644
--- a/orchestration/ingestion_orchestrator.py
+++ b/orchestration/ingestion_orchestrator.py
@@ -5,7 +5,18 @@ from dataclasses import dataclass
 from typing import Any, Optional, Protocol
 
 from datatypes.epc.domain.epc_property_data import EpcPropertyData
+from domain.epc_prediction.comparable_properties import (
+    Comparable,
+    select_comparables,
+)
+from domain.epc_prediction.epc_prediction import EpcPrediction
+from domain.epc_prediction.prediction_target import (
+    PredictionTargetAttributes,
+    build_prediction_target,
+)
+from domain.geospatial.coordinates import Coordinates
 from domain.geospatial.spatial_reference import SpatialReference
+from domain.property.property import PropertyIdentity
 from repositories.geospatial.geospatial_repository import GeospatialRepository
 from repositories.unit_of_work import UnitOfWork
 
@@ -16,6 +27,19 @@ class EpcFetcher(Protocol):
     def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: ...
 
 
+class ComparablesRepo(Protocol):
+    """The cohort source for EPC Prediction (e.g. EpcComparablePropertiesRepository)."""
+
+    def candidates_for(self, postcode: str) -> list[Comparable]: ...
+
+
+class PredictionAttributesReader(Protocol):
+    """Resolves an EPC-less Property's prediction attributes from Landlord
+    Overrides (e.g. the property_overrides read adapter)."""
+
+    def attributes_for(self, property_id: int) -> PredictionTargetAttributes: ...
+
+
 class SolarFetcher(Protocol):
     """The slice of the Google Solar client Ingestion needs (e.g. GoogleSolarApiClient)."""
 
@@ -24,6 +48,17 @@ class SolarFetcher(Protocol):
     ) -> dict[str, Any]: ...
 
 
+@dataclass
+class _Prep:
+    """A property's transactional inputs read in the unit phase, before external
+    IO: its identity (postcode + uprn) and, when the predictor is wired, its
+    resolved prediction attributes (so the no-unit fetch phase can predict)."""
+
+    property_id: int
+    identity: PropertyIdentity
+    attributes: Optional[PredictionTargetAttributes]
+
+
 @dataclass
 class _Fetched:
     """One property's externally-fetched source data, awaiting the write phase."""
@@ -31,6 +66,7 @@ class _Fetched:
     property_id: int
     uprn: int
     epc: Optional[EpcPropertyData]
+    predicted_epc: Optional[EpcPropertyData]
     solar_insights: Optional[dict[str, Any]]
     spatial: Optional[SpatialReference]
 
@@ -59,46 +95,113 @@ class IngestionOrchestrator:
         epc_fetcher: EpcFetcher,
         geospatial_repo: GeospatialRepository,
         solar_fetcher: SolarFetcher,
+        comparables_repo: Optional[ComparablesRepo] = None,
+        prediction_attributes_reader: Optional[PredictionAttributesReader] = None,
+        epc_prediction: Optional[EpcPrediction] = None,
     ) -> None:
         self._unit_of_work = unit_of_work
         self._epc_fetcher = epc_fetcher
         self._geospatial_repo = geospatial_repo
         self._solar_fetcher = solar_fetcher
+        # EPC Prediction gap-fill (ADR-0031): when all three are wired, an EPC-less
+        # Property is predicted from its postcode cohort and persisted to the
+        # predicted slot. When any is absent, prediction is simply off and
+        # ingestion behaves exactly as before.
+        self._comparables_repo = comparables_repo
+        self._prediction_attributes_reader = prediction_attributes_reader
+        self._epc_prediction = epc_prediction
 
     def run(self, property_ids: list[int]) -> None:
-        uprns = self._uprns_for(property_ids)
-        fetched = [self._fetch(property_id, uprn) for property_id, uprn in uprns]
+        preps = self._prepare(property_ids)
+        fetched = [self._fetch(prep) for prep in preps]
         self._persist(fetched)
 
-    def _uprns_for(self, property_ids: list[int]) -> list[tuple[int, int]]:
+    def _prepare(self, property_ids: list[int]) -> list[_Prep]:
         # A short read unit; properties with no UPRN (e.g. landlord_property_id
-        # only) are skipped — a later Site-Notes path covers them.
+        # only) are skipped — a later Site-Notes path covers them. Prediction
+        # attributes (Landlord Overrides) are resolved here, in-unit, so the
+        # no-unit fetch phase holds everything it needs to predict.
+        reader = self._prediction_reader()
         with self._unit_of_work() as uow:
             properties = uow.property.get_many(property_ids)
-        return [
-            (property_id, prop.identity.uprn)
-            for property_id, prop in zip(property_ids, properties, strict=True)
-            if prop.identity.uprn is not None
-        ]
+            preps: list[_Prep] = []
+            for property_id, prop in zip(property_ids, properties, strict=True):
+                if prop.identity.uprn is None:
+                    continue
+                attributes = (
+                    reader.attributes_for(property_id) if reader is not None else None
+                )
+                preps.append(_Prep(property_id, prop.identity, attributes))
+        return preps
+
+    def _prediction_reader(self) -> Optional[PredictionAttributesReader]:
+        """The attributes reader when prediction is fully wired, else None.
+
+        Prediction needs the cohort source, the predictor and this reader
+        together; without the other two `_predict` always yields None, so
+        consulting the reader would only add wasted in-unit reads."""
+        if self._comparables_repo is None or self._epc_prediction is None:
+            return None
+        return self._prediction_attributes_reader
 
-    def _fetch(self, property_id: int, uprn: int) -> _Fetched:
+    def _fetch(self, prep: _Prep) -> _Fetched:
         # No unit open here — this is the external-IO phase. One spatial
         # reference lookup yields the coordinates (which drive the Solar fetch)
         # and the planning protections (cached for Modelling, ADR-0020).
+        uprn = prep.identity.uprn
+        assert uprn is not None  # _prepare drops UPRN-less properties
         epc = self._epc_fetcher.get_by_uprn(uprn)
         solar_insights: Optional[dict[str, Any]] = None
         spatial: Optional[SpatialReference] = self._geospatial_repo.spatial_for(uprn)
-        if spatial is not None and spatial.coordinates is not None:
+        coordinates = spatial.coordinates if spatial is not None else None
+        if coordinates is not None:
             solar_insights = self._solar_fetcher.get_building_insights(
-                spatial.coordinates.longitude, spatial.coordinates.latitude
+                coordinates.longitude, coordinates.latitude
             )
-        return _Fetched(property_id, uprn, epc, solar_insights, spatial)
+        predicted_epc = (
+            self._predict(prep.identity, coordinates, prep.attributes)
+            if epc is None
+            else None
+        )
+        return _Fetched(
+            prep.property_id, uprn, epc, predicted_epc, solar_insights, spatial
+        )
+
+    def _predict(
+        self,
+        identity: PropertyIdentity,
+        coordinates: Optional[Coordinates],
+        attributes: Optional[PredictionTargetAttributes],
+    ) -> Optional[EpcPropertyData]:
+        """Synthesise the EPC-less Property's picture from its postcode cohort, or
+        None when the predictor is unwired, the Property is gated out (unknown
+        property type), or no comparables survive selection (ADR-0031)."""
+        if (
+            self._comparables_repo is None
+            or self._epc_prediction is None
+            or attributes is None
+        ):
+            return None
+        target = build_prediction_target(identity, coordinates, attributes)
+        if target is None:
+            return None
+        candidates = self._comparables_repo.candidates_for(identity.postcode)
+        comparables = select_comparables(target, candidates)
+        if not comparables.members:
+            return None
+        return self._epc_prediction.predict(target, comparables)
 
     def _persist(self, fetched: list[_Fetched]) -> None:
         with self._unit_of_work() as uow:
             for item in fetched:
                 if item.epc is not None:
                     uow.epc.save(item.epc, property_id=item.property_id)
+                elif item.predicted_epc is not None:
+                    uow.epc.save(
+                        item.predicted_epc,
+                        property_id=item.property_id,
+                        source="predicted",
+                    )
                 # The live `solar` table is keyed by UPRN and needs the fetch's
                 # coordinates; insights are only set when those coordinates were
                 # resolved, so spatial.coordinates is non-None alongside them.
diff --git a/tests/orchestration/test_ingestion_prediction.py b/tests/orchestration/test_ingestion_prediction.py
new file mode 100644
index 00000000..0a79a79e
--- /dev/null
+++ b/tests/orchestration/test_ingestion_prediction.py
@@ -0,0 +1,234 @@
+"""IngestionOrchestrator predicts an EPC for an EPC-less Property and persists it
+to the predicted slot (ADR-0031 slice-5e). Tested against fakes — no IO. The
+prediction collaborators are optional; when unwired, ingestion is unchanged."""
+
+from __future__ import annotations
+
+import json
+from datetime import date
+from pathlib import Path
+from typing import Any, Optional
+
+from datatypes.epc.domain.epc_property_data import EpcPropertyData
+from datatypes.epc.domain.mapper import EpcPropertyDataMapper
+from domain.epc_prediction.comparable_properties import Comparable
+from domain.epc_prediction.epc_prediction import EpcPrediction
+from domain.epc_prediction.prediction_target import PredictionTargetAttributes
+from domain.geospatial.coordinates import Coordinates
+from domain.geospatial.planning_restrictions import PlanningRestrictions
+from domain.geospatial.spatial_reference import SpatialReference
+from domain.property.property import Property, PropertyIdentity
+from orchestration.ingestion_orchestrator import IngestionOrchestrator
+from repositories.geospatial.geospatial_repository import GeospatialRepository
+from tests.orchestration.fakes import (
+    FakeEpcRepo,
+    FakePropertyRepo,
+    FakeSolarRepo,
+    FakeUnitOfWork,
+)
+
+_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
+
+
+def _epc() -> EpcPropertyData:
+    raw: dict[str, Any] = json.loads(
+        (_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
+    )
+    return EpcPropertyDataMapper.from_api_response(raw)
+
+
+def _property(uprn: Optional[int], postcode: str = "A0 0AA") -> Property:
+    return Property(
+        identity=PropertyIdentity(
+            portfolio_id=1, postcode=postcode, address="1 Some Street", uprn=uprn
+        )
+    )
+
+
+class _FakeEpcFetcher:
+    def __init__(self, epc: Optional[EpcPropertyData]) -> None:
+        self.epc = epc
+
+    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
+        return self.epc
+
+
+class _FakeGeospatialRepo(GeospatialRepository):
+    def __init__(self, coordinates: Optional[Coordinates]) -> None:
+        self._reference = (
+            SpatialReference(coordinates=coordinates, restrictions=PlanningRestrictions())
+            if coordinates is not None
+            else None
+        )
+
+    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
+        return self._reference.coordinates if self._reference is not None else None
+
+    def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
+        return self._reference
+
+
+class _FakeSolarFetcher:
+    def get_building_insights(
+        self, longitude: float, latitude: float
+    ) -> dict[str, Any]:
+        return {}
+
+
+class _FakeComparablesRepo:
+    def __init__(self, candidates: list[Comparable]) -> None:
+        self._candidates = candidates
+        self.searched: list[str] = []
+
+    def candidates_for(self, postcode: str) -> list[Comparable]:
+        self.searched.append(postcode)
+        return self._candidates
+
+
+class _FakeAttributesReader:
+    def __init__(self, attributes: PredictionTargetAttributes) -> None:
+        self._attributes = attributes
+        self.asked: list[int] = []
+
+    def attributes_for(self, property_id: int) -> PredictionTargetAttributes:
+        self.asked.append(property_id)
+        return self._attributes
+
+
+def _cohort() -> list[Comparable]:
+    # Three same-type neighbours (property_type "0"), distinct addresses so the
+    # dedupe keeps all three.
+    return [
+        Comparable(
+            epc=_epc(),
+            certificate_number=f"CERT-{i}",
+            address=f"{i} Some Street",
+            registration_date=date(2023, 1, i + 1),
+        )
+        for i in range(3)
+    ]
+
+
+def test_epc_less_property_is_predicted_and_persisted_to_the_predicted_slot() -> None:
+    # Arrange — no lodged EPC, a known property type, and a same-type cohort.
+    epc_repo = FakeEpcRepo()
+    uow = FakeUnitOfWork(
+        property=FakePropertyRepo({10: _property(uprn=12345)}),
+        epc=epc_repo,
+        solar=FakeSolarRepo(),
+    )
+    comparables_repo = _FakeComparablesRepo(_cohort())
+    orchestrator = IngestionOrchestrator(
+        unit_of_work=lambda: uow,
+        epc_fetcher=_FakeEpcFetcher(None),
+        geospatial_repo=_FakeGeospatialRepo(
+            Coordinates(longitude=-0.1, latitude=51.5)
+        ),
+        solar_fetcher=_FakeSolarFetcher(),
+        comparables_repo=comparables_repo,
+        prediction_attributes_reader=_FakeAttributesReader(
+            PredictionTargetAttributes(property_type="0")
+        ),
+        epc_prediction=EpcPrediction(),
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert — a prediction was synthesised from the postcode cohort and persisted
+    # to the predicted slot; the lodged slot stays empty.
+    assert comparables_repo.searched == ["A0 0AA"]
+    predicted = epc_repo.get_predicted_for_property(10)
+    assert predicted is not None
+    assert predicted.property_type == "0"
+    assert epc_repo.get_for_property(10) is None
+    assert uow.commits == 1
+
+
+def test_unknown_property_type_gates_the_property_out_of_prediction() -> None:
+    # Arrange — EPC-less, but the property type could not be resolved.
+    epc_repo = FakeEpcRepo()
+    uow = FakeUnitOfWork(
+        property=FakePropertyRepo({10: _property(uprn=12345)}),
+        epc=epc_repo,
+        solar=FakeSolarRepo(),
+    )
+    comparables_repo = _FakeComparablesRepo(_cohort())
+    orchestrator = IngestionOrchestrator(
+        unit_of_work=lambda: uow,
+        epc_fetcher=_FakeEpcFetcher(None),
+        geospatial_repo=_FakeGeospatialRepo(Coordinates(longitude=-0.1, latitude=51.5)),
+        solar_fetcher=_FakeSolarFetcher(),
+        comparables_repo=comparables_repo,
+        prediction_attributes_reader=_FakeAttributesReader(
+            PredictionTargetAttributes(property_type=None)
+        ),
+        epc_prediction=EpcPrediction(),
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert — gated out: no cohort fetched, nothing predicted or persisted.
+    assert comparables_repo.searched == []
+    assert epc_repo.get_predicted_for_property(10) is None
+    assert epc_repo.get_for_property(10) is None
+
+
+def test_a_lodged_epc_is_not_predicted_over() -> None:
+    # Arrange — a real EPC is fetched, so prediction must not run.
+    lodged = _epc()
+    epc_repo = FakeEpcRepo()
+    uow = FakeUnitOfWork(
+        property=FakePropertyRepo({10: _property(uprn=12345)}),
+        epc=epc_repo,
+        solar=FakeSolarRepo(),
+    )
+    comparables_repo = _FakeComparablesRepo(_cohort())
+    orchestrator = IngestionOrchestrator(
+        unit_of_work=lambda: uow,
+        epc_fetcher=_FakeEpcFetcher(lodged),
+        geospatial_repo=_FakeGeospatialRepo(Coordinates(longitude=-0.1, latitude=51.5)),
+        solar_fetcher=_FakeSolarFetcher(),
+        comparables_repo=comparables_repo,
+        prediction_attributes_reader=_FakeAttributesReader(
+            PredictionTargetAttributes(property_type="0")
+        ),
+        epc_prediction=EpcPrediction(),
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert — the lodged EPC is saved; no cohort fetch, no predicted slot.
+    assert comparables_repo.searched == []
+    assert epc_repo.get_for_property(10) == lodged
+    assert epc_repo.get_predicted_for_property(10) is None
+
+
+def test_a_partially_wired_predictor_leaves_prediction_off() -> None:
+    # Arrange — EPC-less; only the attributes reader is wired, so prediction is off.
+    epc_repo = FakeEpcRepo()
+    uow = FakeUnitOfWork(
+        property=FakePropertyRepo({10: _property(uprn=12345)}),
+        epc=epc_repo,
+        solar=FakeSolarRepo(),
+    )
+    attributes_reader = _FakeAttributesReader(
+        PredictionTargetAttributes(property_type="0")
+    )
+    orchestrator = IngestionOrchestrator(
+        unit_of_work=lambda: uow,
+        epc_fetcher=_FakeEpcFetcher(None),
+        geospatial_repo=_FakeGeospatialRepo(Coordinates(longitude=-0.1, latitude=51.5)),
+        solar_fetcher=_FakeSolarFetcher(),
+        prediction_attributes_reader=attributes_reader,
+    )
+
+    # Act
+    orchestrator.run([10])
+
+    # Assert — the reader is never consulted; nothing is predicted or persisted.
+    assert attributes_reader.asked == []
+    assert epc_repo.get_predicted_for_property(10) is None
+    assert epc_repo.get_for_property(10) is None