mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
887 lines
33 KiB
Python
887 lines
33 KiB
Python
import pandas as pd
|
|
from typing import List
|
|
from etl.epc.Record import EPCDifferenceRecord
|
|
from etl.epc.ValidationConfiguration import DatasetValidationConfiguration
|
|
from etl.epc.settings import EARLIEST_EPC_DATE
|
|
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
|
|
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
|
|
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
|
|
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
|
|
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
|
|
from etl.epc_clean.epc_attributes.MainheatControlAttributes import (
|
|
MainheatControlAttributes,
|
|
)
|
|
from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes
|
|
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
|
|
|
|
from recommendations.rdsap_tables import england_wales_age_band_lookup
|
|
from recommendations.recommendation_utils import (
|
|
estimate_number_of_floors,
|
|
get_wall_u_value,
|
|
get_roof_u_value,
|
|
get_floor_u_value,
|
|
estimate_perimeter,
|
|
get_wall_type,
|
|
)
|
|
|
|
# TODO: Can probably produce this in the property change app and store in S3
|
|
# Flags that appear twice in BOOLEAN_VARIABLES: once bare and once with an
# "_ending" suffix, in exactly this order.
_PAIRED_HAS_FLAGS = (
    "has_radiators",
    "has_fan_coil_units",
    "has_pipes_in_screed_above_insulation",
    "has_pipes_in_insulated_timber_floor",
    "has_pipes_in_concrete_slab",
    "has_boiler",
    "has_air_source_heat_pump",
    "has_room_heaters",
    "has_electric_storage_heaters",
    "has_warm_air",
    "has_electric_underfloor_heating",
    "has_electric_ceiling_heating",
    "has_community_scheme",
    "has_ground_source_heat_pump",
    "has_no_system_present",
    "has_portable_electric_heaters",
    "has_water_source_heat_pump",
    "has_electric_heat_pump",
    "has_micro-cogeneration",
    "has_solar_assisted_heat_pump",
    "has_exhaust_source_heat_pump",
    "has_community_heat_pump",
    "has_electric",
    "has_mains_gas",
    "has_wood_logs",
    "has_coal",
    "has_oil",
    "has_wood_pellets",
    "has_anthracite",
    "has_dual_fuel_mineral_and_wood",
    "has_smokeless_fuel",
    "has_lpg",
    "has_b30k",
    "has_electricaire",
    "has_assumed_for_most_rooms",
    "has_underfloor_heating",
)

# Column names treated as boolean: _clean_missing_values fills nulls in any of
# these columns with False.
BOOLEAN_VARIABLES = [
    "is_cavity_wall",
    "is_filled_cavity",
    "is_solid_brick",
    "is_system_built",
    "is_timber_frame",
    "is_granite_or_whinstone",
    "is_as_built",
    "is_cob",
    "is_sandstone_or_limestone",
    "is_park_home",
    "external_insulation",
    "internal_insulation",
    "is_park_home_ending",
    "external_insulation_ending",
    "internal_insulation_ending",
    "is_to_unheated_space",
    "is_to_external_air",
    "is_suspended",
    "is_solid",
    "another_property_below",
    "is_pitched",
    "is_roof_room",
    "is_loft",
    "is_flat",
    "is_thatched",
    "is_at_rafters",
    "has_dwelling_above",
    *_PAIRED_HAS_FLAGS,
    *(f"{flag}_ending" for flag in _PAIRED_HAS_FLAGS),
    "multiple_room_thermostats",
    "multiple_room_thermostats_ending",
    "is_community",
    "no_individual_heating_or_community_network",
    "is_community_ending",
    "no_individual_heating_or_community_network_ending",
]
|
|
|
|
|
|
class BaseDataset:
    """
    Common parent for the dataset classes in this module
    """

    def __init__(self) -> None:
        # Starts out empty; nothing in this class fills it in.
        self.pipeline_steps = dict()

    def validate_dataset(self):
        """
        Attach the dataset validation configuration to this instance

        Only stores DatasetValidationConfiguration on `dataset_validation`;
        no checks are run here.
        """
        validation_configuration = DatasetValidationConfiguration
        self.dataset_validation = validation_configuration
|
|
|
|
# def pipeline_factory(self, pipeline_type: str) -> dict:
|
|
# """
|
|
# Factory method for creating a pipeline
|
|
# """
|
|
# if pipeline_type not in self.pipeline_steps:
|
|
# raise ValueError(f"Pipeline type {pipeline_type} not found")
|
|
|
|
# return self.pipeline_steps[pipeline_type]
|
|
|
|
|
|
class TrainingDataset(BaseDataset):
|
|
"""
|
|
A collection of EPCDifferenceRecords can be combined into a TrainingDataset.
|
|
"""
|
|
|
|
def __init__(
    self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict
) -> None:
    """
    Build the modelling dataframe from a list of difference records

    Each item's `difference_record` becomes one row of `self.df`, and the
    preparation steps below are then applied in a fixed order.

    :param datasets: records exposing a `difference_record` attribute
    :param cleaned_lookup: lookup passed to _expand_description_to_features
    :raises ValueError: if nulls remain after either validation checkpoint
    """
    # Initialise state owned by BaseDataset (pipeline_steps); previously skipped
    super().__init__()

    self.datasets = datasets
    self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])

    self._feature_generation()
    self._clean_efficiency_variables()
    self._null_validation(information="Clean Efficiency Variables")
    self._expand_description_to_features(cleaned_lookup)
    self._adjust_assumed_values_in_wall_descriptions()
    self._generate_u_values_from_features()
    # TODO: For some of the features that we clean, we have either a true, false or possibly null value
    # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
    # need to
    self._clean_missing_values()
    self._null_validation(information="Clean Missing Values")
    self._remove_abnormal_change_in_floor_area()
    self._ensure_numeric()
    self._organise_starting_ending_columns()
|
|
|
|
def _organise_starting_ending_columns(self):
|
|
"""
|
|
Organise the starting and ending columns so that they are next to each other
|
|
"""
|
|
no_suffix_cols = [
|
|
col
|
|
for col in self.df.columns
|
|
if "_ending" not in col and "_starting" not in col
|
|
]
|
|
starting_cols = [col for col in self.df.columns if "_starting" in col]
|
|
ending_cols = [col for col in self.df.columns if "_ending" in col]
|
|
|
|
common_cols = [
|
|
col.rsplit("_", 1)[0]
|
|
for col in starting_cols
|
|
if col.replace("_starting", "_ending") in ending_cols
|
|
]
|
|
only_ending_cols = [
|
|
col
|
|
for col in ending_cols
|
|
if col.replace("_ending", "_starting") not in starting_cols
|
|
]
|
|
|
|
common_cols = [[col + "_starting", col + "_ending"] for col in common_cols]
|
|
|
|
self.df = self.df.loc[
|
|
:,
|
|
no_suffix_cols
|
|
+ only_ending_cols
|
|
+ [col for cols in common_cols for col in cols],
|
|
]
|
|
|
|
def _remove_abnormal_change_in_floor_area(self):
|
|
"""
|
|
Remove properties where the change in floor area is greater than 100%
|
|
"""
|
|
|
|
self.df["tfa_diff_abs"] = abs(
|
|
self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"]
|
|
)
|
|
self.df["tfa_diff_prop"] = (
|
|
self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"]
|
|
)
|
|
self.df = self.df[self.df["tfa_diff_prop"] < 0.5]
|
|
self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
|
|
|
|
def _ensure_numeric(self):
|
|
"""
|
|
Ensure that all columns are numeric
|
|
"""
|
|
# TODO: move into EPCRecord record
|
|
uvalue_columns = [
|
|
col
|
|
for col in self.df.columns
|
|
if "thermal_transmittance" in col and "_unit" not in col
|
|
]
|
|
for uvalue_col in uvalue_columns:
|
|
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
|
|
|
|
@staticmethod
|
|
def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
|
|
"""
|
|
Using the apply method, use the get_roof_u_value method to generate the u-value
|
|
"""
|
|
|
|
col_name = (
|
|
"roof_insulation_thickness"
|
|
if not is_end
|
|
else "roof_insulation_thickness_ending"
|
|
)
|
|
|
|
if row["has_dwelling_above"]:
|
|
if row["roof_thermal_transmittance"] != 0:
|
|
raise ValueError("Should have 0 u-value for roof")
|
|
|
|
if row["roof_thermal_transmittance_ending"] != 0:
|
|
raise ValueError("Should have 0 u-value for roof")
|
|
|
|
return get_roof_u_value(
|
|
insulation_thickness=row[col_name],
|
|
has_dwelling_above=row["has_dwelling_above"],
|
|
is_loft=row["is_loft"],
|
|
is_roof_room=row["is_roof_room"],
|
|
is_thatched=row["is_thatched"],
|
|
is_flat=row["is_flat"],
|
|
is_pitched=row["is_pitched"],
|
|
is_at_rafters=row["is_at_rafters"],
|
|
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
|
)
|
|
|
|
@staticmethod
|
|
def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
|
|
"""
|
|
Using the apply method, use the get_wall_u_value method to generate the u-value
|
|
"""
|
|
description_col_name = (
|
|
"walls_clean_description"
|
|
if not is_end
|
|
else "walls_clean_description_ending"
|
|
)
|
|
thermal_transistance_col_name = (
|
|
"walls_thermal_transmittance"
|
|
if not is_end
|
|
else "walls_thermal_transmittance_ending"
|
|
)
|
|
|
|
if pd.isnull(row[thermal_transistance_col_name]):
|
|
output = get_wall_u_value(
|
|
clean_description=row[description_col_name],
|
|
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
|
is_granite_or_whinstone=row["is_granite_or_whinstone"],
|
|
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
|
|
)
|
|
else:
|
|
output = row[thermal_transistance_col_name]
|
|
|
|
return output
|
|
|
|
@staticmethod
|
|
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
|
|
"""
|
|
Using the apply method, use the get_floor_u_value method to generate the u-value
|
|
"""
|
|
|
|
floor_thermal_col_name = (
|
|
"floor_thermal_transmittance"
|
|
if not is_end
|
|
else "floor_thermal_transmittance_ending"
|
|
)
|
|
|
|
if row["another_property_below"]:
|
|
if row["floor_thermal_transmittance"] != 0:
|
|
raise ValueError("Should have 0 u-value for floor")
|
|
|
|
if row["floor_thermal_transmittance_ending"] != 0:
|
|
raise ValueError("Should have 0 u-value for floor")
|
|
return 0
|
|
else:
|
|
uvalue = row[floor_thermal_col_name]
|
|
|
|
if pd.isnull(uvalue):
|
|
insulation_col_name = (
|
|
"floor_insulation_thickness"
|
|
if not is_end
|
|
else "floor_insulation_thickness_ending"
|
|
)
|
|
perimeter_col_name = (
|
|
"estimated_perimeter_starting"
|
|
if not is_end
|
|
else "estimated_perimeter_ending"
|
|
)
|
|
floor_area_col_name = (
|
|
"ground_floor_area_starting"
|
|
if not is_end
|
|
else "ground_floor_area_ending"
|
|
)
|
|
|
|
uvalue = get_floor_u_value(
|
|
floor_type=row["floor_type"],
|
|
perimeter=row[perimeter_col_name],
|
|
area=row[floor_area_col_name],
|
|
insulation_thickness=row[insulation_col_name],
|
|
wall_type=row["wall_type"],
|
|
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
|
)
|
|
|
|
return uvalue
|
|
|
|
def _generate_u_values_from_features(self):
    """
    Generate u-values from the features

    For walls, roof and floor, a u-value is generated per row for both the
    starting and ending record, and is used to fill the gaps left after the
    recorded `<component>_thermal_transmittance[_ending]` columns have been
    coerced to numeric.

    Helper columns created along the way (floor/wall type, number of floors,
    ground floor areas) and the wall clean descriptions are dropped at the
    end; the `estimated_perimeter_*` columns are kept.
    """

    # ~~~~~~~~~~~~~~~~~~
    # Walls
    # ~~~~~~~~~~~~~~~~~~

    walls_starting_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1
    )
    walls_ending_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True),
        axis=1,
    )

    walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna(
        walls_starting_uvalue
    )
    # Where the wall description is unchanged, the ending record reuses the
    # starting u-value
    walls_starting_equals_ending_flag = (
        self.df["walls_clean_description"]
        == self.df["walls_clean_description_ending"]
    )
    walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[
        walls_starting_equals_ending_flag
    ]

    # ~~~~~~~~~~~~~~~~~~
    # Roof
    # ~~~~~~~~~~~~~~~~~~

    roof_starting_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1
    )
    roof_ending_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True),
        axis=1,
    )

    roof_starting_uvalue = pd.to_numeric(
        self.df["roof_thermal_transmittance"], errors="coerce"
    ).fillna(roof_starting_uvalue)

    roof_ending_uvalue = pd.to_numeric(
        self.df["roof_thermal_transmittance_ending"], errors="coerce"
    ).fillna(roof_ending_uvalue)

    # ~~~~~~~~~~~~~~~~~~
    # Floor
    # ~~~~~~~~~~~~~~~~~~

    self.df["estimated_number_of_floors"] = self.df.apply(
        lambda row: estimate_number_of_floors(row["property_type"]), axis=1
    )

    self.df["ground_floor_area_starting"] = (
        self.df["total_floor_area_starting"] / self.df["estimated_number_of_floors"]
    )
    self.df["ground_floor_area_ending"] = (
        self.df["total_floor_area_ending"] / self.df["estimated_number_of_floors"]
    )

    self.df["estimated_perimeter_starting"] = self.df.apply(
        lambda row: estimate_perimeter(
            row["ground_floor_area_starting"],
            row["number_habitable_rooms_starting"]
            / row["estimated_number_of_floors"],
        ),
        axis=1,
    )
    # NOTE: this previously passed ground_floor_area_starting, which looks
    # like a copy-paste slip; the ending perimeter now uses the ending area
    self.df["estimated_perimeter_ending"] = self.df.apply(
        lambda row: estimate_perimeter(
            row["ground_floor_area_ending"],
            row["number_habitable_rooms_ending"]
            / row["estimated_number_of_floors"],
        ),
        axis=1,
    )
    self.df["floor_type"] = self.df["is_suspended"].replace(
        {True: "suspended", False: "solid"}
    )
    self.df["wall_type"] = self.df.apply(
        lambda row: get_wall_type(
            is_cavity_wall=row["is_cavity_wall"],
            is_solid_brick=row["is_solid_brick"],
            is_timber_frame=row["is_timber_frame"],
            is_granite_or_whinstone=row["is_granite_or_whinstone"],
            is_cob=row["is_cob"],
            is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
            is_system_built=row["is_system_built"],
            is_park_home=row["is_park_home"],
        ),
        axis=1,
    )

    floor_starting_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1
    )
    floor_ending_uvalue = self.df.apply(
        lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True),
        axis=1,
    )

    floor_starting_uvalue = pd.to_numeric(
        self.df["floor_thermal_transmittance"], errors="coerce"
    ).fillna(floor_starting_uvalue)
    floor_ending_uvalue = pd.to_numeric(
        self.df["floor_thermal_transmittance_ending"], errors="coerce"
    ).fillna(floor_ending_uvalue)

    # Explicit mapping instead of eval() on local variable names
    generated_uvalues = {
        "walls": (walls_starting_uvalue, walls_ending_uvalue),
        "roof": (roof_starting_uvalue, roof_ending_uvalue),
        "floor": (floor_starting_uvalue, floor_ending_uvalue),
    }
    for component, (starting_uvalue, ending_uvalue) in generated_uvalues.items():
        starting_col = f"{component}_thermal_transmittance"
        ending_col = f"{component}_thermal_transmittance_ending"

        self.df[starting_col] = pd.to_numeric(
            self.df[starting_col], errors="coerce"
        ).fillna(starting_uvalue)
        self.df[ending_col] = pd.to_numeric(
            self.df[ending_col], errors="coerce"
        ).fillna(ending_uvalue)

    self.df = self.df.drop(
        columns=[
            "floor_type",
            "wall_type",
            "walls_clean_description",
            "walls_clean_description_ending",
            "estimated_number_of_floors",
            "ground_floor_area_starting",
            "ground_floor_area_ending",
        ]
    )
|
|
|
|
def _adjust_assumed_values_in_wall_descriptions(self):
|
|
"""
|
|
Strip out assumed values for all wall descriptions
|
|
"""
|
|
for col in ["walls_clean_description", "walls_clean_description_ending"]:
|
|
self.df[col] = (
|
|
self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
|
|
)
|
|
|
|
def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
|
|
"""
|
|
Drop properties that have inconsistent data, i.e. changing material types
|
|
"""
|
|
|
|
starting_and_finishing_null = (
|
|
expanded_df["original_description"].isin([None, ""]) &
|
|
expanded_df["original_description_ending"].isin([None, ""])
|
|
)
|
|
|
|
if component == "walls":
|
|
|
|
expanded_df = expanded_df[
|
|
starting_and_finishing_null | (
|
|
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"])
|
|
& (expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"])
|
|
& (expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"])
|
|
& (expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"])
|
|
& (expanded_df["is_cob"] == expanded_df["is_cob_ending"])
|
|
& (expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"])
|
|
)
|
|
]
|
|
elif component == "floor":
|
|
expanded_df = expanded_df[
|
|
starting_and_finishing_null | (
|
|
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"])
|
|
& (expanded_df["is_solid"] == expanded_df["is_solid_ending"])
|
|
& (expanded_df["another_property_below"] == expanded_df["another_property_below_ending"])
|
|
& (expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"])
|
|
& (expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"])
|
|
)
|
|
]
|
|
elif component == "roof":
|
|
expanded_df = expanded_df[
|
|
starting_and_finishing_null | (
|
|
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"])
|
|
& (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"])
|
|
& (expanded_df["is_loft"] == expanded_df["is_loft_ending"])
|
|
& (expanded_df["is_flat"] == expanded_df["is_flat_ending"])
|
|
& (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"])
|
|
& (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"])
|
|
& (expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"])
|
|
)
|
|
]
|
|
|
|
return expanded_df
|
|
|
|
def _expand_description_to_features(self, cleaned_lookup: dict):
    """
    This method will merge on the cleaned lookup table and ensure that the building fabric in the
    starting and ending EPC is consistent, to ensure that we are performing our modelling on the cleanest
    possible dataset.

    We look for key building fabric features that have changed from one EPC to the next.
    If, for example, we see that a home has gone from being a cavity wall to a solid wall, we
    remove this record, as it indicates that the quality of the EPC conducted in the first instance
    is low.

    We also replace descriptions with their cleaned variants.

    Descriptions that are absent from the lookup are cleaned on the fly with the
    matching attribute class. This now covers both the starting and the ending
    description columns; previously only the starting column was checked, so an
    unseen ending description merged as all-null features.

    :param cleaned_lookup: mapping with one entry per component, keyed
        "<component>-description" (or "main-fuel"); each entry must be
        convertible to a DataFrame with an "original_description" column
    """

    cols_to_drop = {
        # Walls keep clean_description, thermal_transmittance_unit, is_assumed and
        # is_as_built_ending: the cleaned descriptions are needed for pulling out u-values
        "walls": [
            "original_description",
            "original_description_ending",
            "is_cavity_wall_ending",
            "is_solid_brick_ending",
            "is_system_built_ending",
            "is_timber_frame_ending",
            "is_granite_or_whinstone_ending",
            "is_cob_ending",
            "is_sandstone_or_limestone_ending",
        ],
        "floor": [
            "original_description",
            "clean_description",
            "thermal_transmittance_unit",
            "no_data",
            "no_data_ending",
            "original_description_ending",
            "clean_description_ending",
            "thermal_transmittance_unit_ending",
            "is_suspended_ending",
            "is_solid_ending",
            "another_property_below_ending",
            "is_to_unheated_space_ending",
            "is_to_external_air_ending",
            "is_assumed",
            "is_assumed_ending",
        ],
        "roof": [
            "original_description",
            "clean_description",
            "thermal_transmittance_unit",
            "is_assumed",
            "is_valid",
            "original_description_ending",
            "clean_description_ending",
            "thermal_transmittance_unit_ending",
            "is_pitched_ending",
            "is_roof_room_ending",
            "is_loft_ending",
            "is_flat_ending",
            "is_thatched_ending",
            "has_dwelling_above_ending",
            "is_assumed_ending",
            "is_valid_ending",
        ],
        "hotwater": [
            "original_description",
            "clean_description",
            "assumed",
            "original_description_ending",
            "clean_description_ending",
            "assumed_ending",
        ],
        "mainheat": [
            "original_description",
            "clean_description",
            "has_assumed",
            "original_description_ending",
            "clean_description_ending",
            "has_assumed_ending",
        ],
        "mainheatcont": [
            "original_description",
            "clean_description",
            "original_description_ending",
            "clean_description_ending",
        ],
        "windows": [
            "original_description",
            "clean_description",
            "original_description_ending",
            "clean_description_ending",
            # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
            "has_glazing",
            "glazing_coverage",
            "no_data",
            "has_glazing_ending",
            "glazing_coverage_ending",
            "no_data_ending",
        ],
        "main-fuel": [
            "original_description",
            "clean_description",
            "original_description_ending",
            "clean_description_ending",
        ],
    }

    cleaning_lookup = {
        "walls": WallAttributes,
        "floor": FloorAttributes,
        "roof": RoofAttributes,
        "hotwater": HotWaterAttributes,
        "mainheat": MainHeatAttributes,
        "mainheatcont": MainheatControlAttributes,
        "windows": WindowAttributes,
        "main-fuel": MainFuelAttributes,
    }

    for component in cols_to_drop:
        if component == "main-fuel":
            cleaned_key = "main-fuel"
            left_on_starting = "main_fuel_starting"
            left_on_ending = "main_fuel_ending"
        else:
            cleaned_key = f"{component}-description"
            left_on_starting = f"{component}_description_starting"
            left_on_ending = f"{component}_description_ending"
        original_cols = [left_on_starting, left_on_ending]

        cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])

        # We handle a specific edge case where the lookup has no entry for a
        # description that occurs in the data (starting or ending)
        known_descriptions = set(cleaned_lookup_df_for_key["original_description"])
        observed_descriptions = (
            pd.concat([self.df[left_on_starting], self.df[left_on_ending]])
            .dropna()
            .unique()
        )
        missing_descriptions = [
            x for x in observed_descriptions if x not in known_descriptions
        ]

        if missing_descriptions:
            cleaner = cleaning_lookup[component]
            cleaned_data = []
            for x in missing_descriptions:
                desc_cleaner = cleaner(x)
                cleaned = desc_cleaner.process()
                # IF NODATA (every cleaned value compares equal to False), REMAP TO NONE VALUES
                if all((pd.DataFrame(cleaned, index=[0]).T)[0] == False):
                    cleaned = {key: None for key in cleaned.keys()}
                cleaned_data.append(
                    {
                        "original_description": x,
                        "clean_description": desc_cleaner.description.replace(
                            "(assumed)", ""
                        )
                        .rstrip()
                        .capitalize(),
                        **cleaned,
                    }
                )
            cleaned_lookup_df_for_key = pd.concat(
                [
                    cleaned_lookup_df_for_key,
                    pd.DataFrame(cleaned_data),
                ],
                ignore_index=True,
            )

        # Lookup columns for the ending description receive the "_ending" suffix
        expanded_df = self.df.merge(
            cleaned_lookup_df_for_key,
            how="left",
            left_on=left_on_starting,
            right_on="original_description",
        ).merge(
            cleaned_lookup_df_for_key,
            how="left",
            left_on=left_on_ending,
            right_on="original_description",
            suffixes=("", "_ending"),
        )

        # Drop properties where key material types have changed
        expanded_df = self._drop_inconsistent_properties(expanded_df, component)

        # Drop original cols and cols to drop
        expanded_df = expanded_df.drop(
            columns=cols_to_drop[component] + original_cols
        )

        # Rename columns to component specific names, if they have not been dropped
        expanded_df = expanded_df.rename(
            columns={
                "is_assumed": f"{component}_is_assumed",
                "is_assumed_ending": f"{component}_is_assumed_ending",
                "insulation_thickness": f"{component}_insulation_thickness",
                "insulation_thickness_ending": f"{component}_insulation_thickness_ending",
                "thermal_transmittance": f"{component}_thermal_transmittance",
                "thermal_transmittance_ending": f"{component}_thermal_transmittance_ending",
                "thermal_transmittance_unit": f"{component}_thermal_transmittance_unit",
                "thermal_transmittance_unit_ending": f"{component}_thermal_transmittance_unit_ending",
                "tariff_type": f"{component}_tariff_type",
                "tariff_type_ending": f"{component}_tariff_type_ending",
                "clean_description": f"{component}_clean_description",
                "clean_description_ending": f"{component}_clean_description_ending",
            }
        )
        self.df = expanded_df

    # We don't need any lighting specific cleaning, we just drop the original description as we use
    # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
    self.df = self.df.drop(
        columns=["lighting_description_starting", "lighting_description_ending"]
    )
|
|
|
|
def _clean_missing_values(self, ignore_cols=None):
|
|
missings = pd.isnull(self.df).sum()
|
|
missings = missings[missings > 0]
|
|
|
|
if ignore_cols:
|
|
missings = missings[~missings.index.isin(ignore_cols)]
|
|
|
|
for col in missings.index:
|
|
unique_values = self.df[col].unique()
|
|
if (
|
|
(True in unique_values)
|
|
or (False in unique_values)
|
|
or (col in BOOLEAN_VARIABLES)
|
|
):
|
|
self.df[col] = self.df[col].fillna(False)
|
|
if "none" in unique_values:
|
|
self.df[col] = self.df[col].fillna("none")
|
|
else:
|
|
self.df[col] = self.df[col].fillna("Unknown")
|
|
|
|
def _null_validation(self, information: str):
|
|
# print(f"Null validation after {information}")
|
|
if pd.isnull(self.df).sum().sum():
|
|
raise ValueError(f"Null values found in dataset, after step {information}")
|
|
|
|
def _drop_features(self):
|
|
"""
|
|
Drop features that are not needed for modelling
|
|
"""
|
|
self.df = self.df.drop(
|
|
columns=["lodgement_date_starting", "lodgement_date_ending"]
|
|
)
|
|
|
|
def _feature_generation(self):
    """
    Add the `days_to_starting` and `days_to_ending` features

    Each is computed by _calculate_days_to from the matching lodgement date column.
    """
    for stage in ("starting", "ending"):
        lodgement_dates = self.df[f"lodgement_date_{stage}"]
        self.df[f"days_to_{stage}"] = self._calculate_days_to(lodgement_dates)
|
|
|
|
def _clean_efficiency_variables(self):
|
|
"""
|
|
These is scope to clean this by the model per corresponding description.
|
|
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
|
|
fill in the missing values with this.
|
|
When looking at this initially, there are a large volume of records with missing energy efficiency
|
|
values and therefore a simpler approach was taken just to test including these variables
|
|
:param df:
|
|
:return:
|
|
"""
|
|
|
|
missings = pd.isnull(self.df).sum()
|
|
missings = missings[missings >= 1]
|
|
|
|
if len(missings) == 0:
|
|
return
|
|
|
|
#
|
|
|
|
# Make sure they are all efficiency columns
|
|
if any(~missings.index.str.contains("energy_eff")):
|
|
raise ValueError(f"Non efficiency columns are missing {missings.index}")
|
|
|
|
for m in missings.index:
|
|
self.df[m] = self.df[m].fillna("NO_RATING")
|
|
|
|
@staticmethod
def _calculate_days_to(lodgement_date):
    """
    Number of days between EARLIEST_EPC_DATE and the given lodgement date(s)

    :param lodgement_date: a single date string, or a datetime-convertible
        series of dates
    :return: `.days` of the difference for a string input, `.dt.days` otherwise
    """
    elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
    return elapsed.days if isinstance(lodgement_date, str) else elapsed.dt.days
|
|
|
|
# def __add__(self, other) -> "TrainingDataset":
|
|
# if not isinstance(other, TrainingDataset):
|
|
# raise TypeError("Addition can only be performed with another instance of TrainingDataset")
|
|
# return TrainingDataset(self.datasets + other.datasets)
|
|
|
|
# def __radd__(self, other):
|
|
# """
|
|
# Required for sum() to work
|
|
# """
|
|
# if isinstance(other, int):
|
|
# return self
|
|
# else:
|
|
# return self.__add__(other)
|
|
|
|
|
|
class NewDataset(BaseDataset):
    """
    A collection of EPCDifferenceRecords can be combined into a NewDataset.

    Instances can be added together (and therefore passed to sum()); the
    result holds the concatenation of both record lists.
    """

    def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
        # Initialise state owned by BaseDataset (pipeline_steps); previously skipped
        super().__init__()
        self.datasets = datasets

    def __add__(self, other) -> "NewDataset":
        """
        Return a new NewDataset holding this instance's records followed by other's

        :raises TypeError: if other is not a NewDataset
        """
        if not isinstance(other, NewDataset):
            raise TypeError(
                "Addition can only be performed with another instance of NewDataset"
            )
        return NewDataset(self.datasets + other.datasets)

    def __radd__(self, other):
        """
        Required for sum() to work

        sum() starts from the int 0, so an int on the left-hand side is
        ignored and this instance is returned unchanged.
        """
        if isinstance(other, int):
            return self
        return self.__add__(other)
|