feat(epc-prediction): geo-proximity weighting, per-component (#1227)

Folds a haversine distance kernel into the categorical-mode weighting so a
nearer neighbour counts for more — applied ONLY to the components that showed
a clear distance signal in the corpus pre-check (age band, wall + floor
construction, glazing: homes built/retrofitted together cluster). Roof
construction showed no decay and is excluded; heating keeps its coherent
donor. Predictor stays pure: weights come from target.coordinates vs each
Comparable.coordinates (resolved at the boundary); geo is OFF when the target
has no coords, neutral for a neighbour with none.

Scale chosen on the harness: _GEO_SCALE_KM=0.1 is the gate-safe optimum
(0.05 lifts the corpus more but regresses fixture floor_construction).
Corpus (150pc/514, geo off->on): age 0.564->0.572, age_pm1 0.841->0.847,
wall 0.902->0.912, floor_con 0.786->0.796, glazing 0.667->0.673; roof
unchanged. Fixture: glazing 0.5278->0.5833 (floor ratcheted), all else held.

Refactored recency into a reusable _recency_weights vector composed via
_combine, so similarity/recency/geo factors multiply uniformly. Fixture ships
a committed _coordinates.json (OGL OS OpenData; build script carries it from
the corpus sidecar on rebuild) so the gate exercises geo without S3.

This is the per-component method applied to geography ([[feedback_per_component_best_method]]).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-15 14:58:42 +00:00
parent fdc314c857
commit 1f26703dc5
5 changed files with 189 additions and 34 deletions

View file

@ -27,6 +27,7 @@ from domain.epc_prediction.comparable_properties import (
ComparableProperties,
PredictionTarget,
)
from domain.geospatial.coordinates import Coordinates
@dataclass(frozen=True)
@ -64,8 +65,8 @@ class EpcPrediction:
template: Comparable = self._template(comparables)
predicted: EpcPropertyData = copy.deepcopy(template.epc)
predicted.total_floor_area_m2 = _median_floor_area(comparables.members)
self._apply_categorical_modes(predicted, comparables)
self._apply_glazing_mode(predicted, comparables)
self._apply_categorical_modes(predicted, comparables, target.coordinates)
self._apply_glazing_mode(predicted, comparables, target.coordinates)
self._apply_heating_donor(predicted, comparables)
self._apply_overrides(predicted, target)
return predicted
@ -93,16 +94,23 @@ class EpcPrediction:
@staticmethod
def _apply_glazing_mode(
predicted: EpcPropertyData, comparables: ComparableProperties
predicted: EpcPropertyData,
comparables: ComparableProperties,
target_coordinates: Optional[Coordinates],
) -> None:
"""Set every window's glazing type to the recency-weighted cohort mode.
Glazing is retrofitted over a dwelling's life (single → double), so a
recent neighbour reflects the current state its correct method is the
recency-weighted mode (like roof insulation), NOT the plain mode (which
regressed) or the template copy. The window geometry (size, count) is
left on the template; only the glazing categorical moves."""
glazing = _recency_weighted_choice(
comparables.members, _comparable_modal_glazing
"""Set every window's glazing type to the recency- and geo-weighted cohort
mode. Glazing is retrofitted over a dwelling's life (single → double), so
a recent neighbour reflects the current state (recency, like roof
insulation); it also varies geographically (retrofit waves by street), so
a nearer neighbour counts for more. NOT the plain mode (which regressed)
or the template copy. The window geometry (size, count) is left on the
template; only the glazing categorical moves."""
members = comparables.members
weights = _combine(
_recency_weights(members), _geo_weights(target_coordinates, members)
)
glazing = _weighted_mode(
(_comparable_modal_glazing(c) for c in members), weights
)
if glazing is None:
return
@ -152,27 +160,37 @@ class EpcPrediction:
@staticmethod
def _apply_categorical_modes(
predicted: EpcPropertyData, comparables: ComparableProperties
predicted: EpcPropertyData,
comparables: ComparableProperties,
target_coordinates: Optional[Coordinates],
) -> None:
"""Override the predicted picture's homogeneous categoricals — wall /
roof / floor construction + insulation, age band — with the cohort mode
(robust to an atypical template, per ADR-0029 decision 4). The mode is
physically-similarity-weighted (decision 5): each neighbour's vote decays
with its distance from the cohort's physical centre, so the mode leans on
the most representative neighbours rather than treating every survivor
equally. The template still supplies the geometry; only the categorical
codes move to the mode. (Glazing type is deliberately left on the
template moding it is marginal and noisy; revisit with a larger
corpus.)"""
the most representative neighbours. The components that vary
*geographically* — age band, wall construction, floor construction (homes
built together cluster) — additionally take a geo-proximity weight, so a
nearer neighbour counts for more; the rest (e.g. roof construction, which
showed no geo signal) do not. The template still supplies the geometry;
only the categorical codes move to the mode."""
if not predicted.sap_building_parts:
return
main: SapBuildingPart = predicted.sap_building_parts[0]
members = comparables.members
weights: list[float] = _similarity_weights(members)
similarity: list[float] = _similarity_weights(members)
geo: list[float] = _geo_weights(target_coordinates, members)
similarity_geo: list[float] = _combine(similarity, geo)
for attr in _MAIN_PART_CATEGORICALS:
if attr in _RECENCY_WEIGHTED_CATEGORICALS:
mode = _recency_weighted_mode(members, attr)
else:
weights = (
similarity_geo
if attr in _GEO_WEIGHTED_CATEGORICALS
else similarity
)
mode = _weighted_mode(
(_main_part_attr(c, attr) for c in members), weights
)
@ -181,8 +199,13 @@ class EpcPrediction:
floor_dims = main.sap_floor_dimensions
if floor_dims:
for attr in _FLOOR_DIM_CATEGORICALS:
floor_weights = (
similarity_geo
if attr in _GEO_WEIGHTED_CATEGORICALS
else similarity
)
floor_mode = _weighted_int_mode(
(_main_floor_attr(c, attr) for c in members), weights
(_main_floor_attr(c, attr) for c in members), floor_weights
)
if floor_mode is not None:
setattr(floor_dims[0], attr, floor_mode)
@ -241,6 +264,19 @@ _SIMILARITY_SIZE_SCALE_M2: float = 20.0
_SIMILARITY_AGE_WEIGHT: float = 0.5
_AGE_BAND_ORDER: str = "ABCDEFGHIJKL"
# Geo-proximity weighting (#1227): a neighbour's vote decays exponentially with
# its haversine distance to the target, so a closer neighbour counts for more.
# Applied only to the components that showed a clear distance signal in the
# corpus — age band, wall + floor construction, glazing (homes built /
# retrofitted together cluster); roof construction showed no decay, so it is
# excluded. Glazing is not listed in `_GEO_WEIGHTED_CATEGORICALS` because it is
# geo-weighted directly in `_apply_glazing_mode`. `_GEO_SCALE_KM` is the kernel
# length-scale in km (chosen on the corpus). Geo weighting is off when the
# target has no coordinates; a neighbour with none keeps a neutral weight of
# 1.0 (never penalised for missing data). floor_construction lives on the floor
# dimension but shares this set.
_GEO_SCALE_KM: float = 0.1
_GEO_WEIGHTED_CATEGORICALS: frozenset[str] = frozenset(
{"construction_age_band", "wall_construction", "floor_construction"}
)
def _main_part_attr(
comparable: Comparable, attr: str
@ -347,6 +383,62 @@ def _modal_share(
return modal_count / len(present)
def _combine(left: list[float], right: list[float]) -> list[float]:
    """Element-wise product of two aligned weight vectors (compose weighting
    factors, e.g. similarity × geo-proximity). Returns a new list; neither
    input is modified.

    Raises ValueError when the vectors differ in length: `zip` would otherwise
    silently truncate to the shorter one, and the result would no longer line
    up index-for-index with the cohort it is meant to weight."""
    if len(left) != len(right):
        raise ValueError(
            f"weight vectors must be aligned: {len(left)} != {len(right)}"
        )
    return [a * b for a, b in zip(left, right)]
def _haversine_km(origin: Coordinates, point: Coordinates) -> float:
    """Distance in km from `origin` to `point` along the great circle, via the
    haversine formula on a sphere of radius 6371 km. Both arguments supply
    `latitude` / `longitude` in degrees."""
    earth_radius_km = 6371.0
    phi_origin = math.radians(origin.latitude)
    phi_point = math.radians(point.latitude)
    half_delta_phi = (phi_point - phi_origin) / 2
    half_delta_lambda = math.radians(point.longitude - origin.longitude) / 2
    haversine = (
        math.sin(half_delta_phi) ** 2
        + math.cos(phi_origin)
        * math.cos(phi_point)
        * math.sin(half_delta_lambda) ** 2
    )
    # Clamp to 1.0 so float rounding can never push asin outside its domain.
    central_half_angle = math.asin(min(1.0, math.sqrt(haversine)))
    return 2 * earth_radius_km * central_half_angle
def _geo_weights(
    target: Optional[Coordinates], members: tuple[Comparable, ...]
) -> list[float]:
    """One geo-proximity weight per comparable, in the same order as `members`:
    exp(-distance / `_GEO_SCALE_KM`), with distance the haversine km from the
    target. When the target has no coordinates the weighting is switched off
    and every comparable gets 1.0. A comparable without coordinates also gets
    1.0, so it is never penalised for absent data (note that 1.0 is likewise
    the weight of a neighbour at zero distance)."""
    if target is None:
        return [1.0 for _ in members]

    def proximity(neighbour: Comparable) -> float:
        location = neighbour.coordinates
        if location is None:
            return 1.0
        return math.exp(-_haversine_km(target, location) / _GEO_SCALE_KM)

    return [proximity(neighbour) for neighbour in members]
def _recency_weights(members: tuple[Comparable, ...]) -> list[float]:
    """One recency weight per comparable, in the same order as `members`: an
    exponential decay (time constant `_RECENCY_TAU_YEARS`) in how many years
    older the cert is than the newest cert in the cohort, so the newest weighs
    1.0 and older ones progressively less. A missing registration date is
    treated as `date.min`; when no comparable has a date every weight is
    equal."""
    lodged_dates: list[date] = [
        member.registration_date or date.min for member in members
    ]
    newest: date = max(lodged_dates, default=date.min)
    weights: list[float] = []
    for lodged in lodged_dates:
        age_years = (newest - lodged).days / _DAYS_PER_YEAR
        weights.append(math.exp(-age_years / _RECENCY_TAU_YEARS))
    return weights
def _recency_weighted_choice(
members: tuple[Comparable, ...],
value_of: Callable[[Comparable], Optional[Union[int, str]]],
@ -357,21 +449,11 @@ def _recency_weighted_choice(
outvote the current state. Falls back to a plain mode when no registration
dates are lodged (all ages 0 equal weight). Returns None when no comparable
supplies a value. Used for the time-varying components those upgraded over a
dwelling's life (loft top-ups, glazing retrofits)."""
newest: date = max(
(c.registration_date or date.min for c in members), default=date.min
dwelling's life (loft top-ups)."""
return _weighted_mode(
(value_of(comparable) for comparable in members),
_recency_weights(members),
)
weights: dict[Union[int, str], float] = defaultdict(float)
for comparable in members:
value = value_of(comparable)
if value is None:
continue
lodged: date = comparable.registration_date or date.min
age_years: float = (newest - lodged).days / _DAYS_PER_YEAR
weights[value] += math.exp(-age_years / _RECENCY_TAU_YEARS)
if not weights:
return None
return max(weights, key=lambda value: weights[value])
def _recency_weighted_mode(

View file

@ -65,6 +65,7 @@ def main() -> None:
(SOURCE / "_index.json").read_text()
)
fixture_index: dict[str, list[str]] = {}
kept_uprns: set[str] = set()
total_certs = 0
for postcode, certs in index.items():
if len(fixture_index) >= _MAX_POSTCODES:
@ -80,15 +81,37 @@ def main() -> None:
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(anon))
kept.append(cert_token)
uprn = raw.get("uprn")
if uprn is not None:
kept_uprns.add(str(int(uprn)))
fixture_index[postcode] = kept
total_certs += len(kept)
(FIXTURE / "_index.json").parent.mkdir(parents=True, exist_ok=True)
(FIXTURE / "_index.json").write_text(json.dumps(fixture_index, indent=2))
_write_coordinates(kept_uprns)
print(
f"wrote {len(fixture_index)} postcodes / {total_certs} anonymised certs "
f"to {FIXTURE}"
)
def _write_coordinates(kept_uprns: set[str]) -> None:
    """Carry the geo-proximity coordinates for the kept UPRNs into the committed
    fixture (subset of the corpus `_coordinates.json`), so the gate exercises
    geo-weighting without S3. Skipped when the corpus has no coordinates sidecar;
    kept UPRNs absent from the sidecar are simply left out.
    Coordinates are OS OpenData (OGL) and add no identifiability beyond the UPRN
    already kept in the fixture."""
    source = SOURCE / "_coordinates.json"
    if not source.exists():
        return
    corpus_coords: dict[str, list[float]] = json.loads(source.read_text())
    # Walk the UPRNs in sorted order: iterating the set directly yields an order
    # that changes between runs (string hash randomisation), which would rewrite
    # the committed fixture with a different key order on every rebuild.
    fixture_coords = {
        uprn: corpus_coords[uprn]
        for uprn in sorted(kept_uprns)
        if uprn in corpus_coords
    }
    (FIXTURE / "_coordinates.json").write_text(json.dumps(fixture_coords))
if __name__ == "__main__":
main()

View file

@ -48,7 +48,7 @@ _RATE_FLOORS: dict[str, float] = {
"roof_insulation_thickness_pm1": 0.4118,
"floor_insulation": 0.9375,
"has_room_in_roof": 0.8333,
"modal_glazing_type": 0.5278,
"modal_glazing_type": 0.5833,
"has_pv": 1.0000,
"solar_water_heating": 1.0000,
}

View file

@ -16,6 +16,7 @@ from datatypes.epc.domain.epc_property_data import (
SapHeating,
SapWindow,
)
from domain.geospatial.coordinates import Coordinates
from domain.epc_prediction.comparable_properties import (
Comparable,
ComparableProperties,
@ -429,6 +430,54 @@ def test_glazing_follows_the_recency_weighted_cohort_mode() -> None:
assert all(window.glazing_type == 3 for window in predicted.sap_windows)
def test_geo_proximity_weights_the_nearest_neighbour() -> None:
    # Arrange — every neighbour shares size + age, so similarity weighting is
    # uniform and only geo-proximity can separate the votes. Three distant
    # neighbours are cavity (1); a single neighbour located AT the target is
    # solid brick (2). Wall construction is a geo-weighted component.
    target_location = Coordinates(longitude=0.0, latitude=0.0)
    distant_location = Coordinates(longitude=1.0, latitude=1.0)  # ~150 km away
    distant_cavity = tuple(
        Comparable(_epc(wall_construction=1), token, coordinates=distant_location)
        for token in ("1", "2", "3")
    )
    adjacent_solid = Comparable(
        _epc(wall_construction=2), "4", coordinates=target_location
    )
    cohort = ComparableProperties(members=(*distant_cavity, adjacent_solid))
    target = PredictionTarget(
        postcode="LS6 1AA", property_type="2", coordinates=target_location
    )

    # Act
    predicted: EpcPropertyData = EpcPrediction().predict(target, cohort)

    # Assert — the lone adjacent neighbour outvotes the distant majority.
    main_part = predicted.sap_building_parts[0]
    assert main_part.wall_construction == 2
def test_geo_proximity_is_off_without_target_coordinates() -> None:
    # Arrange — three distant cavity (1) neighbours and one solid-brick (2)
    # neighbour, each carrying coordinates, but a target that carries none.
    # With no target coordinates geo weighting is disabled, so the plain cohort
    # majority must win.
    solid_location = Coordinates(longitude=0.0, latitude=0.0)
    cavity_location = Coordinates(longitude=1.0, latitude=1.0)
    cavity_majority = tuple(
        Comparable(_epc(wall_construction=1), token, coordinates=cavity_location)
        for token in ("1", "2", "3")
    )
    solid_minority = Comparable(
        _epc(wall_construction=2), "4", coordinates=solid_location
    )
    cohort = ComparableProperties(members=(*cavity_majority, solid_minority))
    target = PredictionTarget(postcode="LS6 1AA", property_type="2")

    # Act
    predicted: EpcPropertyData = EpcPrediction().predict(target, cohort)

    # Assert — geo is off, so the cavity majority decides the wall.
    main_part = predicted.sap_building_parts[0]
    assert main_part.wall_construction == 1
def test_applies_a_known_wall_override_over_the_mode() -> None:
# Arrange — the cohort mode is cavity (1), but we KNOW the target is solid
# brick (2), a Landlord Override. The known value must win over the estimate.

File diff suppressed because one or more lines are too long