diff --git a/domain/epc_prediction/epc_prediction.py b/domain/epc_prediction/epc_prediction.py index 251d8cc0..184159da 100644 --- a/domain/epc_prediction/epc_prediction.py +++ b/domain/epc_prediction/epc_prediction.py @@ -111,25 +111,34 @@ class EpcPrediction: ) -> None: """Override the predicted picture's homogeneous categoricals — wall / roof / floor construction + insulation, age band — with the cohort mode - (robust to an atypical template, per ADR-0029 decision 4). The template - still supplies the geometry; only the categorical codes move to the mode. - (Glazing type is deliberately left on the template — moding it is - marginal and noisy; revisit with a larger corpus.)""" + (robust to an atypical template, per ADR-0029 decision 4). The mode is + physically-similarity-weighted (decision 5): each neighbour's vote decays + with its distance from the cohort's physical centre, so the mode leans on + the most representative neighbours rather than treating every survivor + equally. The template still supplies the geometry; only the categorical + codes move to the mode. (Glazing type is deliberately left on the + template — moding it is marginal and noisy; revisit with a larger + corpus.)""" if not predicted.sap_building_parts: return main: SapBuildingPart = predicted.sap_building_parts[0] members = comparables.members + weights: list[float] = _similarity_weights(members) for attr in _MAIN_PART_CATEGORICALS: if attr in _RECENCY_WEIGHTED_CATEGORICALS: mode = _recency_weighted_mode(members, attr) else: - mode = _mode(_main_part_attr(c, attr) for c in members) + mode = _weighted_mode( + (_main_part_attr(c, attr) for c in members), weights + ) if mode is not None: setattr(main, attr, mode) floor_dims = main.sap_floor_dimensions if floor_dims: for attr in _FLOOR_DIM_CATEGORICALS: - floor_mode = _int_mode(_main_floor_attr(c, attr) for c in members) + floor_mode = _weighted_int_mode( + (_main_floor_attr(c, attr) for c in members), weights + ) if floor_mode is not None: setattr(floor_dims[0], attr, floor_mode) @@ -177,6 +186,16 @@ _RECENCY_WEIGHTED_CATEGORICALS: frozenset[str] = frozenset( _RECENCY_TAU_YEARS: float = 4.0 _DAYS_PER_YEAR: float = 365.0 +# Physical-similarity weighting of the categorical mode (ADR-0029 decision 5): a +# comparable's vote decays exponentially with how far it sits from the cohort's +# physical centre — floor area from the median, construction age from the modal +# band — so an outlier-sized or outlier-era neighbour can't sway the mode. Scales +# chosen on the validation corpus (wall-insulation +2.8pp / roof +1.1pp / +# floor-construction +2.4pp / floor-insulation +1.2pp; gate-safe, no regression). +_SIMILARITY_SIZE_SCALE_M2: float = 20.0 +_SIMILARITY_AGE_WEIGHT: float = 0.5 +_AGE_BAND_ORDER: str = "ABCDEFGHIJKL" + def _main_part_attr( comparable: Comparable, attr: str @@ -194,14 +213,72 @@ def _main_floor_attr(comparable: Comparable, attr: str) -> Optional[int]: return value -def _mode( - values: Iterable[Optional[Union[int, str]]], +def _age_band_index(comparable: Comparable) -> Optional[int]: + """The main building part's construction-age-band position (A=0 … L=11), or + None when no recognisable band is lodged.""" + band = _main_part_attr(comparable, "construction_age_band") + if isinstance(band, str) and band in _AGE_BAND_ORDER: + return _AGE_BAND_ORDER.index(band) + return None + + +def _similarity_weights(members: tuple[Comparable, ...]) -> list[float]: + """A physical-similarity weight per comparable (ADR-0029 decision 5): the + product of an exponential decay in its floor-area distance from the cohort + median and in its age-band distance from the cohort's modal band. A neighbour + missing a size or age contributes a neutral weight on that axis, so it is + never penalised for absent data. Aligned with `members` index-for-index.""" + if not members: + return [] + median_area: float = statistics.median( + c.epc.total_floor_area_m2 for c in members + ) + age_indices: list[Optional[int]] = [_age_band_index(c) for c in members] + present_ages: list[int] = [i for i in age_indices if i is not None] + modal_age: Optional[float] = ( + statistics.median(present_ages) if present_ages else None + ) + weights: list[float] = [] + for comparable, age_index in zip(members, age_indices): + size_term: float = math.exp( + -abs(comparable.epc.total_floor_area_m2 - median_area) + / _SIMILARITY_SIZE_SCALE_M2 + ) + age_term: float = ( + math.exp(-_SIMILARITY_AGE_WEIGHT * abs(age_index - modal_age)) + if modal_age is not None and age_index is not None + else 1.0 + ) + weights.append(size_term * age_term) + return weights + + +def _weighted_mode( + values: Iterable[Optional[Union[int, str]]], weights: list[float] ) -> Optional[Union[int, str]]: - """The most common non-None value, or None when there are none.""" - present = [v for v in values if v is not None] - if not present: + """The value with the greatest total similarity weight (ties broken by first + appearance, matching `_mode`), or None when no non-None value is present.""" + totals: dict[Union[int, str], float] = defaultdict(float) + for value, weight in zip(values, weights): + if value is not None: + totals[value] += weight + if not totals: return None - return Counter(present).most_common(1)[0][0] + return max(totals, key=lambda value: totals[value]) + + +def _weighted_int_mode( + values: Iterable[Optional[int]], weights: list[float] +) -> Optional[int]: + """`_weighted_mode` narrowed to int-coded fields (keeps pyright strict happy + when the target attribute is typed `Optional[int]`).""" + totals: dict[int, float] = defaultdict(float) + for value, weight in zip(values, weights): + if value is not None: + totals[value] += weight + if not totals: + return None + return max(totals, key=lambda value: totals[value]) def _modal_share( @@ -238,12 +315,3 @@ def _recency_weighted_mode( if not weights: return None return max(weights, key=lambda value: weights[value]) - - -def _int_mode(values: Iterable[Optional[int]]) -> Optional[int]: - """`_mode` narrowed to int-coded fields (keeps pyright strict happy when the - target attribute is typed `Optional[int]`).""" - present = [v for v in values if v is not None] - if not present: - return None - return Counter(present).most_common(1)[0][0] diff --git a/tests/domain/epc_prediction/test_component_accuracy_gate.py b/tests/domain/epc_prediction/test_component_accuracy_gate.py index 6564e97f..82789304 100644 --- a/tests/domain/epc_prediction/test_component_accuracy_gate.py +++ b/tests/domain/epc_prediction/test_component_accuracy_gate.py @@ -31,11 +31,11 @@ _FIXTURE = Path(__file__).parents[3] / "tests" / "fixtures" / "epc_prediction" # 36-target fixture; a 1e-3 tolerance absorbs float rounding only. _RATE_FLOORS: dict[str, float] = { "wall_construction": 0.8889, - "wall_insulation_type": 0.7778, + "wall_insulation_type": 0.8333, "construction_age_band": 0.6389, "construction_age_band_pm1": 0.8333, "roof_construction": 0.7222, - "floor_construction": 0.7500, + "floor_construction": 0.8125, "heating_main_fuel": 0.9722, "heating_main_category": 0.8889, "heating_main_control": 0.7500, @@ -45,7 +45,7 @@ _RATE_FLOORS: dict[str, float] = { "cylinder_insulation_type": 0.1667, "secondary_heating_type": 0.0000, "roof_insulation_thickness": 0.4118, - "floor_insulation": 0.9062, + "floor_insulation": 0.9375, "has_room_in_roof": 0.8333, "modal_glazing_type": 0.5000, "has_pv": 1.0000, diff --git a/tests/domain/epc_prediction/test_epc_prediction.py b/tests/domain/epc_prediction/test_epc_prediction.py index fb59e317..4cdb2794 100644 --- a/tests/domain/epc_prediction/test_epc_prediction.py +++ b/tests/domain/epc_prediction/test_epc_prediction.py @@ -226,6 +226,50 @@ def test_recency_weights_roof_insulation_mode() -> None: assert predicted.sap_building_parts[0].roof_insulation_thickness == 300 +def test_categorical_mode_leans_on_size_similar_neighbours() -> None: + # Arrange — a count majority (three) carries wall-insulation 9, but two of + # them are 400 m² size outliers; the cohort centre (median 100 m²) holds + # wall-insulation 1. Physical-similarity weighting down-weights the outliers, + # so the size-representative value 1 wins over the plain-count majority 9. + cohort = _cohort( + _epc(floor_area=100.0, wall_insulation_type=1), + _epc(floor_area=100.0, wall_insulation_type=1), + _epc(floor_area=100.0, wall_insulation_type=9), + _epc(floor_area=400.0, wall_insulation_type=9), + _epc(floor_area=400.0, wall_insulation_type=9), + ) + + # Act + predicted: EpcPropertyData = EpcPrediction().predict( + PredictionTarget(postcode="LS6 1AA", property_type="2"), cohort + ) + + # Assert — the size-similar value wins over the outlier-driven majority. + assert predicted.sap_building_parts[0].wall_insulation_type == 1 + + +def test_categorical_mode_leans_on_age_similar_neighbours() -> None: + # Arrange — same size throughout (so size weighting is neutral). A count + # majority (three) carries wall-insulation 9, but two of them are age-band A + # outliers while the cohort's modal band is K. Age-similarity weighting + # down-weights the outliers, so the band-representative value 1 wins. + cohort = _cohort( + _epc(construction_age_band="K", wall_insulation_type=1), + _epc(construction_age_band="K", wall_insulation_type=1), + _epc(construction_age_band="K", wall_insulation_type=9), + _epc(construction_age_band="A", wall_insulation_type=9), + _epc(construction_age_band="A", wall_insulation_type=9), + ) + + # Act + predicted: EpcPropertyData = EpcPrediction().predict( + PredictionTarget(postcode="LS6 1AA", property_type="2"), cohort + ) + + # Assert — the age-similar value wins over the outlier-driven majority. + assert predicted.sap_building_parts[0].wall_insulation_type == 1 + + def test_confidence_reports_cohort_size_and_unanimous_agreement() -> None: # Arrange — a unanimous cohort: three neighbours, all cavity-walled (1). cohort = _cohort(