diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 8c199145..b3d1c623 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -28,8 +28,6 @@ from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_e from backend.ml_models.api import ModelApi from backend.Property import Property -from etl.epc.DataProcessor import EPCDataProcessor -from etl.epc.settings import COLUMNS_TO_MERGE_ON from etl.solar.SolarPhotoSupply import SolarPhotoSupply from recommendations.optimiser.CostOptimiser import CostOptimiser @@ -68,7 +66,6 @@ async def trigger_plan(body: PlanTriggerRequest): ) input_properties = [] - for config in plan_input: # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly @@ -96,13 +93,16 @@ async def trigger_plan(body: PlanTriggerRequest): ) epc_records = { - 'original_epc': epc_searcher.newest_epc, - 'full_sap_epc': epc_searcher.full_sap_epc, - 'old_data': epc_searcher.older_epcs, + 'original_epc': epc_searcher.newest_epc.copy(), + 'full_sap_epc': epc_searcher.full_sap_epc.copy(), + 'old_data': epc_searcher.older_epcs.copy(), } - prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", - cleaning_data=cleaning_data) # This uses all the epc records to clean the data + prepared_epc = EPCRecord( + epc_records=epc_records, + run_mode="newdata", + cleaning_data=cleaning_data + ) input_properties.append( Property( @@ -173,8 +173,6 @@ async def trigger_plan(body: PlanTriggerRequest): "carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET } ) - # all_predictions["heat_demand_predictions"]= all_predictions["sap_change_predictions"].copy() - # all_predictions["carbon_change_predictions"] = all_predictions["sap_change_predictions"].copy() # Insert the predictions into the recommendations and run the optimiser logger.info("Optimising recommendations") @@ -310,10 +308,6 @@ async def trigger_plan(body: PlanTriggerRequest): } ) - # all_combined_predictions["heat_demand_predictions"]= all_combined_predictions["sap_change_predictions"].copy() - # all_combined_predictions["carbon_change_predictions"] = all_combined_predictions[ - # "sap_change_predictions"].copy() - # We update the carbon and heat demand predictions for property_id, property_recommendations in recommendations.items(): combined_heat_demand = all_combined_predictions["heat_demand_predictions"] diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 801a9456..5dfeea1a 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -33,7 +33,6 @@ NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS] ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS] POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] - # These lookups are used to clean the construction age band construction_age_bounds_map = { "England and Wales: before 1900": {"l": 0, "u": 1899}, @@ -74,7 +73,8 @@ class EPCDataProcessor: Handle data loading and data preprocessing """ - def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None: + def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, + run_mode: str = "training", violation_mode: bool = False) -> None: """ :param filepath: If specified, is the physical location of the data :param is_newdata: Indicates if we are processing new, testing data. @@ -82,23 +82,23 @@ class EPCDataProcessor: want to perform, such as confine_data() """ is_data_a_dataframe = isinstance(data, pd.DataFrame) - self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame() + self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame() is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame) - self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() + self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() # FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA self.violation_mode = violation_mode if run_mode not in ["training", "newdata"]: raise ValueError("Run mode must be either training or newdata") self.run_mode = run_mode if not violation_mode else "newdata" - + def prepare_data(self, filepath: Path | str | None = None) -> None: """ Given the run mode, we apply the relevant pipeline steps Ignore step is used to highlight which steps are not needed in newdata """ - + ignore_step = True if self.run_mode == "newdata" else False if filepath is not None: @@ -126,7 +126,7 @@ class EPCDataProcessor: self.fill_na_fields() self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step) - + # Final re-casting after data transformed and prepared self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True) self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True) @@ -137,32 +137,36 @@ class EPCDataProcessor: self.make_cleaning_averages(ignore_step=ignore_step) # TODO: check if this has impact on training dataset - cleaned_data = self.apply_averages_cleaning( - data_to_clean=self.data, - cleaning_data=self.cleaning_averages, - cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], - colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], - ) + # cleaned_data = self.apply_averages_cleaning( + # data_to_clean=self.data, + # cleaning_data=self.cleaning_averages, + # cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], + # colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], + # ) + + # When running in newdata mode, cleaning_averages has lower cases so we co-erce back to upper + cleaning_averages = self.cleaning_averages.copy() + if self.run_mode == "newdata": + cleaning_averages.columns = cleaning_averages.columns.str.upper() cleaned_data = self.apply_averages_cleaning( - data_to_clean=self.data, - cleaning_data=self.cleaning_averages, - cols_to_merge_on=COLUMNS_TO_MERGE_ON, - ) - + data_to_clean=self.data, + cleaning_data=cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON, + ) + self.data = self.data if cleaned_data is None else cleaned_data self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step) self.cast_data_columns_to_lower() - def cast_data_columns_to_lower(self): """ Convert all columns names to lower """ self.data.columns = self.data.columns.str.lower() - + def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False): """ Convert all column names to lower @@ -171,9 +175,9 @@ class EPCDataProcessor: if ignore_step: return - + self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower() - + def add_local_authority_to_cleaning_average(self, ignore_step: bool = False): """ Add the Local authority column to the cleaning averages @@ -182,7 +186,7 @@ class EPCDataProcessor: if ignore_step: return - + self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] def fill_invalid_constituency_fields(self, ignore_step: bool = False): @@ -195,7 +199,7 @@ class EPCDataProcessor: if ignore_step: return - + self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False): @@ -218,7 +222,6 @@ class EPCDataProcessor: for col in convert_to_lower: self.data[col] = self.data[col].str.lower() - def remap_build_form(self): """ Remap build form to standard values @@ -226,7 +229,6 @@ class EPCDataProcessor: """ self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP) - def remap_anomalies(self): """ Remap anomalies to None @@ -258,7 +260,7 @@ class EPCDataProcessor: if ignore_step: return - + self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) def load_data(self, filepath, low_memory=False) -> None: @@ -404,7 +406,8 @@ class EPCDataProcessor: # self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True) # # Final re-casting after data transformed and prepared - # coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES + # coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else + # COLUMNTYPES # for k, v in coltypes.items(): # self.data[k] = self.data[k].astype(v) # self.data = self.data.astype(coltypes) @@ -423,7 +426,7 @@ class EPCDataProcessor: # cleaning_data=self.cleaning_averages, # cols_to_merge_on=COLUMNS_TO_MERGE_ON # ) - + # self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] # self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower() @@ -431,7 +434,6 @@ class EPCDataProcessor: # return self.data, self.cleaning_averages - def na_remapping(self, auto_subset_columns: bool = False): fill_na_map_apply = { @@ -578,7 +580,7 @@ class EPCDataProcessor: if self.violation_mode: # TODO: to fill in return - + if ignore_step: return @@ -604,15 +606,15 @@ class EPCDataProcessor: self.data[key] = self.data[key].astype(value) else: self.data[key] = self.data[key].astype(values) - + def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: """ Using a dictionary to recast all columns at once - """ + """ if auto_subset_columns: column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} - + self.data = self.data.astype(column_mappings) def confine_data(self, ignore_step: bool = False): @@ -642,7 +644,7 @@ class EPCDataProcessor: violation_missing_hotwater_description, violation_missing_roof_description, violation_invalid_property_type, - ], axis=1, + ], axis=1, keys=[ "violation_uprn_missing", "violation_old_lodgment_date", @@ -654,8 +656,8 @@ class EPCDataProcessor: "violation_missing_roof_description", "violation_invalid_property_type", ] - ) - + ) + self.data = pd.concat([self.data, violation_df], axis=1) if ignore_step: @@ -703,7 +705,7 @@ class EPCDataProcessor: if self.violation_mode: # TODO: return - + if ignore_step: return @@ -721,7 +723,8 @@ class EPCDataProcessor: self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0) @staticmethod - def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False): + def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, + ignore_step: bool = False): """ Clean the input DataFrame using averages from a cleaning DataFrame.