mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
675 lines
30 KiB
Python
675 lines
30 KiB
Python
from datetime import datetime
|
|
import re
|
|
import os
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from etl.epc.DataProcessor import EPCDataProcessor
|
|
from etl.epc.Dataset import TrainingDataset
|
|
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, \
|
|
BUILT_FORM_REMAP
|
|
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
|
|
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet
|
|
from etl.epc.settings import DATA_ANOMALY_MATCHES
|
|
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
|
|
from recommendations.recommendation_utils import (
|
|
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
|
|
)
|
|
|
|
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
|
|
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
class Property:
|
|
ATTRIBUTE_MAP = {
|
|
"floor-description": "floor",
|
|
"hotwater-description": "hotwater",
|
|
"main-fuel": "main_fuel",
|
|
"mainheat-description": "main_heating",
|
|
"mainheatcont-description": "main_heating_controls",
|
|
"roof-description": "roof",
|
|
"walls-description": "walls",
|
|
"windows-description": "windows",
|
|
"lighting-description": "lighting"
|
|
}
|
|
|
|
floor = None
|
|
hotwater = None
|
|
main_fuel = None
|
|
main_heating = None
|
|
main_heating_controls = None
|
|
roof = None
|
|
walls = None
|
|
windows = None
|
|
lighting = None
|
|
|
|
spatial = None
|
|
base_difference_record = None
|
|
|
|
DATA_ANOMALY_MATCHES = DATA_ANOMALY_MATCHES
|
|
|
|
def __init__(self, id, postcode, address, epc_record):
|
|
|
|
self.epc_record = epc_record
|
|
|
|
self.id = id
|
|
|
|
self.address = address
|
|
self.postcode = postcode
|
|
self.data = {k.replace("_", "-"): v for k, v in epc_record.get("prepared_epc").items()}
|
|
self.old_data = epc_record.get("old_data")
|
|
self.property_dimensions = None
|
|
|
|
self.uprn = epc_record.get("uprn")
|
|
self.full_sap_epc = epc_record.get("full_sap_epc")
|
|
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
|
|
self.restricted_measures = False
|
|
self.year_built = epc_record.get("year_built")
|
|
self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms")
|
|
self.age_band = epc_record.get("age_band")
|
|
self.construction_age_band = epc_record.get("construction_age_band")
|
|
self.number_of_floors = epc_record.get("number_of_floors")
|
|
self.perimeter = None
|
|
self.wall_type = None
|
|
self.floor_type = None
|
|
|
|
self.energy = {
|
|
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
|
|
"co2_emissions": epc_record.get("co2_emissions_current"),
|
|
}
|
|
self.ventilation = {
|
|
"ventilation": epc_record.get("mechanical_ventilation"),
|
|
}
|
|
self.solar_pv = {
|
|
"solar_pv": epc_record.get("photo_supply"),
|
|
}
|
|
self.solar_hot_water = {
|
|
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
|
|
"solar_hot_water_boolean": epc_record.get("solar_water_heating_flag_bool"),
|
|
}
|
|
self.wind_turbine = {
|
|
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
|
|
}
|
|
self.number_of_open_fireplaces = {
|
|
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_open_fireplaces"),
|
|
}
|
|
self.number_of_extensions = {
|
|
"number_of_extensions": epc_record.prepared_epc.get("extension_count"),
|
|
}
|
|
self.number_of_storeys = {
|
|
"number_of_storeys": epc_record.prepared_epc.get("flat_storey_count"),
|
|
}
|
|
self.heat_loss_corridor = {
|
|
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
|
|
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
|
|
"heat_loss_corridor_boolean": epc_record.get("heat_loss_corridor_bool"),
|
|
}
|
|
self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag')
|
|
self.floor_height = epc_record.prepared_epc.get('floor_height')
|
|
self.insulation_wall_area = None
|
|
self.floor_area = epc_record.prepared_epc.get('total_floor_area')
|
|
self.pitched_roof_area = None
|
|
self.insulation_floor_area = None
|
|
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
|
|
self.floor_level = None
|
|
self.number_of_windows = None
|
|
self.solar_pv_roof_area = None
|
|
self.solar_pv_percentage = None
|
|
|
|
self.current_adjusted_energy = None
|
|
self.expected_adjusted_energy = None
|
|
|
|
self.recommendations_scoring_data = []
|
|
|
|
def create_base_difference_epc_record(self, cleaned_lookup: dict):
|
|
"""
|
|
Creates a EPCDifferenceRecord object, which is used to store the difference between the current and
|
|
expected EPC
|
|
It will be the same starting and ending EPC, as we don't have the expected EPC yet
|
|
"""
|
|
|
|
difference_record = self.epc_record - self.epc_record
|
|
|
|
# TODO: change these lower and replace in the settings file
|
|
fixed_data_col_names = MANDATORY_FIXED_FEATURES + LATEST_FIELD
|
|
print("NEED TO CHANGE THE DASH TO LOWER CASE")
|
|
fixed_data_col_names = [x.lower().replace("_", "-") for x in fixed_data_col_names]
|
|
|
|
fixed_data = {k.replace("-", "_"): v for k, v in self.data.items() if k in fixed_data_col_names}
|
|
|
|
difference_record.append_fixed_data(fixed_data)
|
|
|
|
self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
|
|
|
|
# TODO: adjust the base difference record with the previously calculated u values + features
|
|
# estimated_perimeter is different to the perimeter in the epc record
|
|
|
|
# self.base_difference_record.df
|
|
|
|
def adjust_difference_record_with_recommendations(self, property_recommendations):
|
|
"""
|
|
This method will adjust the difference record, based on the recommendations made for the property
|
|
:param property_recommendations: dictionary of recommendations for the property
|
|
"""
|
|
|
|
self.recommendations_scoring_data = []
|
|
|
|
for recommendations_by_type in property_recommendations:
|
|
for i, rec in enumerate(recommendations_by_type):
|
|
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
|
|
scoring_dict = self.create_recommendation_scoring_data(
|
|
property_id=self.id, recommendation_record=recommendation_record, recommendation=rec,
|
|
)
|
|
|
|
self.recommendations_scoring_data.append(scoring_dict)
|
|
|
|
@staticmethod
|
|
def create_recommendation_scoring_data(property_id, recommendation_record, recommendation: dict):
|
|
|
|
for col in [
|
|
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
|
|
]:
|
|
if recommendation_record[col] is None:
|
|
recommendation_record[col] = "none"
|
|
|
|
# We update the description to indicate it's insulated
|
|
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
|
|
# The upgrade made here is to the u-value of the walls and the description of the
|
|
# insulation thickness
|
|
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
|
recommendation_record["walls_insulation_thickness_ending"] = "above average"
|
|
recommendation_record["walls_energy_eff_ending"] = "Good"
|
|
else:
|
|
if recommendation_record["walls_thermal_transmittance_ending"] is None:
|
|
raise ValueError("We should not have a None value for the u value")
|
|
|
|
if recommendation_record["walls_insulation_thickness_ending"] is None:
|
|
recommendation_record["walls_insulation_thickness_ending"] = "none"
|
|
|
|
# Update description to indicate it's insulate
|
|
if recommendation["type"] in ["solid_floor_insulation", "suspended_floor_insulation",
|
|
"exposed_floor_insulation"]:
|
|
if len(recommendation["parts"]) > 1:
|
|
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
|
|
|
|
# recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
|
# We don't really see above average for this in the training data
|
|
recommendation_record["floor_insulation_thickness_ending"] = "average"
|
|
# This is rarely ever populated in the training data
|
|
# recommendation_record["floor_energy_eff_ending"] = "Good"
|
|
else:
|
|
if recommendation_record["floor_thermal_transmittance_ending"] is None:
|
|
raise ValueError("We should not have a None value for the u value")
|
|
|
|
if recommendation_record["floor_insulation_thickness_ending"] is None:
|
|
recommendation_record["floor_insulation_thickness_ending"] = "none"
|
|
|
|
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
|
|
recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
|
|
|
|
parts = recommendation["parts"]
|
|
if len(parts) != 1:
|
|
raise ValueError("More than one part for roof insulation - investiage me")
|
|
|
|
# This is based on the values we have in the training data
|
|
valid_numeric_values = [
|
|
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
|
|
]
|
|
|
|
proposed_depth = int(parts[0]["depth"])
|
|
if proposed_depth not in valid_numeric_values:
|
|
# Take the nearest value for scoring
|
|
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
|
|
|
|
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
|
|
if recommendation["type"] == "loft_insulation":
|
|
recommendation_record["roof_energy_eff_ending"] = "Good"
|
|
else:
|
|
recommendation_record["roof_energy_eff_ending"] = "Very Good"
|
|
else:
|
|
# Fill missing roof u-values - this fill is not based on recommended upgrades
|
|
if recommendation_record["roof_thermal_transmittance_ending"] is None:
|
|
raise ValueError("We should not have a None value for the u value")
|
|
|
|
if recommendation_record["roof_insulation_thickness_ending"] is None:
|
|
recommendation_record["roof_insulation_thickness_ending"] = "none"
|
|
|
|
if recommendation["type"] == "mechanical_ventilation":
|
|
recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only'
|
|
|
|
if recommendation["type"] == "sealing_open_fireplace":
|
|
recommendation_record["number_open_fireplaces_ending"] = 0
|
|
|
|
if recommendation["type"] == "low_energy_lighting":
|
|
recommendation_record["low_energy_lighting_ending"] = 100
|
|
recommendation_record["lighting_energy_eff_starting"] = "Very Good"
|
|
|
|
if recommendation["type"] == "windows_glazing":
|
|
recommendation_record["multi_glaze_proportion_ending"] = 100
|
|
recommendation_record["windows_energy_eff_ending"] = "Average"
|
|
|
|
is_secondary_glazing = recommendation["is_secondary_glazing"]
|
|
|
|
if recommendation_record["glazing_type_ending"] == "multiple":
|
|
pass
|
|
elif recommendation_record["glazing_type_ending"] == "single":
|
|
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
|
|
elif recommendation_record["glazing_type_ending"] == "double":
|
|
recommendation_record["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
|
|
elif recommendation_record["glazing_type_ending"] == "secondary":
|
|
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
|
|
elif recommendation_record["glazing_type_ending"] in ["triple", "high performance"]:
|
|
recommendation_record["glazing_type_ending"] = "multiple"
|
|
else:
|
|
raise ValueError("Invalid glazing type - implement me")
|
|
|
|
if recommendation["type"] == "solar_pv":
|
|
recommendation_record["photo_supply_ending"] = recommendation["photo_supply"]
|
|
|
|
if recommendation["type"] not in [
|
|
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
|
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
|
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
|
|
"windows_glazing", "solar_pv"
|
|
]:
|
|
raise NotImplementedError("Implement me")
|
|
|
|
recommendation_record['id'] = "+".join([str(property_id), str(recommendation["recommendation_id"])])
|
|
|
|
return recommendation_record
|
|
|
|
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
|
|
"""
|
|
Given the cleaning that has been performed, we'll use this to identify the property
|
|
components, from roof to walls to windows, heating and hot water
|
|
:param cleaned: This is the dictionary of components found in cleaner.cleaned
|
|
:param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage
|
|
of the roof that is suitable for solar panels
|
|
:param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the
|
|
solar pv roof area
|
|
:return:
|
|
"""
|
|
|
|
if not cleaned:
|
|
raise ValueError("Cleaner does not contain cleaned data")
|
|
|
|
if not self.data:
|
|
raise ValueError("Property does not contain data")
|
|
|
|
self.set_basic_property_dimensions()
|
|
|
|
for description, attribute in cleaned.items():
|
|
|
|
if self.data[description] in self.DATA_ANOMALY_MATCHES:
|
|
template = cleaned[description][0]
|
|
fill_dict = dict(zip(template.keys(), [None] * len(template)))
|
|
fill_dict.update({
|
|
"original_description": self.data[description],
|
|
"clean_description": self.data[description],
|
|
})
|
|
setattr(
|
|
self,
|
|
self.ATTRIBUTE_MAP[description],
|
|
fill_dict,
|
|
)
|
|
continue
|
|
|
|
attributes = [
|
|
x for x in cleaned[description] if x["original_description"] == self.data[description]
|
|
]
|
|
|
|
if len(attributes) > 1:
|
|
raise ValueError("Either No attributes or multiple found for %s" % description)
|
|
|
|
if len(attributes) == 0:
|
|
# We attempt to perform the clean on the fly
|
|
cleaner_cls = all_cleaner_map[description]
|
|
cleaner_cls = cleaner_cls(self.data[description])
|
|
processed = {
|
|
"original_description": self.data[description],
|
|
"clean_description": cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize(),
|
|
**cleaner_cls.process()
|
|
}
|
|
|
|
attributes = [processed]
|
|
|
|
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])
|
|
|
|
self.set_wall_type()
|
|
self.set_floor_type()
|
|
self.set_floor_level()
|
|
self.set_windows_count()
|
|
self.set_solar_panel_area(
|
|
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
|
|
)
|
|
|
|
def set_spatial(self, spatial: pd.DataFrame):
|
|
"""
|
|
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
|
|
|
|
Will store a dictionary, spatial, which is used to populate the property spatial table in the database
|
|
|
|
:param spatial: Dataframe, containing the spatial data for the property
|
|
"""
|
|
self.in_conservation_area = spatial["conservation_status"].values[0]
|
|
self.is_listed = spatial["is_listed_building"].values[0]
|
|
self.is_heritage = spatial["is_heritage_building"].values[0]
|
|
|
|
# We do an equals True, in the case of one of these variables being True
|
|
if (self.in_conservation_area == True) | (self.is_listed == True) | (self.is_heritage == True):
|
|
self.restricted_measures = True
|
|
|
|
spatial_dict = spatial.to_dict("records")[0]
|
|
self.spatial = {
|
|
"x_coordinate": spatial_dict["X_COORDINATE"],
|
|
"y_coordinate": spatial_dict["Y_COORDINATE"],
|
|
"latitude": spatial_dict["LATITUDE"],
|
|
"longitude": spatial_dict["LONGITUDE"],
|
|
"conservation_status": spatial_dict["conservation_status"],
|
|
"is_listed_building": spatial_dict["is_listed_building"],
|
|
"is_heritage_building": spatial_dict["is_heritage_building"],
|
|
}
|
|
|
|
def _clean_upload_data(self, to_update):
|
|
for k, v in to_update.items():
|
|
if v in self.DATA_ANOMALY_MATCHES:
|
|
to_update[k] = None
|
|
return to_update
|
|
|
|
def get_full_property_data(self):
|
|
"""
|
|
This method extracts the data which is pushed to the database, containing core information, from the EPC
|
|
about a property
|
|
:return:
|
|
"""
|
|
|
|
property_data = {
|
|
"creation_status": "READY",
|
|
"uprn": int(self.data["uprn"]),
|
|
"building_reference_number": int(self.data["building-reference-number"]),
|
|
"has_pre_condition_report": True,
|
|
"has_recommendations": True,
|
|
"property_type": self.data["property-type"],
|
|
"built_form": self.data["built-form"],
|
|
"local_authority": self.data["local-authority-label"],
|
|
"constituency": self.data["constituency-label"],
|
|
"number_of_rooms": self.number_of_rooms,
|
|
"year_built": self.year_built,
|
|
"tenure": self.data["tenure"],
|
|
"current_epc_rating": self.data["current-energy-rating"],
|
|
"current_sap_points": self.data["current-energy-efficiency"],
|
|
}
|
|
|
|
property_data = self._clean_upload_data(property_data)
|
|
|
|
return property_data
|
|
|
|
@classmethod
|
|
def _prepare_rating_field(cls, field, rating_lookup):
|
|
"""
|
|
Utility function for usage in the lambda, for preparing the _rating fields
|
|
"""
|
|
return rating_lookup[field].value if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None) else None
|
|
|
|
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
|
|
|
|
property_details_epc = {
|
|
"property_id": self.id,
|
|
"portfolio_id": portfolio_id,
|
|
"full_address": self.data["address"],
|
|
"total_floor_area": float(self.data["total-floor-area"]),
|
|
"walls": self.walls["clean_description"],
|
|
"walls_rating": self._prepare_rating_field(self.data["walls-energy-eff"], rating_lookup),
|
|
"roof": self.roof["clean_description"],
|
|
"roof_rating": self._prepare_rating_field(self.data["roof-energy-eff"], rating_lookup),
|
|
"floor": self.floor["clean_description"],
|
|
"floor_rating": self._prepare_rating_field(self.data["floor-energy-eff"], rating_lookup),
|
|
"windows": self.windows["clean_description"],
|
|
"windows_rating": self._prepare_rating_field(self.data["windows-energy-eff"], rating_lookup),
|
|
"heating": self.main_heating["clean_description"],
|
|
"heating_rating": self._prepare_rating_field(self.data["mainheat-energy-eff"], rating_lookup),
|
|
"heating_controls": self.main_heating_controls["clean_description"],
|
|
"heating_controls_rating": self._prepare_rating_field(self.data["mainheatc-energy-eff"], rating_lookup),
|
|
"hot_water": self.hotwater["clean_description"],
|
|
"hot_water_rating": self._prepare_rating_field(self.data["hot-water-energy-eff"], rating_lookup),
|
|
"lighting": self.lighting["clean_description"],
|
|
"lighting_rating": self._prepare_rating_field(self.data["lighting-energy-eff"], rating_lookup),
|
|
"mainfuel": self.main_fuel["clean_description"],
|
|
"ventilation": self.ventilation["ventilation"],
|
|
"solar_pv": self.solar_pv["solar_pv"],
|
|
"solar_hot_water": self.solar_hot_water["solar_hot_water_boolean"],
|
|
"wind_turbine": self.wind_turbine["wind_turbine"],
|
|
"floor_height": self.floor_height,
|
|
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"],
|
|
"unheated_corridor_length": self.heat_loss_corridor["length"],
|
|
"number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"],
|
|
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
|
|
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
|
|
"mains_gas": self.mains_gas,
|
|
"energy_tariff": self.data["energy-tariff"],
|
|
"primary_energy_consumption": self.energy["primary_energy_consumption"],
|
|
"co2_emissions": self.energy["co2_emissions"],
|
|
"adjusted_energy_consumption": self.current_adjusted_energy,
|
|
"estimated": self.data.get("estimated", False)
|
|
}
|
|
|
|
return property_details_epc
|
|
|
|
def get_spatial_data(self, uprn_filenames):
|
|
|
|
"""
|
|
Given a property's UPRN, this method will pull the associated spatial data from s3
|
|
:return:
|
|
"""
|
|
|
|
if self.uprn is None:
|
|
logger.warning("We do not have a UPRN for this property - this needs to be implemented")
|
|
self.in_conservation_area = False
|
|
self.is_listed = False
|
|
self.is_heritage = False
|
|
self.restricted_measures = True
|
|
return
|
|
|
|
# We get the file name for the uprn
|
|
filtered_df = uprn_filenames[(uprn_filenames['lower'] <= self.uprn) & (uprn_filenames['upper'] >= self.uprn)]
|
|
if filtered_df.empty:
|
|
logger.warning("Could not find file containing UPRNS")
|
|
return None
|
|
|
|
filename = filtered_df.iloc[0]['filenames']
|
|
|
|
spatial_data = read_dataframe_from_s3_parquet(
|
|
bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
|
|
)
|
|
|
|
spatial = spatial_data[spatial_data["UPRN"] == self.uprn]
|
|
|
|
# Pull out spatial features
|
|
self.set_spatial(spatial)
|
|
|
|
def _filter_property_dimensions(self, property_dimensions):
|
|
"""
|
|
Will filter the property dimensions dataframe to only include the relevant rows for the property
|
|
:param property_dimensions:
|
|
:return: filtered property dimensions dataframe
|
|
"""
|
|
|
|
result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])]
|
|
|
|
if self.construction_age_band is not None and self.construction_age_band not in self.DATA_ANOMALY_MATCHES:
|
|
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
|
|
|
|
if self.data["built-form"] not in self.DATA_ANOMALY_MATCHES and self.data["built-form"] in result["BUILT_FORM"]:
|
|
result = result[(result["BUILT_FORM"] == self.data["built-form"])]
|
|
|
|
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
|
|
|
|
def set_basic_property_dimensions(self):
|
|
"""
|
|
This method sets the number of floors of the property, using a simple approach based on an estimate for
|
|
average room size, number of rooms and total floor area
|
|
|
|
It sets the perimeter of the property, using a simple approach based on an estimate for average room size,
|
|
number of rooms and total floor area
|
|
|
|
Also sets floor area, number of rooms, using backup cleaned values if this data is not present, based on
|
|
medians across the EPC data
|
|
:return:
|
|
"""
|
|
|
|
# TODO: These functions should work on an EPCRecord object, so that the format is more standardised.
|
|
# They could also be added as attributes to the EPC Record
|
|
|
|
self.perimeter = estimate_perimeter(
|
|
self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors
|
|
)
|
|
|
|
self.insulation_wall_area = estimate_external_wall_area(
|
|
num_floors=self.number_of_floors,
|
|
floor_height=self.floor_height,
|
|
perimeter=self.perimeter,
|
|
built_form=self.data["built-form"],
|
|
)
|
|
|
|
self.insulation_floor_area = self.floor_area / self.number_of_floors
|
|
|
|
self.pitched_roof_area = esimtate_pitched_roof_area(
|
|
floor_area=self.insulation_floor_area, floor_height=self.floor_height
|
|
)
|
|
|
|
def set_floor_level(self):
|
|
self.floor_level = (
|
|
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
|
|
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
|
|
else None
|
|
)
|
|
|
|
if self.floor_level is None:
|
|
|
|
if self.data["property-type"] != "Flat":
|
|
return
|
|
|
|
if self.floor["another_property_below"]:
|
|
self.floor_level = 1
|
|
else:
|
|
self.floor_level = 0
|
|
return
|
|
|
|
# We perform some extra checks, if the property is not on the ground floor, as we have found cases
|
|
# where a property is marked as being on the first floor
|
|
if self.floor_level > 0:
|
|
|
|
# We check if there is another property below
|
|
if not self.floor["another_property_below"]:
|
|
self.floor_level = 0
|
|
return
|
|
|
|
if self.floor_level == 0:
|
|
# Check if another property below
|
|
if self.floor["another_property_below"]:
|
|
self.floor_level = 1
|
|
return
|
|
|
|
def set_wall_type(self):
|
|
"""
|
|
This method sets the wall type of the property, using a simple approach based on the wall description
|
|
:return:
|
|
"""
|
|
self.wall_type = get_wall_type(**self.walls)
|
|
|
|
def set_floor_type(self):
|
|
"""
|
|
This method sets the floor type of the property, which is used for calculating u-values
|
|
|
|
Section 5.6 of the BRE indicates that
|
|
"to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
|
|
outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
|
|
and the U-values in Table S12 are used.
|
|
|
|
Therefore, we treat the exposed floor and suspended floor as the same type of floor, which is used for
|
|
calculating u-values
|
|
"""
|
|
|
|
if self.floor["is_suspended"] | self.floor["another_property_below"]:
|
|
self.floor_type = "suspended"
|
|
elif self.floor["is_solid"]:
|
|
self.floor_type = "solid"
|
|
elif self.floor["is_to_unheated_space"] | self.floor["is_to_external_air"]:
|
|
self.floor_type = "exposed_floor"
|
|
elif self.floor["thermal_transmittance"] is not None:
|
|
self.floor_type = "solid"
|
|
else:
|
|
raise NotImplementedError("Implement this floor type")
|
|
|
|
@staticmethod
|
|
def _extract_component(component_data, component_rename_cols, component_drop_cols, rename_prefix=None):
|
|
for k in component_rename_cols:
|
|
component_data[f"{rename_prefix}_{k}"] = component_data.get(k)
|
|
|
|
component_data = {
|
|
k: v for k, v in component_data.items() if k not in component_drop_cols + component_rename_cols
|
|
}
|
|
|
|
return component_data
|
|
|
|
def set_adjusted_energy(self, current_adjusted_energy, expected_adjusted_energy):
|
|
"""
|
|
Stores these values for usage later
|
|
"""
|
|
self.current_adjusted_energy = current_adjusted_energy
|
|
self.expected_adjusted_energy = expected_adjusted_energy
|
|
|
|
def set_windows_count(self):
|
|
"""
|
|
Using the estimate_windows function, this method will set the number of windows in the property
|
|
:return:
|
|
"""
|
|
|
|
self.number_of_windows = estimate_windows(
|
|
property_type=self.data["property-type"],
|
|
built_form=self.data["built-form"],
|
|
construction_age_band=self.construction_age_band,
|
|
floor_area=self.floor_area,
|
|
number_habitable_rooms=self.number_of_rooms,
|
|
extension_count=float(self.data["extension-count"]),
|
|
)
|
|
|
|
def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds):
|
|
"""
|
|
Sets the approximate area of the solar panels
|
|
:return:
|
|
"""
|
|
|
|
if (self.insulation_floor_area is None) and (self.pitched_roof_area is None):
|
|
raise ValueError(
|
|
"Need to set insulation floor area and pitched roof area before setting solar pv roof area"
|
|
)
|
|
|
|
photo_supply_matched = SolarPhotoSupply.filter_photo_supply_lookup(
|
|
photo_supply_lookup=photo_supply_lookup,
|
|
floor_area_decile_thresholds=floor_area_decile_thresholds,
|
|
tenure=self.data["tenure"],
|
|
built_form=self.data["built-form"],
|
|
property_type=self.data["property-type"],
|
|
construction_age_band=self.construction_age_band,
|
|
is_flat=self.roof["is_flat"],
|
|
is_pitched=self.roof["is_pitched"],
|
|
is_roof_room=self.roof["is_roof_room"],
|
|
floor_area=self.floor_area
|
|
)
|
|
|
|
percentage_of_roof = photo_supply_matched["photo_supply_median"].mean()
|
|
percentage_of_roof = percentage_of_roof / 100
|
|
|
|
self.solar_pv_roof_area = (
|
|
self.insulation_floor_area * percentage_of_roof if self.roof["is_flat"] else
|
|
self.pitched_roof_area * percentage_of_roof
|
|
)
|
|
|
|
self.solar_pv_percentage = percentage_of_roof
|