From 1fd976f3507ae3d78228791ab7c12742ca8c43e5 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Mon, 29 Jan 2024 10:10:33 +0000 Subject: [PATCH] add all permutations, and fixed typo --- etl/epc/Dataset.py | 417 ++++++++++++++++++------- etl/epc/Pipeline.py | 174 +++++++---- etl/epc/Record.py | 544 +++++++++++++++++++++++---------- etl/epc/property_change_app.py | 14 +- 4 files changed, 817 insertions(+), 332 deletions(-) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index fbc7a2d2..217c65b5 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -7,8 +7,12 @@ from etl.epc.settings import EARLIEST_EPC_DATE from recommendations.rdsap_tables import england_wales_age_band_lookup from recommendations.recommendation_utils import ( - estimate_number_of_floors, get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, - get_wall_type + estimate_number_of_floors, + get_wall_u_value, + get_roof_u_value, + get_floor_u_value, + estimate_perimeter, + get_wall_type, ) @@ -41,8 +45,9 @@ class TrainingDataset(BaseDataset): A collection of EPCDifferenceRecords can be combined into a TrainingDataset. """ - def __init__(self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict) -> None: - + def __init__( + self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict + ) -> None: # self.pipeline_steps = self.pipeline_factory("training") self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) @@ -61,14 +66,51 @@ class TrainingDataset(BaseDataset): self._null_validation(information="Clean Missing Values") self._remove_abnormal_change_in_floor_area() self._ensure_numeric() + self._organise_starting_ending_columns() + + def _organise_starting_ending_columns(self): + """ + Organise the starting and ending columns so that they are next to each other + """ + no_suffix_cols = [ + col + for col in self.df.columns + if "_ending" not in col and "_starting" not in col + ] + starting_cols = [col for col in self.df.columns if "_starting" in col] + ending_cols = [col for col in self.df.columns if "_ending" in col] + + common_cols = [ + col.rsplit("_", 1)[0] + for col in starting_cols + if col.replace("_starting", "_ending") in ending_cols + ] + only_ending_cols = [ + col + for col in ending_cols + if col.replace("_ending", "_starting") not in starting_cols + ] + + common_cols = [[col + "_starting", col + "_ending"] for col in common_cols] + + self.df = self.df.loc[ + :, + no_suffix_cols + + only_ending_cols + + [col for cols in common_cols for col in cols], + ] def _remove_abnormal_change_in_floor_area(self): """ Remove properties where the change in floor area is greater than 100% """ - self.df["tfa_diff_abs"] = abs(self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"]) - self.df["tfa_diff_prop"] = self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"] + self.df["tfa_diff_abs"] = abs( + self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"] + ) + self.df["tfa_diff_prop"] = ( + self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"] + ) self.df = self.df[self.df["tfa_diff_prop"] < 0.5] self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) @@ -77,7 +119,9 @@ class TrainingDataset(BaseDataset): Ensure that all columns are numeric """ # TODO: move into EPCRecord record - uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col] + uvalue_columns = [ + col for col in self.df.columns if "thermal_transmittance" in col + ] for uvalue_col in uvalue_columns: self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) @@ -87,7 +131,11 @@ class TrainingDataset(BaseDataset): Using the apply method, use the get_roof_u_value method to generate the u-value """ - col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending" + col_name = ( + "roof_insulation_thickness" + if not is_end + else "roof_insulation_thickness_ending" + ) if row["has_dwelling_above"]: if row["roof_thermal_transmittance"] != 0: @@ -105,7 +153,7 @@ class TrainingDataset(BaseDataset): is_flat=row["is_flat"], is_pitched=row["is_pitched"], is_at_rafters=row["is_at_rafters"], - age_band=england_wales_age_band_lookup[row["construction_age_band"]] + age_band=england_wales_age_band_lookup[row["construction_age_band"]], ) @staticmethod @@ -113,9 +161,16 @@ class TrainingDataset(BaseDataset): """ Using the apply method, use the get_wall_u_value method to generate the u-value """ - description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" - thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else \ - "walls_thermal_transmittance_ending" + description_col_name = ( + "walls_clean_description" + if not is_end + else "walls_clean_description_ending" + ) + thermal_transistance_col_name = ( + "walls_thermal_transmittance" + if not is_end + else "walls_thermal_transmittance_ending" + ) if pd.isnull(row[thermal_transistance_col_name]): output = get_wall_u_value( @@ -135,7 +190,11 @@ class TrainingDataset(BaseDataset): Using the apply method, use the get_floor_u_value method to generate the u-value """ - floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending" + floor_thermal_col_name = ( + "floor_thermal_transmittance" + if not is_end + else "floor_thermal_transmittance_ending" + ) if row["another_property_below"]: if row["floor_thermal_transmittance"] != 0: @@ -148,9 +207,21 @@ class TrainingDataset(BaseDataset): uvalue = row[floor_thermal_col_name] if pd.isnull(uvalue): - insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending" - perimeter_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending" - floor_area_col_name = "ground_floor_area_starting" if not is_end else "ground_floor_area_ending" + insulation_col_name = ( + "floor_insulation_thickness" + if not is_end + else "floor_insulation_thickness_ending" + ) + perimeter_col_name = ( + "estimated_perimeter_starting" + if not is_end + else "estimated_perimeter_ending" + ) + floor_area_col_name = ( + "ground_floor_area_starting" + if not is_end + else "ground_floor_area_ending" + ) uvalue = get_floor_u_value( floor_type=row["floor_type"], @@ -158,7 +229,7 @@ class TrainingDataset(BaseDataset): area=row[floor_area_col_name], insulation_thickness=row[insulation_col_name], wall_type=row["wall_type"], - age_band=england_wales_age_band_lookup[row["construction_age_band"]] + age_band=england_wales_age_band_lookup[row["construction_age_band"]], ) return uvalue @@ -173,65 +244,75 @@ class TrainingDataset(BaseDataset): # ~~~~~~~~~~~~~~~~~~ walls_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_wall_uvalue(row), - axis=1 + lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1 ) walls_ending_uvalue = self.df.apply( lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True), - axis=1 + axis=1, ) - walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue) - walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df[ - "walls_clean_description_ending"] + walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna( + walls_starting_uvalue + ) + walls_starting_equals_ending_flag = ( + self.df["walls_clean_description"] + == self.df["walls_clean_description_ending"] + ) walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[ - walls_starting_equals_ending_flag] + walls_starting_equals_ending_flag + ] # ~~~~~~~~~~~~~~~~~~ # Roof # ~~~~~~~~~~~~~~~~~~ roof_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_roof_uvalue(row), - axis=1 + lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1 ) roof_ending_uvalue = self.df.apply( lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True), - axis=1 + axis=1, ) - roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue) - roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue) + roof_starting_uvalue = self.df["roof_thermal_transmittance"].fillna( + roof_starting_uvalue + ) + roof_ending_uvalue = self.df["roof_thermal_transmittance_ending"].fillna( + roof_ending_uvalue + ) # ~~~~~~~~~~~~~~~~~~ # Floor # ~~~~~~~~~~~~~~~~~~ - self.df['estimated_number_of_floors'] = self.df.apply( - lambda row: estimate_number_of_floors(row['property_type']), - axis=1 + self.df["estimated_number_of_floors"] = self.df.apply( + lambda row: estimate_number_of_floors(row["property_type"]), axis=1 ) self.df["ground_floor_area_starting"] = ( - self.df["total_floor_area_starting"] / self.df['estimated_number_of_floors'] + self.df["total_floor_area_starting"] / self.df["estimated_number_of_floors"] ) self.df["ground_floor_area_ending"] = ( - self.df["total_floor_area_ending"] / self.df['estimated_number_of_floors'] + self.df["total_floor_area_ending"] / self.df["estimated_number_of_floors"] ) - self.df['estimated_perimeter_starting'] = self.df.apply( + self.df["estimated_perimeter_starting"] = self.df.apply( lambda row: estimate_perimeter( - row["ground_floor_area_starting"], row["number_habitable_rooms"] / row['estimated_number_of_floors'] + row["ground_floor_area_starting"], + row["number_habitable_rooms"] / row["estimated_number_of_floors"], ), - axis=1 + axis=1, ) - self.df['estimated_perimeter_ending'] = self.df.apply( + self.df["estimated_perimeter_ending"] = self.df.apply( lambda row: estimate_perimeter( - row["ground_floor_area_starting"], row["number_habitable_rooms"] / row['estimated_number_of_floors'] + row["ground_floor_area_starting"], + row["number_habitable_rooms"] / row["estimated_number_of_floors"], ), - axis=1 + axis=1, + ) + self.df["floor_type"] = self.df["is_suspended"].replace( + {True: "suspended", False: "solid"} ) - self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"}) self.df["wall_type"] = self.df.apply( lambda row: get_wall_type( is_cavity_wall=row["is_cavity_wall"], @@ -241,39 +322,56 @@ class TrainingDataset(BaseDataset): is_cob=row["is_cob"], is_sandstone_or_limestone=row["is_sandstone_or_limestone"], is_system_built=row["is_system_built"], - is_park_home=row["is_park_home"] + is_park_home=row["is_park_home"], ), - axis=1 + axis=1, ) floor_starting_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_floor_uvalue(row), - axis=1 + lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1 ) floor_ending_uvalue = self.df.apply( - lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True), - axis=1 + lambda row: self._lambda_function_to_generate_floor_uvalue( + row, is_end=True + ), + axis=1, ) - floor_starting_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_starting_uvalue) - floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue) + floor_starting_uvalue = self.df["floor_thermal_transmittance"].fillna( + floor_starting_uvalue + ) + floor_ending_uvalue = self.df["floor_thermal_transmittance_ending"].fillna( + floor_ending_uvalue + ) for component in ["walls", "roof", "floor"]: - self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna( - eval(f"{component}_starting_uvalue")) + self.df[f"{component}_thermal_transmittance"] = self.df[ + f"{component}_thermal_transmittance" + ].fillna(eval(f"{component}_starting_uvalue")) self.df[f"{component}_thermal_transmittance_ending"] = self.df[ - f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue")) + f"{component}_thermal_transmittance_ending" + ].fillna(eval(f"{component}_ending_uvalue")) self.df = self.df.drop( - columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending", - 'estimated_number_of_floors', "ground_floor_area_starting", "ground_floor_area_ending"]) + columns=[ + "floor_type", + "wall_type", + "walls_clean_description", + "walls_clean_description_ending", + "estimated_number_of_floors", + "ground_floor_area_starting", + "ground_floor_area_ending", + ] + ) def _adjust_assumed_values_in_wall_descriptions(self): """ Strip out assumed values for all wall descriptions """ for col in ["walls_clean_description", "walls_clean_description_ending"]: - self.df[col] = self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip() + self.df[col] = ( + self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip() + ) def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str): """ @@ -282,31 +380,55 @@ class TrainingDataset(BaseDataset): if component == "walls": expanded_df = expanded_df[ - (expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) & - (expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"]) & - (expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"]) & - (expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"]) & - (expanded_df["is_cob"] == expanded_df["is_cob_ending"]) & - (expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"]) - ] + (expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) + & ( + expanded_df["is_solid_brick"] + == expanded_df["is_solid_brick_ending"] + ) + & ( + expanded_df["is_timber_frame"] + == expanded_df["is_timber_frame_ending"] + ) + & ( + expanded_df["is_granite_or_whinstone"] + == expanded_df["is_granite_or_whinstone_ending"] + ) + & (expanded_df["is_cob"] == expanded_df["is_cob_ending"]) + & ( + expanded_df["is_sandstone_or_limestone"] + == expanded_df["is_sandstone_or_limestone_ending"] + ) + ] elif component == "floor": expanded_df = expanded_df[ - (expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) & - (expanded_df["is_solid"] == expanded_df["is_solid_ending"]) & - (expanded_df["another_property_below"] == expanded_df["another_property_below_ending"]) & - (expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"]) & - (expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"]) - ] + (expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) + & (expanded_df["is_solid"] == expanded_df["is_solid_ending"]) + & ( + expanded_df["another_property_below"] + == expanded_df["another_property_below_ending"] + ) + & ( + expanded_df["is_to_unheated_space"] + == expanded_df["is_to_unheated_space_ending"] + ) + & ( + expanded_df["is_to_external_air"] + == expanded_df["is_to_external_air_ending"] + ) + ] elif component == "roof": expanded_df = expanded_df[ - (expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) & - (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) & - (expanded_df["is_loft"] == expanded_df["is_loft_ending"]) & - (expanded_df["is_flat"] == expanded_df["is_flat_ending"]) & - (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) & - (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) & - (expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"]) - ] + (expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) + & (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) + & (expanded_df["is_loft"] == expanded_df["is_loft_ending"]) + & (expanded_df["is_flat"] == expanded_df["is_flat_ending"]) + & (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) + & (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) + & ( + expanded_df["has_dwelling_above"] + == expanded_df["has_dwelling_above_ending"] + ) + ] return expanded_df @@ -325,60 +447,108 @@ class TrainingDataset(BaseDataset): cols_to_drop = { "walls": [ # We need to cleaned descriptions for pulling out u-values - 'original_description', 'thermal_transmittance_unit', - 'original_description_ending', - 'thermal_transmittance_unit_ending', - 'is_cavity_wall_ending', 'is_filled_cavity_ending', - 'is_solid_brick_ending', 'is_system_built_ending', - 'is_timber_frame_ending', 'is_granite_or_whinstone_ending', - 'is_as_built_ending', 'is_cob_ending', 'is_assumed_ending', - 'is_sandstone_or_limestone_ending', + "original_description", + "thermal_transmittance_unit", + "original_description_ending", + "thermal_transmittance_unit_ending", + "is_cavity_wall_ending", + "is_filled_cavity_ending", + "is_solid_brick_ending", + "is_system_built_ending", + "is_timber_frame_ending", + "is_granite_or_whinstone_ending", + "is_as_built_ending", + "is_cob_ending", + "is_assumed_ending", + "is_sandstone_or_limestone_ending", # Re remove the is_assumed columns - "is_assumed", "is_assumed_ending" + "is_assumed", + "is_assumed_ending", ], "floor": [ - "original_description", "clean_description", "thermal_transmittance_unit", - "no_data", "no_data_ending", "original_description_ending", - "clean_description_ending", "thermal_transmittance_unit_ending", - "is_suspended_ending", "is_solid_ending", "another_property_below_ending", - "is_to_unheated_space_ending", "is_to_external_air_ending", "is_assumed", - "is_assumed_ending" + "original_description", + "clean_description", + "thermal_transmittance_unit", + "no_data", + "no_data_ending", + "original_description_ending", + "clean_description_ending", + "thermal_transmittance_unit_ending", + "is_suspended_ending", + "is_solid_ending", + "another_property_below_ending", + "is_to_unheated_space_ending", + "is_to_external_air_ending", + "is_assumed", + "is_assumed_ending", ], "roof": [ - "original_description", "clean_description", "thermal_transmittance_unit", - "is_assumed", "is_valid", "original_description_ending", "clean_description_ending", - "thermal_transmittance_unit_ending", "is_pitched_ending", "is_roof_room_ending", - "is_loft_ending", "is_flat_ending", "is_thatched_ending", "is_at_rafters_ending", - "has_dwelling_above_ending", "is_assumed_ending", "is_valid_ending" + "original_description", + "clean_description", + "thermal_transmittance_unit", + "is_assumed", + "is_valid", + "original_description_ending", + "clean_description_ending", + "thermal_transmittance_unit_ending", + "is_pitched_ending", + "is_roof_room_ending", + "is_loft_ending", + "is_flat_ending", + "is_thatched_ending", + "is_at_rafters_ending", + "has_dwelling_above_ending", + "is_assumed_ending", + "is_valid_ending", ], "hotwater": [ - "original_description", "clean_description", "assumed", "original_description_ending", - "clean_description_ending", "assumed_ending" + "original_description", + "clean_description", + "assumed", + "original_description_ending", + "clean_description_ending", + "assumed_ending", ], "mainheat": [ - "original_description", "clean_description", "original_description_ending", - "has_assumed", "original_description_ending", "clean_description_ending", + "original_description", + "clean_description", + "original_description_ending", + "has_assumed", + "original_description_ending", + "clean_description_ending", "has_assumed_ending", ], "mainheatcont": [ - "original_description", "clean_description", "original_description_ending", "clean_description_ending" + "original_description", + "clean_description", + "original_description_ending", + "clean_description_ending", ], "windows": [ - "original_description", "clean_description", "original_description_ending", "clean_description_ending", + "original_description", + "clean_description", + "original_description_ending", + "clean_description_ending", # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature - "has_glazing", "glazing_coverage", "no_data", "has_glazing_ending", "glazing_coverage_ending", - "no_data_ending" + "has_glazing", + "glazing_coverage", + "no_data", + "has_glazing_ending", + "glazing_coverage_ending", + "no_data_ending", ], "main-fuel": [ - "original_description", "clean_description", "original_description_ending", "clean_description_ending" + "original_description", + "clean_description", + "original_description_ending", + "clean_description_ending", ], } components_to_expand = cols_to_drop.keys() for component in components_to_expand: - - # TODO: change cleaned dataframe to have underscores instead of dashes + # TODO: change cleaned dataframe to have underscores instead of dashes if component == "main-fuel": cleaned_key = "main-fuel" left_on_starting = "main_fuel_starting" @@ -388,7 +558,10 @@ class TrainingDataset(BaseDataset): cleaned_key = f"{component}-description" left_on_starting = f"{component}_description_starting" left_on_ending = f"{component}_description_ending" - original_cols = [f"{component}_description_starting", f"{component}_description_ending"] + original_cols = [ + f"{component}_description_starting", + f"{component}_description_ending", + ] cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) @@ -402,14 +575,16 @@ class TrainingDataset(BaseDataset): how="left", left_on=left_on_ending, right_on="original_description", - suffixes=("", "_ending") + suffixes=("", "_ending"), ) # Drop properties where key material types have changed expanded_df = self._drop_inconsistent_properties(expanded_df, component) # Drop original cols and cols to drop - expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols) + expanded_df = expanded_df.drop( + columns=cols_to_drop[component] + original_cols + ) # Rename columns to component specific names, if they have not been dropped expanded_df = expanded_df.rename( @@ -428,7 +603,9 @@ class TrainingDataset(BaseDataset): # We don't need any lighting specific cleaning, we just drop the original description as we use # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING - self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"]) + self.df = self.df.drop( + columns=["lighting_description_starting", "lighting_description_ending"] + ) def _clean_missing_values(self, ignore_cols=None): missings = pd.isnull(self.df).sum() @@ -455,17 +632,22 @@ class TrainingDataset(BaseDataset): """ Drop features that are not needed for modelling """ - self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"]) + self.df = self.df.drop( + columns=["lodgement_date_starting", "lodgement_date_ending"] + ) def _feature_generation(self): """ Generate features for modelling """ - self.df["days_to_starting"] = self._calculate_days_to(self.df["lodgement_date_starting"]) - self.df["day_to_ending"] = self._calculate_days_to(self.df["lodgement_date_ending"]) + self.df["days_to_starting"] = self._calculate_days_to( + self.df["lodgement_date_starting"] + ) + self.df["days_to_ending"] = self._calculate_days_to( + self.df["lodgement_date_ending"] + ) def _clean_efficiency_variables(self): - """ These is scope to clean this by the model per corresponding description. E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and @@ -491,7 +673,6 @@ class TrainingDataset(BaseDataset): @staticmethod def _calculate_days_to(lodgement_date): - if isinstance(lodgement_date, str): return ( pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) @@ -527,7 +708,9 @@ class NewDataset(BaseDataset): def __add__(self, other) -> "NewDataset": if not isinstance(other, NewDataset): - raise TypeError("Addition can only be performed with another instance of ScoringDataset") + raise TypeError( + "Addition can only be performed with another instance of ScoringDataset" + ) return NewDataset(self.datasets + other.datasets) def __radd__(self, other): diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index 168f7f71..99bbb22f 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -31,9 +31,13 @@ CARBON_RESPONSE = CARBON_RESPONSE.lower() CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES] EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] -VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [ - "lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE -] +VARIABLE_DATA_FEATURES = ( + COMPONENT_FEATURES + + EFFICIENCY_FEATURES + + POTENTIAL_COLUMNS + + ["lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE] +) + def get_cleaned_description_mapping(): """ @@ -45,16 +49,17 @@ def get_cleaned_description_mapping(): """ cleaned = read_from_s3( - s3_file_name="cleaned_epc_data/cleaned.bson", - bucket_name="retrofit-data-dev" + s3_file_name="cleaned_epc_data/cleaned.bson", bucket_name="retrofit-data-dev" ) cleaned = msgpack.unpackb(cleaned, raw=False) return cleaned + clean_lookup = get_cleaned_description_mapping() + class EPCPipeline: """ This class will take a list of directories and process them to create a dataset: @@ -66,17 +71,17 @@ class EPCPipeline: """ def __init__( - self, - epc_data_processor: EPCDataProcessor, - api_epc_records: dict = None, - directories: List[Path] | None = None, - run_mode="training", - epc_local_file="certificates.csv", - epc_bucket_name="retrofit-data-dev", - epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet", - epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet", - epc_compiled_dataset_key="sap_change_model/dataset.parquet" - ): + self, + epc_data_processor: EPCDataProcessor, + api_epc_records: dict = None, + directories: List[Path] | None = None, + run_mode="training", + epc_local_file="certificates.csv", + epc_bucket_name="retrofit-data-dev", + epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet", + epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet", + epc_compiled_dataset_key="sap_change_model/dataset.parquet", + ): """ :param directories: List of directories to process :param epc_data_processor: EPCDataProcessor object @@ -100,7 +105,7 @@ class EPCPipeline: self.epc_cleaning_dataset_key = epc_cleaning_dataset_key self.epc_all_equal_rows_key = epc_all_equal_rows_key self.epc_compiled_dataset_key = epc_compiled_dataset_key - + def run(self): """ Entrypoint to run the pipeline @@ -111,29 +116,33 @@ class EPCPipeline: self.run_newdata_dataset_pipeline() else: raise ValueError("Run mode defined needs to be in 'training' or 'newdata'") - + def run_newdata_dataset_pipeline(self): """ Main function to run the newdata pipeline """ - prepared_epc = EPCRecord(self.api_epc_records, run_mode="newdata") # This uses all the epc records to clean the data + prepared_epc = EPCRecord( + self.api_epc_records, run_mode="newdata" + ) # This uses all the epc records to clean the data self.epc_data_processor.insert_data(prepared_epc) self.epc_data_processor.prepare_data() data = self.epc_data_processor.data + epc_records = [ + EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient="records") + ] - - epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')] - def run_training_dataset_pipeline(self): """ Main function to run the training dataset generation pipeline """ if self.directories is None: - raise ValueError("Directories not specified - Unable to run Training pipeline") - + raise ValueError( + "Directories not specified - Unable to run Training pipeline" + ) + for directory in tqdm(self.directories): self.process_directory(directory) @@ -153,8 +162,8 @@ class EPCPipeline: df=pd.concat(self.compiled_cleaning_averages), bucket_name=self.epc_bucket_name, file_key=self.epc_cleaning_dataset_key, - ) - + ) + def process_directory(self, directory: Path): """ Process a single directory @@ -166,17 +175,32 @@ class EPCPipeline: self.epc_data_processor.prepare_data(filepath=filepath) constituency_data = self.epc_data_processor.data - self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages) + self.compiled_cleaning_averages.append( + self.epc_data_processor.cleaning_averages + ) constituency_difference_records = [] + # self.check_records = [] for uprn, property_data in constituency_data.groupby("uprn", observed=True): - difference_records = self.process_uprn(uprn=str(uprn), property_data=property_data, directory=directory) + difference_records = self.process_uprn( + uprn=str(uprn), property_data=property_data, directory=directory + ) if difference_records is not None: constituency_difference_records.extend(difference_records) - - constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=clean_lookup) - self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df]) + # check_list = [] + # for check_record in self.check_records: + # check_list.append(check_record["difference_record"]) + + # td = TrainingDataset(datasets=check_list, cleaned_lookup=clean_lookup) + + constituency_dataset = TrainingDataset( + datasets=constituency_difference_records, cleaned_lookup=clean_lookup + ) + + self.compiled_dataset = pd.concat( + [self.compiled_dataset, constituency_dataset.df] + ) def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path): """ @@ -192,25 +216,34 @@ class EPCPipeline: ): return None - # Fixed features - these are property attributes that shouldn't change over time + # Fixed features - these are property attributes that shouldn't change over time # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together - fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict() + fixed_data = ( + property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict() + ) # We include the lodgement date here as we probably need to factor time into the # model, since EPC standards and rigour have changed over time variable_data = property_data[VARIABLE_DATA_FEATURES] uprn = str(uprn) - epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')] + epc_records = [ + EPCRecord(uprn, **x, run_mode="training") + for x in variable_data.to_dict(orient="records") + ] # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records - property_difference_records = self._generate_property_difference_records(epc_records, uprn, directory, fixed_data) + property_difference_records = self._generate_property_difference_records( + epc_records, uprn, directory, fixed_data + ) return property_difference_records - def _generate_property_difference_records(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict): + def _generate_property_difference_records( + self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict + ): """ We can use multiple types of comparison datasets, for example: - First vs second @@ -222,14 +255,22 @@ class EPCPipeline: property_difference_records: list = [] - property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) + # property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) - # property_difference_records = self._compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) + property_difference_records = self._compare_all_permutation_epcs( + epc_records, uprn, directory, fixed_data, property_difference_records + ) return property_difference_records - - def _compare_all_permutation_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list): + def _compare_all_permutation_epcs( + self, + epc_records: List[EPCRecord], + uprn: str, + directory: Path, + fixed_data: dict, + property_difference_records: list, + ): """ Compare all permutations of EPCs for a given UPRN :param epc_records: @@ -243,28 +284,40 @@ class EPCPipeline: # Auto sort the records so that the record with highest RDSAP score is always record1 difference_record: EPCDifferenceRecord = latest_record - earliest_record + # TODO: Use method above instead of overloading operator + difference_record.append_fixed_data(fixed_data) # TODO: Pull out RDSAP_CHANGE to a variable if difference_record.get("rdsap_change") == 0: - continue - + if not difference_record.ensure_adequate_data(): + # Rdsap hasn't changed but we have enough data to use this record + # i.e. all fields aside from mechnical ventilation are the same] + # self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record}) + continue + all_equal = difference_record.compare_fields_in_records( fields=[x.lower() for x in CORE_COMPONENT_FEATURES] - ) - + ) + if all_equal: # Keep track of this for the moment so we can analyse - self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) + self.compiled_all_equal_rows.append( + {"uprn": uprn, "directory_name": directory.name} + ) continue - difference_record.append_fixed_data(fixed_data) - property_difference_records.append(difference_record) - + return property_difference_records - - def _compare_consecutive_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list): + def _compare_consecutive_epcs( + self, + epc_records: List[EPCRecord], + uprn: str, + directory: Path, + fixed_data: dict, + property_difference_records: list, + ): """ Compare consecutive EPCs for a given UPRN :param epc_records: @@ -272,7 +325,6 @@ class EPCPipeline: """ for idx in range(0, len(epc_records) - 1): - if idx >= len(epc_records) - 1: break @@ -281,21 +333,29 @@ class EPCPipeline: # Auto sort the records so that the record with highest RDSAP score is always record1 difference_record: EPCDifferenceRecord = latest_record - earliest_record + # TODO: Use method above instead of overloading operator + difference_record.append_fixed_data(fixed_data) # TODO: Pull out RDSAP_CHANGE to a variable if difference_record.get("rdsap_change") == 0: - continue - + if not difference_record.ensure_adequate_data(): + # Rdsap hasn't changed but we have enough data to use this record + # i.e. all fields aside from mechnical ventilation are the same] + # self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record}) + continue + all_equal = difference_record.compare_fields_in_records( fields=[x.lower() for x in CORE_COMPONENT_FEATURES] - ) - + ) + if all_equal: # Keep track of this for the moment so we can analyse - self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) + self.compiled_all_equal_rows.append( + {"uprn": uprn, "directory_name": directory.name} + ) continue - difference_record.append_fixed_data(fixed_data) + # difference_record.append_fixed_data(fixed_data) property_difference_records.append(difference_record) diff --git a/etl/epc/Record.py b/etl/epc/Record.py index 70586749..ac86a636 100644 --- a/etl/epc/Record.py +++ b/etl/epc/Record.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from etl.epc.ValidationConfiguration import ( EPCRecordValidationConfiguration, EPCDifferenceRecordValidationConfiguration, - EPCDifferenceRecordFixedDataValidationConfiguration + EPCDifferenceRecordFixedDataValidationConfiguration, ) from etl.epc.DataProcessor import EPCDataProcessor from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP @@ -18,20 +18,23 @@ from etl.epc.settings import ( HEAT_DEMAND_RESPONSE, CARBON_RESPONSE, COMPONENT_FEATURES, - EFFICIENCY_FEATURES + EFFICIENCY_FEATURES, ) +from recommendations.recommendation_utils import estimate_number_of_floors from utils.s3 import read_dataframe_from_s3_parquet from etl.epc.settings import EARLIEST_EPC_DATE -# TODO: Change these in the settings file +# TODO: Change these in the settings file RDSAP_RESPONSE = RDSAP_RESPONSE.lower() HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() CARBON_RESPONSE = CARBON_RESPONSE.lower() COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] -ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') -DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None) +ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev") +DATA_BUCKET = os.environ.get( + "DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None +) @dataclass @@ -106,7 +109,7 @@ class EPCRecord: def __post_init__(self): # We can have validation and cleaning steps for each of the fields # self.WALLS_DESCRIPTION = 'check' - # Could also have cleaning of records if needed + # Could also have cleaning of records if needed if self.run_mode == "training": self.validation_configuration = EPCRecordValidationConfiguration @@ -117,18 +120,17 @@ class EPCRecord: if self.epc_records is None: raise ValueError("Must provide epc records if running in newdata mode") - self.prepared_epc = self.epc_records['original_epc'] - self.original_epc = self.epc_records['original_epc'].copy() + self.prepared_epc = self.epc_records["original_epc"] + self.original_epc = self.epc_records["original_epc"].copy() - self.full_sap_epc = self.epc_records['full_sap_epc'] - self.old_data = self.epc_records['old_data'] + self.full_sap_epc = self.epc_records["full_sap_epc"] + self.old_data = self.epc_records["old_data"] if self.cleaning_data is None: raise ValueError("Must provide cleaning data if running in newdata mode") self._clean_records_using_epc_records() self._clean_with_data_processor() - self._temp_uprn_catch() self._expand_prepared_epc_to_attributes() @@ -139,7 +141,7 @@ class EPCRecord: # selff.df = self.epc_record_as_dataframe('prepared_epc') # self._feature_generation() - # self._drop_features() + # self._drop_features() return @@ -156,17 +158,20 @@ class EPCRecord: """ Drop features that are not needed for modelling """ - self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"]) + self.df = self.df.drop( + columns=["lodgement_date_starting", "lodgement_date_ending"] + ) def _feature_generation(self): """ Generate features for modelling """ - self.df["days_to_lodgement_date"] = self._calculate_days_to(self.prepared_epc["lodgement_date"]) + self.df["days_to_lodgement_date"] = self._calculate_days_to( + self.prepared_epc["lodgement_date"] + ) @staticmethod def _calculate_days_to(lodgement_date): - if isinstance(lodgement_date, str): return ( pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) @@ -176,13 +181,6 @@ class EPCRecord: pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) ).dt.days - def _temp_uprn_catch(self): - """ - Catch the case we do now have uprn - """ - if self.prepared_epc["uprn"] == "": - self.prepared_epc["uprn"] = 0 - def _clean_with_data_processor(self): """ This method will clean the records using the data processor @@ -190,7 +188,7 @@ class EPCRecord: epc_data_processor = EPCDataProcessor( data=self.epc_record_as_dataframe("prepared_epc"), run_mode="newdata", - cleaning_averages=self.cleaning_data + cleaning_averages=self.cleaning_data, ) epc_data_processor.prepare_data() @@ -216,11 +214,21 @@ class EPCRecord: self.secondheat_description: str = self.prepared_epc["secondheat_description"] self.windows_description: str = self.prepared_epc["windows_description"] self.glazed_type: str = self.prepared_epc["glazed_type"] - self.multi_glaze_proportion: float = float(self.prepared_epc["multi_glaze_proportion"]) - self.low_energy_lighting: float = float(self.prepared_epc["low_energy_lighting"]) - self.number_open_fireplaces: float = float(self.prepared_epc["number_open_fireplaces"]) - self.mainheatcont_description: str = self.prepared_epc["mainheatcont_description"] - self.solar_water_heating_flag: str = self.prepared_epc["solar_water_heating_flag"] + self.multi_glaze_proportion: float = float( + self.prepared_epc["multi_glaze_proportion"] + ) + self.low_energy_lighting: float = float( + self.prepared_epc["low_energy_lighting"] + ) + self.number_open_fireplaces: float = float( + self.prepared_epc["number_open_fireplaces"] + ) + self.mainheatcont_description: str = self.prepared_epc[ + "mainheatcont_description" + ] + self.solar_water_heating_flag: str = self.prepared_epc[ + "solar_water_heating_flag" + ] self.photo_supply: float = float(self.prepared_epc["photo_supply"]) self.transaction_type: str = self.prepared_epc["transaction_type"] self.energy_tariff: str = self.prepared_epc["energy_tariff"] @@ -236,14 +244,28 @@ class EPCRecord: self.mainheat_energy_eff: str = self.prepared_epc["mainheat_energy_eff"] self.mainheatc_energy_eff: str = self.prepared_epc["mainheatc_energy_eff"] self.lighting_energy_eff: str = self.prepared_epc["lighting_energy_eff"] - self.potential_energy_efficiency: float = float(self.prepared_epc["potential_energy_efficiency"]) - self.environment_impact_potential: float = float(self.prepared_epc["environment_impact_potential"]) - self.energy_consumption_potential: float = float(self.prepared_epc["energy_consumption_potential"]) - self.co2_emissions_potential: float = float(self.prepared_epc["co2_emissions_potential"]) + self.potential_energy_efficiency: float = float( + self.prepared_epc["potential_energy_efficiency"] + ) + self.environment_impact_potential: float = float( + self.prepared_epc["environment_impact_potential"] + ) + self.energy_consumption_potential: float = float( + self.prepared_epc["energy_consumption_potential"] + ) + self.co2_emissions_potential: float = float( + self.prepared_epc["co2_emissions_potential"] + ) self.lodgement_date: str = self.prepared_epc["lodgement_date"] - self.current_energy_efficiency: int = int(self.prepared_epc["current_energy_efficiency"]) - self.energy_consumption_current: int = int(self.prepared_epc["energy_consumption_current"]) - self.co2_emissions_current: float = float(self.prepared_epc["co2_emissions_current"]) + self.current_energy_efficiency: int = int( + self.prepared_epc["current_energy_efficiency"] + ) + self.energy_consumption_current: int = int( + self.prepared_epc["energy_consumption_current"] + ) + self.co2_emissions_current: float = float( + self.prepared_epc["co2_emissions_current"] + ) def _identify_delta_between_prepared_and_original_records(self): """ @@ -252,7 +274,11 @@ class EPCRecord: prepared_epc_df = self.epc_record_as_dataframe("prepared_epc") original_epc_df = self.epc_record_as_dataframe("original_epc") - df = pd.concat([prepared_epc_df, original_epc_df], keys=["prepared_epc", "original_epc"], axis=0) + df = pd.concat( + [prepared_epc_df, original_epc_df], + keys=["prepared_epc", "original_epc"], + axis=0, + ) same_index = df.apply(pd.Series.duplicated).any() self.prepared_epc_delta_metadata = df[same_index[~same_index].index] @@ -269,7 +295,7 @@ class EPCRecord: # This method will merge on the cleaned lookup table and ensure that the building fabric in the # starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest # possible dataset. - # """ + # """ # for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]: # if component == "main-fuel": # component = component.replace("-", "_") @@ -325,8 +351,12 @@ class EPCRecord: # self._clean_energy_consumption_current() # self._clean_co2_emissions_current() - def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True, - replace_empty_string: bool = False): + def epc_record_as_dataframe( + self, + epc_type: str = "prepared_epc", + use_upper_columns: bool = True, + replace_empty_string: bool = False, + ): """ This method will return the dataframe representation of the epc record """ @@ -348,8 +378,9 @@ class EPCRecord: raise ValueError("EPC Recrod doesn not contain epc data") self.prepared_epc["floor-level"] = ( - FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] if - self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES else None + FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] + if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES + else None ) def _clean_number_lighting_outlets(self): @@ -360,35 +391,50 @@ class EPCRecord: raise ValueError("EPC Recrod doesn not contain epc data") if self.prepared_epc["fixed-lighting-outlets-count"] == "": - # We check old EPCs and the full SAP EPC lighting_data = [] if len(self.old_data): - lighting_data.extend([ - int(old_record["fixed-lighting-outlets-count"]) for old_record in self.old_data if - old_record["fixed-lighting-outlets-count"] != "" - ]) + lighting_data.extend( + [ + int(old_record["fixed-lighting-outlets-count"]) + for old_record in self.old_data + if old_record["fixed-lighting-outlets-count"] != "" + ] + ) if len(self.full_sap_epc): if self.full_sap_epc["fixed-lighting-outlets-count"] != "": - lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"])) + lighting_data.append( + int(self.full_sap_epc["fixed-lighting-outlets-count"]) + ) if lighting_data: - self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data)) + self.prepared_epc["fixed-lighting-outlets-count"] = round( + np.median(lighting_data) + ) else: - # Use averages from the cleaning dataset, based on the property type, built form, construction age - # band and local authority + # Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority cleaned_property_data = EPCDataProcessor.apply_averages_cleaning( - data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True), + data_to_clean=self.epc_record_as_dataframe( + "prepared_epc", replace_empty_string=True + ), cleaning_data=self.cleaning_data, - cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], + cols_to_merge_on=[ + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "LOCAL_AUTHORITY", + ], ) self.prepared_epc["fixed-lighting-outlets-count"] = round( - cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]) + cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0] + ) else: - self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"]) + self.prepared_epc["fixed-lighting-outlets-count"] = float( + self.prepared_epc["fixed-lighting-outlets-count"] + ) def _filter_property_dimensions(self, property_dimensions): """ @@ -397,16 +443,27 @@ class EPCRecord: :return: filtered property dimensions dataframe """ - result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])] + result = property_dimensions[ + (property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"]) + ] - if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES: - result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)] + if ( + self.construction_age_band is not None + and self.construction_age_band not in DATA_ANOMALY_MATCHES + ): + result = result[ + (result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band) + ] - if self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES and self.prepared_epc["built-form"] in result[ - "BUILT_FORM"]: + if ( + self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES + and self.prepared_epc["built-form"] in result["BUILT_FORM"] + ): result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])] - return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() + return result[ + ["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"] + ].mean() def _clean_property_dimensions(self): """ @@ -417,30 +474,46 @@ class EPCRecord: raise ValueError("EPC Recrod doesn not contain epc data") if not self.prepared_epc["number-habitable-rooms"] or ( - self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES + self.prepared_epc["floor-height"] == "" + or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES ): property_dimensions = read_dataframe_from_s3_parquet( - bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet" + bucket_name=DATA_BUCKET, + file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet", + ) + self.property_dimensions = self._filter_property_dimensions( + property_dimensions ) - self.property_dimensions = self._filter_property_dimensions(property_dimensions) if not self.prepared_epc["number-habitable-rooms"]: self.prepared_epc["number-habitable-rooms"] = float( - self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()) + self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round() + ) else: - self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"]) + self.prepared_epc["number-habitable-rooms"] = float( + self.prepared_epc["number-habitable-rooms"] + ) - if self.prepared_epc["property-type"] == "House": - self.number_of_floors = 2 - elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]: - self.number_of_floors = 1 - elif self.prepared_epc["property-type"] == "Maisonette": - self.number_of_floors = 2 - else: - raise NotImplementedError("Implement me") + self.number_of_floors = estimate_number_of_floors( + self.prepared_epc["property-type"] + ) - if self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES: - self.prepared_epc["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2)) + # if self.prepared_epc["property-type"] == "House": + # self.number_of_floors = 2 + # elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]: + # self.number_of_floors = 1 + # elif self.prepared_epc["property-type"] == "Maisonette": + # self.number_of_floors = 2 + # else: + # raise NotImplementedError("Implement me") + + if ( + self.prepared_epc["floor-height"] == "" + or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES + ): + self.prepared_epc["floor-height"] = float( + self.property_dimensions["FLOOR_HEIGHT"].round(2) + ) else: self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"]) @@ -451,7 +524,9 @@ class EPCRecord: if not self.prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"]) + self.prepared_epc["total-floor-area"] = float( + self.prepared_epc["total-floor-area"] + ) def _clean_mains_gas(self): """ @@ -465,9 +540,14 @@ class EPCRecord: "N": False, } - self.prepared_epc["mains-gas-flag"] = None if ( - self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES - ) else map[self.prepared_epc["mains-gas-flag"]] + self.prepared_epc["mains-gas-flag"] = ( + None + if ( + self.prepared_epc["mains-gas-flag"] == "" + or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES + ) + else map[self.prepared_epc["mains-gas-flag"]] + ) def _clean_heat_loss_corridor(self): """ @@ -479,16 +559,19 @@ class EPCRecord: map = { "no corridor": False, "unheated corridor": True, - "heated corridor": False + "heated corridor": False, } - self.prepared_epc["heat-loss-corridor"] = False if self.prepared_epc[ - "heat-loss-corridor"] in DATA_ANOMALY_MATCHES else map[ - self.prepared_epc["heat-loss-corridor"]] + self.prepared_epc["heat-loss-corridor"] = ( + False + if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES + else map[self.prepared_epc["heat-loss-corridor"]] + ) self.prepared_epc["unheated-corridor-length"] = ( - float(self.prepared_epc["unheated-corridor-length"]) if - self.prepared_epc["unheated-corridor-length"] != "" else None + float(self.prepared_epc["unheated-corridor-length"]) + if self.prepared_epc["unheated-corridor-length"] != "" + else None ) def _clean_count_variables(self): @@ -496,7 +579,7 @@ class EPCRecord: This method will clean the count variables, if empty or invalid """ if not self.prepared_epc: - raise ValueError("EPC Record doesn not contain epc data") + raise ValueError("EPC Recrod doesn not contain epc data") fields = { "number_of_open_fireplaces": "number-open-fireplaces", @@ -508,6 +591,8 @@ class EPCRecord: null_attributes = ["number_of_storeys", "number_of_rooms"] for attribute, epc_field in fields.items(): + # TODO: check this + # value = self.data["extension-count"] value = self.prepared_epc[epc_field] if value == "" or value in DATA_ANOMALY_MATCHES: if attribute in null_attributes: @@ -526,8 +611,11 @@ class EPCRecord: if not self.prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc[ - 'wind-turbine-count'] != "" else None + self.prepared_epc["wind-turbine-count"] = ( + int(self.prepared_epc["wind-turbine-count"]) + if self.prepared_epc["wind-turbine-count"] != "" + else None + ) def _clean_solar_hot_water(self): """ @@ -542,7 +630,9 @@ class EPCRecord: "": None, } - self.prepared_epc['solar-water-heating-flag'] = value_map[self.prepared_epc['solar-water-heating-flag']] + self.prepared_epc["solar-water-heating-flag"] = value_map[ + self.prepared_epc["solar-water-heating-flag"] + ] def _clean_solar_pv(self): """ @@ -551,9 +641,11 @@ class EPCRecord: if not self.prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc[ - 'photo-supply'] != "" \ + self.prepared_epc["photo-supply"] = ( + float(self.prepared_epc["photo-supply"]) + if self.prepared_epc["photo-supply"] != "" else None + ) def _clean_energy(self): """ @@ -562,8 +654,12 @@ class EPCRecord: if not self.prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"]) - self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"]) + self.prepared_epc["energy-consumption-current"] = float( + self.prepared_epc["energy-consumption-current"] + ) + self.prepared_epc["co2-emissions-current"] = float( + self.prepared_epc["co2-emissions-current"] + ) def _clean_built_form(self): """ @@ -572,8 +668,9 @@ class EPCRecord: if not self.prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(self.prepared_epc["built-form"], - self.prepared_epc["built-form"]) + self.prepared_epc["built-form"] = BUILT_FORM_REMAP.get( + self.prepared_epc["built-form"], self.prepared_epc["built-form"] + ) if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES: if self.prepared_epc["property-type"] == "Flat": self.prepared_epc["built-form"] = "Semi-Detached" @@ -586,26 +683,38 @@ class EPCRecord: raise ValueError("EPC Recrod doesn not contain epc data") self.construction_age_band = EPCDataProcessor.clean_construction_age_band( - self.prepared_epc["construction-age-band"]) + self.prepared_epc["construction-age-band"] + ) if self.construction_age_band in DATA_ANOMALY_MATCHES: if self.old_data: # Take the most recent max_datetime = max( - [old_record["lodgement-datetime"] for old_record in self.old_data if - old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES] + [ + old_record["lodgement-datetime"] + for old_record in self.old_data + if old_record["construction-age-band"] + not in DATA_ANOMALY_MATCHES + ] ) - most_recent = [old_record for old_record in self.old_data if - old_record["lodgement-datetime"] == max_datetime] + most_recent = [ + old_record + for old_record in self.old_data + if old_record["lodgement-datetime"] == max_datetime + ] - self.construction_age_band = EPCDataProcessor.clean_construction_age_band( - most_recent[0]["construction-age-band"] + self.construction_age_band = ( + EPCDataProcessor.clean_construction_age_band( + most_recent[0]["construction-age-band"] + ) ) self.age_band = england_wales_age_band_lookup.get(self.construction_age_band) - if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None): + if (self.prepared_epc["transaction-type"] == "new dwelling") and ( + self.age_band is None + ): self.age_band = "L" - self.construction_age_band = 'England and Wales: 2012 onwards' + self.construction_age_band = "England and Wales: 2012 onwards" if self.age_band is None: raise ValueError("age_band is missing") @@ -615,7 +724,9 @@ class EPCRecord: This method will clean the year built, if empty or invalid """ if self.full_sap_epc: - self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year + self.year_built = datetime.strptime( + self.full_sap_epc["lodgement-date"], "%Y-%m-%d" + ).year return @@ -623,7 +734,12 @@ class EPCRecord: # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have # more options for recommendations if that age falls before the year that insulation in walls became # common practice - band = [int(x) for x in re.findall(r'\b\d{4}\b', self.prepared_epc["construction-age-band"])] + band = [ + int(x) + for x in re.findall( + r"\b\d{4}\b", self.prepared_epc["construction-age-band"] + ) + ] self.year_built = band[0] return @@ -634,9 +750,14 @@ class EPCRecord: """ This method will clean the ventilation, if empty or invalid """ - self.prepared_epc['mechanical-ventilation'] = None if ( - self.mechanical_ventilation == "" or self.mechanical_ventilation in DATA_ANOMALY_MATCHES) else ( - self.mechanical_ventilation) + self.prepared_epc["mechanical-ventilation"] = ( + None + if ( + self.mechanical_ventilation == "" + or self.mechanical_ventilation in DATA_ANOMALY_MATCHES + ) + else self.mechanical_ventilation + ) def _field_validation(self): """ @@ -647,54 +768,67 @@ class EPCRecord: # Get the variable named record key from self field_value = self.__dict__[record_key] - if validation_config['type'] == "string": + if validation_config["type"] == "string": self._validate_string(record_key, field_value, validation_config) - elif validation_config['type'] == "float": + elif validation_config["type"] == "float": self._validate_float(record_key, field_value, validation_config) else: - raise ValueError(f"Validation type {validation_config['type']} not supported") + raise ValueError( + f"Validation type {validation_config['type']} not supported" + ) - def _validate_string(self, record_key: str, field_value: Union[str, float], validation_config: dict): + def _validate_string( + self, record_key: str, field_value: Union[str, float], validation_config: dict + ): """ Validate a string field """ if not isinstance(field_value, str): - raise ValueError(f"Field {record_key} has value {field_value} which is not a string") + raise ValueError( + f"Field {record_key} has value {field_value} which is not a string" + ) - if 'function' in validation_config: + if "function" in validation_config: try: - validation_config['function'](field_value) + validation_config["function"](field_value) except: raise ValueError( - f"Field {record_key} has value {field_value} which does not pass the validation function " - f"{validation_config['function']}") + f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}" + ) - if validation_config['acceptable_values'] is not None: - if field_value not in validation_config['acceptable_values']: + if validation_config["acceptable_values"] is not None: + if field_value not in validation_config["acceptable_values"]: raise ValueError( - f"Field {record_key} has value {field_value} which is not in the acceptable values of " - f"{validation_config['acceptable_values']}") + f"Field {record_key} has value {field_value} which is not in the acceptable values of {validation_config['acceptable_values']}" + ) - def _validate_float(self, record_key: str, field_value: Union[str, float], validation_config: dict): + def _validate_float( + self, record_key: str, field_value: Union[str, float], validation_config: dict + ): """ Validate a float field """ if not isinstance(field_value, float): - raise ValueError(f"Field {record_key} has value {field_value} which is not a float") + raise ValueError( + f"Field {record_key} has value {field_value} which is not a float" + ) - if 'function' in validation_config: + if "function" in validation_config: try: - validation_config['function'](field_value) + validation_config["function"](field_value) except: raise ValueError( - f"Field {record_key} has value {field_value} which does not pass the validation function " - f"{validation_config['function']}") + f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}" + ) - if validation_config['range'] is not None: - if field_value < validation_config['range'][0] or field_value > validation_config['range'][1]: + if validation_config["range"] is not None: + if ( + field_value < validation_config["range"][0] + or field_value > validation_config["range"][1] + ): raise ValueError( - f"Field {record_key} has value {field_value} which is not in the acceptable range of " - f"{validation_config['range']}") + f"Field {record_key} has value {field_value} which is not in the acceptable range of {validation_config['range']}" + ) def __sub__(self, other): """ @@ -703,7 +837,9 @@ class EPCRecord: if not isinstance(other, EPCRecord): raise ValueError("Can only subtract EPCRecord from EPCRecord") - difference_record = EPCDifferenceRecord(record1=self, record2=other, auto_sort=True) + difference_record = EPCDifferenceRecord( + record1=self, record2=other, auto_sort=True + ) return difference_record @@ -743,18 +879,27 @@ class EPCRecord: return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE] - def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any: + def get( + self, + key: Union[str, List[str]], + return_asdict: bool = False, + key_suffix: str | None = None, + ) -> Any: """ This method will return the value of the key """ if return_asdict: - output_dict = {x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key} + output_dict = { + x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key + } if key_suffix is not None: output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()} return output_dict if isinstance(key, list): - return [self.__dict__[x] if x in self.__dict__.keys() else None for x in key] + return [ + self.__dict__[x] if x in self.__dict__.keys() else None for x in key + ] elif isinstance(key, str): return self.__dict__[key] if key in self.__dict__.keys() else None @@ -771,12 +916,18 @@ class EPCDifferenceRecord: """ self.record1 = record1 self.record2 = record2 - self.earliest_record = record1 if record1.lodgement_date < record2.lodgement_date else record2 + self.earliest_record = ( + record1 if record1.lodgement_date < record2.lodgement_date else record2 + ) self.flag_fabric_consistency = False self.difference_record = {} - self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration - self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration + self.difference_validation_configuration = ( + EPCDifferenceRecordValidationConfiguration + ) + self.fixed_data_validation_configuration = ( + EPCDifferenceRecordFixedDataValidationConfiguration + ) if auto_sort and (self.record2 <= self.record1): self.record1, self.record2 = self.record2, self.record1 @@ -790,15 +941,27 @@ class EPCDifferenceRecord: This method will construct the difference record between the two records """ - rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get(RDSAP_RESPONSE) - heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get(HEAT_DEMAND_RESPONSE) - carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(CARBON_RESPONSE) + rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get( + RDSAP_RESPONSE + ) + heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get( + HEAT_DEMAND_RESPONSE + ) + carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get( + CARBON_RESPONSE + ) component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES - ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True, - key_suffix="_ending") - starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True, - key_suffix="_starting") + ending_record = self.record2.get( + component_variables + ["lodgement_date"], + return_asdict=True, + key_suffix="_ending", + ) + starting_record = self.record1.get( + component_variables + ["lodgement_date"], + return_asdict=True, + key_suffix="_starting", + ) self.difference_record = { "uprn": self.record1.get("uprn"), @@ -811,12 +974,20 @@ class EPCDifferenceRecord: "heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE), "carbon_starting": self.record1.get(CARBON_RESPONSE), "carbon_ending": self.record2.get(CARBON_RESPONSE), - "potential_energy_efficiency": self.earliest_record.get("potential_energy_efficiency"), - "environment_impact_potential": self.earliest_record.get("environment_impact_potential"), - "energy_consumption_potential": self.earliest_record.get("energy_consumption_potential"), - "co2_emissions_potential": self.earliest_record.get("co2_emissions_potential"), + "potential_energy_efficiency": self.earliest_record.get( + "potential_energy_efficiency" + ), + "environment_impact_potential": self.earliest_record.get( + "environment_impact_potential" + ), + "energy_consumption_potential": self.earliest_record.get( + "energy_consumption_potential" + ), + "co2_emissions_potential": self.earliest_record.get( + "co2_emissions_potential" + ), **ending_record, - **starting_record + **starting_record, } def _validate_difference_record(self): @@ -849,7 +1020,11 @@ class EPCDifferenceRecord: """ This method will return the value of the key """ - return self.difference_record[key] if key in self.difference_record.keys() else None + return ( + self.difference_record[key] + if key in self.difference_record.keys() + else None + ) def append_fixed_data(self, fixed_data: dict): """ @@ -863,7 +1038,70 @@ class EPCDifferenceRecord: This method will validate the fixed data """ - # Can have more sophisticated checks here + # Can have more sophisticated checks here # self.fixed_data_validataion_configuration pass + + def ensure_adequate_data(self) -> bool: + """ + This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is zero + Can move into the initiation of the difference record + """ + wall_check = self.record1.walls_description == self.record2.walls_description + + floor_check = self.record1.floor_description == self.record2.floor_description + + roof_check = self.record1.roof_description == self.record2.roof_description + + mainheat_check = ( + self.record1.mainheat_description == self.record2.mainheat_description + ) + + windows_check = ( + self.record1.windows_description == self.record2.windows_description + ) + + solar_water_heating_flag_check = ( + self.record1.solar_water_heating_flag + == self.record2.solar_water_heating_flag + ) + + solar_pv_check = self.record1.photo_supply == self.record2.photo_supply + + heating_control_check = ( + self.record1.mainheatcont_description + == self.record2.mainheatcont_description + ) + + extension_count_check = ( + self.record1.extension_count == self.record2.extension_count + ) + + floor_height_check = ( + abs(1 - (self.record1.floor_height / self.record2.floor_height)) < 0.05 + ) + + total_floor_area_check = ( + abs(1 - (self.record1.total_floor_area / self.record2.total_floor_area)) + < 0.05 + ) + + if all( + [ + wall_check, + floor_check, + roof_check, + mainheat_check, + windows_check, + solar_water_heating_flag_check, + extension_count_check, + floor_height_check, + total_floor_area_check, + solar_pv_check, + heating_control_check, + ] + ): + return True + else: + return False diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 5486ee99..c8923d6d 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -5,23 +5,27 @@ from etl.epc.Pipeline import EPCPipeline DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates" + def main(): """ Orchestration function """ directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - # directories = directories[125:128] + # directories = directories[0:3] - epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training")) + epc_pipeline = EPCPipeline( + directories=directories, + epc_data_processor=EPCDataProcessor(run_mode="training"), + ) epc_pipeline.run() # For testing # dataset_df = epc_pipeline.compiled_dataset - # dataset_df.to_parquet("refactor_datasets/dataset.parquet") - # pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet") - # pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet") + # dataset_df.to_parquet("refactor_datasets/dataset_with0perm_all.parquet") + # pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows_with0perm_all.parquet") + # pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages_with0perm_all.parquet") # from utils.s3 import read_dataframe_from_s3_parquet # dataset = read_dataframe_from_s3_parquet(