mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1102 lines
44 KiB
Python
1102 lines
44 KiB
Python
import os
|
|
import ast
|
|
from itertools import groupby
|
|
import pandas as pd
|
|
|
|
from etl.epc.Dataset import TrainingDataset
|
|
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES
|
|
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
|
|
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet
|
|
from etl.epc.settings import DATA_ANOMALY_MATCHES
|
|
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
|
|
from recommendations.recommendation_utils import (
|
|
estimate_perimeter,
|
|
get_wall_type,
|
|
estimate_external_wall_area,
|
|
esimtate_pitched_roof_area,
|
|
estimate_windows,
|
|
)
|
|
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
|
|
|
|
# Deployment environment name; falls back to "dev" when the variable is unset.
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")

# Bucket name comes from the DATA_BUCKET variable. Only the dev environment has
# a built-in fallback; elsewhere an unset variable leaves DATA_BUCKET as None.
if ENVIRONMENT == "dev":
    DATA_BUCKET = os.environ.get("DATA_BUCKET", "retrofit-data-dev")
else:
    DATA_BUCKET = os.environ.get("DATA_BUCKET")
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
class Property:
|
|
ATTRIBUTE_MAP = {
|
|
"floor-description": "floor",
|
|
"hotwater-description": "hotwater",
|
|
"main-fuel": "main_fuel",
|
|
"mainheat-description": "main_heating",
|
|
"mainheatcont-description": "main_heating_controls",
|
|
"roof-description": "roof",
|
|
"walls-description": "walls",
|
|
"windows-description": "windows",
|
|
"lighting-description": "lighting",
|
|
}
|
|
|
|
floor = None
|
|
hotwater = None
|
|
main_fuel = None
|
|
main_heating = None
|
|
main_heating_controls = None
|
|
roof = None
|
|
walls = None
|
|
windows = None
|
|
lighting = None
|
|
energy_source = None
|
|
|
|
spatial = None
|
|
base_difference_record = None
|
|
|
|
DATA_ANOMALY_MATCHES = DATA_ANOMALY_MATCHES
|
|
|
|
# Surplus information, that can be provided as optional inputs, by a customer
|
|
n_bathrooms = None
|
|
n_bedrooms = None
|
|
|
|
def __init__(
|
|
self,
|
|
id,
|
|
postcode,
|
|
address,
|
|
epc_record,
|
|
already_installed=None,
|
|
non_invasive_recommendations=None,
|
|
measures=None,
|
|
**kwargs
|
|
):
|
|
|
|
self.epc_record = epc_record
|
|
|
|
self.id = id
|
|
|
|
self.address = address
|
|
self.postcode = postcode
|
|
self.data = {
|
|
k.replace("_", "-"): v for k, v in epc_record.get("prepared_epc").items()
|
|
}
|
|
self.old_data = epc_record.get("old_data")
|
|
self.property_dimensions = None
|
|
# This is a list of measures that have already been installed in the property, typically found as a result
|
|
# of the non-invasive surveys. We reflect that this has been installed in the recommendations, but remove the
|
|
# cost and instead, provide a message that the measure has already been installed
|
|
|
|
self.already_installed = ast.literal_eval(already_installed['already_installed']) if already_installed else []
|
|
self.non_invasive_recommendations = (
|
|
ast.literal_eval(non_invasive_recommendations['recommendations']) if
|
|
non_invasive_recommendations else []
|
|
)
|
|
# This is a list of measures that have been recommended for the property
|
|
if isinstance(measures, list):
|
|
self.measures = measures
|
|
else:
|
|
self.measures = ast.literal_eval(measures) if measures else None
|
|
|
|
self.uprn = epc_record.get("uprn")
|
|
self.full_sap_epc = epc_record.get("full_sap_epc")
|
|
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
|
|
self.restricted_measures = False
|
|
self.year_built = epc_record.get("year_built")
|
|
self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms")
|
|
self.age_band = epc_record.get("age_band")
|
|
self.construction_age_band = epc_record.get("construction_age_band")
|
|
self.number_of_floors = epc_record.get("number_of_floors")
|
|
self.perimeter = None
|
|
self.wall_type = None
|
|
self.floor_type = None
|
|
|
|
self.energy = {
|
|
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
|
|
"co2_emissions": epc_record.get("co2_emissions_current"),
|
|
}
|
|
self.ventilation = {
|
|
"ventilation": epc_record.get("mechanical_ventilation"),
|
|
}
|
|
self.solar_pv = {
|
|
"solar_pv": epc_record.get("photo_supply"),
|
|
}
|
|
self.solar_hot_water = {
|
|
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
|
|
"solar_hot_water_boolean": epc_record.get("solar_water_heating_flag_bool"),
|
|
}
|
|
self.wind_turbine = {
|
|
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
|
|
}
|
|
self.number_of_open_fireplaces = {
|
|
"number_of_open_fireplaces": epc_record.prepared_epc.get(
|
|
"number_open_fireplaces"
|
|
),
|
|
}
|
|
self.number_of_extensions = {
|
|
"number_of_extensions": epc_record.prepared_epc.get("extension_count"),
|
|
}
|
|
self.number_of_storeys = {
|
|
"number_of_storeys": epc_record.prepared_epc.get("flat_storey_count"),
|
|
}
|
|
self.heat_loss_corridor = {
|
|
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
|
|
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
|
|
"heat_loss_corridor_boolean": epc_record.get("heat_loss_corridor_bool"),
|
|
}
|
|
self.mains_gas = epc_record.prepared_epc.get("mains_gas_flag")
|
|
self.floor_height = epc_record.prepared_epc.get("floor_height")
|
|
self.insulation_wall_area = None
|
|
self.floor_area = epc_record.prepared_epc.get("total_floor_area")
|
|
self.pitched_roof_area = None
|
|
self.insulation_floor_area = None
|
|
self.number_lighting_outlets = epc_record.prepared_epc.get(
|
|
"fixed_lighting_outlets_count"
|
|
)
|
|
self.floor_level = None
|
|
self.number_of_windows = None
|
|
self.solar_pv_percentage = None
|
|
|
|
self.current_adjusted_energy = None
|
|
self.expected_adjusted_energy = None
|
|
self.current_energy_bill = None
|
|
self.expected_energy_bill = None
|
|
|
|
self.heating_energy_source = None
|
|
self.hot_water_energy_source = None
|
|
|
|
self.recommendations_scoring_data = []
|
|
|
|
self.parse_kwargs(kwargs)
|
|
|
|
@classmethod
|
|
def extract_kwargs(cls, kwargs):
|
|
"""
|
|
This method is to be used in the router, to extract the kwargs from the request and prevent any errors such as
|
|
non-integer values, or inputs that clash with the __init__ method of this class
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
n_bathrooms = kwargs.get("n_bathrooms", None)
|
|
if n_bathrooms not in [None, ""]:
|
|
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
|
|
n_bathrooms = int(round(float(n_bathrooms) + 1e-5))
|
|
|
|
n_bedrooms = kwargs.get("n_bedrooms", None)
|
|
if n_bedrooms not in [None, ""]:
|
|
n_bedrooms = int(round(float(n_bedrooms) + 1e-5))
|
|
|
|
return {
|
|
"n_bathrooms": n_bathrooms,
|
|
"n_bedrooms": n_bedrooms,
|
|
}
|
|
|
|
def parse_kwargs(self, kwargs):
|
|
# We extract the elements from kwargs that we recognise. Anything additional is ignored
|
|
self.n_bathrooms = kwargs.get("n_bathrooms", None)
|
|
self.n_bedrooms = kwargs.get("n_bedrooms", None)
|
|
|
|
def create_base_difference_epc_record(self, cleaned_lookup: dict):
    """
    Build the baseline difference record for this property and store it on
    ``self.base_difference_record``.

    The EPC record is differenced against itself, because the expected EPC is not
    available yet, and the result is wrapped in a TrainingDataset.

    :param cleaned_lookup: passed straight through to TrainingDataset
    """
    # TODO: change these lower and replace in the settings file
    # The settings names are lower-cased and hyphenated so they match the keys of self.data
    wanted_columns = []
    for name in MANDATORY_FIXED_FEATURES + LATEST_FIELD:
        wanted_columns.append(name.lower().replace("_", "-"))

    # Keep only the fixed columns, converting the keys back to underscores
    fixed_data = {}
    for key, value in self.data.items():
        if key in wanted_columns:
            fixed_data[key.replace("-", "_")] = value

    record = self.epc_record.create_EPCDifferenceRecord(self.epc_record, fixed_data)

    self.base_difference_record = TrainingDataset(
        datasets=[record], cleaned_lookup=cleaned_lookup
    )

    # TODO: adjust the base difference record with the previously calculated u values + features
    # estimated_perimeter is different to the perimeter in the epc record
|
|
|
|
def simulate_all_representative_recommendations(
|
|
self, property_representative_recommendations,
|
|
):
|
|
"""
|
|
This method was put together to simulate the impact of the representative recommendations on the property
|
|
all at once, for usage within the mds report
|
|
:return:
|
|
"""
|
|
|
|
recommendation_record = self.base_difference_record.df.to_dict("records")[
|
|
0
|
|
].copy()
|
|
|
|
scoring_dict = self.create_recommendation_scoring_data(
|
|
property_id=self.id,
|
|
recommendation_record=recommendation_record,
|
|
recommendations=property_representative_recommendations,
|
|
primary_recommendation_id=self.id,
|
|
non_invasive_recommendations=self.non_invasive_recommendations,
|
|
)
|
|
|
|
return scoring_dict
|
|
|
|
def adjust_difference_record_with_recommendations(
|
|
self, property_recommendations, property_representative_recommendations
|
|
):
|
|
"""
|
|
This method will adjust the difference record, based on the recommendations made for the property
|
|
|
|
In order to score the measures, we need to consider the phase of the retrofit.
|
|
|
|
:param property_recommendations: dictionary of recommendations for the property
|
|
:param property_representative_recommendations: dictionary of representative recommendations for the property
|
|
"""
|
|
|
|
self.recommendations_scoring_data = []
|
|
phases = sorted(
|
|
[
|
|
r[0]["phase"]
|
|
for r in property_recommendations
|
|
if r[0]["phase"] is not None
|
|
]
|
|
)
|
|
|
|
for phase in phases:
|
|
property_recommendations_by_phase = [
|
|
r for r in property_recommendations if r[0]["phase"] == phase
|
|
][0]
|
|
previous_phases = [p for p in phases if p < phase]
|
|
previous_phase_representatives = [
|
|
r
|
|
for r in property_representative_recommendations
|
|
if r["phase"] in previous_phases
|
|
]
|
|
# For solid wall insulation, we will actually have 2 representative recommendations, since we consider
|
|
# both internal and external wall insulation as possible measures. We will use the representative that
|
|
# has the lowest efficiency.
|
|
# Take the representative with the lowest efficiency, by phase
|
|
|
|
# To be safe, we sort by phase
|
|
previous_phase_representatives = sorted(
|
|
previous_phase_representatives, key=lambda x: x["phase"]
|
|
)
|
|
|
|
previous_phase_representatives = [
|
|
min(group, key=lambda x: x["efficiency"])
|
|
for _, group in groupby(
|
|
previous_phase_representatives, key=lambda x: x["phase"]
|
|
)
|
|
]
|
|
|
|
recommendation_record = self.base_difference_record.df.to_dict("records")[
|
|
0
|
|
].copy()
|
|
|
|
for rec in property_recommendations_by_phase:
|
|
# We simulate the impact of the recommendation at this current phase, and all of the prior phases
|
|
|
|
if rec["type"] == "mechanical_ventilation":
|
|
continue
|
|
|
|
scoring_dict = self.create_recommendation_scoring_data(
|
|
property_id=self.id,
|
|
recommendation_record=recommendation_record,
|
|
recommendations=previous_phase_representatives + [rec],
|
|
primary_recommendation_id=rec["recommendation_id"],
|
|
non_invasive_recommendations=self.non_invasive_recommendations,
|
|
)
|
|
self.recommendations_scoring_data.append(scoring_dict)
|
|
|
|
@staticmethod
|
|
def create_recommendation_scoring_data(
|
|
property_id,
|
|
recommendation_record,
|
|
recommendations: list,
|
|
primary_recommendation_id: int,
|
|
non_invasive_recommendations: list = None,
|
|
):
|
|
"""
|
|
This function will iterate through a list of recommendations and apply a simulation for each recommendation
|
|
This allows us to later multiple measures and see the impact of the measures on the property
|
|
:param property_id: The id of the property
|
|
:param recommendation_record: The record of the property, which will be updated
|
|
:param recommendations: The list of recommendations to apply
|
|
:param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record
|
|
:param non_invasive_recommendations: The list of non-invasive recommendations
|
|
:return: The updated recommendation record
|
|
"""
|
|
|
|
output = recommendation_record.copy()
|
|
non_invasive_recommendations = [] if non_invasive_recommendations is None else non_invasive_recommendations
|
|
|
|
for col in [
|
|
"walls_insulation_thickness",
|
|
"floor_insulation_thickness",
|
|
"roof_insulation_thickness",
|
|
]:
|
|
if output[col] is None:
|
|
output[col] = "none"
|
|
|
|
for recommendation in recommendations:
|
|
# For the list of recommendations we have, we iteratively update the output
|
|
|
|
# Update description to indicate it's insulate
|
|
if recommendation["type"] in [
|
|
"solid_floor_insulation",
|
|
"suspended_floor_insulation",
|
|
"exposed_floor_insulation",
|
|
]:
|
|
if len(recommendation["parts"]) > 1:
|
|
raise NotImplementedError(
|
|
"Have more than 1 floor insulation part - handle this case"
|
|
)
|
|
|
|
# We don't really see above average for this in the training data
|
|
output["floor_insulation_thickness_ending"] = "average"
|
|
else:
|
|
if output["floor_thermal_transmittance_ending"] is None:
|
|
raise ValueError("We should not have a None value for the u value")
|
|
|
|
if output["floor_insulation_thickness_ending"] is None:
|
|
output["floor_insulation_thickness_ending"] = "none"
|
|
|
|
if recommendation["type"] in [
|
|
"loft_insulation",
|
|
"room_roof_insulation",
|
|
"flat_roof_insulation",
|
|
]:
|
|
output["roof_thermal_transmittance_ending"] = recommendation[
|
|
"new_u_value"
|
|
]
|
|
|
|
parts = recommendation["parts"]
|
|
if len(parts) != 1:
|
|
raise ValueError(
|
|
"More than one part for roof insulation - investiage me"
|
|
)
|
|
|
|
# This is based on the values we have in the training data
|
|
valid_numeric_values = [
|
|
12,
|
|
25,
|
|
50,
|
|
75,
|
|
100,
|
|
150,
|
|
200,
|
|
250,
|
|
270,
|
|
300,
|
|
350,
|
|
400,
|
|
]
|
|
|
|
proposed_depth = recommendation["new_thickness"]
|
|
if proposed_depth not in valid_numeric_values:
|
|
# Take the nearest value for scoring
|
|
proposed_depth = min(
|
|
valid_numeric_values, key=lambda x: abs(x - proposed_depth)
|
|
)
|
|
|
|
output["roof_insulation_thickness_ending"] = str(int(proposed_depth))
|
|
if recommendation["type"] == "loft_insulation":
|
|
if proposed_depth >= 270:
|
|
output["roof_energy_eff_ending"] = "Very Good"
|
|
else:
|
|
if output["roof_energy_eff_ending"] not in ["Good", "Very Good"]:
|
|
output["roof_energy_eff_ending"] = "Good"
|
|
else:
|
|
output["roof_energy_eff_ending"] = "Very Good"
|
|
else:
|
|
# Fill missing roof u-values - this fill is not based on recommended upgrades
|
|
if output["roof_thermal_transmittance_ending"] is None:
|
|
raise ValueError("We should not have a None value for the u value")
|
|
|
|
if output["roof_insulation_thickness_ending"] is None:
|
|
output["roof_insulation_thickness_ending"] = "none"
|
|
|
|
if recommendation["type"] == "sealing_open_fireplace":
|
|
output["number_open_fireplaces_ending"] = 0
|
|
|
|
if recommendation["type"] == "low_energy_lighting":
|
|
output["low_energy_lighting_ending"] = 100
|
|
output["lighting_energy_eff_ending"] = "Very Good"
|
|
|
|
if recommendation["type"] == "windows_glazing":
|
|
output["multi_glaze_proportion_ending"] = 100
|
|
if output["windows_energy_eff_ending"] not in ["Average", "Good", "Very Good"]:
|
|
output["windows_energy_eff_ending"] = "Average"
|
|
|
|
is_secondary_glazing = recommendation["is_secondary_glazing"]
|
|
|
|
if output["glazing_type_ending"] == "multiple":
|
|
pass
|
|
elif output["glazing_type_ending"] == "single":
|
|
output["glazing_type_ending"] = (
|
|
"secondary" if is_secondary_glazing else "double"
|
|
)
|
|
elif output["glazing_type_ending"] == "double":
|
|
output["glazing_type_ending"] = (
|
|
"multiple" if is_secondary_glazing else "double"
|
|
)
|
|
elif output["glazing_type_ending"] == "secondary":
|
|
output["glazing_type_ending"] = (
|
|
"secondary" if is_secondary_glazing else "multiple"
|
|
)
|
|
elif output["glazing_type_ending"] in ["triple", "high performance"]:
|
|
output["glazing_type_ending"] = "multiple"
|
|
else:
|
|
raise ValueError("Invalid glazing type - implement me")
|
|
|
|
if is_secondary_glazing:
|
|
output["glazed_type_ending"] = "secondary glazing"
|
|
else:
|
|
output["glazed_type_ending"] = (
|
|
"double glazing installed during or after 2002"
|
|
)
|
|
|
|
if recommendation["type"] in [
|
|
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
|
]:
|
|
# We update the data, as defined in the recommendaton
|
|
if output["walls_insulation_thickness_ending"] is None:
|
|
output["walls_insulation_thickness_ending"] = "none"
|
|
|
|
simulation_config = recommendation["simulation_config"]
|
|
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning
|
|
# value
|
|
for key, value in simulation_config.items():
|
|
if value is None:
|
|
simulation_config[key] = "Unknown"
|
|
|
|
output.update(simulation_config)
|
|
|
|
if recommendation["type"] == "solar_pv":
|
|
output["photo_supply_ending"] = recommendation["photo_supply"]
|
|
|
|
if recommendation["type"] not in [
|
|
"sealing_open_fireplace", "low_energy_lighting",
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
|
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
|
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
|
|
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
|
|
"heating_control", "secondary_heating"
|
|
]:
|
|
raise NotImplementedError(
|
|
"Implement me, given type %s" % recommendation["type"]
|
|
)
|
|
|
|
output["id"] = "+".join([str(property_id), str(primary_recommendation_id)])
|
|
|
|
return output
|
|
|
|
def get_components(
|
|
self,
|
|
cleaned,
|
|
photo_supply_lookup,
|
|
floor_area_decile_thresholds,
|
|
energy_consumption_client
|
|
):
|
|
"""
|
|
Given the cleaning that has been performed, we'll use this to identify the property
|
|
components, from roof to walls to windows, heating and hot water
|
|
:param cleaned: This is the dictionary of components found in cleaner.cleaned
|
|
:param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage
|
|
of the roof that is suitable for solar panels
|
|
:param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the
|
|
solar pv roof area
|
|
:param energy_consumption_client: Contains the heating and hot water kwh models - used to predict current
|
|
energy annual consumption in kWh
|
|
:return:
|
|
"""
|
|
|
|
if not cleaned:
|
|
raise ValueError("Cleaner does not contain cleaned data")
|
|
|
|
if not self.data:
|
|
raise ValueError("Property does not contain data")
|
|
|
|
self.set_basic_property_dimensions()
|
|
|
|
for description, attribute in cleaned.items():
|
|
|
|
if self.data[description] in self.DATA_ANOMALY_MATCHES:
|
|
template = cleaned[description][0]
|
|
fill_dict = dict(zip(template.keys(), [None] * len(template)))
|
|
fill_dict.update(
|
|
{
|
|
"original_description": self.data[description],
|
|
"clean_description": self.data[description],
|
|
}
|
|
)
|
|
setattr(
|
|
self,
|
|
self.ATTRIBUTE_MAP[description],
|
|
fill_dict,
|
|
)
|
|
continue
|
|
|
|
attributes = [
|
|
x
|
|
for x in cleaned[description]
|
|
if x["original_description"] == self.data[description]
|
|
]
|
|
|
|
if len(attributes) > 1:
|
|
raise ValueError(
|
|
"Either No attributes or multiple found for %s" % description
|
|
)
|
|
|
|
if len(attributes) == 0:
|
|
# We attempt to perform the clean on the fly
|
|
cleaner_cls = all_cleaner_map[description]
|
|
cleaner_cls = cleaner_cls(self.data[description])
|
|
processed = {
|
|
"original_description": self.data[description],
|
|
"clean_description": cleaner_cls.description.replace(
|
|
"(assumed)", ""
|
|
)
|
|
.rstrip()
|
|
.capitalize(),
|
|
**cleaner_cls.process(),
|
|
}
|
|
|
|
attributes = [processed]
|
|
|
|
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])
|
|
|
|
self.set_wall_type()
|
|
self.set_floor_type()
|
|
self.set_floor_level()
|
|
self.set_windows_count()
|
|
self.set_solar_panel_area(
|
|
photo_supply_lookup=photo_supply_lookup,
|
|
floor_area_decile_thresholds=floor_area_decile_thresholds,
|
|
)
|
|
self.set_energy_source()
|
|
self.find_energy_sources()
|
|
self.set_current_energy_bill()
|
|
|
|
def set_current_energy_bill(self, energy_consumption_client):
    """
    Given what we know about the property now, estimate the current energy consumption
    and bill, using the UCL paper
    https://www.sciencedirect.com/science/article/pii/S0378778823002542

    Sets ``self.current_adjusted_energy`` and ``self.current_energy_bill``.

    :param energy_consumption_client: client scored for "heating_kwh" and "hot_water_kwh";
        its ``data`` attribute is reset to None first. The predictions it returns are not
        used by this method
    """
    targets = ["heating_kwh", "hot_water_kwh"]

    scoring_df = pd.DataFrame([self.epc_record.prepared_epc])
    # Change columns from underscores to hyphens
    hyphenated_columns = []
    for column in scoring_df.columns:
        hyphenated_columns.append(column.lower().replace("_", "-"))
    scoring_df.columns = hyphenated_columns

    for target in targets:
        scoring_df[target] = None

    energy_consumption_client.data = None
    for target in targets:
        energy_consumption_client.score_new_data(new_data=scoring_df, target=target)

    # EPC consumption figure multiplied by the floor area
    consumption_per_area = float(self.data["energy-consumption-current"])
    starting_heat_demand = consumption_per_area * self.floor_area

    adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
        epc_energy_consumption=starting_heat_demand,
        current_epc_rating=self.data["current-energy-rating"],
        total_floor_area=self.floor_area
    )
    self.current_adjusted_energy = adjusted_energy

    self.current_energy_bill = AnnualBillSavings.calculate_annual_bill(
        self.current_adjusted_energy
    )
|
|
|
|
def set_spatial(self, spatial: pd.DataFrame):
|
|
"""
|
|
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
|
|
|
|
Will store a dictionary, spatial, which is used to populate the property spatial table in the database
|
|
|
|
:param spatial: Dataframe, containing the spatial data for the property
|
|
"""
|
|
self.in_conservation_area = spatial["conservation_status"].values[0]
|
|
self.is_listed = spatial["is_listed_building"].values[0]
|
|
self.is_heritage = spatial["is_heritage_building"].values[0]
|
|
|
|
# We do an equals True, in the case of one of these variables being True
|
|
if (
|
|
(self.in_conservation_area == True)
|
|
| (self.is_listed == True)
|
|
| (self.is_heritage == True)
|
|
):
|
|
self.restricted_measures = True
|
|
|
|
spatial_dict = spatial.to_dict("records")[0]
|
|
self.spatial = {
|
|
"x_coordinate": spatial_dict["X_COORDINATE"],
|
|
"y_coordinate": spatial_dict["Y_COORDINATE"],
|
|
"latitude": spatial_dict["LATITUDE"],
|
|
"longitude": spatial_dict["LONGITUDE"],
|
|
"conservation_status": spatial_dict["conservation_status"],
|
|
"is_listed_building": spatial_dict["is_listed_building"],
|
|
"is_heritage_building": spatial_dict["is_heritage_building"],
|
|
}
|
|
|
|
def _clean_upload_data(self, to_update):
|
|
for k, v in to_update.items():
|
|
if v in self.DATA_ANOMALY_MATCHES:
|
|
to_update[k] = None
|
|
return to_update
|
|
|
|
def get_full_property_data(self, current_valuation=None):
|
|
"""
|
|
This method extracts the data which is pushed to the database, containing core information, from the EPC
|
|
about a property
|
|
:return:
|
|
"""
|
|
|
|
property_data = {
|
|
"creation_status": "READY",
|
|
"uprn": int(self.data["uprn"]),
|
|
"building_reference_number": int(self.data["building-reference-number"]),
|
|
"has_pre_condition_report": True,
|
|
"has_recommendations": True,
|
|
"property_type": self.data["property-type"],
|
|
"built_form": self.data["built-form"],
|
|
"local_authority": self.data["local-authority-label"],
|
|
"constituency": self.data["constituency-label"],
|
|
"number_of_rooms": self.number_of_rooms,
|
|
"year_built": self.year_built,
|
|
"tenure": self.data["tenure"],
|
|
"current_epc_rating": self.data["current-energy-rating"],
|
|
"current_sap_points": self.data["current-energy-efficiency"],
|
|
"current_valuation": current_valuation,
|
|
}
|
|
|
|
property_data = self._clean_upload_data(property_data)
|
|
|
|
return property_data
|
|
|
|
@classmethod
|
|
def _prepare_rating_field(cls, field, rating_lookup):
|
|
"""
|
|
Utility function for usage in the lambda, for preparing the _rating fields
|
|
"""
|
|
return (
|
|
rating_lookup[field].value
|
|
if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None)
|
|
else None
|
|
)
|
|
|
|
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
|
|
|
|
property_details_epc = {
|
|
"property_id": self.id,
|
|
"portfolio_id": portfolio_id,
|
|
"full_address": self.data["address"],
|
|
"total_floor_area": float(self.data["total-floor-area"]),
|
|
"walls": self.walls["clean_description"],
|
|
"walls_rating": self._prepare_rating_field(
|
|
self.data["walls-energy-eff"], rating_lookup
|
|
),
|
|
"roof": self.roof["clean_description"],
|
|
"roof_rating": self._prepare_rating_field(
|
|
self.data["roof-energy-eff"], rating_lookup
|
|
),
|
|
"floor": self.floor["clean_description"],
|
|
"floor_rating": self._prepare_rating_field(
|
|
self.data["floor-energy-eff"], rating_lookup
|
|
),
|
|
"windows": self.windows["clean_description"],
|
|
"windows_rating": self._prepare_rating_field(
|
|
self.data["windows-energy-eff"], rating_lookup
|
|
),
|
|
"heating": self.main_heating["clean_description"],
|
|
"heating_rating": self._prepare_rating_field(
|
|
self.data["mainheat-energy-eff"], rating_lookup
|
|
),
|
|
"heating_controls": self.main_heating_controls["clean_description"],
|
|
"heating_controls_rating": self._prepare_rating_field(
|
|
self.data["mainheatc-energy-eff"], rating_lookup
|
|
),
|
|
"hot_water": self.hotwater["clean_description"],
|
|
"hot_water_rating": self._prepare_rating_field(
|
|
self.data["hot-water-energy-eff"], rating_lookup
|
|
),
|
|
"lighting": self.lighting["clean_description"],
|
|
"lighting_rating": self._prepare_rating_field(
|
|
self.data["lighting-energy-eff"], rating_lookup
|
|
),
|
|
"mainfuel": self.main_fuel["clean_description"],
|
|
"ventilation": self.ventilation["ventilation"],
|
|
"solar_pv": self.solar_pv["solar_pv"],
|
|
"solar_hot_water": self.solar_hot_water["solar_hot_water_boolean"],
|
|
"wind_turbine": self.wind_turbine["wind_turbine"],
|
|
"floor_height": self.floor_height,
|
|
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"],
|
|
"unheated_corridor_length": self.heat_loss_corridor["length"],
|
|
"number_of_open_fireplaces": self.number_of_open_fireplaces[
|
|
"number_of_open_fireplaces"
|
|
],
|
|
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
|
|
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
|
|
"mains_gas": self.mains_gas,
|
|
"energy_tariff": self.data["energy-tariff"],
|
|
"primary_energy_consumption": self.energy["primary_energy_consumption"],
|
|
"co2_emissions": self.energy["co2_emissions"],
|
|
"adjusted_energy_consumption": self.current_adjusted_energy,
|
|
"estimated": self.data.get("estimated", False),
|
|
}
|
|
|
|
return property_details_epc
|
|
|
|
def get_spatial_data(self, uprn_filenames):
    """
    Given a property's UPRN, pull the associated spatial data from s3 and store it
    through set_spatial.

    Without a UPRN the three status flags are set to False and measures are marked as
    restricted. If no file covers the UPRN, or the file holds no row for it, a warning is
    logged and the spatial attributes are left untouched.

    :param uprn_filenames: dataframe with "lower", "upper" and "filenames" columns, giving
        the UPRN range held by each spatial file
    :return: None
    """
    if self.uprn is None:
        logger.warning(
            "We do not have a UPRN for this property - this needs to be implemented"
        )
        self.in_conservation_area = False
        self.is_listed = False
        self.is_heritage = False
        self.restricted_measures = True
        return

    # We get the file name for the uprn
    covers_uprn = (uprn_filenames["lower"] <= self.uprn) & (
        uprn_filenames["upper"] >= self.uprn
    )
    filtered_df = uprn_filenames[covers_uprn]
    if filtered_df.empty:
        logger.warning("Could not find file containing UPRNS")
        return None

    filename = filtered_df.iloc[0]["filenames"]

    spatial_data = read_dataframe_from_s3_parquet(
        bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
    )

    spatial = spatial_data[spatial_data["UPRN"] == self.uprn]
    if spatial.empty:
        # set_spatial reads the first row, so an empty match would raise IndexError
        logger.warning(f"Could not find spatial data for UPRN {self.uprn}")
        return None

    # Pull out spatial features
    self.set_spatial(spatial)
|
|
|
|
def _filter_property_dimensions(self, property_dimensions):
|
|
"""
|
|
Will filter the property dimensions dataframe to only include the relevant rows for the property
|
|
:param property_dimensions:
|
|
:return: filtered property dimensions dataframe
|
|
"""
|
|
|
|
result = property_dimensions[
|
|
(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])
|
|
]
|
|
|
|
if (
|
|
self.construction_age_band is not None
|
|
and self.construction_age_band not in self.DATA_ANOMALY_MATCHES
|
|
):
|
|
result = result[
|
|
(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)
|
|
]
|
|
|
|
if (
|
|
self.data["built-form"] not in self.DATA_ANOMALY_MATCHES
|
|
and self.data["built-form"] in result["BUILT_FORM"]
|
|
):
|
|
result = result[(result["BUILT_FORM"] == self.data["built-form"])]
|
|
|
|
return result[
|
|
["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
|
|
].mean()
|
|
|
|
def set_basic_property_dimensions(self):
    """
    Estimate the basic dimensions of the property from its floor area, number of rooms,
    number of floors and floor height.

    Sets the perimeter, the wall area available for insulation, the floor area available
    for insulation (total floor area divided by the number of floors) and the pitched
    roof area.
    """
    # TODO: These functions should work on an EPCRecord object, so that the format is more standardised.
    # They could also be added as attributes to the EPC Record
    floors = self.number_of_floors
    area_per_floor = self.floor_area / floors
    rooms_per_floor = self.number_of_rooms / floors

    self.perimeter = estimate_perimeter(area_per_floor, rooms_per_floor)

    self.insulation_wall_area = estimate_external_wall_area(
        num_floors=floors,
        floor_height=self.floor_height,
        perimeter=self.perimeter,
        built_form=self.data["built-form"],
    )

    self.insulation_floor_area = area_per_floor

    self.pitched_roof_area = esimtate_pitched_roof_area(
        floor_area=self.insulation_floor_area, floor_height=self.floor_height
    )
|
|
|
|
def set_floor_level(self):
|
|
self.floor_level = (
|
|
FLOOR_LEVEL_MAP[self.data["floor-level"]]
|
|
if self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES
|
|
and self.data["floor-level"] is not None
|
|
else None
|
|
)
|
|
|
|
if self.floor_level is None:
|
|
|
|
if self.data["property-type"] != "Flat":
|
|
return
|
|
|
|
if self.floor["another_property_below"]:
|
|
self.floor_level = 1
|
|
else:
|
|
self.floor_level = 0
|
|
return
|
|
|
|
# We perform some extra checks, if the property is not on the ground floor, as we have found cases
|
|
# where a property is marked as being on the first floor
|
|
if self.floor_level > 0:
|
|
|
|
# We check if there is another property below (for a non-sap assessment)
|
|
if not self.floor["another_property_below"] and self.floor["thermal_transmittance_unit"] is None:
|
|
self.floor_level = 0
|
|
return
|
|
|
|
if self.floor_level == 0:
|
|
# Check if another property below
|
|
if self.floor["another_property_below"]:
|
|
self.floor_level = 1
|
|
return
|
|
|
|
def set_wall_type(self):
    """
    Derives the wall type from the wall description, and stores it on ``self.wall_type``.

    Every entry of ``self.walls`` is handed to get_wall_type as a keyword argument
    :return:
    """
    wall_attributes = self.walls
    self.wall_type = get_wall_type(**wall_attributes)
|
|
|
|
def set_floor_type(self):
    """
    Classifies the floor of the property and stores the result on ``self.floor_type``. The floor type is
    used for calculating u-values.

    Section 5.6 of the BRE indicates that:
    "to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
    outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
    and the U-values in Table S12 are used."

    Both of those cases are therefore given the single "exposed_floor" type here.

    The checks are made in order, and the first match wins:
        - suspended floor, or another property below -> "suspended"
        - solid floor -> "solid"
        - floor to an unheated space, or to external air -> "exposed_floor"
        - a thermal transmittance value is present -> "solid"

    :raises NotImplementedError: if none of the checks above match
    """
    floor_details = self.floor

    if floor_details["is_suspended"] | floor_details["another_property_below"]:
        self.floor_type = "suspended"
        return

    if floor_details["is_solid"]:
        self.floor_type = "solid"
        return

    if floor_details["is_to_unheated_space"] | floor_details["is_to_external_air"]:
        self.floor_type = "exposed_floor"
        return

    if floor_details["thermal_transmittance"] is None:
        raise NotImplementedError("Implement this floor type")

    # Only a thermal transmittance value is known, which is handled as a solid floor
    self.floor_type = "solid"
|
|
|
|
@staticmethod
|
|
def _extract_component(
|
|
component_data, component_rename_cols, component_drop_cols, rename_prefix=None
|
|
):
|
|
for k in component_rename_cols:
|
|
component_data[f"{rename_prefix}_{k}"] = component_data.get(k)
|
|
|
|
component_data = {
|
|
k: v
|
|
for k, v in component_data.items()
|
|
if k not in component_drop_cols + component_rename_cols
|
|
}
|
|
|
|
return component_data
|
|
|
|
def set_adjusted_energy(self, expected_adjusted_energy, expected_energy_bill):
    """
    Keeps hold of the expected adjusted energy and the expected energy bill on the instance, so that they
    can be used later

    :param expected_adjusted_energy: stored as ``self.expected_adjusted_energy``
    :param expected_energy_bill: stored as ``self.expected_energy_bill``
    """
    (
        self.expected_adjusted_energy,
        self.expected_energy_bill,
    ) = (expected_adjusted_energy, expected_energy_bill)
|
|
|
|
def set_windows_count(self):
    """
    Estimates how many windows the property has, and stores the result on ``self.number_of_windows``.

    The estimate comes from estimate_windows, which is given the property type, built form, construction
    age band, floor area and number of rooms
    :return:
    """
    window_inputs = {
        "property_type": self.data["property-type"],
        "built_form": self.data["built-form"],
        "construction_age_band": self.construction_age_band,
        "floor_area": self.floor_area,
        "number_habitable_rooms": self.number_of_rooms,
    }

    self.number_of_windows = estimate_windows(**window_inputs)
|
|
|
|
def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds):
    """
    Sets ``self.solar_pv_percentage``, the share of the roof that is expected to be covered by solar panels.

    The photo supply lookup is filtered down to the entries matching this property, and the mean of their
    "photo_supply_median" values is divided by 100 and stored.

    NOTE(review): the guard below only fires when both areas are missing, although its message reads as if
    both are required - confirm which is intended

    :param photo_supply_lookup: passed through to SolarPhotoSupply.filter_photo_supply_lookup
    :param floor_area_decile_thresholds: passed through to SolarPhotoSupply.filter_photo_supply_lookup
    :raises ValueError: if neither the insulation floor area nor the pitched roof area has been set
    :return:
    """
    if not (self.insulation_floor_area is not None or self.pitched_roof_area is not None):
        raise ValueError(
            "Need to set insulation floor area and pitched roof area before setting solar pv roof area"
        )

    lookup_filters = {
        "photo_supply_lookup": photo_supply_lookup,
        "floor_area_decile_thresholds": floor_area_decile_thresholds,
        "tenure": self.data["tenure"],
        "built_form": self.data["built-form"],
        "property_type": self.data["property-type"],
        "construction_age_band": self.construction_age_band,
        "is_flat": self.roof["is_flat"],
        "is_pitched": self.roof["is_pitched"],
        "is_roof_room": self.roof["is_roof_room"],
        "floor_area": self.floor_area,
    }
    matched_supply = SolarPhotoSupply.filter_photo_supply_lookup(**lookup_filters)

    # The medians are divided by 100 before being stored
    mean_median = matched_supply["photo_supply_median"].mean()
    self.solar_pv_percentage = mean_median / 100
|
|
|
|
def get_solar_pv_roof_area(self, percentage_of_roof):
    """
    Estimates the area of the solar panels, for a given share of the roof.

    A flat roof is sized from the insulation floor area, any other roof from the pitched roof area
    :param percentage_of_roof: multiplier that is applied to the roof area
    :return: the roof area multiplied by percentage_of_roof
    """
    if self.roof["is_flat"]:
        roof_area = self.insulation_floor_area
    else:
        roof_area = self.pitched_roof_area

    return roof_area * percentage_of_roof
|
|
|
|
def set_energy_source(self):
    """
    Sets ``self.energy_source``, based on the mains gas flag and the energy tariff.

    The property is treated as "electricity" only when the mains gas flag is not set and the tariff is one
    of the common electricity tariffs. Every other case, including when the mains gas flag is set, is
    treated as "electricity_and_gas"
    """
    # The common electricity tariffs, i.e. those without a dual indication
    electricity_tariffs = (
        "Single",
        "off-peak 7 hour",
        "off-peak 10 hour",
        "off-peak 18 hour",
        "standard tariff",
        "24 hour",
    )

    electricity_only = (
        not self.data["mains-gas-flag"]
        and self.data["energy-tariff"] in electricity_tariffs
    )

    self.energy_source = "electricity" if electricity_only else "electricity_and_gas"
|
|
|
|
def find_energy_sources(self):
    """
    Works out the fuel that is used for the main heating and for the hot water.

    ``self.heating_energy_source`` is set from the flags that are switched on in ``self.main_heating``, and
    ``self.hot_water_energy_source`` from the hot water heater type. When no heater type is recorded, the hot
    water fuel is only resolved if the hot water is supplied from the main system, in which case it shares
    the main heating fuel.

    :raises Exception: if the main heating does not resolve to exactly one fuel, or if the hot water has no
        heater type and is not supplied from the main system
    :raises KeyError: if the hot water heater type, or system type, is not one of the known values
    """
    # Based on the heating and the hot water
    heating_fuel_mapping = {
        'has_mains_gas': 'Natural Gas',
        'has_electric': 'Electricity',
        'has_oil': 'Oil',
        'has_wood_logs': 'Wood Logs',
        'has_coal': 'Coal',
        'has_anthracite': 'Anthracite',
        'has_smokeless_fuel': 'Smokeless Fuel',
        'has_lpg': 'LPG',
        'has_b30k': 'B30K Biofuel',
        'has_air_source_heat_pump': 'Electricity',
        'has_ground_source_heat_pump': 'Electricity',
        'has_water_source_heat_pump': 'Electricity',
        'has_electric_heat_pump': 'Electricity',
        'has_solar_assisted_heat_pump': 'Electricity',
        'has_exhaust_source_heat_pump': 'Electricity',
        'has_community_heat_pump': 'Electricity',
        'has_wood_pellets': 'Wood Pellets',
        'has_community_scheme': 'Varied (Community Scheme)',
    }

    # Hot water
    heater_type_to_fuel = {
        'gas instantaneous': 'Natural Gas',
        'electric heat pump': 'Electricity',
        'electric immersion': 'Electricity',
        'gas boiler': 'Natural Gas',
        'oil boiler': 'Oil',
        'electric instantaneous': 'Electricity',
        'gas multipoint': 'Natural Gas',
        'heat pump': 'Electricity',
        'solid fuel boiler': 'Solid Fuel',
        'solid fuel range cooker': 'Solid Fuel',
        'room heaters': 'Varied',  # Could be any fuel, further specifics needed based on context
    }

    # Define a mapping from system types to general categories or modifications of fuel types
    system_type_modification = {
        'from main system': 'Main System',
        'from secondary system': 'Secondary System',
        'from second main heating system': 'Secondary System',
        'community scheme': 'Community Scheme',
    }

    # Several flags can point at the same fuel (e.g. has_electric together with one of the heat pump flags),
    # so the matched fuels are de-duplicated, keeping their order, before checking that exactly one is left
    heating_fuels = list(
        dict.fromkeys(
            fuel for key, fuel in heating_fuel_mapping.items() if self.main_heating.get(key, False)
        )
    )
    if len(heating_fuels) != 1:
        raise Exception(
            f"Expected exactly one main heating fuel, but found {len(heating_fuels)}: {heating_fuels}"
        )

    self.heating_energy_source = heating_fuels[0]

    heater_type = self.hotwater["heater_type"]
    if heater_type is not None:
        self.hot_water_energy_source = heater_type_to_fuel[heater_type]
    else:
        # Without a heater type, the fuel can only be taken from the system that supplies the hot water
        hot_water_supply = system_type_modification[self.hotwater["system_type"]]
        if hot_water_supply != 'Main System':
            raise Exception(
                f"Unable to resolve the hot water fuel for a hot water supply of: {hot_water_supply}"
            )

        self.hot_water_energy_source = self.heating_energy_source
|