mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
831 lines
32 KiB
Python
831 lines
32 KiB
Python
from datetime import datetime
|
|
import re
|
|
import os
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from etl.epc.DataProcessor import DataProcessor
|
|
from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
|
|
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet
|
|
from epc_api.client import EpcClient
|
|
from BaseUtility import Definitions
|
|
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
|
|
from recommendations.recommendation_utils import (
|
|
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
|
|
)
|
|
|
|
# Name of the deployment environment; falls back to 'dev' when the ENVIRONMENT variable is not set
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
# Bucket passed to read_dataframe_from_s3_parquet. Only the 'dev' environment has a built-in default; in any other
# environment this is None unless the DATA_BUCKET variable is set
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)

# Module-level logger shared by the code in this file
logger = setup_logger()
|
|
|
|
|
|
class Property(Definitions):
    # Maps each EPC description field to the name of the attribute on this class which stores the cleaned
    # component for that field (see get_components)
    ATTRIBUTE_MAP = {
        "floor-description": "floor",
        "hotwater-description": "hotwater",
        "main-fuel": "main_fuel",
        "mainheat-description": "main_heating",
        "mainheatcont-description": "main_heating_controls",
        "roof-description": "roof",
        "walls-description": "walls",
        "windows-description": "windows",
        "lighting-description": "lighting"
    }

    # Cleaned component dictionaries, one per value of ATTRIBUTE_MAP. They are None until get_components runs
    floor = None
    hotwater = None
    main_fuel = None
    main_heating = None
    main_heating_controls = None
    roof = None
    walls = None
    windows = None
    lighting = None

    # Dictionary of spatial values, None until set_spatial runs
    spatial = None
|
|
|
|
def __init__(self, id, address, postcode, data=None, old_data=None, full_sap_epc=None):
    """
    Creates a property and initialises every derived attribute to an empty state. The derived attributes are
    filled in later by the set_* methods

    :param id: identifier of the property
    :param address: address of the property
    :param postcode: postcode of the property
    :param data: EPC record for the property, a dictionary keyed by EPC field name, if available
    :param old_data: previous EPC records for the property, if available
    :param full_sap_epc: full SAP EPC record for the property, if available
    """
    self.id = id

    self.address = address
    self.postcode = postcode
    self.data = data
    self.old_data = old_data
    self.full_sap_epc = full_sap_epc
    self.property_dimensions = None

    # The UPRN is only known through the EPC record, so it stays None when no record was supplied.
    # NOTE(review): get_spatial_data compares this value against the lower/upper bounds of each spatial file -
    # confirm the EPC record stores the UPRN in a type which is comparable with those bounds
    self.uprn = None if data is None else data.get("uprn")

    self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
    self.restricted_measures = False
    self.year_built = None
    self.number_of_rooms = None
    self.age_band = None
    self.construction_age_band = None
    self.number_of_floors = None
    self.perimeter = None
    self.wall_type = None
    self.floor_type = None

    self.energy = None
    self.ventilation = None
    self.solar_pv = None
    self.solar_hot_water = None
    self.wind_turbine = None
    self.number_of_open_fireplaces = None
    self.number_of_extensions = None
    self.number_of_storeys = None
    self.heat_loss_corridor = None
    self.mains_gas = None
    self.floor_height = None
    self.insulation_wall_area = None
    self.floor_area = None
    self.pitched_roof_area = None
    self.insulation_floor_area = None
    self.number_lighting_outlets = None
    self.floor_level = None
    self.number_of_windows = None

    self.current_adjusted_energy = None
    self.expected_adjusted_energy = None
|
|
|
|
def set_energy(self):
    """
    Builds the energy and co2 summary of the home and stores it on self.energy

    To begin with, this only reformats values of the EPC record as floats

    Data:
    - primary_energy_consumption
        Read from the "energy-consumption-current" field in the EPC data.
        Current estimated total energy consumption for the property in a 12 month period (kWh/m2). Displayed on
        the EPC as the current primary energy use per square metre of floor area.

    - co2_emissions
        Read from the "co2-emissions-current" field in the EPC data.
        CO₂ emissions per year in tonnes/year.
    """

    # Output key -> EPC field which provides the value
    epc_fields = {
        "primary_energy_consumption": "energy-consumption-current",
        "co2_emissions": "co2-emissions-current",
    }

    self.energy = {name: float(self.data[epc_field]) for name, epc_field in epc_fields.items()}
|
|
|
|
def set_ventilation(self):
    """
    Builds the ventilation summary of the home and stores it on self.ventilation

    To begin with, this only reformats the EPC data

    Data:
    - ventilation
        Read from the "mechanical-ventilation" field in the EPC data. Values which are blank or which match
        DATA_ANOMALY_MATCHES are stored as None.
    """

    raw_value = self.data["mechanical-ventilation"]

    # Only simple cleaning is needed - when checking 300k epc, the only unique values were
    # {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'}
    is_missing = raw_value in self.DATA_ANOMALY_MATCHES or raw_value == ""

    self.ventilation = {"ventilation": None if is_missing else raw_value}
|
|
|
|
def set_solar_pv(self):
    """
    Builds the solar pv summary of the home and stores it on self.solar_pv

    To begin with, this only reformats the EPC data

    Data:
    - solar_pv
        Read from the "photo-supply" field in the EPC data and converted to a float. A blank value is stored as
        None.

        When checking 100k epc, either the value was "" or a stringified number
    """

    raw_value = self.data["photo-supply"]

    self.solar_pv = {"solar_pv": float(raw_value) if raw_value != "" else None}
|
|
|
|
def set_solar_hot_water(self):
    """
    Builds the solar hot water summary of the home and stores it on self.solar_hot_water

    This only translates the "solar-water-heating-flag" field of the EPC data: "Y" becomes True, "N" becomes False
    and a blank value becomes None. Any other value raises a KeyError
    :return:
    """

    flag = self.data["solar-water-heating-flag"]
    has_solar_hot_water = {"": None, "N": False, "Y": True}[flag]

    self.solar_hot_water = {"solar_hot_water": has_solar_hot_water}
|
|
|
|
def set_wind_turbine(self):
    """
    Builds the wind turbine summary of the home and stores it on self.wind_turbine

    This only converts the "wind-turbine-count" field of the EPC data to an integer. A blank value is stored as
    None
    :return:
    """

    raw_count = self.data["wind-turbine-count"]

    self.wind_turbine = {"wind_turbine": int(raw_count) if raw_count != "" else None}
|
|
|
|
def set_count_variables(self):
    """
    For EPC fields that are just counts, we'll set them here
    These are fields that are integers but may contain additional values such as "" so we can't do a direct
    conversion straight to an integer

    Each attribute is read from its own EPC field. A blank or anomalous value is stored as None for the
    attributes listed in null_attributes, and as 0 for the others
    :return:
    """

    # Attribute on the property -> EPC field which provides the count
    fields = {
        "number_of_open_fireplaces": "number-open-fireplaces",
        "number_of_extensions": "extension-count",
        "number_of_storeys": "flat-storey-count",
        "number_of_rooms": "number-habitable-rooms",
    }

    # For these attributes a missing value means "unknown", not "none present"
    null_attributes = {"number_of_storeys", "number_of_rooms"}

    for attribute, epc_field in fields.items():
        raw_value = self.data[epc_field]
        if raw_value == "" or raw_value in self.DATA_ANOMALY_MATCHES:
            value = None if attribute in null_attributes else 0
        else:
            # Going through float also accepts counts which are serialised like "3.0"
            value = int(float(raw_value))

        setattr(self, attribute, value)
|
|
|
|
def get_components(self, cleaned):
    """
    Given the cleaning that has been performed, we'll use this to identify the property
    components, from roof to walls to windows, heating and hot water

    Every set_* method which only needs the EPC record is run first, then one attribute is stored per entry of
    cleaned, and finally the values which are derived from the cleaned components are set

    :param cleaned: This is the dictionary of components found in cleaner.cleaned. It is indexed by EPC
        description field and each value is a list of dictionaries which carry an "original_description" entry
    :return:
    :raises ValueError: if cleaned or the property data is empty, or if more than one cleaned entry matches the
        description of the property
    """

    if not cleaned:
        raise ValueError("Cleaner does not contain cleaned data")

    if not self.data:
        raise ValueError("Property does not contain data")

    # We need to implement an EPC cleaning process, which we run on the EPC data, immediately after we download
    # it
    self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"])
    # Only flats get a replacement built form here, any other property type keeps the anomalous value
    if self.data["built-form"] in self.DATA_ANOMALY_MATCHES:
        if self.data["property-type"] == "Flat":
            self.data["built-form"] = "Semi-Detached"

    self.set_year_built()
    self.set_energy()
    self.set_ventilation()
    self.set_solar_pv()
    self.set_solar_hot_water()
    self.set_wind_turbine()
    self.set_count_variables()
    self.set_heat_loss_corridor()
    self.set_mains_gas()
    self.set_age_band()

    self.set_basic_property_dimensions()

    for description, attribute in cleaned.items():

        if self.data[description] in self.DATA_ANOMALY_MATCHES:
            # There is nothing to clean, so we store a dictionary with the same keys as a cleaned entry, where
            # every value is None apart from the two descriptions
            template = cleaned[description][0]
            fill_dict = dict(zip(template.keys(), [None] * len(template)))
            fill_dict.update({
                "original_description": self.data[description],
                "clean_description": self.data[description],
            })
            setattr(
                self,
                self.ATTRIBUTE_MAP[description],
                fill_dict,
            )
            continue

        # Find the cleaned entry which was produced from exactly this description
        attributes = [
            x for x in cleaned[description] if x["original_description"] == self.data[description]
        ]
        # Only the "multiple found" case raises, the "none found" case is handled just below
        if len(attributes) > 1:
            raise ValueError("Either No attributes or multiple found for %s" % description)

        if len(attributes) == 0:
            # We attempt to perform the clean on the fly
            cleaner_cls = all_cleaner_map[description]
            # From here on the name holds an instance of the cleaner rather than the class
            cleaner_cls = cleaner_cls(self.data[description])
            processed = {
                "original_description": self.data[description],
                "clean_description": cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize(),
                **cleaner_cls.process()
            }

            attributes = [processed]

        setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])

    # These read the cleaned components which were stored by the loop above
    self.set_wall_type()
    self.set_floor_type()
    self.set_floor_level()
    self.set_windows_count()
|
|
|
|
def set_age_band(self):
    """
    Sets a cleaned version of the age band of the property given the EPC data

    When the current EPC does not provide a usable construction age band, the most recently lodged previous EPC
    which does provide one is used instead. New dwellings without any usable band are placed in band "L"
    :return:
    :raises ValueError: if the property has no data, or if no age band could be determined
    """

    if not self.data:
        raise ValueError("Property does not contain data")

    self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])
    if self.construction_age_band in self.DATA_ANOMALY_MATCHES and self.old_data:
        # Only previous EPCs with a usable band are candidates. There may be none, in which case we keep the
        # current value and rely on the fallbacks below
        usable = [
            x for x in self.old_data if x["construction-age-band"] not in self.DATA_ANOMALY_MATCHES
        ]
        if usable:
            # Take the most recent
            most_recent = max(usable, key=lambda record: record["lodgement-datetime"])
            self.construction_age_band = DataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    if (self.data["transaction-type"] == "new dwelling") and (self.age_band is None):
        self.age_band = "L"
        self.construction_age_band = 'England and Wales: 2012 onwards'

    if self.age_band is None:
        raise ValueError("age_band is missing")
|
|
|
|
def set_spatial(self, spatial: pd.DataFrame):
    """
    Stores the planning restriction flags of the property and the dictionary which is used to populate the
    property spatial table in the database

    Only the first row of the dataframe is used. restricted_measures is switched on when any of the three flags
    is True, it is never switched off here

    :param spatial: Dataframe, containing the spatial data for the property
    """

    # Attribute on the property -> column of the dataframe which provides the flag
    flag_columns = {
        "in_conservation_area": "conservation_status",
        "is_listed": "is_listed_building",
        "is_heritage": "is_heritage_building",
    }
    for attribute, column in flag_columns.items():
        setattr(self, attribute, spatial[column].values[0])

    # The flags are compared with True explicitly, so that any other value is not treated as a restriction
    if (self.in_conservation_area == True) | (self.is_listed == True) | (self.is_heritage == True):
        self.restricted_measures = True

    first_row = spatial.to_dict("records")[0]
    location_columns = ("X_COORDINATE", "Y_COORDINATE", "LATITUDE", "LONGITUDE")
    self.spatial = {
        **{column.lower(): first_row[column] for column in location_columns},
        **{column: first_row[column] for column in flag_columns.values()},
    }
|
|
|
|
def set_year_built(self):
    """
    Estimates when the property was built based on as much available data as possible.

    When a full SAP EPC is available, the year of its lodgement date is used. Otherwise the first four digit
    year found in the construction age band is used. year_built is None when neither source provides a year
    """

    if self.full_sap_epc:
        self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
        return

    age_band = self.data["construction-age-band"]
    if age_band not in self.DATA_ANOMALY_MATCHES:
        # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
        # more options for recommendations if that age falls before the year that insulation in walls became
        # common practice
        years = re.findall(r'\b\d{4}\b', age_band)
        # A band which is not anomalous can still carry no year at all, that is treated as unknown below
        if years:
            self.year_built = int(years[0])
            return

    # We don't know when the property was built
    self.year_built = None
|
|
|
|
def set_heat_loss_corridor(self):
    """
    Cleans the "heat-loss-corridor" and "unheated-corridor-length" fields of the EPC data and stores the result
    on self.heat_loss_corridor

    Only an unheated corridor counts as a heat loss corridor, an anomalous value counts as no corridor. A blank
    length is stored as None
    :return:
    """
    corridor_flags = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False
    }

    raw_corridor = self.data["heat-loss-corridor"]
    has_heat_loss_corridor = (
        False if raw_corridor in self.DATA_ANOMALY_MATCHES else corridor_flags[raw_corridor]
    )

    raw_length = self.data["unheated-corridor-length"]

    self.heat_loss_corridor = {
        "heat_loss_corridor": has_heat_loss_corridor,
        "length": float(raw_length) if raw_length != "" else None
    }
|
|
|
|
def set_mains_gas(self):
    """
    Sets whether the property has mains gas, from the "mains-gas-flag" field of the EPC data

    "Y" is stored as True and "N" as False. A blank or anomalous flag is stored as None
    :return:
    """

    flag = self.data["mains-gas-flag"]

    if flag != "" and flag not in self.DATA_ANOMALY_MATCHES:
        self.mains_gas = {"Y": True, "N": False}[flag]
    else:
        self.mains_gas = None
|
|
|
|
def _clean_upload_data(self, to_update):
    """
    Replaces every value which matches DATA_ANOMALY_MATCHES with None

    :param to_update: dictionary to clean, it is modified in place
    :return: the same dictionary which was passed in
    """
    anomalies = self.DATA_ANOMALY_MATCHES
    for key in list(to_update):
        if to_update[key] in anomalies:
            to_update[key] = None
    return to_update
|
|
|
|
def get_full_property_data(self):
    """
    Builds the core information about the property, taken from the EPC, which is pushed to the database

    The dictionary is passed through _clean_upload_data before it is returned
    :return: dictionary of core property data
    """

    epc = self.data

    return self._clean_upload_data({
        "creation_status": "READY",
        "uprn": int(epc["uprn"]),
        "building_reference_number": int(epc["building-reference-number"]),
        "has_pre_condition_report": True,
        "has_recommendations": True,
        "property_type": epc["property-type"],
        "built_form": epc["built-form"],
        "local_authority": epc["local-authority-label"],
        "constituency": epc["constituency-label"],
        "number_of_rooms": self.number_of_rooms,
        "year_built": self.year_built,
        "tenure": epc["tenure"],
        "current_epc_rating": epc["current-energy-rating"],
        "current_sap_points": epc["current-energy-efficiency"],
    })
|
|
|
|
@classmethod
def _prepare_rating_field(cls, field, rating_lookup):
    """
    Utility function for usage in the lambda, for preparing the _rating fields

    :param field: raw rating taken from the EPC data
    :param rating_lookup: lookup from raw rating to an object which exposes the rating through .value
    :return: the value of the matching rating, or None when the raw rating matches DATA_ANOMALY_MATCHES
    """
    if field in cls.DATA_ANOMALY_MATCHES:
        return None
    return rating_lookup[field].value
|
|
|
|
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
    """
    Builds the detailed EPC view of the property, combining raw EPC fields with the cleaned components and the
    values which were stored by the set_* methods

    :param portfolio_id: identifier of the portfolio which the property belongs to
    :param rating_lookup: lookup which is handed to _prepare_rating_field to translate the raw efficiency ratings
    :return: dictionary of property details
    """

    epc = self.data

    def rating(epc_field):
        # Translates the raw efficiency rating stored under the given EPC field
        return self._prepare_rating_field(epc[epc_field], rating_lookup)

    corridor = self.heat_loss_corridor

    return {
        "property_id": self.id,
        "portfolio_id": portfolio_id,
        "full_address": epc["address"],
        "total_floor_area": float(epc["total-floor-area"]),
        "walls": self.walls["clean_description"],
        "walls_rating": rating("walls-energy-eff"),
        "roof": self.roof["clean_description"],
        "roof_rating": rating("roof-energy-eff"),
        "floor": self.floor["clean_description"],
        "floor_rating": rating("floor-energy-eff"),
        "windows": self.windows["clean_description"],
        "windows_rating": rating("windows-energy-eff"),
        "heating": self.main_heating["clean_description"],
        "heating_rating": rating("mainheat-energy-eff"),
        "heating_controls": self.main_heating_controls["clean_description"],
        "heating_controls_rating": rating("mainheatc-energy-eff"),
        "hot_water": self.hotwater["clean_description"],
        "hot_water_rating": rating("hot-water-energy-eff"),
        "lighting": self.lighting["clean_description"],
        "lighting_rating": rating("lighting-energy-eff"),
        "mainfuel": self.main_fuel["clean_description"],
        "ventilation": self.ventilation["ventilation"],
        "solar_pv": self.solar_pv["solar_pv"],
        "solar_hot_water": self.solar_hot_water["solar_hot_water"],
        "wind_turbine": self.wind_turbine["wind_turbine"],
        "floor_height": self.floor_height,
        "heat_loss_corridor": corridor["heat_loss_corridor"],
        "unheated_corridor_length": corridor["length"],
        "number_of_open_fireplaces": self.number_of_open_fireplaces,
        "number_of_extensions": self.number_of_extensions,
        "number_of_storeys": self.number_of_storeys,
        "mains_gas": self.mains_gas,
        "energy_tariff": epc["energy-tariff"],
        "primary_energy_consumption": self.energy["primary_energy_consumption"],
        "co2_emissions": self.energy["co2_emissions"],
        "adjusted_energy_consumption": self.current_adjusted_energy,
        # Records which were not flagged as estimated are reported as not estimated
        "estimated": epc.get("estimated", False)
    }
|
|
|
|
def get_spatial_data(self, uprn_filenames):

    """
    Given a property's UPRN, this method will pull the associated spatial data from s3 and pass it to set_spatial

    Nothing is loaded when the property has no UPRN, in which case the property is conservatively treated as
    having restricted measures. The spatial attributes are left untouched when no file covers the UPRN, or when
    the file which covers it has no row for the UPRN

    :param uprn_filenames: Dataframe with one row per spatial file, holding the "lower" and "upper" UPRN bounds of
        the file and its name under "filenames"
    :return:
    """

    if self.uprn is None:
        logger.warning("We do not have a UPRN for this property - this needs to be implemented")
        self.in_conservation_area = False
        self.is_listed = False
        self.is_heritage = False
        self.restricted_measures = True
        return

    # We get the file name for the uprn
    filtered_df = uprn_filenames[(uprn_filenames['lower'] <= self.uprn) & (uprn_filenames['upper'] >= self.uprn)]
    if filtered_df.empty:
        logger.warning("Could not find file containing UPRNS")
        return None

    filename = filtered_df.iloc[0]['filenames']

    spatial_data = read_dataframe_from_s3_parquet(
        bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
    )

    spatial = spatial_data[spatial_data["UPRN"] == self.uprn]
    # set_spatial reads the first row, so it can not be given an empty selection
    if spatial.empty:
        logger.warning("Could not find spatial data for this UPRN")
        return None

    # Pull out spatial features
    self.set_spatial(spatial)
|
|
|
|
def _filter_property_dimensions(self, property_dimensions):
    """
    Will filter the property dimensions dataframe to only include the relevant rows for the property, and average
    the dimensions of those rows

    Rows are always filtered on property type. They are also filtered on construction age band when the property
    has a usable one, and on built form when the built form is usable and present in the remaining rows

    :param property_dimensions: Dataframe of property dimensions
    :return: Series holding the mean of NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT over the
        filtered rows
    """

    result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])]

    if self.construction_age_band is not None and self.construction_age_band not in self.DATA_ANOMALY_MATCHES:
        result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]

    built_form = self.data["built-form"]
    # Membership has to be tested against the values, "in" on a Series only looks at its index labels
    if built_form not in self.DATA_ANOMALY_MATCHES and built_form in result["BUILT_FORM"].values:
        result = result[(result["BUILT_FORM"] == built_form)]

    return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
|
|
|
|
def set_basic_property_dimensions(self):
    """
    Sets the basic dimensions of the property: floor area, number of rooms, number of floors, floor height,
    perimeter, and the wall, floor and pitched roof areas which are used for insulation

    The number of floors only depends on the property type. The perimeter and the areas are estimated from the
    floor area, the number of rooms, the number of floors and the floor height

    When the EPC data does not provide the number of rooms or the floor height, backup values are used, which
    are taken from the property dimensions of the local authority, filtered for this property
    :return:
    :raises NotImplementedError: if the property type is not handled
    """

    epc = self.data
    self.floor_area = float(epc["total-floor-area"])

    def floor_height_missing():
        # The floor height counts as missing when it is blank or anomalous
        height = epc["floor-height"]
        return height == "" or height in self.DATA_ANOMALY_MATCHES

    # The backup values are only loaded when one is needed, and only once per property
    needs_backup = not epc["number-habitable-rooms"] or floor_height_missing()
    if needs_backup and self.property_dimensions is None:
        all_dimensions = read_dataframe_from_s3_parquet(
            bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.data['local-authority']}.parquet"
        )
        self.property_dimensions = self._filter_property_dimensions(all_dimensions)

    if epc["number-habitable-rooms"]:
        self.number_of_rooms = float(epc["number-habitable-rooms"])
    else:
        self.number_of_rooms = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())

    floors_by_property_type = {"House": 2, "Flat": 1, "Bungalow": 1, "Maisonette": 2}
    if epc["property-type"] not in floors_by_property_type:
        raise NotImplementedError("Implement me")
    self.number_of_floors = floors_by_property_type[epc["property-type"]]

    if floor_height_missing():
        self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
    else:
        self.floor_height = float(epc["floor-height"])

    floor_area_per_storey = self.floor_area / self.number_of_floors
    rooms_per_storey = self.number_of_rooms / self.number_of_floors
    self.perimeter = estimate_perimeter(floor_area_per_storey, rooms_per_storey)

    self.insulation_wall_area = estimate_external_wall_area(
        num_floors=self.number_of_floors,
        floor_height=self.floor_height,
        perimeter=self.perimeter,
        built_form=epc["built-form"],
    )

    self.insulation_floor_area = floor_area_per_storey

    self.pitched_roof_area = esimtate_pitched_roof_area(
        floor_area=self.insulation_floor_area, floor_height=self.floor_height
    )
|
|
|
|
def set_floor_level(self):
    """
    Sets the floor level of the property, from the "floor-level" field of the EPC data and the cleaned floor

    When the EPC does not provide a level, only flats get one: 1 if there is another property below, else 0.
    When the EPC does provide a level, it is corrected with the cleaned floor: a level above the ground floor
    becomes 0 if there is no property below, and a ground floor level becomes 1 if there is a property below
    """
    raw_level = self.data["floor-level"]
    level = None if raw_level in self.DATA_ANOMALY_MATCHES else FLOOR_LEVEL_MAP[raw_level]
    self.floor_level = level

    if level is None:
        if self.data["property-type"] == "Flat":
            self.floor_level = 1 if self.floor["another_property_below"] else 0
        return

    # We perform some extra checks, as we have found cases where the level which is marked on the EPC does not
    # agree with the floor description
    if level > 0:
        if not self.floor["another_property_below"]:
            self.floor_level = 0
    elif level == 0 and self.floor["another_property_below"]:
        self.floor_level = 1
|
|
|
|
def set_wall_type(self):
    """
    This method sets the wall type of the property, using a simple approach based on the wall description

    Every entry of the cleaned walls dictionary is passed to get_wall_type as a keyword argument, so the walls
    need to be set before this is called
    :return:
    """
    self.wall_type = get_wall_type(**self.walls)
|
|
|
|
def set_floor_type(self):
    """
    This method sets the floor type of the property, which is used for calculating u-values

    Section 5.6 of the BRE indicates that
    "to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
    outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
    and the U-values in Table S12 are used."

    Therefore, a floor to an unheated space and a floor to external air are both stored as "exposed_floor". A
    floor above another property is stored as "suspended", and a floor for which we only know the thermal
    transmittance is stored as "solid"

    :raises NotImplementedError: if none of the flags of the cleaned floor is set
    """

    floor = self.floor

    # The flags are None when the floor description could not be cleaned, so they are combined with "or", which
    # treats None as not set
    if floor["is_suspended"] or floor["another_property_below"]:
        self.floor_type = "suspended"
    elif floor["is_solid"]:
        self.floor_type = "solid"
    elif floor["is_to_unheated_space"] or floor["is_to_external_air"]:
        self.floor_type = "exposed_floor"
    elif floor["thermal_transmittance"] is not None:
        self.floor_type = "solid"
    else:
        raise NotImplementedError("Implement this floor type")
|
|
|
|
@staticmethod
def _extract_component(component_data, component_rename_cols, component_drop_cols, rename_prefix=None):
    """
    Builds a copy of a cleaned component which only holds the entries which are wanted downstream

    The component which is passed in is left untouched, so it can still be used after the extraction

    :param component_data: dictionary of the cleaned component
    :param component_rename_cols: keys which are returned as "<rename_prefix>_<key>" instead of under their own
        name. A key which is missing from the component is returned with the value None
    :param component_drop_cols: keys which are left out of the result
    :param rename_prefix: prefix for the renamed keys
    :return: new dictionary, holding the remaining entries followed by the renamed entries
    """
    excluded = set(component_drop_cols) | set(component_rename_cols)

    extracted = {k: v for k, v in component_data.items() if k not in excluded}
    for k in component_rename_cols:
        renamed_key = f"{rename_prefix}_{k}"
        if renamed_key not in excluded:
            extracted[renamed_key] = component_data.get(k)

    return extracted
|
|
|
|
def get_model_data(self):
    """
    This method extracts cleaned data from the property object, which is used in our machine learning models

    This will use many of the cleaned properties, extracted from the epc data, or methods in DataProcessor.

    For future iterations of this, we probably want to implement a singular method in DataProcessor, which can
    be used in the etl code and in here

    :return: dictionary of model data to be scored in the model
    :raises NotImplementedError: if the built form is anomalous and the property type has no default built form
    """

    # Keys which are removed from every component
    drop_cols = ["original_description", "clean_description"]
    # For walls, roof and floor these keys are removed as well, and the insulation keys get the name of the
    # component as a prefix, so that they do not overwrite each other once the components are merged
    insulation_drop_cols = ["thermal_transmittance_unit", "is_assumed", "is_valid"]
    insulation_rename_cols = ["thermal_transmittance", "insulation_thickness"]

    walls = self._extract_component(self.walls, insulation_rename_cols, insulation_drop_cols + drop_cols, "walls")
    roof = self._extract_component(self.roof, insulation_rename_cols, insulation_drop_cols + drop_cols, "roof")
    floor = self._extract_component(self.floor, insulation_rename_cols, insulation_drop_cols + drop_cols, "floor")

    windows = self._extract_component(self.windows, [], drop_cols + ["no_data"])
    # Fuel and hot water both carry a tariff_type, which is prefixed for the same reason
    fuel = self._extract_component(self.main_fuel, ["tariff_type"], drop_cols + ["tariff_type"], "main-fuel")
    main_heating = self._extract_component(self.main_heating, [], drop_cols + ["has_assumed"])
    main_heating_controls = self._extract_component(self.main_heating_controls, [], drop_cols)
    hotwater = self._extract_component(self.hotwater, ["tariff_type"], drop_cols + ['assumed'], "hotwater")

    # We'll need to clean second heating
    second_heating = self.data["secondheat-description"]

    epc_raw_columns = POTENTIAL_COLUMNS + EFFICIENCY_FEATURES + [
        'TRANSACTION_TYPE',
        'ENERGY_TARIFF',
        'PROPERTY_TYPE',
        'UPRN',
        'NUMBER_OPEN_FIREPLACES',
        'MULTI_GLAZE_PROPORTION',
        'MECHANICAL_VENTILATION',
        'PHOTO_SUPPLY',
        'LOW_ENERGY_LIGHTING',
        'SOLAR_WATER_HEATING_FLAG',
        'GLAZED_TYPE',
        'CONSTITUENCY',
        'NUMBER_HEATED_ROOMS',
        'EXTENSION_COUNT',
    ]
    # The model columns are upper case with underscores, the EPC data uses lower case with hyphens
    epc_raw_data = {
        k: self.data[k.lower().replace("_", "-")] for k in epc_raw_columns
    }

    built_form_cleaning_map = {
        "Flat": "Mid-Terrace",
        "House": "Semi-Detached",
        "Bungalow": "Detached",
        "Maisonette": "Mid-Terrace"
    }

    built_form = self.data["built-form"]
    if built_form in self.DATA_ANOMALY_MATCHES:
        # TODO: If built form isn't captured, we use the most common value for that property type - we shall
        #  improve this methodology
        built_form = built_form_cleaning_map.get(self.data["property-type"])
        if not built_form:
            raise NotImplementedError("Not handled this property type when cleaning built form")

    # When a key is present more than once, the entry which comes last wins. The raw EPC columns therefore
    # overwrite any component or cleaned entry with the same name, and BUILT_FORM and POSTCODE overwrite the raw
    # EPC columns
    property_data = {
        **walls,
        **roof,
        **floor,
        **fuel,
        **main_heating,
        **main_heating_controls,
        **hotwater,
        **windows,
        "SECONDHEAT_DESCRIPTION": second_heating,
        "DAYS_TO": DataProcessor.calculate_days_to(self.data["lodgement-date"]),
        "SAP": float(self.data["current-energy-efficiency"]),
        "CARBON": float(self.data["co2-emissions-current"]),
        "HEAT_DEMAND": float(self.data["energy-consumption-current"]),
        "estimated_perimeter": self.perimeter,
        "CONSTRUCTION_AGE_BAND": self.construction_age_band,
        "FLOOR_HEIGHT": self.floor_height,
        "NUMBER_HABITABLE_ROOMS": self.number_of_rooms,
        "TOTAL_FLOOR_AREA": self.floor_area,
        "FIXED_LIGHTING_OUTLETS_COUNT": self.number_lighting_outlets,
        **epc_raw_data,
        "BUILT_FORM": built_form,
        "POSTCODE": self.data["postcode"],
    }

    return property_data
|
|
|
|
def set_number_lighting_outlets(self, cleaned_property_data):
    """
    Extracts and cleans the estimated number of lighting outlets

    The count of the current EPC is used when it is present. Otherwise we take the median of the counts which are
    found on the old EPCs and the full SAP EPC, and when none of those has a count either, the rounded value of
    cleaned_property_data is used

    :param cleaned_property_data: Dataframe whose first FIXED_LIGHTING_OUTLETS_COUNT value is the last fallback
    :return:
    """

    field = "fixed-lighting-outlets-count"

    if self.data[field] != "":
        self.number_lighting_outlets = float(self.data[field])
        return

    # We check old EPCs and the full SAP EPC. Either of them can be missing, as both are optional when the
    # property is created
    lighting_data = []

    if self.old_data:
        lighting_data.extend(int(x[field]) for x in self.old_data if x[field] != "")

    if self.full_sap_epc and self.full_sap_epc[field] != "":
        lighting_data.append(int(self.full_sap_epc[field]))

    if lighting_data:
        self.number_lighting_outlets = round(np.median(lighting_data))
    else:
        self.number_lighting_outlets = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
|
|
|
|
def set_adjusted_energy(self, current_adjusted_energy, expected_adjusted_energy):
    """
    Keeps hold of the two adjusted energy values, so they can be read from the property later

    :param current_adjusted_energy: value stored on self.current_adjusted_energy
    :param expected_adjusted_energy: value stored on self.expected_adjusted_energy
    """
    self.current_adjusted_energy, self.expected_adjusted_energy = (
        current_adjusted_energy, expected_adjusted_energy
    )
|
|
|
|
def set_windows_count(self):
    """
    Using the estimate_windows function, this method will set the number of windows in the property

    The floor area, number of rooms and construction age band need to be set before this is called. A blank or
    anomalous extension count is treated as no extensions
    :return:
    """

    # The extension count is a count field, which can hold "" or an anomaly instead of a number
    raw_extensions = self.data["extension-count"]
    if raw_extensions == "" or raw_extensions in self.DATA_ANOMALY_MATCHES:
        extension_count = 0.0
    else:
        extension_count = float(raw_extensions)

    self.number_of_windows = estimate_windows(
        property_type=self.data["property-type"],
        built_form=self.data["built-form"],
        construction_age_band=self.construction_age_band,
        floor_area=self.floor_area,
        number_habitable_rooms=self.number_of_rooms,
        extension_count=extension_count,
    )
|