Model/recommendations/Recommendations.py
Khalim Conn-Kowlessar abc300b406 merged
2026-02-10 14:31:28 +00:00

1333 lines
62 KiB
Python

import pandas as pd
import numpy as np
from backend.Property import Property
from typing import List, Mapping, Any
from itertools import groupby
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.LightingRecommendations import LightingRecommendations
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from recommendations.WindowsRecommendations import WindowsRecommendations
from recommendations.HeatingRecommender import HeatingRecommender
from recommendations.HotwaterRecommendations import HotwaterRecommendations
from recommendations.SecondaryHeating import SecondaryHeating
from recommendations.DraughtProofingRecommendations import DraughtProofingRecommendations
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.apis.GoogleSolarApi import GoogleSolarApi
import backend.app.assumptions as assumptions
from backend.app.plan.schemas import SPECIFIC_MEASURES, MEASURE_MAP, NON_INVASIVE_SPECIFIC_MEASURES
from utils.logger import setup_logger
# Starting value for placeholder ("dummy") ids. It is not referenced in the visible part of this module -
# NOTE(review): presumably used further down the file or by importers; confirm before removing.
STARTING_DUMMY_ID_VALUE = -9999
# Module-level logger, created through the project's setup_logger helper
logger = setup_logger()
class Recommendations:
    """
    High level recommendations class, which sits above the measure specific recommendation classes.

    It builds one recommender per measure family, runs them in phase order (see ``recommend``) and provides the
    helpers used to turn model predictions into a per-phase impact for each recommendation.
    """
    # Used in calculation of recommendation impact - increasing variables are features where
    # a higher value indicates an improvement. Decreasing is the opposite
    INCREASING_VARIABLES = ["sap"]
    DECREASING_VARIABLES = ["carbon", "heat_demand"]
    # If the recommendation is mechanical ventilation the two lists are swapped (get_monotonic_variables returns
    # these instead): sap may not rise, and carbon / heat_demand may not fall, relative to the previous phase
    MV_INCREASING_VARIABLES = ["carbon", "heat_demand"]
    MV_DECREASING_VARIABLES = ["sap"]
    # List of models we expect predictions for, when calculating recommendation impact. Each prefix is looked
    # up in the predictions mapping under the key f"{prefix}_predictions"
    PREDICTION_PREFIXES = ["sap_change", "heat_demand", "carbon_change"]
def __init__(
    self,
    property_instance: Property,
    materials: List,
    exclusions: List[str] = None,
    inclusions: List[str] = None,
    default_u_values: bool = False,
):
    """
    Store the run configuration and build one recommender per measure family.

    :param property_instance: Instance of the Property class for the home being assessed
    :param materials: List of materials, passed on to the recommenders that accept materials
    :param exclusions: Specific measures or measure types to leave out. None (the default) is stored as an
        empty list, meaning nothing is excluded
    :param inclusions: Specific measures or measure types to restrict the run to. None (the default) is
        stored as an empty list, meaning no restriction
    :param default_u_values: If True, the wall and roof recommenders are asked to use default u-values
    """
    # Run configuration
    self.property_instance = property_instance
    self.materials = materials
    self.exclusions = exclusions or []
    self.inclusions = inclusions or []
    self.default_u_values = default_u_values

    # Measure catalogues, plus the non-invasive recommendation types recorded against this property
    self.all_specific_measures = SPECIFIC_MEASURES
    self.all_non_invase_measures = NON_INVASIVE_SPECIFIC_MEASURES
    self.non_invasive_recommendation_types = [
        entry["type"] for entry in property_instance.non_invasive_recommendations
    ]

    # Most recommenders take the property and the materials; a few only need the property
    with_materials = {"property_instance": property_instance, "materials": materials}
    self.floor_recommender = FloorRecommendations(**with_materials)
    self.wall_recomender = WallRecommendations(**with_materials)
    self.roof_recommender = RoofRecommendations(**with_materials)
    self.ventilation_recomender = VentilationRecommendations(**with_materials)
    self.draught_proofing_recommender = DraughtProofingRecommendations(property_instance=property_instance)
    self.fireplace_recommender = FireplaceRecommendations(**with_materials)
    self.lighting_recommender = LightingRecommendations(**with_materials)
    self.windows_recommender = WindowsRecommendations(**with_materials)
    self.solar_recommender = SolarPvRecommendations(**with_materials)
    self.heating_recommender = HeatingRecommender(**with_materials)
    self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
    self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance)
def find_included_measures(self):
    """
    Work out which measures the recommenders are allowed to produce.

    Inclusions and exclusions are expanded through MEASURE_MAP (names not in the map are kept as they are).
    Measures already installed on the property are appended to the inclusions and removed from the
    exclusions (this comparison happens before nested lists are flattened). Nested lists are then flattened
    and the result is chosen as follows:

    * any inclusions: exactly the inclusions are returned, which is how inclusions are used to specify
      scenarios;
    * only exclusions: all specific measures plus the property's non-invasive recommendation types, minus
      the exclusions;
    * neither: all specific measures plus the property's non-invasive recommendation types.

    :return: list of measure names
    """
    def expand(names):
        # Entries found in MEASURE_MAP are replaced by their mapped value, the rest pass through
        return [MEASURE_MAP[name] if name in MEASURE_MAP else name for name in names]

    def flatten(entries):
        # Unpack list entries one level deep, keep everything else as a single item
        flat = []
        for entry in entries:
            if isinstance(entry, list):
                flat.extend(entry)
            else:
                flat.append(entry)
        return flat

    wanted = expand(self.inclusions)
    unwanted = expand(self.exclusions)

    # Already installed measures must be included so they get factored into the baseline
    installed = self.property_instance.already_installed
    if installed:
        for measure in installed:
            if measure not in wanted:
                wanted.append(measure)
        unwanted = [entry for entry in unwanted if entry not in installed]

    wanted = flatten(wanted)
    unwanted = flatten(unwanted)

    if wanted:
        return wanted

    # Nothing explicitly included: start from every typical measure plus this property's non-invasive types
    available = self.all_specific_measures + self.non_invasive_recommendation_types
    if not unwanted:
        return available
    return [measure for measure in available if measure not in unwanted]
def recommend(self):
    """
    Run the measure specific recommenders in phase order and collect their output.

    The recommenders are called fabric first (walls, roof, ventilation, floor, lighting, windows, fireplace),
    then heating, hot water and secondary heating, and finally renewables (solar PV). A running ``phase``
    counter is handed to the recommenders and advanced when a recommender produced something (trickle vents
    and draught proofing do not advance it). When the property has already installed measures, those
    recommendations are moved to the front with negative phases and the remaining groups are re-numbered
    from 0. Finally every recommendation receives a temporary ``recommendation_id`` and the representative
    recommendations are derived.

    :return: tuple ``(property_recommendations, property_representative_recommendations)``. The first is a
        list of recommendation groups (each group a list of recommendation dicts), the second is a flat list
    :raises NotImplementedError: if the heating recommender returns recommendations for both the current
        phase and the following one, which is not supported yet
    :raises ValueError: if any recommendation has a null ``measure_type``
    """
    property_recommendations = []
    phase = 0
    measures = self.find_included_measures()
    non_invasive_recommendation_types = [r["type"] for r in self.property_instance.non_invasive_recommendations]

    # Building Fabric
    self.wall_recomender.recommend(phase=phase, measures=measures, default_u_values=self.default_u_values)
    if self.wall_recomender.recommendations:
        property_recommendations.append(self.wall_recomender.recommendations)
        phase += 1

    # We handle recommendations covering specific non-invasive measures
    new_phase = self.wall_recomender.recommend_extended(phase=phase, measures=measures)
    if self.wall_recomender.extended_recommendations:
        property_recommendations.append(self.wall_recomender.extended_recommendations)
        # No phasing is done here: we continue from whatever phase recommend_extended returned
        phase = new_phase

    self.roof_recommender.recommend(phase=phase, measures=measures, default_u_values=self.default_u_values)
    if self.roof_recommender.recommendations:
        property_recommendations.append(self.roof_recommender.recommendations)
        phase += 1

    # Ventilation recommendations
    # We only produce a ventilation recommendation if the property is recommended to have wall or roof
    # insulation, and only when ventilation is among the allowed measures
    if (
        (self.wall_recomender.recommendations or self.roof_recommender.recommendations) and
        ("ventilation" in measures)
    ):
        self.ventilation_recomender.recommend(phase=phase)
        if self.ventilation_recomender.recommendation:
            property_recommendations.append(self.ventilation_recomender.recommendation)
            phase += 1

    if "trickle_vents" in measures:
        # This is a recommendation that typically comes from an energy assessment
        trickle_vents_rec = self.ventilation_recomender.recommend_trickle_vents()
        if trickle_vents_rec:
            property_recommendations.append(trickle_vents_rec)

    if "draught_proofing" in measures:
        # This is a recommendation that in some instances we can recommend, by deducing it from the SAP
        # recommendations, however we will implement this later
        self.draught_proofing_recommender.recommend()
        if self.draught_proofing_recommender.recommendation:
            property_recommendations.append(self.draught_proofing_recommender.recommendation)

    self.floor_recommender.recommend(phase=phase, measures=measures)
    if self.floor_recommender.recommendations:
        property_recommendations.append(self.floor_recommender.recommendations)
        phase += 1

    if "low_energy_lighting" in measures:
        self.lighting_recommender.recommend(phase=phase)
        if self.lighting_recommender.recommendation:
            property_recommendations.append(self.lighting_recommender.recommendation)
            phase += 1

    if "mixed_glazing" not in non_invasive_recommendation_types:
        # If we have a mixed glazing recommendation, we prioritise this over the windows recommendation
        self.windows_recommender.recommend(phase=phase, measures=measures)
        if self.windows_recommender.recommendation:
            property_recommendations.append(self.windows_recommender.recommendation)
            phase += 1

    if "mixed_glazing" in measures:
        # This is a recommendation that comes exclusively from an energy assessment
        mixed_glazing_rec = self.windows_recommender.recommend_mixed_glazing(phase=phase)
        if mixed_glazing_rec:
            property_recommendations.append(mixed_glazing_rec)
            phase += 1

    if "fireplace" in measures:
        self.fireplace_recommender.recommend(phase=phase)
        if self.fireplace_recommender.recommendation:
            property_recommendations.append(self.fireplace_recommender.recommendation)
            phase += 1

    # The heating recommender is told whether cavity wall or loft insulation has been recommended
    cavity_or_loft_recommendations = [
        r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations
        if r["type"] in ["cavity_wall_insulation", "loft_insulation"]
    ]
    has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0
    self.heating_recommender.recommend(
        phase=phase,
        measures=measures,
        has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations,
    )
    if self.heating_recommender.heating_recommendations:
        # We split into first and second phase recommendations
        first_phase_recommendations = [
            r for r in self.heating_recommender.heating_recommendations
            if r["phase"] == phase
        ]
        second_phase_recommendations = [
            r for r in self.heating_recommender.heating_recommendations
            if r["phase"] == phase + 1
        ]
        if first_phase_recommendations and second_phase_recommendations:
            # NotImplementedError is still an Exception, so existing broad handlers keep working
            raise NotImplementedError(
                "Heating recommendations spanning both the current and the next phase are not implemented"
            )
        if first_phase_recommendations:
            property_recommendations.append(first_phase_recommendations)
        if second_phase_recommendations:
            property_recommendations.append(second_phase_recommendations)
        # Continue from the phase after the highest one the heating recommender used
        max_used_phase = max(
            rec["phase"] for rec in self.heating_recommender.heating_recommendations
        )
        amount_to_increment = max_used_phase - phase + 1
        phase += amount_to_increment

    # Hot water
    if "hot_water" in measures:
        self.hotwater_recommender.recommend(phase=phase)
        if self.hotwater_recommender.recommendations:
            if len(self.hotwater_recommender.recommendations) > 1:
                # Several hot water recommendations: each one becomes its own group
                for r in self.hotwater_recommender.recommendations:
                    property_recommendations.append([r])
                    phase += 1
            else:
                property_recommendations.append(self.hotwater_recommender.recommendations)
                phase += 1

    if "secondary_heating" in measures:
        self.secondary_heating_recommender.recommend(phase=phase)
        if self.secondary_heating_recommender.recommendation:
            property_recommendations.append(self.secondary_heating_recommender.recommendation)
            phase += 1

    # Renewables
    if "solar_pv" in measures:
        self.solar_recommender.recommend(phase=phase)
        if self.solar_recommender.recommendation:
            property_recommendations.append(self.solar_recommender.recommendation)
            phase += 1

    if self.property_instance.already_installed:
        # We need to re-shuffle our measures: split every group into installed / not installed
        property_recommendations_removed_installed = []
        already_installed_recs = []
        for recs in property_recommendations:
            phase_recs = []
            phase_already_installed_recs = []
            for rec in recs:
                if rec["already_installed"]:
                    phase_already_installed_recs.append(rec)
                else:
                    phase_recs.append(rec)
            if phase_recs:
                property_recommendations_removed_installed.append(phase_recs)
            if phase_already_installed_recs:
                already_installed_recs.append(phase_already_installed_recs)

        # We re-set the phases so the remaining groups are numbered 0, 1, 2, ...
        for i, recs in enumerate(property_recommendations_removed_installed):
            for rec in recs:
                rec["phase"] = i

        # already installed recs get negative phasing, counting up towards -1
        already_installed_phase = -len(already_installed_recs)
        for recs in already_installed_recs:
            for rec in recs:
                rec["phase"] = already_installed_phase
            already_installed_phase += 1

        property_recommendations = already_installed_recs + property_recommendations_removed_installed

    # We insert temporary ids into the recommendations which is important for the optimiser later
    property_recommendations = self.insert_temp_recommendation_id(property_recommendations)

    # We also need to create the representative recommendations for each recommendation type
    property_representative_recommendations = self.create_representative_recommendations(
        property_recommendations,
    )

    # Check to make sure measure_type is populated
    for recs in property_recommendations:
        if any(pd.isnull(rec.get("measure_type")) for rec in recs):
            raise ValueError("Measure type is not populated")

    return property_recommendations, property_representative_recommendations
@staticmethod
def create_representative_recommendations(property_recommendations):
    """
    Pick one representative recommendation per recommendation type.

    Each recommendation in a group is given an ``efficiency`` score (written onto the recommendation dict),
    where lower is better, and the lowest scoring recommendation of each type is selected:

    1) where the first recommendation of the group has a ``new_u_value``: cost per unit of U-value
       improvement, ``total / (starting_u_value - new_u_value)``. A recommendation with no positive
       improvement scores infinity so that it is never preferred over one that does improve
    2) otherwise, where every recommendation has ``sap_points``: cost per SAP point
    3) otherwise, where the first recommendation has a ``rank``: the rank
    4) otherwise: the cost (``total``)

    Groups whose first recommendation is of type ``trickle_vents`` or ``draught_proofing`` get no
    representative, and empty groups are ignored. If a group holds both ``heating`` and ``heating_control``
    recommendations, only the ``heating`` ones are considered.

    :param property_recommendations: nested list of recommendation dicts, one inner list per group
    :return: flat list of representative recommendation dicts
    """
    property_representative_recommendations = []

    for recommendations_by_type in property_recommendations:
        # Nothing to represent in an empty group
        if not recommendations_by_type:
            continue

        # Trickle vents and draught proofing never get a representative recommendation
        if recommendations_by_type[0].get("type") in ("trickle_vents", "draught_proofing"):
            continue

        has_u_value = recommendations_by_type[0].get("new_u_value") is not None
        has_sap_points = all(r.get("sap_points") is not None for r in recommendations_by_type)
        has_rank = recommendations_by_type[0].get("rank") is not None

        # If we have a heating and heating control recommendation, we use JUST the heating recommendation
        types_in_group = {rec["type"] for rec in recommendations_by_type}
        if {"heating", "heating_control"} <= types_in_group:
            recommendations_by_type = [
                rec for rec in recommendations_by_type if rec["type"] == "heating"
            ]

        # A group may hold several types (such as the solid wall insulation options); groupby needs its
        # input sorted by the grouping key
        recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"])
        for _type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):
            recommendations = list(recommendations)

            for rec in recommendations:
                if has_u_value:
                    # Cost per U-value improvement - the lower the better. The subtraction must be
                    # bracketed, otherwise the division binds first and the score is meaningless
                    u_value_improvement = rec["starting_u_value"] - rec["new_u_value"]
                    if u_value_improvement > 0:
                        rec["efficiency"] = rec["total"] / u_value_improvement
                    else:
                        rec["efficiency"] = float("inf")
                elif has_sap_points:
                    # Cost per SAP point improvement - the lower the better
                    # NOTE(review): zero SAP points scores 0, i.e. best - confirm that this is intended
                    if rec["sap_points"] == 0:
                        rec["efficiency"] = 0
                    else:
                        rec["efficiency"] = rec["total"] / rec["sap_points"]
                elif has_rank:
                    # Rank - the lower the better
                    rec["efficiency"] = rec["rank"]
                else:
                    # Cost - the lower the better
                    rec["efficiency"] = rec["total"]

            # min returns the first of equally scored recommendations, like a stable sort followed by [0]
            property_representative_recommendations.append(
                min(recommendations, key=lambda x: x["efficiency"])
            )

    return property_representative_recommendations
@staticmethod
def insert_temp_recommendation_id(property_recommendations):
    """
    Stamp every recommendation with a temporary ``recommendation_id``.

    The id is the string ``"<n>_phase=<phase>"``, where ``n`` counts the recommendations from 0 across all
    groups in order and ``phase`` is the recommendation's own ``phase`` value. The id is needed for
    filtering recommendations between default and not, after the optimiser has been run.

    :param property_recommendations: nested list of recommendation dicts; the dicts are modified in place
    :return: the same ``property_recommendations`` object, with the ids inserted
    """
    every_recommendation = (
        recommendation for group in property_recommendations for recommendation in group
    )
    for counter, recommendation in enumerate(every_recommendation):
        recommendation["recommendation_id"] = f"{counter!s}_phase={recommendation['phase']!s}"
    return property_recommendations
@staticmethod
def _calculate_appliance_solar_savings(
    rec, property_instance, heating_kwh_reduction, hot_water_kwh_reduction, lighting_kwh_reduction
):
    """
    Calculates the impact on appliance kwh and cost of installing solar panels

    :param rec: The recommendation; anything other than a ``solar_pv`` recommendation yields ``(0, 0)``
    :param property_instance: Instance of the Property class
    :param heating_kwh_reduction: The kwh reduction from heating
    :param hot_water_kwh_reduction: The kwh reduction from hot water
    :param lighting_kwh_reduction: The kwh reduction from lighting
    :return: tuple ``(predicted_appliances_cost_reduction, predicted_appliances_kwh_reduction)``
    """
    if rec["type"] != "solar_pv":
        return 0, 0

    appliances_kwh = property_instance.energy_consumption_estimates["adjusted"]["appliances"]

    if property_instance.solar_panel_configuration is None:
        # Without a panel configuration we fall back to a flat 50% reduction. This is reported through
        # the module logger rather than printed to stdout
        logger.warning(
            "PLACEHOLDER ESTIMATES - no solar panel configuration, assuming half of appliance kWh is saved"
        )
        kwh_reduction = appliances_kwh * 0.5
        predicted_appliances_cost_reduction = kwh_reduction * AnnualBillSavings.ELECTRICITY_PRICE_CAP
        return predicted_appliances_cost_reduction, kwh_reduction

    # Calculate the amount of energy the solar panel array will generate for this unit
    unit_energy_generated = (
        rec["initial_ac_kwh_per_year"] *
        property_instance.solar_panel_configuration["unit_share_of_energy"]
    )
    # Only a fixed proportion of the generated energy is assumed to be used by the property; the
    # remainder is exported and paid for at the export rate
    unit_energy_utilised = unit_energy_generated * GoogleSolarApi.SOLAR_CONSUMPTION_PROPORTION
    unit_energy_exported = unit_energy_generated - unit_energy_utilised
    unit_energy_exported_value = unit_energy_exported * AnnualBillSavings.ELECTRICITY_EXPORT_PAYMENT

    # Of the energy utilised, some is used by heating, hot water and lighting, so we remove that from
    # the total. What is left (never less than zero) is available to the appliances
    unit_energy_utilised -= (
        heating_kwh_reduction + hot_water_kwh_reduction + lighting_kwh_reduction
    )
    unit_energy_utilised = max(unit_energy_utilised, 0)

    # This is how much energy the appliances will still draw after install (never less than zero)
    post_install_appliance_kwh = max(appliances_kwh - unit_energy_utilised, 0)
    predicted_appliances_kwh_reduction = appliances_kwh - post_install_appliance_kwh

    predicted_appliances_cost_reduction = unit_energy_exported_value + (
        predicted_appliances_kwh_reduction * AnnualBillSavings.ELECTRICITY_PRICE_CAP
    )
    return predicted_appliances_cost_reduction, predicted_appliances_kwh_reduction
@staticmethod
def _check_ventilation_out_of_bounds(sap_impact, ventilation_sap_limit):
    """
    Tell whether a ventilation SAP impact falls outside the allowed range.

    :param sap_impact: the SAP impact to check
    :param ventilation_sap_limit: the lowest impact that is still allowed
    :return: truthy when the impact is below the limit, or is zero or positive
    """
    below_limit = sap_impact < ventilation_sap_limit
    return below_limit or (sap_impact >= 0)
@staticmethod
def _adjust_ventilation_sap(sap_impact, ventilation_sap_limit):
    """
    Pull a ventilation SAP impact back into the allowed range.

    :param sap_impact: the SAP impact to adjust
    :param ventilation_sap_limit: the lowest impact that is still allowed
    :return: -1 when the impact is zero or positive, the limit when the impact is below the limit,
        otherwise the impact unchanged
    """
    if sap_impact >= 0:
        return -1
    # For a negative impact, the larger of the impact and the limit is the value clamped from below
    return max(sap_impact, ventilation_sap_limit)
@staticmethod
def _filter_phase_adjustment(phase_adjustments):
    """
    Keep, for every phase, only the entry with the largest SAP adjustment.

    :param phase_adjustments: List of phase adjustments, in the form
        [{"recommendation_id": str, "phase": int, "sap_adjustment": float}]
    :return: one entry per phase, ordered by phase. When several entries of a phase share the largest
        adjustment, the one that came first in ``phase_adjustments`` is kept
    """
    def phase_of(adjustment):
        return adjustment["phase"]

    def size_of(adjustment):
        return adjustment["sap_adjustment"]

    # groupby only merges neighbouring entries, hence the sort by phase first
    ordered_by_phase = sorted(phase_adjustments, key=phase_of)
    return [
        max(same_phase, key=size_of)
        for _, same_phase in groupby(ordered_by_phase, key=phase_of)
    ]
@classmethod
def _filter_predictions_for_property(
    cls,
    all_predictions: Mapping[str, pd.DataFrame],
    property_id: str,
) -> dict:
    """
    Cut every prediction table down to the rows of a single property.

    :param all_predictions: Dictionary of all predictions from the model apis, holding one DataFrame per
        entry of PREDICTION_PREFIXES under the key "<prefix>_predictions"
    :param property_id: The property id to filter for, compared against the "property_id" column
    :return: dictionary with the same keys, each holding a copy of the matching rows
    """
    filtered = {}
    for prefix in cls.PREDICTION_PREFIXES:
        key = f"{prefix}_predictions"
        frame = all_predictions[key]
        rows_for_property = frame["property_id"] == property_id
        # Copy so that later changes do not touch the shared prediction table
        filtered[key] = frame.loc[rows_for_property].copy()
    return filtered
@classmethod
def get_monotonic_variables(cls, rec_type: str) -> tuple[List[str], List[str]]:
    """
    Look up which variables are treated as increasing and which as decreasing for a recommendation type.

    :param rec_type: The recommendation type
    :return: tuple ``(increasing, decreasing)``; the mechanical ventilation lists for
        "mechanical_ventilation", the standard lists for every other type
    """
    is_mechanical_ventilation = rec_type == "mechanical_ventilation"
    increasing = cls.MV_INCREASING_VARIABLES if is_mechanical_ventilation else cls.INCREASING_VARIABLES
    decreasing = cls.MV_DECREASING_VARIABLES if is_mechanical_ventilation else cls.DECREASING_VARIABLES
    return increasing, decreasing
@staticmethod
def _get_previous_phase_values(
    rec_phase: int,
    starting_phase: int,
    impact_summary: list[dict],
    property_instance: Property,
) -> dict:
    """
    Find the sap, carbon and heat demand values a recommendation phase starts from.

    :param rec_phase: phase of the recommendation being scored
    :param starting_phase: the first phase of the property's recommendations
    :param impact_summary: impacts recorded so far; entries are read through their "phase",
        "representative", "sap", "carbon" and "heat_demand" keys
    :param property_instance: Instance of the Property class, used for the current values of the property
    :return: for the starting phase, or when the previous phase has no representative entry, the property's
        current values; when the previous phase has exactly one representative entry, that entry itself;
        otherwise the per-metric median over the representative entries of the previous phase
    """
    def current_property_values() -> dict:
        record = property_instance.data
        return {
            "sap": float(record["current-energy-efficiency"]),
            "carbon": float(record["co2-emissions-current"]),
            "heat_demand": float(record["energy-consumption-current"]),
        }

    if rec_phase == starting_phase:
        return current_property_values()

    representatives = [
        entry for entry in impact_summary
        if entry["phase"] == rec_phase - 1 and entry["representative"]
    ]

    # It's unlikely that this will occur, but it avoids taking a median of nothing further down
    if not representatives:
        return current_property_values()

    if len(representatives) == 1:
        return representatives[0]

    # Several representatives for the previous phase: fall back to the median of each metric
    return {
        metric: np.median([entry[metric] for entry in representatives])
        for metric in ("sap", "carbon", "heat_demand")
    }
@classmethod
def _get_phase_predictions(
    cls,
    property_predictions: dict,
    recommendation_id: str,
) -> dict:
    """
    Read the prediction of every model for one recommendation.

    :param property_predictions: dictionary holding one DataFrame per entry of PREDICTION_PREFIXES under
        the key "<prefix>_predictions", each with "recommendation_id" and "predictions" columns
    :param recommendation_id: id of the recommendation; compared as a string
    :return: dictionary mapping each prefix to the first matching value of the "predictions" column
    """
    wanted_id = str(recommendation_id)
    phase_predictions = {}
    for prefix in cls.PREDICTION_PREFIXES:
        frame = property_predictions[f"{prefix}_predictions"]
        matching = frame.loc[frame["recommendation_id"] == wanted_id, "predictions"]
        # Indexing fails here if the model returned nothing for this recommendation
        phase_predictions[prefix] = matching.values[0]
    return phase_predictions
@classmethod
def _resolve_current_phase_sap(
    cls,
    rec: Mapping[str, Any],
    previous_phase_values: Mapping[str, Any],
    phase_energy_efficiency_metrics: Mapping[str, Any],
    adjustments: list[dict],
) -> float:
    """
    Work out the SAP value reached after a recommendation.

    :param rec: the recommendation; its "survey" flag (if any), "sap_points" and "phase" are read
    :param previous_phase_values: values of the previous phase; only "sap" is read
    :param phase_energy_efficiency_metrics: model predictions for this recommendation; only "sap_change"
        is read
    :param adjustments: SAP adjustments recorded so far, each with a "phase" and a "sap_adjustment"
    :return: for a survey recommendation, its SAP points added to the previous SAP; otherwise the predicted
        value, reduced by the adjustments of earlier phases (one adjustment per phase, see
        ``_filter_phase_adjustment``)
    """
    if rec.get("survey", False):
        return rec["sap_points"] + previous_phase_values["sap"]

    predicted_sap = phase_energy_efficiency_metrics["sap_change"]
    earlier_adjustments = [entry for entry in adjustments if entry["phase"] < rec["phase"]]
    if earlier_adjustments:
        per_phase = cls._filter_phase_adjustment(earlier_adjustments)
        return predicted_sap - sum(entry["sap_adjustment"] for entry in per_phase)
    return predicted_sap
@classmethod
def _compute_phase_impact(
    cls,
    rec_type: str,
    previous_phase_values: dict,
    current_phase_values: dict,
) -> dict:
    """
    Work out the impact of a recommendation phase, enforcing monotonicity.

    ``current_phase_values`` is corrected in place first: a variable listed as increasing for the
    recommendation type may not be lower than in the previous phase, one listed as decreasing may not be
    higher. The impact is then the SAP gain and the carbon / heat demand reduction between the phases.

    :param rec_type: string, the recommendation type
    :param previous_phase_values: dict, the previous phase values
    :param current_phase_values: dict, the current phase values (modified in place)
    :return: dict with "sap", "carbon" and "heat_demand" impacts. For mechanical ventilation every impact is
        capped at 0 from above; for any other type it is floored at 0 and the SAP impact rounded to 2 places
    """
    may_not_fall, may_not_rise = cls.get_monotonic_variables(rec_type)
    for name in may_not_fall:
        current_phase_values[name] = max(current_phase_values[name], previous_phase_values[name])
    for name in may_not_rise:
        current_phase_values[name] = min(current_phase_values[name], previous_phase_values[name])

    # SAP is measured as a gain, carbon and heat demand as a reduction
    sap_gain = current_phase_values["sap"] - previous_phase_values["sap"]
    carbon_reduction = previous_phase_values["carbon"] - current_phase_values["carbon"]
    heat_demand_reduction = previous_phase_values["heat_demand"] - current_phase_values["heat_demand"]

    if rec_type == "mechanical_ventilation":
        return {
            "sap": min(0, sap_gain),
            "carbon": min(0, carbon_reduction),
            "heat_demand": min(0, heat_demand_reduction),
        }
    return {
        "sap": round(max(0, sap_gain), 2),
        "carbon": max(0, carbon_reduction),
        "heat_demand": max(0, heat_demand_reduction),
    }
@classmethod
def _apply_measure_specific_rules(
    cls,
    rec: dict,
    property_phase_impact: dict,
    previous_phase_values: dict,
    current_phase_values: dict,
    adjustments: list,
    property_instance,
):
    """
    Apply the caps and floors that are specific to certain recommendation types.

    Only "low_energy_lighting", "mechanical_ventilation", "loft_insulation" and "solar_pv" are handled; any
    other type passes through untouched. Whenever the SAP impact is changed, an entry of the form
    {"recommendation_id", "phase", "sap_adjustment"} is appended to ``adjustments``.

    :param rec: the recommendation being scored
    :param property_phase_impact: impact of the phase ("sap", "carbon", ...), modified in place
    :param previous_phase_values: values of the previous phase
    :param current_phase_values: values of the current phase, modified in place
    :param adjustments: running list of SAP adjustments, appended to in place
    :param property_instance: Instance of the Property class
    :return: tuple ``(property_phase_impact, current_phase_values, adjustments)`` - the same objects that
        were passed in
    """
    # For the moment, we cap the number of SAP points that can be achieved by LEDs at the limit returned
    # by LightingRecommendations.get_sap_limit
    if rec["type"] == "low_energy_lighting":
        lighting_sap_limit = LightingRecommendations.get_sap_limit(
            property_instance.data["lighting-energy-eff"],
            property_instance.lighting["low_energy_proportion"]
        )
        # add an adjustment
        proposed_sap_impact = min(property_phase_impact["sap"], lighting_sap_limit)
        if proposed_sap_impact != property_phase_impact["sap"]:
            # Store the sap adjustment. The proposed sap impact will always be less
            # than the current sap impact, so the adjustment is always positive
            # as we subtract it from the future phases
            adjustments.append(
                {
                    "recommendation_id": rec["recommendation_id"],
                    "phase": rec["phase"],
                    "sap_adjustment": property_phase_impact["sap"] - proposed_sap_impact,
                }
            )
        property_phase_impact["sap"] = proposed_sap_impact
        # The carbon impact is capped at the savings stated on the recommendation itself
        property_phase_impact["carbon"] = min(
            property_phase_impact["carbon"], rec["co2_equivalent_savings"]
        )
        # Update the current phase values
        current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
        current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"]
    elif rec["type"] == "mechanical_ventilation":
        # The ventilation SAP impact has to be negative, but no lower than -4
        ventilation_sap_limit = -4
        ventilation_out_of_bounds = cls._check_ventilation_out_of_bounds(
            property_phase_impact["sap"], ventilation_sap_limit
        )
        if ventilation_out_of_bounds:
            # First try the impact measured against the previous phase's "sap_prediction" (0 when the
            # previous phase values carry no such key)
            previous_modelled_sap = previous_phase_values.get("sap_prediction", 0)
            proposed_sap_impact = current_phase_values["sap"] - previous_modelled_sap
            proposal_out_of_bounds = cls._check_ventilation_out_of_bounds(
                proposed_sap_impact, ventilation_sap_limit
            )
            if proposal_out_of_bounds:
                proposed_sap_impact = cls._adjust_ventilation_sap(
                    proposed_sap_impact, ventilation_sap_limit
                )
            # We keep track of the adjustment
            # In this case, if the SAP impact has increased, then the adjustment should be negative
            # otherwise it should be positive
            # When we add the total adjustment, it's an addition
            # Example
            # Before: 60, impact -2 => 58
            # After: 60, impact -1 (So the impact is bigger) => 59
            # So in this case, we need to make sure we add 1 to all future predictions so
            # the adjustment should be positive
            # Before: 60, impact 1 => 61
            # After: 60, impact -1 => 59
            # So in this case, we need to make sure we subtract 1 to all future predictions so
            # the adjustment should be negative
            # Both cases are reflected in sap adjustment
            # NOTE(review): the other branches store "impact - proposed" and _resolve_current_phase_sap
            # subtracts the adjustments, while this branch stores "proposed - impact" on the assumption
            # that adjustments are added - confirm which sign convention is intended
            sap_adjustment = proposed_sap_impact - float(property_phase_impact["sap"])
            adjustments.append(
                {
                    "recommendation_id": rec["recommendation_id"],
                    "phase": rec["phase"],
                    "sap_adjustment": sap_adjustment,
                }
            )
            property_phase_impact["sap"] = proposed_sap_impact
            # Update the current phase values
            current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
    elif rec["type"] == "loft_insulation":
        # When we have a loft insulation recommendation, where there is an extension and the existing
        # amount of loft insulation is already good, we limit the SAP points
        # NOTE(review): the earlier comment here said current_phase_values is left unchanged by this
        # limit, but the assignment at the end of this branch does overwrite current_phase_values["sap"]
        li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit(
            property_instance.data["roof-energy-eff"], property_instance.roof["insulation_thickness"]
        )
        if li_sap_limit is not None:
            new_value = min(property_phase_impact["sap"], li_sap_limit)
            # If we've made an adjustment, keep track of it
            if new_value != property_phase_impact["sap"]:
                adjustments.append(
                    {
                        "recommendation_id": rec["recommendation_id"],
                        "phase": rec["phase"],
                        # new_value is never larger than the modelled impact, so this is positive
                        "sap_adjustment": property_phase_impact["sap"] - new_value,
                    }
                )
            property_phase_impact["sap"] = new_value
            # Update the current phase values
            current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
    elif rec["type"] == "solar_pv":
        # We use the SAP points in the recommendation as a minimum
        proposed_impact = (
            rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else
            property_phase_impact["sap"]
        )
        # SAP adjustments should be negative
        if proposed_impact != property_phase_impact["sap"]:
            adjustments.append(
                {
                    "recommendation_id": rec["recommendation_id"],
                    "phase": rec["phase"],
                    # If we've made an adjustment, we will be increasing the number of SAP
                    # points. Since, we subtract adjustments, this number should be negative
                    "sap_adjustment": property_phase_impact["sap"] - proposed_impact,
                }
            )
        property_phase_impact["sap"] = proposed_impact
        # Update the current phase values
        current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
    return property_phase_impact, current_phase_values, adjustments
@staticmethod
def _validate_recommendation_updates(rec: Mapping[str, Any]):
    """
    Utility function to validate that the recommendation updates have been applied correctly

    The recommendation must carry a value for each of "sap_points", "co2_equivalent_savings" and
    "heat_demand". The previous condition mixed ``and`` and ``or`` without brackets, which Python reads as
    ``(sap is None and co2 is None) or heat_demand is None``, so a recommendation missing only its SAP
    points or only its CO2 savings was accepted. Each field is now checked on its own.

    :param rec: updated recommendation
    :raises ValueError: if any of the three fields is None
    """
    required_fields = ("sap_points", "co2_equivalent_savings", "heat_demand")
    if any(rec[field] is None for field in required_fields):
        raise ValueError("sap points, co2 or heat demand is missing")
@classmethod
def calculate_recommendation_impact(
        cls,
        property_instance: Property,
        all_predictions: Mapping[str, Any],
        recommendations: Mapping[int, List],
        representative_recommendations: Mapping[int, List],
        debug: bool = False
) -> (Mapping[int, List], List[Mapping[str, Any]]):
    """
    Given predictions from the model apis, this method updates the recommendations with the predicted
    impact of each recommendation on the property

    The algorithm is one long loop because it is sequential in nature - each recommendation is scored
    against the values of the phase before it, with adjustments and constraints being applied along the way

    This function will return two objects:
    1) Updated recommendations with the predicted impact of the recommendation
    2) A list of impacts by phase, which will be used for the kwh model scoring

    :param property_instance: Instance of the Property class, for the home associated to property_id
    :param all_predictions: dictionary of predictions from the model apis
    :param recommendations: recommendations, keyed by property id
    :param representative_recommendations: representative recommendations, keyed by property id
    :param debug: boolean, indicating if the function is running in debug mode. The only difference is that
        the adjustments are returned as a third element, for testing
    :return: Updated recommendations with predicted impact, and a list of impacts by phase
    """
    # Measures that are not scored by the model
    non_modelled_types = {
        "trickle_vents",
        "draught_proofing",
        "extension_cavity_wall_insulation",
    }

    property_predictions = cls._filter_predictions_for_property(
        all_predictions, str(property_instance.id)
    )
    # Shallow copy on purpose - the recommendation dictionaries themselves are updated in place
    property_recommendations = recommendations[property_instance.id].copy()
    representative_ids = [
        representative["recommendation_id"]
        for representative in representative_recommendations[property_instance.id]
    ]

    # Recommendations are processed in the order they appear, group by group
    ordered_recommendations = [rec for group in property_recommendations for rec in group]

    # Phases are allowed to be negative, so the starting phase is whatever the lowest one is
    starting_phase = min(rec["phase"] for rec in ordered_recommendations)

    # The history of SAP adjustments is carried forward so later phases can be corrected for them
    impact_summary, adjustments = [], []

    for rec in ordered_recommendations:
        rec_type = rec["type"]

        # --- Special-case: non-modelled measures -------------------------
        if rec_type in non_modelled_types:
            if rec_type == "extension_cavity_wall_insulation":
                # The impact carried on the recommendation itself is stacked onto the previous phase
                baseline = cls._get_previous_phase_values(
                    rec_phase=rec["phase"],
                    starting_phase=starting_phase,
                    impact_summary=impact_summary,
                    property_instance=property_instance,
                )
                impact_summary.append(
                    {
                        "phase": rec["phase"],
                        "representative": rec["recommendation_id"] in representative_ids,
                        "recommendation_id": rec["recommendation_id"],
                        "measure_type": rec["measure_type"],
                        "sap": baseline["sap"] + rec["sap_points"],
                        "carbon": baseline["carbon"] - rec["co2_equivalent_savings"],
                        "heat_demand": baseline["heat_demand"] - rec["heat_demand"],
                    }
                )
            continue

        predicted_metrics = cls._get_phase_predictions(
            property_predictions=property_predictions,
            recommendation_id=rec["recommendation_id"],
        )
        baseline = cls._get_previous_phase_values(
            rec_phase=rec["phase"],
            starting_phase=starting_phase,
            impact_summary=impact_summary,
            property_instance=property_instance
        )
        resolved_sap = cls._resolve_current_phase_sap(rec, baseline, predicted_metrics, adjustments)
        current_phase_values = {
            "sap": resolved_sap,
            "carbon": predicted_metrics["carbon_change"],
            "heat_demand": predicted_metrics["heat_demand"],
        }

        # Increasing variables must end up higher than the previous phase and decreasing variables lower;
        # otherwise the value is held at the previous one and the recommendation is judged to have had
        # no/negligible impact. Mechanical ventilation is exempt, as it can have a negative SAP impact
        phase_impact = cls._compute_phase_impact(
            rec_type=rec_type,
            previous_phase_values=baseline,
            current_phase_values=current_phase_values,
        )
        phase_impact, current_phase_values, adjustments = cls._apply_measure_specific_rules(
            rec=rec,
            property_phase_impact=phase_impact,
            previous_phase_values=baseline,
            current_phase_values=current_phase_values,
            adjustments=adjustments,
            property_instance=property_instance
        )

        # Write the impact onto the recommendation, unless it is flagged as a survey recommendation
        is_survey = rec.get("survey", False)
        if not is_survey:
            rec["sap_points"] = phase_impact["sap"]
            rec["co2_equivalent_savings"] = phase_impact["carbon"]
            rec["heat_demand"] = phase_impact["heat_demand"]
        cls._validate_recommendation_updates(rec)

        summary_entry = {
            "phase": rec["phase"],
            "representative": rec["recommendation_id"] in representative_ids,
            "recommendation_id": rec["recommendation_id"],
            "measure_type": rec["measure_type"],
            **current_phase_values,
            "sap_prediction": predicted_metrics["sap_change"]
        }
        impact_summary.append(summary_entry)

    if debug:
        return property_recommendations, impact_summary, adjustments
    return property_recommendations, impact_summary
@staticmethod
def map_descriptions_to_fuel(
        heating_description, hotwater_description, main_fuel_description, descriptions_to_fuel_types
):
    """
    Map the heating, hot water and main fuel descriptions onto fuel types and efficiencies (COP)

    Community schemes are handled first, based on the main fuel description. Everything else is looked up in
    descriptions_to_fuel_types, falling back to an "Unmapped" fuel with a COP of 0.9 (and a warning) when a
    description is not recognised.

    :param heating_description: main heating description
    :param hotwater_description: hot water description
    :param main_fuel_description: main fuel description
    :param descriptions_to_fuel_types: mapping from a description to a dictionary with "fuel" and "cop" keys
    :return: dictionary with the keys heating_fuel_type, hotwater_fuel_type, heating_cop and hotwater_cop
    """

    def _fuel_result(heating_fuel_type, hotwater_fuel_type, heating_cop, hotwater_cop):
        # Single place that defines the shape of the returned dictionary
        return {
            "heating_fuel_type": heating_fuel_type,
            "hotwater_fuel_type": hotwater_fuel_type,
            "heating_cop": heating_cop,
            "hotwater_cop": hotwater_cop
        }

    community_descriptions = ("Community scheme", "Community scheme, plus solar")
    # `and` binds tighter than `or`, so the "not community" exclusion only guards the hot water check. The
    # parentheses make that existing behaviour explicit. It covers the observed edge case where the fuel is
    # described as not being community but the hot water is
    is_community = (heating_description in community_descriptions) or (
        (hotwater_description in community_descriptions) and
        ("not community" not in main_fuel_description)
    )

    # Handle the case of community schemes
    if is_community:
        community_fuels = {
            "mains gas (community)": ("Natural Gas (Community Scheme)", 1),
            "UNKNOWN": ("Natural Gas (Community Scheme)", 1),
            "biogas (community)": ("Smokeless Fuel", 0.85),
            "coal (community)": ("Coal", 0.85),
        }
        community_fuel = community_fuels.get(main_fuel_description)
        if community_fuel is not None:
            fuel, cop = community_fuel
            return _fuel_result(fuel, fuel, cop, cop)

        # Handling specific case
        if main_fuel_description in ["To be used only when there is no heating/hot-water system"] and (
                "electric heaters" in heating_description.lower()
        ):
            return _fuel_result("Electricity", "Electricity", 1, 1)

        # Each value is labelled and separated so the message is readable - previously the parts were run
        # together and the hot water description was reported as "Heating"
        logger.warning(
            "Unhandled community fuel. Fuel: %s. Heating: %s. Hot water: %s",
            main_fuel_description, heating_description, hotwater_description,
        )
        return _fuel_result("Unmapped", "Unmapped", 0.9, 0.9)

    mapped = descriptions_to_fuel_types.get(heating_description.strip(), None)
    if mapped is None:
        # TODO: This is a non-ideal placeholder but we put something in place for a process that falls over
        #  fairly regularly. A task has been added to planner to refactor this
        logger.warning("Heating description not mapped: %s", heating_description)
        mapped = {"fuel": 'Unmapped', "cop": 0.9}
    heating_fuel = mapped["fuel"]

    if hotwater_description in [
        "From main system", "From main system, no cylinder thermostat",
        'From main system, waste water heat recovery',
    ]:
        # Hot water is provided by the main heating system, so it shares its fuel and efficiency
        return _fuel_result(heating_fuel, heating_fuel, mapped["cop"], mapped["cop"])

    if hotwater_description in [
        "From main system, plus solar", "From main system, plus solar, no cylinder thermostat"
    ]:
        # The fuel is the main heating fuel, supplemented by solar thermal
        return _fuel_result(heating_fuel, heating_fuel + " + Solar Thermal", mapped["cop"], 1)

    mapped_hotwater = descriptions_to_fuel_types.get(hotwater_description.strip())
    if mapped_hotwater is None:
        # TODO: This is a non-ideal placeholder but we put something in place for a process that falls over
        #  fairly regularly. A task has been added to planner to refactor this
        # We have observed an edge case where the fuel is described as not being community
        # but the hot water is. We handle as such
        logger.warning("Hot water description not mapped: %s", hotwater_description)
        mapped_hotwater = {"fuel": 'Unmapped', "cop": 0.9}

    return _fuel_result(heating_fuel, mapped_hotwater["fuel"], mapped["cop"], mapped_hotwater["cop"])
@classmethod
def calculate_recommendation_tenant_savings(
        cls, property_instance, kwh_simulation_predictions, property_recommendations, ashp_cop=None
):
    """
    This method inserts the kwh savings and the bill savings that the customer will make from the
    recommendations, based on the predictions from the ML model, and returns the current energy bill

    Solar kwh, bill and carbon savings are derived from the array's initial_ac_kwh_per_year rather than the
    ML model. Low energy lighting recommendations keep the kwh savings they already carry. Trickle vents,
    draught proofing and extension cavity wall insulation are not scored.

    :param property_instance: Instance of the Property class, for the home associated to property_id
    :param kwh_simulation_predictions: dictionary holding the "heating_kwh_predictions" and
        "hotwater_kwh_predictions" dataframes
    :param property_recommendations: the recommendation groups (lists of recommendations) for the property.
        The recommendations are updated in place
    :param ashp_cop: The coefficient of performance for the air source heat pump. Falls back to
        assumptions.AVERAGE_ASHP_EFFICIENCY when not provided
    :return: dictionary with the current heating, hot water, lighting and appliances costs, plus the gas and
        electricity standing charges
    :raises Exception: if a fuel type could not be determined for a row of the impact table
    """
    ashp_cop = ashp_cop if ashp_cop else assumptions.AVERAGE_ASHP_EFFICIENCY
    non_modelled_types = ["trickle_vents", "draught_proofing", "extension_cavity_wall_insulation"]
    all_recommendations = [rec for recs in property_recommendations for rec in recs]

    # Heating and hot water predictions for this property, side by side
    heating_predictions = kwh_simulation_predictions["heating_kwh_predictions"]
    kwh_impact_table = heating_predictions[
        heating_predictions["property_id"] == str(property_instance.id)
    ].merge(
        kwh_simulation_predictions["hotwater_kwh_predictions"].drop(
            columns=["property_id", "recommendation_id", "phase"]
        ),
        how="inner",
        on="id",
        suffixes=("_heating", "_hotwater")
    ).reset_index(drop=True)

    # We adjust this table with the kwh estimates for low energy lighting kwh values, and solar kwh estimates
    led_recommendation = pd.DataFrame(
        [
            {
                "phase": r["phase"],
                "recommendation_id": r["recommendation_id"],
                "lighting_kwh_savings": r["kwh_savings"]
            } for r in all_recommendations if r["type"] == "low_energy_lighting"
        ],
        columns=["phase", "recommendation_id", "lighting_kwh_savings"]
    )
    solar_recommendations = pd.DataFrame(
        [
            {
                "phase": r["phase"],
                "recommendation_id": r["recommendation_id"],
                "solar_kwh_savings": (
                    r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_PROPORTION
                ) if not r["has_battery"] else (
                    r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION
                ),
            } for r in all_recommendations if r["type"] == "solar_pv"
        ],
        columns=["phase", "recommendation_id", "solar_kwh_savings"]
    )
    kwh_impact_table = kwh_impact_table.merge(
        led_recommendation, how="left", on=["phase", "recommendation_id"]
    ).merge(
        solar_recommendations, how="left", on=["phase", "recommendation_id"]
    )

    # Prepend a dummy row representing the property as it is today, before any recommendation
    property_kwh = property_instance.energy_consumption_estimates["unadjusted"]
    starting_row = pd.DataFrame(
        [
            {
                "id": STARTING_DUMMY_ID_VALUE,
                "phase": STARTING_DUMMY_ID_VALUE,
                "recommendation_id": STARTING_DUMMY_ID_VALUE,
                "predictions_heating": float(property_kwh["heating"]),
                "predictions_hotwater": float(property_kwh["hot_water"]),
            }
        ]
    )
    kwh_impact_table = pd.concat(
        [starting_row, kwh_impact_table]
    ).sort_values(["phase", "recommendation_id"], ascending=True).reset_index(drop=True)

    # We need the recommendation type
    rec_id_to_type = {rec["recommendation_id"]: rec["type"] for rec in all_recommendations}
    rec_id_to_type[STARTING_DUMMY_ID_VALUE] = "starting_dummy"

    # A recommendation should not increase the kwh compared to the previous phase, so predictions are capped
    # at the previous phase's maximum
    for i in range(len(kwh_impact_table)):
        current = kwh_impact_table.loc[i]
        current_phase = current['phase']
        previous_phase_id = (current_phase - 1) if (current_phase > 0) else STARTING_DUMMY_ID_VALUE
        previous_phase = kwh_impact_table[kwh_impact_table['phase'] == previous_phase_id]
        if previous_phase.empty:
            continue
        if rec_id_to_type[current["recommendation_id"]] == "mechanical_ventilation":
            # We expect the kwh to increase for ventilation, so it is never capped
            continue
        for col in ["predictions_heating", "predictions_hotwater"]:
            ceiling = previous_phase[col].max()
            if kwh_impact_table.loc[i, col] > ceiling:
                kwh_impact_table.loc[i, col] = ceiling

    # Work on a per-call copy so the shared assumptions are not modified, otherwise the ashp_cop from one
    # call would leak into every later use of assumptions.DESCRIPTIONS_TO_FUEL_TYPES
    descriptions_to_fuel_types = {
        description: dict(fuel_details)
        for description, fuel_details in assumptions.DESCRIPTIONS_TO_FUEL_TYPES.items()
    }
    # We set the air source heat pump efficiencies
    for description, fuel_details in descriptions_to_fuel_types.items():
        if "air source heat pump" in description.lower():
            fuel_details["cop"] = ashp_cop

    # For heating system recommendations, this could result in a fuel type change so we reflect that
    fuel_mapping = pd.DataFrame([
        {
            "id": epc["id"],
            **cls.map_descriptions_to_fuel(
                epc["mainheat-description"], epc["hotwater-description"], epc["main-fuel"],
                descriptions_to_fuel_types
            )
        } for epc in property_instance.updated_simulation_epcs
    ])
    starting_fuel_mapping = pd.DataFrame(
        [
            {
                "id": STARTING_DUMMY_ID_VALUE,
                **cls.map_descriptions_to_fuel(
                    property_instance.data["mainheat-description"],
                    property_instance.data["hotwater-description"],
                    property_instance.data["main-fuel"],
                    descriptions_to_fuel_types
                )
            }
        ]
    )
    fuel_mapping = pd.concat([starting_fuel_mapping, fuel_mapping])

    kwh_impact_table = kwh_impact_table.merge(
        fuel_mapping, how="left", on="id"
    ).sort_values(["phase", "recommendation_id"], ascending=True).reset_index(drop=True)

    if (
            kwh_impact_table["heating_fuel_type"].isnull().any() or
            kwh_impact_table["hotwater_fuel_type"].isnull().any()
    ):
        raise Exception("Fuel type is missing")

    # As one final adjustment, if we
    # 1) have a boiler upgrade recommendation
    # 2) have an existing boiler rated Very Poor, Poor or Average
    # we adjust the COP of the existing gas boiler down to the value for its efficiency rating
    heating_upgrades = [
        recs for recs in property_recommendations if recs and recs[0]["type"] == "heating"
    ]
    boiler_upgrade = [r for recs in heating_upgrades for r in recs if r["measure_type"] == "boiler_upgrade"]
    existing_heating_efficiency = property_instance.data["mainheat-energy-eff"]
    efficiency_map = {"Very Poor": 0.6, "Poor": 0.65, "Average": 0.7}

    if boiler_upgrade and existing_heating_efficiency in efficiency_map:
        adjusted_cop = efficiency_map[existing_heating_efficiency]
        boiler_phase = boiler_upgrade[0]["phase"]

        heating_measure_types_to_id = [
            {"recommendation_id": r["recommendation_id"], "measure_type": r["measure_type"]}
            for r in heating_upgrades[0]
        ]
        kwh_impact_table = kwh_impact_table.merge(
            pd.DataFrame(heating_measure_types_to_id), how="left", on="recommendation_id"
        )
        # Only rows up to the boiler phase that still run on the existing gas boiler are adjusted
        uses_existing_boiler = (
            (kwh_impact_table["phase"] <= boiler_phase) &
            (kwh_impact_table["heating_fuel_type"] == "Natural Gas") &
            (kwh_impact_table["measure_type"] != "boiler_upgrade")
        )
        for col in ["heating_cop", "hotwater_cop"]:
            kwh_impact_table[col] = np.where(uses_existing_boiler, adjusted_cop, kwh_impact_table[col])
        kwh_impact_table = kwh_impact_table.drop(columns=["measure_type"])

    # We now calculate the fuel cost
    for k in ["heating", "hotwater"]:
        kwh_impact_table[f"{k}_cost"] = kwh_impact_table.apply(
            lambda row, k=k: AnnualBillSavings.calculate_recommendation_fuel_cost(
                row[f"predictions_{k}"], row[f"{k}_fuel_type"], row[f"{k}_cop"]
            ), axis=1
        )

    # We now work out the savings of each recommendation, relative to the previous phase
    for rec in all_recommendations:
        if rec["type"] in non_modelled_types:
            # We cannot score the impact of these measures
            continue
        if rec["type"] == "low_energy_lighting":
            # Lighting keeps the savings already set on the recommendation
            continue

        rec_impact = kwh_impact_table[kwh_impact_table["recommendation_id"] == rec["recommendation_id"]]

        if rec["type"] == "solar_pv":
            solar_kwh_savings = rec_impact["solar_kwh_savings"].values[0]
            rec["kwh_savings"] = solar_kwh_savings
            # Calculate carbon savings from this - emissions in kg and convert to tonnes
            emissions_kg = solar_kwh_savings * assumptions.ELECTRICITY_CARBON_INTENSITY
            rec["co2_equivalent_savings"] = emissions_kg / 1000
            rec["energy_cost_savings"] = solar_kwh_savings * AnnualBillSavings.ELECTRICITY_PRICE_CAP
            continue

        previous_phase_id = (rec["phase"] - 1) if (rec["phase"] > 0) else STARTING_DUMMY_ID_VALUE
        previous_phase_impact = kwh_impact_table[kwh_impact_table["phase"] == previous_phase_id]

        heating_kwh_savings = (
            previous_phase_impact["predictions_heating"].mean() - rec_impact["predictions_heating"].values[0]
        )
        hotwater_kwh_savings = (
            previous_phase_impact["predictions_hotwater"].mean() - rec_impact["predictions_hotwater"].values[0]
        )
        if rec["type"] == "mechanical_ventilation":
            # Ventilation shouldn't produce positive kwh savings
            heating_kwh_savings = min(heating_kwh_savings, 0)
            hotwater_kwh_savings = min(hotwater_kwh_savings, 0)

        heating_cost_savings = (
            previous_phase_impact["heating_cost"].mean() - rec_impact["heating_cost"].values[0]
        )
        hotwater_cost_savings = (
            previous_phase_impact["hotwater_cost"].mean() - rec_impact["hotwater_cost"].values[0]
        )

        rec["kwh_savings"] = heating_kwh_savings + hotwater_kwh_savings
        rec["energy_cost_savings"] = heating_cost_savings + hotwater_cost_savings

    # Finally, we set the current energy bill
    # For a community scheme, there is a standing charge but it's based on the operational cost of the network
    # and therefore is likely different to the typical standing charge. This will be a cost typically defined
    # by the network operator and often a building, whose residents are on a heat network, where the building
    # operator will purchase energy from the network and re-sell it to the residents
    starting_figures = kwh_impact_table[kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE].squeeze()

    # Membership test for both fuels - comparing the hot water fuel to the list with `==` was never true, so
    # homes with gas hot water only were not given a gas standing charge
    gas_fuel_types = ["Natural Gas", "Natural Gas (Community Scheme)"]
    uses_gas = (
        (starting_figures["heating_fuel_type"] in gas_fuel_types) or
        (starting_figures["hotwater_fuel_type"] in gas_fuel_types)
    )
    gas_standing_charge = AnnualBillSavings.DAILY_STANDARD_CHARGE_GAS * 365 if uses_gas else 0
    electricity_standing_charge = AnnualBillSavings.DAILY_STANDARD_CHARGE_ELECTRICITY * 365

    # We return a dictionary that contains the individual costs, that can be stored to the database
    current_energy_bill = {
        "heating_cost_current": float(starting_figures["heating_cost"]),
        "hot_water_cost_current": float(starting_figures["hotwater_cost"]),
        "lighting_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["lighting"]),
        "appliances_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["appliances"]),
        "gas_standing_charge": float(gas_standing_charge),
        "electricity_standing_charge": float(electricity_standing_charge),
    }
    return current_energy_bill