removed temp code and fixed bug where cleaning data is lower case in newdata mode

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-18 14:32:24 +00:00
parent 255bfc182d
commit 0c1ce64789
2 changed files with 50 additions and 53 deletions

View file

@ -28,8 +28,6 @@ from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_e
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
@ -68,7 +66,6 @@ async def trigger_plan(body: PlanTriggerRequest):
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -96,13 +93,16 @@ async def trigger_plan(body: PlanTriggerRequest):
)
epc_records = {
'original_epc': epc_searcher.newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc,
'old_data': epc_searcher.older_epcs,
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata",
cleaning_data=cleaning_data) # This uses all the epc records to clean the data
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
input_properties.append(
Property(
@ -173,8 +173,6 @@ async def trigger_plan(body: PlanTriggerRequest):
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# all_predictions["heat_demand_predictions"]= all_predictions["sap_change_predictions"].copy()
# all_predictions["carbon_change_predictions"] = all_predictions["sap_change_predictions"].copy()
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising recommendations")
@ -310,10 +308,6 @@ async def trigger_plan(body: PlanTriggerRequest):
}
)
# all_combined_predictions["heat_demand_predictions"]= all_combined_predictions["sap_change_predictions"].copy()
# all_combined_predictions["carbon_change_predictions"] = all_combined_predictions[
# "sap_change_predictions"].copy()
# We update the carbon and heat demand predictions
for property_id, property_recommendations in recommendations.items():
combined_heat_demand = all_combined_predictions["heat_demand_predictions"]

View file

@ -33,7 +33,6 @@ NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS]
ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
# These lookups are used to clean the construction age band
construction_age_bounds_map = {
"England and Wales: before 1900": {"l": 0, "u": 1899},
@ -74,7 +73,8 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training", violation_mode: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
@ -82,23 +82,23 @@ class EPCDataProcessor:
want to perform, such as confine_data()
"""
is_data_a_dataframe = isinstance(data, pd.DataFrame)
self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
if run_mode not in ["training", "newdata"]:
raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata"
def prepare_data(self, filepath: Path | str | None = None) -> None:
"""
Given the run mode, we apply the relevant pipeline steps
Ignore step is used to highlight which steps are not needed in newdata
"""
ignore_step = True if self.run_mode == "newdata" else False
if filepath is not None:
@ -126,7 +126,7 @@ class EPCDataProcessor:
self.fill_na_fields()
self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step)
# Final re-casting after data transformed and prepared
self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
@ -137,32 +137,36 @@ class EPCDataProcessor:
self.make_cleaning_averages(ignore_step=ignore_step)
# TODO: check if this has impact on training dataset
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
# cleaned_data = self.apply_averages_cleaning(
# data_to_clean=self.data,
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
# colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
# )
# In newdata mode, cleaning_averages has lower-case column names, so we coerce them back to upper case
cleaning_averages = self.cleaning_averages.copy()
if self.run_mode == "newdata":
cleaning_averages.columns = cleaning_averages.columns.str.upper()
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
)
data_to_clean=self.data,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
)
self.data = self.data if cleaned_data is None else cleaned_data
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
self.cast_data_columns_to_lower()
def cast_data_columns_to_lower(self):
    """
    Rename every column of ``self.data`` to its lower-case form.

    The column index is replaced on the existing DataFrame; nothing is returned.
    """
    lowered_names = self.data.columns.str.lower()
    self.data.columns = lowered_names
def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
"""
Convert all column names to lower
@ -171,9 +175,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
"""
Add the Local authority column to the cleaning averages
@ -182,7 +186,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
@ -195,7 +199,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
@ -218,7 +222,6 @@ class EPCDataProcessor:
for col in convert_to_lower:
self.data[col] = self.data[col].str.lower()
def remap_build_form(self):
"""
Remap build form to standard values
@ -226,7 +229,6 @@ class EPCDataProcessor:
"""
self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
def remap_anomalies(self):
"""
Remap anomalies to None
@ -258,7 +260,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
def load_data(self, filepath, low_memory=False) -> None:
@ -404,7 +406,8 @@ class EPCDataProcessor:
# self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# # Final re-casting after data transformed and prepared
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
# COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
@ -423,7 +426,7 @@ class EPCDataProcessor:
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=COLUMNS_TO_MERGE_ON
# )
# self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
# self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
@ -431,7 +434,6 @@ class EPCDataProcessor:
# return self.data, self.cleaning_averages
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
@ -578,7 +580,7 @@ class EPCDataProcessor:
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
@ -604,15 +606,15 @@ class EPCDataProcessor:
self.data[key] = self.data[key].astype(value)
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
    """
    Recast several columns of ``self.data`` in a single ``astype`` call.

    :param column_mappings: Maps column name -> target dtype, passed to ``DataFrame.astype``.
    :param auto_subset_columns: If True, entries whose column is not present in ``self.data``
        are dropped from the mapping before casting. If False, the mapping is used as given.
    """
    if auto_subset_columns:
        # Keep only the mappings for columns that actually exist in the data
        present_columns = self.data.columns
        column_mappings = {
            column: dtype
            for column, dtype in column_mappings.items()
            if column in present_columns
        }

    self.data = self.data.astype(column_mappings)
def confine_data(self, ignore_step: bool = False):
@ -642,7 +644,7 @@ class EPCDataProcessor:
violation_missing_hotwater_description,
violation_missing_roof_description,
violation_invalid_property_type,
], axis=1,
], axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
@ -654,8 +656,8 @@ class EPCDataProcessor:
"violation_missing_roof_description",
"violation_invalid_property_type",
]
)
)
self.data = pd.concat([self.data, violation_df], axis=1)
if ignore_step:
@ -703,7 +705,7 @@ class EPCDataProcessor:
if self.violation_mode:
# TODO:
return
if ignore_step:
return
@ -721,7 +723,8 @@ class EPCDataProcessor:
self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False):
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None,
ignore_step: bool = False):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.