From d5bb04a0917008c487b63f40bd093b737caa384d Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Wed, 13 Sep 2023 16:27:13 +0100 Subject: [PATCH] minor modifications to pre processor for it to work in the backend --- backend/app/plan/router.py | 39 +++++------ .../simulation_system/core/DataProcessor.py | 65 +++++++++++++++++-- 2 files changed, 76 insertions(+), 28 deletions(-) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index e383d665..bed1f4b9 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -164,6 +164,7 @@ async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") Session = sessionmaker(bind=db_engine) session = Session() + created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") try: session.begin() @@ -321,15 +322,17 @@ async def trigger_plan(body: PlanTriggerRequest): # TODO: We should use the cleaned data from get_components in the data rather than the raw # values. We should create a method in Property which takes the EPC data and inserts the cleaned # data - epc_data = p.data.copy() - epc_data = pd.DataFrame([epc_data]) - epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] - starting_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_STARTING") - ending_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_ENDING") - fixed_data = epc_data[FIXED_FEATURES] + + data_processor = DataProcessor(None, newdata=True) + data_processor.insert_data(pd.DataFrame([p.data.copy()])) + data_processor.pre_process() + + starting_epc_data = data_processor.get_component_features(suffix="_STARTING") + ending_epc_data = data_processor.get_component_features(suffix="_ENDING") + fixed_data = data_processor.get_fixed_features() # We update the ending record with the recommended updates and we set lodgement date to today - ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d") + ending_epc_data["LODGEMENT_DATE_ENDING"] = created_at scoring_map = { 'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)', @@ -357,6 +360,9 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data.append(scoring_dict) + # cleanup + del data_processor + logger.info("Preparing data for scoring in sap change api") recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) @@ -365,32 +371,21 @@ async def trigger_plan(body: PlanTriggerRequest): cleaning_data = read_parquet_from_s3( bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", - ) - cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"}) - # Merge the cleaning data onto recommendations_scoring_data + ).rename(columns={"local-authority": "LOCAL_AUTHORITY"}) - recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[ - ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] - ].replace("", None) + # Merge the cleaning data onto recommendations_scoring_data # Perform the same cleaning as in the model recommendations_scoring_data = DataProcessor.apply_averages_cleaning( data_to_clean=recommendations_scoring_data, cleaning_data=cleaning_data, cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"] - ) - recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"]) - - # Note: We might need to perform the full pre-processing here - data_processor = DataProcessor(filepath=None) - data_processor.insert_data(recommendations_scoring_data) - data_processor.remap_columns() - recommendations_scoring_data = data_processor.data + ).drop(columns=["LOCAL_AUTHORITY"]) # Remap column types recommendations_scoring_data = recommendations_scoring_data.astype(columntypes) # Store parquet file in s3 for scoring - created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( portfolio_id=body.portfolio_id, timestamp=created_at diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 6d61d4d5..db333eac 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -12,6 +12,8 @@ from model_data.simulation_system.core.Settings import ( FLOOR_LEVEL_MAP, BUILT_FORM_REMAP, COLUMNS_TO_MERGE_ON, + COMPONENT_FEATURES, + FIXED_FEATURES ) from typing import List @@ -21,9 +23,16 @@ class DataProcessor: Handle data loading and data preprocessing """ - def __init__(self, filepath: Path | None) -> None: + def __init__(self, filepath: Path | None, newdata: bool = False) -> None: + """ + :param filepath: If specified, is the physical location of the data + :param newdata: Indicates if we are processing new, testing data. + In this instance, there are some operations we do not + want to perform, such as confine_data() + """ self.filepath = filepath self.data = None + self.newdata = newdata def load_data(self, low_memory=False) -> None: if not self.filepath: @@ -140,14 +149,30 @@ class DataProcessor: break to_index -= 1 + def reformat_columns(self): + """ + This function applies the re-formattng of columns from lower case to capitalised + + When requesting the epc data from the api, the columns are lower case + and separated by a hyphen, whereas in the bulk download, the columns + are capitalised and separated by underscores. If rename_columns is True + we convert the columns from lower case to capitalised format + :return: + """ + self.data.columns = [col.upper().replace("-", "_") for col in self.data.columns] + def pre_process(self) -> pd.DataFrame: """ Load data and begin initial cleaning """ - if not self.data: + if self.data is None: self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) - self.confine_data() + if self.newdata: + self.reformat_columns() + + if not self.newdata: + self.confine_data() # We have some non-standard construction age bands which we'll clean for matching self.standardise_construction_age_band() @@ -159,9 +184,10 @@ class DataProcessor: self.clean_multi_glaze_proportion() self.clean_photo_supply() - self.retain_multiple_epc_properties( - epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] - ) + if not self.newdata: + self.retain_multiple_epc_properties( + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] + ) self.remap_columns() if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: @@ -178,6 +204,7 @@ class DataProcessor: """ # Each uprn can fille backward from recent and forward fill from oldest # The groupby changes the order and we use the index to make the original data + filled_data = ( self.data.groupby("UPRN", group_keys=True)[columns_to_fill] .apply(lambda group: group.fillna(method="bfill").fillna(method="ffill")) @@ -188,6 +215,11 @@ class DataProcessor: self.data[columns_to_fill] = filled_data[columns_to_fill] + # For floor area, we also replace "" values with None + self.data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = self.data[ + ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] + ].replace("", None) + def remap_columns(self): """ Remap all columns, for any non values @@ -430,3 +462,24 @@ class DataProcessor: data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True) return data_to_clean + + def get_component_features(self, suffix: str) -> pd.DataFrame: + """ + This function will return the property components such as the walls, roof, heating etc + as well as lodgement date. These are features that we expect might change from one EPC to the + next + :param suffix: Should be one of "_STARTING" or "_ENDING" + :return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES + """ + + if suffix not in ["_STARTING", "_ENDING"]: + raise Exception("Suffix should be one of _STARTING or _ENFING") + + return self.data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix(suffix) + + def get_fixed_features(self) -> pd.DataFrame: + """ + Returns the fixed features that we don't believe should vary from one EPC to the next + :return: Pandas dataframe containing the columns defined in FIXED_FEATURES + """ + return self.data[FIXED_FEATURES]