From a35c3faccac9739b3ce4810bcf5cfaaa2030b364 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Wed, 20 Dec 2023 14:46:12 +0000 Subject: [PATCH] add ignore step to data processor --- etl/epc/DataProcessor.py | 527 ++++++++++++++---------- etl/epc/Dataset.py | 192 ++++++++- etl/epc/Pipeline.py | 270 ++++++++++++ etl/epc/Record.py | 17 +- etl/epc/property_change_app.py | 108 ++--- recommendations/recommendation_utils.py | 3 +- 6 files changed, 813 insertions(+), 304 deletions(-) create mode 100644 etl/epc/Pipeline.py diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 0f07cdfa..2c7f892b 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -28,7 +28,7 @@ from recommendations.rdsap_tables import FLOOR_LEVEL_MAP from typing import List # These lookups are used to clean the construction age band -bounds_map = { +construction_age_bounds_map = { "England and Wales: before 1900": {"l": 0, "u": 1899}, "England and Wales: 1930-1949": {"l": 1930, "u": 1949}, "England and Wales: 1900-1929": {"l": 1900, "u": 1929}, @@ -43,13 +43,13 @@ bounds_map = { "England and Wales: 2012 onwards": {"l": 2012, "u": 3000}, } -remap = { +construction_age_remap = { "England and Wales: 2007 onwards": "England and Wales: 2007-2011" } expanded_map = { i: [ - label for label, bounds in bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l']) + label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l']) ][0] for i in range(0, 3001) } @@ -67,154 +67,182 @@ class EPCDataProcessor: Handle data loading and data preprocessing """ - training_pipeline = { - "confine_data": { - "function": "confine_data", - "args": [], - "kwargs": {}, - }, - "remap_columns": { - "function": "remap_columns", - "args": [], - "kwargs": {}, - }, - "standardise_construction_age_band": { - "function": "standardise_construction_age_band", - "args": [], - "kwargs": {}, - }, - "clean_missing_rooms": { - "function": "clean_missing_rooms", - "args": [], - "kwargs": {}, - }, - "recast_df_columns": { - "function": "recast_df_columns", - "args": [], - "kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]}, - }, - "clean_multi_glaze_proportion": { - "function": "clean_multi_glaze_proportion", - "args": [], - "kwargs": {}, - }, - "clean_photo_supply": { - "function": "clean_photo_supply", - "args": [], - "kwargs": {}, - }, - "retain_multiple_epc_properties": { - "function": "retain_multiple_epc_properties", - "args": [], - "kwargs": {"epc_minimum_count": DATA_PROCESSOR_SETTINGS["epc_minimum_count"]}, - }, - "fill_na_fields": { - "function": "fill_na_fields", - "args": [], - "kwargs": {"columns_to_fill": COLUMNS_TO_MERGE_ON}, - }, - "cleaning_averages": { - "function": "make_cleaning_averages", - "args": [], - "kwargs": {}, - }, - "apply_averages_cleaning": { - "function": "apply_averages_cleaning", - "args": [], - "kwargs": { - "data_to_clean": "data", - "cleaning_data": "cleaning_averages", - "cols_to_merge_on": COLUMNS_TO_MERGE_ON, - }, - }, - "na_remapping": { - "function": "na_remapping", - "args": [], - "kwargs": {}, - }, - } - - newdata_pipeline = { - "remap_columns": { - "function": "remap_columns", - "args": [], - "kwargs": {}, - }, - "recast_df_columns": { - "function": "recast_df_columns", - "args": [], - "kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]}, - }, - "clean_multi_glaze_proportion": { - "function": "clean_multi_glaze_proportion", - "args": [], - "kwargs": {}, - }, - "clean_photo_supply": { - "function": "clean_photo_supply", - "args": [], - "kwargs": {}, - }, - "na_remapping": { - "function": "na_remapping", - "args": [], - "kwargs": {}, - }, - } - - def __init__(self, filepath: Path | None, is_newdata: bool = False) -> None: + def __init__(self, data: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None: """ :param filepath: If specified, is the physical location of the data :param is_newdata: Indicates if we are processing new, testing data. In this instance, there are some operations we do not want to perform, such as confine_data() """ - self.data : pd.DataFrame - self.cleaning_averages : pd.DataFrame + self.data : pd.DataFrame = data if data else pd.DataFrame() + self.cleaning_averages : pd.DataFrame = pd.DataFrame() - self.filepath = filepath - self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training") - self.is_newdata = is_newdata - - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) - - def pipeline_factory(self, pipeline_type: str) -> dict: - """ - Determine which dataclient to use - """ - - pipelines = { - "training": self.training_pipeline, - "newdata": self.newdata_pipeline, - } - - if pipeline_type not in pipelines: - raise ValueError("Pipeline type specified is not in factory") - - return pipelines[pipeline_type] + # FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA + self.violation_mode = violation_mode + if run_mode not in ["training", "newdata"]: + raise ValueError("Run mode must be either training or newdata") + self.run_mode = run_mode if not violation_mode else "newdata" - - def pre_process_pipeline(self) -> None: + def prepare_data(self, filepath: str) -> tuple[pd.DataFrame, pd.DataFrame]: """ - For the pipeline_steps, we apply each function in turn - This will alter self.data inplace + Given the run mode, we apply the relevant pipeline steps + Ignore step is used to highlight which steps are not needed in newdata """ - for step in self.pipeline_steps: - step_function = getattr(self, self.pipeline_steps[step]["function"]) - step_args = self.pipeline_steps[step]["args"] - step_kwargs = self.pipeline_steps[step]["kwargs"] + ignore_step = True if self.run_mode == "newdata" else False - if step_args: - step_function(*step_args, **step_kwargs) - else: - step_function(**step_kwargs) + if self.data is None: + self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + + self.confine_data(ignore_step=ignore_step) + self.remap_anomalies() + self.remap_floor_level(ignore_step=ignore_step) + self.remap_build_form() + self.cast_data_column_values_to_lower() + self.standardise_construction_age_band(ignore_step=ignore_step) + self.clean_missing_rooms(ignore_step=ignore_step) + self.recast_df_columns( + column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"] + ) + self.clean_multi_glaze_proportion(ignore_step=ignore_step) + self.clean_photo_supply() + self.retain_multiple_epc_properties( + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step + ) + + self.fill_na_fields() + + self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step) + + # Final re-casting after data transformed and prepared + self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True) + self.na_remapping(auto_subset_columns=True) + + self.fill_invalid_constituency_fields(ignore_step=ignore_step) + + self.cleaning_averages = self.make_cleaning_averages(ignore_step=ignore_step) + cleaned_data = self.apply_averages_cleaning( + data_to_clean=self.data, + cleaning_data=self.cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON, + ignore_step=ignore_step + ) + self.data = self.data if cleaned_data is None else cleaned_data + + self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) + self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step) + self.cast_data_columns_to_lower() + + return self.data, self.cleaning_averages + + def cast_data_columns_to_lower(self): + """ + Convert all columns names to lower + """ + self.data.columns = self.data.columns.str.lower() + + def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False): + """ + Convert all column names to lower + No need in newdata mode + """ + + if ignore_step: + return + + self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower() + + def add_local_authority_to_cleaning_average(self, ignore_step: bool = False): + """ + Add the Local authority column to the cleaning averages + No need in newdata mode + """ + + if ignore_step: + return + + self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] + + def fill_invalid_constituency_fields(self, ignore_step: bool = False): + """ + For some weird cases, where data has missing constituency, we add a dummy value + """ + if self.violation_mode: + # TODO: to fill in + return + + if ignore_step: + return + + self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) + + def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False): + """ + Order data by uprn and lodgement data + No Violation mode needed + """ + + if ignore_step: + return + + self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True) + + def cast_data_column_values_to_lower(self): + """ + For given columns, cast values to lower + No Violation mode or newdata modes required + """ + convert_to_lower = ["TRANSACTION_TYPE"] + for col in convert_to_lower: + self.data[col] = self.data[col].str.lower() - def load_data(self, low_memory=False) -> None: - if not self.filepath: + def remap_build_form(self): + """ + Remap build form to standard values + No Violation mode or newdata modes required + """ + self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP) + + + def remap_anomalies(self): + """ + Remap anomalies to None + No Violation mode or newdata modes required + """ + + # Map all anomaly values to None + data_anomaly_map = dict( + zip( + Definitions.DATA_ANOMALY_MATCHES, + [None] * len(Definitions.DATA_ANOMALY_MATCHES), + ) + ) + + # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values + data = self.data.replace(data_anomaly_map) + data = data.replace(np.NAN, None) + + self.data = data + + def remap_floor_level(self, ignore_step: bool = False): + """ + Remap floor level to standard values + """ + + if self.violation_mode: + # TODO: We need to handle this case + return + + if ignore_step: + return + + self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) + + def load_data(self, filepath, low_memory=False) -> None: + if not filepath: raise ValueError("No filepath specified") - self.data = pd.read_csv(self.filepath, low_memory=low_memory) + self.data = pd.read_csv(filepath, low_memory=low_memory) def insert_data(self, data: pd.DataFrame) -> None: self.data = data @@ -226,11 +254,11 @@ class EPCDataProcessor: return x # Next, we check if it's a value in our map - if bounds_map.get(x): + if construction_age_bounds_map.get(x): return x # We check if it's a standard remap value - remap_value = remap.get(x, None) + remap_value = construction_age_remap.get(x, None) if remap_value: return remap_value @@ -241,12 +269,19 @@ class EPCDataProcessor: raise NotImplementedError("Not handled the case for value %s" % x) - def standardise_construction_age_band(self): + def standardise_construction_age_band(self, ignore_step: bool = False): """ This function will tidy up some of the non-standard values that are populated in the construction age band, which is useful for cleaning """ + if self.violation_mode: + # TODO: to fill in + return + + if ignore_step: + return + self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply( lambda x: self.clean_construction_age_band(x) ) @@ -255,7 +290,7 @@ class EPCDataProcessor: ~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"]) ] - def clean_missing_rooms(self): + def clean_missing_rooms(self, ignore_step: bool = False): """ For the number of heated rooms and number of habitable rooms, we clean these values up front, based on property archetype and age @@ -263,6 +298,13 @@ class EPCDataProcessor: TODO: We could use a model based impution approach for possibly more accurate cleaning """ + if self.violation_mode: + # TODO: to fill in + return + + if ignore_step: + return + # TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning) self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0]) @@ -301,75 +343,78 @@ class EPCDataProcessor: break to_index -= 1 - def pre_process(self): - """ - Load data and begin initial cleaning - """ - if self.data is None: - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + # def pre_process(self, filepath: Path | None = None) -> tuple[pd.DataFrame, pd.DataFrame]: + # """ + # Load data and begin initial cleaning + # """ + # if self.data is None: + # self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) - if not self.is_newdata: - self.confine_data() + # if not self.is_newdata: + # self.confine_data() - self.remap_columns() + # self.remap_columns() - # We have some non-standard construction age bands which we'll clean for matching - if not self.is_newdata: - self.standardise_construction_age_band() - self.clean_missing_rooms() + # # We have some non-standard construction age bands which we'll clean for matching + # if not self.is_newdata: + # self.standardise_construction_age_band() + # self.clean_missing_rooms() - self.recast_df_columns( - column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"] - ) + # self.recast_df_columns( + # column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"] + # ) - if not self.is_newdata: - self.clean_multi_glaze_proportion() + # if not self.is_newdata: + # self.clean_multi_glaze_proportion() - self.clean_photo_supply() + # self.clean_photo_supply() - if not self.is_newdata: - self.retain_multiple_epc_properties( - epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] - ) + # if not self.is_newdata: + # self.retain_multiple_epc_properties( + # epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] + # ) - if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: - # If we have multiple EPC records, we can try and do filling - self.fill_na_fields() + # if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: + # # If we have multiple EPC records, we can try and do filling + # self.fill_na_fields() - if not self.is_newdata: - self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True) + # if not self.is_newdata: + # self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True) - # Final re-casting after data transformed and prepared - coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES - for k, v in coltypes.items(): - self.data[k] = self.data[k].astype(v) - self.data = self.data.astype(coltypes) + # # Final re-casting after data transformed and prepared + # coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES + # for k, v in coltypes.items(): + # self.data[k] = self.data[k].astype(v) + # self.data = self.data.astype(coltypes) - self.na_remapping() + # self.na_remapping() - if not self.is_newdata: - # We have some odd cases with missing constituency so we fill - self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) + # self.cleaning_averages = None + # if not self.is_newdata: + # # We have some odd cases with missing constituency so we fill + # self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) - self.cleaning_averages = self.make_cleaning_averages() - # We apply averages cleaning to the data - self.data = self.apply_averages_cleaning( - data_to_clean=self.data, - cleaning_data=self.cleaning_averages, - cols_to_merge_on=COLUMNS_TO_MERGE_ON - ) + # self.cleaning_averages = self.make_cleaning_averages() + # # We apply averages cleaning to the data + # self.data = self.apply_averages_cleaning( + # data_to_clean=self.data, + # cleaning_data=self.cleaning_averages, + # cols_to_merge_on=COLUMNS_TO_MERGE_ON + # ) - self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] - self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower() + # self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] + # self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower() - self.data.columns = self.data.columns.str.lower() + # self.data.columns = self.data.columns.str.lower() + + # return self.data, self.cleaning_averages - def na_remapping(self): + def na_remapping(self, auto_subset_columns: bool = False): fill_na_map_apply = { k: v for k, v in fill_na_map.items() if k in self.data.columns - } if self.is_newdata else fill_na_map + } if auto_subset_columns else fill_na_map for column, fill_value in fill_na_map_apply.items(): self.data[column] = self.data[column].fillna(fill_value) @@ -396,35 +441,15 @@ class EPCDataProcessor: ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] ].replace("", None) - def remap_columns(self): + def make_cleaning_averages(self, ignore_step: bool = False) -> pd.DataFrame: """ - Remap all columns, for any non values + Create a dataset to hold averages based on property type, built form, construction age, and rooms. + Not require in newdata mode """ - # Map all anomaly values to None - data_anomaly_map = dict( - zip( - Definitions.DATA_ANOMALY_MATCHES, - [None] * len(Definitions.DATA_ANOMALY_MATCHES), - ) - ) + if ignore_step: + return pd.DataFrame() - # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values - data = self.data.replace(data_anomaly_map) - data = data.replace(np.NAN, None) - - # Remap certain columns - if not self.is_newdata: - data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) - data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) - - convert_to_lower = ["TRANSACTION_TYPE"] - for col in convert_to_lower: - data[col] = data[col].str.lower() - - self.data = data - - def make_cleaning_averages(self) -> pd.DataFrame: # Define a custom function to calculate the median, excluding missing values def median_without_missing(group): return group[AVERAGE_FIXED_FEATURES].median(skipna=True) @@ -523,11 +548,18 @@ class EPCDataProcessor: return cleaning_averages_filled - def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None: + def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None: """ Reduce the data futher by keeping only datasets with multiple epcs """ + if self.violation_mode: + # TODO: to fill in + return + + if ignore_step: + return + counts = self.data.groupby("UPRN").size().reset_index() counts.columns = ["UPRN", "count"] @@ -535,22 +567,67 @@ class EPCDataProcessor: counts = counts[counts["count"] > epc_minimum_count] self.data = pd.merge(self.data, counts, on="UPRN") - def recast_df_columns(self, column_mappings: dict) -> None: + def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: """ Recast columns from the dataframe to ensure the behaviour we want """ + if auto_subset_columns: + column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} for key, values in column_mappings.items(): if key not in self.data.columns: raise ValueError("Column mapping incorrectly specified") for value in values: self.data[key] = self.data[key].astype(value) + + self.data = self.data.astype(column_mappings) - def confine_data(self) -> None: + def confine_data(self, ignore_step: bool = False): """ Include all step to reduce down the data based on assumptions """ + if self.violation_mode: + violation_uprn_missing = pd.isnull(self.data["UPRN"]) + violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE + violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES + violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS) + violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE + violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"]) + violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"]) + violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"]) + violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES + + violation_df = pd.concat( + [ + violation_uprn_missing, + violation_old_lodgment_date, + violation_invalid_transaction_type, + violation_ignored_floor_level, + violation_rdsap_score_above_max, + violation_missing_windows_description, + violation_missing_hotwater_description, + violation_missing_roof_description, + violation_invalid_property_type, + ], axis=1, + keys=[ + "violation_uprn_missing", + "violation_old_lodgment_date", + "violation_invalid_transaction_type", + "violation_ignored_floor_level", + "violation_rdsap_score_above_max", + "violation_missing_windows_description", + "violation_missing_hotwater_description", + "violation_missing_roof_description", + "violation_invalid_property_type", + ] + ) + + self.data = pd.concat([self.data, violation_df], axis=1) + + if ignore_step: + return + # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged @@ -585,14 +662,22 @@ class EPCDataProcessor: # EPCs) we'll ignore them from the model self.data = self.data[self.data["PROPERTY_TYPE"] != IGNORED_PROPERTY_TYPES] - def clean_multi_glaze_proportion(self) -> None: + def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None: """ If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100 """ + if self.violation_mode: + # TODO: + return + + if ignore_step: + return + no_multi_glaze_proportion_index = pd.isnull( self.data["MULTI_GLAZE_PROPORTION"] ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) + self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 def clean_photo_supply(self) -> None: @@ -603,7 +688,7 @@ class EPCDataProcessor: self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0) @staticmethod - def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None): + def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False): """ Clean the input DataFrame using averages from a cleaning DataFrame. @@ -615,6 +700,9 @@ class EPCDataProcessor: :return: Cleaned DataFrame. """ + if ignore_step: + return None + # The desired colnames to clean - which may not be present if colnames is None: colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"] @@ -726,6 +814,7 @@ class EPCDataProcessor: for col in missings.index: unique_values = df[col].unique() + # TODO: confirm this behaviour if True in unique_values or False in unique_values: df[col] = df[col].fillna(False) if "none" in unique_values: diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 68823f27..26073211 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -1,9 +1,16 @@ +import numpy as np import pandas as pd from typing import List from etl.epc.Record import EPCDifferenceRecord from ValidationConfiguration import DatasetValidationConfiguration from etl.epc.settings import EARLIEST_EPC_DATE +from recommendations.rdsap_tables import england_wales_age_band_lookup +from recommendations.recommendation_utils import ( + get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, + get_wall_type +) + class BaseDataset: """ @@ -39,20 +46,199 @@ class TrainingDataset(BaseDataset): self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) - # TODO: replace these with the pipeline steps self._feature_generation() self._drop_features() - # self._clean_dataframe() self._clean_efficiency_variables() self._null_validation(information="Clean Efficiency Variables") self._expand_description_to_features(cleaned_lookup) self._adjust_assumed_values_in_wall_descriptions() - + self._generate_u_values_from_features() # TODO: For some of the features that we clean, we have either a true, false or possibly null value # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't # need to self._clean_missing_values() self._null_validation(information="Clean Missing Values") + self._remove_abnormal_change_in_floor_area() + self._ensure_numeric() + + def _remove_abnormal_change_in_floor_area(self): + """ + Remove properties where the change in floor area is greater than 100% + """ + + self.df["tfa_diff_abs"] = abs(self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"]) + self.df["tfa_diff_prop"] = self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"] + self.df = self.df[self.df["tfa_diff_prop"] < 0.5] + self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) + + def _ensure_numeric(self): + """ + Ensure that all columns are numeric + """ + # TODO: move into EPCRecord record + uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col] + for uvalue_col in uvalue_columns: + self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) + + @staticmethod + def _lambda_function_to_generate_roof_uvalue(row, is_end=False): + """ + Using the apply method, use the get_roof_u_value method to generate the u-value + """ + + col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending" + + if row["has_dwelling_above"]: + if row["roof_thermal_transmittance"] != 0: + raise ValueError("Should have 0 u-value for roof") + + if row["roof_thermal_transmittance_ending"] != 0: + raise ValueError("Should have 0 u-value for roof") + + return get_roof_u_value( + insulation_thickness=row[col_name], + has_dwelling_above=row["has_dwelling_above"], + is_loft=row["is_loft"], + is_roof_room=row["is_roof_room"], + is_thatched=row["is_thatched"], + is_flat=row["is_flat"], + is_pitched=row["is_pitched"], + is_at_rafters=row["is_at_rafters"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + + @staticmethod + def _lambda_function_to_generate_wall_uvalue(row, is_end=False): + """ + Using the apply method, use the get_wall_u_value method to generate the u-value + """ + col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending" + + return get_wall_u_value( + clean_description=row[col_name], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + ) + + @staticmethod + def _lambda_function_to_generate_floor_uvalue(row, is_end=False): + """ + Using the apply method, use the get_floor_u_value method to generate the u-value + """ + + floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending" + + if row["another_property_below"]: + if row["floor_thermal_transmittance"] != 0: + raise ValueError("Should have 0 u-value for floor") + + if row["floor_thermal_transmittance_ending"] != 0: + raise ValueError("Should have 0 u-value for floor") + return 0 + else: + uvalue = row[floor_thermal_col_name] + + if pd.isnull(uvalue): + + insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending" + floor_area_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending" + perimeter_col_name = "total_floor_area_starting" if not is_end else "total_floor_area_ending" + + uvalue = get_floor_u_value( + floor_type=row["floor_type"], + perimeter=row[floor_area_col_name], + area=row[perimeter_col_name], + insulation_thickness=row[insulation_col_name], + wall_type=row["wall_type"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]] + ) + + return uvalue + + def _generate_u_values_from_features(self): + """ + Generate u-values from the features + """ + + # ~~~~~~~~~~~~~~~~~~ + # Walls + # ~~~~~~~~~~~~~~~~~~ + + walls_starting_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_wall_uvalue(row), + axis=1 + ) + walls_ending_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True), + axis=1 + ) + + walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue) + walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df["walls_clean_description_ending"] + walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[walls_starting_equals_ending_flag] + + # ~~~~~~~~~~~~~~~~~~ + # Roof + # ~~~~~~~~~~~~~~~~~~ + + roof_starting_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_roof_uvalue(row), + axis=1 + ) + roof_ending_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True), + axis=1 + ) + + roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue) + roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue) + + + # ~~~~~~~~~~~~~~~~~~ + # Floor + # ~~~~~~~~~~~~~~~~~~ + + self.df['estimated_perimeter_starting'] = self.df.apply( + lambda row: estimate_perimeter(row["total_floor_area_starting"], row["number_habitable_rooms"]), + axis=1 + ) + self.df['estimated_perimeter_ending'] = self.df.apply( + lambda row: estimate_perimeter(row["total_floor_area_ending"], row["number_habitable_rooms"]), + axis=1 + ) + self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"}) + self.df["wall_type"] = self.df.apply( + lambda row: get_wall_type( + is_cavity_wall=row["is_cavity_wall"], + is_solid_brick=row["is_solid_brick"], + is_timber_frame=row["is_timber_frame"], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_cob=row["is_cob"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + is_system_built=row["is_system_built"], + is_park_home=row["is_park_home"] + ), + axis=1 + ) + + floor_starting_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_floor_uvalue(row), + axis=1 + ) + floor_ending_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True), + axis=1 + ) + + floor_starting_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_starting_uvalue) + floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue) + + for component in ["walls", "roof", "floor"]: + self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(f"{component}_starting_uvalue") + self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(f"{component}_ending_uvalue") + + self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"]) def _adjust_assumed_values_in_wall_descriptions(self): diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py new file mode 100644 index 00000000..07f54096 --- /dev/null +++ b/etl/epc/Pipeline.py @@ -0,0 +1,270 @@ +import msgpack +import pandas as pd + +from typing import List +from pathlib import Path + +from etl.epc.DataProcessor import EPCDataProcessor +from etl.epc.Record import EPCRecord, EPCDifferenceRecord +from etl.epc.Dataset import TrainingDataset +from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 +from etl.epc.settings import ( + MANDATORY_FIXED_FEATURES, + LATEST_FIELD, + COMPONENT_FEATURES, + RDSAP_RESPONSE, + HEAT_DEMAND_RESPONSE, + CARBON_RESPONSE, + CORE_COMPONENT_FEATURES, + EFFICIENCY_FEATURES, + POTENTIAL_COLUMNS, +) + +# TODO: change in setting file +MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] +LATEST_FIELD = [x.lower() for x in LATEST_FIELD] +COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] +RDSAP_RESPONSE = RDSAP_RESPONSE.lower() +HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() +CARBON_RESPONSE = CARBON_RESPONSE.lower() +CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES] +EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] +POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] +VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [ + "lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE +] + +def get_cleaned_description_mapping(): + """ + This function will retrieve the cleaned dataset from s3 which has the cleaned + descriptions for the epc dataset + + This data is stored in MessagePack format and therefore needs to be decoded + :return: + """ + + cleaned = read_from_s3( + s3_file_name="cleaned_epc_data/cleaned.bson", + bucket_name="retrofit-data-dev" + ) + + cleaned = msgpack.unpackb(cleaned, raw=False) + + return cleaned + + + +class EPCPipeline: + """ + This class will take a list of directories and process them to create a dataset: + - Load the data + - Pre-process the data + - Create a dataset + - Clean the dataset + - Store the dataset + """ + + def __init__( + self, + directories: List[Path], + epc_data_processor: EPCDataProcessor, + run_mode="training", + epc_local_file="certificates.csv", + epc_bucket_name="retrofit-data-dev", + epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet", + epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet", + epc_compiled_dataset_key="sap_change_model/dataset.parquet" + ): + """ + :param directories: List of directories to process + :param epc_data_processor: EPCDataProcessor object + :param run_mode: Either training or newdata + :param epc_local_file: Local file name of the EPC data + :param epc_bucket_name: S3 bucket name + :param epc_cleaning_dataset_key: S3 key for the cleaning dataset + :param epc_all_equal_rows_key: S3 key for the all equal rows dataset + :param epc_compiled_dataset_key: S3 key for the compiled dataset + """ + self.compiled_dataset: pd.DataFrame = pd.DataFrame() + self.compiled_all_equal_rows: list = [] + self.compiled_cleaning_averages: list = [] + + self.directories = directories + self.epc_data_processor = epc_data_processor + self.run_mode = run_mode + self.epc_local_file = epc_local_file + self.epc_bucket_name = epc_bucket_name + self.epc_cleaning_dataset_key = epc_cleaning_dataset_key + self.epc_all_equal_rows_key = epc_all_equal_rows_key + self.epc_compiled_dataset_key = epc_compiled_dataset_key + + def run(self): + """ + Entrypoint to run the pipeline + """ + for directory in self.directories: + self.process_directory(directory) + + save_dataframe_to_s3_parquet( + df=self.compiled_dataset, + bucket_name=self.epc_bucket_name, + file_key=self.epc_compiled_dataset_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.concat(self.compiled_all_equal_rows), + bucket_name=self.epc_bucket_name, + file_key=self.epc_all_equal_rows_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.concat(self.compiled_cleaning_averages), + bucket_name=self.epc_bucket_name, + file_key=self.epc_cleaning_dataset_key, + ) + + def process_directory(self, directory: Path): + """ + Process a single directory + :param directory: + :return: + """ + filepath = directory / self.epc_local_file + + self.epc_data_processor.pre_process(filepath=filepath) + + constituency_data = self.epc_data_processor.data + self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages) + + constituency_difference_records = [] + for uprn, property_data in constituency_data.groupby("uprn", observed=True): + difference_records = self.process_uprn(uprn=str(uprn), property_data=property_data, directory=directory) + if difference_records is not None: + constituency_difference_records.extend(difference_records) + + constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=get_cleaned_description_mapping()) + + self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df]) + + def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path): + """ + Process a single UPRN, which may have multiple different EPCs + :param uprn: UPRN + :param property_data: pd.DataFrame, Data for a single UPRN + :param directory: Path, Directory of the UPRN + :return: + """ + # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row + if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or ( + pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0 + ): + return None + + # Fixed features - these are property attributes that shouldn't change over time + # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together + fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict() + + # We include the lodgement date here as we probably need to factor time into the + # model, since EPC standards and rigour have changed over time + variable_data = property_data[VARIABLE_DATA_FEATURES] + + uprn = str(uprn) + epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')] + + # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord + + # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records + property_difference_records = self._generate_property_difference_records(epc_records, uprn, directory, fixed_data) + + return property_difference_records + + def _generate_property_difference_records(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict): + """ + We can use multiple types of comparison datasets, for example: + - First vs second + - Second vs third + - First vs third + :param epc_records: + :return: + """ + + property_difference_records: list = [] + + property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) + + # property_difference_records = self._compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) + + return property_difference_records + + + def _compare_all_permutation_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list): + """ + Compare all permutations of EPCs for a given UPRN + :param epc_records: + :return: + """ + + for idx in range(0, len(epc_records) - 1): + for idx2 in range(idx + 1, len(epc_records)): + earliest_record: EPCRecord = epc_records[idx] + latest_record: EPCRecord = epc_records[idx2] + + # Auto sort the records so that the record with highest RDSAP score is always record1 + difference_record: EPCDifferenceRecord = latest_record - earliest_record + + # TODO: Pull out RDSAP_CHANGE to a variable + if difference_record.get("rdsap_change") == 0: + continue + + all_equal = difference_record.compare_fields_in_records( + fields=[x.lower() for x in CORE_COMPONENT_FEATURES] + ) + + if all_equal: + # Keep track of this for the moment so we can analyse + self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) + continue + + difference_record.append_fixed_data(fixed_data) + + property_difference_records.append(difference_record) + + return property_difference_records + + + def _compare_consecutive_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list): + """ + Compare consecutive EPCs for a given UPRN + :param epc_records: + :return: + """ + + for idx in range(0, len(epc_records) - 1): + + if idx >= len(epc_records) - 1: + break + + earliest_record: EPCRecord = epc_records[idx] + latest_record: EPCRecord = epc_records[idx + 1] + + # Auto sort the records so that the record with highest RDSAP score is always record1 + difference_record: EPCDifferenceRecord = latest_record - earliest_record + + # TODO: Pull out RDSAP_CHANGE to a variable + if difference_record.get("rdsap_change") == 0: + continue + + all_equal = difference_record.compare_fields_in_records( + fields=[x.lower() for x in CORE_COMPONENT_FEATURES] + ) + + if all_equal: + # Keep track of this for the moment so we can analyse + self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) + continue + + difference_record.append_fixed_data(fixed_data) + + property_difference_records.append(difference_record) + + return property_difference_records diff --git a/etl/epc/Record.py b/etl/epc/Record.py index 4e799d66..f8c72969 100644 --- a/etl/epc/Record.py +++ b/etl/epc/Record.py @@ -68,15 +68,12 @@ class EPCRecord: energy_consumption_current : int co2_emissions_current : float - # TODO: lower case and underscore - walls = None - floor = None - roof = None - u_values_walls = None u_values_roof = None u_values_floor = None + run_mode: str = "training" + def __post_init__(self): # We can have validation and cleaning steps for each of the fields # self.WALLS_DESCRIPTION = 'check' @@ -85,13 +82,20 @@ class EPCRecord: # self._field_validation() self._clean_records() - self._expand_description_to_uvalues() + if self.run_mode == "newdata": + self._expand_description_to_features() + self._expand_description_to_uvalues() + + # self._generate_uvalues() # self._validate_expanded_description() # self._validate_u_values() # etc pass + def _expand_description_to_features(self): + pass + def _expand_description_to_uvalues(self): # TODO: can be loop over all the descriptions, or done in one pass @@ -270,7 +274,6 @@ class EPCDifferenceRecord: self.flag_fabric_consistency = False self.difference_record = {} - self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 9cb2911e..fb9486e8 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -240,10 +240,10 @@ def make_uvalues(df): uvalues = [] # TODO: iterrows is the slowest way to do this, we should use a vectorised approach or itertuples for index_no, x in df.iterrows(): - - uprn = x["UPRN"] + + uprn = x["uprn"] row_index = x["row_index"] - age_band = england_wales_age_band_lookup[x["CONSTRUCTION_AGE_BAND"]] + age_band = england_wales_age_band_lookup[x["construction_age_band"]] # ~~~~~~~~~~~~~~~~~~ # Walls @@ -258,11 +258,11 @@ def make_uvalues(df): is_sandstone_or_limestone=x["is_sandstone_or_limestone"], ) - ending_wall_uvalue = x["walls_thermal_transmittance_ENDING"] + ending_wall_uvalue = x["walls_thermal_transmittance_ending"] if pd.isnull(ending_wall_uvalue): - if x["walls_clean_description"] != x["walls_clean_description_ENDING"]: + if x["walls_clean_description"] != x["walls_clean_description_ending"]: ending_wall_uvalue = get_wall_u_value( - clean_description=x["walls_clean_description_ENDING"], + clean_description=x["walls_clean_description_ending"], age_band=age_band, is_granite_or_whinstone=x["is_granite_or_whinstone"], is_sandstone_or_limestone=x["is_sandstone_or_limestone"], @@ -413,58 +413,6 @@ def make_uvalues(df): # if all_equal: # return True -class EPCPipeline: - """ - This class will take a list of directories and process them to create a dataset: - - Load the data - - Pre-process the data - - Create a dataset - - Clean the dataset - - Store the dataset - """ - - def __init__(self, directories: List[Path]): - self.directories = directories - self.dataset = [] - self.cleaning_dataset = [] - self.all_equal_rows = [] - - def load_data(self): - """ - Load the data from the directories - :return: - """ - for directory in self.directories: - filepath = directory / "certificates.csv" - - data_processor = DataProcessor(filepath=filepath) - - data_processor.pre_process() - - self.dataset.append(data_processor.data) - - def create_dataset(self): - """ - Create a dataset from the data - :return: - """ - pass - - def clean_dataset(self): - """ - Clean the dataset - :return: - """ - pass - - def store_dataset(self): - """ - Store the dataset - :return: - """ - pass - - def generate_property_difference_records(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, all_equal_rows: list): """ We can use multiple types of comparison datasets, for example: @@ -604,7 +552,9 @@ def app(): variable_data = property_data[VARIABLE_DATA_FEATURES] uprn = str(uprn) - epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')] + epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')] + + # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records property_difference_records, all_equal_rows = generate_property_difference_records(epc_records, uprn, directory, fixed_data, all_equal_rows) @@ -615,12 +565,7 @@ def app(): constituency_dataset_df = constituency_dataset.df - # Apply u-values - data_by_urpn_df = make_uvalues(df = constituency_dataset_df).drop( - columns=["walls_clean_description", "walls_clean_description_ENDING"] - ) - - dataset.append(data_by_urpn_df) + dataset.append(constituency_dataset_df) print("Final all equal count: %s" % str(len(all_equal_rows))) @@ -637,15 +582,15 @@ def app(): # TODO: move into difference record # Remove any records that have huge swings in their floor area # Move this into TrainingDataset as this won't be run in newdata - output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"]) - output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"] - output = output[output["tfa_diff_prop"] < 0.5] - output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) + # output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"]) + # output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"] + # output = output[output["tfa_diff_prop"] < 0.5] + # output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) - # TODO: move into EPCRecord record - uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col] - for uvalue_col in uvalue_columns: - output[uvalue_col] = pd.to_numeric(output[uvalue_col]) + # # TODO: move into EPCRecord record + # uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col] + # for uvalue_col in uvalue_columns: + # output[uvalue_col] = pd.to_numeric(output[uvalue_col]) save_dataframe_to_s3_parquet( df=output, @@ -661,6 +606,21 @@ def app(): file_key="sap_change_model/all_equal_rows.parquet", ) +from etl.epc.Pipeline import EPCPipeline + +def main(): + """ + Orchestration function + """ + + directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] + + epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training")) + + epc_pipeline.run() + + + if __name__ == "__main__": - app() + main() diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index 100ecb15..6244912d 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -1,5 +1,6 @@ import math from copy import deepcopy +from typing import Union import numpy as np import pandas as pd @@ -497,7 +498,7 @@ def get_wall_type( is_system_built, is_park_home, **kwargs -): +) -> Union[str, None]: """ Converts booleans to a string wall type, for querying the wall thickness table :return: