Model/backend/Property.py
2023-12-20 10:37:52 +00:00

869 lines
34 KiB
Python

from datetime import datetime
import re
import os
import numpy as np
import pandas as pd
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from epc_api.client import EpcClient
from BaseUtility import Definitions
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
)
# Name of the deployment environment; "dev" is assumed when ENVIRONMENT is not exported.
ENVIRONMENT = os.getenv('ENVIRONMENT', 'dev')
# Token handed to EpcClient when a Property is created without its own client (None when unset).
EPC_AUTH_TOKEN = os.getenv('EPC_AUTH_TOKEN')
# S3 bucket used for the spatial and property-dimension lookups. Only the dev environment has a
# built-in fallback; in any other environment the value is None unless DATA_BUCKET is exported.
DATA_BUCKET = os.getenv('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
# Logger shared by everything in this module.
logger = setup_logger()
# A single dwelling and the attributes derived from its EPC record.
class Property(Definitions):
# Maps an EPC description field to the name of the attribute on this class that get_components()
# fills with the cleaned component dictionary for that field.
ATTRIBUTE_MAP = {
"floor-description": "floor",
"hotwater-description": "hotwater",
"main-fuel": "main_fuel",
"mainheat-description": "main_heating",
"mainheatcont-description": "main_heating_controls",
"roof-description": "roof",
"walls-description": "walls",
"windows-description": "windows",
"lighting-description": "lighting"
}
# Cleaned component dictionaries, one per ATTRIBUTE_MAP value; None until get_components() assigns them.
floor = None
hotwater = None
main_fuel = None
main_heating = None
main_heating_controls = None
roof = None
walls = None
windows = None
lighting = None
# Coordinates and heritage flags for the property; assigned by set_spatial().
spatial = None
def __init__(self, id, postcode, address1, epc_client=None, data=None):
self.id = id
self.postcode = postcode
self.address1 = address1
self.data = data
self.old_data = None
self.property_dimensions = None
self.uprn = None
self.full_sap_epc = None
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = None
self.number_of_rooms = None
self.age_band = None
self.construction_age_band = None
self.number_of_floors = None
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy = None
self.ventilation = None
self.solar_pv = None
self.solar_hot_water = None
self.wind_turbine = None
self.number_of_open_fireplaces = None
self.number_of_extensions = None
self.number_of_storeys = None
self.heat_loss_corridor = None
self.mains_gas = None
self.floor_height = None
self.insulation_wall_area = None
self.floor_area = None
self.pitched_roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = None
self.floor_level = None
self.number_of_windows = None
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
if epc_client:
self.epc_client = epc_client
else:
self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
def search_address_epc(self):
    """
    Search the EPC database for this address and keep the most recently lodged certificate.

    Nothing happens when ``self.data`` is already populated. Otherwise the method sets:
    - ``self.full_sap_epc``: the first returned row whose transaction type is "new dwelling", or an empty
      list when there is none
    - ``self.old_data``: every returned certificate except the newest one (only assigned when more than one
      certificate was returned)
    - ``self.data``: the newest certificate
    - ``self.uprn``: the certificate's UPRN as an integer, when it has one

    :return: None
    :raises IndexError: if the search returns no certificates
    :raises Exception: if more than one certificate carries the newest lodgement datetime
    """
    if self.data:
        return

    response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode})
    rows = response["rows"]

    # Check if we have a full sap EPC
    new_dwelling_rows = [r for r in rows if r["transaction-type"] == "new dwelling"]
    self.full_sap_epc = new_dwelling_rows[0] if new_dwelling_rows else new_dwelling_rows

    if not rows:
        # A property without an EPC is a documented case that still needs proper handling; fail with a
        # message that says so instead of an anonymous "list index out of range".
        raise IndexError(
            "No EPC found for address '%s' and postcode '%s'" % (self.address1, self.postcode)
        )

    newest_rows = rows
    if len(rows) > 1:
        # Work the newest lodgement datetime out once rather than once per row
        newest_datetime = max(r["lodgement-datetime"] for r in rows)
        newest_rows = [r for r in rows if r["lodgement-datetime"] == newest_datetime]
        if len(newest_rows) > 1:
            raise Exception("More than one result found for this address - investigate me")
        # We'll keep old EPCs in case it contains information, not present on the newest one
        self.old_data = [epc for epc in rows if epc["lmk-key"] != newest_rows[0]["lmk-key"]]

    self.data = newest_rows[0]

    # For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in
    # the future by using the Ordnance Survey places API
    if not self.data["uprn"]:
        logger.warning("We do not have a UPRN for this property")
    else:
        self.uprn = int(self.data["uprn"])
def set_energy(self):
    """
    Store the home's energy use and CO2 emissions, both read straight from the EPC record as floats.

    Keys of ``self.energy``:
    - primary_energy_consumption
    Taken from the "energy-consumption-current" EPC field: the current estimated total energy consumption
    for the property over 12 months (kWh/m2), shown on the EPC as primary energy use per square metre of
    floor area.
    - co2_emissions
    Taken from the "co2-emissions-current" EPC field: CO₂ emissions per year in tonnes/year.
    """
    epc = self.data
    consumption = float(epc["energy-consumption-current"])
    emissions = float(epc["co2-emissions-current"])
    self.energy = dict(primary_energy_consumption=consumption, co2_emissions=emissions)
def set_ventilation(self):
    """
    Store the home's ventilation type, read from the "mechanical-ventilation" EPC field.

    Only light cleaning is applied: blank values and anything listed in DATA_ANOMALY_MATCHES become None.
    When checking 300k EPCs the only unique values were
    {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'}
    """
    raw_value = self.data["mechanical-ventilation"]
    is_missing = raw_value in self.DATA_ANOMALY_MATCHES or raw_value in [""]
    self.ventilation = {"ventilation": None if is_missing else raw_value}
def set_solar_pv(self):
    """
    Store the home's solar PV figure, read from the "photo-supply" EPC field.

    A blank value becomes None, anything else is converted to a float. When checking 100k EPCs the value
    was always either "" or a stringified number.
    """
    raw_value = self.data["photo-supply"]
    self.solar_pv = {"solar_pv": float(raw_value) if raw_value != "" else None}
def set_solar_hot_water(self):
    """
    Store whether the home has solar hot water, translated from the "solar-water-heating-flag" EPC field.

    "Y" becomes True, "N" becomes False and a blank flag becomes None; any other flag raises a KeyError.
    :return:
    """
    flag_lookup = {"Y": True, "N": False, "": None}
    flag = self.data["solar-water-heating-flag"]
    self.solar_hot_water = {"solar_hot_water": flag_lookup[flag]}
def set_wind_turbine(self):
    """
    Store the number of wind turbines, read from the "wind-turbine-count" EPC field.

    A blank value becomes None, anything else is converted to an integer.
    :return:
    """
    raw_count = self.data["wind-turbine-count"]
    self.wind_turbine = {"wind_turbine": int(raw_count) if raw_count != "" else None}
def set_count_variables(self):
    """
    Set the attributes that are plain counts in the EPC data.

    These fields hold integers but may also contain values such as "" or one of the DATA_ANOMALY_MATCHES
    markers, so they cannot be converted straight to an integer. A missing value becomes None for
    number_of_storeys and number_of_rooms, and 0 for the other counts.
    :return:
    """
    fields = {
        "number_of_open_fireplaces": "number-open-fireplaces",
        "number_of_extensions": "extension-count",
        "number_of_storeys": "flat-storey-count",
        "number_of_rooms": "number-habitable-rooms",
    }
    # Attributes for which "unknown" must stay distinguishable from zero
    null_attributes = {"number_of_storeys", "number_of_rooms"}

    for attribute, epc_field in fields.items():
        # Read the field that belongs to this attribute; previously every attribute was read from
        # "extension-count". A field that is absent from the record is treated like a blank one.
        raw_value = self.data.get(epc_field)
        if raw_value is None or raw_value == "" or raw_value in self.DATA_ANOMALY_MATCHES:
            value = None if attribute in null_attributes else 0
        else:
            # Go via float so that counts serialised as e.g. "2.0" are accepted as well
            value = int(float(raw_value))
        setattr(self, attribute, value)
def get_components(self, cleaned):
    """
    Populate the property's components (roof, walls, windows, heating, hot water, ...) from the EPC record
    and the output of the cleaning process.

    The scalar setters run first, then every description field in ``cleaned`` is resolved to a cleaned
    attribute dictionary and stored on the attribute named by ATTRIBUTE_MAP. Finally the values that depend
    on those components (wall type, floor type, floor level, window count) are set.

    :param cleaned: the dictionary of components found in cleaner.cleaned, keyed by EPC description field;
        each value is a list of attribute dictionaries carrying an "original_description" key
    :raises ValueError: if ``cleaned`` or the property data is empty, or if several cleaned entries match
        the same description
    :return:
    """
    if not cleaned:
        raise ValueError("Cleaner does not contain cleaned data")
    if not self.data:
        raise ValueError("Property does not contain data")

    # We need to implement an EPC cleaning process, which we run on the EPC data, immediately after we
    # download it
    raw_built_form = self.data["built-form"]
    self.data["built-form"] = BUILT_FORM_REMAP.get(raw_built_form, raw_built_form)
    if self.data["built-form"] in self.DATA_ANOMALY_MATCHES and self.data["property-type"] == "Flat":
        self.data["built-form"] = "Semi-Detached"

    # Scalar attributes; the order is kept because later setters rely on values set by earlier ones
    for setter in (
        self.set_energy,
        self.set_ventilation,
        self.set_solar_pv,
        self.set_solar_hot_water,
        self.set_wind_turbine,
        self.set_count_variables,
        self.set_heat_loss_corridor,
        self.set_mains_gas,
        self.set_age_band,
        self.set_basic_property_dimensions,
    ):
        setter()

    for description, candidates in cleaned.items():
        raw_description = self.data[description]

        if raw_description in self.DATA_ANOMALY_MATCHES:
            # No usable description: store a dictionary with the usual keys, all set to None, and keep
            # the raw value as both descriptions
            placeholder = dict.fromkeys(candidates[0])
            placeholder["original_description"] = raw_description
            placeholder["clean_description"] = raw_description
            setattr(self, self.ATTRIBUTE_MAP[description], placeholder)
            continue

        matches = [entry for entry in candidates if entry["original_description"] == raw_description]
        if len(matches) > 1:
            raise ValueError("Either No attributes or multiple found for %s" % description)

        if not matches:
            # We attempt to perform the clean on the fly
            cleaner = all_cleaner_map[description](raw_description)
            matches = [{
                "original_description": raw_description,
                "clean_description": cleaner.description.replace("(assumed)", "").rstrip().capitalize(),
                **cleaner.process(),
            }]

        setattr(self, self.ATTRIBUTE_MAP[description], matches[0])

    # These read the components assigned in the loop above
    self.set_wall_type()
    self.set_floor_type()
    self.set_floor_level()
    self.set_windows_count()
def set_age_band(self):
    """
    Set a cleaned version of the property's age band from the EPC data.

    ``self.construction_age_band`` is the cleaned "construction-age-band" field. When that is one of the
    DATA_ANOMALY_MATCHES values, the most recently lodged older EPC with a usable age band is used instead,
    if there is one. ``self.age_band`` is then looked up from the construction age band; a new dwelling
    without a match is given band "L" ('England and Wales: 2012 onwards').

    :raises ValueError: if the property has no data, or if no age band can be determined
    :return:
    """
    if not self.data:
        raise ValueError("Property does not contain data")

    self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])

    if self.construction_age_band in self.DATA_ANOMALY_MATCHES and self.old_data:
        # Only older EPCs that actually carry an age band are candidates. Selecting from this filtered list
        # (rather than calling max() on it and searching all old EPCs again) avoids a ValueError when it is
        # empty and never picks an anomalous record that happens to share the newest datetime.
        usable = [
            epc for epc in self.old_data
            if epc["construction-age-band"] not in self.DATA_ANOMALY_MATCHES
        ]
        if usable:
            # Take the most recent
            most_recent = max(usable, key=lambda epc: epc["lodgement-datetime"])
            self.construction_age_band = DataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    if (self.data["transaction-type"] == "new dwelling") and (self.age_band is None):
        self.age_band = "L"
        self.construction_age_band = 'England and Wales: 2012 onwards'

    if self.age_band is None:
        raise ValueError("age_band is missing")
def set_spatial(self, spatial: pd.DataFrame):
    """
    Record the conservation, listed and heritage status of the property and flag restricted measures.

    Also stores a dictionary, ``self.spatial``, which is used to populate the property spatial table in
    the database. Only the first row of ``spatial`` is used.

    :param spatial: Dataframe, containing the spatial data for the property
    """
    def first_value(column):
        return spatial[column].values[0]

    self.in_conservation_area = first_value("conservation_status")
    self.is_listed = first_value("is_listed_building")
    self.is_heritage = first_value("is_heritage_building")

    # We do an equals True, in the case of one of these variables being True
    if (self.in_conservation_area == True) | (self.is_listed == True) | (self.is_heritage == True):
        self.restricted_measures = True

    record = spatial.to_dict("records")[0]
    # Output key -> column in the spatial dataframe
    column_for_key = {
        "x_coordinate": "X_COORDINATE",
        "y_coordinate": "Y_COORDINATE",
        "latitude": "LATITUDE",
        "longitude": "LONGITUDE",
        "conservation_status": "conservation_status",
        "is_listed_building": "is_listed_building",
        "is_heritage_building": "is_heritage_building",
    }
    self.spatial = {key: record[column] for key, column in column_for_key.items()}
def set_year_built(self):
    """
    Estimate when the property was built based on as much available data as possible.

    The lodgement year of the full SAP EPC is used when there is one. Otherwise the first four-digit year
    found in the "construction-age-band" field is used. ``self.year_built`` is None when the age band is
    one of the DATA_ANOMALY_MATCHES values or contains no four-digit year.
    """
    if self.full_sap_epc:
        self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
        return

    # We don't know when the property was built unless the age band tells us
    self.year_built = None

    age_band = self.data["construction-age-band"]
    if age_band in self.DATA_ANOMALY_MATCHES:
        return

    # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
    # more options for recommendations if that age falls before the year that insulation in walls became
    # common practice
    years = re.findall(r'\b\d{4}\b', age_band)
    if years:
        # An age band without any year in it used to raise an IndexError here; it is now left as unknown
        self.year_built = int(years[0])
def set_heat_loss_corridor(self):
    """
    Clean the "heat-loss-corridor" and "unheated-corridor-length" EPC fields.

    Only an unheated corridor counts as a heat loss corridor; a value listed in DATA_ANOMALY_MATCHES is
    treated as no heat loss corridor. A blank length becomes None, otherwise it is converted to a float.
    :return:
    """
    corridor_lookup = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False
    }

    corridor = self.data["heat-loss-corridor"]
    if corridor in self.DATA_ANOMALY_MATCHES:
        has_heat_loss_corridor = False
    else:
        has_heat_loss_corridor = corridor_lookup[corridor]

    raw_length = self.data["unheated-corridor-length"]
    self.heat_loss_corridor = {
        "heat_loss_corridor": has_heat_loss_corridor,
        "length": None if raw_length == "" else float(raw_length),
    }
def set_mains_gas(self):
    """
    Set whether the property has mains gas, from the "mains-gas-flag" EPC field.

    "Y" becomes True and "N" becomes False; a blank flag or one listed in DATA_ANOMALY_MATCHES becomes None.
    :return:
    """
    flag = self.data["mains-gas-flag"]
    if flag == "" or flag in self.DATA_ANOMALY_MATCHES:
        self.mains_gas = None
        return
    flag_lookup = {"Y": True, "N": False}
    self.mains_gas = flag_lookup[flag]
def _clean_upload_data(self, to_update):
for k, v in to_update.items():
if v in self.DATA_ANOMALY_MATCHES:
to_update[k] = None
return to_update
def get_full_property_data(self):
    """
    Build the core information about the property, taken from the EPC, that is pushed to the database.

    Values listed in DATA_ANOMALY_MATCHES are replaced with None before the dictionary is returned. The
    UPRN is None when the EPC record does not carry one.
    :return: dictionary of property data
    """
    # An EPC without a UPRN is tolerated when the record is fetched, so it must not crash the upload:
    # int("") would raise a ValueError here.
    raw_uprn = self.data["uprn"]

    property_data = {
        "creation_status": "READY",
        "uprn": int(raw_uprn) if raw_uprn else None,
        "building_reference_number": int(self.data["building-reference-number"]),
        "has_pre_condition_report": True,
        "has_recommendations": True,
        "property_type": self.data["property-type"],
        "built_form": self.data["built-form"],
        "local_authority": self.data["local-authority-label"],
        "constituency": self.data["constituency-label"],
        "number_of_rooms": self.number_of_rooms,
        "year_built": self.year_built,
        "tenure": self.data["tenure"],
        "current_epc_rating": self.data["current-energy-rating"],
        "current_sap_points": self.data["current-energy-efficiency"],
    }
    return self._clean_upload_data(property_data)
@classmethod
def _prepare_rating_field(cls, field, rating_lookup):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
return rating_lookup[field].value if field not in cls.DATA_ANOMALY_MATCHES else None
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
    """
    Build the EPC details for this property as a flat dictionary.

    The dictionary combines raw EPC fields, the cleaned description of each component together with its
    rating, and the values prepared by the set_* methods, so those must have been run beforehand.

    :param portfolio_id: identifier of the portfolio, copied into the result
    :param rating_lookup: lookup passed through to _prepare_rating_field for every rating
    :return: dictionary of property details
    """
    epc = self.data
    details = {
        "property_id": self.id,
        "portfolio_id": portfolio_id,
        "full_address": epc["address"],
        "total_floor_area": float(epc["total-floor-area"]),
    }

    # (output key, attribute holding the cleaned component, EPC field holding its rating)
    rated_components = (
        ("walls", "walls", "walls-energy-eff"),
        ("roof", "roof", "roof-energy-eff"),
        ("floor", "floor", "floor-energy-eff"),
        ("windows", "windows", "windows-energy-eff"),
        ("heating", "main_heating", "mainheat-energy-eff"),
        ("heating_controls", "main_heating_controls", "mainheatc-energy-eff"),
        ("hot_water", "hotwater", "hot-water-energy-eff"),
        ("lighting", "lighting", "lighting-energy-eff"),
    )
    for output_key, attribute, rating_field in rated_components:
        details[output_key] = getattr(self, attribute)["clean_description"]
        details[f"{output_key}_rating"] = self._prepare_rating_field(epc[rating_field], rating_lookup)

    details.update({
        "mainfuel": self.main_fuel["clean_description"],
        "ventilation": self.ventilation["ventilation"],
        "solar_pv": self.solar_pv["solar_pv"],
        "solar_hot_water": self.solar_hot_water["solar_hot_water"],
        "wind_turbine": self.wind_turbine["wind_turbine"],
        "floor_height": self.floor_height,
        "heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor"],
        "unheated_corridor_length": self.heat_loss_corridor["length"],
        "number_of_open_fireplaces": self.number_of_open_fireplaces,
        "number_of_extensions": self.number_of_extensions,
        "number_of_storeys": self.number_of_storeys,
        "mains_gas": self.mains_gas,
        "energy_tariff": epc["energy-tariff"],
        "primary_energy_consumption": self.energy["primary_energy_consumption"],
        "co2_emissions": self.energy["co2_emissions"],
        "adjusted_energy_consumption": self.current_adjusted_energy,
    })
    return details
def get_spatial_data(self, uprn_filenames):
    """
    Given the property's UPRN, pull the associated spatial data from s3 and store it via set_spatial.

    Without a UPRN the conservation, listed and heritage flags are all set to False and the property is
    marked as having restricted measures.
    NOTE(review): marking measures as restricted while every flag is False looks like a deliberately
    cautious default - confirm.

    :param uprn_filenames: dataframe with "lower", "upper" and "filenames" columns, giving the UPRN range
        covered by each spatial file
    :return: None
    """
    if self.uprn is None:
        logger.warning("We do not have a UPRN for this property - this needs to be implemented")
        self.in_conservation_area = self.is_listed = self.is_heritage = False
        self.restricted_measures = True
        return

    # We get the file name for the uprn
    covers_uprn = (uprn_filenames['lower'] <= self.uprn) & (uprn_filenames['upper'] >= self.uprn)
    matching_files = uprn_filenames[covers_uprn]
    if matching_files.empty:
        logger.warning("Could not find file containing UPRNS")
        return None

    filename = matching_files.iloc[0]['filenames']
    spatial_data = read_dataframe_from_s3_parquet(
        bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
    )

    # Pull out spatial features for this property only
    self.set_spatial(spatial_data[spatial_data["UPRN"] == self.uprn])
def _filter_property_dimensions(self, property_dimensions):
"""
Will filter the property dimensions dataframe to only include the relevant rows for the property
:param property_dimensions:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])]
if self.construction_age_band is not None and self.construction_age_band not in self.DATA_ANOMALY_MATCHES:
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
if self.data["built-form"] not in self.DATA_ANOMALY_MATCHES and self.data["built-form"] in result["BUILT_FORM"]:
result = result[(result["BUILT_FORM"] == self.data["built-form"])]
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
def set_basic_property_dimensions(self):
    """
    Set the basic dimensions of the property.

    The floor area comes from the EPC. The number of rooms and the floor height come from the EPC when
    present; otherwise they fall back to the averages returned by _filter_property_dimensions for the
    property's local authority, which are loaded from s3 on first use. The number of floors is assumed
    from the property type (2 for a House or Maisonette, 1 for a Flat or Bungalow).

    From those values the perimeter, insulation wall area, insulation floor area and pitched roof area are
    estimated.

    :raises NotImplementedError: for any other property type
    :return:
    """
    epc = self.data
    self.floor_area = float(epc["total-floor-area"])

    rooms_missing = not epc["number-habitable-rooms"]
    if rooms_missing or (epc["floor-height"] == "" or epc["floor-height"] in self.DATA_ANOMALY_MATCHES):
        # A fallback is needed for at least one value; load the averages once and keep them
        if self.property_dimensions is None:
            dimensions = read_dataframe_from_s3_parquet(
                bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.data['local-authority']}.parquet"
            )
            self.property_dimensions = self._filter_property_dimensions(dimensions)

    if rooms_missing:
        self.number_of_rooms = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
    else:
        self.number_of_rooms = float(epc["number-habitable-rooms"])

    floors_by_property_type = {"House": 2, "Flat": 1, "Bungalow": 1, "Maisonette": 2}
    if epc["property-type"] not in floors_by_property_type:
        raise NotImplementedError("Implement me")
    self.number_of_floors = floors_by_property_type[epc["property-type"]]

    if epc["floor-height"] == "" or epc["floor-height"] in self.DATA_ANOMALY_MATCHES:
        self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
    else:
        self.floor_height = float(epc["floor-height"])

    # Area and rooms are spread evenly over the floors
    area_per_floor = self.floor_area / self.number_of_floors
    rooms_per_floor = self.number_of_rooms / self.number_of_floors
    self.perimeter = estimate_perimeter(area_per_floor, rooms_per_floor)

    self.insulation_wall_area = estimate_external_wall_area(
        num_floors=self.number_of_floors,
        floor_height=self.floor_height,
        perimeter=self.perimeter,
        built_form=epc["built-form"],
    )

    self.insulation_floor_area = self.floor_area / self.number_of_floors
    self.pitched_roof_area = esimtate_pitched_roof_area(
        floor_area=self.insulation_floor_area, floor_height=self.floor_height
    )
def set_floor_level(self):
    """
    Set the floor level of the property from the "floor-level" EPC field, cross-checked against the floor
    component.

    When the EPC gives no level, only flats get one: 1 if there is another property below, otherwise 0.
    When the EPC gives a level, a level above the ground floor is reset to 0 if there is no property
    below, and a ground floor level is raised to 1 if there is one.
    """
    raw_level = self.data["floor-level"]
    if raw_level not in self.DATA_ANOMALY_MATCHES:
        self.floor_level = FLOOR_LEVEL_MAP[raw_level]
    else:
        self.floor_level = None

    if self.floor_level is None:
        if self.data["property-type"] == "Flat":
            self.floor_level = 1 if self.floor["another_property_below"] else 0
        return

    # We perform some extra checks, as we have found cases where the recorded level contradicts the
    # floor description
    if self.floor_level > 0 and not self.floor["another_property_below"]:
        self.floor_level = 0
    elif self.floor_level == 0 and self.floor["another_property_below"]:
        self.floor_level = 1
def set_wall_type(self):
    """
    Set the wall type of the property by passing every entry of the cleaned walls component to
    get_wall_type as a keyword argument.
    :return:
    """
    wall_attributes = self.walls
    self.wall_type = get_wall_type(**wall_attributes)
def set_floor_type(self):
    """
    Set the floor type of the property, which is used for calculating u-values.

    Section 5.6 of the BRE indicates that
    "to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
    outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
    and the U-values in Table S12 are used."
    Therefore, an exposed floor and a suspended floor are meant to be treated as the same type of floor
    when calculating u-values.
    NOTE(review): the labels stored here still differ ("exposed_floor" vs "suspended") - confirm the
    consumer of floor_type treats them alike.

    :raises NotImplementedError: if none of the known floor flags is set and there is no thermal
        transmittance
    """
    floor = self.floor
    # `or` rather than `|`: the flags are None when the floor description was a data anomaly, and
    # `None | None` raises a TypeError before the intended NotImplementedError can be reached
    if floor["is_suspended"] or floor["another_property_below"]:
        self.floor_type = "suspended"
    elif floor["is_solid"]:
        self.floor_type = "solid"
    elif floor["is_to_unheated_space"] or floor["is_to_external_air"]:
        self.floor_type = "exposed_floor"
    elif floor["thermal_transmittance"] is not None:
        self.floor_type = "solid"
    else:
        raise NotImplementedError("Implement this floor type")
@staticmethod
def _extract_component(component_data, component_rename_cols, component_drop_cols, rename_prefix=None):
for k in component_rename_cols:
component_data[f"{rename_prefix}_{k}"] = component_data.get(k)
component_data = {
k: v for k, v in component_data.items() if k not in component_drop_cols + component_rename_cols
}
return component_data
def get_model_data(self):
    """
    Extract the cleaned data from the property object that is scored by our machine learning models.

    This uses many of the cleaned properties, extracted from the epc data, or methods in DataProcessor.
    For future iterations of this, we probably want to implement a singular method in DataProcessor, which
    can be used in the etl code and in here.

    :raises NotImplementedError: if the built form is missing and there is no default for the property type
    :return: dictionary of model data to be scored in the model
    """
    description_cols = ["original_description", "clean_description"]
    insulation_drop_cols = ["thermal_transmittance_unit", "is_assumed", "is_valid"] + description_cols
    insulation_rename_cols = ["thermal_transmittance", "insulation_thickness"]
    extract = self._extract_component

    # Insulated components share the same rename/drop rules and are prefixed with their own name
    walls, roof, floor = (
        extract(getattr(self, name), insulation_rename_cols, insulation_drop_cols, name)
        for name in ("walls", "roof", "floor")
    )
    windows = extract(self.windows, [], description_cols + ["no_data"])
    fuel = extract(self.main_fuel, ["tariff_type"], description_cols + ["tariff_type"], "main-fuel")
    main_heating = extract(self.main_heating, [], description_cols + ["has_assumed"])
    main_heating_controls = extract(self.main_heating_controls, [], description_cols)
    hotwater = extract(self.hotwater, ["tariff_type"], description_cols + ['assumed'], "hotwater")

    # We'll need to clean second heating
    second_heating = self.data["secondheat-description"]

    # Raw EPC columns passed through untouched; the model names are upper snake case, the EPC keys are
    # lower case with hyphens
    epc_raw_columns = POTENTIAL_COLUMNS + EFFICIENCY_FEATURES + [
        'TRANSACTION_TYPE',
        'ENERGY_TARIFF',
        'PROPERTY_TYPE',
        'UPRN',
        'NUMBER_OPEN_FIREPLACES',
        'MULTI_GLAZE_PROPORTION',
        'MECHANICAL_VENTILATION',
        'PHOTO_SUPPLY',
        'LOW_ENERGY_LIGHTING',
        'SOLAR_WATER_HEATING_FLAG',
        'GLAZED_TYPE',
        'CONSTITUENCY',
        'NUMBER_HEATED_ROOMS',
        'EXTENSION_COUNT',
    ]
    epc_raw_data = {}
    for column in epc_raw_columns:
        epc_raw_data[column] = self.data[column.lower().replace("_", "-")]

    built_form = self.data["built-form"]
    if built_form in self.DATA_ANOMALY_MATCHES:
        # TODO: If built form isn't captured, we use the most common value for that property type - we shall
        # improve this methodology
        default_built_form = {
            "Flat": "Mid-Terrace",
            "House": "Semi-Detached",
            "Bungalow": "Detached",
            "Maisonette": "Mid-Terrace"
        }
        built_form = default_built_form.get(self.data["property-type"])
        if not built_form:
            raise NotImplementedError("Not handled this property type when cleaning built form")

    # Later entries win over earlier ones that share a key
    property_data = {}
    for component in (walls, roof, floor, fuel, main_heating, main_heating_controls, hotwater, windows):
        property_data.update(component)
    property_data.update({
        "SECONDHEAT_DESCRIPTION": second_heating,
        "DAYS_TO": DataProcessor.calculate_days_to(self.data["lodgement-date"]),
        "SAP": float(self.data["current-energy-efficiency"]),
        "CARBON": float(self.data["co2-emissions-current"]),
        "HEAT_DEMAND": float(self.data["energy-consumption-current"]),
        "estimated_perimeter": self.perimeter,
        "CONSTRUCTION_AGE_BAND": self.construction_age_band,
        "FLOOR_HEIGHT": self.floor_height,
        "NUMBER_HABITABLE_ROOMS": self.number_of_rooms,
        "TOTAL_FLOOR_AREA": self.floor_area,
        "FIXED_LIGHTING_OUTLETS_COUNT": self.number_lighting_outlets,
    })
    property_data.update(epc_raw_data)
    property_data["BUILT_FORM"] = built_form
    property_data["POSTCODE"] = self.data["postcode"]
    return property_data
def set_number_lighting_outlets(self, cleaned_property_data):
    """
    Extract and clean the estimated number of lighting outlets.

    The value of the current EPC is used when it has one. Otherwise the rounded median of the counts found
    on the older EPCs and on the full SAP EPC is used, and when none of those has a count either, the
    rounded first FIXED_LIGHTING_OUTLETS_COUNT value of ``cleaned_property_data``.

    :param cleaned_property_data: fallback data, indexed by "FIXED_LIGHTING_OUTLETS_COUNT"
    :return:
    """
    raw_count = self.data["fixed-lighting-outlets-count"]
    if raw_count != "":
        self.number_lighting_outlets = float(raw_count)
        return

    # We check old EPCs and the full SAP EPC. Both are None when they were never populated (for example
    # when only a single EPC exists), so test for truthiness rather than calling len() on them.
    lighting_data = [
        int(epc["fixed-lighting-outlets-count"]) for epc in (self.old_data or [])
        if epc["fixed-lighting-outlets-count"] != ""
    ]
    if self.full_sap_epc and self.full_sap_epc["fixed-lighting-outlets-count"] != "":
        lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"]))

    if lighting_data:
        self.number_lighting_outlets = round(np.median(lighting_data))
    else:
        self.number_lighting_outlets = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
def set_adjusted_energy(self, current_adjusted_energy, expected_adjusted_energy):
    """
    Keep the adjusted energy figures on the property for later use.

    :param current_adjusted_energy: stored as ``self.current_adjusted_energy``
    :param expected_adjusted_energy: stored as ``self.expected_adjusted_energy``
    """
    self.current_adjusted_energy, self.expected_adjusted_energy = (
        current_adjusted_energy,
        expected_adjusted_energy,
    )
def set_windows_count(self):
    """
    Set the number of windows in the property, using the estimate_windows function.

    A missing extension count (None, blank or one of the DATA_ANOMALY_MATCHES values) is passed on as 0.
    :return:
    """
    # The extension count may be "" in the EPC data, which float() cannot convert; treat a missing count
    # as no extensions, the same default the extension count gets in set_count_variables
    raw_extension_count = self.data["extension-count"]
    if (
        raw_extension_count is None
        or raw_extension_count == ""
        or raw_extension_count in self.DATA_ANOMALY_MATCHES
    ):
        extension_count = 0.0
    else:
        extension_count = float(raw_extension_count)

    self.number_of_windows = estimate_windows(
        property_type=self.data["property-type"],
        built_form=self.data["built-form"],
        construction_age_band=self.construction_age_band,
        floor_area=self.floor_area,
        number_habitable_rooms=self.number_of_rooms,
        extension_count=extension_count,
    )