import pandas as pd import numpy as np from backend.Property import Property from typing import List, Mapping, Any from itertools import groupby from recommendations.FloorRecommendations import FloorRecommendations from recommendations.WallRecommendations import WallRecommendations from recommendations.RoofRecommendations import RoofRecommendations from recommendations.VentilationRecommendations import VentilationRecommendations from recommendations.FireplaceRecommendations import FireplaceRecommendations from recommendations.LightingRecommendations import LightingRecommendations from recommendations.SolarPvRecommendations import SolarPvRecommendations from recommendations.WindowsRecommendations import WindowsRecommendations from recommendations.HeatingRecommender import HeatingRecommender from recommendations.HotwaterRecommendations import HotwaterRecommendations from recommendations.SecondaryHeating import SecondaryHeating from recommendations.DraughtProofingRecommendations import DraughtProofingRecommendations from backend.ml_models.AnnualBillSavings import AnnualBillSavings from backend.apis.GoogleSolarApi import GoogleSolarApi import backend.app.assumptions as assumptions from backend.app.plan.schemas import SPECIFIC_MEASURES, MEASURE_MAP, NON_INVASIVE_SPECIFIC_MEASURES from utils.logger import setup_logger STARTING_DUMMY_ID_VALUE = -9999 logger = setup_logger() class Recommendations: """ High level recommendations class, which sits above the measure specific recommendation classes """ # Used in calculation of recommendation impact - increasing variables are features where # a higher value indicates an improvement. Decreasing is the opposite INCREASING_VARIABLES = ["sap"] DECREASING_VARIABLES = ["carbon", "heat_demand"] # If the recommendation is mechanical ventilation, we don't apply the rule that the new value should be higher MV_INCREASING_VARIABLES = ["carbon", "heat_demand"] MV_DECREASING_VARIABLES = ["sap"] # List of models we expect predictions for, when calculation recommendation impact PREDICTION_PREFIXES = ["sap_change", "heat_demand", "carbon_change"] def __init__( self, property_instance: Property, materials: List, exclusions: List[str] = None, inclusions: List[str] = None, default_u_values: bool = False, ): """ :param property_instance: Instance of the Property class, for the home associated to property_id :param materials: List of materials to be used in the recommendations :param exclusions: List of specific measures or measure types to exclude from recommendations. Defaulted to None, meaning no exclusions to be applied :param inclusions: List of specific measures of measure types to include. Defaulted to None, meaning all measures are included :param default_u_values: Boolean, if True, the recommendations will use the default u-values for the property """ self.property_instance = property_instance self.materials = materials self.exclusions = exclusions if exclusions else [] self.inclusions = inclusions if inclusions else [] self.default_u_values = default_u_values self.all_specific_measures = SPECIFIC_MEASURES self.all_non_invase_measures = NON_INVASIVE_SPECIFIC_MEASURES self.non_invasive_recommendation_types = [ r["type"] for r in self.property_instance.non_invasive_recommendations ] self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials) self.wall_recomender = WallRecommendations(property_instance=property_instance, materials=materials) self.roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials) self.ventilation_recomender = VentilationRecommendations( property_instance=property_instance, materials=materials ) self.draught_proofing_recommender = DraughtProofingRecommendations(property_instance=property_instance) self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance, materials=materials) self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials) self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials) self.solar_recommender = SolarPvRecommendations(property_instance=property_instance, materials=materials) self.heating_recommender = HeatingRecommender(property_instance=property_instance, materials=materials) self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance) self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance) def find_included_measures(self): """ Determines the set of measures to be included in recommendations """ # Generally, inclusions is a global option and will overrule specific property non-invasive recommendations. # This is done so that we can use inclusions to specify scenarios. inclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.inclusions] exclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.exclusions] # if we have already installed measures, we need to include them so they get factored into the baseline # this is something we'll likely need to remove if self.property_instance.already_installed: # We make sure that any already installed measures are included for rec in self.property_instance.already_installed: if rec not in inclusions_full: inclusions_full.append(rec) # We remove them from the exclusions if they are there exclusions_full = [e for e in exclusions_full if e not in self.property_instance.already_installed] # We need to unlist any lists, but we should check if they're lists first inclusions_full = [ item for sublist in inclusions_full for item in (sublist if isinstance(sublist, list) else [sublist]) ] exclusions_full = [ item for sublist in exclusions_full for item in (sublist if isinstance(sublist, list) else [sublist]) ] # If inclusions and exclusions are empty, it means that nothing was specified, so we allow # all recommendation types if not inclusions_full and not exclusions_full: # All typical measures - this does not include non-invasive measures inless they are specified return self.all_specific_measures + self.non_invasive_recommendation_types if inclusions_full: return inclusions_full if exclusions_full: measures = [ m for m in self.all_specific_measures + self.non_invasive_recommendation_types if m not in exclusions_full ] return measures def recommend(self): """ This method runs the recommendations for the individual measures and then appends them to a list for output The recommendations are implemented in order of suggested phase, from fabric first to heating systems, to renewables. :return: """ property_recommendations = [] phase = 0 measures = self.find_included_measures() non_invasive_recommendation_types = [r["type"] for r in self.property_instance.non_invasive_recommendations] # Building Fabric self.wall_recomender.recommend(phase=phase, measures=measures, default_u_values=self.default_u_values) if self.wall_recomender.recommendations: property_recommendations.append(self.wall_recomender.recommendations) phase += 1 # We handle recommendations covering specific non-invasive measures new_phase = self.wall_recomender.recommend_extended(phase=phase, measures=measures) if self.wall_recomender.extended_recommendations: property_recommendations.append(self.wall_recomender.extended_recommendations) # We don't have any phasing here phase = new_phase self.roof_recommender.recommend(phase=phase, measures=measures, default_u_values=self.default_u_values) if self.roof_recommender.recommendations: property_recommendations.append(self.roof_recommender.recommendations) phase += 1 # Ventilation recommendations # We only produce a ventilation recommendation if the property is recommended to have wall or roof # insulation We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this # has no real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we # have any wall or roof recommendations, we will ensure that ventilation is included in the simulation if ( (self.wall_recomender.recommendations or self.roof_recommender.recommendations) and ("ventilation" in measures) ): self.ventilation_recomender.recommend(phase=phase) if self.ventilation_recomender.recommendation: property_recommendations.append(self.ventilation_recomender.recommendation) phase += 1 if "trickle_vents" in measures: # This is a recommendatin that typically comes from an energy assessment trickle_vents_rec = self.ventilation_recomender.recommend_trickle_vents() if trickle_vents_rec: property_recommendations.append(trickle_vents_rec) if "draught_proofing" in measures: # This is a recommendation that in some instances we can recommend, by deducing it from the SAP # recommendations, however we will implement this later self.draught_proofing_recommender.recommend() if self.draught_proofing_recommender.recommendation: property_recommendations.append(self.draught_proofing_recommender.recommendation) self.floor_recommender.recommend(phase=phase, measures=measures) if self.floor_recommender.recommendations: property_recommendations.append(self.floor_recommender.recommendations) phase += 1 if "low_energy_lighting" in measures: self.lighting_recommender.recommend(phase=phase) if self.lighting_recommender.recommendation: property_recommendations.append(self.lighting_recommender.recommendation) phase += 1 if "mixed_glazing" not in non_invasive_recommendation_types: # If we have a mixed glazing recommendation, we prioritise this over the windows recommendation self.windows_recommender.recommend(phase=phase, measures=measures) if self.windows_recommender.recommendation: property_recommendations.append(self.windows_recommender.recommendation) phase += 1 if "mixed_glazing" in measures: # This is a recommendation that comes exclusively from an energy assessment mixed_glazing_rec = self.windows_recommender.recommend_mixed_glazing(phase=phase) if mixed_glazing_rec: property_recommendations.append(mixed_glazing_rec) phase += 1 if "fireplace" in measures: self.fireplace_recommender.recommend(phase=phase) if self.fireplace_recommender.recommendation: property_recommendations.append(self.fireplace_recommender.recommendation) phase += 1 cavity_or_loft_recommendations = [ r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations if r["type"] in ["cavity_wall_insulation", "loft_insulation"] ] has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0 self.heating_recommender.recommend( phase=phase, measures=measures, has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations, ) if self.heating_recommender.heating_recommendations: # We split into first and second phase recommendations first_phase_recommendations = [ r for r in ( self.heating_recommender.heating_recommendations ) if r["phase"] == phase ] second_phase_recommendations = [ r for r in ( self.heating_recommender.heating_recommendations ) if r["phase"] == phase + 1 ] if first_phase_recommendations and second_phase_recommendations: raise Exception("Imeplement me") if first_phase_recommendations: property_recommendations.append(first_phase_recommendations) if second_phase_recommendations: property_recommendations.append(second_phase_recommendations) # We check if we have distinct heating and heating controls recommendations # If so, we increment by 2 (one of the heating system, one for the heating controls) # otherwise we incremenet by 1 max_used_phase = max( [rec["phase"] for rec in self.heating_recommender.heating_recommendations] ) amount_to_increment = max_used_phase - phase + 1 phase += amount_to_increment # Hot water if "hot_water" in measures: self.hotwater_recommender.recommend(phase=phase) if self.hotwater_recommender.recommendations: if len(self.hotwater_recommender.recommendations) > 1: for r in self.hotwater_recommender.recommendations: property_recommendations.append([r]) phase += 1 else: property_recommendations.append(self.hotwater_recommender.recommendations) phase += 1 if "secondary_heating" in measures: self.secondary_heating_recommender.recommend(phase=phase) if self.secondary_heating_recommender.recommendation: property_recommendations.append(self.secondary_heating_recommender.recommendation) phase += 1 # Renewables if "solar_pv" in measures: self.solar_recommender.recommend(phase=phase) if self.solar_recommender.recommendation: property_recommendations.append(self.solar_recommender.recommendation) phase += 1 if self.property_instance.already_installed: # We need to re-shuffle our measures property_recommendations_removed_installed = [] already_installed_recs = [] for recs in property_recommendations: phase_recs = [] phase_already_installed_recs = [] for rec in recs: if rec["already_installed"]: phase_already_installed_recs.append(rec) else: phase_recs.append(rec) if phase_recs: property_recommendations_removed_installed.append(phase_recs) if phase_already_installed_recs: already_installed_recs.append(phase_already_installed_recs) # We re-set the phases for i, recs in enumerate(property_recommendations_removed_installed): for rec in recs: rec["phase"] = i # already installed recs get negative phasing already_installed_phase = -len(already_installed_recs) for recs in already_installed_recs: for rec in recs: rec["phase"] = already_installed_phase already_installed_phase += 1 property_recommendations = already_installed_recs + property_recommendations_removed_installed # We insert temporary ids into the recommendations which is important for the optimiser later property_recommendations = self.insert_temp_recommendation_id(property_recommendations) # We also need to create the representative recommendations for each recommendation type property_representative_recommendations = self.create_representative_recommendations( property_recommendations, ) # Check to make sure measure_type is populated for recs in property_recommendations: if any(pd.isnull(rec.get("measure_type")) for rec in recs): raise ValueError("Measure type is not populated") return property_recommendations, property_representative_recommendations @staticmethod def create_representative_recommendations(property_recommendations): """ This method will create a representative recommendation for each recommendation type In order to create a representative recommendation, we choose the recommendation that has: 1) Where a U-value is available, has the best U-value to cost ratio 2) Where SAP points are available, has the best SAP points to cost ratio We don't include mechanical ventilation in the representative recommendations, since we don't attribute a SAP impact to this recommendation :return: """ property_representative_recommendations = [] for recommendations_by_type in property_recommendations: # If the property was initially surveyed as filled, but the cavity was only partially filled, we don't # want to include the cavity wall insulation recommendation in the defaults if recommendations_by_type[0].get("type") in [ "trickle_vents", "draught_proofing" ]: continue has_u_value = recommendations_by_type[0].get("new_u_value") is not None has_sap_points = all([r.get("sap_points") is not None for r in recommendations_by_type]) has_rank = recommendations_by_type[0].get("rank") is not None # When check if these recommendations have two different types, such as solid wall insulation # If we have multiple types, we group by type and then select the best recommendation for each type # If we have a heating and heating control recommendation, we use JUST the heating reommendation has_both_heating_types = all( x in [rec["type"] for rec in recommendations_by_type] for x in ["heating", "heating_control"] ) if has_both_heating_types: # Take just heating recommendations_by_type = [ rec for rec in recommendations_by_type if rec["type"] == "heating" ] recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"]) representative_recommendations = [] for _type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]): recommendations = list(recommendations) # We also create an efficiency key, which is used to sort the recommendations if has_u_value: # We sort by the cost per U-value improvement - the lower the better for rec in recommendations: rec["efficiency"] = rec["total"] / rec["starting_u_value"] - rec["new_u_value"] elif not has_u_value and has_sap_points: # Sort the options by the cost per SAP point improvement - the lower the better for rec in recommendations: if rec["sap_points"] == 0: rec["efficiency"] = 0 else: rec["efficiency"] = rec["total"] / rec["sap_points"] elif has_rank: # Sort the options by rank - the lower the better for rec in recommendations: rec["efficiency"] = rec["rank"] else: # Sort the options by cost - the lower the better for rec in recommendations: rec["efficiency"] = rec["total"] recommendations.sort( key=lambda x: x["efficiency"] ) representative_recommendations.append(recommendations[0]) property_representative_recommendations.extend(representative_recommendations) return property_representative_recommendations @staticmethod def insert_temp_recommendation_id(property_recommendations): """ Creates a temporary recommendation id which is needed for filtering recommendations between default and no, after the optimiser has been run :param property_recommendations: nested list of recommendations, grouped by data_types :return: Updated recommendations_to_upload, where where recommendation has a "recommendation_id" integer inserted """ idx = 0 for recs in property_recommendations: for rec in recs: rec["recommendation_id"] = f"{str(idx)}_phase={str(rec['phase'])}" idx += 1 return property_recommendations @staticmethod def _calculate_appliance_solar_savings( rec, property_instance, heating_kwh_reduction, hot_water_kwh_reduction, lighting_kwh_reduction ): """ Calculates the impact on kwh and cost of installing solar panels on appliances :param rec: The recommendation :param property_instance: Instance of the Property class :param heating_kwh_reduction: The kwh reduction from heating :param hot_water_kwh_reduction: The kwh reduction from hot water :param lighting_kwh_reduction: The kwh reduction from lighting :return: """ if rec["type"] != "solar_pv": return 0, 0 if property_instance.solar_panel_configuration is None: print("PLACEHOLDER ESTIMATES") # 50% reduction average kwh_reduction = property_instance.energy_consumption_estimates["adjusted"]["appliances"] * 0.5 predicted_appliances_cost_reduction = kwh_reduction * AnnualBillSavings.ELECTRICITY_PRICE_CAP return predicted_appliances_cost_reduction, kwh_reduction # Calulate the amount of energy the solar panel array will generate for this unit unit_energy_consumption = ( rec["initial_ac_kwh_per_year"] * property_instance.solar_panel_configuration["unit_share_of_energy"] ) unit_energy_utilised = unit_energy_consumption * GoogleSolarApi.SOLAR_CONSUMPTION_PROPORTION unit_energy_exported = unit_energy_consumption - unit_energy_utilised unit_energy_exported_value = unit_energy_exported * AnnualBillSavings.ELECTRICITY_EXPORT_PAYMENT # We assume that 50% of the energy generated will be used by the property without a battery # to be conservative # of the energy utilised, some of it is used by heating, hot water and lighting so we # remove that from the total unit_energy_utilised -= ( heating_kwh_reduction + hot_water_kwh_reduction + lighting_kwh_reduction ) unit_energy_utilised = 0 if unit_energy_utilised < 0 else unit_energy_utilised # This is how much energy the appliances will use after install post_install_appliance_kwh = ( property_instance.energy_consumption_estimates["adjusted"]["appliances"] - unit_energy_utilised ) post_install_appliance_kwh = ( 0 if post_install_appliance_kwh < 0 else post_install_appliance_kwh ) predicted_appliances_kwh_reduction = ( property_instance.energy_consumption_estimates["adjusted"]["appliances"] - post_install_appliance_kwh ) predicted_appliances_cost_reduction = unit_energy_exported_value + ( predicted_appliances_kwh_reduction * AnnualBillSavings.ELECTRICITY_PRICE_CAP ) return predicted_appliances_cost_reduction, predicted_appliances_kwh_reduction @staticmethod def _check_ventilation_out_of_bounds(sap_impact, ventilation_sap_limit): return (sap_impact < ventilation_sap_limit) or (sap_impact >= 0) @staticmethod def _adjust_ventilation_sap(sap_impact, ventilation_sap_limit): if sap_impact >= 0: return -1 if sap_impact < ventilation_sap_limit: return ventilation_sap_limit return sap_impact @staticmethod def _filter_phase_adjustment(phase_adjustments): """ Utility function to select the entry from the dictionary, by phase, with the largest phase adjustment :param phase_adjustments: List of phase adjustments, in the form [{"recommendation_id": str, "phase": int, "sap_adjustment": float}] :return: """ filtered_adjustments = [] phase_adjustments = sorted(phase_adjustments, key=lambda x: x["phase"]) for phase, adjustments in groupby(phase_adjustments, key=lambda x: x["phase"]): adjustments = list(adjustments) adjustments.sort(key=lambda x: x["sap_adjustment"], reverse=True) filtered_adjustments.append(adjustments[0]) return filtered_adjustments @classmethod def _filter_predictions_for_property( cls, all_predictions: Mapping[str, pd.DataFrame], property_id: str, ) -> dict: """ Utility function to filter predictions for a specific property :param all_predictions: Dictionary of all predictions from the model apis :param property_id: The property id to filter for :return: """ return { f"{prefix}_predictions": ( all_predictions[f"{prefix}_predictions"] .loc[ all_predictions[f"{prefix}_predictions"]["property_id"] == property_id ] .copy() ) for prefix in cls.PREDICTION_PREFIXES } @classmethod def get_monotonic_variables(cls, rec_type: str) -> tuple[List[str], List[str]]: """ Utility function to get the monotonic variables for a specific recommendation type :param rec_type: The recommendation type :return: """ if rec_type == "mechanical_ventilation": return cls.MV_INCREASING_VARIABLES, cls.MV_DECREASING_VARIABLES return cls.INCREASING_VARIABLES, cls.DECREASING_VARIABLES @staticmethod def _get_previous_phase_values( rec_phase: int, starting_phase: int, impact_summary: list[dict], property_instance: Property, ) -> dict: if rec_phase == starting_phase: return { "sap": float(property_instance.data["current-energy-efficiency"]), "carbon": float(property_instance.data["co2-emissions-current"]), "heat_demand": float(property_instance.data["energy-consumption-current"]), } previous_phase_reps = [ x for x in impact_summary if x["phase"] == rec_phase - 1 and x["representative"] ] if len(previous_phase_reps) == 1: return previous_phase_reps[0] # It's unlikely that this will occur but this fallback will ensure that we don't # run the next step and run a median of nothing, which will return None if not previous_phase_reps: return { "sap": float(property_instance.data["current-energy-efficiency"]), "carbon": float(property_instance.data["co2-emissions-current"]), "heat_demand": float(property_instance.data["energy-consumption-current"]), } # Median fallback (including zero-length case) keys = ("sap", "carbon", "heat_demand") return { key: np.median([item[key] for item in previous_phase_reps]) for key in keys } @classmethod def _get_phase_predictions( cls, property_predictions: dict, recommendation_id: str, ) -> dict: return { prefix: ( property_predictions[f"{prefix}_predictions"] .loc[ property_predictions[f"{prefix}_predictions"]["recommendation_id"] == str(recommendation_id), "predictions", ] .values[0] ) for prefix in cls.PREDICTION_PREFIXES } @classmethod def _resolve_current_phase_sap( cls, rec: Mapping[str, Any], previous_phase_values: Mapping[str, Any], phase_energy_efficiency_metrics: Mapping[str, Any], adjustments: list[dict], ) -> float: if rec.get("survey", False): return rec["sap_points"] + previous_phase_values["sap"] sap = phase_energy_efficiency_metrics["sap_change"] prior_adjustments = [a for a in adjustments if a["phase"] < rec["phase"]] if not prior_adjustments: return sap filtered = cls._filter_phase_adjustment(prior_adjustments) return sap - sum(a["sap_adjustment"] for a in filtered) @classmethod def _compute_phase_impact( cls, rec_type: str, previous_phase_values: dict, current_phase_values: dict, ) -> dict: """ Utility function for computing the impact of a recommendation phase, enforcing monotonicity :param rec_type: string, the recommendation type :param previous_phase_values: dict, the previous phase values :param current_phase_values: dict, the current phase values :return: dict, the impact of the phase """ phase_increasing, phase_decreasing = cls.get_monotonic_variables(rec_type) # Enforce monotonicity for v in phase_increasing: current_phase_values[v] = max(current_phase_values[v], previous_phase_values[v]) for v in phase_decreasing: current_phase_values[v] = min(current_phase_values[v], previous_phase_values[v]) # Compute impact impact = { "sap": current_phase_values["sap"] - previous_phase_values["sap"], "carbon": previous_phase_values["carbon"] - current_phase_values["carbon"], "heat_demand": previous_phase_values["heat_demand"] - current_phase_values["heat_demand"], } # Clamp values for metric in impact: if rec_type != "mechanical_ventilation": impact[metric] = max(0, impact[metric]) if metric == "sap": impact[metric] = round(impact[metric], 2) else: impact[metric] = min(0, impact[metric]) return impact @classmethod def _apply_measure_specific_rules( cls, rec: dict, property_phase_impact: dict, previous_phase_values: dict, current_phase_values: dict, adjustments: list, property_instance, ): # For the moment, we cap the number of SAP points that can be achieved by LEDs at 2 if rec["type"] == "low_energy_lighting": lighting_sap_limit = LightingRecommendations.get_sap_limit( property_instance.data["lighting-energy-eff"], property_instance.lighting["low_energy_proportion"] ) # add an adjustment proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit) if proposed_sap_impact != property_phase_impact["sap"]: # Store the sap adjustment. The proposed sap impact will always be less # than the current sap impact, so the adjustment is always positive # as we subtract it from the future phases adjustments.append( { "recommendation_id": rec["recommendation_id"], "phase": rec["phase"], "sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact, } ) property_phase_impact["sap"] = proposed_sap_impact property_phase_impact["carbon"] = min( property_phase_impact["carbon"], rec["co2_equivalent_savings"] ) # Update the current phase values current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"] elif rec["type"] == "mechanical_ventilation": # ventilation is capped by having no greater and a -4 impact ventilation_sap_limit = -4 ventilation_out_of_bounds = cls._check_ventilation_out_of_bounds( property_phase_impact["sap"], ventilation_sap_limit ) if ventilation_out_of_bounds: previous_modelled_sap = previous_phase_values.get("sap_prediction", 0) proposed_sap_impact = current_phase_values["sap"] - previous_modelled_sap proposal_out_of_bounds = cls._check_ventilation_out_of_bounds( proposed_sap_impact, ventilation_sap_limit ) if proposal_out_of_bounds: proposed_sap_impact = cls._adjust_ventilation_sap( proposed_sap_impact, ventilation_sap_limit ) # We keep track of the adjustment # In this case, if the SAP impact has increased, then the adustment should be negative # otherwise it should be positive # When we add the total adjustment, it's an addition # Example # Before: 60, impact -2 => 58 # After: 60, impact -1 (So the impact is bigger) => 59 # So in this case, we need to make sure we add 1 to all future predictions so # the adjustment should be positive # Before: 60, impact 1 => 61 # After: 60, impact -1 => 59 # So in this case, we need to make sure we subtract 1 to all future predictions so # the adjustment should be negative # Both cases are reflected in sap adjustment sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"]) adjustments.append( { "recommendation_id": rec["recommendation_id"], "phase": rec["phase"], "sap_adjustment": sap_adjustment, } ) property_phase_impact["sap"] = proposed_sap_impact # Update the current phase values current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] # This is very much an edge case but we also the end result taking the property # below a SAP rating of 1, which is the minimum SAP rating if previous_phase_values["sap"] + property_phase_impact["sap"] < 1: sap_adjustment = 1 - (previous_phase_values["sap"] + property_phase_impact["sap"]) adjustments.append( { "recommendation_id": rec["recommendation_id"], "phase": rec["phase"], "sap_adjustment": sap_adjustment, } ) # The new impact should be the current impact plus the adjustment property_phase_impact["sap"] = property_phase_impact["sap"] + sap_adjustment # Update the current phase values current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] elif rec["type"] == "loft_insulation": # When we have a loft insulation recommendation, where there is an extension and the existing # amount of loft insulation is already good, we limit the SAP points # By limiting here, we don't change the value in current_phase_values. This means that the # future recommendations won't have an impact that is too large li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit( property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"] ) if li_sap_limit is not None: new_value = min(property_phase_impact["sap"], li_sap_limit) # If we've made an adjustment, keep track of it if new_value != property_phase_impact["sap"]: adjustments.append( { "recommendation_id": rec["recommendation_id"], "phase": rec["phase"], # If we've made an adjustment, it will be negative "sap_adjustment": property_phase_impact["sap"] - new_value, } ) property_phase_impact["sap"] = new_value # Update the current phase values current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] elif rec["type"] == "solar_pv": # We use the SAP points in the recommendation as a minimum proposed_impact = ( rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else property_phase_impact["sap"] ) # SAP adjustments should be negative if proposed_impact != property_phase_impact["sap"]: adjustments.append( { "recommendation_id": rec["recommendation_id"], "phase": rec["phase"], # If we've made an adjustment, we will be increasing the number of SAP # points. Since, we subtract adjustments, this number should be negative "sap_adjustment": property_phase_impact["sap"] - proposed_impact, } ) property_phase_impact["sap"] = proposed_impact # Update the current phase values current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] return property_phase_impact, current_phase_values, adjustments @staticmethod def _validate_recommendation_updates(rec: Mapping[str, Any]): """ Utility function to validate that the recommendation updates have been applied correctly :param rec: updated recommendation :return: """ if ( (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or (rec["heat_demand"] is None) ): raise ValueError("sap points, co2 or heat demand is missing") @classmethod def calculate_recommendation_impact( cls, property_instance: Property, all_predictions: Mapping[str, Any], recommendations: Mapping[int, List], representative_recommendations: Mapping[int, List], debug: bool = False ) -> (Mapping[int, List], List[Mapping[str, Any]]): """ Given predictions from the model apis, with method will update the recommendations with the predicted impact of the recommendation on the property This algorithm is structured as a large loop, but this is due to the fact that it's sequential in nature - each phase depends on the previous, with adjustments and constraints being allied along the way This function will return two objects: 1) Updated recommendations with the predicted impact of the recommendation 2) A list of impacts by phase, which will be used for the kwh model scoring :param property_instance: Instance of the Property class, for the home associated to property_id :param all_predictions: dictionary of predictions from the model apis :param recommendations: dictionary of recommendations for the property :param representative_recommendations: dictionary of representative recommendations for the property :param debug: boolean, indicating if the function is running in debug mode. The only difference is that adjustments are returned for testing :return: Updated recommendations with predicted impact, and a list of impacts by phase """ property_predictions = cls._filter_predictions_for_property( all_predictions, str(property_instance.id) ) # shallow copy intentional - we're going to modify the internals property_recommendations = recommendations[property_instance.id].copy() representative_ids = [ r["recommendation_id"] for r in representative_recommendations[property_instance.id] ] # We allow for negative phase starting_phase = min(rec["phase"] for recs in property_recommendations for rec in recs) # We keep a history of adjustments we have made, so that we ensure that we adjust future # phases for SAP impact_summary, adjustments = [], [] for recommendations_by_type in property_recommendations: for rec in recommendations_by_type: # --- Special-case: non-modelled measures ------------------------- if rec["type"] in { "trickle_vents", "draught_proofing", "extension_cavity_wall_insulation", }: if rec["type"] == "extension_cavity_wall_insulation": previous = cls._get_previous_phase_values( rec_phase=rec["phase"], starting_phase=starting_phase, impact_summary=impact_summary, property_instance=property_instance, ) impact_summary.append( { "phase": rec["phase"], "representative": rec["recommendation_id"] in representative_ids, "recommendation_id": rec["recommendation_id"], "measure_type": rec["measure_type"], "sap": previous["sap"] + rec["sap_points"], "carbon": previous["carbon"] - rec["co2_equivalent_savings"], "heat_demand": previous["heat_demand"] - rec["heat_demand"], } ) continue phase_energy_efficiency_metrics = cls._get_phase_predictions( property_predictions=property_predictions, recommendation_id=rec["recommendation_id"], ) previous_phase_values = cls._get_previous_phase_values( rec_phase=rec["phase"], starting_phase=starting_phase, impact_summary=impact_summary, property_instance=property_instance ) current_phase_values = { "sap": cls._resolve_current_phase_sap( rec, previous_phase_values, phase_energy_efficiency_metrics, adjustments ), "carbon": phase_energy_efficiency_metrics["carbon_change"], "heat_demand": phase_energy_efficiency_metrics["heat_demand"], } # For increasing variables, the new value needs to be higher than the previous, otherwise we set it to # the previous # For decreasing variables, the new value should be lower than the previous, otherwise we set it to # the previous # In either case, we adjudge the recommendation to have had no/negligible impact # However, if the recommendation is mechanical ventilation, this can have a negative SAP impact so # we don't apply this rule property_phase_impact = cls._compute_phase_impact( rec_type=rec["type"], previous_phase_values=previous_phase_values, current_phase_values=current_phase_values, ) property_phase_impact, current_phase_values, adjustments = cls._apply_measure_specific_rules( rec=rec, property_phase_impact=property_phase_impact, previous_phase_values=previous_phase_values, current_phase_values=current_phase_values, adjustments=adjustments, property_instance=property_instance ) # Insert this information into the recommendation. if not rec.get("survey", False): rec["sap_points"] = property_phase_impact["sap"] rec["co2_equivalent_savings"] = property_phase_impact["carbon"] rec["heat_demand"] = property_phase_impact["heat_demand"] cls._validate_recommendation_updates(rec) impact_summary.append( { "phase": rec["phase"], "representative": rec["recommendation_id"] in representative_ids, "recommendation_id": rec["recommendation_id"], "measure_type": rec["measure_type"], **current_phase_values, "sap_prediction": phase_energy_efficiency_metrics["sap_change"] } ) if debug: return property_recommendations, impact_summary, adjustments return property_recommendations, impact_summary @staticmethod def map_descriptions_to_fuel( heating_description, hotwater_description, main_fuel_description, descriptions_to_fuel_types ): # Handle the case of community schemes if (heating_description in ["Community scheme", 'Community scheme, plus solar']) or ( hotwater_description in ["Community scheme", 'Community scheme, plus solar']) and ( "not community" not in main_fuel_description ): if main_fuel_description in ["mains gas (community)", "UNKNOWN"]: return { "heating_fuel_type": "Natural Gas (Community Scheme)", "hotwater_fuel_type": "Natural Gas (Community Scheme)", "heating_cop": 1, "hotwater_cop": 1 } if main_fuel_description in ['biogas (community)']: return { "heating_fuel_type": "Smokeless Fuel", "hotwater_fuel_type": "Smokeless Fuel", "heating_cop": 0.85, "hotwater_cop": 0.85 } if main_fuel_description in ['coal (community)']: return { "heating_fuel_type": "Coal", "hotwater_fuel_type": "Coal", "heating_cop": 0.85, "hotwater_cop": 0.85 } # Handling specific case if main_fuel_description in ["To be used only when there is no heating/hot-water system"] and ( "electric heaters" in heating_description.lower() ): return { "heating_fuel_type": "Electricity", "hotwater_fuel_type": "Electricity", "heating_cop": 1, "hotwater_cop": 1 } logger.warning( "Unhandled community fuel." f"Fuel: {main_fuel_description}" f"Heating: {heating_description}" f"Heating: {hotwater_description}" ) return { "heating_fuel_type": "Unmapped", "hotwater_fuel_type": "Unmapped", "heating_cop": 0.9, "hotwater_cop": 0.9 } mapped = descriptions_to_fuel_types.get(heating_description.strip(), None) if mapped is None: # TODO: This is a non-ideal placeholder but we put something in place for a process that falls over # fairly regularly. A task has been added to planner to refactor this logger.warning("Heating description not mapped: %s", heating_description) mapped = {"fuel": 'Unmapped', "cop": 0.9} heating_fuel = mapped["fuel"] if hotwater_description in [ "From main system", "From main system, no cylinder thermostat", 'From main system, waste water heat recovery', ]: return { "heating_fuel_type": heating_fuel, "hotwater_fuel_type": heating_fuel, "heating_cop": mapped["cop"], "hotwater_cop": mapped["cop"] } if hotwater_description in [ "From main system, plus solar", "From main system, plus solar, no cylinder thermostat" ]: # The fuel is return { "heating_fuel_type": heating_fuel, "hotwater_fuel_type": heating_fuel + " + Solar Thermal", "heating_cop": mapped["cop"], "hotwater_cop": 1 } mapped_hotwater = descriptions_to_fuel_types.get(hotwater_description.strip()) if mapped_hotwater is None: # TODO: This is a non-ideal placeholder but we put something in place for a process that falls over # fairly regularly. A task has been added to planner to refactor this # We have observed an edge case where the fuel is described as not being community # but the hot water is. We handle as such logger.warning("Hot water description not mapped: %s", hotwater_description) mapped_hotwater = {"fuel": 'Unmapped', "cop": 0.9} return { "heating_fuel_type": heating_fuel, "hotwater_fuel_type": mapped_hotwater["fuel"], "heating_cop": mapped["cop"], "hotwater_cop": mapped_hotwater["cop"] } @classmethod def calculate_recommendation_tenant_savings( cls, property_instance, kwh_simulation_predictions, property_recommendations, ashp_cop=None ): """ This method inserts the kwh savings and the bill savings that the customer will make from the recommendations based on the predictions from the ML model It also ensures we base our solar savings and solar carbon savings from the calculations based on the solar API and size of the array, instead of ML model :param property_instance: Instance of the Property class, for the home associated to property_id :param kwh_simulation_predictions: dictionary of predictions from the model apis :param property_recommendations: dictionary of recommendations for the property :param ashp_cop: The coefficient of performance for the air source heat pump. :return: """ ashp_cop = ashp_cop if ashp_cop else assumptions.AVERAGE_ASHP_EFFICIENCY # kwh_impact_table = kwh_simulation_predictions["heating_kwh_predictions"][ kwh_simulation_predictions["heating_kwh_predictions"]["property_id"] == str(property_instance.id) ].merge( kwh_simulation_predictions["hotwater_kwh_predictions"].drop( columns=["property_id", "recommendation_id", "phase"] ), how="inner", on="id", suffixes=("_heating", "_hotwater") ).reset_index(drop=True) # We adjust this table with the kwh estimates for low energy lighting kwh values, and solar kwh estimates led_recommendation = pd.DataFrame([ { "phase": r["phase"], "recommendation_id": r["recommendation_id"], "lighting_kwh_savings": r["kwh_savings"] } for recs in property_recommendations for r in recs if r["type"] == "low_energy_lighting" ], columns=["phase", "recommendation_id", "lighting_kwh_savings"]) solar_recommendations = pd.DataFrame([ { "phase": r["phase"], "recommendation_id": r["recommendation_id"], "solar_kwh_savings": ( r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_PROPORTION ) if not r["has_battery"] else ( r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION ), } for recs in property_recommendations for r in recs if r["type"] == "solar_pv" ], columns=["phase", "recommendation_id", "solar_kwh_savings"]) # merge them on kwh_impact_table = kwh_impact_table.merge( led_recommendation, how="left", on=["phase", "recommendation_id"] ).merge( solar_recommendations, how="left", on=["phase", "recommendation_id"] ) property_kwh = property_instance.energy_consumption_estimates["unadjusted"] kwh_impact_table = pd.concat( [ pd.DataFrame( [ { "id": STARTING_DUMMY_ID_VALUE, "phase": STARTING_DUMMY_ID_VALUE, "recommendation_id": STARTING_DUMMY_ID_VALUE, "predictions_heating": float(property_kwh["heating"]), "predictions_hotwater": float(property_kwh["hot_water"]), } ] ), kwh_impact_table ] ).sort_values(["phase", "recommendation_id"], ascending=True).reset_index(drop=True) # We need the recommendaion type rec_id_to_type = { rec["recommendation_id"]: rec["type"] for recs in property_recommendations for rec in recs } rec_id_to_type[STARTING_DUMMY_ID_VALUE] = "starting_dummy" for i in range(0, len(kwh_impact_table)): current = kwh_impact_table.loc[i] current_phase = current['phase'] previous_phase_id = (current_phase - 1) if (current_phase > 0) else -9999 previous_phase = kwh_impact_table[kwh_impact_table['phase'] == previous_phase_id] if not previous_phase.empty: for col in ["predictions_heating", "predictions_hotwater"]: # Check if the recommendation type is ventilation if rec_id_to_type[current["recommendation_id"]] == "mechanical_ventilation": # We expect the kwh to increase if kwh_impact_table.loc[i, col] > previous_phase[col].max(): continue if kwh_impact_table.loc[i, col] > previous_phase[col].max(): kwh_impact_table.loc[i, col] = previous_phase[col].max() descriptions_to_fuel_types = assumptions.DESCRIPTIONS_TO_FUEL_TYPES # We will the air source heat pump efficiencies ashp_keys = [k for k in descriptions_to_fuel_types.keys() if "air source heat pump" in k.lower()] for k in ashp_keys: descriptions_to_fuel_types[k]["cop"] = ashp_cop # For heating system recommendations, this could result in a fuel type change so we reflect that fuel_mapping = pd.DataFrame([ { "id": epc["id"], **cls.map_descriptions_to_fuel( epc["mainheat-description"], epc["hotwater-description"], epc["main-fuel"], descriptions_to_fuel_types ) } for epc in property_instance.updated_simulation_epcs ]) fuel_mapping = pd.concat( [ pd.DataFrame( [ { "id": STARTING_DUMMY_ID_VALUE, **cls.map_descriptions_to_fuel( property_instance.data["mainheat-description"], property_instance.data["hotwater-description"], property_instance.data["main-fuel"], descriptions_to_fuel_types ) } ] ), fuel_mapping ] ) kwh_impact_table = kwh_impact_table.merge( fuel_mapping, how="left", on="id" ).sort_values(["phase", "recommendation_id"], ascending=True).reset_index(drop=True) if (pd.isnull(kwh_impact_table["heating_fuel_type"]).sum() or pd.isnull(kwh_impact_table["hotwater_fuel_type"]).sum()): raise Exception("Fuel type is missing") # As one final adjustment, if we # 1) have a boiler upgrade recommendation # 2) Have an average efficiency boiler, we adjust the COP of the existing boiler down to 75% heating_upgrades = [x for x in property_recommendations if x[0]["type"] == "heating"] boiler_upgrade = [r for recs in heating_upgrades for r in recs if r["measure_type"] == "boiler_upgrade"] existing_heating_efficiency = property_instance.data["mainheat-energy-eff"] if len(boiler_upgrade) and existing_heating_efficiency in ["Very Poor", "Poor", "Average"]: efficiency_map = {"Very Poor": 0.6, "Poor": 0.65, "Average": 0.7} adjusted_cop = efficiency_map[existing_heating_efficiency] boiler_phase = boiler_upgrade[0]["phase"] heating_measure_types_to_id = [ {"recommendation_id": r["recommendation_id"], "measure_type": r["measure_type"]} for r in heating_upgrades[0] ] kwh_impact_table = kwh_impact_table.merge( pd.DataFrame(heating_measure_types_to_id), how="left", on="recommendation_id" ) for col in ["heating_cop", "hotwater_cop"]: kwh_impact_table[col] = np.where( (kwh_impact_table["phase"] <= boiler_phase) & (kwh_impact_table["heating_fuel_type"] == "Natural Gas") & (kwh_impact_table["measure_type"] != "boiler_upgrade"), adjusted_cop, kwh_impact_table[col] ) kwh_impact_table = kwh_impact_table.drop(columns=["measure_type"]) # We now calculate the fuel cost for k in ["heating", "hotwater"]: kwh_impact_table[f"{k}_cost"] = kwh_impact_table.apply( lambda x: AnnualBillSavings.calculate_recommendation_fuel_cost( x[f"predictions_{k}"], x[f"{k}_fuel_type"], x[f"{k}_cop"] ), axis=1 ) # We now deduce if any of the recommendations result in a change of fuel type for recs in property_recommendations: for rec in recs: if rec["type"] in [ "trickle_vents", "draught_proofing", "extension_cavity_wall_insulation" ]: # We cannot score the impact on draught proofing continue rec_impact = kwh_impact_table[kwh_impact_table["recommendation_id"] == rec["recommendation_id"]] prevous_phase_id = (rec["phase"] - 1) if (rec["phase"] > 0) else STARTING_DUMMY_ID_VALUE previous_phase_impact = kwh_impact_table[kwh_impact_table["phase"] == prevous_phase_id] if rec["type"] == "solar_pv": rec["kwh_savings"] = rec_impact["solar_kwh_savings"].values[0] # Calculate carbon savings from this - emissions in kg and convert to tonnes emissions_kg = rec["kwh_savings"] * assumptions.ELECTRICITY_CARBON_INTENSITY emissions_tonnes = emissions_kg / 1000 rec["co2_equivalent_savings"] = emissions_tonnes rec["energy_cost_savings"] = ( rec_impact["solar_kwh_savings"].values[0] * AnnualBillSavings.ELECTRICITY_PRICE_CAP ) continue heating_kwh_savings = ( previous_phase_impact["predictions_heating"].mean() - rec_impact["predictions_heating"].values[0] ) hotwater_kwh_savings = ( previous_phase_impact["predictions_hotwater"].mean() - rec_impact["predictions_hotwater"].values[0] ) # Shouldn't be positive if rec["type"] == "mechanical_ventilation": heating_kwh_savings = 0 if heating_kwh_savings > 0 else heating_kwh_savings hotwater_kwh_savings = 0 if hotwater_kwh_savings > 0 else hotwater_kwh_savings heating_cost_savings = ( previous_phase_impact["heating_cost"].mean() - rec_impact["heating_cost"].values[0] ) hotwater_host = ( previous_phase_impact["hotwater_cost"].mean() - rec_impact["hotwater_cost"].values[0] ) total_kwh_savings = heating_kwh_savings + hotwater_kwh_savings energy_cost_savings = heating_cost_savings + hotwater_host if rec["type"] == "low_energy_lighting": continue rec["kwh_savings"] = total_kwh_savings rec["energy_cost_savings"] = energy_cost_savings # Finally, we set the current energy bill # For a community scheme, there is a standing charge but it's based on the operational cost of the network # and therefore is likely different to the typical standing charge. This will be a cost typically defined # by the network operator and often a building, whose residents are on a heat network, where the building # operator will purchase energy from the network and re-sell it to the residents starting_figures = kwh_impact_table[kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE].squeeze() gas_standing_charge = 0 if ( (starting_figures["heating_fuel_type"] in ["Natural Gas", "Natural Gas (Community Scheme)"]) or (starting_figures["hotwater_fuel_type"] == ["Natural Gas", "Natural Gas (Community Scheme)"]) ): gas_standing_charge = AnnualBillSavings.DAILY_STANDARD_CHARGE_GAS * 365 electricity_standing_charge = AnnualBillSavings.DAILY_STANDARD_CHARGE_ELECTRICITY * 365 # We return a dictionary that contains the individual costs, that can be stored to the database current_energy_bill = { "heating_cost_current": float(starting_figures["heating_cost"]), "hot_water_cost_current": float(starting_figures["hotwater_cost"]), "lighting_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["lighting"]), "appliances_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["appliances"]), "gas_standing_charge": float(gas_standing_charge), "electricity_standing_charge": float(electricity_standing_charge), } return current_energy_bill