From 0bd2db4f032b3eb8f6619d20227c2a9ee0c67c0c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 23 Jun 2026 16:25:18 +0000 Subject: [PATCH 1/4] feat(modelling_e2e): price gap measures via overlay + broaden prediction to nearby postcodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two reconciliations to make the modelling_e2e Lambda handler production-ready. 1. Price through the off-catalogue overlay, drop the workarounds The handler priced through a plain ProductPostgresRepository and excluded secondary_heating_removal / system_tune_up / system_tune_up_zoned to dodge ProductNotFound (and a poisoning pgEnum DataError). Those measures are now priced by catalogue_with_off_catalogue_overrides (already used by the e2e runner and PostgresUnitOfWork), so the exclusions are removed and ALL measure types are considered. This also fixes gas-boiler / single-glazed properties, which Dan's handler never excluded and so still crashed (the standard system_tune_up option is built unconditionally — the considered-measures exclusion never actually gated it). 2. Broaden the EPC-Prediction cohort to nearby real postcodes (ADR-0031) A property with no lodged EPC and no same-type comparable in its own postcode (e.g. the only flat among houses) used to gate out and fail the subtask. The gov EPC API cannot search by radius/outcode, so we resolve the real unit postcodes physically nearest the target via postcodes.io (keyless; already a trusted in-repo dependency) and walk them nearest-first until enough same-type comparables surface. New PostcodesIoClient (transient-failure retry with exponential backoff, soft-failing to the seed so broadening never breaks prediction) and EpcComparablePropertiesRepository.candidates_near. Wired into the handler and e2e runner; broadening is lazy (only on gate-out) and memoised per (postcode, property_type). Validated live: property 728476 (gas boiler) prices system_tune_up at GBP295; property 718580 (lone flat in BR6 6BS) now predicts via nearby BR6 postcodes. Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 45 +++- infrastructure/postcodes_io/__init__.py | 0 .../postcodes_io/postcodes_io_client.py | 151 ++++++++++++ .../epc_comparable_properties_repository.py | 62 ++++- scripts/run_modelling_e2e.py | 39 ++- .../modelling_e2e/test_handler.py | 119 ++++++++++ tests/infrastructure/postcodes_io/__init__.py | 0 .../postcodes_io/test_postcodes_io_client.py | 223 ++++++++++++++++++ ...st_epc_comparable_properties_repository.py | 115 +++++++++ 9 files changed, 744 insertions(+), 10 deletions(-) create mode 100644 infrastructure/postcodes_io/__init__.py create mode 100644 infrastructure/postcodes_io/postcodes_io_client.py create mode 100644 tests/infrastructure/postcodes_io/__init__.py create mode 100644 tests/infrastructure/postcodes_io/test_postcodes_io_client.py diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index de7b5542..c3c16924 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -42,7 +42,10 @@ from domain.epc_prediction.comparable_properties import ( select_comparables, ) from domain.epc_prediction.epc_prediction import EpcPrediction -from domain.epc_prediction.prediction_target import build_prediction_target +from domain.epc_prediction.prediction_target import ( + PredictionTarget, + build_prediction_target, +) from domain.geospatial.coordinates import Coordinates from domain.geospatial.planning_restrictions import PlanningRestrictions from domain.geospatial.spatial_reference import SpatialReference @@ -50,6 +53,7 @@ from domain.property.property import Property, PropertyIdentity from domain.tasks.tasks import Source from harness.console import run_modelling from infrastructure.epc_client.epc_client_service import EpcClientService +from infrastructure.postcodes_io.postcodes_io_client import PostcodesIoClient from infrastructure.postgres.config import PostgresConfig from infrastructure.postgres.engine import make_engine from infrastructure.solar.google_solar_api_client import ( @@ -85,6 +89,10 @@ from utilities.logger import setup_logger _engine: Optional[Engine] = None _cohort_cache: dict[str, list[ComparableProperty]] = {} +# Broadened (nearby-postcode) cohorts, keyed by (seed postcode, target property +# type): the early-stop walk depends on the type it is filling for, so two types +# in the same postcode must not share a cached result. +_nearby_cohort_cache: dict[tuple[str, str], list[ComparableProperty]] = {} logger = setup_logger() @@ -140,13 +148,18 @@ def _predict_epc( attributes_reader: OverrideBackedPredictionAttributesReader, coordinates: Optional[Coordinates], cohort_for: Callable[[str], list[ComparableProperty]], + broaden: Callable[[PredictionTarget], list[ComparableProperty]], predictor: EpcPrediction, ) -> Optional[EpcPropertyData]: """Synthesise an EpcPropertyData for an EPC-less property from its postcode cohort (EPC Prediction Path 3, ADR-0031), or None when ineligible. + When the property's own postcode holds no same-type comparables (a sparse + postcode — e.g. the only flat among houses), the cohort is broadened to the + real unit postcodes physically nearest it (``broaden``) before giving up. + Returns None when property_type is unresolvable (hard cohort filter cannot - fire) or when the postcode cohort is empty after filtering. + fire) or when even the broadened cohort is empty after filtering. """ attributes = attributes_reader.attributes_for(property_id) identity = PropertyIdentity( @@ -156,6 +169,8 @@ def _predict_epc( if target is None: return None comparables = select_comparables(target, cohort_for(target.postcode)) + if not comparables.members: + comparables = select_comparables(target, broaden(target)) if not comparables.members: return None predicted = predictor.predict(target, comparables) @@ -201,7 +216,9 @@ def handler(body: dict[str, Any], context: Any) -> None: overrides_reader = PropertyOverridesPostgresReader(lambda: Session(engine)) prediction_attrs_reader = OverrideBackedPredictionAttributesReader(overrides_reader) - comparables_repo = EpcComparablePropertiesRepository(epc_client, geospatial) + comparables_repo = EpcComparablePropertiesRepository( + epc_client, geospatial, nearby_postcodes=PostcodesIoClient() + ) predictor = EpcPrediction() def _get_cohort(postcode: str) -> list[ComparableProperty]: @@ -211,6 +228,24 @@ def handler(body: dict[str, Any], context: Any) -> None: ) return _cohort_cache[postcode] + def _broaden(target: PredictionTarget) -> list[ComparableProperty]: + """The nearby-postcode cohort for a gated-out target — the real unit + postcodes nearest it, walked until enough same-type comparables surface + (ADR-0031). Memoised per (postcode, property_type) so co-located + same-type misses share one walk.""" + key = (target.postcode, target.property_type) + if key not in _nearby_cohort_cache: + _nearby_cohort_cache[key] = ( + comparables_repo.candidates_near( + target.postcode, + target.coordinates, + enough=lambda c: c.epc.property_type == target.property_type, + ) + if target.postcode + else [] + ) + return _nearby_cohort_cache[key] + read_session = Session(engine) try: scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] @@ -261,12 +296,14 @@ def handler(body: dict[str, Any], context: Any) -> None: attributes_reader=prediction_attrs_reader, coordinates=coordinates, cohort_for=_get_cohort, + broaden=_broaden, predictor=predictor, ) if predicted_epc is None: raise ValueError( f"no EPC for UPRN {uprn} and not predictable " - f"(unresolved property_type or empty '{postcode}' cohort)" + f"(unresolved property_type, or no same-type " + f"comparables in or near '{postcode}')" ) effective_epc = Property( identity=PropertyIdentity( diff --git a/infrastructure/postcodes_io/__init__.py b/infrastructure/postcodes_io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/infrastructure/postcodes_io/postcodes_io_client.py b/infrastructure/postcodes_io/postcodes_io_client.py new file mode 100644 index 00000000..6128e51c --- /dev/null +++ b/infrastructure/postcodes_io/postcodes_io_client.py @@ -0,0 +1,151 @@ +"""postcodes.io adapter — a coordinate (or seed postcode) → the real unit +postcodes physically near it. + +The gov EPC API only searches a *full* real postcode — no outcode/prefix, no +radius, no lat/long (confirmed against its OpenAPI spec). So to broaden an +EPC-Prediction cohort beyond the target's own postcode we must first discover the +real unit postcodes around it. postcodes.io's free, keyless ``nearest`` endpoint +does exactly that: given a point it returns the unit postcodes within a radius, +nearest first. + +Failure is deliberately non-fatal: any error (network, unknown seed, missing +coordinates) returns just the seed postcode, so broadening degrades to "no +broadening" rather than breaking prediction. +""" + +from __future__ import annotations + +import time +from typing import Any, Optional + +import httpx + +from domain.geospatial.coordinates import Coordinates + + +class PostcodesIoClient: + BASE_URL = "https://api.postcodes.io" + REQUEST_TIMEOUT = 10.0 + # Transient failures (transport errors, 429s, 5xx) are retried with + # exponential backoff; everything else (and exhaustion) soft-fails to the + # seed, so broadening never breaks prediction. + MAX_RETRIES = 3 + BACKOFF_BASE = 0.5 + BACKOFF_MULTIPLIER = 2.0 + MAX_BACKOFF = 8.0 + + def __init__(self, *, radius_m: int = 1000, limit: int = 30) -> None: + """``radius_m`` bounds how far the broadened cohort reaches; ``limit`` + caps how many nearby postcodes are returned (and so the per-gate-out + fetch cost).""" + self._radius_m = radius_m + self._limit = limit + + def nearby( + self, postcode: str, coordinates: Optional[Coordinates] = None + ) -> list[str]: + """The real unit postcodes within ``radius_m`` of ``postcode`` — nearest + first, the seed always included — or just ``[postcode]`` when the seed's + coordinates cannot be resolved or the lookup fails. + + ``coordinates`` (the target's own, resolved from its UPRN) is used when + given, sparing a postcode→centroid round-trip; otherwise postcodes.io + resolves the seed postcode's centroid itself.""" + point = coordinates if coordinates is not None else self._centroid_of(postcode) + if point is None: + return [postcode] + found = self._nearest_to(point) + ordered = [postcode] + [p for p in found if p != postcode] + return ordered[: self._limit] + + def _centroid_of(self, postcode: str) -> Optional[Coordinates]: + result = self._get(f"/postcodes/{postcode.replace(' ', '')}") + if result is None: + return None + latitude: Any = result.get("latitude") + longitude: Any = result.get("longitude") + if latitude is None or longitude is None: + return None + return Coordinates(longitude=float(longitude), latitude=float(latitude)) + + def _nearest_to(self, point: Coordinates) -> list[str]: + results = self._get_list( + "/postcodes", + { + "lon": point.longitude, + "lat": point.latitude, + "radius": self._radius_m, + "limit": self._limit, + }, + ) + return [str(row["postcode"]) for row in results if row.get("postcode")] + + def _get(self, path: str) -> Optional[dict[str, Any]]: + payload = self._call(path, None) + return payload if isinstance(payload, dict) else None + + def _get_list(self, path: str, params: dict[str, Any]) -> list[dict[str, Any]]: + payload = self._call(path, params) + if not isinstance(payload, list): + return [] + return [row for row in payload if isinstance(row, dict)] + + def _call(self, path: str, params: Optional[dict[str, Any]]) -> Any: + """One GET against postcodes.io, retrying transient failures (transport + errors, 429s, 5xx) with exponential backoff. Returns the parsed + ``result`` payload, or None on a non-transient failure (e.g. an unknown + postcode's 404) or once retries are exhausted — broadening then falls + back to the seed alone.""" + for attempt in range(self.MAX_RETRIES + 1): + try: + response = httpx.get( + f"{self.BASE_URL}{path}", + params=params, + timeout=self.REQUEST_TIMEOUT, + ) + except httpx.TransportError: + if not self._sleep_before_retry(attempt, retry_after=None): + return None + continue + except httpx.HTTPError: + return None # non-transient client-side error (e.g. bad URL) + if self._is_transient(response.status_code): + if not self._sleep_before_retry( + attempt, retry_after=self._retry_after(response) + ): + return None + continue + if not response.is_success: + return None + try: + body: Any = response.json() + except ValueError: + return None + return body.get("result") if isinstance(body, dict) else None + return None + + def _sleep_before_retry(self, attempt: int, retry_after: Optional[float]) -> bool: + """Sleep before the next attempt and report whether one remains; on the + final attempt, return False so the caller soft-fails instead of looping.""" + if attempt >= self.MAX_RETRIES: + return False + if retry_after is not None: + delay = retry_after + else: + delay = self.BACKOFF_BASE * (self.BACKOFF_MULTIPLIER**attempt) + time.sleep(min(delay, self.MAX_BACKOFF)) + return True + + @staticmethod + def _is_transient(status_code: int) -> bool: + return status_code == 429 or status_code >= 500 + + @staticmethod + def _retry_after(response: httpx.Response) -> Optional[float]: + header = response.headers.get("Retry-After") + if header is None: + return None + try: + return float(header) + except (TypeError, ValueError): + return None diff --git a/repositories/comparable_properties/epc_comparable_properties_repository.py b/repositories/comparable_properties/epc_comparable_properties_repository.py index 3bfd92b9..b49a6948 100644 --- a/repositories/comparable_properties/epc_comparable_properties_repository.py +++ b/repositories/comparable_properties/epc_comparable_properties_repository.py @@ -10,7 +10,7 @@ UPRNs share a partition). Register metadata the cert itself doesn't carry from __future__ import annotations from datetime import date -from typing import Optional, Protocol +from typing import Callable, Optional, Protocol from datatypes.epc.domain.epc_property_data import EpcPropertyData from datatypes.epc.search.epc_search_result import EpcSearchResult @@ -20,6 +20,11 @@ from repositories.comparable_properties.comparable_properties_repository import ComparablePropertiesRepository, ) +# The same default floor `select_comparables` uses: keep walking nearby postcodes +# until this many candidates match, so the broadened cohort is big enough for the +# downstream relax ladder rather than stopping at the first stray match. +_DEFAULT_MINIMUM_COHORT = 5 + class CohortEpcClient(Protocol): """The slice of the EPC-API client the cohort fetch needs (e.g. @@ -38,12 +43,26 @@ class CohortGeospatial(Protocol): ) -> dict[int, Coordinates]: ... +class NearbyPostcodes(Protocol): + """Resolves the real unit postcodes physically near a seed postcode (e.g. + `PostcodesIoClient`). The gov EPC API cannot search by radius, so this is how + the cohort reaches beyond the target's own postcode (ADR-0031).""" + + def nearby( + self, postcode: str, coordinates: Optional[Coordinates] = None + ) -> list[str]: ... + + class EpcComparablePropertiesRepository(ComparablePropertiesRepository): def __init__( - self, epc_client: CohortEpcClient, geospatial: CohortGeospatial + self, + epc_client: CohortEpcClient, + geospatial: CohortGeospatial, + nearby_postcodes: Optional[NearbyPostcodes] = None, ) -> None: self._epc_client = epc_client self._geospatial = geospatial + self._nearby_postcodes = nearby_postcodes def candidates_for(self, postcode: str) -> list[ComparableProperty]: results: list[EpcSearchResult] = self._epc_client.search_by_postcode( @@ -55,6 +74,45 @@ class EpcComparablePropertiesRepository(ComparablePropertiesRepository): ) return [self._comparable(result, coordinates) for result in results] + def candidates_near( + self, + postcode: str, + coordinates: Optional[Coordinates] = None, + *, + enough: Optional[Callable[[ComparableProperty], bool]] = None, + minimum: int = _DEFAULT_MINIMUM_COHORT, + ) -> list[ComparableProperty]: + """The broadened cohort: candidates drawn from the real unit postcodes + nearest ``postcode`` (ADR-0031), for when the target's own postcode holds + no same-type comparables. Postcodes are visited nearest first and each + candidate is deduped by certificate number across them. + + ``enough`` lets the caller stop the walk early — once ``minimum`` + candidates satisfy it (e.g. they match the target's property type) the + remaining, further-away postcodes are not fetched, so a dense area + resolves in one or two searches instead of the whole radius. Without a + configured ``NearbyPostcodes`` source this degrades to the seed postcode + alone.""" + postcodes = ( + self._nearby_postcodes.nearby(postcode, coordinates) + if self._nearby_postcodes is not None + else [postcode] + ) + candidates: list[ComparableProperty] = [] + seen_certs: set[str] = set() + matches = 0 + for nearby_postcode in postcodes: + for candidate in self.candidates_for(nearby_postcode): + if candidate.certificate_number in seen_certs: + continue + seen_certs.add(candidate.certificate_number) + candidates.append(candidate) + if enough is not None and enough(candidate): + matches += 1 + if enough is not None and matches >= minimum: + break + return candidates + def _comparable( self, result: EpcSearchResult, coordinates: dict[int, Coordinates] ) -> ComparableProperty: diff --git a/scripts/run_modelling_e2e.py b/scripts/run_modelling_e2e.py index 6625aa49..27ad473c 100644 --- a/scripts/run_modelling_e2e.py +++ b/scripts/run_modelling_e2e.py @@ -81,6 +81,7 @@ from domain.epc_prediction.comparable_properties import ( # noqa: E402 ) from domain.epc_prediction.epc_prediction import EpcPrediction # noqa: E402 from domain.epc_prediction.prediction_target import ( # noqa: E402 + PredictionTarget, build_prediction_target, ) from domain.geospatial.coordinates import Coordinates # noqa: E402 @@ -96,6 +97,9 @@ from domain.modelling.scenario import Scenario # noqa: E402 from harness.console import candidate_recommendations, run_modelling # noqa: E402 from harness.plan_table import format_plan_table # noqa: E402 from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402 +from infrastructure.postcodes_io.postcodes_io_client import ( # noqa: E402 + PostcodesIoClient, +) from infrastructure.solar.google_solar_api_client import ( # noqa: E402 BuildingInsightsNotFoundError, GoogleSolarApiClient, @@ -400,6 +404,7 @@ def _predict_epc( attributes_reader: OverrideBackedPredictionAttributesReader, coordinates: Optional[Coordinates], cohort_for: Callable[[str], list[ComparableProperty]], + broaden: Callable[[PredictionTarget], list[ComparableProperty]], predictor: EpcPrediction, ) -> Optional[EpcPropertyData]: """Synthesise an EpcPropertyData for an EPC-less Property from its postcode @@ -408,7 +413,8 @@ def _predict_epc( The cohort is found by POSTCODE, so a wrong postcode on the property row yields the wrong neighbours — a prediction is only as good as the postcode it - is given.""" + is given. When the own postcode holds no same-type comparables, the cohort is + broadened to the real unit postcodes physically nearest it (``broaden``).""" attributes = attributes_reader.attributes_for(property_id) identity = PropertyIdentity( portfolio_id=portfolio_id, postcode=postcode, address="", uprn=uprn @@ -418,7 +424,10 @@ def _predict_epc( return None # property_type unresolvable — gated out of prediction comparables = select_comparables(target, cohort_for(target.postcode)) if not comparables.members: - return None # no comparable neighbours in the postcode + # Sparse own postcode — reach out to the nearest real postcodes. + comparables = select_comparables(target, broaden(target)) + if not comparables.members: + return None # no comparable neighbours nearby either predicted = predictor.predict(target, comparables) # The calculator needs a MAIN building part; a cohort whose template carries # none (e.g. a malformed flat record) yields an unscoreable picture, so reject @@ -684,9 +693,12 @@ def main() -> None: # from the live EPC API (search-by-postcode + per-cert fetch), memoised per # postcode so co-located missing Properties don't refetch the same cohort. prediction_attributes = OverrideBackedPredictionAttributesReader(overrides_reader) - comparables_repo = EpcComparablePropertiesRepository(epc_client, geospatial) + comparables_repo = EpcComparablePropertiesRepository( + epc_client, geospatial, nearby_postcodes=PostcodesIoClient() + ) predictor = EpcPrediction() _cohort_cache: dict[str, list[ComparableProperty]] = {} + _nearby_cohort_cache: dict[tuple[str, str], list[ComparableProperty]] = {} def cohort_for(postcode: str) -> list[ComparableProperty]: if postcode not in _cohort_cache: @@ -694,6 +706,23 @@ def main() -> None: comparables_repo.candidates_for(postcode) if postcode else [] ) return _cohort_cache[postcode] + + def broaden(target: PredictionTarget) -> list[ComparableProperty]: + # Broadened cohort for a gated-out target: the nearest real postcodes, + # walked until enough same-type comparables surface (ADR-0031). Memoised + # per (postcode, property_type). + key = (target.postcode, target.property_type) + if key not in _nearby_cohort_cache: + _nearby_cohort_cache[key] = ( + comparables_repo.candidates_near( + target.postcode, + target.coordinates, + enough=lambda c: c.epc.property_type == target.property_type, + ) + if target.postcode + else [] + ) + return _nearby_cohort_cache[key] # One read-only session for the live `material` catalogue, reused across the # batch so both store and no-store runs price against the same DB rows. catalogue_session = Session(engine) @@ -831,12 +860,14 @@ def main() -> None: attributes_reader=prediction_attributes, coordinates=coordinates, cohort_for=cohort_for, + broaden=broaden, predictor=predictor, ) if predicted_epc is None: raise ValueError( f"no EPC for UPRN {uprn} and not predictable " - f"(unresolved property_type or empty '{postcode}' cohort)" + f"(unresolved property_type, or no same-type " + f"comparables in or near '{postcode}')" ) # Property.effective_epc folds any Landlord Overrides onto the # synthesised EPC (cohort fills the unknown fields, the landlord's diff --git a/tests/applications/modelling_e2e/test_handler.py b/tests/applications/modelling_e2e/test_handler.py index 762cea60..92697508 100644 --- a/tests/applications/modelling_e2e/test_handler.py +++ b/tests/applications/modelling_e2e/test_handler.py @@ -81,6 +81,7 @@ def _clear_cohort_cache() -> None: import applications.modelling_e2e.handler as h h._cohort_cache.clear() + h._nearby_cohort_cache.clear() # --------------------------------------------------------------------------- @@ -396,6 +397,124 @@ def test_empty_cohort_gates_property_out_and_raises() -> None: MockUoW.return_value.__enter__.assert_not_called() +# --------------------------------------------------------------------------- +# Broadened cohort — sparse own postcode falls back to nearby postcodes +# --------------------------------------------------------------------------- + + +def test_empty_own_postcode_broadens_to_nearby_and_predicts() -> None: + """When the property's own postcode holds no same-type comparables, the + handler broadens to the nearby-postcode cohort (candidates_near) and, finding + comparables there, synthesises the EPC and saves the plan.""" + # Arrange + mock_engine = _engine_mock([PROPERTY_ID], [UPRN], [POSTCODE]) + mock_plan = _plan_mock() + mock_uow = MagicMock() + + mock_predicted_epc = MagicMock() + from datatypes.epc.domain.epc_property_data import BuildingPartIdentifier + + mock_part = MagicMock() + mock_part.identifier = BuildingPartIdentifier.MAIN + mock_predicted_epc.sap_building_parts = [mock_part] + + # First select_comparables (own postcode) is empty → broaden; the second + # (nearby cohort) finds comparables. + empty_comparables = MagicMock() + empty_comparables.members = [] + found_comparables = MagicMock() + found_comparables.members = [MagicMock()] + + with ExitStack() as stack: + stack.enter_context( + patch("applications.modelling_e2e.handler.os.environ", _ENV) + ) + stack.enter_context( + patch( + "applications.modelling_e2e.handler._get_engine", + return_value=mock_engine, + ) + ) + stack.enter_context( + patch("applications.modelling_e2e.handler.EpcClientService") + ).return_value.get_by_uprn.return_value = None # no lodged EPC + stack.enter_context( + patch("applications.modelling_e2e.handler.GeospatialS3Repository") + ) + stack.enter_context( + patch("applications.modelling_e2e.handler.GoogleSolarApiClient") + ) + stack.enter_context( + patch("applications.modelling_e2e.handler._spatial_for", return_value=None) + ) + stack.enter_context( + patch( + "applications.modelling_e2e.handler._solar_insights_for", + return_value=None, + ) + ) + stack.enter_context( + patch("applications.modelling_e2e.handler.overlays_from", return_value=[]) + ) + stack.enter_context( + patch("applications.modelling_e2e.handler.PropertyOverridesPostgresReader") + ) + from domain.epc_prediction.prediction_target import PredictionTargetAttributes + + stack.enter_context( + patch( + "applications.modelling_e2e.handler.OverrideBackedPredictionAttributesReader" + ) + ).return_value.attributes_for.return_value = PredictionTargetAttributes( + property_type="2" + ) + MockRepo = stack.enter_context( + patch( + "applications.modelling_e2e.handler.EpcComparablePropertiesRepository" + ) + ) + MockRepo.return_value.candidates_for.return_value = [] + MockRepo.return_value.candidates_near.return_value = [MagicMock()] + stack.enter_context( + patch( + "applications.modelling_e2e.handler.select_comparables", + side_effect=[empty_comparables, found_comparables], + ) + ) + stack.enter_context( + patch("applications.modelling_e2e.handler.EpcPrediction") + ).return_value.predict.return_value = mock_predicted_epc + stack.enter_context( + patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") + ).return_value.get_many.return_value = [MagicMock()] + stack.enter_context( + patch( + "applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides" + ) + ) + stack.enter_context(patch("applications.modelling_e2e.handler.Session")) + stack.enter_context( + patch( + "applications.modelling_e2e.handler.run_modelling", + return_value=mock_plan, + ) + ) + MockUoW = stack.enter_context( + patch("applications.modelling_e2e.handler.PostgresUnitOfWork") + ) + MockUoW.return_value.__enter__.return_value = mock_uow + MockUoW.return_value.__exit__.return_value = False + + # Act + _call_handler(_BODY) + + # Assert — broadening fired, and the broadened cohort produced a saved plan. + MockRepo.return_value.candidates_near.assert_called_once() + mock_uow.epc.save.assert_not_called() # predicted, never lodged + mock_uow.plan.save.assert_called_once() + mock_uow.commit.assert_called_once() + + # --------------------------------------------------------------------------- # Partial batch failure # --------------------------------------------------------------------------- diff --git a/tests/infrastructure/postcodes_io/__init__.py b/tests/infrastructure/postcodes_io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/infrastructure/postcodes_io/test_postcodes_io_client.py b/tests/infrastructure/postcodes_io/test_postcodes_io_client.py new file mode 100644 index 00000000..708ac964 --- /dev/null +++ b/tests/infrastructure/postcodes_io/test_postcodes_io_client.py @@ -0,0 +1,223 @@ +"""PostcodesIoClient — coordinate/seed postcode → the real unit postcodes near +it, via postcodes.io's keyless nearest endpoint. Failure degrades to the seed +alone so broadening never breaks prediction.""" + +from __future__ import annotations + +from typing import Any, Iterator, Optional +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from domain.geospatial.coordinates import Coordinates +from infrastructure.postcodes_io.postcodes_io_client import PostcodesIoClient + +_MODULE = "infrastructure.postcodes_io.postcodes_io_client" + + +@pytest.fixture(autouse=True) +def _no_sleep() -> Iterator[MagicMock]: + """Never actually sleep during backoff — just record the calls.""" + with patch(f"{_MODULE}.time.sleep") as sleep: + yield sleep + + +def _response( + payload: Any, + *, + status_code: int = 200, + headers: Optional[dict[str, str]] = None, +) -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.is_success = 200 <= status_code < 300 + resp.headers = headers if headers is not None else {} + resp.json.return_value = payload + return resp + + +def _nearest_payload(postcodes: list[str]) -> dict[str, Any]: + return {"result": [{"postcode": p} for p in postcodes]} + + +def test_nearby_with_coordinates_skips_the_centroid_lookup() -> None: + """When the target's own coordinates are passed, only the radius search is + issued — no postcode→centroid round-trip — and the seed leads the result.""" + # Arrange + client = PostcodesIoClient(radius_m=500, limit=10) + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.return_value = _response( + _nearest_payload(["BR6 6BS", "BR6 6BU", "BR6 6NX"]) + ) + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert — one call (the radius search), seed first, neighbours follow + assert result == ["BR6 6BS", "BR6 6BU", "BR6 6NX"] + assert mock_get.call_count == 1 + _, kwargs = mock_get.call_args + assert kwargs["params"]["lat"] == 51.3 + assert kwargs["params"]["lon"] == 0.1 + + +def test_nearby_resolves_the_seed_centroid_when_no_coordinates_given() -> None: + """Without coordinates the client first resolves the seed's own centroid via + postcodes.io, then runs the radius search from it.""" + # Arrange + client = PostcodesIoClient() + centroid = {"result": {"latitude": 51.3, "longitude": 0.1}} + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.side_effect = [ + _response(centroid), + _response(_nearest_payload(["BR6 6BS", "BR6 6BU"])), + ] + + # Act + result = client.nearby("BR6 6BS") + + # Assert — two calls: centroid then radius + assert result == ["BR6 6BS", "BR6 6BU"] + assert mock_get.call_count == 2 + + +def test_nearby_dedupes_the_seed_and_caps_at_limit() -> None: + """The seed always leads exactly once even when the radius search echoes it, + and the result is capped at ``limit``.""" + # Arrange + client = PostcodesIoClient(limit=3) + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.return_value = _response( + _nearest_payload(["BR6 6BS", "BR6 6BU", "BR6 6NX", "BR6 6AA"]) + ) + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert + assert result == ["BR6 6BS", "BR6 6BU", "BR6 6NX"] + assert result.count("BR6 6BS") == 1 + + +def test_nearby_returns_just_the_seed_after_exhausting_retries( + _no_sleep: MagicMock, +) -> None: + """A persistent network error is retried, then degrades to broadening-off: + only the seed comes back, and the retries were actually attempted.""" + # Arrange + client = PostcodesIoClient() + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch( + f"{_MODULE}.httpx.get", side_effect=httpx.ConnectError("down") + ) as mock_get: + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert — one initial try + MAX_RETRIES, sleeping between each. + assert result == ["BR6 6BS"] + assert mock_get.call_count == client.MAX_RETRIES + 1 + assert _no_sleep.call_count == client.MAX_RETRIES + + +def test_nearby_retries_a_transport_error_then_succeeds(_no_sleep: MagicMock) -> None: + """A transient transport error is retried, and the subsequent success is + returned in full.""" + # Arrange + client = PostcodesIoClient() + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.side_effect = [ + httpx.ReadTimeout("slow"), + _response(_nearest_payload(["BR6 6BS", "BR6 6BU"])), + ] + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert + assert result == ["BR6 6BS", "BR6 6BU"] + assert mock_get.call_count == 2 + assert _no_sleep.call_count == 1 + + +def test_nearby_retries_a_429_honouring_retry_after(_no_sleep: MagicMock) -> None: + """A 429 is retried, and the server's Retry-After drives the backoff delay.""" + # Arrange + client = PostcodesIoClient() + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.side_effect = [ + _response(None, status_code=429, headers={"Retry-After": "2"}), + _response(_nearest_payload(["BR6 6BS", "BR6 6BU"])), + ] + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert — succeeded on the retry, having slept the advertised 2 seconds. + assert result == ["BR6 6BS", "BR6 6BU"] + assert mock_get.call_count == 2 + _no_sleep.assert_called_once_with(2.0) + + +def test_nearby_retries_a_server_error_then_succeeds(_no_sleep: MagicMock) -> None: + """A 5xx is treated as transient and retried.""" + # Arrange + client = PostcodesIoClient() + coords = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.side_effect = [ + _response(None, status_code=503), + _response(_nearest_payload(["BR6 6BS"])), + ] + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert + assert result == ["BR6 6BS"] + assert mock_get.call_count == 2 + + +def test_nearby_returns_just_the_seed_when_centroid_unresolvable() -> None: + """An unknown seed (no coordinates, centroid lookup fails) yields the seed + alone rather than raising.""" + # Arrange + client = PostcodesIoClient() + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.return_value = _response(None, status_code=404) + + # Act + result: list[str] = client.nearby("ZZ99 9ZZ") + + # Assert — a 404 is non-transient, so no retry was attempted. + assert result == ["ZZ99 9ZZ"] + assert mock_get.call_count == 1 + + +def test_nearby_tolerates_a_null_nearest_result() -> None: + """postcodes.io returns ``result: null`` when a point has no neighbours; the + client treats that as an empty neighbour set (seed only).""" + # Arrange + client = PostcodesIoClient() + coords: Optional[Coordinates] = Coordinates(longitude=0.1, latitude=51.3) + + with patch(f"{_MODULE}.httpx.get") as mock_get: + mock_get.return_value = _response({"result": None}) + + # Act + result = client.nearby("BR6 6BS", coords) + + # Assert + assert result == ["BR6 6BS"] diff --git a/tests/repositories/comparable_properties/test_epc_comparable_properties_repository.py b/tests/repositories/comparable_properties/test_epc_comparable_properties_repository.py index 662d5b75..f9c6ecdc 100644 --- a/tests/repositories/comparable_properties/test_epc_comparable_properties_repository.py +++ b/tests/repositories/comparable_properties/test_epc_comparable_properties_repository.py @@ -126,3 +126,118 @@ def test_no_certs_in_the_postcode_yields_no_candidates() -> None: # Assert — no candidates, and the postcode was searched (normalisation/IO ran). assert candidates == [] assert client.searched_postcode == "LS6 1AA" + + +# --------------------------------------------------------------------------- +# Broadened cohort — candidates_near (ADR-0031 nearby-postcode broadening) +# --------------------------------------------------------------------------- + + +class _MultiPostcodeEpcClient: + """Serves a different cohort per postcode and records every search, so the + broadened walk's reach and ordering can be asserted.""" + + def __init__(self, by_postcode: dict[str, list[EpcSearchResult]]) -> None: + self._by_postcode = by_postcode + self.searched: list[str] = [] + + def search_by_postcode(self, postcode: str) -> list[EpcSearchResult]: + self.searched.append(postcode) + return self._by_postcode.get(postcode, []) + + def get_by_certificate_number(self, cert_num: str) -> EpcPropertyData: + return _epc() + + +class _FakeNearbyPostcodes: + """Returns a fixed nearest-first list and records the seed it was asked for.""" + + def __init__(self, postcodes: list[str]) -> None: + self._postcodes = postcodes + self.calls: list[tuple[str, Optional[Coordinates]]] = [] + + def nearby( + self, postcode: str, coordinates: Optional[Coordinates] = None + ) -> list[str]: + self.calls.append((postcode, coordinates)) + return self._postcodes + + +def test_candidates_near_aggregates_and_dedupes_across_nearby_postcodes() -> None: + # Arrange — three nearby postcodes; CERT-1 is re-lodged in two of them. + client = _MultiPostcodeEpcClient( + { + "P0": [_result("CERT-1", uprn=1)], + "P1": [_result("CERT-2", uprn=2), _result("CERT-1", uprn=1)], + "P2": [_result("CERT-3", uprn=3)], + } + ) + nearby = _FakeNearbyPostcodes(["P0", "P1", "P2"]) + repo = EpcComparablePropertiesRepository( + client, _FakeGeospatial({}), nearby_postcodes=nearby + ) + + # Act — no early-stop predicate, so the whole nearby set is visited. + candidates = repo.candidates_near("P0", None) + + # Assert — one candidate per distinct cert, all three postcodes searched. + certs = {c.certificate_number for c in candidates} + assert certs == {"CERT-1", "CERT-2", "CERT-3"} + assert client.searched == ["P0", "P1", "P2"] + + +def test_candidates_near_stops_early_once_enough_match() -> None: + # Arrange — the seed postcode alone already yields enough matches; the two + # further postcodes must not be fetched. + client = _MultiPostcodeEpcClient( + { + "P0": [_result(f"MATCH-{i}", uprn=i) for i in range(5)], + "P1": [_result("OTHER-1", uprn=99)], + "P2": [_result("OTHER-2", uprn=98)], + } + ) + nearby = _FakeNearbyPostcodes(["P0", "P1", "P2"]) + repo = EpcComparablePropertiesRepository( + client, _FakeGeospatial({}), nearby_postcodes=nearby + ) + + # Act + candidates = repo.candidates_near( + "P0", + None, + enough=lambda c: c.certificate_number.startswith("MATCH"), + minimum=5, + ) + + # Assert — walk halted after the seed; the further postcodes were never hit. + assert client.searched == ["P0"] + assert len(candidates) == 5 + + +def test_candidates_near_passes_coordinates_to_the_nearby_source() -> None: + # Arrange + here = Coordinates(longitude=0.1, latitude=51.3) + client = _MultiPostcodeEpcClient({"P0": []}) + nearby = _FakeNearbyPostcodes(["P0"]) + repo = EpcComparablePropertiesRepository( + client, _FakeGeospatial({}), nearby_postcodes=nearby + ) + + # Act + repo.candidates_near("P0", here) + + # Assert — the target's own coordinates seed the radius search. + assert nearby.calls == [("P0", here)] + + +def test_candidates_near_without_a_source_uses_only_the_seed() -> None: + # Arrange — no NearbyPostcodes configured (broadening unavailable). + client = _MultiPostcodeEpcClient({"P0": [_result("CERT-1", uprn=1)]}) + repo = EpcComparablePropertiesRepository(client, _FakeGeospatial({})) + + # Act + candidates = repo.candidates_near("P0", None) + + # Assert — degrades to the seed postcode alone. + assert client.searched == ["P0"] + assert [c.certificate_number for c in candidates] == ["CERT-1"] From de7fb94ff7be89abe0d5c4323d1c5a96f3c4e60e Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 23 Jun 2026 16:54:06 +0000 Subject: [PATCH 2/4] docs(adr): record nearby-postcode broadening (0034) + share HTTP retry primitive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes out the cohort-broadening work with its decision record and consolidates the retry plumbing. ADR-0034 documents broadening the EPC-Prediction cohort to the real unit postcodes nearest the target (via postcodes.io) when its own postcode holds no same-type comparable — extending ADR-0031 decision 5. Records why postcodes.io was chosen over council[] (whole-LA, no property_type in rows), a bulk Code-Point Open / ONSPD dataset, and the OS Places radius API, and the lazy / nearest-first early-stop / soft-fail policy. Broadening-specific docstrings now cite 0034. Retry consolidation: extract the EPC client's call_with_retry into a shared infrastructure/http_retry.py keyed off a generic TransientHttpError marker, so the mechanism (exponential backoff, Retry-After) is shared while each client keeps its own transient policy. EpcRateLimitError now subclasses TransientHttpError (still an EpcApiError); PostcodesIoClient routes through the same helper, raising TransientHttpError on 429/5xx and soft-failing to the seed once exhausted (the EPC client propagates instead). Direct tests for the shared helper; EPC + postcodes.io suites repointed at the shared sleep. Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 2 +- ...ion-cohort-broadens-to-nearby-postcodes.md | 90 +++++++++++++ infrastructure/epc_client/_retry.py | 35 ----- .../epc_client/epc_client_service.py | 2 +- infrastructure/epc_client/exceptions.py | 12 +- infrastructure/http_retry.py | 60 +++++++++ .../postcodes_io/postcodes_io_client.py | 82 ++++++------ .../epc_comparable_properties_repository.py | 4 +- scripts/run_modelling_e2e.py | 2 +- .../infrastructure/epc_client/test_client.py | 8 +- .../postcodes_io/test_postcodes_io_client.py | 5 +- tests/infrastructure/test_http_retry.py | 126 ++++++++++++++++++ 12 files changed, 331 insertions(+), 97 deletions(-) create mode 100644 docs/adr/0034-epc-prediction-cohort-broadens-to-nearby-postcodes.md delete mode 100644 infrastructure/epc_client/_retry.py create mode 100644 infrastructure/http_retry.py create mode 100644 tests/infrastructure/test_http_retry.py diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index c3c16924..3c21421d 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -231,7 +231,7 @@ def handler(body: dict[str, Any], context: Any) -> None: def _broaden(target: PredictionTarget) -> list[ComparableProperty]: """The nearby-postcode cohort for a gated-out target — the real unit postcodes nearest it, walked until enough same-type comparables surface - (ADR-0031). Memoised per (postcode, property_type) so co-located + (ADR-0034). Memoised per (postcode, property_type) so co-located same-type misses share one walk.""" key = (target.postcode, target.property_type) if key not in _nearby_cohort_cache: diff --git a/docs/adr/0034-epc-prediction-cohort-broadens-to-nearby-postcodes.md b/docs/adr/0034-epc-prediction-cohort-broadens-to-nearby-postcodes.md new file mode 100644 index 00000000..29d58d45 --- /dev/null +++ b/docs/adr/0034-epc-prediction-cohort-broadens-to-nearby-postcodes.md @@ -0,0 +1,90 @@ +# EPC Prediction cohort broadens to nearby postcodes + +ADR-0029 sizes an EPC-less Property from the **Comparable Properties** in its +postcode; ADR-0031 decision 5 made `property_type` the **hard** cohort filter — a +flat is never sized from houses — and gated out any Property with no same-type +comparable. This records how we **broaden** the cohort when a Property's own +postcode is too sparse to fill, rather than gating it out. Resolved against a +real failure on branch `feature/e2e-runs`: property 718580, the only flat lodged +in postcode `BR6 6BS` (the other 8 certs were houses/bungalows), gated out and +failed its modelling subtask. + +## Status + +Accepted. Extends ADR-0029 / ADR-0031 decision 5 (the postcode cohort). + +## Context + +ADR-0031 decision 5 anticipated this — "an Ordnance Survey `postcode_search` +source can supply property type more broadly … wiring it is a later enhancement +that widens the eligible population." The gating it describes is correct (never +size a flat from a mixed-type cohort), but it makes prediction only as good as +the *own* postcode's lodged stock. A genuinely-isolated type (one flat among +houses) is unpredictable with no recourse, and a no-EPC + can't-predict Property +fails its subtask — which, over SQS, retries and dead-letters even though the +input data is permanently insufficient. + +The legacy engine handled this by **trimming the postcode** (`BR6 6BS` → `BR6 6B` +→ `BR6` …) and re-querying, because the old EPC API accepted partial postcodes. +The current gov EPC API (`api.get-energy-performance-data.communities.gov.uk`) +does **not**: its domestic search accepts only a *full* real postcode, `uprn`, +`council[]`, `constituency[]`, or `address` — confirmed against its OpenAPI spec. +There is no outcode/prefix, radius, or lat/long search. + +## Decisions + +### 1. Broaden to the real unit postcodes physically nearest the target + +When the own-postcode cohort yields no same-type member, the cohort is broadened +to the real unit postcodes around the target, resolved from its coordinates via +**postcodes.io**'s keyless `nearest` endpoint (already a trusted dependency in +`scripts/fetch_epc_prediction_dense_corpus.py`). Each nearby postcode is then +searched exactly as the own postcode is — so `property_type` stays the same hard +filter and ADR-0031 decision 5 is upheld; only the *reach* of the candidate pool +widens, never the selection rule. The hard gate still fires when even the +broadened cohort has no same-type comparable: broadening widens the eligible +population, it does not force a prediction. + +### 2. postcodes.io over `council[]` and a bulk centroid dataset + +- **`council[]`** (the only "wider than postcode" gov-API filter) returns the + *whole* local authority — tens of thousands of certs, many pages — and its + result rows carry **no `property_type`**, so narrowing to the target's type + would mean fetching every certificate. Rejected: cost is unbounded by distance + and dominated by per-cert fetches. +- **OS Code-Point Open / ONSPD → S3** would give an offline unit-postcode→ + coordinate table, but is a ~1.7M-row asset to ingest and host for a path that + fires rarely. Rejected as disproportionate; reconsider if postcodes.io's + availability becomes a liability. +- **OS Places radius API** needs an `OS_API_KEY` the modelling path does not + otherwise carry. Rejected to avoid a new secret. + +postcodes.io is keyless, bounded by radius (a tight neighbourhood, not an +authority), and reuses the existing per-postcode search unchanged. + +### 3. Lazy, nearest-first with early-stop, and soft-failing + +Broadening runs **only on gate-out** (a no-EPC Property whose own postcode lacks +a same-type comparable) — the common cases never pay for it. Nearby postcodes are +walked **nearest first** and the walk **stops early** once enough same-type +comparables surface, so a dense area resolves in one or two searches instead of +the whole radius. Results are memoised per `(postcode, property_type)`. The +postcodes.io call is **soft-failing**: any error (after the shared transient +retry) degrades to the seed postcode alone, so a flaky or unavailable +postcodes.io can never break prediction — it just declines to broaden. + +## Consequences + +- A new external dependency, postcodes.io, sits in the modelling/Lambda path. It + is keyless and isolated behind `PostcodesIoClient` (shared transient-retry + + soft-fail), but its availability and rate limits are now a (bounded) operational + concern. ADR-0031 decision 5's `postcode_search` lever remains an alternative. +- The `ComparablePropertiesRepository` gains `candidates_near` alongside + `candidates_for`; the predictor/handler try the own postcode first and broaden + only on an empty cohort. +- Prediction reaches across postcode boundaries, so the geo-distance weighting + ADR-0029 already applies (a comparable's vote decays with haversine distance) + becomes load-bearing for broadened cohorts, not just a refinement. +- `property_type` gating is unchanged: a Property with no same-type comparable + *anywhere nearby* is still gated out — now a genuine "no comparable stock" + signal rather than an artefact of a sparse single postcode. diff --git a/infrastructure/epc_client/_retry.py b/infrastructure/epc_client/_retry.py deleted file mode 100644 index e9f18dcb..00000000 --- a/infrastructure/epc_client/_retry.py +++ /dev/null @@ -1,35 +0,0 @@ -import time -from typing import Callable, Optional, TypeVar - -import httpx - -from infrastructure.epc_client.exceptions import EpcRateLimitError - -T = TypeVar("T") - - -def call_with_retry( - fn: Callable[[], T], - max_retries: int = 5, - backoff_base: float = 1.0, - backoff_multiplier: float = 2.0, - max_backoff: float = 60.0, -) -> T: - """Retry `fn` on transient EPC-API failures: HTTP 429 rate limits and - transport errors (read/connect timeouts, connection resets). A 429 honours - the server's `Retry-After`; transport errors back off exponentially. Non- - transient failures (other 4xx/5xx, mapping errors) propagate immediately.""" - last_exc: Optional[Exception] = None - for attempt in range(max_retries + 1): - try: - return fn() - except (EpcRateLimitError, httpx.TransportError) as exc: - last_exc = exc - if attempt < max_retries: - if isinstance(exc, EpcRateLimitError) and exc.retry_after is not None: - delay = exc.retry_after - else: - delay = backoff_base * (backoff_multiplier**attempt) - time.sleep(min(delay, max_backoff)) - assert last_exc is not None - raise last_exc diff --git a/infrastructure/epc_client/epc_client_service.py b/infrastructure/epc_client/epc_client_service.py index 111de1f5..692b10a2 100644 --- a/infrastructure/epc_client/epc_client_service.py +++ b/infrastructure/epc_client/epc_client_service.py @@ -10,7 +10,7 @@ from infrastructure.epc_client.exceptions import ( EpcNotFoundError, EpcRateLimitError, ) -from infrastructure.epc_client._retry import call_with_retry +from infrastructure.http_retry import call_with_retry from datatypes.epc.domain.epc_property_data import EpcPropertyData from datatypes.epc.domain.mapper import EpcPropertyDataMapper from datatypes.epc.search import EpcSearchResult diff --git a/infrastructure/epc_client/exceptions.py b/infrastructure/epc_client/exceptions.py index fb7d96fa..af28fb74 100644 --- a/infrastructure/epc_client/exceptions.py +++ b/infrastructure/epc_client/exceptions.py @@ -1,4 +1,4 @@ -from typing import Optional +from infrastructure.http_retry import TransientHttpError class EpcApiError(Exception): @@ -9,9 +9,7 @@ class EpcNotFoundError(EpcApiError): """Raised when the API returns 404.""" -class EpcRateLimitError(EpcApiError): - """Raised when the API returns 429 and all retries are exhausted.""" - - def __init__(self, message: str, retry_after: Optional[float] = None) -> None: - super().__init__(message) - self.retry_after = retry_after +class EpcRateLimitError(EpcApiError, TransientHttpError): + """Raised when the API returns 429. A ``TransientHttpError`` so the shared + ``call_with_retry`` retries it (honouring ``Retry-After``), while remaining an + ``EpcApiError`` for callers that catch the EPC hierarchy.""" diff --git a/infrastructure/http_retry.py b/infrastructure/http_retry.py new file mode 100644 index 00000000..aa3689a5 --- /dev/null +++ b/infrastructure/http_retry.py @@ -0,0 +1,60 @@ +"""Shared transient-failure retry for the HTTP source clients. + +The retry *mechanism* is generic; each client owns the *policy* of what counts as +transient. A caller signals "retry this" by raising ``TransientHttpError`` +(carrying any server-advised ``retry_after``); transport-level errors (read / +connect timeouts, connection resets) are always treated as transient. +``call_with_retry`` backs off exponentially between attempts — honouring +``retry_after`` when present — and re-raises the last error once attempts are +exhausted, leaving the caller to decide how to surface it (the EPC client lets it +propagate; postcodes.io soft-fails to the seed postcode). +""" + +from __future__ import annotations + +import time +from typing import Callable, Optional, TypeVar + +import httpx + +T = TypeVar("T") + + +class TransientHttpError(Exception): + """A failure worth retrying. ``retry_after`` is the server-advised delay (a + 429's ``Retry-After``), used in place of the computed backoff when present.""" + + def __init__(self, message: str, retry_after: Optional[float] = None) -> None: + super().__init__(message) + self.retry_after = retry_after + + +def call_with_retry( + fn: Callable[[], T], + max_retries: int = 5, + backoff_base: float = 1.0, + backoff_multiplier: float = 2.0, + max_backoff: float = 60.0, +) -> T: + """Retry ``fn`` on transient failures — ``TransientHttpError`` (e.g. a 429) + and ``httpx.TransportError`` (read/connect timeouts, connection resets) — + backing off exponentially, or by the error's ``retry_after`` when it carries + one. Non-transient failures propagate immediately; the last transient error + is re-raised once ``max_retries`` is exhausted.""" + last_exc: Optional[Exception] = None + for attempt in range(max_retries + 1): + try: + return fn() + except (TransientHttpError, httpx.TransportError) as exc: + last_exc = exc + if attempt < max_retries: + retry_after = ( + exc.retry_after if isinstance(exc, TransientHttpError) else None + ) + if retry_after is not None: + delay = retry_after + else: + delay = backoff_base * (backoff_multiplier**attempt) + time.sleep(min(delay, max_backoff)) + assert last_exc is not None + raise last_exc diff --git a/infrastructure/postcodes_io/postcodes_io_client.py b/infrastructure/postcodes_io/postcodes_io_client.py index 6128e51c..7216d1b1 100644 --- a/infrastructure/postcodes_io/postcodes_io_client.py +++ b/infrastructure/postcodes_io/postcodes_io_client.py @@ -15,12 +15,12 @@ broadening" rather than breaking prediction. from __future__ import annotations -import time from typing import Any, Optional import httpx from domain.geospatial.coordinates import Coordinates +from infrastructure.http_retry import TransientHttpError, call_with_retry class PostcodesIoClient: @@ -91,50 +91,44 @@ class PostcodesIoClient: return [row for row in payload if isinstance(row, dict)] def _call(self, path: str, params: Optional[dict[str, Any]]) -> Any: - """One GET against postcodes.io, retrying transient failures (transport - errors, 429s, 5xx) with exponential backoff. Returns the parsed - ``result`` payload, or None on a non-transient failure (e.g. an unknown - postcode's 404) or once retries are exhausted — broadening then falls - back to the seed alone.""" - for attempt in range(self.MAX_RETRIES + 1): - try: - response = httpx.get( - f"{self.BASE_URL}{path}", - params=params, - timeout=self.REQUEST_TIMEOUT, - ) - except httpx.TransportError: - if not self._sleep_before_retry(attempt, retry_after=None): - return None - continue - except httpx.HTTPError: - return None # non-transient client-side error (e.g. bad URL) - if self._is_transient(response.status_code): - if not self._sleep_before_retry( - attempt, retry_after=self._retry_after(response) - ): - return None - continue - if not response.is_success: - return None - try: - body: Any = response.json() - except ValueError: - return None - return body.get("result") if isinstance(body, dict) else None - return None + """The parsed ``result`` payload for a postcodes.io GET, retried on + transient failures via the shared ``call_with_retry``, or None on a + non-transient failure (e.g. an unknown postcode's 404) or once retries are + exhausted — broadening then falls back to the seed alone. The soft-fail is + the difference from the EPC client, which lets the error propagate.""" + try: + return call_with_retry( + lambda: self._fetch(path, params), + max_retries=self.MAX_RETRIES, + backoff_base=self.BACKOFF_BASE, + backoff_multiplier=self.BACKOFF_MULTIPLIER, + max_backoff=self.MAX_BACKOFF, + ) + except (TransientHttpError, httpx.HTTPError): + return None - def _sleep_before_retry(self, attempt: int, retry_after: Optional[float]) -> bool: - """Sleep before the next attempt and report whether one remains; on the - final attempt, return False so the caller soft-fails instead of looping.""" - if attempt >= self.MAX_RETRIES: - return False - if retry_after is not None: - delay = retry_after - else: - delay = self.BACKOFF_BASE * (self.BACKOFF_MULTIPLIER**attempt) - time.sleep(min(delay, self.MAX_BACKOFF)) - return True + def _fetch(self, path: str, params: Optional[dict[str, Any]]) -> Any: + """One GET. Raises ``TransientHttpError`` on a 429/5xx (and lets + ``httpx.TransportError`` propagate) so ``call_with_retry`` retries it; + returns None for a non-transient non-success (e.g. 404) or unparseable + body.""" + response = httpx.get( + f"{self.BASE_URL}{path}", + params=params, + timeout=self.REQUEST_TIMEOUT, + ) + if self._is_transient(response.status_code): + raise TransientHttpError( + f"postcodes.io {response.status_code} on {path}", + retry_after=self._retry_after(response), + ) + if not response.is_success: + return None + try: + body: Any = response.json() + except ValueError: + return None + return body.get("result") if isinstance(body, dict) else None @staticmethod def _is_transient(status_code: int) -> bool: diff --git a/repositories/comparable_properties/epc_comparable_properties_repository.py b/repositories/comparable_properties/epc_comparable_properties_repository.py index b49a6948..8d712307 100644 --- a/repositories/comparable_properties/epc_comparable_properties_repository.py +++ b/repositories/comparable_properties/epc_comparable_properties_repository.py @@ -46,7 +46,7 @@ class CohortGeospatial(Protocol): class NearbyPostcodes(Protocol): """Resolves the real unit postcodes physically near a seed postcode (e.g. `PostcodesIoClient`). The gov EPC API cannot search by radius, so this is how - the cohort reaches beyond the target's own postcode (ADR-0031).""" + the cohort reaches beyond the target's own postcode (ADR-0034).""" def nearby( self, postcode: str, coordinates: Optional[Coordinates] = None @@ -83,7 +83,7 @@ class EpcComparablePropertiesRepository(ComparablePropertiesRepository): minimum: int = _DEFAULT_MINIMUM_COHORT, ) -> list[ComparableProperty]: """The broadened cohort: candidates drawn from the real unit postcodes - nearest ``postcode`` (ADR-0031), for when the target's own postcode holds + nearest ``postcode`` (ADR-0034), for when the target's own postcode holds no same-type comparables. Postcodes are visited nearest first and each candidate is deduped by certificate number across them. diff --git a/scripts/run_modelling_e2e.py b/scripts/run_modelling_e2e.py index 27ad473c..1a1964ce 100644 --- a/scripts/run_modelling_e2e.py +++ b/scripts/run_modelling_e2e.py @@ -709,7 +709,7 @@ def main() -> None: def broaden(target: PredictionTarget) -> list[ComparableProperty]: # Broadened cohort for a gated-out target: the nearest real postcodes, - # walked until enough same-type comparables surface (ADR-0031). Memoised + # walked until enough same-type comparables surface (ADR-0034). Memoised # per (postcode, property_type). key = (target.postcode, target.property_type) if key not in _nearby_cohort_cache: diff --git a/tests/infrastructure/epc_client/test_client.py b/tests/infrastructure/epc_client/test_client.py index 190513ce..97a0403b 100644 --- a/tests/infrastructure/epc_client/test_client.py +++ b/tests/infrastructure/epc_client/test_client.py @@ -78,7 +78,7 @@ def test_429_retry_after_header_drives_sleep_duration( _mock_response(200, cert_response), ] with patch("httpx.get", side_effect=responses), patch( - "infrastructure.epc_client._retry.time.sleep" + "infrastructure.http_retry.time.sleep" ) as mock_sleep: epc_service.get_by_certificate_number("CERT-001") @@ -100,7 +100,7 @@ def test_429_without_retry_after_uses_exponential_backoff( _mock_response(200, cert_response), ] with patch("httpx.get", side_effect=responses), patch( - "infrastructure.epc_client._retry.time.sleep" + "infrastructure.http_retry.time.sleep" ) as mock_sleep: epc_service.get_by_certificate_number("CERT-001") @@ -121,7 +121,7 @@ def test_429_malformed_retry_after_falls_back_to_backoff( _mock_response(200, cert_response), ] with patch("httpx.get", side_effect=responses), patch( - "infrastructure.epc_client._retry.time.sleep" + "infrastructure.http_retry.time.sleep" ) as mock_sleep: epc_service.get_by_certificate_number("CERT-001") @@ -140,7 +140,7 @@ def test_429_retry_after_capped_by_max_backoff(epc_service, rdsap_21_0_1_cert): _mock_response(200, cert_response), ] with patch("httpx.get", side_effect=responses), patch( - "infrastructure.epc_client._retry.time.sleep" + "infrastructure.http_retry.time.sleep" ) as mock_sleep: epc_service.get_by_certificate_number("CERT-001") diff --git a/tests/infrastructure/postcodes_io/test_postcodes_io_client.py b/tests/infrastructure/postcodes_io/test_postcodes_io_client.py index 708ac964..4d5e34a0 100644 --- a/tests/infrastructure/postcodes_io/test_postcodes_io_client.py +++ b/tests/infrastructure/postcodes_io/test_postcodes_io_client.py @@ -18,8 +18,9 @@ _MODULE = "infrastructure.postcodes_io.postcodes_io_client" @pytest.fixture(autouse=True) def _no_sleep() -> Iterator[MagicMock]: - """Never actually sleep during backoff — just record the calls.""" - with patch(f"{_MODULE}.time.sleep") as sleep: + """Never actually sleep during backoff (the shared retry owns the sleep) — + just record the calls.""" + with patch("infrastructure.http_retry.time.sleep") as sleep: yield sleep diff --git a/tests/infrastructure/test_http_retry.py b/tests/infrastructure/test_http_retry.py new file mode 100644 index 00000000..ec1e229f --- /dev/null +++ b/tests/infrastructure/test_http_retry.py @@ -0,0 +1,126 @@ +"""call_with_retry — the shared transient-failure retry both HTTP source clients +use. Generic mechanism (exponential backoff, Retry-After), per-client policy +(what gets raised as transient).""" + +from __future__ import annotations + +from typing import Iterator +from unittest.mock import MagicMock, call, patch + +import httpx +import pytest + +from infrastructure.http_retry import TransientHttpError, call_with_retry + + +@pytest.fixture(autouse=True) +def _no_sleep() -> Iterator[MagicMock]: + with patch("infrastructure.http_retry.time.sleep") as sleep: + yield sleep + + +def test_returns_immediately_on_success(_no_sleep: MagicMock) -> None: + # Act + result = call_with_retry(lambda: 42) + + # Assert + assert result == 42 + _no_sleep.assert_not_called() + + +def test_retries_a_transient_error_then_succeeds(_no_sleep: MagicMock) -> None: + # Arrange — fail twice transiently, then succeed. + calls = {"n": 0} + + def fn() -> str: + calls["n"] += 1 + if calls["n"] < 3: + raise TransientHttpError("429") + return "ok" + + # Act + result = call_with_retry(fn) + + # Assert — exponential backoff between the three attempts. + assert result == "ok" + assert _no_sleep.call_args_list == [call(1.0), call(2.0)] + + +def test_honours_retry_after_over_the_computed_backoff(_no_sleep: MagicMock) -> None: + # Arrange + raised = {"done": False} + + def fn() -> str: + if not raised["done"]: + raised["done"] = True + raise TransientHttpError("429", retry_after=7.0) + return "ok" + + # Act + call_with_retry(fn) + + # Assert + _no_sleep.assert_called_once_with(7.0) + + +def test_retries_transport_errors(_no_sleep: MagicMock) -> None: + # Arrange + attempts = {"n": 0} + + def fn() -> str: + attempts["n"] += 1 + if attempts["n"] == 1: + raise httpx.ReadTimeout("slow") + return "ok" + + # Act + result = call_with_retry(fn) + + # Assert + assert result == "ok" + assert attempts["n"] == 2 + + +def test_reraises_the_last_transient_error_once_exhausted( + _no_sleep: MagicMock, +) -> None: + # Arrange — always transient. + def fn() -> str: + raise TransientHttpError("persistent 429") + + # Act / Assert + with pytest.raises(TransientHttpError, match="persistent 429"): + call_with_retry(fn, max_retries=2) + assert _no_sleep.call_count == 2 + + +def test_non_transient_error_propagates_without_retry(_no_sleep: MagicMock) -> None: + # Arrange + attempts = {"n": 0} + + def fn() -> str: + attempts["n"] += 1 + raise ValueError("not transient") + + # Act / Assert + with pytest.raises(ValueError, match="not transient"): + call_with_retry(fn) + assert attempts["n"] == 1 + _no_sleep.assert_not_called() + + +def test_retry_after_is_capped_by_max_backoff(_no_sleep: MagicMock) -> None: + # Arrange + raised = {"done": False} + + def fn() -> str: + if not raised["done"]: + raised["done"] = True + raise TransientHttpError("429", retry_after=999.0) + return "ok" + + # Act + call_with_retry(fn, max_backoff=60.0) + + # Assert + _no_sleep.assert_called_once_with(60.0) From 2ee1b35dcab30a4ba5bbb1a1d55618dc4c596bd1 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 23 Jun 2026 17:21:03 +0000 Subject: [PATCH 3/4] fix(modelling_e2e): persist Baseline Performance for lodged properties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The handler wrote epc/spatial/solar/plan and marked the property modelled, but never established its Baseline Performance — so no row was created in property_baseline_performance for any property modelled through the Lambda (noticed on portfolio 796 / scenario 1268 / property 727218, a lodged property). Mirror the e2e runner: after the plan UoW commits (so the EPC is persisted for the orchestrator to re-hydrate), run PropertyBaselineOrchestrator for lodged properties. Predicted properties have no lodged figures and no persisted EPC, so they are skipped — consistent with the e2e runner and the ara_first_run Baseline stage. Verified 727218's baseline pipeline builds end-to-end in-memory (lodged_performance → CalculatorRebaseliner → bill → PropertyBaselinePerformance, reason pre_sap10). Tests: lodged path asserts the orchestrator runs once; prediction path asserts it does not. Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 25 +++++++++++++++ .../modelling_e2e/test_handler.py | 31 +++++++++++++++---- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 2bb5e245..9a8c8e73 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -51,8 +51,13 @@ from domain.geospatial.coordinates import Coordinates from domain.geospatial.planning_restrictions import PlanningRestrictions from domain.geospatial.spatial_reference import SpatialReference from domain.property.property import Property, PropertyIdentity +from domain.property_baseline.calculator_rebaseliner import CalculatorRebaseliner +from domain.sap10_calculator.calculator import Sap10Calculator from domain.tasks.tasks import Source from harness.console import run_modelling +from orchestration.property_baseline_orchestrator import ( + PropertyBaselineOrchestrator, +) from infrastructure.epc_client.epc_client_service import EpcClientService from infrastructure.postcodes_io.postcodes_io_client import PostcodesIoClient from infrastructure.postgres.config import PostgresConfig @@ -72,6 +77,9 @@ from repositories.geospatial.geospatial_s3_repository import ( GeospatialS3Repository, ParquetReader, ) +from repositories.fuel_rates.fuel_rates_static_file_repository import ( + FuelRatesStaticFileRepository, +) from repositories.postgres_unit_of_work import PostgresUnitOfWork from repositories.product.composite_product_repository import ( catalogue_with_off_catalogue_overrides, @@ -266,6 +274,16 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: ) return _nearby_cohort_cache[key] + # Re-establishes each lodged Property's Baseline Performance from the just- + # persisted EPC (one UoW per property, committed after the Plan's). Predicted + # Properties have no lodged figures, so they get no baseline (mirrors the e2e + # runner and the ara_first_run Baseline stage). + baseline_orchestrator = PropertyBaselineOrchestrator( + unit_of_work=lambda: PostgresUnitOfWork(lambda: Session(engine)), + rebaseliner=CalculatorRebaseliner(Sap10Calculator()), + fuel_rates=FuelRatesStaticFileRepository(), + ) + read_session = Session(engine) try: scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] @@ -399,6 +417,13 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: uow.commit() logger.info(f"property={property_id} plan saved") + # Baseline Performance is re-established from the persisted EPC, so + # it runs after the Plan UoW commits. Only lodged Properties have + # the lodged figures the Baseline reads; predicted ones are skipped. + if epc is not None: + baseline_orchestrator.run([property_id]) + logger.info(f"property={property_id} baseline saved") + except Exception as error: # noqa: BLE001 logger.error( f"property={property_id}: {type(error).__name__}: {error}", diff --git a/tests/applications/modelling_e2e/test_handler.py b/tests/applications/modelling_e2e/test_handler.py index d9a93184..6929d13f 100644 --- a/tests/applications/modelling_e2e/test_handler.py +++ b/tests/applications/modelling_e2e/test_handler.py @@ -8,7 +8,7 @@ is needed. One test per distinct behaviour path. from __future__ import annotations from contextlib import ExitStack -from typing import Any +from typing import Any, Iterator from unittest.mock import MagicMock, call, patch import pytest @@ -84,6 +84,16 @@ def _clear_cohort_cache() -> None: h._nearby_cohort_cache.clear() +@pytest.fixture(autouse=True) +def _baseline_orchestrator() -> Iterator[MagicMock]: + """Patch the Baseline orchestrator for every test — construction stays cheap + and ``.run`` is a no-op mock the baseline tests assert against.""" + with patch( + "applications.modelling_e2e.handler.PropertyBaselineOrchestrator" + ) as orchestrator: + yield orchestrator + + # --------------------------------------------------------------------------- # Trigger body validation # --------------------------------------------------------------------------- @@ -117,9 +127,12 @@ def test_trigger_body_rejects_missing_property_ids() -> None: # --------------------------------------------------------------------------- -def test_lodged_epc_path_saves_epc_plan_and_marks_modelled() -> None: +def test_lodged_epc_path_saves_epc_plan_and_marks_modelled( + _baseline_orchestrator: MagicMock, +) -> None: """When get_by_uprn returns an EPC the handler saves it, saves the plan, - and marks the property as modelled — all inside one UoW per property.""" + marks the property as modelled — all inside one UoW per property — and + re-establishes its Baseline Performance after the plan commits.""" # Arrange mock_engine = _engine_mock([PROPERTY_ID], [UPRN], [POSTCODE]) mock_epc = MagicMock() @@ -197,6 +210,8 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled() -> None: PROPERTY_ID, has_recommendations=False ) mock_uow.commit.assert_called_once() + # Baseline Performance is re-established for the lodged property. + _baseline_orchestrator.return_value.run.assert_called_once_with([PROPERTY_ID]) def test_skipped_cohort_certs_are_surfaced_in_the_outputs() -> None: @@ -297,9 +312,12 @@ def test_skipped_cohort_certs_are_surfaced_in_the_outputs() -> None: # --------------------------------------------------------------------------- -def test_prediction_path_saves_plan_without_epc_save() -> None: +def test_prediction_path_saves_plan_without_epc_save( + _baseline_orchestrator: MagicMock, +) -> None: """When get_by_uprn returns None the handler synthesises an EPC via - prediction and saves the plan — but never calls epc.save.""" + prediction and saves the plan — but never calls epc.save, and (having no + lodged figures) never establishes a Baseline.""" # Arrange mock_engine = _engine_mock([PROPERTY_ID], [UPRN], [POSTCODE]) mock_plan = _plan_mock() @@ -399,10 +417,11 @@ def test_prediction_path_saves_plan_without_epc_save() -> None: # Act _call_handler(_BODY) - # Assert — epc.save NOT called (no lodged cert), plan IS saved + # Assert — epc.save NOT called (no lodged cert), plan IS saved, no baseline mock_uow.epc.save.assert_not_called() mock_uow.plan.save.assert_called_once() mock_uow.commit.assert_called_once() + _baseline_orchestrator.return_value.run.assert_not_called() # --------------------------------------------------------------------------- From 290097b1c77433133a80702c58be80b27e12c1da Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 23 Jun 2026 17:52:19 +0000 Subject: [PATCH 4/4] Record per-property failure detail in modelling_e2e subtask outputs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a property failed, the handler recorded only its bare property_id and raised RuntimeError("failed property_ids: [...]"). That string is what SubTask.fail persists into the subtask outputs.error column, so a failed run told you which property failed but never why — forcing a CloudWatch lookup. The per-property catch now captures property_id, uprn, error_type, and the error message, and the raised RuntimeError embeds those as JSON so the subtask outputs column is parseable directly. query_failed_modelling_e2e.py reads that outputs.error into a new Error column in its report. Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 31 ++++++++++++++---- scripts/query_failed_modelling_e2e.py | 46 ++++++++++++++++++++------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 9a8c8e73..0f2c4fe6 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -25,6 +25,7 @@ from __future__ import annotations import dataclasses import io +import json import os from collections.abc import Callable from typing import Any, Optional, cast @@ -289,7 +290,7 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] products = catalogue_with_off_catalogue_overrides(read_session) - errors: list[int] = [] + failures: list[dict[str, Any]] = [] for property_id in property_ids: try: @@ -426,10 +427,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: except Exception as error: # noqa: BLE001 logger.error( - f"property={property_id}: {type(error).__name__}: {error}", + f"property={property_id} uprn={uprns.get(property_id)}: " + f"{type(error).__name__}: {error}", exc_info=True, ) - errors.append(property_id) + failures.append( + { + "property_id": property_id, + "uprn": uprns.get(property_id), + "error_type": type(error).__name__, + "error": str(error), + } + ) # Cohort certs the mapper could not consume were skipped (not aborted on) # so prediction could proceed; surface them — with cert numbers — in the @@ -444,10 +453,20 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: f"{[s['certificate_number'] for s in skipped_certs]}" ) - if errors: - message = f"failed property_ids: {errors}" + if failures: + failed_ids = [f["property_id"] for f in failures] + # Persisted verbatim into the subtask's outputs.error (via + # SubTask.fail), so include each property's error type + message — + # not just the IDs — to make failed runs diagnosable without + # cross-referencing CloudWatch. + message = ( + f"failed property_ids: {failed_ids}; " + f"details: {json.dumps(failures)}" + ) if skipped_certs: - message += f"; skipped_unmappable_cohort_certs: {skipped_certs}" + message += ( + f"; skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}" + ) raise RuntimeError(message) return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None diff --git a/scripts/query_failed_modelling_e2e.py b/scripts/query_failed_modelling_e2e.py index a4cca69f..db684023 100644 --- a/scripts/query_failed_modelling_e2e.py +++ b/scripts/query_failed_modelling_e2e.py @@ -11,6 +11,7 @@ from __future__ import annotations import json import sys from pathlib import Path +from typing import Any, cast _REPO_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(_REPO_ROOT)) @@ -30,6 +31,7 @@ with engine.connect() as conn: st.id AS subtask_id, st.task_id, st.inputs, + st.outputs, st.updated_at FROM sub_task st JOIN tasks t ON t.id = st.task_id @@ -45,19 +47,32 @@ with engine.connect() as conn: ) exit(0) + def _error_text(outputs_raw: object) -> str: + """Pull the persisted failure reason out of the subtask outputs JSON.""" + try: + outputs: Any = ( + json.loads(outputs_raw) + if isinstance(outputs_raw, str) + else (outputs_raw or {}) + ) + except Exception: + return str(outputs_raw or "") + if isinstance(outputs, dict): + return str(cast("dict[str, Any]", outputs).get("error", "")) + return "" + # Collect all property_ids across all rows all_property_ids: list[int] = [] - parsed: list[tuple[str, str, list[int], str, str]] = [] - for subtask_id, task_id, inputs_raw, updated_at in subtask_rows: + parsed: list[tuple[str, str, list[int], str, str, str]] = [] + for subtask_id, task_id, inputs_raw, outputs_raw, updated_at in subtask_rows: try: - inputs = ( + inputs: Any = ( json.loads(inputs_raw) if isinstance(inputs_raw, str) else (inputs_raw or {}) ) - property_ids: list[int] = [ - int(p) for p in (inputs.get("property_ids") or []) - ] + raw_ids = cast("list[Any]", cast("dict[str, Any]", inputs).get("property_ids") or []) + property_ids: list[int] = [int(p) for p in raw_ids] except Exception: property_ids = [] parsed.append( @@ -67,6 +82,7 @@ with engine.connect() as conn: property_ids, str(updated_at), inputs_raw or "", + _error_text(outputs_raw), ) ) all_property_ids.extend(property_ids) @@ -80,23 +96,29 @@ with engine.connect() as conn: ).fetchall() uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows} +def _cell(value: str) -> str: + """Escape table-breaking characters so multi-line errors stay on one row.""" + return value.replace("|", "\\|").replace("\n", "
") + + lines: list[str] = [ "# Failed modelling_e2e Subtasks\n", - f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |", - f"|-----------|---------|------------|-------------|------|--------|", + f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Error | Inputs |", + f"|-----------|---------|------------|-------------|------|-------|--------|", ] -for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed: - inputs_cell = (inputs_raw or "").replace("|", "\\|") +for subtask_id, task_id, property_ids, updated_at, inputs_raw, error in parsed: + inputs_cell = _cell(inputs_raw or "") + error_cell = _cell(error or "") if property_ids: for pid in property_ids: uprn = uprn_map.get(pid, "unknown") lines.append( - f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |" + f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {error_cell} | {inputs_cell} |" ) else: lines.append( - f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |" + f"| {subtask_id} | {task_id} | {updated_at} | — | — | {error_cell} | {inputs_cell} |" ) _OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")