neighbouring properties added

This commit is contained in:
Jun-te Kim 2026-06-22 14:38:00 +00:00
parent 2afa7acea4
commit 9c89a0e680
8 changed files with 288 additions and 62 deletions

View file

@ -46,9 +46,10 @@ class Property:
# neither a lodged EPC nor Site Notes; a real source always wins.
predicted_epc: Optional[EpcPropertyData] = None
# Resolved Landlord Overrides as Simulation Overlays, folded onto the lodged
# EPC to form the Effective EPC (ADR-0032). Empty when the Property has no
# overrides — the EPC is then returned unchanged. Only applied on the
# `epc_with_overlay` path; never when Site Notes are the source.
# OR neighbour-synthesised EPC to form the Effective EPC (ADR-0032). Empty
# when the Property has no overrides — the EPC is then returned unchanged.
# Applied on the `epc_with_overlay` and `predicted` paths; never when Site
# Notes are the source.
landlord_overrides: Sequence[EpcSimulation] = field(default_factory=tuple)
# The current open-market value (a Property Valuation) — externally sourced
# and mostly absent; feeds the Plan's Valuation Uplift £ forms (ADR-0018).
@ -59,11 +60,13 @@ class Property:
@property
def source_path(self) -> SourcePath:
"""Which of the two disjoint source paths models this Property (ADR-0001).
"""Which of the three disjoint source paths models this Property (ADR-0001).
Site Notes alone, or the public EPC (with Landlord Overrides, once that
slice lands). When both exist the newer wins (Recency Tie-Break); on an
equal date the survey wins, as it reflects on-site observation.
Site Notes, or the public EPC (with Landlord Overrides folded on), or —
as a last resort, when neither real source exists — a neighbour-synthesised
EPC (EPC Prediction, ADR-0031). When both Site Notes and an EPC exist the
newer wins (Recency Tie-Break); on an equal date the survey wins, as it
reflects on-site observation. A real source always beats the prediction.
"""
if self.site_notes is not None and self.epc is not None:
epc_date = self.epc.registration_date or self.epc.inspection_date
@ -85,19 +88,26 @@ class Property:
def effective_epc(self) -> EpcPropertyData:
    """The EpcPropertyData the modelling pipeline scores against.

    Path 1: the Site Notes' surveyed data.
    Path 2: the public EPC with any Landlord Overrides folded on as
    Simulation Overlays (ADR-0032) — returned as-is when there are none.
    Path 3: a neighbour-synthesised EPC (EPC Prediction gap-fill, ADR-0031),
    likewise with any Landlord Overrides folded on: the cohort fills the
    unknown fields, the landlord's known facts (wall/roof/glazing/heating/age)
    correct them. Used only when neither real source is present.
    """
    # Resolve the path once; `source_path` is recomputed on every access.
    path = self.source_path
    if path == "site_notes":
        assert self.site_notes is not None
        return self.site_notes.to_epc_property_data()
    if path == "predicted":
        assert self.predicted_epc is not None
        # The synthesised EPC takes the same overlay as a lodged one; it must
        # not be returned raw, or the landlord's known facts are lost.
        return self._with_overrides(self.predicted_epc)
    assert self.epc is not None
    return self._with_overrides(self.epc)

def _with_overrides(self, epc: EpcPropertyData) -> EpcPropertyData:
    """``epc`` with any Landlord Overrides folded on as Simulation Overlays,
    or ``epc`` itself (same object) when there are none (ADR-0032).

    Operates on the ``epc`` argument, never on ``self.epc``: on the predicted
    path ``self.epc`` is None and the EPC to overlay is the synthesised one.
    """
    if self.landlord_overrides:
        return apply_simulations(epc, self.landlord_overrides)
    return epc

View file

@ -1,5 +1,7 @@
import time
from typing import Callable, TypeVar
from typing import Callable, Optional, TypeVar
import httpx
from infrastructure.epc_client.exceptions import EpcRateLimitError
@ -13,16 +15,21 @@ def call_with_retry(
backoff_multiplier: float = 2.0,
max_backoff: float = 60.0,
) -> T:
last_exc: EpcRateLimitError | None = None
"""Retry `fn` on transient EPC-API failures: HTTP 429 rate limits and
transport errors (read/connect timeouts, connection resets). A 429 honours
the server's `Retry-After`; transport errors back off exponentially. Non-
transient failures (other 4xx/5xx, mapping errors) propagate immediately."""
last_exc: Optional[Exception] = None
for attempt in range(max_retries + 1):
try:
return fn()
except EpcRateLimitError as exc:
except (EpcRateLimitError, httpx.TransportError) as exc:
last_exc = exc
if attempt < max_retries:
if exc.retry_after is not None:
if isinstance(exc, EpcRateLimitError) and exc.retry_after is not None:
delay = exc.retry_after
else:
delay = backoff_base * (backoff_multiplier ** attempt)
delay = backoff_base * (backoff_multiplier**attempt)
time.sleep(min(delay, max_backoff))
raise last_exc # type: ignore[misc]
assert last_exc is not None
raise last_exc

View file

@ -55,8 +55,9 @@ from __future__ import annotations
import argparse
import os
import sys
import time
from pathlib import Path
from typing import Any, Optional
from typing import Any, Callable, Optional
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
@ -72,6 +73,15 @@ from repositories.property.landlord_override_overlays import ( # noqa: E402
from repositories.property.property_overrides_postgres_reader import ( # noqa: E402
PropertyOverridesPostgresReader,
)
from domain.epc_prediction.comparable_properties import ( # noqa: E402
ComparableProperty,
select_comparables,
)
from domain.epc_prediction.epc_prediction import EpcPrediction # noqa: E402
from domain.epc_prediction.prediction_target import ( # noqa: E402
build_prediction_target,
)
from domain.geospatial.coordinates import Coordinates # noqa: E402
from domain.geospatial.planning_restrictions import PlanningRestrictions # noqa: E402
from domain.geospatial.spatial_reference import SpatialReference # noqa: E402
from domain.modelling.considered_measures import ( # noqa: E402
@ -88,9 +98,15 @@ from infrastructure.solar.google_solar_api_client import ( # noqa: E402
BuildingInsightsNotFoundError,
GoogleSolarApiClient,
)
from repositories.comparable_properties.epc_comparable_properties_repository import ( # noqa: E402
EpcComparablePropertiesRepository,
)
from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
)
from repositories.property.override_backed_prediction_attributes_reader import ( # noqa: E402
OverrideBackedPredictionAttributesReader,
)
from repositories.product.product_postgres_repository import ( # noqa: E402
ProductPostgresRepository,
)
@ -138,6 +154,10 @@ def _solar_insights_for(
)
except BuildingInsightsNotFoundError:
return None # no Google solar coverage at this point — model without it
# A transient Solar failure (timeout/reset) is NOT swallowed: it propagates so
# the property is marked ERROR and the wrapper's retry sweep re-runs it later
# when Solar recovers. We must not silently model a coverage-having property
# without its solar leg.
def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[int]]:
@ -150,6 +170,18 @@ def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[in
return {int(pid): (int(uprn) if uprn is not None else None) for pid, uprn in rows}
def _postcodes_for(engine: Engine, property_ids: list[int]) -> dict[int, str]:
    """Map each requested Property id to its postcode, read from the DB (read-only).

    The postcode is needed to find the EPC-Prediction cohort (the postcode's
    other lodged certs) and to seed the PredictionTarget when a Property has
    no EPC. A NULL/empty postcode is returned as ``""``; ids with no matching
    ``property`` row are simply absent from the result.
    """
    query = text("SELECT id, postcode FROM property WHERE id = ANY(:ids)")
    with engine.connect() as connection:
        fetched = connection.execute(query, {"ids": property_ids}).fetchall()

    postcode_by_id: dict[int, str] = {}
    for row_id, row_postcode in fetched:
        postcode_by_id[int(row_id)] = row_postcode or ""
    return postcode_by_id
def _dump_overrides(engine: Engine, property_ids: list[int]) -> None:
"""Print each target Property's ``property_overrides`` rows (read-only), so the
Landlord Overrides folded into the Effective EPC are visible before modelling."""
@ -304,16 +336,19 @@ def _persist(
uprn: int,
portfolio_id: int,
scenario: Scenario,
epc: EpcPropertyData,
epc: Optional[EpcPropertyData],
spatial: Optional[SpatialReference],
solar_insights: Optional[dict[str, Any]],
plan: Plan,
) -> None:
"""Write the run's inputs (EPC + spatial + solar) and the computed Plan to
the DB in one Unit of Work, then commit. ``PlanPostgresRepository`` replaces
any existing Plan for ``(property_id, scenario.id)`` (idempotent re-run)."""
any existing Plan for ``(property_id, scenario.id)`` (idempotent re-run). A
predicted Property has no lodged EPC to store (``epc is None``), so only the
spatial/solar inputs and the Plan are persisted for it."""
with PostgresUnitOfWork(lambda: Session(engine)) as uow:
uow.epc.save(epc, property_id=property_id, portfolio_id=portfolio_id)
if epc is not None:
uow.epc.save(epc, property_id=property_id, portfolio_id=portfolio_id)
if spatial is not None:
uow.spatial.save(uprn, spatial)
# The live `solar` table is keyed by UPRN and needs the fetch's
@ -343,6 +378,46 @@ def _persist(
uow.commit()
def _predict_epc(
    *,
    property_id: int,
    uprn: int,
    postcode: str,
    portfolio_id: int,
    attributes_reader: OverrideBackedPredictionAttributesReader,
    coordinates: Optional[Coordinates],
    cohort_for: Callable[[str], list[ComparableProperty]],
    predictor: EpcPrediction,
) -> Optional[EpcPropertyData]:
    """Synthesise an EpcPropertyData for an EPC-less Property from its postcode
    cohort (EPC Prediction Path 3, ADR-0031).

    Returns None when the Property cannot be predicted:

    * no PredictionTarget could be built (``property_type`` unresolvable);
    * the postcode yields no comparable neighbours;
    * the synthesised picture carries no MAIN building part.

    The cohort is found by POSTCODE, so a wrong postcode on the property row
    yields the wrong neighbours — a prediction is only as good as the postcode
    it is given.
    """
    known_attributes = attributes_reader.attributes_for(property_id)
    target = build_prediction_target(
        PropertyIdentity(
            portfolio_id=portfolio_id, postcode=postcode, address="", uprn=uprn
        ),
        coordinates,
        known_attributes,
    )
    if target is None:
        # property_type unresolvable — gated out of prediction
        return None

    cohort = select_comparables(target, cohort_for(target.postcode))
    if not cohort.members:
        # no comparable neighbours in the postcode
        return None

    synthesised = predictor.predict(target, cohort)
    # The calculator needs a MAIN building part; a cohort whose template
    # carries none (e.g. a malformed flat record) yields an unscoreable
    # picture, so it is rejected as not-predictable rather than letting the
    # calculator StopIteration.
    for part in synthesised.sap_building_parts:
        if part.identifier is BuildingPartIdentifier.MAIN:
            return synthesised
    return None
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
@ -384,11 +459,31 @@ def main() -> None:
action="store_true",
help="skip the live Google Solar fetch (no Solar PV Options)",
)
parser.add_argument(
"--out-prefix",
default=None,
help="write outputs to <prefix>.md / <prefix>.csv / <prefix>_candidates.csv "
"(parent dirs created) instead of ./modelling_e2e.*; lets batched runs "
"keep separate, durable output files",
)
args = parser.parse_args()
if args.persist and (args.scenario_id is None or args.portfolio_id is None):
parser.error("--persist requires --scenario-id and --portfolio-id")
if args.out_prefix:
_base = Path(args.out_prefix)
_base.parent.mkdir(parents=True, exist_ok=True)
md_path = _base.with_suffix(".md")
csv_path = _base.with_suffix(".csv")
candidates_path = _base.parent / f"{_base.name}_candidates.csv"
else:
md_path, csv_path, candidates_path = (
_MARKDOWN_PATH,
_CSV_PATH,
_CANDIDATES_CSV_PATH,
)
load_env(ENV_PATH)
# The new gov EPC API (Bearer) authenticates with OPEN_EPC_API_TOKEN — the
# name is misleading; EPC_AUTH_TOKEN is dead (403). Verified against the
@ -401,10 +496,26 @@ def main() -> None:
_parse_measures(args.measures), _parse_measures(args.exclude_measures)
)
uprns = _uprns_for(engine, args.property_ids)
postcodes = _postcodes_for(engine, args.property_ids)
# Landlord Overrides are read from property_overrides and folded onto the lodged
# EPC to form the Effective EPC the calculator scores (ADR-0032).
overrides_reader = PropertyOverridesPostgresReader(lambda: Session(engine))
_dump_overrides(engine, args.property_ids)
# EPC Prediction (Path 3, ADR-0031): when a Property has no lodged EPC, an
# EpcPropertyData is synthesised from its postcode cohort. The cohort comes
# from the live EPC API (search-by-postcode + per-cert fetch), memoised per
# postcode so co-located missing Properties don't refetch the same cohort.
prediction_attributes = OverrideBackedPredictionAttributesReader(overrides_reader)
comparables_repo = EpcComparablePropertiesRepository(epc_client, geospatial)
predictor = EpcPrediction()
_cohort_cache: dict[str, list[ComparableProperty]] = {}
def cohort_for(postcode: str) -> list[ComparableProperty]:
if postcode not in _cohort_cache:
_cohort_cache[postcode] = (
comparables_repo.candidates_for(postcode) if postcode else []
)
return _cohort_cache[postcode]
# One read-only session for the live `material` catalogue, reused across the
# batch so both store and no-store runs price against the same DB rows.
catalogue_session = Session(engine)
@ -442,42 +553,107 @@ def main() -> None:
"selected,description"
]
for property_id in args.property_ids:
total = len(args.property_ids)
run_start = time.monotonic()
errors = 0
for index, property_id in enumerate(args.property_ids, start=1):
elapsed = time.monotonic() - run_start
rate = elapsed / (index - 1) if index > 1 else 0.0
eta = rate * (total - index + 1)
bar_done = int(28 * (index - 1) / total)
bar = "#" * bar_done + "-" * (28 - bar_done)
print(
f"[{bar}] {index}/{total} ({100 * (index - 1) / total:.1f}%) "
f"· {errors} err · elapsed {elapsed / 60:.1f}m · ETA {eta / 60:.1f}m "
f"· property {property_id}",
flush=True,
)
uprn = uprns.get(property_id)
try:
if uprn is None:
raise ValueError("no UPRN on the property row")
epc: Optional[EpcPropertyData] = epc_client.get_by_uprn(uprn)
if epc is None:
raise ValueError(f"no EPC found for UPRN {uprn}")
# Fold any Landlord Overrides onto the lodged EPC; with none, the
# Effective EPC is the lodged EPC unchanged (ADR-0032).
overlaid_property = Property(
identity=PropertyIdentity(
portfolio_id=args.portfolio_id or 0,
postcode="",
address="",
uprn=uprn,
),
epc=epc,
landlord_overrides=overlays_from(
overrides_reader.overrides_for(property_id)
),
)
effective_epc: EpcPropertyData = overlaid_property.effective_epc
lodged_wall = _main_wall_summary(epc)
effective_wall = _main_wall_summary(effective_epc)
if lodged_wall != effective_wall:
print(
f" overlay moved the main wall: lodged [{lodged_wall}] "
f"-> effective [{effective_wall}]"
)
else:
print(f" overlay no-op on main wall: [{lodged_wall}]")
postcode = postcodes.get(property_id, "")
# Resolve the spatial reference once: its planning protections gate
# measures, and its coordinates both drive solar AND distance-weight
# the EPC-Prediction cohort, so resolve before the EPC branch.
spatial: Optional[SpatialReference] = _spatial_for(geospatial, uprn)
restrictions: PlanningRestrictions = (
spatial.restrictions if spatial is not None else PlanningRestrictions()
)
coordinates: Optional[Coordinates] = (
spatial.coordinates if spatial is not None else None
)
overrides = overlays_from(overrides_reader.overrides_for(property_id))
epc: Optional[EpcPropertyData] = epc_client.get_by_uprn(uprn)
predicted = False
if epc is not None:
# Lodged EPC: fold any Landlord Overrides onto it; with none, the
# Effective EPC is the lodged EPC unchanged (ADR-0032).
overlaid_property = Property(
identity=PropertyIdentity(
portfolio_id=args.portfolio_id or 0,
postcode=postcode,
address="",
uprn=uprn,
),
epc=epc,
landlord_overrides=overrides,
)
effective_epc: EpcPropertyData = overlaid_property.effective_epc
lodged_wall = _main_wall_summary(epc)
effective_wall = _main_wall_summary(effective_epc)
if lodged_wall != effective_wall:
print(
f" overlay moved the main wall: lodged [{lodged_wall}] "
f"-> effective [{effective_wall}]"
)
else:
print(f" overlay no-op on main wall: [{lodged_wall}]")
else:
# No lodged EPC: synthesise one from the postcode cohort
# (EPC Prediction Path 3, ADR-0031).
predicted_epc = _predict_epc(
property_id=property_id,
uprn=uprn,
postcode=postcode,
portfolio_id=args.portfolio_id or 0,
attributes_reader=prediction_attributes,
coordinates=coordinates,
cohort_for=cohort_for,
predictor=predictor,
)
if predicted_epc is None:
raise ValueError(
f"no EPC for UPRN {uprn} and not predictable "
f"(unresolved property_type or empty '{postcode}' cohort)"
)
# Property.effective_epc folds any Landlord Overrides onto the
# synthesised EPC (cohort fills the unknown fields, the landlord's
# known facts correct them) — same overlay the lodged path applies.
effective_epc = Property(
identity=PropertyIdentity(
portfolio_id=args.portfolio_id or 0,
postcode=postcode,
address="",
uprn=uprn,
),
epc=None,
predicted_epc=predicted_epc,
landlord_overrides=overrides,
).effective_epc
predicted = True
synth_wall = _main_wall_summary(predicted_epc)
effective_wall = _main_wall_summary(effective_epc)
if synth_wall != effective_wall:
print(
f" no lodged EPC -> synthesised from '{postcode}' cohort; "
f"overlay moved wall [{synth_wall}] -> [{effective_wall}]"
)
else:
print(
f" no lodged EPC -> synthesised from '{postcode}' cohort "
f"(overlay no-op on wall) [{synth_wall}]"
)
solar_insights: Optional[dict[str, Any]] = (
None if args.no_solar else _solar_insights_for(solar_client, spatial)
)
@ -492,9 +668,10 @@ def main() -> None:
print_table=False,
)
# The full candidate menu (every Generator Option + its cost), so
# measures the Optimiser did not select are still visible.
# measures the Optimiser did not select are still visible. A predicted
# Property has no lodged cert, so the synthesised Effective EPC is used.
candidates: list[Recommendation] = candidate_recommendations(
epc,
epc if epc is not None else effective_epc,
planning_restrictions=restrictions,
solar_insights=solar_insights,
considered_measures=considered,
@ -521,6 +698,7 @@ def main() -> None:
# subsequent property reports `InFailedSqlTransaction` and masks its
# own real error. Reset so each property surfaces what's wrong.
catalogue_session.rollback()
errors += 1
line = f"property {property_id} (uprn {uprn}): ERROR — {type(error).__name__}: {error}"
print(line + "\n")
md_lines.append(f"## Property {property_id}\n\n`{line}`\n")
@ -530,11 +708,15 @@ def main() -> None:
measure_types = [m.measure_type for m in plan.measures]
selected: set[MeasureType] = {m.measure_type for m in plan.measures}
context = _context_summary(spatial, solar_insights)
# Flag EPC-Prediction properties so a synthesised SAP is never mistaken
# for one scored off a lodged cert.
source_tag = " · ⚠ PREDICTED (no lodged EPC)" if predicted else ""
candidate_lines = _candidate_lines(candidates, selected)
header = (
f"=== Property {property_id} (uprn {uprn}) === "
f"SAP {plan.baseline.sap_continuous:.1f} -> {plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · £{plan.cost_of_works:,.0f} · {context}"
f"{source_tag}"
)
print(header)
print(format_plan_table(plan))
@ -543,7 +725,7 @@ def main() -> None:
print(candidate_line)
print()
md_lines.append(f"## Property {property_id} (uprn {uprn})\n")
md_lines.append(f"## Property {property_id} (uprn {uprn}){source_tag}\n")
md_lines.append(
f"SAP {plan.baseline.sap_continuous:.1f}{plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · cost £{plan.cost_of_works:,.0f} "
@ -565,14 +747,14 @@ def main() -> None:
)
catalogue_session.close()
_MARKDOWN_PATH.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
_CSV_PATH.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")
_CANDIDATES_CSV_PATH.write_text(
md_path.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
csv_path.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")
candidates_path.write_text(
"\n".join(candidate_csv_rows) + "\n", encoding="utf-8"
)
print(f"wrote {_MARKDOWN_PATH.resolve()}")
print(f"wrote {_CSV_PATH.resolve()}")
print(f"wrote {_CANDIDATES_CSV_PATH.resolve()}")
print(f"wrote {md_path.resolve()}")
print(f"wrote {csv_path.resolve()}")
print(f"wrote {candidates_path.resolve()}")
if __name__ == "__main__":

View file

@ -91,3 +91,30 @@ def test_baseline_wall_is_unchanged_when_no_override_applies() -> None:
# Assert
assert main.wall_construction == 4
def test_effective_epc_folds_overrides_onto_a_predicted_epc() -> None:
    # Arrange — the Property has no lodged EPC, only a neighbour-synthesised
    # one, and the landlord has declared a solid-brick wall with internal
    # insulation.
    wall_override = wall_overlay_for("Solid brick, with internal insulation", 0)
    assert wall_override is not None
    predicted_property = Property(
        identity=_identity(),
        predicted_epc=_epc(),
        landlord_overrides=[wall_override],
    )

    # Act
    effective_main_wall = _main_wall(predicted_property.effective_epc)

    # Assert — the synthesised main wall takes the override's codes, just as a
    # lodged EPC's would; everything else stays as the cohort filled it.
    assert effective_main_wall.wall_construction == 3
    assert effective_main_wall.wall_insulation_type == 3
def test_effective_epc_predicted_is_returned_as_is_when_no_overrides() -> None:
    # Arrange — a predicted Property that carries no Landlord Overrides.
    synthesised = _epc()
    property_without_overrides = Property(
        identity=_identity(), predicted_epc=synthesised
    )

    # Act
    effective = property_without_overrides.effective_epc

    # Assert — with nothing to fold on, the very same object comes back.
    assert effective is synthesised