From 1eb98d2d35cc8577dd8d78d20eb48a9071d4ea1c Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Wed, 20 Dec 2023 16:27:27 +0000 Subject: [PATCH] new pipeline working --- etl/epc/DataProcessor.py | 30 +- etl/epc/Dataset.py | 42 ++- etl/epc/Pipeline.py | 32 +- etl/epc/property_change_app.py | 611 +-------------------------------- 4 files changed, 72 insertions(+), 643 deletions(-) diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 2c7f892b..7f41c96f 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -83,7 +83,7 @@ class EPCDataProcessor: raise ValueError("Run mode must be either training or newdata") self.run_mode = run_mode if not violation_mode else "newdata" - def prepare_data(self, filepath: str) -> tuple[pd.DataFrame, pd.DataFrame]: + def prepare_data(self, filepath: Path | str) -> None: """ Given the run mode, we apply the relevant pipeline steps Ignore step is used to highlight which steps are not needed in newdata @@ -91,9 +91,12 @@ class EPCDataProcessor: ignore_step = True if self.run_mode == "newdata" else False - if self.data is None: + if filepath is not None: self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) - + + if len(self.data) == 0: + raise Exception("No data to process - check filepath/ data being passed in") + self.confine_data(ignore_step=ignore_step) self.remap_anomalies() self.remap_floor_level(ignore_step=ignore_step) @@ -116,6 +119,7 @@ class EPCDataProcessor: # Final re-casting after data transformed and prepared self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True) + self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True) self.na_remapping(auto_subset_columns=True) self.fill_invalid_constituency_fields(ignore_step=ignore_step) @@ -132,9 +136,8 @@ class EPCDataProcessor: self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step) self.cast_data_columns_to_lower() - - return self.data, self.cleaning_averages - + + def cast_data_columns_to_lower(self): """ Convert all columns names to lower @@ -577,8 +580,19 @@ class EPCDataProcessor: for key, values in column_mappings.items(): if key not in self.data.columns: raise ValueError("Column mapping incorrectly specified") - for value in values: - self.data[key] = self.data[key].astype(value) + if isinstance(values, list): + for value in values: + self.data[key] = self.data[key].astype(value) + else: + self.data[key] = self.data[key].astype(values) + + def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: + """ + Using a dictionary to recast all columns at once + """ + + if auto_subset_columns: + column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} self.data = self.data.astype(column_mappings) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 26073211..b0c7f4c7 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -45,7 +45,7 @@ class TrainingDataset(BaseDataset): # self.pipeline_steps = self.pipeline_factory("training") self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) - + self._feature_generation() self._drop_features() self._clean_efficiency_variables() @@ -112,14 +112,20 @@ class TrainingDataset(BaseDataset): """ Using the apply method, use the get_wall_u_value method to generate the u-value """ - col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" + description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" + thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else "walls_thermal_transmittance_ending" - return get_wall_u_value( - clean_description=row[col_name], - age_band=england_wales_age_band_lookup[row["construction_age_band"]], - is_granite_or_whinstone=row["is_granite_or_whinstone"], - is_sandstone_or_limestone=row["is_sandstone_or_limestone"], - ) + if pd.isnull(row[thermal_transistance_col_name]): + output = get_wall_u_value( + clean_description=row[description_col_name], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + ) + else: + output = row[thermal_transistance_col_name] + + return output @staticmethod def _lambda_function_to_generate_floor_uvalue(row, is_end=False): @@ -235,8 +241,8 @@ class TrainingDataset(BaseDataset): floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue) for component in ["walls", "roof", "floor"]: - self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(f"{component}_starting_uvalue") - self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(f"{component}_ending_uvalue") + self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue")) + self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue")) self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"]) @@ -353,11 +359,17 @@ class TrainingDataset(BaseDataset): for component in components_to_expand: - # TODO: change cleaned dataframe to have underscores instead of dashes - cleaned_key, left_on_starting, left_on_ending, original_cols = ( - ("main-fuel", "main_fuel_starting", "main_fuel_ending", ["main_fuel_starting", "main_fuel_ending"]) if component == "main-fuel" else - (f"{component}-description", f"{component}_description_starting", f"{component}_description_ending", [f"{component}_description_starting", f"{component}_description_ending"]) - ) + # TODO: change cleaned dataframe to have underscores instead of dashes + if component == "main-fuel": + cleaned_key = "main-fuel" + left_on_starting = "main_fuel_starting" + left_on_ending = "main_fuel_ending" + original_cols = ["main_fuel_starting", "main_fuel_ending"] + else: + cleaned_key = f"{component}-description" + left_on_starting = f"{component}_description_starting" + left_on_ending = f"{component}_description_ending" + original_cols = [f"{component}_description_starting", f"{component}_description_ending"] cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index 07f54096..c4af6911 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -105,23 +105,23 @@ class EPCPipeline: for directory in self.directories: self.process_directory(directory) - save_dataframe_to_s3_parquet( - df=self.compiled_dataset, - bucket_name=self.epc_bucket_name, - file_key=self.epc_compiled_dataset_key, - ) + # save_dataframe_to_s3_parquet( + # df=self.compiled_dataset, + # bucket_name=self.epc_bucket_name, + # file_key=self.epc_compiled_dataset_key, + # ) - save_dataframe_to_s3_parquet( - df=pd.concat(self.compiled_all_equal_rows), - bucket_name=self.epc_bucket_name, - file_key=self.epc_all_equal_rows_key, - ) + # save_dataframe_to_s3_parquet( + # df=pd.concat(self.compiled_all_equal_rows), + # bucket_name=self.epc_bucket_name, + # file_key=self.epc_all_equal_rows_key, + # ) - save_dataframe_to_s3_parquet( - df=pd.concat(self.compiled_cleaning_averages), - bucket_name=self.epc_bucket_name, - file_key=self.epc_cleaning_dataset_key, - ) + # save_dataframe_to_s3_parquet( + # df=pd.concat(self.compiled_cleaning_averages), + # bucket_name=self.epc_bucket_name, + # file_key=self.epc_cleaning_dataset_key, + # ) def process_directory(self, directory: Path): """ @@ -131,7 +131,7 @@ class EPCPipeline: """ filepath = directory / self.epc_local_file - self.epc_data_processor.pre_process(filepath=filepath) + self.epc_data_processor.prepare_data(filepath=filepath) constituency_data = self.epc_data_processor.data self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages) diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index fb9486e8..bb24f431 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -1,625 +1,28 @@ -import pandas as pd -import numpy as np -from tqdm import tqdm -import msgpack - -from typing import List from pathlib import Path -from etl.epc.settings import ( - MANDATORY_FIXED_FEATURES, - LATEST_FIELD, - COMPONENT_FEATURES, - RDSAP_RESPONSE, - HEAT_DEMAND_RESPONSE, - CARBON_RESPONSE, - CORE_COMPONENT_FEATURES, - EFFICIENCY_FEATURES, - POTENTIAL_COLUMNS, - MINIMUM_FLOOR_HEIGHT -) from etl.epc.DataProcessor import EPCDataProcessor -from etl.epc.Record import EPCRecord, EPCDifferenceRecord -from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 -from recommendations.rdsap_tables import england_wales_age_band_lookup -from recommendations.recommendation_utils import ( - get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, - get_wall_type -) +from etl.epc.Pipeline import EPCPipeline DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates" -# TODO: change in setting file -MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] -LATEST_FIELD = [x.lower() for x in LATEST_FIELD] -COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] -RDSAP_RESPONSE = RDSAP_RESPONSE.lower() -HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() -CARBON_RESPONSE = CARBON_RESPONSE.lower() -CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES] -EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] -POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] -VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [ - "lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE -] - -def get_cleaned_description_mapping(): - """ - This function will retrieve the cleaned dataset from s3 which has the cleaned - descriptions for the epc dataset - - This data is stored in MessagePack format and therefore needs to be decoded - :return: - """ - - cleaned = read_from_s3( - s3_file_name="cleaned_epc_data/cleaned.bson", - bucket_name="retrofit-data-dev" - ) - - cleaned = msgpack.unpackb(cleaned, raw=False) - - return cleaned - - -def process_and_prune_desriptions(df, cleaned_lookup): - """ - This method will merge on the cleaned lookup table and ensure that the building fabric in the - starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest - possible dataset. - :param df: - :param cleaned_lookup: - :return: - """ - - cols_to_drop = { - "walls": [ - # We need to cleaned descriptions for pulling out u-values - 'original_description', 'thermal_transmittance_unit', - 'original_description_ENDING', - 'thermal_transmittance_unit_ENDING', - 'is_cavity_wall_ENDING', 'is_filled_cavity_ENDING', - 'is_solid_brick_ENDING', 'is_system_built_ENDING', - 'is_timber_frame_ENDING', 'is_granite_or_whinstone_ENDING', - 'is_as_built_ENDING', 'is_cob_ENDING', 'is_assumed_ENDING', - 'is_sandstone_or_limestone_ENDING', - # Re remove the is_assumed columns - "is_assumed", "is_assumed_ENDING" - ], - "floor": [ - "original_description", "clean_description", "thermal_transmittance_unit", - "no_data", "no_data_ENDING", "original_description_ENDING", - "clean_description_ENDING", "thermal_transmittance_unit_ENDING", - "is_suspended_ENDING", "is_solid_ENDING", "another_property_below_ENDING", - "is_to_unheated_space_ENDING", "is_to_external_air_ENDING", "is_assumed", - "is_assumed_ENDING" - ], - "roof": [ - "original_description", "clean_description", "thermal_transmittance_unit", - "is_assumed", "is_valid", "original_description_ENDING", "clean_description_ENDING", - "thermal_transmittance_unit_ENDING", "is_pitched_ENDING", "is_roof_room_ENDING", - "is_loft_ENDING", "is_flat_ENDING", "is_thatched_ENDING", "is_at_rafters_ENDING", - "has_dwelling_above_ENDING", "is_assumed_ENDING", "is_valid_ENDING" - ], - "hotwater": [ - "original_description", "clean_description", "assumed", "original_description_ENDING", - "clean_description_ENDING", "assumed_ENDING" - ], - "mainheat": [ - "original_description", "clean_description", "original_description_ENDING", - "has_assumed", "original_description_ENDING", "clean_description_ENDING", - "has_assumed_ENDING", - ], - "mainheatcont": [ - "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING" - ], - "windows": [ - "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING", - # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature - "has_glazing", "glazing_coverage", "no_data", "has_glazing_ENDING", "glazing_coverage_ENDING", - "no_data_ENDING" - ], - "main-fuel": [ - "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING" - ], - } - - for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]: - component_upper = component.upper() - if component == "main-fuel": - component_upper = component_upper.replace("-", "_") - - cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description" - left_on_starting = ( - f"{component_upper}_STARTING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_STARTING" - ) - - left_on_ending = ( - f"{component_upper}_ENDING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_ENDING" - ) - - df = df.merge( - pd.DataFrame(cleaned_lookup[cleaned_key]), - how="left", - left_on=left_on_starting, - right_on="original_description", - ).merge( - pd.DataFrame(cleaned_lookup[cleaned_key]), - how="left", - left_on=left_on_ending, - right_on="original_description", - suffixes=("", "_ENDING") - ) - - if component == "walls": - # We make sure the wall construction hasn't changed - df = df[ - (df["is_cavity_wall"] == df["is_cavity_wall_ENDING"]) & - (df["is_solid_brick"] == df["is_solid_brick_ENDING"]) & - (df["is_timber_frame"] == df["is_timber_frame_ENDING"]) & - (df["is_granite_or_whinstone"] == df["is_granite_or_whinstone_ENDING"]) & - (df["is_cob"] == df["is_cob_ENDING"]) & - (df["is_sandstone_or_limestone"] == df["is_sandstone_or_limestone_ENDING"]) - ] - elif component == "floor": - df = df[ - (df["is_suspended"] == df["is_suspended_ENDING"]) & - (df["is_solid"] == df["is_solid_ENDING"]) & - (df["another_property_below"] == df["another_property_below_ENDING"]) & - (df["is_to_unheated_space"] == df["is_to_unheated_space_ENDING"]) & - (df["is_to_external_air"] == df["is_to_external_air_ENDING"]) - ] - elif component == "roof": - df = df[ - (df["is_pitched"] == df["is_pitched_ENDING"]) & - (df["is_roof_room"] == df["is_roof_room_ENDING"]) & - (df["is_loft"] == df["is_loft_ENDING"]) & - (df["is_flat"] == df["is_flat_ENDING"]) & - (df["is_thatched"] == df["is_thatched_ENDING"]) & - (df["is_at_rafters"] == df["is_at_rafters_ENDING"]) & - (df["has_dwelling_above"] == df["has_dwelling_above_ENDING"]) - ] - - # Drop the binary indicators and replace the original description with the cleaned version - - # Drop original cols - original_cols = [ - f"{component_upper}_DESCRIPTION_STARTING", f"{component_upper}_DESCRIPTION_ENDING" - ] if component != "main-fuel" else [ - f"{component_upper}_STARTING", f"{component_upper}_ENDING" - ] - - df = df.drop(columns=cols_to_drop[component] + original_cols) - - # If we have an insulation_thickness column, rename it - if "insulation_thickness" in cleaned_lookup[cleaned_key][0]: - df = df.rename( - columns={ - "insulation_thickness": f"{component}_insulation_thickness", - "insulation_thickness_ENDING": f"{component}_insulation_thickness_ENDING", - } - ) - # If we have thermal transmittance, rename it - if "thermal_transmittance" in cleaned_lookup[cleaned_key][0]: - df = df.rename( - columns={ - "thermal_transmittance": f"{component}_thermal_transmittance", - "thermal_transmittance_ENDING": f"{component}_thermal_transmittance_ENDING", - } - ) - - # If we have tarrif, rename it - if "tariff_type" in cleaned_lookup[cleaned_key][0]: - df = df.rename( - columns={ - "tariff_type": f"{component}_tariff_type", - "tariff_type_ENDING": f"{component}_tariff_type_ENDING", - } - ) - - # We need the walls descriptions so we rename them to distinguish them - if component == "walls": - df = df.rename( - columns={ - "clean_description": f"{component}_clean_description", - "clean_description_ENDING": f"{component}_clean_description_ENDING", - } - ) - - # We don't need any lighting specific cleaning, we just drop the original description as we use - # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING - - df = df.drop(columns=["LIGHTING_DESCRIPTION_STARTING", "LIGHTING_DESCRIPTION_ENDING"]) - - return df - - -def make_uvalues(df): - df["row_index"] = df.index - - uvalues = [] - # TODO: iterrows is the slowest way to do this, we should use a vectorised approach or itertuples - for index_no, x in df.iterrows(): - - uprn = x["uprn"] - row_index = x["row_index"] - age_band = england_wales_age_band_lookup[x["construction_age_band"]] - - # ~~~~~~~~~~~~~~~~~~ - # Walls - # ~~~~~~~~~~~~~~~~~~ - - starting_wall_uvalue = x["walls_thermal_transmittance"] - if pd.isnull(starting_wall_uvalue): - starting_wall_uvalue = get_wall_u_value( - clean_description=x["walls_clean_description"], - age_band=age_band, - is_granite_or_whinstone=x["is_granite_or_whinstone"], - is_sandstone_or_limestone=x["is_sandstone_or_limestone"], - ) - - ending_wall_uvalue = x["walls_thermal_transmittance_ending"] - if pd.isnull(ending_wall_uvalue): - if x["walls_clean_description"] != x["walls_clean_description_ending"]: - ending_wall_uvalue = get_wall_u_value( - clean_description=x["walls_clean_description_ending"], - age_band=age_band, - is_granite_or_whinstone=x["is_granite_or_whinstone"], - is_sandstone_or_limestone=x["is_sandstone_or_limestone"], - ) - else: - ending_wall_uvalue = starting_wall_uvalue - - # ~~~~~~~~~~~~~~~~~~ - # Roof - # ~~~~~~~~~~~~~~~~~~ - - if x["has_dwelling_above"]: - if x["roof_thermal_transmittance"] != 0: - raise ValueError("Should have 0 u-value for roof") - - if x["roof_thermal_transmittance_ENDING"] != 0: - raise ValueError("Should have 0 u-value for roof") - - starting_roof_uvalue = x["roof_thermal_transmittance"] - if pd.isnull(starting_roof_uvalue): - starting_roof_uvalue = get_roof_u_value( - insulation_thickness=x["roof_insulation_thickness"], - has_dwelling_above=x["has_dwelling_above"], - is_loft=x["is_loft"], - is_roof_room=x["is_roof_room"], - is_thatched=x["is_thatched"], - is_flat=x["is_flat"], - is_pitched=x["is_pitched"], - is_at_rafters=x["is_at_rafters"], - age_band=age_band - ) - - ending_roof_uvalue = x["roof_thermal_transmittance_ENDING"] - - if pd.isnull(ending_roof_uvalue): - ending_roof_uvalue = get_roof_u_value( - insulation_thickness=x["roof_insulation_thickness_ENDING"], - has_dwelling_above=x["has_dwelling_above"], - is_loft=x["is_loft"], - is_roof_room=x["is_roof_room"], - is_thatched=x["is_thatched"], - is_flat=x["is_flat"], - is_pitched=x["is_pitched"], - is_at_rafters=x["is_at_rafters"], - age_band=age_band - ) - - # ~~~~~~~~~~~~~~~~~~ - # Floor - # ~~~~~~~~~~~~~~~~~~ - perimeters = {} - for suffix in ["_STARTING", "_ENDING"]: - floor_area = x[f"TOTAL_FLOOR_AREA{suffix}"] - n_rooms = x["NUMBER_HABITABLE_ROOMS"] - - perimeters[f"estimated_perimeter{suffix}"] = estimate_perimeter(floor_area, n_rooms) - - floor_type = "suspended" if x["is_suspended"] else "solid" - wall_type = get_wall_type(**x) - - if x["another_property_below"]: - if x["floor_thermal_transmittance"] != 0: - raise ValueError("Should have 0 u-value for floor") - - if x["floor_thermal_transmittance_ENDING"] != 0: - raise ValueError("Should have 0 u-value for floor") - starting_floor_uvalue, ending_floor_uvalue = 0, 0 - else: - starting_floor_uvalue = x["floor_thermal_transmittance"] - ending_floor_uvalue = x["floor_thermal_transmittance_ENDING"] - - if pd.isnull(starting_floor_uvalue): - starting_floor_uvalue = get_floor_u_value( - floor_type=floor_type, - perimeter=perimeters["estimated_perimeter_STARTING"], - area=x[f"TOTAL_FLOOR_AREA_STARTING"], - insulation_thickness=x["floor_insulation_thickness"], - wall_type=wall_type, - age_band=age_band - ) - - if pd.isnull(ending_floor_uvalue): - ending_floor_uvalue = get_floor_u_value( - floor_type=floor_type, - perimeter=perimeters["estimated_perimeter_ENDING"], - area=x[f"TOTAL_FLOOR_AREA_ENDING"], - insulation_thickness=x["floor_insulation_thickness_ENDING"], - wall_type=wall_type, - age_band=age_band - ) - - uvalues.append( - { - "UPRN": uprn, - "row_index": row_index, - "starting_walls_uvalue": starting_wall_uvalue, - "ending_walls_uvalue": ending_wall_uvalue, - "starting_roof_uvalue": starting_roof_uvalue, - "ending_roof_uvalue": ending_roof_uvalue, - "starting_floor_uvalue": starting_floor_uvalue, - "ending_floor_uvalue": ending_floor_uvalue, - **perimeters - } - ) - - uvalues = pd.DataFrame(uvalues) - - df = df.merge( - uvalues, how="left", on=["UPRN", "row_index"] - ).drop(columns="row_index") - - # Fill missings - for component in ["walls", "floor", "roof"]: - for suffix in ["", "_ENDING"]: - fill_col = f"starting_{component}_uvalue" if suffix == "" else f"ending_{component}_uvalue" - - df[f"{component}_thermal_transmittance{suffix}"] = np.where( - pd.isnull(df[f"{component}_thermal_transmittance{suffix}"]), - df[fill_col], - df[f"{component}_thermal_transmittance{suffix}"] - ) - - df = df.drop( - columns=[ - "starting_walls_uvalue", "ending_walls_uvalue", "starting_roof_uvalue", - "ending_roof_uvalue", "starting_floor_uvalue", "ending_floor_uvalue" - ] - ) - - return df - - -# def compare_records(earliest_record: pd.Series, latest_record: pd.Series, columns: list): -# """ -# For a list of columns, check if the earliest and latest record are the same -# If they are the same, we indicate this, because we have example of SAP scores changing -# without any feature changes -# :param earliest_record: pd.Series -# :param latest_record: pd.Series -# :param columns: list of columns to compare -# :return: boolean indicating whether or not all features are the same -# """ - -# all_equal = True -# for col in columns: -# if earliest_record[col] != latest_record[col]: -# return False -# if all_equal: -# return True - -def generate_property_difference_records(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, all_equal_rows: list): - """ - We can use multiple types of comparison datasets, for example: - - First vs second - - Second vs third - - First vs third - :param epc_records: - :return: - """ - - property_difference_records: list = [] - - property_difference_records, all_equal_rows = compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records, all_equal_rows) - - # property_difference_records, all_equal_rows = compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records, all_equal_rows) - - return property_difference_records, all_equal_rows - - -def compare_all_permutation_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list): - """ - Compare all permutations of EPCs for a given UPRN - :param epc_records: - :return: - """ - - for idx in range(0, len(epc_records) - 1): - for idx2 in range(idx + 1, len(epc_records)): - earliest_record: EPCRecord = epc_records[idx] - latest_record: EPCRecord = epc_records[idx2] - - # Auto sort the records so that the record with highest RDSAP score is always record1 - difference_record: EPCDifferenceRecord = latest_record - earliest_record - - # TODO: Pull out RDSAP_CHANGE to a variable - if difference_record.get("rdsap_change") == 0: - continue - - all_equal = difference_record.compare_fields_in_records( - fields=[x.lower() for x in CORE_COMPONENT_FEATURES] - ) - - if all_equal: - # Keep track of this for the moment so we can analyse - all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) - continue - - difference_record.append_fixed_data(fixed_data) - - property_difference_records.append(difference_record) - - return property_difference_records, all_equal_rows - - -def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list): - """ - Compare consecutive EPCs for a given UPRN - :param epc_records: - :return: - """ - - for idx in range(0, len(epc_records) - 1): - - if idx >= len(epc_records) - 1: - break - - earliest_record: EPCRecord = epc_records[idx] - latest_record: EPCRecord = epc_records[idx + 1] - - # Auto sort the records so that the record with highest RDSAP score is always record1 - difference_record: EPCDifferenceRecord = latest_record - earliest_record - - # TODO: Pull out RDSAP_CHANGE to a variable - if difference_record.get("rdsap_change") == 0: - continue - - all_equal = difference_record.compare_fields_in_records( - fields=[x.lower() for x in CORE_COMPONENT_FEATURES] - ) - - if all_equal: - # Keep track of this for the moment so we can analyse - all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) - continue - - difference_record.append_fixed_data(fixed_data) - - property_difference_records.append(difference_record) - - return property_difference_records, all_equal_rows - - -from etl.epc.Dataset import TrainingDataset - -def app(): - # Get all the files in the directory - - # Data glossary: - # https://epc.opendatacommunities.org/docs/guidance#glossary - - cleaned_lookup = get_cleaned_description_mapping() - - # List all subdirectories - directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - - dataset = [] - cleaning_dataset = [] - # Keep track of the all equals - all_equal_rows = [] - - for directory in tqdm(directories): - filepath = directory / "certificates.csv" - - epc_data_processor = EPCDataProcessor(filepath=filepath) - - epc_data_processor.pre_process() - - df = epc_data_processor.data - - cleaning_dataset.append(epc_data_processor.cleaning_averages) - - constituency_difference_records = [] - for uprn, property_data in df.groupby("uprn", observed=True): - - # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row - if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or ( - pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0 - ): - continue - - # Fixed features - these are property attributes that shouldn't change over time - # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together - fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict() - - # We include the lodgement date here as we probably need to factor time into the - # model, since EPC standards and rigour have changed over time - variable_data = property_data[VARIABLE_DATA_FEATURES] - - uprn = str(uprn) - epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')] - - # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord - - # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records - property_difference_records, all_equal_rows = generate_property_difference_records(epc_records, uprn, directory, fixed_data, all_equal_rows) - - constituency_difference_records.extend(property_difference_records) - - constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=cleaned_lookup) - - constituency_dataset_df = constituency_dataset.df - - dataset.append(constituency_dataset_df) - - print("Final all equal count: %s" % str(len(all_equal_rows))) - - # Store cleaning dataset in s3 as a parquet file - cleaning_dataset = pd.concat(cleaning_dataset) - save_dataframe_to_s3_parquet( - df=cleaning_dataset, - bucket_name="retrofit-data-dev", - file_key="sap_change_model/cleaning_dataset.parquet", - ) - - output = pd.concat(dataset) - - # TODO: move into difference record - # Remove any records that have huge swings in their floor area - # Move this into TrainingDataset as this won't be run in newdata - # output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"]) - # output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"] - # output = output[output["tfa_diff_prop"] < 0.5] - # output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) - - # # TODO: move into EPCRecord record - # uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col] - # for uvalue_col in uvalue_columns: - # output[uvalue_col] = pd.to_numeric(output[uvalue_col]) - - save_dataframe_to_s3_parquet( - df=output, - bucket_name="retrofit-data-dev", - file_key="sap_change_model/dataset.parquet", - ) - - # Store all_equal_rows - all_equal_rows = pd.DataFrame(all_equal_rows) - save_dataframe_to_s3_parquet( - df=all_equal_rows, - bucket_name="retrofit-data-dev", - file_key="sap_change_model/all_equal_rows.parquet", - ) - -from etl.epc.Pipeline import EPCPipeline - def main(): """ Orchestration function """ directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] + directories = directories[0:2] epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training")) epc_pipeline.run() + # For testing + epc_pipeline.compiled_dataset + epc_pipeline.compiled_all_equal_rows + import pandas as pd + pd.concat(epc_pipeline.compiled_cleaning_averages) if __name__ == "__main__":