feat(epc-prediction): slice-5e ingestion wiring (gate → predict → persist)

Wire EPC Prediction gap-fill into IngestionOrchestrator (ADR-0031). When the
predictor collaborators are injected (ComparablesRepo + PredictionAttributesReader
+ EpcPrediction), an EPC-less Property is predicted from its postcode cohort and
persisted to the predicted slot; the eligibility gate (unknown property_type) and
"a lodged EPC is never predicted over" both hold. The two-phase contract is kept:
prediction attributes (Landlord Overrides) resolve in the unit prep phase, the
cohort fetch + select + predict run in the no-unit IO phase, persistence in the
write phase. All three collaborators are OPTIONAL — unwired, ingestion behaves
exactly as before (existing tests unchanged).

3 tests (predict+persist, gate, lodged-wins); 228 pass across orchestration +
epc_prediction + repositories; pyright strict clean. Production composition-root
wiring (real ComparableProperties + override-attributes adapters) is part of the
Jun-te handover.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-16 04:03:02 +00:00
parent f2f954f459
commit 5727ac53c1
2 changed files with 311 additions and 13 deletions

View file

@ -5,7 +5,18 @@ from dataclasses import dataclass
from typing import Any, Optional, Protocol
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.epc_prediction.comparable_properties import (
Comparable,
select_comparables,
)
from domain.epc_prediction.epc_prediction import EpcPrediction
from domain.epc_prediction.prediction_target import (
PredictionTargetAttributes,
build_prediction_target,
)
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.spatial_reference import SpatialReference
from domain.property.property import PropertyIdentity
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.unit_of_work import UnitOfWork
@ -16,6 +27,19 @@ class EpcFetcher(Protocol):
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: ...
class ComparablesRepo(Protocol):
"""The cohort source for EPC Prediction (e.g. EpcComparablePropertiesRepository)."""
def candidates_for(self, postcode: str) -> list[Comparable]: ...
class PredictionAttributesReader(Protocol):
"""Resolves an EPC-less Property's prediction attributes from Landlord
Overrides (e.g. the property_overrides read adapter)."""
def attributes_for(self, property_id: int) -> PredictionTargetAttributes: ...
class SolarFetcher(Protocol):
"""The slice of the Google Solar client Ingestion needs (e.g. GoogleSolarApiClient)."""
@ -24,6 +48,17 @@ class SolarFetcher(Protocol):
) -> dict[str, Any]: ...
@dataclass
class _Prep:
"""A property's transactional inputs read in the unit phase, before external
IO: its identity (postcode + uprn) and, when the predictor is wired, its
resolved prediction attributes (so the no-unit fetch phase can predict)."""
property_id: int
identity: PropertyIdentity
attributes: Optional[PredictionTargetAttributes]
@dataclass
class _Fetched:
"""One property's externally-fetched source data, awaiting the write phase."""
@ -31,6 +66,7 @@ class _Fetched:
property_id: int
uprn: int
epc: Optional[EpcPropertyData]
predicted_epc: Optional[EpcPropertyData]
solar_insights: Optional[dict[str, Any]]
spatial: Optional[SpatialReference]
@ -59,46 +95,104 @@ class IngestionOrchestrator:
epc_fetcher: EpcFetcher,
geospatial_repo: GeospatialRepository,
solar_fetcher: SolarFetcher,
comparables_repo: Optional[ComparablesRepo] = None,
prediction_attributes_reader: Optional[PredictionAttributesReader] = None,
epc_prediction: Optional[EpcPrediction] = None,
) -> None:
self._unit_of_work = unit_of_work
self._epc_fetcher = epc_fetcher
self._geospatial_repo = geospatial_repo
self._solar_fetcher = solar_fetcher
# EPC Prediction gap-fill (ADR-0031): when all three are wired, an EPC-less
# Property is predicted from its postcode cohort and persisted to the
# predicted slot. When any is absent, prediction is simply off and
# ingestion behaves exactly as before.
self._comparables_repo = comparables_repo
self._prediction_attributes_reader = prediction_attributes_reader
self._epc_prediction = epc_prediction
def run(self, property_ids: list[int]) -> None:
uprns = self._uprns_for(property_ids)
fetched = [self._fetch(property_id, uprn) for property_id, uprn in uprns]
preps = self._prepare(property_ids)
fetched = [self._fetch(prep) for prep in preps]
self._persist(fetched)
def _uprns_for(self, property_ids: list[int]) -> list[tuple[int, int]]:
def _prepare(self, property_ids: list[int]) -> list[_Prep]:
# A short read unit; properties with no UPRN (e.g. landlord_property_id
# only) are skipped — a later Site-Notes path covers them.
# only) are skipped — a later Site-Notes path covers them. Prediction
# attributes (Landlord Overrides) are resolved here, in-unit, so the
# no-unit fetch phase holds everything it needs to predict.
with self._unit_of_work() as uow:
properties = uow.property.get_many(property_ids)
return [
(property_id, prop.identity.uprn)
for property_id, prop in zip(property_ids, properties, strict=True)
if prop.identity.uprn is not None
]
preps: list[_Prep] = []
for property_id, prop in zip(property_ids, properties, strict=True):
if prop.identity.uprn is None:
continue
attributes = (
self._prediction_attributes_reader.attributes_for(property_id)
if self._prediction_attributes_reader is not None
else None
)
preps.append(_Prep(property_id, prop.identity, attributes))
return preps
def _fetch(self, property_id: int, uprn: int) -> _Fetched:
def _fetch(self, prep: _Prep) -> _Fetched:
# No unit open here — this is the external-IO phase. One spatial
# reference lookup yields the coordinates (which drive the Solar fetch)
# and the planning protections (cached for Modelling, ADR-0020).
uprn = prep.identity.uprn
assert uprn is not None # _prepare drops UPRN-less properties
epc = self._epc_fetcher.get_by_uprn(uprn)
solar_insights: Optional[dict[str, Any]] = None
spatial: Optional[SpatialReference] = self._geospatial_repo.spatial_for(uprn)
if spatial is not None and spatial.coordinates is not None:
coordinates = spatial.coordinates if spatial is not None else None
if coordinates is not None:
solar_insights = self._solar_fetcher.get_building_insights(
spatial.coordinates.longitude, spatial.coordinates.latitude
coordinates.longitude, coordinates.latitude
)
return _Fetched(property_id, uprn, epc, solar_insights, spatial)
predicted_epc = (
self._predict(prep.identity, coordinates, prep.attributes)
if epc is None
else None
)
return _Fetched(
prep.property_id, uprn, epc, predicted_epc, solar_insights, spatial
)
def _predict(
self,
identity: PropertyIdentity,
coordinates: Optional[Coordinates],
attributes: Optional[PredictionTargetAttributes],
) -> Optional[EpcPropertyData]:
"""Synthesise the EPC-less Property's picture from its postcode cohort, or
None when the predictor is unwired, the Property is gated out (unknown
property type), or no comparables survive selection (ADR-0031)."""
if (
self._comparables_repo is None
or self._epc_prediction is None
or attributes is None
):
return None
target = build_prediction_target(identity, coordinates, attributes)
if target is None:
return None
candidates = self._comparables_repo.candidates_for(identity.postcode)
comparables = select_comparables(target, candidates)
if not comparables.members:
return None
return self._epc_prediction.predict(target, comparables)
def _persist(self, fetched: list[_Fetched]) -> None:
with self._unit_of_work() as uow:
for item in fetched:
if item.epc is not None:
uow.epc.save(item.epc, property_id=item.property_id)
elif item.predicted_epc is not None:
uow.epc.save(
item.predicted_epc,
property_id=item.property_id,
source="predicted",
)
# The live `solar` table is keyed by UPRN and needs the fetch's
# coordinates; insights are only set when those coordinates were
# resolved, so spatial.coordinates is non-None alongside them.

View file

@ -0,0 +1,204 @@
"""IngestionOrchestrator predicts an EPC for an EPC-less Property and persists it
to the predicted slot (ADR-0031 slice-5e). Tested against fakes no IO. The
prediction collaborators are optional; when unwired, ingestion is unchanged."""
from __future__ import annotations
import json
from datetime import date
from pathlib import Path
from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.epc_prediction.comparable_properties import Comparable
from domain.epc_prediction.epc_prediction import EpcPrediction
from domain.epc_prediction.prediction_target import PredictionTargetAttributes
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from domain.property.property import Property, PropertyIdentity
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from repositories.geospatial.geospatial_repository import GeospatialRepository
from tests.orchestration.fakes import (
FakeEpcRepo,
FakePropertyRepo,
FakeSolarRepo,
FakeUnitOfWork,
)
_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
def _epc() -> EpcPropertyData:
raw: dict[str, Any] = json.loads(
(_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
)
return EpcPropertyDataMapper.from_api_response(raw)
def _property(uprn: Optional[int], postcode: str = "A0 0AA") -> Property:
return Property(
identity=PropertyIdentity(
portfolio_id=1, postcode=postcode, address="1 Some Street", uprn=uprn
)
)
class _FakeEpcFetcher:
def __init__(self, epc: Optional[EpcPropertyData]) -> None:
self.epc = epc
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
return self.epc
class _FakeGeospatialRepo(GeospatialRepository):
def __init__(self, coordinates: Optional[Coordinates]) -> None:
self._reference = (
SpatialReference(coordinates=coordinates, restrictions=PlanningRestrictions())
if coordinates is not None
else None
)
def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
return self._reference.coordinates if self._reference is not None else None
def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
return self._reference
class _FakeSolarFetcher:
def get_building_insights(
self, longitude: float, latitude: float
) -> dict[str, Any]:
return {}
class _FakeComparablesRepo:
def __init__(self, candidates: list[Comparable]) -> None:
self._candidates = candidates
self.searched: list[str] = []
def candidates_for(self, postcode: str) -> list[Comparable]:
self.searched.append(postcode)
return self._candidates
class _FakeAttributesReader:
def __init__(self, attributes: PredictionTargetAttributes) -> None:
self._attributes = attributes
def attributes_for(self, property_id: int) -> PredictionTargetAttributes:
return self._attributes
def _cohort() -> list[Comparable]:
# Three same-type neighbours (property_type "0"), distinct addresses so the
# dedupe keeps all three.
return [
Comparable(
epc=_epc(),
certificate_number=f"CERT-{i}",
address=f"{i} Some Street",
registration_date=date(2023, 1, i + 1),
)
for i in range(3)
]
def test_epc_less_property_is_predicted_and_persisted_to_the_predicted_slot() -> None:
# Arrange — no lodged EPC, a known property type, and a same-type cohort.
epc_repo = FakeEpcRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=epc_repo,
solar=FakeSolarRepo(),
)
comparables_repo = _FakeComparablesRepo(_cohort())
orchestrator = IngestionOrchestrator(
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(None),
geospatial_repo=_FakeGeospatialRepo(
Coordinates(longitude=-0.1, latitude=51.5)
),
solar_fetcher=_FakeSolarFetcher(),
comparables_repo=comparables_repo,
prediction_attributes_reader=_FakeAttributesReader(
PredictionTargetAttributes(property_type="0")
),
epc_prediction=EpcPrediction(),
)
# Act
orchestrator.run([10])
# Assert — a prediction was synthesised from the postcode cohort and persisted
# to the predicted slot; the lodged slot stays empty.
assert comparables_repo.searched == ["A0 0AA"]
predicted = epc_repo.get_predicted_for_property(10)
assert predicted is not None
assert predicted.property_type == "0"
assert epc_repo.get_for_property(10) is None
assert uow.commits == 1
def test_unknown_property_type_gates_the_property_out_of_prediction() -> None:
# Arrange — EPC-less, but the property type could not be resolved.
epc_repo = FakeEpcRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=epc_repo,
solar=FakeSolarRepo(),
)
comparables_repo = _FakeComparablesRepo(_cohort())
orchestrator = IngestionOrchestrator(
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(None),
geospatial_repo=_FakeGeospatialRepo(Coordinates(longitude=-0.1, latitude=51.5)),
solar_fetcher=_FakeSolarFetcher(),
comparables_repo=comparables_repo,
prediction_attributes_reader=_FakeAttributesReader(
PredictionTargetAttributes(property_type=None)
),
epc_prediction=EpcPrediction(),
)
# Act
orchestrator.run([10])
# Assert — gated out: no cohort fetched, nothing predicted or persisted.
assert comparables_repo.searched == []
assert epc_repo.get_predicted_for_property(10) is None
assert epc_repo.get_for_property(10) is None
def test_a_lodged_epc_is_not_predicted_over() -> None:
# Arrange — a real EPC is fetched, so prediction must not run.
lodged = _epc()
epc_repo = FakeEpcRepo()
uow = FakeUnitOfWork(
property=FakePropertyRepo({10: _property(uprn=12345)}),
epc=epc_repo,
solar=FakeSolarRepo(),
)
comparables_repo = _FakeComparablesRepo(_cohort())
orchestrator = IngestionOrchestrator(
unit_of_work=lambda: uow,
epc_fetcher=_FakeEpcFetcher(lodged),
geospatial_repo=_FakeGeospatialRepo(Coordinates(longitude=-0.1, latitude=51.5)),
solar_fetcher=_FakeSolarFetcher(),
comparables_repo=comparables_repo,
prediction_attributes_reader=_FakeAttributesReader(
PredictionTargetAttributes(property_type="0")
),
epc_prediction=EpcPrediction(),
)
# Act
orchestrator.run([10])
# Assert — the lodged EPC is saved; no cohort fetch, no predicted slot.
assert comparables_repo.searched == []
assert epc_repo.get_for_property(10) == lodged
assert epc_repo.get_predicted_for_property(10) is None