refactor to break recommendaiton impact into functions

This commit is contained in:
Khalim Conn-Kowlessar 2026-01-20 16:36:55 +00:00
parent 32a3695ba2
commit 2dda567e2d

View file

@ -561,6 +561,262 @@ class Recommendations:
return cls.MV_INCREASING_VARIABLES, cls.MV_DECREASING_VARIABLES
return cls.INCREASING_VARIABLES, cls.DECREASING_VARIABLES
@staticmethod
def _get_previous_phase_values(
rec_phase: int,
starting_phase: int,
impact_summary: list[dict],
property_instance: Property,
) -> dict:
if rec_phase == starting_phase:
return {
"sap": float(property_instance.data["current-energy-efficiency"]),
"carbon": float(property_instance.data["co2-emissions-current"]),
"heat_demand": float(property_instance.data["energy-consumption-current"]),
}
previous_phase_reps = [
x for x in impact_summary
if x["phase"] == rec_phase - 1 and x["representative"]
]
if len(previous_phase_reps) == 1:
return previous_phase_reps[0]
# Median fallback (including zero-length case)
keys = ("sap", "carbon", "heat_demand")
return {
key: np.median([item[key] for item in previous_phase_reps])
for key in keys
}
@classmethod
def _get_phase_predictions(
cls,
property_predictions: dict,
recommendation_id: str,
) -> dict:
return {
prefix: (
property_predictions[f"{prefix}_predictions"]
.loc[
property_predictions[f"{prefix}_predictions"]["recommendation_id"]
== str(recommendation_id),
"predictions",
]
.values[0]
)
for prefix in cls.PREDICTION_PREFIXES
}
@classmethod
def _resolve_current_phase_sap(
cls,
rec: Mapping[str, any],
previous_phase_values: Mapping[str, any],
phase_energy_efficiency_metrics: Mapping[str, any],
adjustments: list[dict],
) -> float:
if rec.get("survey", False):
return rec["sap_points"] + previous_phase_values["sap"]
sap = phase_energy_efficiency_metrics["sap_change"]
prior_adjustments = [a for a in adjustments if a["phase"] < rec["phase"]]
if not prior_adjustments:
return sap
filtered = cls._filter_phase_adjustment(prior_adjustments)
return sap - sum(a["sap_adjustment"] for a in filtered)
@classmethod
def _compute_phase_impact(
cls,
rec_type: str,
previous_phase_values: dict,
current_phase_values: dict,
) -> dict:
"""
Utility function for computing the impact of a recommendation phase, enforcing monotonicity
:param rec_type: string, the recommendation type
:param previous_phase_values: dict, the previous phase values
:param current_phase_values: dict, the current phase values
:return: dict, the impact of the phase
"""
phase_increasing, phase_decreasing = cls.get_monotonic_variables(rec_type)
# Enforce monotonicity
for v in phase_increasing:
current_phase_values[v] = max(current_phase_values[v], previous_phase_values[v])
for v in phase_decreasing:
current_phase_values[v] = min(current_phase_values[v], previous_phase_values[v])
# Compute impact
impact = {
"sap": current_phase_values["sap"] - previous_phase_values["sap"],
"carbon": previous_phase_values["carbon"] - current_phase_values["carbon"],
"heat_demand": previous_phase_values["heat_demand"] - current_phase_values["heat_demand"],
}
# Clamp values
for metric in impact:
if rec_type != "mechanical_ventilation":
impact[metric] = max(0, impact[metric])
if metric == "sap":
impact[metric] = round(impact[metric], 2)
else:
impact[metric] = min(0, impact[metric])
return impact
@classmethod
def _apply_measure_specific_rules(
cls,
rec: dict,
property_phase_impact: dict,
previous_phase_values: dict,
current_phase_values: dict,
adjustments: list,
property_instance,
):
# For the moment, we cap the number of SAP points that can be achieved by LEDs at 2
if rec["type"] == "low_energy_lighting":
lighting_sap_limit = LightingRecommendations.get_sap_limit(
property_instance.data["lighting-energy-eff"],
property_instance.lighting["low_energy_proportion"]
)
# add an adjustment
proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit)
if proposed_sap_impact != property_phase_impact["sap"]:
# Store the sap adjustment. The proposed sap impact will always be less
# than the current sap impact, so the adjustment is always positive
# as we subtract it from the future phases
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
"sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact,
}
)
property_phase_impact["sap"] = proposed_sap_impact
property_phase_impact["carbon"] = min(
property_phase_impact["carbon"], rec["co2_equivalent_savings"]
)
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"]
elif rec["type"] == "mechanical_ventilation":
# ventilation is capped by having no greater and a -4 impact
ventilation_sap_limit = -4
ventilation_out_of_bounds = cls._check_veniltation_out_of_bounds(
property_phase_impact["sap"], ventilation_sap_limit
)
if ventilation_out_of_bounds:
previous_modelled_sap = previous_phase_values.get("sap_prediction", 0)
proposed_sap_impact = current_phase_values["sap"] - previous_modelled_sap
proposal_out_of_bounds = cls._check_veniltation_out_of_bounds(
proposed_sap_impact, ventilation_sap_limit
)
if proposal_out_of_bounds:
proposed_sap_impact = cls._adjust_ventilation_sap(
proposed_sap_impact, ventilation_sap_limit
)
# We keep track of the adjustment
# In this case, if the SAP impact has increased, then the adustment should be negative
# otherwise it should be positive
# When we add the total adjustment, it's an addition
# Example
# Before: 60, impact -2 => 58
# After: 60, impact -1 (So the impact is bigger) => 59
# So in this case, we need to make sure we add 1 to all future predictions so
# the adjustment should be positive
# Before: 60, impact 1 => 61
# After: 60, impact -1 => 59
# So in this case, we need to make sure we subtract 1 to all future predictions so
# the adjustment should be negative
# Both cases are reflected in sap adjustment
sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"])
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
"sap_adjustment": sap_adjustment,
}
)
property_phase_impact["sap"] = proposed_sap_impact
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
elif rec["type"] == "loft_insulation":
# When we have a loft insulation recommendation, where there is an extension and the existing
# amount of loft insulation is already good, we limit the SAP points
# By limiting here, we don't change the value in current_phase_values. This means that the
# future recommendations won't have an impact that is too large
li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit(
property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"]
)
if li_sap_limit is not None:
new_value = min(property_phase_impact["sap"], li_sap_limit)
# If we've made an adjustment, keep track of it
if new_value != property_phase_impact["sap"]:
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
# If we've made an adjustment, it will be negative
"sap_adjustment": property_phase_impact["sap"] - new_value,
}
)
property_phase_impact["sap"] = new_value
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
elif rec["type"] == "solar_pv":
# We use the SAP points in the recommendation as a minimum
proposed_impact = (
rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else
property_phase_impact["sap"]
)
# SAP adjustments should be negative
if proposed_impact != property_phase_impact["sap"]:
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
# If we've made an adjustment, we will be increasing the number of SAP
# points. Since, we subtract adjustments, this number should be negative
"sap_adjustment": property_phase_impact["sap"] - proposed_impact,
}
)
property_phase_impact["sap"] = proposed_impact
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
return property_phase_impact, current_phase_values, adjustments
@staticmethod
def _validate_recommendation_updates(rec: Mapping[str, any]):
"""
Utility function to validate that the recommendation updates have been applied correctly
:param rec: updated recommendation
:return:
"""
if (
(rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or
(rec["heat_demand"] is None)
):
raise ValueError("sap points, co2 or heat demand is missing")
@classmethod
def calculate_recommendation_impact(
cls,
@ -575,6 +831,9 @@ class Recommendations:
Given predictions from the model apis, with method will update the recommendations with the predicted
impact of the recommendation on the property
This algorithm is structured as a large loop, but this is due to the fact that it's sequential in nature -
each phase depends on the previous, with adjustments and constraints being allied along the way
This function will return two objects:
1) Updated recommendations with the predicted impact of the recommendation
2) A list of impacts by phase, which will be used for the kwh model scoring
@ -595,8 +854,9 @@ class Recommendations:
# shallow copy intentional - we're going to modify the internals
property_recommendations = recommendations[property_instance.id].copy()
representative_recs = representative_recommendations[property_instance.id].copy()
representative_ids = [r["recommendation_id"] for r in representative_recs]
representative_ids = [
r["recommendation_id"] for r in representative_recommendations[property_instance.id]
]
# We allow for negative phase
starting_phase = min(rec["phase"] for recs in property_recommendations for rec in recs)
@ -606,20 +866,19 @@ class Recommendations:
impact_summary, adjustments = [], []
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
if rec["type"] in ["trickle_vents", "draught_proofing", "extension_cavity_wall_insulation"]:
# We don't have a percieved sap impact of mechanical ventilation or trickle vents, and we don't
# have the capacity to score draught proofing
# --- Special-case: non-modelled measures -------------------------
if rec["type"] in {
"trickle_vents",
"draught_proofing",
"extension_cavity_wall_insulation",
}:
if rec["type"] == "extension_cavity_wall_insulation":
previous_phase = [x for x in impact_summary if x["phase"] == (rec["phase"] - 1)]
if previous_phase:
sap = previous_phase[0]["sap"]
carbon = previous_phase[0]["carbon"]
heat_demand = previous_phase[0]["heat_demand"]
else:
sap = float(property_instance.data["current-energy-efficiency"])
carbon = float(property_instance.data["co2-emissions-current"])
heat_demand = float(property_instance.data["energy-consumption-current"])
previous = cls._get_previous_phase_values(
rec_phase=rec["phase"],
starting_phase=starting_phase,
impact_summary=impact_summary,
property_instance=property_instance,
)
impact_summary.append(
{
@ -627,72 +886,29 @@ class Recommendations:
"representative": rec["recommendation_id"] in representative_ids,
"recommendation_id": rec["recommendation_id"],
"measure_type": rec["measure_type"],
"sap": sap + rec["sap_points"],
"carbon": carbon - rec["co2_equivalent_savings"],
"heat_demand": heat_demand - rec["heat_demand"],
"sap": previous["sap"] + rec["sap_points"],
"carbon": previous["carbon"] - rec["co2_equivalent_savings"],
"heat_demand": previous["heat_demand"] - rec["heat_demand"],
}
)
continue
phase_energy_efficiency_metrics = {
prefix: property_predictions[prefix + "_predictions"][
property_predictions[prefix + "_predictions"]["recommendation_id"] == str(
rec["recommendation_id"]
)]["predictions"].values[0] for prefix in cls.PREDICTION_PREFIXES
}
phase_energy_efficiency_metrics = cls._get_phase_predictions(
property_predictions=property_predictions,
recommendation_id=rec["recommendation_id"],
)
# We structure this so that depending on the phase, we capture the previous phase impacts and
# then just have one piece of code to calculate the difference
if rec["phase"] == starting_phase:
# These are just the starting values, from the EPC. When we score the ML models,
# heating_cost_starting and heating_cost_ending are just the values in the EPC. However, with
# heating_cost_ending, we expect that the EPC will predict a heating cost based on what would happen
# if we implemented the recommendation today, so our starting value is the EPC
previous_phase_values = {
"sap": float(property_instance.data["current-energy-efficiency"]),
# For carbon, even though we generally use the updated figure which includes the carbon
# associated to appliances, for this scoring process we use the EPC carbon value. This means
# that we don't overestimate the impact since the model uses the EPC carbon value
"carbon": float(property_instance.data["co2-emissions-current"]),
"heat_demand": float(property_instance.data["energy-consumption-current"]),
}
else:
previous_phase_values_multiple = [
x for x in impact_summary if x["phase"] == (rec["phase"] - 1) and x["representative"]
]
if len(previous_phase_values_multiple) != 1:
# Take an average of each of the previous phases
keys_to_median = ["sap", "carbon", "heat_demand"]
previous_phase_values = {}
for key in keys_to_median:
values = [item[key] for item in previous_phase_values_multiple]
previous_phase_values[key] = np.median(values)
else:
previous_phase_values = previous_phase_values_multiple[0]
# We extract the values for the current phase
if rec.get("survey", False):
current_phase_sap = rec["sap_points"] + previous_phase_values["sap"]
else:
current_phase_sap = phase_energy_efficiency_metrics["sap_change"]
# If we have an adjustment, we apply it here. We de-dupe, taking the
# largest adjustment by phase - though, they should all be the same
phase_adjustments = [a for a in adjustments if a["phase"] < rec["phase"]]
if phase_adjustments:
phase_adjustments = cls._filter_phase_adjustment(phase_adjustments)
total_adjustment = sum(
a["sap_adjustment"] for a in phase_adjustments
)
# Take the max, by phase, subtract from the current phase sap
current_phase_sap -= total_adjustment
previous_phase_values = cls._get_previous_phase_values(
rec_phase=rec["phase"],
starting_phase=starting_phase,
impact_summary=impact_summary,
property_instance=property_instance
)
current_phase_values = {
"sap": current_phase_sap,
"sap": cls._resolve_current_phase_sap(
rec, previous_phase_values, phase_energy_efficiency_metrics, adjustments
),
"carbon": phase_energy_efficiency_metrics["carbon_change"],
"heat_demand": phase_energy_efficiency_metrics["heat_demand"],
}
@ -705,167 +921,20 @@ class Recommendations:
# However, if the recommendation is mechanical ventilation, this can have a negative SAP impact so
# we don't apply this rule
phase_increasing_variables, phase_decreasing_variables = cls.get_monotonic_variables(rec["type"])
property_phase_impact = cls._compute_phase_impact(
rec_type=rec["type"],
previous_phase_values=previous_phase_values,
current_phase_values=current_phase_values,
)
for v in phase_increasing_variables:
current_phase_values[v] = (
current_phase_values[v] if current_phase_values[v] > previous_phase_values[v] else
previous_phase_values[v]
)
for v in previous_phase_values:
if v in phase_decreasing_variables:
current_phase_values[v] = (
current_phase_values[v] if current_phase_values[v] < previous_phase_values[v] else
previous_phase_values[v]
)
property_phase_impact = {
# Increasing
"sap": current_phase_values["sap"] - previous_phase_values["sap"],
# Decreasing
"carbon": previous_phase_values["carbon"] - current_phase_values["carbon"],
# Decreasing
"heat_demand": previous_phase_values["heat_demand"] - current_phase_values["heat_demand"],
}
# Prevent from being negative - apart from ventilation
for metric in ["sap", "carbon", "heat_demand"]:
if rec["type"] != "mechanical_ventilation":
property_phase_impact[metric] = (
0 if property_phase_impact[metric] < 0 else property_phase_impact[metric]
)
if metric == "sap":
property_phase_impact[metric] = round(property_phase_impact[metric], 2)
else:
# We prevent mechanical ventilation from being positive
property_phase_impact[metric] = (
0 if property_phase_impact[metric] > 0 else property_phase_impact[metric]
)
# For the moment, we cap the number of SAP points that can be achieved by LEDs at 2
if rec["type"] == "low_energy_lighting":
lighting_sap_limit = LightingRecommendations.get_sap_limit(
property_instance.data["lighting-energy-eff"],
property_instance.lighting["low_energy_proportion"]
)
# add an adjustment
proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit)
if proposed_sap_impact != property_phase_impact["sap"]:
# Store the sap adjustment. The proposed sap impact will always be less
# than the current sap impact, so the adjustment is always positive
# as we subtract it from the future phases
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
"sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact,
}
)
property_phase_impact["sap"] = proposed_sap_impact
property_phase_impact["carbon"] = min(
property_phase_impact["carbon"], rec["co2_equivalent_savings"]
)
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"]
# We also ensure that mechanical ventilation doesn't have an ovely strong negative SAP impact
if rec["type"] == "mechanical_ventilation":
# ventilation is capped by having no greater and a -4 impact
ventilation_sap_limit = -4
ventilation_out_of_bounds = cls._check_veniltation_out_of_bounds(
property_phase_impact["sap"], ventilation_sap_limit
)
if ventilation_out_of_bounds:
previous_modelled_sap = previous_phase_values.get("sap_prediction", 0)
proposed_sap_impact = current_phase_sap - previous_modelled_sap
proposal_out_of_bounds = cls._check_veniltation_out_of_bounds(
proposed_sap_impact, ventilation_sap_limit
)
if proposal_out_of_bounds:
proposed_sap_impact = cls._adjust_ventilation_sap(
proposed_sap_impact, ventilation_sap_limit
)
# We keep track of the adjustment
# In this case, if the SAP impact has increased, then the adustment should be negative
# otherwise it should be positive
# When we add the total adjustment, it's an addition
# Example
# Before: 60, impact -2 => 58
# After: 60, impact -1 (So the impact is bigger) => 59
# So in this case, we need to make sure we add 1 to all future predictions so
# the adjustment should be positive
# Before: 60, impact 1 => 61
# After: 60, impact -1 => 59
# So in this case, we need to make sure we subtract 1 to all future predictions so
# the adjustment should be negative
# Both cases are reflected in sap adjustment
sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"])
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
"sap_adjustment": sap_adjustment,
}
)
property_phase_impact["sap"] = proposed_sap_impact
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
if rec["type"] == "loft_insulation":
# When we have a loft insulation recommendation, where there is an extension and the existing
# amount of loft insulation is already good, we limit the SAP points
# By limiting here, we don't change the value in current_phase_values. This means that the
# future recommendations won't have an impact that is too large
li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit(
property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"]
)
if li_sap_limit is not None:
new_value = min(property_phase_impact["sap"], li_sap_limit)
# If we've made an adjustment, keep track of it
if new_value != property_phase_impact["sap"]:
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
# If we've made an adjustment, it will be negative
"sap_adjustment": property_phase_impact["sap"] - new_value,
}
)
property_phase_impact["sap"] = new_value
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
if rec["type"] == "solar_pv":
# We use the SAP points in the recommendation as a minimum
proposed_impact = (
rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else
property_phase_impact["sap"]
)
# SAP adjustments should be negative
if proposed_impact != property_phase_impact["sap"]:
adjustments.append(
{
"recommendation_id": rec["recommendation_id"],
"phase": rec["phase"],
# If we've made an adjustment, we will be increasing the number of SAP
# points. Since, we subtract adjustments, this number should be negative
"sap_adjustment": property_phase_impact["sap"] - proposed_impact,
}
)
property_phase_impact["sap"] = proposed_impact
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
property_phase_impact, current_phase_values, adjustments = cls._apply_measure_specific_rules(
rec=rec,
property_phase_impact=property_phase_impact,
previous_phase_values=previous_phase_values,
current_phase_values=current_phase_values,
adjustments=adjustments,
property_instance=property_instance
)
# Insert this information into the recommendation.
if not rec.get("survey", False):
@ -874,11 +943,7 @@ class Recommendations:
rec["co2_equivalent_savings"] = property_phase_impact["carbon"]
rec["heat_demand"] = property_phase_impact["heat_demand"]
if (
(rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or
(rec["heat_demand"] is None)
):
raise ValueError("sap points, co2 or heat demand is missing")
cls._validate_recommendation_updates(rec)
impact_summary.append(
{