diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 7f989633..5a7e3083 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -1,133 +1,19 @@ -import numpy as np import pandas as pd from typing import List from etl.epc.Record import EPCDifferenceRecord -from etl.epc.ValidationConfiguration import DatasetValidationConfiguration +from ValidationConfiguration import DatasetValidationConfiguration from etl.epc.settings import EARLIEST_EPC_DATE from recommendations.rdsap_tables import england_wales_age_band_lookup from recommendations.recommendation_utils import ( - estimate_number_of_floors, - get_wall_u_value, - get_roof_u_value, - get_floor_u_value, - estimate_perimeter, - get_wall_type, + get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, + get_wall_type ) -# TODO: Can probably produce this in the property change app and store in S3 -BOOLEAN_VARIABLES = [ - "is_cavity_wall", - "is_filled_cavity", - "is_solid_brick", - "is_system_built", - "is_timber_frame", - "is_granite_or_whinstone", - "is_as_built", - "is_cob", - "is_sandstone_or_limestone", - "is_park_home", - "external_insulation", - "internal_insulation", - "is_park_home_ending", - "external_insulation_ending", - "internal_insulation_ending", - "is_to_unheated_space", - "is_to_external_air", - "is_suspended", - "is_solid", - "another_property_below", - "is_pitched", - "is_roof_room", - "is_loft", - "is_flat", - "is_thatched", - "is_at_rafters", - "has_dwelling_above", - "has_radiators", - "has_fan_coil_units", - "has_pipes_in_screed_above_insulation", - "has_pipes_in_insulated_timber_floor", - "has_pipes_in_concrete_slab", - "has_boiler", - "has_air_source_heat_pump", - "has_room_heaters", - "has_electric_storage_heaters", - "has_warm_air", - "has_electric_underfloor_heating", - "has_electric_ceiling_heating", - "has_community_scheme", - "has_ground_source_heat_pump", - "has_no_system_present", - "has_portable_electric_heaters", - "has_water_source_heat_pump", - "has_electric_heat_pump", - "has_micro-cogeneration", - "has_solar_assisted_heat_pump", - "has_exhaust_source_heat_pump", - "has_community_heat_pump", - "has_electric", - "has_mains_gas", - "has_wood_logs", - "has_coal", - "has_oil", - "has_wood_pellets", - "has_anthracite", - "has_dual_fuel_mineral_and_wood", - "has_smokeless_fuel", - "has_lpg", - "has_b30k", - "has_electricaire", - "has_assumed_for_most_rooms", - "has_underfloor_heating", - "has_radiators_ending", - "has_fan_coil_units_ending", - "has_pipes_in_screed_above_insulation_ending", - "has_pipes_in_insulated_timber_floor_ending", - "has_pipes_in_concrete_slab_ending", - "has_boiler_ending", - "has_air_source_heat_pump_ending", - "has_room_heaters_ending", - "has_electric_storage_heaters_ending", - "has_warm_air_ending", - "has_electric_underfloor_heating_ending", - "has_electric_ceiling_heating_ending", - "has_community_scheme_ending", - "has_ground_source_heat_pump_ending", - "has_no_system_present_ending", - "has_portable_electric_heaters_ending", - "has_water_source_heat_pump_ending", - "has_electric_heat_pump_ending", - "has_micro-cogeneration_ending", - "has_solar_assisted_heat_pump_ending", - "has_exhaust_source_heat_pump_ending", - "has_community_heat_pump_ending", - "has_electric_ending", - "has_mains_gas_ending", - "has_wood_logs_ending", - "has_coal_ending", - "has_oil_ending", - "has_wood_pellets_ending", - "has_anthracite_ending", - "has_dual_fuel_mineral_and_wood_ending", - "has_smokeless_fuel_ending", - "has_lpg_ending", - "has_b30k_ending", - "has_electricaire_ending", - "has_assumed_for_most_rooms_ending", - "has_underfloor_heating_ending", - "multiple_room_thermostats", - "multiple_room_thermostats_ending", - "is_community", - "no_individual_heating_or_community_network", - "is_community_ending", - "no_individual_heating_or_community_network_ending", -] - class BaseDataset: """ - Base class for all datasets + # Base class for all datasets """ def __init__(self) -> None: @@ -147,20 +33,18 @@ class BaseDataset: # raise ValueError(f"Pipeline type {pipeline_type} not found") # return self.pipeline_steps[pipeline_type] - - + class TrainingDataset(BaseDataset): """ A collection of EPCDifferenceRecords can be combined into a TrainingDataset. """ - def __init__( - self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict - ) -> None: + def __init__(self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict) -> None: + # self.pipeline_steps = self.pipeline_factory("training") self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) - + self._feature_generation() self._drop_features() self._clean_efficiency_variables() @@ -175,51 +59,14 @@ class TrainingDataset(BaseDataset): self._null_validation(information="Clean Missing Values") self._remove_abnormal_change_in_floor_area() self._ensure_numeric() - self._organise_starting_ending_columns() - - def _organise_starting_ending_columns(self): - """ - Organise the starting and ending columns so that they are next to each other - """ - no_suffix_cols = [ - col - for col in self.df.columns - if "_ending" not in col and "_starting" not in col - ] - starting_cols = [col for col in self.df.columns if "_starting" in col] - ending_cols = [col for col in self.df.columns if "_ending" in col] - - common_cols = [ - col.rsplit("_", 1)[0] - for col in starting_cols - if col.replace("_starting", "_ending") in ending_cols - ] - only_ending_cols = [ - col - for col in ending_cols - if col.replace("_ending", "_starting") not in starting_cols - ] - - common_cols = [[col + "_starting", col + "_ending"] for col in common_cols] - - self.df = self.df.loc[ - :, - no_suffix_cols - + only_ending_cols - + [col for cols in common_cols for col in cols], - ] def _remove_abnormal_change_in_floor_area(self): """ Remove properties where the change in floor area is greater than 100% """ - self.df["tfa_diff_abs"] = abs( - self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"] - ) - self.df["tfa_diff_prop"] = ( - self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"] - ) + self.df["tfa_diff_abs"] = abs(self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"]) + self.df["tfa_diff_prop"] = self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"] self.df = self.df[self.df["tfa_diff_prop"] < 0.5] self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) @@ -228,9 +75,7 @@ class TrainingDataset(BaseDataset): Ensure that all columns are numeric """ # TODO: move into EPCRecord record - uvalue_columns = [ - col for col in self.df.columns if "thermal_transmittance" in col - ] + uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col] for uvalue_col in uvalue_columns: self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) @@ -240,16 +85,12 @@ class TrainingDataset(BaseDataset): Using the apply method, use the get_roof_u_value method to generate the u-value """ - col_name = ( - "roof_insulation_thickness" - if not is_end - else "roof_insulation_thickness_ending" - ) + col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending" if row["has_dwelling_above"]: if row["roof_thermal_transmittance"] != 0: raise ValueError("Should have 0 u-value for roof") - + if row["roof_thermal_transmittance_ending"] != 0: raise ValueError("Should have 0 u-value for roof") @@ -262,24 +103,16 @@ class TrainingDataset(BaseDataset): is_flat=row["is_flat"], is_pitched=row["is_pitched"], is_at_rafters=row["is_at_rafters"], - age_band=england_wales_age_band_lookup[row["construction_age_band"]], - ) - + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + @staticmethod def _lambda_function_to_generate_wall_uvalue(row, is_end=False): """ Using the apply method, use the get_wall_u_value method to generate the u-value """ - description_col_name = ( - "walls_clean_description" - if not is_end - else "walls_clean_description_ending" - ) - thermal_transistance_col_name = ( - "walls_thermal_transmittance" - if not is_end - else "walls_thermal_transmittance_ending" - ) + description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" + thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else "walls_thermal_transmittance_ending" if pd.isnull(row[thermal_transistance_col_name]): output = get_wall_u_value( @@ -292,18 +125,14 @@ class TrainingDataset(BaseDataset): output = row[thermal_transistance_col_name] return output - + @staticmethod def _lambda_function_to_generate_floor_uvalue(row, is_end=False): """ Using the apply method, use the get_floor_u_value method to generate the u-value """ - floor_thermal_col_name = ( - "floor_thermal_transmittance" - if not is_end - else "floor_thermal_transmittance_ending" - ) + floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending" if row["another_property_below"]: if row["floor_thermal_transmittance"] != 0: @@ -316,31 +145,20 @@ class TrainingDataset(BaseDataset): uvalue = row[floor_thermal_col_name] if pd.isnull(uvalue): - insulation_col_name = ( - "floor_insulation_thickness" - if not is_end - else "floor_insulation_thickness_ending" - ) - perimeter_col_name = ( - "estimated_perimeter_starting" - if not is_end - else "estimated_perimeter_ending" - ) - floor_area_col_name = ( - "ground_floor_area_starting" - if not is_end - else "ground_floor_area_ending" - ) + + insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending" + floor_area_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending" + perimeter_col_name = "total_floor_area_starting" if not is_end else "total_floor_area_ending" uvalue = get_floor_u_value( - floor_type=row["floor_type"], - perimeter=row[perimeter_col_name], - area=row[floor_area_col_name], - insulation_thickness=row[insulation_col_name], - wall_type=row["wall_type"], - age_band=england_wales_age_band_lookup[row["construction_age_band"]], - ) - + floor_type=row["floor_type"], + perimeter=row[floor_area_col_name], + area=row[perimeter_col_name], + insulation_thickness=row[insulation_col_name], + wall_type=row["wall_type"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + return uvalue def _generate_u_values_from_features(self): @@ -353,136 +171,88 @@ class TrainingDataset(BaseDataset): # ~~~~~~~~~~~~~~~~~~ walls_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1 + lambda row: self._lambda_function_to_generate_wall_uvalue(row), + axis=1 ) walls_ending_uvalue = self.df.apply( lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True), - axis=1, + axis=1 ) - walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna( - walls_starting_uvalue - ) - walls_starting_equals_ending_flag = ( - self.df["walls_clean_description"] - == self.df["walls_clean_description_ending"] - ) - walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[ - walls_starting_equals_ending_flag - ] - + walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue) + walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df["walls_clean_description_ending"] + walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[walls_starting_equals_ending_flag] + # ~~~~~~~~~~~~~~~~~~ # Roof # ~~~~~~~~~~~~~~~~~~ - + roof_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1 + lambda row: self._lambda_function_to_generate_roof_uvalue(row), + axis=1 ) roof_ending_uvalue = self.df.apply( lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True), - axis=1, + axis=1 ) - roof_starting_uvalue = self.df["roof_thermal_transmittance"].fillna( - roof_starting_uvalue - ) - roof_ending_uvalue = self.df["roof_thermal_transmittance_ending"].fillna( - roof_ending_uvalue - ) + roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue) + roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue) + # ~~~~~~~~~~~~~~~~~~ # Floor # ~~~~~~~~~~~~~~~~~~ - - self.df["estimated_number_of_floors"] = self.df.apply( - lambda row: estimate_number_of_floors(row["property_type"]), axis=1 + + self.df['estimated_perimeter_starting'] = self.df.apply( + lambda row: estimate_perimeter(row["total_floor_area_starting"], row["number_habitable_rooms"]), + axis=1 ) - - self.df["ground_floor_area_starting"] = ( - self.df["total_floor_area_starting"] / self.df["estimated_number_of_floors"] - ) - self.df["ground_floor_area_ending"] = ( - self.df["total_floor_area_ending"] / self.df["estimated_number_of_floors"] - ) - - self.df["estimated_perimeter_starting"] = self.df.apply( - lambda row: estimate_perimeter( - row["ground_floor_area_starting"], - row["number_habitable_rooms_starting"] - / row["estimated_number_of_floors"], - ), - axis=1, - ) - self.df["estimated_perimeter_ending"] = self.df.apply( - lambda row: estimate_perimeter( - row["ground_floor_area_starting"], - row["number_habitable_rooms_ending"] - / row["estimated_number_of_floors"], - ), - axis=1, - ) - self.df["floor_type"] = self.df["is_suspended"].replace( - {True: "suspended", False: "solid"} + self.df['estimated_perimeter_ending'] = self.df.apply( + lambda row: estimate_perimeter(row["total_floor_area_ending"], row["number_habitable_rooms"]), + axis=1 ) + self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"}) self.df["wall_type"] = self.df.apply( lambda row: get_wall_type( - is_cavity_wall=row["is_cavity_wall"], - is_solid_brick=row["is_solid_brick"], - is_timber_frame=row["is_timber_frame"], - is_granite_or_whinstone=row["is_granite_or_whinstone"], - is_cob=row["is_cob"], + is_cavity_wall=row["is_cavity_wall"], + is_solid_brick=row["is_solid_brick"], + is_timber_frame=row["is_timber_frame"], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_cob=row["is_cob"], is_sandstone_or_limestone=row["is_sandstone_or_limestone"], is_system_built=row["is_system_built"], - is_park_home=row["is_park_home"], - ), - axis=1, + is_park_home=row["is_park_home"] + ), + axis=1 ) - + floor_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1 + lambda row: self._lambda_function_to_generate_floor_uvalue(row), + axis=1 ) floor_ending_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_floor_uvalue( - row, is_end=True - ), - axis=1, + lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True), + axis=1 ) - floor_starting_uvalue = self.df["floor_thermal_transmittance"].fillna( - floor_starting_uvalue - ) - floor_ending_uvalue = self.df["floor_thermal_transmittance_ending"].fillna( - floor_ending_uvalue - ) + floor_starting_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_starting_uvalue) + floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue) for component in ["walls", "roof", "floor"]: - self.df[f"{component}_thermal_transmittance"] = self.df[ - f"{component}_thermal_transmittance" - ].fillna(eval(f"{component}_starting_uvalue")) - self.df[f"{component}_thermal_transmittance_ending"] = self.df[ - f"{component}_thermal_transmittance_ending" - ].fillna(eval(f"{component}_ending_uvalue")) + self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue")) + self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue")) - self.df = self.df.drop( - columns=[ - "floor_type", - "wall_type", - "walls_clean_description", - "walls_clean_description_ending", - "estimated_number_of_floors", - "ground_floor_area_starting", - "ground_floor_area_ending", - ] - ) + self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"]) + def _adjust_assumed_values_in_wall_descriptions(self): """ Strip out assumed values for all wall descriptions """ for col in ["walls_clean_description", "walls_clean_description_ending"]: - self.df[col] = ( - self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip() - ) + self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip() + def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str): """ @@ -491,57 +261,34 @@ class TrainingDataset(BaseDataset): if component == "walls": expanded_df = expanded_df[ - (expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) - & ( - expanded_df["is_solid_brick"] - == expanded_df["is_solid_brick_ending"] - ) - & ( - expanded_df["is_timber_frame"] - == expanded_df["is_timber_frame_ending"] - ) - & ( - expanded_df["is_granite_or_whinstone"] - == expanded_df["is_granite_or_whinstone_ending"] - ) - & (expanded_df["is_cob"] == expanded_df["is_cob_ending"]) - & ( - expanded_df["is_sandstone_or_limestone"] - == expanded_df["is_sandstone_or_limestone_ending"] - ) - ] + (expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) & + (expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"]) & + (expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"]) & + (expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"]) & + (expanded_df["is_cob"] == expanded_df["is_cob_ending"]) & + (expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"]) + ] elif component == "floor": expanded_df = expanded_df[ - (expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) - & (expanded_df["is_solid"] == expanded_df["is_solid_ending"]) - & ( - expanded_df["another_property_below"] - == expanded_df["another_property_below_ending"] - ) - & ( - expanded_df["is_to_unheated_space"] - == expanded_df["is_to_unheated_space_ending"] - ) - & ( - expanded_df["is_to_external_air"] - == expanded_df["is_to_external_air_ending"] - ) - ] + (expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) & + (expanded_df["is_solid"] == expanded_df["is_solid_ending"]) & + (expanded_df["another_property_below"] == expanded_df["another_property_below_ending"]) & + (expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"]) & + (expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"]) + ] elif component == "roof": expanded_df = expanded_df[ - (expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) - & (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) - & (expanded_df["is_loft"] == expanded_df["is_loft_ending"]) - & (expanded_df["is_flat"] == expanded_df["is_flat_ending"]) - & (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) - & (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) - & ( - expanded_df["has_dwelling_above"] - == expanded_df["has_dwelling_above_ending"] - ) - ] - + (expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) & + (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) & + (expanded_df["is_loft"] == expanded_df["is_loft_ending"]) & + (expanded_df["is_flat"] == expanded_df["is_flat_ending"]) & + (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) & + (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) & + (expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"]) + ] + return expanded_df + def _expand_description_to_features(self, cleaned_lookup: dict): """ @@ -553,111 +300,65 @@ class TrainingDataset(BaseDataset): # remove this record, as it indicates that the quality of the EPC conducted in the first instance # is low # We also replace descriptions with their cleaned variants - """ + """ cols_to_drop = { "walls": [ # We need to cleaned descriptions for pulling out u-values - "original_description", - "thermal_transmittance_unit", - "original_description_ending", - "thermal_transmittance_unit_ending", - "is_cavity_wall_ending", - "is_solid_brick_ending", - "is_system_built_ending", - "is_timber_frame_ending", - "is_granite_or_whinstone_ending", - "is_as_built_ending", - "is_cob_ending", - "is_assumed_ending", - "is_sandstone_or_limestone_ending", + 'original_description', 'thermal_transmittance_unit', + 'original_description_ending', + 'thermal_transmittance_unit_ending', + 'is_cavity_wall_ending', 'is_filled_cavity_ending', + 'is_solid_brick_ending', 'is_system_built_ending', + 'is_timber_frame_ending', 'is_granite_or_whinstone_ending', + 'is_as_built_ending', 'is_cob_ending', 'is_assumed_ending', + 'is_sandstone_or_limestone_ending', # Re remove the is_assumed columns - "is_assumed", - "is_assumed_ending", + "is_assumed", "is_assumed_ending" ], "floor": [ - "original_description", - "clean_description", - "thermal_transmittance_unit", - "no_data", - "no_data_ending", - "original_description_ending", - "clean_description_ending", - "thermal_transmittance_unit_ending", - "is_suspended_ending", - "is_solid_ending", - "another_property_below_ending", - "is_to_unheated_space_ending", - "is_to_external_air_ending", - "is_assumed", - "is_assumed_ending", + "original_description", "clean_description", "thermal_transmittance_unit", + "no_data", "no_data_ending", "original_description_ending", + "clean_description_ending", "thermal_transmittance_unit_ending", + "is_suspended_ending", "is_solid_ending", "another_property_below_ending", + "is_to_unheated_space_ending", "is_to_external_air_ending", "is_assumed", + "is_assumed_ending" ], "roof": [ - "original_description", - "clean_description", - "thermal_transmittance_unit", - "is_assumed", - "is_valid", - "original_description_ending", - "clean_description_ending", - "thermal_transmittance_unit_ending", - "is_pitched_ending", - "is_roof_room_ending", - "is_loft_ending", - "is_flat_ending", - "is_thatched_ending", - "has_dwelling_above_ending", - "is_assumed_ending", - "is_valid_ending", + "original_description", "clean_description", "thermal_transmittance_unit", + "is_assumed", "is_valid", "original_description_ending", "clean_description_ending", + "thermal_transmittance_unit_ending", "is_pitched_ending", "is_roof_room_ending", + "is_loft_ending", "is_flat_ending", "is_thatched_ending", "is_at_rafters_ending", + "has_dwelling_above_ending", "is_assumed_ending", "is_valid_ending" ], "hotwater": [ - "original_description", - "clean_description", - "assumed", - "original_description_ending", - "clean_description_ending", - "assumed_ending", + "original_description", "clean_description", "assumed", "original_description_ending", + "clean_description_ending", "assumed_ending" ], "mainheat": [ - "original_description", - "clean_description", - "original_description_ending", - "has_assumed", - "original_description_ending", - "clean_description_ending", + "original_description", "clean_description", "original_description_ending", + "has_assumed", "original_description_ending", "clean_description_ending", "has_assumed_ending", ], "mainheatcont": [ - "original_description", - "clean_description", - "original_description_ending", - "clean_description_ending", + "original_description", "clean_description", "original_description_ending", "clean_description_ending" ], "windows": [ - "original_description", - "clean_description", - "original_description_ending", - "clean_description_ending", + "original_description", "clean_description", "original_description_ending", "clean_description_ending", # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature - "has_glazing", - "glazing_coverage", - "no_data", - "has_glazing_ending", - "glazing_coverage_ending", - "no_data_ending", + "has_glazing", "glazing_coverage", "no_data", "has_glazing_ending", "glazing_coverage_ending", + "no_data_ending" ], "main-fuel": [ - "original_description", - "clean_description", - "original_description_ending", - "clean_description_ending", + "original_description", "clean_description", "original_description_ending", "clean_description_ending" ], } components_to_expand = cols_to_drop.keys() - + for component in components_to_expand: - # TODO: change cleaned dataframe to have underscores instead of dashes + + # TODO: change cleaned dataframe to have underscores instead of dashes if component == "main-fuel": cleaned_key = "main-fuel" left_on_starting = "main_fuel_starting" @@ -667,13 +368,10 @@ class TrainingDataset(BaseDataset): cleaned_key = f"{component}-description" left_on_starting = f"{component}_description_starting" left_on_ending = f"{component}_description_ending" - original_cols = [ - f"{component}_description_starting", - f"{component}_description_ending", - ] + original_cols = [f"{component}_description_starting", f"{component}_description_ending"] cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) - + expanded_df = self.df.merge( cleaned_lookup_df_for_key, how="left", @@ -684,16 +382,14 @@ class TrainingDataset(BaseDataset): how="left", left_on=left_on_ending, right_on="original_description", - suffixes=("", "_ending"), + suffixes=("", "_ending") ) - # Drop properties where key material types have changed + # Drop inconsistent properties expanded_df = self._drop_inconsistent_properties(expanded_df, component) - + # Drop original cols and cols to drop - expanded_df = expanded_df.drop( - columns=cols_to_drop[component] + original_cols - ) + expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols) # Rename columns to component specific names, if they have not been dropped expanded_df = expanded_df.rename( @@ -709,12 +405,11 @@ class TrainingDataset(BaseDataset): } ) self.df = expanded_df - + # We don't need any lighting specific cleaning, we just drop the original description as we use # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING - self.df = self.df.drop( - columns=["lighting_description_starting", "lighting_description_ending"] - ) + self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"]) + def _clean_missing_values(self, ignore_cols=None): missings = pd.isnull(self.df).sum() @@ -725,17 +420,14 @@ class TrainingDataset(BaseDataset): for col in missings.index: unique_values = self.df[col].unique() - if ( - (True in unique_values) - or (False in unique_values) - or (col in BOOLEAN_VARIABLES) - ): + if True in unique_values or False in unique_values: self.df[col] = self.df[col].fillna(False) if "none" in unique_values: self.df[col] = self.df[col].fillna("none") else: self.df[col] = self.df[col].fillna("Unknown") + def _null_validation(self, information: str): print(f"Null validation after {information}") if pd.isnull(self.df).sum().sum(): @@ -745,21 +437,267 @@ class TrainingDataset(BaseDataset): """ Drop features that are not needed for modelling """ - self.df = self.df.drop( - columns=["lodgement_date_starting", "lodgement_date_ending"] - ) + self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"]) + def _feature_generation(self): """ Generate features for modelling """ - self.df["days_to_starting"] = self._calculate_days_to( - self.df["lodgement_date_starting"] + self.df["days_to_starting"] = self._calculate_days_to(self.df["lodgement_date_starting"]) + self.df["day_to_ending"] = self._calculate_days_to(self.df["lodgement_date_ending"]) + + def _clean_efficiency_variables(self): + + """ + These is scope to clean this by the model per corresponding description. + E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and + fill in the missing values with this. + When looking at this initially, there are a large volume of records with missing energy efficiency + values and therefore a simpler approach was taken just to test including these variables + :param df: + :return: + """ + + missings = pd.isnull(self.df).sum() + missings = missings[missings >= 1] + + if len(missings) == 0: + return + + # Make sure they are all efficiency columns + if any(~missings.index.str.contains("energy_eff")): + raise ValueError("Non efficiency columns are missing") + + for m in missings.index: + self.df[m] = self.df[m].fillna("NO_RATING") + + + @staticmethod + def _calculate_days_to(lodgement_date): + + if isinstance(lodgement_date, str): + return ( + pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) + ).days + + return ( + pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) + ).dt.days + + # def __add__(self, other) -> "TrainingDataset": + # if not isinstance(other, TrainingDataset): + # raise TypeError("Addition can only be performed with another instance of TrainingDataset") + # return TrainingDataset(self.datasets + other.datasets) + + # def __radd__(self, other): + # """ + # Required for sum() to work + # """ + # if isinstance(other, int): + # return self + # else: + # return self.__add__(other) + +class RecordDataset(BaseDataset): + """ + A collection of EPCRecrods can be combined into a Dataset. + """ + + def __init__(self, datasets: pd.DataFrame, cleaned_lookup: dict) -> None: + # self.pipeline_steps = self.pipeline_factory("newdata") + self.datasets = datasets + self.df = datasets + + self._clean_efficiency_variables() + self._null_validation(information="Clean Efficiency Variables") + self._expand_description_to_features(cleaned_lookup) + self._adjust_assumed_values_in_wall_descriptions() + self._generate_u_values_from_features() + # # TODO: For some of the features that we clean, we have either a true, false or possibly null value + # # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't + # # need to + self._clean_missing_values() + self._null_validation(information="Clean Missing Values") + # self._remove_abnormal_change_in_floor_area() + self._ensure_numeric() + + + def _ensure_numeric(self): + """ + Ensure that all columns are numeric + """ + # TODO: move into EPCRecord record + uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col] + for uvalue_col in uvalue_columns: + self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) + + + def _clean_missing_values(self, ignore_cols=None): + missings = pd.isnull(self.df).sum() + missings = missings[missings > 0] + + if ignore_cols: + missings = missings[~missings.index.isin(ignore_cols)] + + for col in missings.index: + unique_values = self.df[col].unique() + if True in unique_values or False in unique_values: + self.df[col] = self.df[col].fillna(False) + if "none" in unique_values: + self.df[col] = self.df[col].fillna("none") + else: + self.df[col] = self.df[col].fillna("Unknown") + + + @staticmethod + def _lambda_function_to_generate_roof_uvalue(row, is_end=False): + """ + Using the apply method, use the get_roof_u_value method to generate the u-value + """ + + col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending" + + if row["has_dwelling_above"]: + if row["roof_thermal_transmittance"] != 0: + raise ValueError("Should have 0 u-value for roof") + + return get_roof_u_value( + insulation_thickness=row[col_name], + has_dwelling_above=row["has_dwelling_above"], + is_loft=row["is_loft"], + is_roof_room=row["is_roof_room"], + is_thatched=row["is_thatched"], + is_flat=row["is_flat"], + is_pitched=row["is_pitched"], + is_at_rafters=row["is_at_rafters"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + + @staticmethod + def _lambda_function_to_generate_wall_uvalue(row, is_end=False): + """ + Using the apply method, use the get_wall_u_value method to generate the u-value + """ + description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" + thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else "walls_thermal_transmittance_ending" + + if pd.isnull(row[thermal_transistance_col_name]): + output = get_wall_u_value( + clean_description=row[description_col_name], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + ) + else: + output = row[thermal_transistance_col_name] + + return output + + @staticmethod + def _lambda_function_to_generate_floor_uvalue(row, is_end=False): + """ + Using the apply method, use the get_floor_u_value method to generate the u-value + """ + + floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending" + + if row["another_property_below"]: + if row["floor_thermal_transmittance"] != 0: + raise ValueError("Should have 0 u-value for floor") + + return 0 + else: + uvalue = row[floor_thermal_col_name] + + if pd.isnull(uvalue): + + insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending" + floor_area_col_name = "estimated_perimeter" if not is_end else "estimated_perimeter_ending" + perimeter_col_name = "total_floor_area" if not is_end else "total_floor_area_ending" + + uvalue = get_floor_u_value( + floor_type=row["floor_type"], + perimeter=row[floor_area_col_name], + area=row[perimeter_col_name], + insulation_thickness=row[insulation_col_name], + wall_type=row["wall_type"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + + return uvalue + + def _generate_u_values_from_features(self): + """ + Generate u-values from the features + """ + + # ~~~~~~~~~~~~~~~~~~ + # Walls + # ~~~~~~~~~~~~~~~~~~ + + walls_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_wall_uvalue(row), + axis=1 ) - self.df["days_to_ending"] = self._calculate_days_to( - self.df["lodgement_date_ending"] + + walls_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Roof + # ~~~~~~~~~~~~~~~~~~ + + roof_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_roof_uvalue(row), + axis=1 ) + roof_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Floor + # ~~~~~~~~~~~~~~~~~~ + + self.df['estimated_perimeter'] = self.df.apply( + lambda row: estimate_perimeter(row["total_floor_area"], row["number_habitable_rooms"]), + axis=1 + ) + + self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"}) + self.df["wall_type"] = self.df.apply( + lambda row: get_wall_type( + is_cavity_wall=row["is_cavity_wall"], + is_solid_brick=row["is_solid_brick"], + is_timber_frame=row["is_timber_frame"], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_cob=row["is_cob"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + is_system_built=row["is_system_built"], + is_park_home=row["is_park_home"] + ), + axis=1 + ) + + floor_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_floor_uvalue(row), + axis=1 + ) + + floor_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_uvalue) + + for component in ["walls", "roof", "floor"]: + self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_uvalue")) + + self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description"]) + + def _adjust_assumed_values_in_wall_descriptions(self): + """ + Strip out assumed values for all wall descriptions + """ + for col in ["walls_clean_description"]: + self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip() + + def _clean_efficiency_variables(self): """ These is scope to clean this by the model per corresponding description. @@ -775,31 +713,118 @@ class TrainingDataset(BaseDataset): missings = missings[missings >= 1] if len(missings) == 0: - return + return - # Make sure they are all efficiency columns + # Make sure they are all efficiency columns if any(~missings.index.str.contains("energy_eff")): raise ValueError("Non efficiency columns are missing") for m in missings.index: - self.df[m] = self.df[m].fillna("NO_RATING") + column_index = self.df[m].isna() + self.df.loc[column_index, m] = "NO_RATING" - @staticmethod - def _calculate_days_to(lodgement_date): - if isinstance(lodgement_date, str): - return ( - pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) - ).days - return ( - pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) - ).dt.days + def _null_validation(self, information: str): + print(f"Null validation after {information}") + if pd.isnull(self.df).sum().sum(): + raise ValueError(f"Null values found in dataset, after step {information}") - # def __add__(self, other) -> "TrainingDataset": - # if not isinstance(other, TrainingDataset): - # raise TypeError("Addition can only be performed with another instance of TrainingDataset") - # return TrainingDataset(self.datasets + other.datasets) + + def _expand_description_to_features(self, cleaned_lookup: dict): + """ + This method will merge on the cleaned lookup table and ensure that the building fabric in the + starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest + possible dataset. + # We look for key building fabric features that have changed from one EPC to the next. + # if, for example, we see that a home has gone from being a cavity wall to a solid wall, we + # remove this record, as it indicates that the quality of the EPC conducted in the first instance + # is low + # We also replace descriptions with their cleaned variants + """ + cols_to_drop = { + "walls": [ + # We need to cleaned descriptions for pulling out u-values + 'original_description', 'thermal_transmittance_unit', + # Re remove the is_assumed columns + "is_assumed" + ], + "floor": [ + "original_description", "clean_description", "thermal_transmittance_unit", + "no_data", + "is_assumed" + ], + "roof": [ + "original_description", "clean_description", "thermal_transmittance_unit", + "is_assumed", "is_valid" + ], + "hotwater": [ + "original_description", "clean_description", "assumed", + ], + "mainheat": [ + "original_description", "clean_description", + "has_assumed", + ], + "mainheatcont": [ + "original_description", "clean_description", + ], + "windows": [ + "original_description", "clean_description", + # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature + "has_glazing", "glazing_coverage", "no_data", + ], + "main-fuel": [ + "original_description", "clean_description", + ], + } + + components_to_expand = cols_to_drop.keys() + + for component in components_to_expand: + + # TODO: change cleaned dataframe to have underscores instead of dashes + if component == "main-fuel": + cleaned_key = "main-fuel" + left_on_key = "main_fuel" + original_cols = ["main_fuel"] + else: + cleaned_key = f"{component}-description" + left_on_key = f"{component}_description" + original_cols = [f"{component}_description"] + + cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) + + expanded_df = self.df.merge( + cleaned_lookup_df_for_key, + how="left", + left_on=left_on_key, + right_on="original_description" + ) + + # Drop original cols and cols to drop + expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols) + + # Rename columns to component specific names, if they have not been dropped + expanded_df = expanded_df.rename( + columns={ + "insulation_thickness": f"{component}_insulation_thickness", + "thermal_transmittance": f"{component}_thermal_transmittance", + "tariff_type": f"{component}_tariff_type", + "clean_description": f"{component}_clean_description", + } + ) + self.df = expanded_df + + # We don't need any lighting specific cleaning, we just drop the original description as we use + # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING + self.df = self.df.drop(columns=["lighting_description"]) + + + # def __add__(self, other) -> "NewDataset": + # if not isinstance(other, NewDataset): + # raise TypeError("Addition can only be performed with another instance of ScoringDataset") + # return NewDataset(self.datasets + other.datasets) + # def __radd__(self, other): # """ # Required for sum() to work @@ -807,30 +832,4 @@ class TrainingDataset(BaseDataset): # if isinstance(other, int): # return self # else: - # return self.__add__(other) - - -class NewDataset(BaseDataset): - """ - A collection of EPCDifferenceRecords can be combined into a ScoringDataset. - """ - - def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: - # self.pipeline_steps = self.pipeline_factory("newdata") - self.datasets = datasets - - def __add__(self, other) -> "NewDataset": - if not isinstance(other, NewDataset): - raise TypeError( - "Addition can only be performed with another instance of ScoringDataset" - ) - return NewDataset(self.datasets + other.datasets) - - def __radd__(self, other): - """ - Required for sum() to work - """ - if isinstance(other, int): - return self - else: - return self.__add__(other) + # return self.__add__(other) \ No newline at end of file diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index ba228d89..f0be3c2f 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -7,7 +7,7 @@ from tqdm import tqdm from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.Record import EPCRecord, EPCDifferenceRecord -from etl.epc.Dataset import TrainingDataset +from etl.epc.Dataset import TrainingDataset, RecordDataset from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 from etl.epc.settings import ( MANDATORY_FIXED_FEATURES, @@ -24,8 +24,8 @@ from etl.epc.settings import ( # TODO: change in setting file MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] -# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] -LATEST_FIELD = [x.lower() for x in LATEST_FIELD] +LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] +# LATEST_FIELD = [x.lower() for x in LATEST_FIELD] COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] RDSAP_RESPONSE = RDSAP_RESPONSE.lower() HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() @@ -62,6 +62,12 @@ def get_cleaned_description_mapping(): clean_lookup = get_cleaned_description_mapping() +# import pickle +# with open("./clean_lookup.pkl", "wb") as f: +# pickle.dump(clean_lookup, f) + +# clean_lookup = pickle.load(open("./clean_lookup.pkl", "rb")) + class EPCPipeline: """ @@ -117,8 +123,58 @@ class EPCPipeline: self.run_training_dataset_pipeline() elif self.run_mode == "newdata": self.run_newdata_dataset_pipeline() + elif self.run_mode == "record": + self.run_record_dataset_pipeline() else: raise ValueError("Run mode defined needs to be in 'training' or 'newdata'") + + + def run_record_dataset_pipeline(self): + """ + Running pipeline with just the EPCRecords + """ + + if self.directories is None: + raise ValueError( + "Directories not specified - Unable to run Training pipeline" + ) + + for directory in tqdm(self.directories): + + filepath = directory / self.epc_local_file + self.epc_data_processor.prepare_data(filepath=filepath) + + constituency_data = self.epc_data_processor.data + self.compiled_cleaning_averages.append( + self.epc_data_processor.cleaning_averages + ) + + # TODO: integrate with EPCRecord + record_dataset = constituency_data[['uprn'] + VARIABLE_DATA_FEATURES + MANDATORY_FIXED_FEATURES + LATEST_FIELD] + + constituency_dataset = RecordDataset(datasets=record_dataset, cleaned_lookup=clean_lookup) + + self.compiled_dataset = pd.concat( + [self.compiled_dataset, constituency_dataset.df] + ) + + save_dataframe_to_s3_parquet( + df=self.compiled_dataset, + bucket_name=self.epc_bucket_name, + file_key=self.epc_compiled_dataset_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.DataFrame(self.compiled_all_equal_rows), + bucket_name=self.epc_bucket_name, + file_key=self.epc_all_equal_rows_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.concat(self.compiled_cleaning_averages), + bucket_name=self.epc_bucket_name, + file_key=self.epc_cleaning_dataset_key, + ) def run_newdata_dataset_pipeline(self): """