From 2dda567e2de530f192e529912c4d453dc0243aea Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 20 Jan 2026 16:36:55 +0000 Subject: [PATCH] refactor to break recommendaiton impact into functions --- recommendations/Recommendations.py | 543 ++++++++++++++++------------- 1 file changed, 304 insertions(+), 239 deletions(-) diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py index 2795d7ff..33558fae 100644 --- a/recommendations/Recommendations.py +++ b/recommendations/Recommendations.py @@ -561,6 +561,262 @@ class Recommendations: return cls.MV_INCREASING_VARIABLES, cls.MV_DECREASING_VARIABLES return cls.INCREASING_VARIABLES, cls.DECREASING_VARIABLES + @staticmethod + def _get_previous_phase_values( + rec_phase: int, + starting_phase: int, + impact_summary: list[dict], + property_instance: Property, + ) -> dict: + if rec_phase == starting_phase: + return { + "sap": float(property_instance.data["current-energy-efficiency"]), + "carbon": float(property_instance.data["co2-emissions-current"]), + "heat_demand": float(property_instance.data["energy-consumption-current"]), + } + + previous_phase_reps = [ + x for x in impact_summary + if x["phase"] == rec_phase - 1 and x["representative"] + ] + + if len(previous_phase_reps) == 1: + return previous_phase_reps[0] + + # Median fallback (including zero-length case) + keys = ("sap", "carbon", "heat_demand") + return { + key: np.median([item[key] for item in previous_phase_reps]) + for key in keys + } + + @classmethod + def _get_phase_predictions( + cls, + property_predictions: dict, + recommendation_id: str, + ) -> dict: + return { + prefix: ( + property_predictions[f"{prefix}_predictions"] + .loc[ + property_predictions[f"{prefix}_predictions"]["recommendation_id"] + == str(recommendation_id), + "predictions", + ] + .values[0] + ) + for prefix in cls.PREDICTION_PREFIXES + } + + @classmethod + def _resolve_current_phase_sap( + cls, + rec: Mapping[str, any], + previous_phase_values: Mapping[str, any], + phase_energy_efficiency_metrics: Mapping[str, any], + adjustments: list[dict], + ) -> float: + if rec.get("survey", False): + return rec["sap_points"] + previous_phase_values["sap"] + + sap = phase_energy_efficiency_metrics["sap_change"] + + prior_adjustments = [a for a in adjustments if a["phase"] < rec["phase"]] + if not prior_adjustments: + return sap + + filtered = cls._filter_phase_adjustment(prior_adjustments) + return sap - sum(a["sap_adjustment"] for a in filtered) + + @classmethod + def _compute_phase_impact( + cls, + rec_type: str, + previous_phase_values: dict, + current_phase_values: dict, + ) -> dict: + """ + Utility function for computing the impact of a recommendation phase, enforcing monotonicity + + :param rec_type: string, the recommendation type + :param previous_phase_values: dict, the previous phase values + :param current_phase_values: dict, the current phase values + :return: dict, the impact of the phase + """ + phase_increasing, phase_decreasing = cls.get_monotonic_variables(rec_type) + + # Enforce monotonicity + for v in phase_increasing: + current_phase_values[v] = max(current_phase_values[v], previous_phase_values[v]) + + for v in phase_decreasing: + current_phase_values[v] = min(current_phase_values[v], previous_phase_values[v]) + + # Compute impact + impact = { + "sap": current_phase_values["sap"] - previous_phase_values["sap"], + "carbon": previous_phase_values["carbon"] - current_phase_values["carbon"], + "heat_demand": previous_phase_values["heat_demand"] - current_phase_values["heat_demand"], + } + + # Clamp values + for metric in impact: + if rec_type != "mechanical_ventilation": + impact[metric] = max(0, impact[metric]) + if metric == "sap": + impact[metric] = round(impact[metric], 2) + else: + impact[metric] = min(0, impact[metric]) + + return impact + + @classmethod + def _apply_measure_specific_rules( + cls, + rec: dict, + property_phase_impact: dict, + previous_phase_values: dict, + current_phase_values: dict, + adjustments: list, + property_instance, + ): + # For the moment, we cap the number of SAP points that can be achieved by LEDs at 2 + if rec["type"] == "low_energy_lighting": + lighting_sap_limit = LightingRecommendations.get_sap_limit( + property_instance.data["lighting-energy-eff"], + property_instance.lighting["low_energy_proportion"] + ) + + # add an adjustment + proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit) + if proposed_sap_impact != property_phase_impact["sap"]: + # Store the sap adjustment. The proposed sap impact will always be less + # than the current sap impact, so the adjustment is always positive + # as we subtract it from the future phases + adjustments.append( + { + "recommendation_id": rec["recommendation_id"], + "phase": rec["phase"], + "sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact, + } + ) + + property_phase_impact["sap"] = proposed_sap_impact + property_phase_impact["carbon"] = min( + property_phase_impact["carbon"], rec["co2_equivalent_savings"] + ) + + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"] + elif rec["type"] == "mechanical_ventilation": + # ventilation is capped by having no greater and a -4 impact + ventilation_sap_limit = -4 + ventilation_out_of_bounds = cls._check_veniltation_out_of_bounds( + property_phase_impact["sap"], ventilation_sap_limit + ) + + if ventilation_out_of_bounds: + previous_modelled_sap = previous_phase_values.get("sap_prediction", 0) + proposed_sap_impact = current_phase_values["sap"] - previous_modelled_sap + proposal_out_of_bounds = cls._check_veniltation_out_of_bounds( + proposed_sap_impact, ventilation_sap_limit + ) + if proposal_out_of_bounds: + proposed_sap_impact = cls._adjust_ventilation_sap( + proposed_sap_impact, ventilation_sap_limit + ) + + # We keep track of the adjustment + # In this case, if the SAP impact has increased, then the adustment should be negative + # otherwise it should be positive + # When we add the total adjustment, it's an addition + # Example + # Before: 60, impact -2 => 58 + # After: 60, impact -1 (So the impact is bigger) => 59 + # So in this case, we need to make sure we add 1 to all future predictions so + # the adjustment should be positive + # Before: 60, impact 1 => 61 + # After: 60, impact -1 => 59 + # So in this case, we need to make sure we subtract 1 to all future predictions so + # the adjustment should be negative + # Both cases are reflected in sap adjustment + sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"]) + + adjustments.append( + { + "recommendation_id": rec["recommendation_id"], + "phase": rec["phase"], + "sap_adjustment": sap_adjustment, + } + ) + + property_phase_impact["sap"] = proposed_sap_impact + + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + elif rec["type"] == "loft_insulation": + # When we have a loft insulation recommendation, where there is an extension and the existing + # amount of loft insulation is already good, we limit the SAP points + # By limiting here, we don't change the value in current_phase_values. This means that the + # future recommendations won't have an impact that is too large + li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit( + property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"] + ) + if li_sap_limit is not None: + new_value = min(property_phase_impact["sap"], li_sap_limit) + # If we've made an adjustment, keep track of it + if new_value != property_phase_impact["sap"]: + adjustments.append( + { + "recommendation_id": rec["recommendation_id"], + "phase": rec["phase"], + # If we've made an adjustment, it will be negative + "sap_adjustment": property_phase_impact["sap"] - new_value, + } + ) + property_phase_impact["sap"] = new_value + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + elif rec["type"] == "solar_pv": + # We use the SAP points in the recommendation as a minimum + proposed_impact = ( + rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else + property_phase_impact["sap"] + ) + + # SAP adjustments should be negative + if proposed_impact != property_phase_impact["sap"]: + adjustments.append( + { + "recommendation_id": rec["recommendation_id"], + "phase": rec["phase"], + # If we've made an adjustment, we will be increasing the number of SAP + # points. Since, we subtract adjustments, this number should be negative + "sap_adjustment": property_phase_impact["sap"] - proposed_impact, + } + ) + property_phase_impact["sap"] = proposed_impact + + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + + return property_phase_impact, current_phase_values, adjustments + + @staticmethod + def _validate_recommendation_updates(rec: Mapping[str, any]): + """ + Utility function to validate that the recommendation updates have been applied correctly + :param rec: updated recommendation + :return: + """ + if ( + (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or + (rec["heat_demand"] is None) + ): + raise ValueError("sap points, co2 or heat demand is missing") + @classmethod def calculate_recommendation_impact( cls, @@ -575,6 +831,9 @@ class Recommendations: Given predictions from the model apis, with method will update the recommendations with the predicted impact of the recommendation on the property + This algorithm is structured as a large loop, but this is due to the fact that it's sequential in nature - + each phase depends on the previous, with adjustments and constraints being allied along the way + This function will return two objects: 1) Updated recommendations with the predicted impact of the recommendation 2) A list of impacts by phase, which will be used for the kwh model scoring @@ -595,8 +854,9 @@ class Recommendations: # shallow copy intentional - we're going to modify the internals property_recommendations = recommendations[property_instance.id].copy() - representative_recs = representative_recommendations[property_instance.id].copy() - representative_ids = [r["recommendation_id"] for r in representative_recs] + representative_ids = [ + r["recommendation_id"] for r in representative_recommendations[property_instance.id] + ] # We allow for negative phase starting_phase = min(rec["phase"] for recs in property_recommendations for rec in recs) @@ -606,20 +866,19 @@ class Recommendations: impact_summary, adjustments = [], [] for recommendations_by_type in property_recommendations: for rec in recommendations_by_type: - if rec["type"] in ["trickle_vents", "draught_proofing", "extension_cavity_wall_insulation"]: - # We don't have a percieved sap impact of mechanical ventilation or trickle vents, and we don't - # have the capacity to score draught proofing + # --- Special-case: non-modelled measures ------------------------- + if rec["type"] in { + "trickle_vents", + "draught_proofing", + "extension_cavity_wall_insulation", + }: if rec["type"] == "extension_cavity_wall_insulation": - - previous_phase = [x for x in impact_summary if x["phase"] == (rec["phase"] - 1)] - if previous_phase: - sap = previous_phase[0]["sap"] - carbon = previous_phase[0]["carbon"] - heat_demand = previous_phase[0]["heat_demand"] - else: - sap = float(property_instance.data["current-energy-efficiency"]) - carbon = float(property_instance.data["co2-emissions-current"]) - heat_demand = float(property_instance.data["energy-consumption-current"]) + previous = cls._get_previous_phase_values( + rec_phase=rec["phase"], + starting_phase=starting_phase, + impact_summary=impact_summary, + property_instance=property_instance, + ) impact_summary.append( { @@ -627,72 +886,29 @@ class Recommendations: "representative": rec["recommendation_id"] in representative_ids, "recommendation_id": rec["recommendation_id"], "measure_type": rec["measure_type"], - "sap": sap + rec["sap_points"], - "carbon": carbon - rec["co2_equivalent_savings"], - "heat_demand": heat_demand - rec["heat_demand"], + "sap": previous["sap"] + rec["sap_points"], + "carbon": previous["carbon"] - rec["co2_equivalent_savings"], + "heat_demand": previous["heat_demand"] - rec["heat_demand"], } ) continue - phase_energy_efficiency_metrics = { - prefix: property_predictions[prefix + "_predictions"][ - property_predictions[prefix + "_predictions"]["recommendation_id"] == str( - rec["recommendation_id"] - )]["predictions"].values[0] for prefix in cls.PREDICTION_PREFIXES - } + phase_energy_efficiency_metrics = cls._get_phase_predictions( + property_predictions=property_predictions, + recommendation_id=rec["recommendation_id"], + ) - # We structure this so that depending on the phase, we capture the previous phase impacts and - # then just have one piece of code to calculate the difference - if rec["phase"] == starting_phase: - # These are just the starting values, from the EPC. When we score the ML models, - # heating_cost_starting and heating_cost_ending are just the values in the EPC. However, with - # heating_cost_ending, we expect that the EPC will predict a heating cost based on what would happen - # if we implemented the recommendation today, so our starting value is the EPC - - previous_phase_values = { - "sap": float(property_instance.data["current-energy-efficiency"]), - # For carbon, even though we generally use the updated figure which includes the carbon - # associated to appliances, for this scoring process we use the EPC carbon value. This means - # that we don't overestimate the impact since the model uses the EPC carbon value - "carbon": float(property_instance.data["co2-emissions-current"]), - "heat_demand": float(property_instance.data["energy-consumption-current"]), - } - - else: - - previous_phase_values_multiple = [ - x for x in impact_summary if x["phase"] == (rec["phase"] - 1) and x["representative"] - ] - if len(previous_phase_values_multiple) != 1: - # Take an average of each of the previous phases - keys_to_median = ["sap", "carbon", "heat_demand"] - - previous_phase_values = {} - for key in keys_to_median: - values = [item[key] for item in previous_phase_values_multiple] - previous_phase_values[key] = np.median(values) - - else: - previous_phase_values = previous_phase_values_multiple[0] - - # We extract the values for the current phase - if rec.get("survey", False): - current_phase_sap = rec["sap_points"] + previous_phase_values["sap"] - else: - current_phase_sap = phase_energy_efficiency_metrics["sap_change"] - # If we have an adjustment, we apply it here. We de-dupe, taking the - # largest adjustment by phase - though, they should all be the same - phase_adjustments = [a for a in adjustments if a["phase"] < rec["phase"]] - if phase_adjustments: - phase_adjustments = cls._filter_phase_adjustment(phase_adjustments) - total_adjustment = sum( - a["sap_adjustment"] for a in phase_adjustments - ) - # Take the max, by phase, subtract from the current phase sap - current_phase_sap -= total_adjustment + previous_phase_values = cls._get_previous_phase_values( + rec_phase=rec["phase"], + starting_phase=starting_phase, + impact_summary=impact_summary, + property_instance=property_instance + ) current_phase_values = { - "sap": current_phase_sap, + "sap": cls._resolve_current_phase_sap( + rec, previous_phase_values, phase_energy_efficiency_metrics, adjustments + ), "carbon": phase_energy_efficiency_metrics["carbon_change"], "heat_demand": phase_energy_efficiency_metrics["heat_demand"], } @@ -705,167 +921,20 @@ class Recommendations: # However, if the recommendation is mechanical ventilation, this can have a negative SAP impact so # we don't apply this rule - phase_increasing_variables, phase_decreasing_variables = cls.get_monotonic_variables(rec["type"]) + property_phase_impact = cls._compute_phase_impact( + rec_type=rec["type"], + previous_phase_values=previous_phase_values, + current_phase_values=current_phase_values, + ) - for v in phase_increasing_variables: - current_phase_values[v] = ( - current_phase_values[v] if current_phase_values[v] > previous_phase_values[v] else - previous_phase_values[v] - ) - for v in previous_phase_values: - if v in phase_decreasing_variables: - current_phase_values[v] = ( - current_phase_values[v] if current_phase_values[v] < previous_phase_values[v] else - previous_phase_values[v] - ) - - property_phase_impact = { - # Increasing - "sap": current_phase_values["sap"] - previous_phase_values["sap"], - # Decreasing - "carbon": previous_phase_values["carbon"] - current_phase_values["carbon"], - # Decreasing - "heat_demand": previous_phase_values["heat_demand"] - current_phase_values["heat_demand"], - } - - # Prevent from being negative - apart from ventilation - for metric in ["sap", "carbon", "heat_demand"]: - if rec["type"] != "mechanical_ventilation": - property_phase_impact[metric] = ( - 0 if property_phase_impact[metric] < 0 else property_phase_impact[metric] - ) - if metric == "sap": - property_phase_impact[metric] = round(property_phase_impact[metric], 2) - else: - # We prevent mechanical ventilation from being positive - property_phase_impact[metric] = ( - 0 if property_phase_impact[metric] > 0 else property_phase_impact[metric] - ) - - # For the moment, we cap the number of SAP points that can be achieved by LEDs at 2 - if rec["type"] == "low_energy_lighting": - lighting_sap_limit = LightingRecommendations.get_sap_limit( - property_instance.data["lighting-energy-eff"], - property_instance.lighting["low_energy_proportion"] - ) - - # add an adjustment - proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit) - if proposed_sap_impact != property_phase_impact["sap"]: - # Store the sap adjustment. The proposed sap impact will always be less - # than the current sap impact, so the adjustment is always positive - # as we subtract it from the future phases - adjustments.append( - { - "recommendation_id": rec["recommendation_id"], - "phase": rec["phase"], - "sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact, - } - ) - - property_phase_impact["sap"] = proposed_sap_impact - property_phase_impact["carbon"] = min( - property_phase_impact["carbon"], rec["co2_equivalent_savings"] - ) - - # Update the current phase values - current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] - current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"] - - # We also ensure that mechanical ventilation doesn't have an ovely strong negative SAP impact - if rec["type"] == "mechanical_ventilation": - # ventilation is capped by having no greater and a -4 impact - ventilation_sap_limit = -4 - ventilation_out_of_bounds = cls._check_veniltation_out_of_bounds( - property_phase_impact["sap"], ventilation_sap_limit - ) - - if ventilation_out_of_bounds: - previous_modelled_sap = previous_phase_values.get("sap_prediction", 0) - proposed_sap_impact = current_phase_sap - previous_modelled_sap - proposal_out_of_bounds = cls._check_veniltation_out_of_bounds( - proposed_sap_impact, ventilation_sap_limit - ) - if proposal_out_of_bounds: - proposed_sap_impact = cls._adjust_ventilation_sap( - proposed_sap_impact, ventilation_sap_limit - ) - - # We keep track of the adjustment - # In this case, if the SAP impact has increased, then the adustment should be negative - # otherwise it should be positive - # When we add the total adjustment, it's an addition - # Example - # Before: 60, impact -2 => 58 - # After: 60, impact -1 (So the impact is bigger) => 59 - # So in this case, we need to make sure we add 1 to all future predictions so - # the adjustment should be positive - # Before: 60, impact 1 => 61 - # After: 60, impact -1 => 59 - # So in this case, we need to make sure we subtract 1 to all future predictions so - # the adjustment should be negative - # Both cases are reflected in sap adjustment - sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"]) - - adjustments.append( - { - "recommendation_id": rec["recommendation_id"], - "phase": rec["phase"], - "sap_adjustment": sap_adjustment, - } - ) - - property_phase_impact["sap"] = proposed_sap_impact - - # Update the current phase values - current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] - - if rec["type"] == "loft_insulation": - # When we have a loft insulation recommendation, where there is an extension and the existing - # amount of loft insulation is already good, we limit the SAP points - # By limiting here, we don't change the value in current_phase_values. This means that the - # future recommendations won't have an impact that is too large - li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit( - property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"] - ) - if li_sap_limit is not None: - new_value = min(property_phase_impact["sap"], li_sap_limit) - # If we've made an adjustment, keep track of it - if new_value != property_phase_impact["sap"]: - adjustments.append( - { - "recommendation_id": rec["recommendation_id"], - "phase": rec["phase"], - # If we've made an adjustment, it will be negative - "sap_adjustment": property_phase_impact["sap"] - new_value, - } - ) - property_phase_impact["sap"] = new_value - # Update the current phase values - current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] - - if rec["type"] == "solar_pv": - # We use the SAP points in the recommendation as a minimum - proposed_impact = ( - rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else - property_phase_impact["sap"] - ) - - # SAP adjustments should be negative - if proposed_impact != property_phase_impact["sap"]: - adjustments.append( - { - "recommendation_id": rec["recommendation_id"], - "phase": rec["phase"], - # If we've made an adjustment, we will be increasing the number of SAP - # points. Since, we subtract adjustments, this number should be negative - "sap_adjustment": property_phase_impact["sap"] - proposed_impact, - } - ) - property_phase_impact["sap"] = proposed_impact - - # Update the current phase values - current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + property_phase_impact, current_phase_values, adjustments = cls._apply_measure_specific_rules( + rec=rec, + property_phase_impact=property_phase_impact, + previous_phase_values=previous_phase_values, + current_phase_values=current_phase_values, + adjustments=adjustments, + property_instance=property_instance + ) # Insert this information into the recommendation. if not rec.get("survey", False): @@ -874,11 +943,7 @@ class Recommendations: rec["co2_equivalent_savings"] = property_phase_impact["carbon"] rec["heat_demand"] = property_phase_impact["heat_demand"] - if ( - (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or - (rec["heat_demand"] is None) - ): - raise ValueError("sap points, co2 or heat demand is missing") + cls._validate_recommendation_updates(rec) impact_summary.append( {