mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #277 from Hestia-Homes/test-sap-model-updates
Test sap model updates
This commit is contained in:
commit
81e1ca65d0
19 changed files with 2802 additions and 487 deletions
|
|
@ -1,20 +1,15 @@
|
|||
from datetime import datetime
|
||||
import re
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
from itertools import groupby
|
||||
import pandas as pd
|
||||
|
||||
from etl.epc.DataProcessor import EPCDataProcessor
|
||||
from etl.epc.Dataset import TrainingDataset
|
||||
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, \
|
||||
BUILT_FORM_REMAP
|
||||
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES
|
||||
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
|
||||
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
|
||||
from utils.logger import setup_logger
|
||||
from utils.s3 import read_dataframe_from_s3_parquet
|
||||
from etl.epc.settings import DATA_ANOMALY_MATCHES
|
||||
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
|
||||
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
|
||||
from recommendations.recommendation_utils import (
|
||||
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
|
||||
)
|
||||
|
|
@ -47,6 +42,7 @@ class Property:
|
|||
walls = None
|
||||
windows = None
|
||||
lighting = None
|
||||
energy_source = None
|
||||
|
||||
spatial = None
|
||||
base_difference_record = None
|
||||
|
|
@ -118,7 +114,6 @@ class Property:
|
|||
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
|
||||
self.floor_level = None
|
||||
self.number_of_windows = None
|
||||
self.solar_pv_roof_area = None
|
||||
self.solar_pv_percentage = None
|
||||
|
||||
self.current_adjusted_energy = None
|
||||
|
|
@ -151,139 +146,215 @@ class Property:
|
|||
|
||||
# self.base_difference_record.df
|
||||
|
||||
def adjust_difference_record_with_recommendations(self, property_recommendations):
|
||||
def adjust_difference_record_with_recommendations(
|
||||
self, property_recommendations,
|
||||
property_representative_recommendations
|
||||
):
|
||||
"""
|
||||
This method will adjust the difference record, based on the recommendations made for the property
|
||||
|
||||
In order to score the measures, we need to consider the phase of the retrofit.
|
||||
|
||||
:param property_recommendations: dictionary of recommendations for the property
|
||||
:param property_representative_recommendations: dictionary of representative recommendations for the property
|
||||
"""
|
||||
|
||||
self.recommendations_scoring_data = []
|
||||
phases = sorted([r[0]["phase"] for r in property_recommendations if r[0]["phase"] is not None])
|
||||
|
||||
for recommendations_by_type in property_recommendations:
|
||||
for i, rec in enumerate(recommendations_by_type):
|
||||
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
|
||||
scoring_dict = self.create_recommendation_scoring_data(
|
||||
property_id=self.id, recommendation_record=recommendation_record, recommendation=rec,
|
||||
for phase in phases:
|
||||
property_recommendations_by_phase = [r for r in property_recommendations if r[0]["phase"] == phase][0]
|
||||
previous_phases = [p for p in phases if p < phase]
|
||||
previous_phase_representatives = [
|
||||
r for r in property_representative_recommendations if r["phase"] in previous_phases
|
||||
]
|
||||
# For solid wall insulation, we will actually have 2 representative recommendations, since we consider
|
||||
# both internal and external wall insulation as possible measures. We will use the representative that
|
||||
# has the lowest efficiency.
|
||||
# Take the representative with the lowest efficiency, by phase
|
||||
|
||||
# To be safe, we sort by phase
|
||||
previous_phase_representatives = sorted(previous_phase_representatives, key=lambda x: x['phase'])
|
||||
|
||||
previous_phase_representatives = [
|
||||
min(group, key=lambda x: x['efficiency']) for _, group in groupby(
|
||||
previous_phase_representatives, key=lambda x: x['phase']
|
||||
)
|
||||
]
|
||||
|
||||
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
|
||||
|
||||
for rec in property_recommendations_by_phase:
|
||||
# We simulate the impact of the recommendation at this current phase, and all of the prior phases
|
||||
|
||||
if rec["type"] == "mechanical_ventilation":
|
||||
continue
|
||||
|
||||
scoring_dict = self.create_recommendation_scoring_data(
|
||||
property_id=self.id,
|
||||
recommendation_record=recommendation_record,
|
||||
recommendations=previous_phase_representatives + [rec],
|
||||
primary_recommendation_id=rec["recommendation_id"]
|
||||
)
|
||||
self.recommendations_scoring_data.append(scoring_dict)
|
||||
|
||||
@staticmethod
|
||||
def create_recommendation_scoring_data(property_id, recommendation_record, recommendation: dict):
|
||||
def create_recommendation_scoring_data(
|
||||
property_id, recommendation_record, recommendations: list, primary_recommendation_id: int
|
||||
):
|
||||
"""
|
||||
This function will iterate through a list of recommendations and apply a simulation for each recommendation
|
||||
This allows us to later multiple measures and see the impact of the measures on the property
|
||||
:param property_id: The id of the property
|
||||
:param recommendation_record: The record of the property, which will be updated
|
||||
:param recommendations: The list of recommendations to apply
|
||||
:param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record
|
||||
:return: The updated recommendation record
|
||||
"""
|
||||
|
||||
output = recommendation_record.copy()
|
||||
|
||||
for col in [
|
||||
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
|
||||
]:
|
||||
if recommendation_record[col] is None:
|
||||
recommendation_record[col] = "none"
|
||||
if output[col] is None:
|
||||
output[col] = "none"
|
||||
|
||||
# We update the description to indicate it's insulated
|
||||
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
|
||||
# The upgrade made here is to the u-value of the walls and the description of the
|
||||
# insulation thickness
|
||||
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
recommendation_record["walls_insulation_thickness_ending"] = "above average"
|
||||
recommendation_record["walls_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
if recommendation_record["walls_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
for recommendation in recommendations:
|
||||
# For the list of recommendations we have, we iteratively update the output
|
||||
|
||||
if recommendation_record["walls_insulation_thickness_ending"] is None:
|
||||
recommendation_record["walls_insulation_thickness_ending"] = "none"
|
||||
# We update the description to indicate it's insulated
|
||||
if recommendation["type"] in [
|
||||
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
|
||||
]:
|
||||
# The upgrade made here is to the u-value of the walls and the description of the
|
||||
# insulation thickness
|
||||
output["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
# Setting the insulation thickness here to above average should be tested further because we
|
||||
# don't see a high volume of instances for this
|
||||
output["walls_insulation_thickness_ending"] = "above average"
|
||||
output["walls_energy_eff_ending"] = "Good"
|
||||
|
||||
# Update description to indicate it's insulate
|
||||
if recommendation["type"] in [
|
||||
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"
|
||||
]:
|
||||
if len(recommendation["parts"]) > 1:
|
||||
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
|
||||
# Note: often when the wall is insulatied, the internal/external insulation is not noted so we should
|
||||
# test the impact of using these booleans
|
||||
if recommendation["type"] == "external_wall_insulation":
|
||||
output["external_insulation"] = True
|
||||
output["internal_insulation"] = False
|
||||
|
||||
# recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
# We don't really see above average for this in the training data
|
||||
recommendation_record["floor_insulation_thickness_ending"] = "average"
|
||||
# This is rarely ever populated in the training data
|
||||
# recommendation_record["floor_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
if recommendation_record["floor_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
if recommendation["type"] == "internal_wall_insulation":
|
||||
output["external_insulation"] = False
|
||||
output["internal_insulation"] = True
|
||||
|
||||
if recommendation_record["floor_insulation_thickness_ending"] is None:
|
||||
recommendation_record["floor_insulation_thickness_ending"] = "none"
|
||||
# TODO: perhaps detrimental
|
||||
# When making a recommendation for the wall, we will also update the ventilation
|
||||
# if output["mechanical_ventilation_ending"] == 'natural':
|
||||
# output["mechanical_ventilation_ending"] = 'mechanical, extract only'
|
||||
|
||||
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
|
||||
recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
|
||||
parts = recommendation["parts"]
|
||||
if len(parts) != 1:
|
||||
raise ValueError("More than one part for roof insulation - investiage me")
|
||||
|
||||
# This is based on the values we have in the training data
|
||||
valid_numeric_values = [
|
||||
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
|
||||
]
|
||||
|
||||
proposed_depth = int(parts[0]["depth"])
|
||||
if proposed_depth not in valid_numeric_values:
|
||||
# Take the nearest value for scoring
|
||||
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
|
||||
|
||||
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
|
||||
if recommendation["type"] == "loft_insulation":
|
||||
recommendation_record["roof_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
recommendation_record["roof_energy_eff_ending"] = "Very Good"
|
||||
else:
|
||||
# Fill missing roof u-values - this fill is not based on recommended upgrades
|
||||
if recommendation_record["roof_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
if output["walls_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
|
||||
if recommendation_record["roof_insulation_thickness_ending"] is None:
|
||||
recommendation_record["roof_insulation_thickness_ending"] = "none"
|
||||
if output["walls_insulation_thickness_ending"] is None:
|
||||
output["walls_insulation_thickness_ending"] = "none"
|
||||
|
||||
if recommendation["type"] == "mechanical_ventilation":
|
||||
recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only'
|
||||
# Update description to indicate it's insulate
|
||||
if recommendation["type"] in [
|
||||
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"
|
||||
]:
|
||||
if len(recommendation["parts"]) > 1:
|
||||
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
|
||||
|
||||
if recommendation["type"] == "sealing_open_fireplace":
|
||||
recommendation_record["number_open_fireplaces_ending"] = 0
|
||||
|
||||
if recommendation["type"] == "low_energy_lighting":
|
||||
recommendation_record["low_energy_lighting_ending"] = 100
|
||||
recommendation_record["lighting_energy_eff_starting"] = "Very Good"
|
||||
|
||||
if recommendation["type"] == "windows_glazing":
|
||||
recommendation_record["multi_glaze_proportion_ending"] = 100
|
||||
recommendation_record["windows_energy_eff_ending"] = "Average"
|
||||
|
||||
is_secondary_glazing = recommendation["is_secondary_glazing"]
|
||||
|
||||
if recommendation_record["glazing_type_ending"] == "multiple":
|
||||
pass
|
||||
elif recommendation_record["glazing_type_ending"] == "single":
|
||||
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
|
||||
elif recommendation_record["glazing_type_ending"] == "double":
|
||||
recommendation_record["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
|
||||
elif recommendation_record["glazing_type_ending"] == "secondary":
|
||||
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
|
||||
elif recommendation_record["glazing_type_ending"] in ["triple", "high performance"]:
|
||||
recommendation_record["glazing_type_ending"] = "multiple"
|
||||
# output["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
# We don't really see above average for this in the training data
|
||||
output["floor_insulation_thickness_ending"] = "average"
|
||||
# This is rarely ever populated in the training data
|
||||
# output["floor_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
raise ValueError("Invalid glazing type - implement me")
|
||||
if output["floor_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
|
||||
if recommendation["type"] == "solar_pv":
|
||||
recommendation_record["photo_supply_ending"] = recommendation["photo_supply"]
|
||||
if output["floor_insulation_thickness_ending"] is None:
|
||||
output["floor_insulation_thickness_ending"] = "none"
|
||||
|
||||
if recommendation["type"] not in [
|
||||
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
|
||||
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
||||
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
||||
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
|
||||
"windows_glazing", "solar_pv"
|
||||
]:
|
||||
raise NotImplementedError("Implement me")
|
||||
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
|
||||
output["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
||||
|
||||
recommendation_record['id'] = "+".join([str(property_id), str(recommendation["recommendation_id"])])
|
||||
parts = recommendation["parts"]
|
||||
if len(parts) != 1:
|
||||
raise ValueError("More than one part for roof insulation - investiage me")
|
||||
|
||||
return recommendation_record
|
||||
# This is based on the values we have in the training data
|
||||
valid_numeric_values = [
|
||||
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
|
||||
]
|
||||
|
||||
proposed_depth = int(parts[0]["depth"])
|
||||
if proposed_depth not in valid_numeric_values:
|
||||
# Take the nearest value for scoring
|
||||
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
|
||||
|
||||
output["roof_insulation_thickness_ending"] = str(proposed_depth)
|
||||
if recommendation["type"] == "loft_insulation":
|
||||
if proposed_depth >= 270:
|
||||
output["roof_energy_eff_ending"] = "Very Good"
|
||||
else:
|
||||
output["roof_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
output["roof_energy_eff_ending"] = "Very Good"
|
||||
else:
|
||||
# Fill missing roof u-values - this fill is not based on recommended upgrades
|
||||
if output["roof_thermal_transmittance_ending"] is None:
|
||||
raise ValueError("We should not have a None value for the u value")
|
||||
|
||||
if output["roof_insulation_thickness_ending"] is None:
|
||||
output["roof_insulation_thickness_ending"] = "none"
|
||||
|
||||
if recommendation["type"] == "sealing_open_fireplace":
|
||||
output["number_open_fireplaces_ending"] = 0
|
||||
|
||||
if recommendation["type"] == "low_energy_lighting":
|
||||
output["low_energy_lighting_ending"] = 100
|
||||
output["lighting_energy_eff_starting"] = "Very Good"
|
||||
|
||||
if recommendation["type"] == "windows_glazing":
|
||||
output["multi_glaze_proportion_ending"] = 100
|
||||
output["windows_energy_eff_ending"] = "Average"
|
||||
|
||||
is_secondary_glazing = recommendation["is_secondary_glazing"]
|
||||
|
||||
if output["glazing_type_ending"] == "multiple":
|
||||
pass
|
||||
elif output["glazing_type_ending"] == "single":
|
||||
output["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
|
||||
elif output["glazing_type_ending"] == "double":
|
||||
output["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
|
||||
elif output["glazing_type_ending"] == "secondary":
|
||||
output["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
|
||||
elif output["glazing_type_ending"] in ["triple", "high performance"]:
|
||||
output["glazing_type_ending"] = "multiple"
|
||||
else:
|
||||
raise ValueError("Invalid glazing type - implement me")
|
||||
|
||||
if is_secondary_glazing:
|
||||
output["glazed_type_ending"] = "secondary glazing"
|
||||
else:
|
||||
output["glazed_type_ending"] = "double glazing installed during or after 2002 "
|
||||
|
||||
if recommendation["type"] == "solar_pv":
|
||||
output["photo_supply_ending"] = recommendation["photo_supply"]
|
||||
|
||||
if recommendation["type"] not in [
|
||||
"sealing_open_fireplace", "low_energy_lighting",
|
||||
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
||||
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
||||
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
|
||||
"windows_glazing", "solar_pv"
|
||||
]:
|
||||
raise NotImplementedError("Implement me")
|
||||
|
||||
output['id'] = "+".join([str(property_id), str(primary_recommendation_id)])
|
||||
|
||||
return output
|
||||
|
||||
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
|
||||
"""
|
||||
|
|
@ -349,6 +420,7 @@ class Property:
|
|||
self.set_solar_panel_area(
|
||||
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
|
||||
)
|
||||
self.set_energy_source()
|
||||
|
||||
def set_spatial(self, spatial: pd.DataFrame):
|
||||
"""
|
||||
|
|
@ -668,9 +740,33 @@ class Property:
|
|||
percentage_of_roof = photo_supply_matched["photo_supply_median"].mean()
|
||||
percentage_of_roof = percentage_of_roof / 100
|
||||
|
||||
self.solar_pv_roof_area = (
|
||||
self.solar_pv_percentage = percentage_of_roof
|
||||
|
||||
def get_solar_pv_roof_area(self, percentage_of_roof):
|
||||
"""
|
||||
Given a percentage of the roof, this method will return the estimated area of the solar panels
|
||||
:param percentage_of_roof:
|
||||
:return:
|
||||
"""
|
||||
|
||||
return (
|
||||
self.insulation_floor_area * percentage_of_roof if self.roof["is_flat"] else
|
||||
self.pitched_roof_area * percentage_of_roof
|
||||
)
|
||||
|
||||
self.solar_pv_percentage = percentage_of_roof
|
||||
def set_energy_source(self):
|
||||
"""
|
||||
This method sets the energy source of the property, based on the mains gas flag and energy tariff.
|
||||
"""
|
||||
# Default to "electricity_and_gas" to cover most scenarios including when mains_gas_flag is True
|
||||
energy_source = "electricity_and_gas"
|
||||
|
||||
# If the tariff explicitly indicates electricity use without a dual indication and mains_gas_flag is not True
|
||||
# We check for the common electricity tariffs
|
||||
if not self.data["mains-gas-flag"] and self.data["energy-tariff"] in [
|
||||
"Single", "off-peak 7 hour", "off-peak 10 hour", "off-peak 18 hour", "standard tariff", "24 hour"
|
||||
]:
|
||||
energy_source = "electricity"
|
||||
|
||||
# Set the energy source based on the conditions above
|
||||
self.energy_source = energy_source
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
|
|||
"new_u_value": rec.get("new_u_value"),
|
||||
"sap_points": rec["sap_points"],
|
||||
"heat_demand": rec["heat_demand"],
|
||||
"adjusted_heat_demand": rec["adjusted_heat_demand"],
|
||||
"co2_equivalent_savings": rec["co2_equivalent_savings"],
|
||||
"total_work_hours": rec["labour_hours"],
|
||||
"energy_cost_savings": rec["energy_cost_savings"],
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ class Recommendation(Base):
|
|||
new_u_value = Column(Float)
|
||||
sap_points = Column(Float)
|
||||
heat_demand = Column(Float)
|
||||
adjusted_heat_demand = Column(Float)
|
||||
co2_equivalent_savings = Column(Float)
|
||||
energy_savings = Column(Float)
|
||||
energy_cost_savings = Column(Float)
|
||||
|
|
|
|||
|
|
@ -136,25 +136,32 @@ async def trigger_plan(body: PlanTriggerRequest):
|
|||
|
||||
recommendations = {}
|
||||
recommendations_scoring_data = []
|
||||
|
||||
representative_recommendations = {}
|
||||
for p in input_properties:
|
||||
|
||||
# Property recommendations
|
||||
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
|
||||
|
||||
# TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires
|
||||
# decanting the tenant
|
||||
recommender = Recommendations(property_instance=p, materials=materials)
|
||||
property_recommendations = recommender.recommend()
|
||||
property_recommendations, property_representative_recommendations = recommender.recommend()
|
||||
|
||||
if not property_recommendations:
|
||||
continue
|
||||
|
||||
recommendations[p.id] = property_recommendations
|
||||
representative_recommendations[p.id] = property_representative_recommendations
|
||||
|
||||
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
|
||||
p.adjust_difference_record_with_recommendations(property_recommendations)
|
||||
p.adjust_difference_record_with_recommendations(
|
||||
property_recommendations, property_representative_recommendations
|
||||
)
|
||||
|
||||
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
||||
|
||||
# TODO: Make sure that number_habitable_rooms has been dropped
|
||||
|
||||
logger.info("Preparing data for scoring in sap change api")
|
||||
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
||||
recommendations_scoring_data = recommendations_scoring_data.drop(
|
||||
|
|
@ -180,10 +187,18 @@ async def trigger_plan(body: PlanTriggerRequest):
|
|||
|
||||
property_instance = [p for p in input_properties if p.id == property_id][0]
|
||||
|
||||
recommendations_with_impact = Recommendations.calculate_recommendation_impact(
|
||||
property_instance=property_instance,
|
||||
all_predictions=all_predictions,
|
||||
recommendations=recommendations
|
||||
recommendations_with_impact, current_adjusted_energy, expected_adjusted_energy = (
|
||||
Recommendations.calculate_recommendation_impact(
|
||||
property_instance=property_instance,
|
||||
all_predictions=all_predictions,
|
||||
recommendations=recommendations
|
||||
)
|
||||
)
|
||||
|
||||
# Store the resulting adjusted energy in the property instance
|
||||
property_instance.set_adjusted_energy(
|
||||
current_adjusted_energy=current_adjusted_energy,
|
||||
expected_adjusted_energy=expected_adjusted_energy
|
||||
)
|
||||
|
||||
input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
|
||||
|
|
@ -207,7 +222,7 @@ async def trigger_plan(body: PlanTriggerRequest):
|
|||
|
||||
selected_recommendations = {r["id"] for r in solution}
|
||||
|
||||
# If wall ventilation is selected, we also include mechanical ventilation as a best practice measure
|
||||
# If wall insulation is selected, we also include mechanical ventilation as a best practice measure
|
||||
if any(x in [r["type"] for r in solution] for x in [
|
||||
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
|
||||
]):
|
||||
|
|
@ -237,174 +252,6 @@ async def trigger_plan(body: PlanTriggerRequest):
|
|||
]
|
||||
recommendations[property_id] = final_recommendations
|
||||
|
||||
# This is a temporary step, to estimate the impact of the measured on heat demand and carbon
|
||||
# TODO: This needs to be cleaned up, if it happens to be kept
|
||||
representative_recs = {}
|
||||
for property_id, property_recommendations in recommendations.items():
|
||||
default_recommendations = [r for r in property_recommendations if r["default"]]
|
||||
default_types = {x["type"] for x in default_recommendations}
|
||||
|
||||
# Missing types
|
||||
missing_types = list(set([r["type"] for r in property_recommendations if r["type"] not in default_types]))
|
||||
# We might have a missing type as one of the solid wall options because for a solid wall, you might
|
||||
# have ewi or iwi but only one of them will be a default
|
||||
if ("internal_wall_insulation" in default_types) or ("external_wall_insaultion" in default_types):
|
||||
missing_types = [
|
||||
t for t in missing_types if t not in ["internal_wall_insulation", "external_wall_insulation"]
|
||||
]
|
||||
|
||||
# We check if NO wall insulation was selected but iwi and ewi are available
|
||||
# This condition will check
|
||||
# 1) iwi and ewi are both in missing_types
|
||||
# 2) iwi and ewi are not in default_types
|
||||
# If both of these are true, it means that no wall insulation was selected via the optimisation routine
|
||||
# but both are possible, so we need to select a default. We default to iwi because it's usually cheaper
|
||||
if (("internal_wall_insulation" in missing_types) and ("external_wall_insulation" in missing_types)) and (
|
||||
("internal_wall_insulation" not in default_types) and ("external_wall_insulation" not in default_types)
|
||||
):
|
||||
missing_types = [t for t in missing_types if t != "external_wall_insulation"]
|
||||
|
||||
if missing_types:
|
||||
for missed_type in missing_types:
|
||||
missed = [r for r in property_recommendations if r["type"] == missed_type]
|
||||
min_cost = min([r["total"] for r in missed])
|
||||
# Grab a representative, based on cheapest cost
|
||||
|
||||
representative_rec = [r for r in property_recommendations if np.isclose(r["total"], min_cost)]
|
||||
default_recommendations.append(representative_rec[0])
|
||||
|
||||
representative_recs[property_id] = default_recommendations
|
||||
|
||||
# We update the carbon and heat demand predictions
|
||||
# TODO: The api call producing all_combined_predictions has been removed so we can potentially completely
|
||||
# refactor this block to just perform the energy adjustments
|
||||
for property_id, property_recommendations in recommendations.items():
|
||||
|
||||
property_instance = [p for p in input_properties if p.id == property_id][0]
|
||||
|
||||
heat_demand_change = sum(
|
||||
x.get("heat_demand", 0) for x in representative_recs[property_id] if
|
||||
x["type"] not in ["mechanical_ventilation", "low_energy_lighting"]
|
||||
)
|
||||
carbon_change = sum(
|
||||
x.get("co2_equivalent_savings", 0) for x in representative_recs[property_id] if
|
||||
x["type"] not in ["mechanical_ventilation", "low_energy_lighting"]
|
||||
)
|
||||
|
||||
starting_heat_demand = (
|
||||
float(property_instance.data["energy-consumption-current"]) * property_instance.floor_area
|
||||
)
|
||||
expected_heat_demand = starting_heat_demand - heat_demand_change
|
||||
|
||||
# We don't want to adjust the heat demand for mechanical ventilation so we add it back on
|
||||
|
||||
# We adjust the heat demand figures to align to the UCL paper
|
||||
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
|
||||
epc_energy_consumption=starting_heat_demand,
|
||||
current_epc_rating=property_instance.data["current-energy-rating"],
|
||||
)
|
||||
|
||||
# We sum up the SAP points of the default recommendations and calculate a new EPC category. This
|
||||
# category is then used to produce adjusted energy figures
|
||||
|
||||
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
|
||||
epc_energy_consumption=expected_heat_demand,
|
||||
current_epc_rating=property_instance.data["current-energy-rating"],
|
||||
)
|
||||
|
||||
heat_demand_change = (
|
||||
current_adjusted_energy - expected_adjusted_energy
|
||||
)
|
||||
|
||||
# update the recommendations
|
||||
# We need to totals for the representative recommendations
|
||||
representative_rec_data = [
|
||||
{
|
||||
"recommendation_id": r["recommendation_id"],
|
||||
"co2_equivalent_savings": r.get("co2_equivalent_savings"),
|
||||
"heat_demand": r.get("heat_demand"),
|
||||
"type": r["type"]
|
||||
} for r
|
||||
in representative_recs[property_id]
|
||||
]
|
||||
representative_rec_data = pd.DataFrame(representative_rec_data)
|
||||
# Suppress mechanical ventilation to have zero heat demand and co2
|
||||
representative_rec_data.loc[
|
||||
representative_rec_data["type"] == "mechanical_ventilation", "co2_equivalent_savings"
|
||||
] = 0
|
||||
representative_rec_data.loc[
|
||||
representative_rec_data["type"] == "mechanical_ventilation", "heat_demand"
|
||||
] = 0
|
||||
# Supress low energy lighting to have zero heat demand and co2 - this does not get affected by this process
|
||||
representative_rec_data.loc[
|
||||
representative_rec_data["type"] == "low_energy_lighting", "co2_equivalent_savings"
|
||||
] = 0
|
||||
representative_rec_data.loc[
|
||||
representative_rec_data["type"] == "low_energy_lighting", "heat_demand"
|
||||
] = 0
|
||||
|
||||
# Convert co2 and heat demand to proportions of their column sums
|
||||
representative_rec_data["co2_equivalent_savings_percent"] = (
|
||||
representative_rec_data["co2_equivalent_savings"] /
|
||||
representative_rec_data["co2_equivalent_savings"].sum()
|
||||
)
|
||||
|
||||
representative_rec_data["heat_demand_percent"] = (
|
||||
representative_rec_data["heat_demand"] / representative_rec_data["heat_demand"].sum()
|
||||
)
|
||||
|
||||
# We'll use the proportions to update the carbon and heat demand
|
||||
representative_rec_data["co2_equivalent_savings"] = (
|
||||
carbon_change * representative_rec_data["co2_equivalent_savings_percent"]
|
||||
)
|
||||
|
||||
representative_rec_data["heat_demand"] = (
|
||||
heat_demand_change * representative_rec_data["heat_demand_percent"]
|
||||
)
|
||||
|
||||
# Finally, insert these values into the final recommendations
|
||||
for rec in property_recommendations:
|
||||
if rec["type"] in ["external_wall_insulation", "internal_wall_insulation"]:
|
||||
change_data = representative_rec_data[
|
||||
representative_rec_data["type"].isin(["external_wall_insulation", "internal_wall_insulation"])
|
||||
]
|
||||
else:
|
||||
change_data = representative_rec_data[representative_rec_data["type"] == rec["type"]]
|
||||
|
||||
if rec["type"] == "mechanical_ventilation":
|
||||
rec["co2_equivalent_savings"] = 0
|
||||
rec["heat_demand"] = 0
|
||||
rec["energy_cost_savings"] = 0
|
||||
elif rec["type"] == "low_energy_lighting":
|
||||
# We do not convert, we just calculate energy cost savings
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["heat_demand"])
|
||||
continue
|
||||
else:
|
||||
rec["co2_equivalent_savings"] = change_data["co2_equivalent_savings"].values[0]
|
||||
rec["heat_demand"] = change_data["heat_demand"].values[0]
|
||||
# If the recommendation is solar, the savings are entirely in electricity
|
||||
if rec["type"] == "solar_pv":
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["heat_demand"])
|
||||
else:
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])
|
||||
|
||||
# Update recommendations
|
||||
recommendations[property_id] = property_recommendations
|
||||
|
||||
# For expected adjust energy, we don't include mechanical ventilation so we'll add it back on
|
||||
mechanical_ventilation_rec = representative_rec_data[
|
||||
representative_rec_data["type"] == "mechanical_ventilation"
|
||||
]
|
||||
if not mechanical_ventilation_rec.empty:
|
||||
expected_adjusted_energy = (
|
||||
expected_adjusted_energy + mechanical_ventilation_rec["heat_demand"].values[0]
|
||||
)
|
||||
|
||||
property_instance.set_adjusted_energy(
|
||||
current_adjusted_energy=current_adjusted_energy,
|
||||
expected_adjusted_energy=expected_adjusted_energy
|
||||
)
|
||||
|
||||
# 1) the property data
|
||||
# 2) the property details (epc)
|
||||
# 3) the recommendations
|
||||
|
|
|
|||
|
|
@ -51,13 +51,14 @@ class PropertyValuation:
|
|||
KNIGHT_FRANK_MAPPING = [
|
||||
{"start": "D", "end": "C", "increase_percentage": 0.03},
|
||||
{"start": "D", "end": "B", "increase_percentage": 0.088},
|
||||
{"start": "D", "end": "A", "increase_percentage": 0.088},
|
||||
]
|
||||
|
||||
NATIONWIDE_MAPPING = [
|
||||
{"start": "G", "end": "D", "increase_percentage": 0.035},
|
||||
{"start": "F", "end": "D", "increase_percentage": 0.035},
|
||||
{"start": "D", "end": "B", "increase_percentage": 0.017},
|
||||
{"start": "D", "end": "A", "increase_percentage": 0.017},
|
||||
# {"start": "G", "end": "D", "increase_percentage": 0.035},
|
||||
# {"start": "F", "end": "D", "increase_percentage": 0.035},
|
||||
# {"start": "D", "end": "B", "increase_percentage": 0.017},
|
||||
# {"start": "D", "end": "A", "increase_percentage": 0.017},
|
||||
]
|
||||
|
||||
EPC_BANDS = ["G", "F", "E", "D", "C", "B", "A"]
|
||||
|
|
|
|||
|
|
@ -130,8 +130,14 @@ class ModelApi:
|
|||
)
|
||||
)
|
||||
|
||||
predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)
|
||||
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
|
||||
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True)
|
||||
# To grab the phase, we pull the integer after "phase=" in the recommendation_id. We can do this with a
|
||||
# string split on phase= and then grab the second element of the resulting list. We could also use a
|
||||
# regular expression to do this but we use the string split method here, for safety.
|
||||
predictions_df['phase'] = predictions_df['recommendation_id'].str.split('phase=').str[1].str[0]
|
||||
# Convert back to int
|
||||
predictions_df['phase'] = predictions_df['phase'].astype(int)
|
||||
|
||||
predictions[model_prefix] = predictions_df
|
||||
|
||||
|
|
|
|||
|
|
@ -24,27 +24,6 @@ load_dotenv(ENV_FILE)
|
|||
|
||||
|
||||
class DataLoader:
|
||||
COLOUR_CONFIG = {
|
||||
"ha_1": {
|
||||
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
|
||||
},
|
||||
"ha_6": {
|
||||
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
|
||||
"survey_list": {
|
||||
"green": "FF92D050", "purple": "FF7030A0", "red": "FFFF0000", "blue": "FF00B0F0"
|
||||
}
|
||||
},
|
||||
"ha_14": {
|
||||
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
|
||||
},
|
||||
"ha_39": {
|
||||
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
|
||||
},
|
||||
"ha_107": {
|
||||
"asset_list": {"red": "FFFF0000", "green": "FF00B050"},
|
||||
}
|
||||
}
|
||||
|
||||
MIN_ROWS = {
|
||||
"ha_1": 2,
|
||||
"ha_6": 2,
|
||||
|
|
@ -53,12 +32,87 @@ class DataLoader:
|
|||
"ha_107": 2,
|
||||
}
|
||||
|
||||
COLUMN_CONFIG = {
|
||||
"ha_1": {
|
||||
"address": "Address",
|
||||
"postcode": "Address - Postcode"
|
||||
}
|
||||
}
|
||||
|
||||
def __init__(self, files, use_cache):
|
||||
self.files = files
|
||||
self.use_cache = use_cache
|
||||
|
||||
self.data = {}
|
||||
|
||||
def create_asset_list_matching_address(self, ha_name, asset_list):
|
||||
|
||||
if ha_name in ["ha_1", "ha_6"]:
|
||||
asset_list["matching_address"] = asset_list[
|
||||
self.COLUMN_CONFIG[ha_name]["address"]
|
||||
].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list[
|
||||
self.COLUMN_CONFIG[ha_name]["postcode"]
|
||||
].str.lower().str.strip()
|
||||
elif ha_name == "ha_14":
|
||||
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
|
||||
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 2"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 3"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 4"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Postcode"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
|
||||
elif ha_name == "ha_39":
|
||||
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
|
||||
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_3"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["post_code"].astype(str).str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
|
||||
elif ha_name == "ha_107":
|
||||
# Create matching_address by concatenating House No, Street, Town, District, Postcode
|
||||
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["Street"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Town"].str.lower().str.strip() + ", " + \
|
||||
asset_list["District"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Postcode"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
|
||||
else:
|
||||
raise NotImplementedError("implement me")
|
||||
|
||||
return asset_list
|
||||
|
||||
def append_asset_list_built_form(self, ha_name, asset_list):
|
||||
|
||||
# Finally, we process property_type or built form, where needed
|
||||
if ha_name == "ha_6":
|
||||
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
|
||||
|
||||
return asset_list
|
||||
|
||||
@staticmethod
|
||||
def create_asset_list_house_no(ha_name, asset_list):
|
||||
"""
|
||||
This function will append the House number onto the asset list
|
||||
:return:
|
||||
"""
|
||||
|
||||
if ha_name in ["ha_107"]:
|
||||
asset_list["HouseNo"] = asset_list["House No"].copy()
|
||||
else:
|
||||
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
|
||||
house_numbers = split_addresses[0].str.split(' ', expand=True)
|
||||
# THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how
|
||||
# many columns there might be
|
||||
house_numbers = house_numbers.iloc[:, 0:1]
|
||||
house_numbers.columns = ['HouseNo']
|
||||
|
||||
asset_list = pd.concat([asset_list, house_numbers[["HouseNo"]]], axis=1)
|
||||
|
||||
return asset_list
|
||||
|
||||
def load_asset_list(self, file_path, ha_name, sheet_name=None):
|
||||
workbook = openpyxl.load_workbook(file_path)
|
||||
if sheet_name is not None:
|
||||
|
|
@ -87,74 +141,15 @@ class DataLoader:
|
|||
# Remove entirely empty roww - consider all rows apart from row_color
|
||||
asset_list = asset_list.loc[asset_list.loc[:, asset_list.columns != 'row_color'].notnull().any(axis=1)]
|
||||
|
||||
asset_list_colours = self.COLOUR_CONFIG[ha_name]["asset_list"]
|
||||
|
||||
asset_list["row_colour_name"] = np.where(
|
||||
asset_list["row_color"] == asset_list_colours["red"], "red",
|
||||
np.where(asset_list["row_color"] == asset_list_colours["green"], "green", "yellow")
|
||||
)
|
||||
|
||||
asset_list["row_meaning"] = np.where(
|
||||
asset_list["row_colour_name"] == "red", "does not meet criteria",
|
||||
np.where(
|
||||
asset_list["row_colour_name"] == "green", "identified potential eco works (CWI)", "maybe in the future"
|
||||
)
|
||||
)
|
||||
|
||||
# Add in asset_list_row_id
|
||||
asset_list["asset_list_row_id"] = [ha_name + str(i) for i in range(0, len(asset_list))]
|
||||
|
||||
# Prepare the asset list
|
||||
# Depending on the HA, we need to rename some columns
|
||||
if ha_name == "ha_1":
|
||||
asset_list["matching_address"] = asset_list["Address"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Address - Postcode"].str.lower().str.strip()
|
||||
elif ha_name == "ha_6":
|
||||
asset_list["matching_address"] = asset_list["propertyaddress"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Post Code"].str.lower().str.strip()
|
||||
elif ha_name == "ha_14":
|
||||
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
|
||||
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 2"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 3"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Address 4"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Postcode"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
|
||||
elif ha_name == "ha_39":
|
||||
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
|
||||
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_3"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["post_code"].astype(str).str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
|
||||
elif ha_name == "ha_107":
|
||||
# Create matching_address by concatenating House No, Street, Town, District, Postcode
|
||||
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
|
||||
asset_list["Street"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Town"].str.lower().str.strip() + ", " + \
|
||||
asset_list["District"].str.lower().str.strip() + ", " + \
|
||||
asset_list["Postcode"].str.lower().str.strip()
|
||||
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
|
||||
else:
|
||||
raise NotImplementedError("implement me")
|
||||
# Create matching address and matching postcode
|
||||
asset_list = self.create_asset_list_matching_address(ha_name=ha_name, asset_list=asset_list)
|
||||
|
||||
if ha_name in ["ha_107"]:
|
||||
asset_list["HouseNo"] = asset_list["House No"].copy()
|
||||
else:
|
||||
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
|
||||
house_numbers = split_addresses[0].str.split(' ', expand=True)
|
||||
# THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how
|
||||
# many columns there might be
|
||||
house_numbers = house_numbers.iloc[:, 0:1]
|
||||
house_numbers.columns = ['HouseNo']
|
||||
asset_list = self.create_asset_list_house_no(ha_name=ha_name, asset_list=asset_list)
|
||||
|
||||
asset_list = pd.concat([asset_list, house_numbers[["HouseNo"]]], axis=1)
|
||||
|
||||
# Finally, we process property_type or built form, where needed
|
||||
if ha_name == "ha_6":
|
||||
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
|
||||
asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list)
|
||||
|
||||
return asset_list
|
||||
|
||||
|
|
@ -177,9 +172,7 @@ class DataLoader:
|
|||
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
|
||||
# Remove columns that are None
|
||||
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
|
||||
|
||||
survey_list["row_colour"] = survey_colors
|
||||
survey_list_colours = self.COLOUR_CONFIG[ha_name]["survey_list"]
|
||||
|
||||
# The survey list has 4 possible colours:
|
||||
# PURPLE - Installer advised install complete and a complimentary post works EPC has been completed.
|
||||
|
|
@ -1252,13 +1245,13 @@ def app():
|
|||
:return:
|
||||
"""
|
||||
|
||||
use_cache = True
|
||||
use_cache = False
|
||||
|
||||
files = {
|
||||
"ha_1": {
|
||||
"asset_list": {
|
||||
"filepath": "etl/eligibility/ha_15_32/HA 1 - ASSET LIST.xlsx",
|
||||
"sheetname": "HA 1"
|
||||
"filepath": "local_data/ha_data/HA1/ACCENT GROUP.xlsx",
|
||||
"sheetname": "Energy data"
|
||||
}
|
||||
},
|
||||
"ha_6": {
|
||||
|
|
|
|||
|
|
@ -50,9 +50,9 @@ scenario_properties = [
|
|||
"postcode": "NN1 5JY",
|
||||
"lmk-key": "1459796789102016070507274146560098",
|
||||
"measures": [
|
||||
[["internal_wall_insulation"], "11", None],
|
||||
[["external_wall_insulation"], "10", None],
|
||||
[["solar", "windows"], "12-15", {"photo_supply_ending": 50}],
|
||||
[["internal_wall_insulation"], "11", None, [0]],
|
||||
[["external_wall_insulation"], "10", None, [0]],
|
||||
[["solar", "windows"], "12-15", {"photo_supply_ending": 50}, [0, 1]],
|
||||
],
|
||||
},
|
||||
{
|
||||
|
|
@ -60,7 +60,7 @@ scenario_properties = [
|
|||
"postcode": "HP1 2HA",
|
||||
"lmk-key": "c14029235739827d5f627dc8aa9bb567d026b267e851e0db0001db24638667b1",
|
||||
"measures": [
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None],
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
|
||||
],
|
||||
},
|
||||
{
|
||||
|
|
@ -68,7 +68,7 @@ scenario_properties = [
|
|||
"postcode": "HP1 2HE",
|
||||
"lmk-key": "99296a6dda21314fef3a61cda59e441e9a2aacf115eb96f4a0fa85696bf7b117",
|
||||
"measures": [
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None],
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
|
||||
],
|
||||
},
|
||||
{
|
||||
|
|
@ -76,7 +76,7 @@ scenario_properties = [
|
|||
"postcode": "HP1 2AN",
|
||||
"lmk-key": "d1e0534be3a44c33003323b21d0e322e3daddc65b5ee71936f89c59ddab96b50",
|
||||
"measures": [
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None],
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
|
||||
],
|
||||
},
|
||||
{
|
||||
|
|
@ -84,7 +84,7 @@ scenario_properties = [
|
|||
"postcode": "HP1 2HX",
|
||||
"lmk-key": "1eae354db522a95188018d9cd0502ed8c609910b6c88f8797d3a25f59b11770a",
|
||||
"measures": [
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None],
|
||||
[["cavity_wall_insulation", "loft_insulation"], "15", None, [0, 1]],
|
||||
],
|
||||
},
|
||||
]
|
||||
|
|
@ -195,18 +195,18 @@ for scenario_property in scenario_properties:
|
|||
recommendation_record = p.base_difference_record.df.to_dict("records")[
|
||||
0
|
||||
].copy()
|
||||
for rec in combi:
|
||||
recommendation_record = p.create_recommendation_scoring_data(
|
||||
property_id=rec["type"],
|
||||
recommendation_record=recommendation_record,
|
||||
recommendation=rec,
|
||||
)
|
||||
recommendation_record = p.create_recommendation_scoring_data(
|
||||
property_id=i,
|
||||
primary_recommendation_id=i,
|
||||
recommendation_record=recommendation_record,
|
||||
recommendations=combi,
|
||||
)
|
||||
|
||||
if override is not None:
|
||||
for key, value in override.items():
|
||||
recommendation_record[key] = value
|
||||
|
||||
recommendation_record["id"] = "+".join(measure) + "+" + str(i)
|
||||
recommendation_record["id"] = "&".join(measure) + "+" + str(i)
|
||||
recommendation_record["impact"] = impact
|
||||
scoring_list.append(recommendation_record)
|
||||
|
||||
|
|
@ -230,6 +230,20 @@ recommendations_scoring_data.insert(0, "impact", impact_col)
|
|||
id_col = recommendations_scoring_data.pop("id")
|
||||
recommendations_scoring_data.insert(0, "id", id_col)
|
||||
|
||||
from backend.ml_models.api import ModelApi
|
||||
|
||||
model_api = ModelApi(portfolio_id="generate-scenarios-data", timestamp=created_at)
|
||||
|
||||
all_predictions = model_api.predict_all(
|
||||
df=recommendations_scoring_data,
|
||||
bucket=get_settings().DATA_BUCKET,
|
||||
prediction_buckets={
|
||||
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
|
||||
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
|
||||
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
|
||||
}
|
||||
)
|
||||
|
||||
save_dataframe_to_s3_parquet(
|
||||
recommendations_scoring_data,
|
||||
"retrofit-data-dev",
|
||||
|
|
|
|||
2170
etl/testing_data/sap_model_simulation.py
Normal file
2170
etl/testing_data/sap_model_simulation.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -37,6 +37,9 @@ MCS_SOLAR_PV_COST_DATA = {
|
|||
"average_cost_per_kwh-Northern Ireland": 2126.09,
|
||||
}
|
||||
|
||||
# This is based on quotes from installers
|
||||
BATTERY_COST = 3500
|
||||
|
||||
|
||||
class Costs:
|
||||
"""
|
||||
|
|
@ -835,7 +838,7 @@ class Costs:
|
|||
"labour_days": labour_days
|
||||
}
|
||||
|
||||
def solar_pv(self, wattage: float):
|
||||
def solar_pv(self, wattage: float, has_battery: bool = False):
|
||||
|
||||
"""
|
||||
Calculates the total cost for solar PV based data provided by the MCS dashboard, which contains
|
||||
|
|
@ -847,8 +850,8 @@ class Costs:
|
|||
|
||||
Price can also be benchmarked against this checkatrade article:
|
||||
https://www.checkatrade.com/blog/cost-guides/cost-of-solar-panel-installation/
|
||||
:param wattage: Peak wattage of the solar PV system
|
||||
:return:
|
||||
:param wattage: Peak wattage of the solar PV system]
|
||||
:param has_battery: Bool, whether the system includes a battery
|
||||
"""
|
||||
|
||||
# Get the cost data relevant to the region
|
||||
|
|
@ -857,7 +860,12 @@ class Costs:
|
|||
kw = wattage / 1000
|
||||
total_cost = kw * regional_cost
|
||||
|
||||
if has_battery:
|
||||
# The battery cost is based on the £3500 quote, recieved from installers
|
||||
total_cost += BATTERY_COST
|
||||
|
||||
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
|
||||
|
||||
vat = total_cost - subtotal_before_vat
|
||||
|
||||
# Labour hours are based on estimates from online research but an average team seems to consist of 3 people
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class FireplaceRecommendations(Definitions):
|
|||
self.has_ventilaion = None
|
||||
self.recommendation = None
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase=0):
|
||||
"""
|
||||
Based on the number of open fireplcaes found, we recommend sealing each one at a cost of
|
||||
around £500
|
||||
|
|
@ -37,6 +37,7 @@ class FireplaceRecommendations(Definitions):
|
|||
# We recommend installing two mechanical ventilation systems
|
||||
self.recommendation = [
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [],
|
||||
"type": "sealing_open_fireplace",
|
||||
"description": "Seal %s open fireplaces" % str(number_open_fireplaces),
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ class FloorRecommendations(Definitions):
|
|||
# TODO: To be completed
|
||||
self.exposed_floor_non_insulation_materials = []
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase=0):
|
||||
u_value = self.property.floor["thermal_transmittance"]
|
||||
|
||||
property_type = self.property.data["property-type"]
|
||||
|
|
@ -118,6 +118,7 @@ class FloorRecommendations(Definitions):
|
|||
if self.property.floor["is_suspended"]:
|
||||
# Given the U-value, we recommend underfloor insulation
|
||||
self.recommend_floor_insulation(
|
||||
phase=phase,
|
||||
u_value=u_value,
|
||||
insulation_materials=self.suspended_floor_insulation_materials,
|
||||
non_insulation_materials=self.suspended_floor_non_insulation_materials
|
||||
|
|
@ -129,7 +130,8 @@ class FloorRecommendations(Definitions):
|
|||
self.recommend_floor_insulation(
|
||||
u_value=u_value,
|
||||
insulation_materials=self.solid_floor_insulation_materials,
|
||||
non_insulation_materials=self.solid_floor_non_insulation_materials
|
||||
non_insulation_materials=self.solid_floor_non_insulation_materials,
|
||||
phase=phase
|
||||
)
|
||||
return
|
||||
|
||||
|
|
@ -156,7 +158,7 @@ class FloorRecommendations(Definitions):
|
|||
|
||||
raise ValueError("Invalid material type - implement me!")
|
||||
|
||||
def recommend_floor_insulation(self, u_value, insulation_materials, non_insulation_materials):
|
||||
def recommend_floor_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
|
||||
"""
|
||||
This method is tasked with estimating the impact of performing suspended floor insulation
|
||||
:return:
|
||||
|
|
@ -198,6 +200,7 @@ class FloorRecommendations(Definitions):
|
|||
|
||||
self.recommendations.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [
|
||||
get_recommended_part(
|
||||
part=material.to_dict(),
|
||||
|
|
|
|||
|
|
@ -4,6 +4,9 @@ from recommendations.Costs import Costs
|
|||
|
||||
|
||||
class LightingRecommendations:
|
||||
# We introduce a SAP limit to lighting, which is based on empirical findings. We do see cases where lighting is
|
||||
# worth more than 2 points, but this is unlikely in the context of other upgrades that can be made to the property
|
||||
SAP_LIMIT = 2
|
||||
|
||||
def __init__(self, property_instance: Property, materials: List):
|
||||
"""
|
||||
|
|
@ -51,7 +54,7 @@ class LightingRecommendations:
|
|||
|
||||
return total_energy_savings_per_year, carbon_reduction_tonnes
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase=0):
|
||||
"""
|
||||
This method will check if there are any lighting fittings that aren't low energy.
|
||||
|
||||
|
|
@ -90,6 +93,7 @@ class LightingRecommendations:
|
|||
|
||||
self.recommendation = [
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [],
|
||||
"type": "low_energy_lighting",
|
||||
"description": description,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
import numpy as np
|
||||
|
||||
from backend.Property import Property
|
||||
from typing import List
|
||||
from itertools import groupby
|
||||
from recommendations.FloorRecommendations import FloorRecommendations
|
||||
from recommendations.WallRecommendations import WallRecommendations
|
||||
from recommendations.RoofRecommendations import RoofRecommendations
|
||||
|
|
@ -44,57 +47,124 @@ class Recommendations:
|
|||
|
||||
"""
|
||||
This method runs the recommendations for the individual measures and then appends them to a list for output
|
||||
|
||||
The recommendations are implemented in order of suggested phase, from fabric first to heating systems, to
|
||||
renewables.
|
||||
:return:
|
||||
"""
|
||||
|
||||
property_recommendations = []
|
||||
|
||||
# Floor recommendations
|
||||
self.floor_recommender.recommend()
|
||||
if self.floor_recommender.recommendations:
|
||||
property_recommendations.append(self.floor_recommender.recommendations)
|
||||
|
||||
phase = 0
|
||||
# Wall recommendations
|
||||
self.wall_recomender.recommend()
|
||||
self.wall_recomender.recommend(phase=phase)
|
||||
if self.wall_recomender.recommendations:
|
||||
property_recommendations.append(self.wall_recomender.recommendations)
|
||||
|
||||
# Roof recommendations
|
||||
self.roof_recommender.recommend()
|
||||
if self.roof_recommender.recommendations:
|
||||
property_recommendations.append(self.roof_recommender.recommendations)
|
||||
phase += 1
|
||||
|
||||
# Ventilation recommendations
|
||||
# We only produce a ventilation recommendation if the property is recommended to have wall or roof insulation
|
||||
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this has no
|
||||
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we have any
|
||||
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
|
||||
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
|
||||
self.ventilation_recomender.recommend()
|
||||
if self.ventilation_recomender.recommendation:
|
||||
property_recommendations.append(self.ventilation_recomender.recommendation)
|
||||
|
||||
# Fireplace sealing recommendations
|
||||
self.fireplace_recommender.recommend()
|
||||
if self.fireplace_recommender.recommendation:
|
||||
property_recommendations.append(self.fireplace_recommender.recommendation)
|
||||
# Roof recommendations
|
||||
self.roof_recommender.recommend(phase=phase)
|
||||
if self.roof_recommender.recommendations:
|
||||
property_recommendations.append(self.roof_recommender.recommendations)
|
||||
phase += 1
|
||||
|
||||
# Lighting recommendations
|
||||
self.lighting_recommender.recommend()
|
||||
if self.lighting_recommender.recommendation:
|
||||
property_recommendations.append(self.lighting_recommender.recommendation)
|
||||
# Floor recommendations
|
||||
self.floor_recommender.recommend(phase=phase)
|
||||
if self.floor_recommender.recommendations:
|
||||
property_recommendations.append(self.floor_recommender.recommendations)
|
||||
phase += 1
|
||||
|
||||
# Windows recommendations
|
||||
self.windows_recommender.recommend()
|
||||
self.windows_recommender.recommend(phase=phase)
|
||||
if self.windows_recommender.recommendation:
|
||||
property_recommendations.append(self.windows_recommender.recommendation)
|
||||
phase += 1
|
||||
|
||||
# Fireplace sealing recommendations
|
||||
self.fireplace_recommender.recommend(phase=phase)
|
||||
if self.fireplace_recommender.recommendation:
|
||||
property_recommendations.append(self.fireplace_recommender.recommendation)
|
||||
phase += 1
|
||||
|
||||
# Lighting recommendations
|
||||
self.lighting_recommender.recommend(phase=phase)
|
||||
if self.lighting_recommender.recommendation:
|
||||
property_recommendations.append(self.lighting_recommender.recommendation)
|
||||
phase += 1
|
||||
|
||||
# Solar recommendations
|
||||
self.solar_recommender.recommend()
|
||||
self.solar_recommender.recommend(phase=phase)
|
||||
if self.solar_recommender.recommendation:
|
||||
property_recommendations.append(self.solar_recommender.recommendation)
|
||||
phase += 1
|
||||
|
||||
# We insert temporary ids into the recommendations which is important for the optimiser later
|
||||
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)
|
||||
|
||||
return property_recommendations
|
||||
# We also need to create the representative recommendations for each recommendation type
|
||||
property_representative_recommendations = self.create_representative_recommendations(property_recommendations)
|
||||
|
||||
return property_recommendations, property_representative_recommendations
|
||||
|
||||
@staticmethod
|
||||
def create_representative_recommendations(property_recommendations):
|
||||
"""
|
||||
This method will create a representative recommendation for each recommendation type
|
||||
In order to create a representative recommendation, we choose the recommendation that has:
|
||||
1) Where a U-value is available, has the best U-value to cost ratio
|
||||
2) Where SAP points are available, has the best SAP points to cost ratio
|
||||
|
||||
We don't include mechanical ventilation in the representative recommendations, since we don't attribute a
|
||||
SAP impact to this recommendation
|
||||
:return:
|
||||
"""
|
||||
property_representative_recommendations = []
|
||||
|
||||
for recommendations_by_type in property_recommendations:
|
||||
|
||||
if recommendations_by_type[0].get("type") == "mechanical_ventilation":
|
||||
continue
|
||||
|
||||
has_u_value = recommendations_by_type[0].get("new_u_value") is not None
|
||||
has_sap_points = recommendations_by_type[0].get("sap_points") is not None
|
||||
|
||||
# When check if these recommendations have two different types, such as solid wall insulation
|
||||
# If we have multiple types, we group by type and then select the best recommendation for each type
|
||||
|
||||
recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"])
|
||||
representative_recommendations = []
|
||||
for type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):
|
||||
recommendations = list(recommendations)
|
||||
# We also create an efficiency key, which is used to sort the recommendations
|
||||
if has_u_value:
|
||||
# We sort by the cost per U-value improvement - the lower the better
|
||||
for rec in recommendations:
|
||||
rec["efficiency"] = rec["total"] / rec["starting_u_value"] - rec["new_u_value"]
|
||||
elif not has_u_value and has_sap_points:
|
||||
# Sort the options by the cost per SAP point improvement - the lower the better
|
||||
for rec in recommendations:
|
||||
rec["efficiency"] = rec["total"] / rec["sap_points"]
|
||||
else:
|
||||
# Sort the options by cost - the lower the better
|
||||
for rec in recommendations:
|
||||
rec["efficiency"] = rec["total"]
|
||||
|
||||
recommendations.sort(
|
||||
key=lambda x: x["efficiency"]
|
||||
)
|
||||
representative_recommendations.append(recommendations[0])
|
||||
property_representative_recommendations.extend(representative_recommendations)
|
||||
|
||||
return property_representative_recommendations
|
||||
|
||||
@staticmethod
|
||||
def insert_temp_recommendation_id(property_recommendations):
|
||||
|
|
@ -110,7 +180,7 @@ class Recommendations:
|
|||
|
||||
for recs in property_recommendations:
|
||||
for rec in recs:
|
||||
rec["recommendation_id"] = idx
|
||||
rec["recommendation_id"] = f"{str(idx)}_phase={str(rec['phase'])}"
|
||||
idx += 1
|
||||
|
||||
return property_recommendations
|
||||
|
|
@ -140,11 +210,44 @@ class Recommendations:
|
|||
|
||||
property_recommendations = recommendations[property_instance.id].copy()
|
||||
|
||||
# We calculate the impact by phase
|
||||
sap_phase_impact = property_sap_predictions.groupby("phase")["predictions"].median().reset_index()
|
||||
heat_phase_impact = property_heat_predictions.groupby("phase")["predictions"].median().reset_index()
|
||||
carbon_phase_impact = property_carbon_predictions.groupby("phase")["predictions"].median().reset_index()
|
||||
|
||||
# The heat demand change is the difference between the starting heat demand and the value at the final phase
|
||||
expected_heat_demand = property_instance.floor_area * (
|
||||
heat_phase_impact[heat_phase_impact["phase"] == max(heat_phase_impact["phase"])]["predictions"].values[0]
|
||||
)
|
||||
starting_heat_demand = (
|
||||
float(property_instance.data["energy-consumption-current"]) * property_instance.floor_area
|
||||
)
|
||||
|
||||
# This is the unadjusted resulting heat demand
|
||||
predicted_heat_demand_change = starting_heat_demand - expected_heat_demand
|
||||
|
||||
# We don't want to adjust the heat demand for mechanical ventilation so we add it back on
|
||||
|
||||
# We adjust the heat demand figures to align to the UCL paper
|
||||
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
|
||||
epc_energy_consumption=starting_heat_demand,
|
||||
current_epc_rating=property_instance.data["current-energy-rating"],
|
||||
)
|
||||
|
||||
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
|
||||
epc_energy_consumption=expected_heat_demand,
|
||||
current_epc_rating=property_instance.data["current-energy-rating"],
|
||||
)
|
||||
|
||||
adjusted_heat_demand_change = (
|
||||
current_adjusted_energy - expected_adjusted_energy
|
||||
)
|
||||
|
||||
for recommendations_by_type in property_recommendations:
|
||||
for rec in recommendations_by_type:
|
||||
|
||||
# We don't use the model for low energy lighting at the moment
|
||||
if rec["type"] == "low_energy_lighting":
|
||||
if rec["type"] == "mechanical_ventilation":
|
||||
# We don't have a percieved sap impact of mechanical ventilation
|
||||
continue
|
||||
|
||||
new_heat_demand = property_heat_predictions[property_heat_predictions["recommendation_id"] == str(
|
||||
|
|
@ -158,26 +261,60 @@ class Recommendations:
|
|||
new_sap = property_sap_predictions[property_sap_predictions["recommendation_id"] == str(
|
||||
rec["recommendation_id"]
|
||||
)]["predictions"].values[0]
|
||||
rec["sap_points"] = new_sap - float(property_instance.data["current-energy-efficiency"])
|
||||
|
||||
if rec["type"] == "mechanical_ventilation":
|
||||
if rec["phase"] == 0:
|
||||
predicted_sap_points = new_sap - float(property_instance.data["current-energy-efficiency"])
|
||||
predicted_co2_savings = float(property_instance.data["co2-emissions-current"]) - new_carbon
|
||||
predicted_heat_demand = property_instance.floor_area * (
|
||||
float(property_instance.data["energy-consumption-current"]) - new_heat_demand
|
||||
)
|
||||
else:
|
||||
previous_phase = rec["phase"] - 1
|
||||
predicted_sap_points = (
|
||||
new_sap - sap_phase_impact[sap_phase_impact["phase"] == previous_phase]["predictions"].values[0]
|
||||
)
|
||||
predicted_co2_savings = (
|
||||
carbon_phase_impact[carbon_phase_impact["phase"] == previous_phase]["predictions"].values[0] -
|
||||
new_carbon
|
||||
)
|
||||
predicted_heat_demand = property_instance.floor_area * (
|
||||
heat_phase_impact[heat_phase_impact["phase"] == previous_phase]["predictions"].values[0] -
|
||||
new_heat_demand
|
||||
)
|
||||
|
||||
if rec["type"] == "low_energy_lighting":
|
||||
# For the moment, we cap the number of SAP points that can be achieved by ventilation at 2
|
||||
rec["sap_points"] = min(rec["sap_points"], VentilationRecommendations.SAP_LIMIT)
|
||||
rec["sap_points"] = min(predicted_sap_points, LightingRecommendations.SAP_LIMIT)
|
||||
rec["co2_equivalent_savings"] = min(predicted_co2_savings, rec["co2_equivalent_savings"])
|
||||
rec["heat_demand"] = min(predicted_heat_demand, rec["heat_demand"])
|
||||
else:
|
||||
rec["sap_points"] = predicted_sap_points
|
||||
rec["co2_equivalent_savings"] = predicted_co2_savings
|
||||
rec["heat_demand"] = predicted_heat_demand
|
||||
|
||||
# Round to 2 decimal places
|
||||
rec["sap_points"] = round(rec["sap_points"], 2)
|
||||
rec["co2_equivalent_savings"] = float(property_instance.data["co2-emissions-current"]) - new_carbon
|
||||
|
||||
# Energy consumption current is per meter squared, so we need to multiply by the floor area to get
|
||||
# an absolute figure for the home
|
||||
rec["heat_demand"] = (
|
||||
(float(property_instance.data["energy-consumption-current"]) - new_heat_demand
|
||||
) * property_instance.floor_area)
|
||||
# We now calculate the adjusted heat demand for this recommendation, which is simply the percentage
|
||||
# of the total adjusted heat demand change. The percentage we use is this recommendation's percentage
|
||||
# of the total heat demand per square meter change
|
||||
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])
|
||||
rec["adjusted_heat_demand"] = adjusted_heat_demand_change * (
|
||||
rec["heat_demand"] / predicted_heat_demand_change
|
||||
)
|
||||
# We make sure this is NOT below 0
|
||||
rec["adjusted_heat_demand"] = max(0, rec["adjusted_heat_demand"])
|
||||
|
||||
# Depending on the property's tarriff, we calculate the amount of energy savings this measure will bring
|
||||
if property_instance.energy_source == "electricity":
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate_electric(rec["adjusted_heat_demand"])
|
||||
elif property_instance.energy_source == "electricity_and_gas":
|
||||
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["adjusted_heat_demand"])
|
||||
else:
|
||||
raise ValueError("Invalid value for energy source")
|
||||
|
||||
if (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or (
|
||||
rec["heat_demand"] is None) or (rec["energy_cost_savings"] is None):
|
||||
raise ValueError("sap points, co2 or heat demand is missing")
|
||||
|
||||
return property_recommendations
|
||||
return property_recommendations, current_adjusted_energy, expected_adjusted_energy
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ class RoofRecommendations:
|
|||
]
|
||||
]
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase):
|
||||
|
||||
if self.property.roof["has_dwelling_above"]:
|
||||
return
|
||||
|
|
@ -98,11 +98,11 @@ class RoofRecommendations:
|
|||
return
|
||||
|
||||
if self.property.roof["is_pitched"] or self.property.roof["is_flat"]:
|
||||
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof)
|
||||
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase)
|
||||
return
|
||||
|
||||
if self.property.roof["is_roof_room"]:
|
||||
self.recommend_room_roof_insulation(u_value)
|
||||
self.recommend_room_roof_insulation(u_value, phase)
|
||||
return
|
||||
|
||||
raise NotImplementedError("Implement me")
|
||||
|
|
@ -124,7 +124,7 @@ class RoofRecommendations:
|
|||
raise ValueError("Invalid material type")
|
||||
|
||||
def recommend_roof_insulation(
|
||||
self, u_value, insulation_thickness, roof
|
||||
self, u_value, insulation_thickness, roof, phase
|
||||
):
|
||||
|
||||
"""
|
||||
|
|
@ -217,6 +217,7 @@ class RoofRecommendations:
|
|||
|
||||
recommendations.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [
|
||||
get_recommended_part(
|
||||
part=material.to_dict(),
|
||||
|
|
@ -236,7 +237,7 @@ class RoofRecommendations:
|
|||
|
||||
self.recommendations = recommendations
|
||||
|
||||
def recommend_room_roof_insulation(self, u_value):
|
||||
def recommend_room_roof_insulation(self, u_value, phase):
|
||||
"""
|
||||
This method recommends room in roof insulation for properties that have been identified
|
||||
to possess a room in roof.
|
||||
|
|
@ -314,6 +315,7 @@ class RoofRecommendations:
|
|||
|
||||
recommendations.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [
|
||||
get_recommended_part(
|
||||
part=material,
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class SolarPvRecommendations:
|
|||
|
||||
self.recommendation = []
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase):
|
||||
"""
|
||||
We check if a property is potentially suitable for solar PV based on the following criteria:
|
||||
- The property is a house or bungalow
|
||||
|
|
@ -39,30 +39,54 @@ class SolarPvRecommendations:
|
|||
if not is_valid_property_type or not is_valid_roof_type or not has_no_existing_solar_pv:
|
||||
return
|
||||
|
||||
# We now have a property which is potentially suitable for solar PV
|
||||
number_solar_panels = np.floor(self.property.solar_pv_roof_area / self.SOLAR_PANEL_AREA)
|
||||
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
|
||||
|
||||
roof_coverage_percent = round(self.property.solar_pv_percentage * 100)
|
||||
|
||||
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
|
||||
# of solar PV installations
|
||||
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage)
|
||||
|
||||
kw = np.floor(solar_panel_wattage / 100) / 10
|
||||
|
||||
self.recommendation = [
|
||||
{
|
||||
"parts": [],
|
||||
"type": "solar_pv",
|
||||
"description": f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
|
||||
f"{roof_coverage_percent}% the roof",
|
||||
"starting_u_value": None,
|
||||
"new_u_value": None,
|
||||
"sap_points": None,
|
||||
**cost_result,
|
||||
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
|
||||
# back up here
|
||||
"photo_supply": 100 * self.property.solar_pv_percentage
|
||||
}
|
||||
# For the solar recommendations, we produce the following scenarios:
|
||||
# 1) Solar panels only, we present a high, medium and low coverage
|
||||
# 2) With and without battery
|
||||
roof_coverage_scenarios = [
|
||||
self.property.solar_pv_percentage - 0.1, self.property.solar_pv_percentage,
|
||||
self.property.solar_pv_percentage + 0.1
|
||||
]
|
||||
# We make sure we haven't gone too low or high
|
||||
roof_coverage_scenarios = [v for v in roof_coverage_scenarios if 0 <= v <= 1]
|
||||
battery_scenarios = [False, True]
|
||||
|
||||
# I now produce the cross product of the scenarios
|
||||
scenarios = [(roof, battery) for roof in roof_coverage_scenarios for battery in battery_scenarios]
|
||||
|
||||
for roof_coverage, has_battery in scenarios:
|
||||
# We now have a property which is potentially suitable for solar PV
|
||||
solar_pv_roof_area = self.property.get_solar_pv_roof_area(roof_coverage)
|
||||
|
||||
number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA)
|
||||
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
|
||||
|
||||
roof_coverage_percent = round(roof_coverage * 100)
|
||||
|
||||
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
|
||||
# of solar PV installations
|
||||
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=has_battery)
|
||||
|
||||
kw = np.floor(solar_panel_wattage / 100) / 10
|
||||
|
||||
if has_battery:
|
||||
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
|
||||
f"{round(roof_coverage_percent)}% the roof, with a battery storage system.")
|
||||
else:
|
||||
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p"
|
||||
f"anel system on {round(roof_coverage_percent)}% the roof.")
|
||||
|
||||
self.recommendation.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [],
|
||||
"type": "solar_pv",
|
||||
"description": description,
|
||||
"starting_u_value": None,
|
||||
"new_u_value": None,
|
||||
"sap_points": None,
|
||||
**cost_result,
|
||||
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
|
||||
# back up here
|
||||
"photo_supply": 100 * roof_coverage
|
||||
}
|
||||
)
|
||||
|
|
|
|||
|
|
@ -15,9 +15,6 @@ class VentilationRecommendations(Definitions):
|
|||
'mechanical, supply and extract'
|
||||
]
|
||||
|
||||
# We introduce a SAP limit, to prevent over-predicting the SAP impact of mechanical ventilation
|
||||
SAP_LIMIT = 2
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
property_instance: Property,
|
||||
|
|
@ -62,12 +59,17 @@ class VentilationRecommendations(Definitions):
|
|||
# We recommend installing two mechanical ventilation systems
|
||||
self.recommendation = [
|
||||
{
|
||||
"phase": None,
|
||||
"parts": part,
|
||||
"type": part[0]["type"],
|
||||
"description": f"Install {n_units} {part[0]['description']} units",
|
||||
"starting_u_value": None,
|
||||
"new_u_value": None,
|
||||
"sap_points": None,
|
||||
"sap_points": 0,
|
||||
"heat_demand": 0,
|
||||
"adjusted_heat_demand": 0,
|
||||
"co2_equivalent_savings": 0,
|
||||
"energy_cost_savings": 0,
|
||||
"total": estimated_cost,
|
||||
# We use a very simple and rough estimate of 4 hours per unit
|
||||
"labour_hours": 4 * n_units,
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class WallRecommendations(Definitions):
|
|||
|
||||
return True
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase=0):
|
||||
# if building built after 1990 + we're able to identify U-value +
|
||||
# U-value less than 0.18 and if in or close to a conversation area,
|
||||
# recommend internal wall insulation as a possible measure
|
||||
|
|
@ -146,19 +146,19 @@ class WallRecommendations(Definitions):
|
|||
if is_cavity_wall:
|
||||
if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
|
||||
# Test filling cavity
|
||||
self.find_cavity_insulation(u_value, insulation_thickness)
|
||||
self.find_cavity_insulation(u_value, insulation_thickness, phase)
|
||||
|
||||
return
|
||||
|
||||
# Remaining wall types are treated with IWI or EWI
|
||||
if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
|
||||
self.find_insulation(u_value)
|
||||
self.find_insulation(u_value, phase)
|
||||
return
|
||||
|
||||
# If the u-value is within regulations, we don't do anything
|
||||
return
|
||||
|
||||
def find_cavity_insulation(self, u_value, insulation_thickness):
|
||||
def find_cavity_insulation(self, u_value, insulation_thickness, phase):
|
||||
"""
|
||||
This method tests different materials to fill the cavity wall, determining which
|
||||
material will give us the best U-value.
|
||||
|
|
@ -210,6 +210,7 @@ class WallRecommendations(Definitions):
|
|||
|
||||
recommendations.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [
|
||||
get_recommended_part(
|
||||
part=material.to_dict(),
|
||||
|
|
@ -229,7 +230,7 @@ class WallRecommendations(Definitions):
|
|||
|
||||
self.recommendations = recommendations
|
||||
|
||||
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials):
|
||||
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
|
||||
|
||||
lowest_selected_u_value = None
|
||||
recommendations = []
|
||||
|
|
@ -274,6 +275,7 @@ class WallRecommendations(Definitions):
|
|||
|
||||
recommendations.append(
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [
|
||||
get_recommended_part(
|
||||
part=material.to_dict(),
|
||||
|
|
@ -293,7 +295,7 @@ class WallRecommendations(Definitions):
|
|||
|
||||
return recommendations
|
||||
|
||||
def find_insulation(self, u_value):
|
||||
def find_insulation(self, u_value, phase):
|
||||
"""
|
||||
This function contains the logic for finding potential insulation measures for a property, depending
|
||||
on the parts available and whether the property can have external wall insulation installed
|
||||
|
|
@ -310,13 +312,15 @@ class WallRecommendations(Definitions):
|
|||
ewi_recommendations = self._find_insulation(
|
||||
u_value=u_value,
|
||||
insulation_materials=pd.DataFrame(self.external_wall_insulation_materials),
|
||||
non_insulation_materials=self.external_wall_non_insulation_materials
|
||||
non_insulation_materials=self.external_wall_non_insulation_materials,
|
||||
phase=phase
|
||||
)
|
||||
|
||||
iwi_recommendations = self._find_insulation(
|
||||
u_value=u_value,
|
||||
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),
|
||||
non_insulation_materials=self.internal_wall_non_insulation_materials
|
||||
non_insulation_materials=self.internal_wall_non_insulation_materials,
|
||||
phase=phase
|
||||
)
|
||||
|
||||
self.recommendations += ewi_recommendations + iwi_recommendations
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class WindowsRecommendations:
|
|||
raise ValueError("There should only be one window glazing material")
|
||||
self.glazing_material = self.glazing_material[0]
|
||||
|
||||
def recommend(self):
|
||||
def recommend(self, phase=0):
|
||||
"""
|
||||
This method will recommend the best possible glazing options for a property.
|
||||
|
||||
|
|
@ -85,6 +85,7 @@ class WindowsRecommendations:
|
|||
|
||||
self.recommendation = [
|
||||
{
|
||||
"phase": phase,
|
||||
"parts": [],
|
||||
"type": "windows_glazing",
|
||||
"description": description,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue