Merge branch 'main' of github.com:Hestia-Homes/Model into etl-michael

This commit is contained in:
Michael Duong 2024-02-19 15:20:05 +00:00
commit 343039233a
19 changed files with 2802 additions and 487 deletions

View file

@ -1,20 +1,15 @@
from datetime import datetime
import re
import os
import numpy as np
from itertools import groupby
import pandas as pd
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Dataset import TrainingDataset
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, \
BUILT_FORM_REMAP
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import DATA_ANOMALY_MATCHES
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
)
@ -47,6 +42,7 @@ class Property:
walls = None
windows = None
lighting = None
energy_source = None
spatial = None
base_difference_record = None
@ -118,7 +114,6 @@ class Property:
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
self.floor_level = None
self.number_of_windows = None
self.solar_pv_roof_area = None
self.solar_pv_percentage = None
self.current_adjusted_energy = None
@ -151,139 +146,215 @@ class Property:
# self.base_difference_record.df
def adjust_difference_record_with_recommendations(self, property_recommendations):
def adjust_difference_record_with_recommendations(
self, property_recommendations,
property_representative_recommendations
):
"""
This method will adjust the difference record, based on the recommendations made for the property
In order to score the measures, we need to consider the phase of the retrofit.
:param property_recommendations: dictionary of recommendations for the property
:param property_representative_recommendations: dictionary of representative recommendations for the property
"""
self.recommendations_scoring_data = []
phases = sorted([r[0]["phase"] for r in property_recommendations if r[0]["phase"] is not None])
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
scoring_dict = self.create_recommendation_scoring_data(
property_id=self.id, recommendation_record=recommendation_record, recommendation=rec,
for phase in phases:
property_recommendations_by_phase = [r for r in property_recommendations if r[0]["phase"] == phase][0]
previous_phases = [p for p in phases if p < phase]
previous_phase_representatives = [
r for r in property_representative_recommendations if r["phase"] in previous_phases
]
# For solid wall insulation, we will actually have 2 representative recommendations, since we consider
# both internal and external wall insulation as possible measures. We will use the representative that
# has the lowest efficiency.
# Take the representative with the lowest efficiency, by phase
# To be safe, we sort by phase
previous_phase_representatives = sorted(previous_phase_representatives, key=lambda x: x['phase'])
previous_phase_representatives = [
min(group, key=lambda x: x['efficiency']) for _, group in groupby(
previous_phase_representatives, key=lambda x: x['phase']
)
]
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
for rec in property_recommendations_by_phase:
# We simulate the impact of the recommendation at this current phase, and all of the prior phases
if rec["type"] == "mechanical_ventilation":
continue
scoring_dict = self.create_recommendation_scoring_data(
property_id=self.id,
recommendation_record=recommendation_record,
recommendations=previous_phase_representatives + [rec],
primary_recommendation_id=rec["recommendation_id"]
)
self.recommendations_scoring_data.append(scoring_dict)
@staticmethod
def create_recommendation_scoring_data(property_id, recommendation_record, recommendation: dict):
def create_recommendation_scoring_data(
property_id, recommendation_record, recommendations: list, primary_recommendation_id: int
):
"""
This function will iterate through a list of recommendations and apply a simulation for each recommendation
This allows us to later multiple measures and see the impact of the measures on the property
:param property_id: The id of the property
:param recommendation_record: The record of the property, which will be updated
:param recommendations: The list of recommendations to apply
:param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record
:return: The updated recommendation record
"""
output = recommendation_record.copy()
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
]:
if recommendation_record[col] is None:
recommendation_record[col] = "none"
if output[col] is None:
output[col] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
recommendation_record["walls_insulation_thickness_ending"] = "above average"
recommendation_record["walls_energy_eff_ending"] = "Good"
else:
if recommendation_record["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
for recommendation in recommendations:
# For the list of recommendations we have, we iteratively update the output
if recommendation_record["walls_insulation_thickness_ending"] is None:
recommendation_record["walls_insulation_thickness_ending"] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
output["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
# Setting the insulation thickness here to above average should be tested further because we
# don't see a high volume of instances for this
output["walls_insulation_thickness_ending"] = "above average"
output["walls_energy_eff_ending"] = "Good"
# Update description to indicate it's insulate
if recommendation["type"] in [
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"
]:
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
# Note: often when the wall is insulatied, the internal/external insulation is not noted so we should
# test the impact of using these booleans
if recommendation["type"] == "external_wall_insulation":
output["external_insulation"] = True
output["internal_insulation"] = False
# recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
recommendation_record["floor_insulation_thickness_ending"] = "average"
# This is rarely ever populated in the training data
# recommendation_record["floor_energy_eff_ending"] = "Good"
else:
if recommendation_record["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if recommendation["type"] == "internal_wall_insulation":
output["external_insulation"] = False
output["internal_insulation"] = True
if recommendation_record["floor_insulation_thickness_ending"] is None:
recommendation_record["floor_insulation_thickness_ending"] = "none"
# TODO: perhaps detrimental
# When making a recommendation for the wall, we will also update the ventilation
# if output["mechanical_ventilation_ending"] == 'natural':
# output["mechanical_ventilation_ending"] = 'mechanical, extract only'
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
parts = recommendation["parts"]
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
if recommendation["type"] == "loft_insulation":
recommendation_record["roof_energy_eff_ending"] = "Good"
else:
recommendation_record["roof_energy_eff_ending"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades
if recommendation_record["roof_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if recommendation_record["roof_insulation_thickness_ending"] is None:
recommendation_record["roof_insulation_thickness_ending"] = "none"
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
if recommendation["type"] == "mechanical_ventilation":
recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only'
# Update description to indicate it's insulate
if recommendation["type"] in [
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"
]:
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
if recommendation["type"] == "sealing_open_fireplace":
recommendation_record["number_open_fireplaces_ending"] = 0
if recommendation["type"] == "low_energy_lighting":
recommendation_record["low_energy_lighting_ending"] = 100
recommendation_record["lighting_energy_eff_starting"] = "Very Good"
if recommendation["type"] == "windows_glazing":
recommendation_record["multi_glaze_proportion_ending"] = 100
recommendation_record["windows_energy_eff_ending"] = "Average"
is_secondary_glazing = recommendation["is_secondary_glazing"]
if recommendation_record["glazing_type_ending"] == "multiple":
pass
elif recommendation_record["glazing_type_ending"] == "single":
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
elif recommendation_record["glazing_type_ending"] == "double":
recommendation_record["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
elif recommendation_record["glazing_type_ending"] == "secondary":
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
elif recommendation_record["glazing_type_ending"] in ["triple", "high performance"]:
recommendation_record["glazing_type_ending"] = "multiple"
# output["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
output["floor_insulation_thickness_ending"] = "average"
# This is rarely ever populated in the training data
# output["floor_energy_eff_ending"] = "Good"
else:
raise ValueError("Invalid glazing type - implement me")
if output["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if recommendation["type"] == "solar_pv":
recommendation_record["photo_supply_ending"] = recommendation["photo_supply"]
if output["floor_insulation_thickness_ending"] is None:
output["floor_insulation_thickness_ending"] = "none"
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv"
]:
raise NotImplementedError("Implement me")
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
output["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
recommendation_record['id'] = "+".join([str(property_id), str(recommendation["recommendation_id"])])
parts = recommendation["parts"]
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
return recommendation_record
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
output["roof_insulation_thickness_ending"] = str(proposed_depth)
if recommendation["type"] == "loft_insulation":
if proposed_depth >= 270:
output["roof_energy_eff_ending"] = "Very Good"
else:
output["roof_energy_eff_ending"] = "Good"
else:
output["roof_energy_eff_ending"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades
if output["roof_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["roof_insulation_thickness_ending"] is None:
output["roof_insulation_thickness_ending"] = "none"
if recommendation["type"] == "sealing_open_fireplace":
output["number_open_fireplaces_ending"] = 0
if recommendation["type"] == "low_energy_lighting":
output["low_energy_lighting_ending"] = 100
output["lighting_energy_eff_starting"] = "Very Good"
if recommendation["type"] == "windows_glazing":
output["multi_glaze_proportion_ending"] = 100
output["windows_energy_eff_ending"] = "Average"
is_secondary_glazing = recommendation["is_secondary_glazing"]
if output["glazing_type_ending"] == "multiple":
pass
elif output["glazing_type_ending"] == "single":
output["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
elif output["glazing_type_ending"] == "double":
output["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
elif output["glazing_type_ending"] == "secondary":
output["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
elif output["glazing_type_ending"] in ["triple", "high performance"]:
output["glazing_type_ending"] = "multiple"
else:
raise ValueError("Invalid glazing type - implement me")
if is_secondary_glazing:
output["glazed_type_ending"] = "secondary glazing"
else:
output["glazed_type_ending"] = "double glazing installed during or after 2002 "
if recommendation["type"] == "solar_pv":
output["photo_supply_ending"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv"
]:
raise NotImplementedError("Implement me")
output['id'] = "+".join([str(property_id), str(primary_recommendation_id)])
return output
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
"""
@ -349,6 +420,7 @@ class Property:
self.set_solar_panel_area(
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
self.set_energy_source()
def set_spatial(self, spatial: pd.DataFrame):
"""
@ -668,9 +740,33 @@ class Property:
percentage_of_roof = photo_supply_matched["photo_supply_median"].mean()
percentage_of_roof = percentage_of_roof / 100
self.solar_pv_roof_area = (
self.solar_pv_percentage = percentage_of_roof
def get_solar_pv_roof_area(self, percentage_of_roof):
"""
Given a percentage of the roof, this method will return the estimated area of the solar panels
:param percentage_of_roof:
:return:
"""
return (
self.insulation_floor_area * percentage_of_roof if self.roof["is_flat"] else
self.pitched_roof_area * percentage_of_roof
)
self.solar_pv_percentage = percentage_of_roof
def set_energy_source(self):
"""
This method sets the energy source of the property, based on the mains gas flag and energy tariff.
"""
# Default to "electricity_and_gas" to cover most scenarios including when mains_gas_flag is True
energy_source = "electricity_and_gas"
# If the tariff explicitly indicates electricity use without a dual indication and mains_gas_flag is not True
# We check for the common electricity tariffs
if not self.data["mains-gas-flag"] and self.data["energy-tariff"] in [
"Single", "off-peak 7 hour", "off-peak 10 hour", "off-peak 18 hour", "standard tariff", "24 hour"
]:
energy_source = "electricity"
# Set the energy source based on the conditions above
self.energy_source = energy_source

View file

@ -81,6 +81,7 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
"new_u_value": rec.get("new_u_value"),
"sap_points": rec["sap_points"],
"heat_demand": rec["heat_demand"],
"adjusted_heat_demand": rec["adjusted_heat_demand"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"total_work_hours": rec["labour_hours"],
"energy_cost_savings": rec["energy_cost_savings"],

View file

@ -22,6 +22,7 @@ class Recommendation(Base):
new_u_value = Column(Float)
sap_points = Column(Float)
heat_demand = Column(Float)
adjusted_heat_demand = Column(Float)
co2_equivalent_savings = Column(Float)
energy_savings = Column(Float)
energy_cost_savings = Column(Float)

View file

@ -136,25 +136,32 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
for p in input_properties:
# Property recommendations
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires
# decanting the tenant
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations = recommender.recommend()
property_recommendations, property_representative_recommendations = recommender.recommend()
if not property_recommendations:
continue
recommendations[p.id] = property_recommendations
representative_recommendations[p.id] = property_representative_recommendations
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
p.adjust_difference_record_with_recommendations(property_recommendations)
p.adjust_difference_record_with_recommendations(
property_recommendations, property_representative_recommendations
)
recommendations_scoring_data.extend(p.recommendations_scoring_data)
# TODO: Make sure that number_habitable_rooms has been dropped
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
@ -180,10 +187,18 @@ async def trigger_plan(body: PlanTriggerRequest):
property_instance = [p for p in input_properties if p.id == property_id][0]
recommendations_with_impact = Recommendations.calculate_recommendation_impact(
property_instance=property_instance,
all_predictions=all_predictions,
recommendations=recommendations
recommendations_with_impact, current_adjusted_energy, expected_adjusted_energy = (
Recommendations.calculate_recommendation_impact(
property_instance=property_instance,
all_predictions=all_predictions,
recommendations=recommendations
)
)
# Store the resulting adjusted energy in the property instance
property_instance.set_adjusted_energy(
current_adjusted_energy=current_adjusted_energy,
expected_adjusted_energy=expected_adjusted_energy
)
input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
@ -207,7 +222,7 @@ async def trigger_plan(body: PlanTriggerRequest):
selected_recommendations = {r["id"] for r in solution}
# If wall ventilation is selected, we also include mechanical ventilation as a best practice measure
# If wall insulation is selected, we also include mechanical ventilation as a best practice measure
if any(x in [r["type"] for r in solution] for x in [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
]):
@ -237,174 +252,6 @@ async def trigger_plan(body: PlanTriggerRequest):
]
recommendations[property_id] = final_recommendations
# This is a temporary step, to estimate the impact of the measured on heat demand and carbon
# TODO: This needs to be cleaned up, if it happens to be kept
representative_recs = {}
for property_id, property_recommendations in recommendations.items():
default_recommendations = [r for r in property_recommendations if r["default"]]
default_types = {x["type"] for x in default_recommendations}
# Missing types
missing_types = list(set([r["type"] for r in property_recommendations if r["type"] not in default_types]))
# We might have a missing type as one of the solid wall options because for a solid wall, you might
# have ewi or iwi but only one of them will be a default
if ("internal_wall_insulation" in default_types) or ("external_wall_insaultion" in default_types):
missing_types = [
t for t in missing_types if t not in ["internal_wall_insulation", "external_wall_insulation"]
]
# We check if NO wall insulation was selected but iwi and ewi are available
# This condition will check
# 1) iwi and ewi are both in missing_types
# 2) iwi and ewi are not in default_types
# If both of these are true, it means that no wall insulation was selected via the optimisation routine
# but both are possible, so we need to select a default. We default to iwi because it's usually cheaper
if (("internal_wall_insulation" in missing_types) and ("external_wall_insulation" in missing_types)) and (
("internal_wall_insulation" not in default_types) and ("external_wall_insulation" not in default_types)
):
missing_types = [t for t in missing_types if t != "external_wall_insulation"]
if missing_types:
for missed_type in missing_types:
missed = [r for r in property_recommendations if r["type"] == missed_type]
min_cost = min([r["total"] for r in missed])
# Grab a representative, based on cheapest cost
representative_rec = [r for r in property_recommendations if np.isclose(r["total"], min_cost)]
default_recommendations.append(representative_rec[0])
representative_recs[property_id] = default_recommendations
# We update the carbon and heat demand predictions
# TODO: The api call producing all_combined_predictions has been removed so we can potentially completely
# refactor this block to just perform the energy adjustments
for property_id, property_recommendations in recommendations.items():
property_instance = [p for p in input_properties if p.id == property_id][0]
heat_demand_change = sum(
x.get("heat_demand", 0) for x in representative_recs[property_id] if
x["type"] not in ["mechanical_ventilation", "low_energy_lighting"]
)
carbon_change = sum(
x.get("co2_equivalent_savings", 0) for x in representative_recs[property_id] if
x["type"] not in ["mechanical_ventilation", "low_energy_lighting"]
)
starting_heat_demand = (
float(property_instance.data["energy-consumption-current"]) * property_instance.floor_area
)
expected_heat_demand = starting_heat_demand - heat_demand_change
# We don't want to adjust the heat demand for mechanical ventilation so we add it back on
# We adjust the heat demand figures to align to the UCL paper
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=starting_heat_demand,
current_epc_rating=property_instance.data["current-energy-rating"],
)
# We sum up the SAP points of the default recommendations and calculate a new EPC category. This
# category is then used to produce adjusted energy figures
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=expected_heat_demand,
current_epc_rating=property_instance.data["current-energy-rating"],
)
heat_demand_change = (
current_adjusted_energy - expected_adjusted_energy
)
# update the recommendations
# We need to totals for the representative recommendations
representative_rec_data = [
{
"recommendation_id": r["recommendation_id"],
"co2_equivalent_savings": r.get("co2_equivalent_savings"),
"heat_demand": r.get("heat_demand"),
"type": r["type"]
} for r
in representative_recs[property_id]
]
representative_rec_data = pd.DataFrame(representative_rec_data)
# Suppress mechanical ventilation to have zero heat demand and co2
representative_rec_data.loc[
representative_rec_data["type"] == "mechanical_ventilation", "co2_equivalent_savings"
] = 0
representative_rec_data.loc[
representative_rec_data["type"] == "mechanical_ventilation", "heat_demand"
] = 0
# Supress low energy lighting to have zero heat demand and co2 - this does not get affected by this process
representative_rec_data.loc[
representative_rec_data["type"] == "low_energy_lighting", "co2_equivalent_savings"
] = 0
representative_rec_data.loc[
representative_rec_data["type"] == "low_energy_lighting", "heat_demand"
] = 0
# Convert co2 and heat demand to proportions of their column sums
representative_rec_data["co2_equivalent_savings_percent"] = (
representative_rec_data["co2_equivalent_savings"] /
representative_rec_data["co2_equivalent_savings"].sum()
)
representative_rec_data["heat_demand_percent"] = (
representative_rec_data["heat_demand"] / representative_rec_data["heat_demand"].sum()
)
# We'll use the proportions to update the carbon and heat demand
representative_rec_data["co2_equivalent_savings"] = (
carbon_change * representative_rec_data["co2_equivalent_savings_percent"]
)
representative_rec_data["heat_demand"] = (
heat_demand_change * representative_rec_data["heat_demand_percent"]
)
# Finally, insert these values into the final recommendations
for rec in property_recommendations:
if rec["type"] in ["external_wall_insulation", "internal_wall_insulation"]:
change_data = representative_rec_data[
representative_rec_data["type"].isin(["external_wall_insulation", "internal_wall_insulation"])
]
else:
change_data = representative_rec_data[representative_rec_data["type"] == rec["type"]]
if rec["type"] == "mechanical_ventilation":
rec["co2_equivalent_savings"] = 0
rec["heat_demand"] = 0
rec["energy_cost_savings"] = 0
elif rec["type"] == "low_energy_lighting":
# We do not convert, we just calculate energy cost savings
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["heat_demand"])
continue
else:
rec["co2_equivalent_savings"] = change_data["co2_equivalent_savings"].values[0]
rec["heat_demand"] = change_data["heat_demand"].values[0]
# If the recommendation is solar, the savings are entirely in electricity
if rec["type"] == "solar_pv":
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["heat_demand"])
else:
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])
# Update recommendations
recommendations[property_id] = property_recommendations
# For expected adjust energy, we don't include mechanical ventilation so we'll add it back on
mechanical_ventilation_rec = representative_rec_data[
representative_rec_data["type"] == "mechanical_ventilation"
]
if not mechanical_ventilation_rec.empty:
expected_adjusted_energy = (
expected_adjusted_energy + mechanical_ventilation_rec["heat_demand"].values[0]
)
property_instance.set_adjusted_energy(
current_adjusted_energy=current_adjusted_energy,
expected_adjusted_energy=expected_adjusted_energy
)
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations

View file

@ -51,13 +51,14 @@ class PropertyValuation:
KNIGHT_FRANK_MAPPING = [
{"start": "D", "end": "C", "increase_percentage": 0.03},
{"start": "D", "end": "B", "increase_percentage": 0.088},
{"start": "D", "end": "A", "increase_percentage": 0.088},
]
NATIONWIDE_MAPPING = [
{"start": "G", "end": "D", "increase_percentage": 0.035},
{"start": "F", "end": "D", "increase_percentage": 0.035},
{"start": "D", "end": "B", "increase_percentage": 0.017},
{"start": "D", "end": "A", "increase_percentage": 0.017},
# {"start": "G", "end": "D", "increase_percentage": 0.035},
# {"start": "F", "end": "D", "increase_percentage": 0.035},
# {"start": "D", "end": "B", "increase_percentage": 0.017},
# {"start": "D", "end": "A", "increase_percentage": 0.017},
]
EPC_BANDS = ["G", "F", "E", "D", "C", "B", "A"]

View file

@ -130,8 +130,14 @@ class ModelApi:
)
)
predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True)
# To grab the phase, we pull the integer after "phase=" in the recommendation_id. We can do this with a
# string split on phase= and then grab the second element of the resulting list. We could also use a
# regular expression to do this but we use the string split method here, for safety.
predictions_df['phase'] = predictions_df['recommendation_id'].str.split('phase=').str[1].str[0]
# Convert back to int
predictions_df['phase'] = predictions_df['phase'].astype(int)
predictions[model_prefix] = predictions_df

View file

@ -24,27 +24,6 @@ load_dotenv(ENV_FILE)
class DataLoader:
COLOUR_CONFIG = {
"ha_1": {
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
},
"ha_6": {
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
"survey_list": {
"green": "FF92D050", "purple": "FF7030A0", "red": "FFFF0000", "blue": "FF00B0F0"
}
},
"ha_14": {
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
},
"ha_39": {
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
},
"ha_107": {
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
}
}
MIN_ROWS = {
"ha_1": 2,
"ha_6": 2,
@ -53,12 +32,87 @@ class DataLoader:
"ha_107": 2,
}
COLUMN_CONFIG = {
"ha_1": {
"address": "Address",
"postcode": "Address - Postcode"
}
}
def __init__(self, files, use_cache):
self.files = files
self.use_cache = use_cache
self.data = {}
def create_asset_list_matching_address(self, ha_name, asset_list):
if ha_name in ["ha_1", "ha_6"]:
asset_list["matching_address"] = asset_list[
self.COLUMN_CONFIG[ha_name]["address"]
].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list[
self.COLUMN_CONFIG[ha_name]["postcode"]
].str.lower().str.strip()
elif ha_name == "ha_14":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
asset_list["Address 2"].str.lower().str.strip() + ", " + \
asset_list["Address 3"].str.lower().str.strip() + ", " + \
asset_list["Address 4"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
elif ha_name == "ha_39":
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_3"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
asset_list["post_code"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
elif ha_name == "ha_107":
# Create matching_address by concatenating House No, Street, Town, District, Postcode
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Street"].str.lower().str.strip() + ", " + \
asset_list["Town"].str.lower().str.strip() + ", " + \
asset_list["District"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
else:
raise NotImplementedError("implement me")
return asset_list
def append_asset_list_built_form(self, ha_name, asset_list):
# Finally, we process property_type or built form, where needed
if ha_name == "ha_6":
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
return asset_list
@staticmethod
def create_asset_list_house_no(ha_name, asset_list):
"""
This function will append the House number onto the asset list
:return:
"""
if ha_name in ["ha_107"]:
asset_list["HouseNo"] = asset_list["House No"].copy()
else:
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
house_numbers = split_addresses[0].str.split(' ', expand=True)
# THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how
# many columns there might be
house_numbers = house_numbers.iloc[:, 0:1]
house_numbers.columns = ['HouseNo']
asset_list = pd.concat([asset_list, house_numbers[["HouseNo"]]], axis=1)
return asset_list
def load_asset_list(self, file_path, ha_name, sheet_name=None):
workbook = openpyxl.load_workbook(file_path)
if sheet_name is not None:
@ -87,74 +141,15 @@ class DataLoader:
# Remove entirely empty roww - consider all rows apart from row_color
asset_list = asset_list.loc[asset_list.loc[:, asset_list.columns != 'row_color'].notnull().any(axis=1)]
asset_list_colours = self.COLOUR_CONFIG[ha_name]["asset_list"]
asset_list["row_colour_name"] = np.where(
asset_list["row_color"] == asset_list_colours["red"], "red",
np.where(asset_list["row_color"] == asset_list_colours["green"], "green", "yellow")
)
asset_list["row_meaning"] = np.where(
asset_list["row_colour_name"] == "red", "does not meet criteria",
np.where(
asset_list["row_colour_name"] == "green", "identified potential eco works (CWI)", "maybe in the future"
)
)
# Add in asset_list_row_id
asset_list["asset_list_row_id"] = [ha_name + str(i) for i in range(0, len(asset_list))]
# Prepare the asset list
# Depending on the HA, we need to rename some columns
if ha_name == "ha_1":
asset_list["matching_address"] = asset_list["Address"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Address - Postcode"].str.lower().str.strip()
elif ha_name == "ha_6":
asset_list["matching_address"] = asset_list["propertyaddress"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Post Code"].str.lower().str.strip()
elif ha_name == "ha_14":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
asset_list["Address 2"].str.lower().str.strip() + ", " + \
asset_list["Address 3"].str.lower().str.strip() + ", " + \
asset_list["Address 4"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
elif ha_name == "ha_39":
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_3"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
asset_list["post_code"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
elif ha_name == "ha_107":
# Create matching_address by concatenating House No, Street, Town, District, Postcode
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Street"].str.lower().str.strip() + ", " + \
asset_list["Town"].str.lower().str.strip() + ", " + \
asset_list["District"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
else:
raise NotImplementedError("implement me")
# Create matching address and matching postcode
asset_list = self.create_asset_list_matching_address(ha_name=ha_name, asset_list=asset_list)
if ha_name in ["ha_107"]:
asset_list["HouseNo"] = asset_list["House No"].copy()
else:
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
house_numbers = split_addresses[0].str.split(' ', expand=True)
# THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how
# many columns there might be
house_numbers = house_numbers.iloc[:, 0:1]
house_numbers.columns = ['HouseNo']
asset_list = self.create_asset_list_house_no(ha_name=ha_name, asset_list=asset_list)
asset_list = pd.concat([asset_list, house_numbers[["HouseNo"]]], axis=1)
# Finally, we process property_type or built form, where needed
if ha_name == "ha_6":
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list)
return asset_list
@ -177,9 +172,7 @@ class DataLoader:
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
# Remove columns that are None
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
survey_list["row_colour"] = survey_colors
survey_list_colours = self.COLOUR_CONFIG[ha_name]["survey_list"]
# The survey list has 4 possible colours:
# PURPLE - Installer advised install complete and a complimentary post works EPC has been completed.
@ -1252,13 +1245,13 @@ def app():
:return:
"""
use_cache = True
use_cache = False
files = {
"ha_1": {
"asset_list": {
"filepath": "etl/eligibility/ha_15_32/HA 1 - ASSET LIST.xlsx",
"sheetname": "HA 1"
"filepath": "local_data/ha_data/HA1/ACCENT GROUP.xlsx",
"sheetname": "Energy data"
}
},
"ha_6": {

View file

@ -50,9 +50,9 @@ scenario_properties = [
"postcode": "NN1 5JY",
"lmk-key": "1459796789102016070507274146560098",
"measures": [
[["internal_wall_insulation"], "11", None],
[["external_wall_insulation"], "10", None],
[["solar", "windows"], "12-15", {"photo_supply_ending": 50}],
[["internal_wall_insulation"], "11", None, [0]],
[["external_wall_insulation"], "10", None, [0]],
[["solar", "windows"], "12-15", {"photo_supply_ending": 50}, [0, 1]],
],
},
{
@ -60,7 +60,7 @@ scenario_properties = [
"postcode": "HP1 2HA",
"lmk-key": "c14029235739827d5f627dc8aa9bb567d026b267e851e0db0001db24638667b1",
"measures": [
[["cavity_wall_insulation", "loft_insulation"], "15", None],
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
],
},
{
@ -68,7 +68,7 @@ scenario_properties = [
"postcode": "HP1 2HE",
"lmk-key": "99296a6dda21314fef3a61cda59e441e9a2aacf115eb96f4a0fa85696bf7b117",
"measures": [
[["cavity_wall_insulation", "loft_insulation"], "15", None],
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
],
},
{
@ -76,7 +76,7 @@ scenario_properties = [
"postcode": "HP1 2AN",
"lmk-key": "d1e0534be3a44c33003323b21d0e322e3daddc65b5ee71936f89c59ddab96b50",
"measures": [
[["cavity_wall_insulation", "loft_insulation"], "15", None],
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
],
},
{
@ -84,7 +84,7 @@ scenario_properties = [
"postcode": "HP1 2HX",
"lmk-key": "1eae354db522a95188018d9cd0502ed8c609910b6c88f8797d3a25f59b11770a",
"measures": [
[["cavity_wall_insulation", "loft_insulation"], "15", None],
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
],
},
]
@ -195,18 +195,18 @@ for scenario_property in scenario_properties:
recommendation_record = p.base_difference_record.df.to_dict("records")[
0
].copy()
for rec in combi:
recommendation_record = p.create_recommendation_scoring_data(
property_id=rec["type"],
recommendation_record=recommendation_record,
recommendation=rec,
)
recommendation_record = p.create_recommendation_scoring_data(
property_id=i,
primary_recommendation_id=i,
recommendation_record=recommendation_record,
recommendations=combi,
)
if override is not None:
for key, value in override.items():
recommendation_record[key] = value
recommendation_record["id"] = "+".join(measure) + "+" + str(i)
recommendation_record["id"] = "&".join(measure) + "+" + str(i)
recommendation_record["impact"] = impact
scoring_list.append(recommendation_record)
@ -230,6 +230,20 @@ recommendations_scoring_data.insert(0, "impact", impact_col)
id_col = recommendations_scoring_data.pop("id")
recommendations_scoring_data.insert(0, "id", id_col)
from backend.ml_models.api import ModelApi
model_api = ModelApi(portfolio_id="generate-scenarios-data", timestamp=created_at)
all_predictions = model_api.predict_all(
df=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
save_dataframe_to_s3_parquet(
recommendations_scoring_data,
"retrofit-data-dev",

File diff suppressed because it is too large Load diff

View file

@ -37,6 +37,9 @@ MCS_SOLAR_PV_COST_DATA = {
"average_cost_per_kwh-Northern Ireland": 2126.09,
}
# This is based on quotes from installers
BATTERY_COST = 3500
class Costs:
"""
@ -835,7 +838,7 @@ class Costs:
"labour_days": labour_days
}
def solar_pv(self, wattage: float):
def solar_pv(self, wattage: float, has_battery: bool = False):
"""
Calculates the total cost for solar PV based data provided by the MCS dashboard, which contains
@ -847,8 +850,8 @@ class Costs:
Price can also be benchmarked against this checkatrade article:
https://www.checkatrade.com/blog/cost-guides/cost-of-solar-panel-installation/
:param wattage: Peak wattage of the solar PV system
:return:
:param wattage: Peak wattage of the solar PV system]
:param has_battery: Bool, whether the system includes a battery
"""
# Get the cost data relevant to the region
@ -857,7 +860,12 @@ class Costs:
kw = wattage / 1000
total_cost = kw * regional_cost
if has_battery:
# The battery cost is based on the £3500 quote, recieved from installers
total_cost += BATTERY_COST
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
# Labour hours are based on estimates from online research but an average team seems to consist of 3 people

View file

@ -20,7 +20,7 @@ class FireplaceRecommendations(Definitions):
self.has_ventilaion = None
self.recommendation = None
def recommend(self):
def recommend(self, phase=0):
"""
Based on the number of open fireplcaes found, we recommend sealing each one at a cost of
around £500
@ -37,6 +37,7 @@ class FireplaceRecommendations(Definitions):
# We recommend installing two mechanical ventilation systems
self.recommendation = [
{
"phase": phase,
"parts": [],
"type": "sealing_open_fireplace",
"description": "Seal %s open fireplaces" % str(number_open_fireplaces),

View file

@ -69,7 +69,7 @@ class FloorRecommendations(Definitions):
# TODO: To be completed
self.exposed_floor_non_insulation_materials = []
def recommend(self):
def recommend(self, phase=0):
u_value = self.property.floor["thermal_transmittance"]
property_type = self.property.data["property-type"]
@ -118,6 +118,7 @@ class FloorRecommendations(Definitions):
if self.property.floor["is_suspended"]:
# Given the U-value, we recommend underfloor insulation
self.recommend_floor_insulation(
phase=phase,
u_value=u_value,
insulation_materials=self.suspended_floor_insulation_materials,
non_insulation_materials=self.suspended_floor_non_insulation_materials
@ -129,7 +130,8 @@ class FloorRecommendations(Definitions):
self.recommend_floor_insulation(
u_value=u_value,
insulation_materials=self.solid_floor_insulation_materials,
non_insulation_materials=self.solid_floor_non_insulation_materials
non_insulation_materials=self.solid_floor_non_insulation_materials,
phase=phase
)
return
@ -156,7 +158,7 @@ class FloorRecommendations(Definitions):
raise ValueError("Invalid material type - implement me!")
def recommend_floor_insulation(self, u_value, insulation_materials, non_insulation_materials):
def recommend_floor_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
"""
This method is tasked with estimating the impact of performing suspended floor insulation
:return:
@ -198,6 +200,7 @@ class FloorRecommendations(Definitions):
self.recommendations.append(
{
"phase": phase,
"parts": [
get_recommended_part(
part=material.to_dict(),

View file

@ -4,6 +4,9 @@ from recommendations.Costs import Costs
class LightingRecommendations:
# We introduce a SAP limit to lighting, which is based on empirical findings. We do see cases where lighting is
# worth more than 2 points, but this is unlikely in the context of other upgrades that can be made to the property
SAP_LIMIT = 2
def __init__(self, property_instance: Property, materials: List):
"""
@ -51,7 +54,7 @@ class LightingRecommendations:
return total_energy_savings_per_year, carbon_reduction_tonnes
def recommend(self):
def recommend(self, phase=0):
"""
This method will check if there are any lighting fittings that aren't low energy.
@ -90,6 +93,7 @@ class LightingRecommendations:
self.recommendation = [
{
"phase": phase,
"parts": [],
"type": "low_energy_lighting",
"description": description,

View file

@ -1,5 +1,8 @@
import numpy as np
from backend.Property import Property
from typing import List
from itertools import groupby
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
@ -44,57 +47,124 @@ class Recommendations:
"""
This method runs the recommendations for the individual measures and then appends them to a list for output
The recommendations are implemented in order of suggested phase, from fabric first to heating systems, to
renewables.
:return:
"""
property_recommendations = []
# Floor recommendations
self.floor_recommender.recommend()
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase = 0
# Wall recommendations
self.wall_recomender.recommend()
self.wall_recomender.recommend(phase=phase)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
# Roof recommendations
self.roof_recommender.recommend()
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
# Fireplace sealing recommendations
self.fireplace_recommender.recommend()
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
# Roof recommendations
self.roof_recommender.recommend(phase=phase)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
# Lighting recommendations
self.lighting_recommender.recommend()
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
# Floor recommendations
self.floor_recommender.recommend(phase=phase)
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase += 1
# Windows recommendations
self.windows_recommender.recommend()
self.windows_recommender.recommend(phase=phase)
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
# Fireplace sealing recommendations
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
phase += 1
# Lighting recommendations
self.lighting_recommender.recommend(phase=phase)
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
phase += 1
# Solar recommendations
self.solar_recommender.recommend()
self.solar_recommender.recommend(phase=phase)
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)
phase += 1
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)
return property_recommendations
# We also need to create the representative recommendations for each recommendation type
property_representative_recommendations = self.create_representative_recommendations(property_recommendations)
return property_recommendations, property_representative_recommendations
@staticmethod
def create_representative_recommendations(property_recommendations):
"""
This method will create a representative recommendation for each recommendation type
In order to create a representative recommendation, we choose the recommendation that has:
1) Where a U-value is available, has the best U-value to cost ratio
2) Where SAP points are available, has the best SAP points to cost ratio
We don't include mechanical ventilation in the representative recommendations, since we don't attribute a
SAP impact to this recommendation
:return:
"""
property_representative_recommendations = []
for recommendations_by_type in property_recommendations:
if recommendations_by_type[0].get("type") == "mechanical_ventilation":
continue
has_u_value = recommendations_by_type[0].get("new_u_value") is not None
has_sap_points = recommendations_by_type[0].get("sap_points") is not None
# When check if these recommendations have two different types, such as solid wall insulation
# If we have multiple types, we group by type and then select the best recommendation for each type
recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"])
representative_recommendations = []
for type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):
recommendations = list(recommendations)
# We also create an efficiency key, which is used to sort the recommendations
if has_u_value:
# We sort by the cost per U-value improvement - the lower the better
for rec in recommendations:
rec["efficiency"] = rec["total"] / rec["starting_u_value"] - rec["new_u_value"]
elif not has_u_value and has_sap_points:
# Sort the options by the cost per SAP point improvement - the lower the better
for rec in recommendations:
rec["efficiency"] = rec["total"] / rec["sap_points"]
else:
# Sort the options by cost - the lower the better
for rec in recommendations:
rec["efficiency"] = rec["total"]
recommendations.sort(
key=lambda x: x["efficiency"]
)
representative_recommendations.append(recommendations[0])
property_representative_recommendations.extend(representative_recommendations)
return property_representative_recommendations
@staticmethod
def insert_temp_recommendation_id(property_recommendations):
@ -110,7 +180,7 @@ class Recommendations:
for recs in property_recommendations:
for rec in recs:
rec["recommendation_id"] = idx
rec["recommendation_id"] = f"{str(idx)}_phase={str(rec['phase'])}"
idx += 1
return property_recommendations
@ -140,11 +210,44 @@ class Recommendations:
property_recommendations = recommendations[property_instance.id].copy()
# We calculate the impact by phase
sap_phase_impact = property_sap_predictions.groupby("phase")["predictions"].median().reset_index()
heat_phase_impact = property_heat_predictions.groupby("phase")["predictions"].median().reset_index()
carbon_phase_impact = property_carbon_predictions.groupby("phase")["predictions"].median().reset_index()
# The heat demand change is the difference between the starting heat demand and the value at the final phase
expected_heat_demand = property_instance.floor_area * (
heat_phase_impact[heat_phase_impact["phase"] == max(heat_phase_impact["phase"])]["predictions"].values[0]
)
starting_heat_demand = (
float(property_instance.data["energy-consumption-current"]) * property_instance.floor_area
)
# This is the unadjusted resulting heat demand
predicted_heat_demand_change = starting_heat_demand - expected_heat_demand
# We don't want to adjust the heat demand for mechanical ventilation so we add it back on
# We adjust the heat demand figures to align to the UCL paper
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=starting_heat_demand,
current_epc_rating=property_instance.data["current-energy-rating"],
)
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=expected_heat_demand,
current_epc_rating=property_instance.data["current-energy-rating"],
)
adjusted_heat_demand_change = (
current_adjusted_energy - expected_adjusted_energy
)
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
# We don't use the model for low energy lighting at the moment
if rec["type"] == "low_energy_lighting":
if rec["type"] == "mechanical_ventilation":
# We don't have a percieved sap impact of mechanical ventilation
continue
new_heat_demand = property_heat_predictions[property_heat_predictions["recommendation_id"] == str(
@ -158,26 +261,60 @@ class Recommendations:
new_sap = property_sap_predictions[property_sap_predictions["recommendation_id"] == str(
rec["recommendation_id"]
)]["predictions"].values[0]
rec["sap_points"] = new_sap - float(property_instance.data["current-energy-efficiency"])
if rec["type"] == "mechanical_ventilation":
if rec["phase"] == 0:
predicted_sap_points = new_sap - float(property_instance.data["current-energy-efficiency"])
predicted_co2_savings = float(property_instance.data["co2-emissions-current"]) - new_carbon
predicted_heat_demand = property_instance.floor_area * (
float(property_instance.data["energy-consumption-current"]) - new_heat_demand
)
else:
previous_phase = rec["phase"] - 1
predicted_sap_points = (
new_sap - sap_phase_impact[sap_phase_impact["phase"] == previous_phase]["predictions"].values[0]
)
predicted_co2_savings = (
carbon_phase_impact[carbon_phase_impact["phase"] == previous_phase]["predictions"].values[0] -
new_carbon
)
predicted_heat_demand = property_instance.floor_area * (
heat_phase_impact[heat_phase_impact["phase"] == previous_phase]["predictions"].values[0] -
new_heat_demand
)
if rec["type"] == "low_energy_lighting":
# For the moment, we cap the number of SAP points that can be achieved by ventilation at 2
rec["sap_points"] = min(rec["sap_points"], VentilationRecommendations.SAP_LIMIT)
rec["sap_points"] = min(predicted_sap_points, LightingRecommendations.SAP_LIMIT)
rec["co2_equivalent_savings"] = min(predicted_co2_savings, rec["co2_equivalent_savings"])
rec["heat_demand"] = min(predicted_heat_demand, rec["heat_demand"])
else:
rec["sap_points"] = predicted_sap_points
rec["co2_equivalent_savings"] = predicted_co2_savings
rec["heat_demand"] = predicted_heat_demand
# Round to 2 decimal places
rec["sap_points"] = round(rec["sap_points"], 2)
rec["co2_equivalent_savings"] = float(property_instance.data["co2-emissions-current"]) - new_carbon
# Energy consumption current is per meter squared, so we need to multiply by the floor area to get
# an absolute figure for the home
rec["heat_demand"] = (
(float(property_instance.data["energy-consumption-current"]) - new_heat_demand
) * property_instance.floor_area)
# We now calculate the adjusted heat demand for this recommendation, which is simply the percentage
# of the total adjusted heat demand change. The percentage we use is this recommendation's percentage
# of the total heat demand per square meter change
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])
rec["adjusted_heat_demand"] = adjusted_heat_demand_change * (
rec["heat_demand"] / predicted_heat_demand_change
)
# We make sure this is NOT below 0
rec["adjusted_heat_demand"] = max(0, rec["adjusted_heat_demand"])
# Depending on the property's tarriff, we calculate the amount of energy savings this measure will bring
if property_instance.energy_source == "electricity":
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["adjusted_heat_demand"])
elif property_instance.energy_source == "electricity_and_gas":
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["adjusted_heat_demand"])
else:
raise ValueError("Invalid value for energy source")
if (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or (
rec["heat_demand"] is None) or (rec["energy_cost_savings"] is None):
raise ValueError("sap points, co2 or heat demand is missing")
return property_recommendations
return property_recommendations, current_adjusted_energy, expected_adjusted_energy

View file

@ -53,7 +53,7 @@ class RoofRecommendations:
]
]
def recommend(self):
def recommend(self, phase):
if self.property.roof["has_dwelling_above"]:
return
@ -98,11 +98,11 @@ class RoofRecommendations:
return
if self.property.roof["is_pitched"] or self.property.roof["is_flat"]:
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof)
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase)
return
if self.property.roof["is_roof_room"]:
self.recommend_room_roof_insulation(u_value)
self.recommend_room_roof_insulation(u_value, phase)
return
raise NotImplementedError("Implement me")
@ -124,7 +124,7 @@ class RoofRecommendations:
raise ValueError("Invalid material type")
def recommend_roof_insulation(
self, u_value, insulation_thickness, roof
self, u_value, insulation_thickness, roof, phase
):
"""
@ -217,6 +217,7 @@ class RoofRecommendations:
recommendations.append(
{
"phase": phase,
"parts": [
get_recommended_part(
part=material.to_dict(),
@ -236,7 +237,7 @@ class RoofRecommendations:
self.recommendations = recommendations
def recommend_room_roof_insulation(self, u_value):
def recommend_room_roof_insulation(self, u_value, phase):
"""
This method recommends room in roof insulation for properties that have been identified
to possess a room in roof.
@ -314,6 +315,7 @@ class RoofRecommendations:
recommendations.append(
{
"phase": phase,
"parts": [
get_recommended_part(
part=material,

View file

@ -18,7 +18,7 @@ class SolarPvRecommendations:
self.recommendation = []
def recommend(self):
def recommend(self, phase):
"""
We check if a property is potentially suitable for solar PV based on the following criteria:
- The property is a house or bungalow
@ -39,30 +39,54 @@ class SolarPvRecommendations:
if not is_valid_property_type or not is_valid_roof_type or not has_no_existing_solar_pv:
return
# We now have a property which is potentially suitable for solar PV
number_solar_panels = np.floor(self.property.solar_pv_roof_area / self.SOLAR_PANEL_AREA)
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
roof_coverage_percent = round(self.property.solar_pv_percentage * 100)
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
# of solar PV installations
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage)
kw = np.floor(solar_panel_wattage / 100) / 10
self.recommendation = [
{
"parts": [],
"type": "solar_pv",
"description": f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
f"{roof_coverage_percent}% the roof",
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
# back up here
"photo_supply": 100 * self.property.solar_pv_percentage
}
# For the solar recommendations, we produce the following scenarios:
# 1) Solar panels only, we present a high, medium and low coverage
# 2) With and without battery
roof_coverage_scenarios = [
self.property.solar_pv_percentage - 0.1, self.property.solar_pv_percentage,
self.property.solar_pv_percentage + 0.1
]
# We make sure we haven't gone too low or high
roof_coverage_scenarios = [v for v in roof_coverage_scenarios if 0 <= v <= 1]
battery_scenarios = [False, True]
# I now produce the cross product of the scenarios
scenarios = [(roof, battery) for roof in roof_coverage_scenarios for battery in battery_scenarios]
for roof_coverage, has_battery in scenarios:
# We now have a property which is potentially suitable for solar PV
solar_pv_roof_area = self.property.get_solar_pv_roof_area(roof_coverage)
number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA)
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
roof_coverage_percent = round(roof_coverage * 100)
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
# of solar PV installations
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=has_battery)
kw = np.floor(solar_panel_wattage / 100) / 10
if has_battery:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
f"{round(roof_coverage_percent)}% the roof, with a battery storage system.")
else:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p"
f"anel system on {round(roof_coverage_percent)}% the roof.")
self.recommendation.append(
{
"phase": phase,
"parts": [],
"type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
# back up here
"photo_supply": 100 * roof_coverage
}
)

View file

@ -15,9 +15,6 @@ class VentilationRecommendations(Definitions):
'mechanical, supply and extract'
]
# We introduce a SAP limit, to prevent over-predicting the SAP impact of mechanical ventilation
SAP_LIMIT = 2
def __init__(
self,
property_instance: Property,
@ -62,12 +59,17 @@ class VentilationRecommendations(Definitions):
# We recommend installing two mechanical ventilation systems
self.recommendation = [
{
"phase": None,
"parts": part,
"type": part[0]["type"],
"description": f"Install {n_units} {part[0]['description']} units",
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"sap_points": 0,
"heat_demand": 0,
"adjusted_heat_demand": 0,
"co2_equivalent_savings": 0,
"energy_cost_savings": 0,
"total": estimated_cost,
# We use a very simple and rough estimate of 4 hours per unit
"labour_hours": 4 * n_units,

View file

@ -97,7 +97,7 @@ class WallRecommendations(Definitions):
return True
def recommend(self):
def recommend(self, phase=0):
# if building built after 1990 + we're able to identify U-value +
# U-value less than 0.18 and if in or close to a conversation area,
# recommend internal wall insulation as a possible measure
@ -146,19 +146,19 @@ class WallRecommendations(Definitions):
if is_cavity_wall:
if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# Test filling cavity
self.find_cavity_insulation(u_value, insulation_thickness)
self.find_cavity_insulation(u_value, insulation_thickness, phase)
return
# Remaining wall types are treated with IWI or EWI
if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
self.find_insulation(u_value)
self.find_insulation(u_value, phase)
return
# If the u-value is within regulations, we don't do anything
return
def find_cavity_insulation(self, u_value, insulation_thickness):
def find_cavity_insulation(self, u_value, insulation_thickness, phase):
"""
This method tests different materials to fill the cavity wall, determining which
material will give us the best U-value.
@ -210,6 +210,7 @@ class WallRecommendations(Definitions):
recommendations.append(
{
"phase": phase,
"parts": [
get_recommended_part(
part=material.to_dict(),
@ -229,7 +230,7 @@ class WallRecommendations(Definitions):
self.recommendations = recommendations
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials):
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
lowest_selected_u_value = None
recommendations = []
@ -274,6 +275,7 @@ class WallRecommendations(Definitions):
recommendations.append(
{
"phase": phase,
"parts": [
get_recommended_part(
part=material.to_dict(),
@ -293,7 +295,7 @@ class WallRecommendations(Definitions):
return recommendations
def find_insulation(self, u_value):
def find_insulation(self, u_value, phase):
"""
This function contains the logic for finding potential insulation measures for a property, depending
on the parts available and whether the property can have external wall insulation installed
@ -310,13 +312,15 @@ class WallRecommendations(Definitions):
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.external_wall_insulation_materials),
non_insulation_materials=self.external_wall_non_insulation_materials
non_insulation_materials=self.external_wall_non_insulation_materials,
phase=phase
)
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),
non_insulation_materials=self.internal_wall_non_insulation_materials
non_insulation_materials=self.internal_wall_non_insulation_materials,
phase=phase
)
self.recommendations += ewi_recommendations + iwi_recommendations

View file

@ -30,7 +30,7 @@ class WindowsRecommendations:
raise ValueError("There should only be one window glazing material")
self.glazing_material = self.glazing_material[0]
def recommend(self):
def recommend(self, phase=0):
"""
This method will recommend the best possible glazing options for a property.
@ -85,6 +85,7 @@ class WindowsRecommendations:
self.recommendation = [
{
"phase": phase,
"parts": [],
"type": "windows_glazing",
"description": description,