"""
EPC data processors.

An abstract ``EPCDataProcessor`` holds the shared cleaning steps; one concrete strategy class per
pipeline (training, inference, kWh data) wires those steps together in ``prepare_data``.

Origin: etl/epc/DataProcessorNew.py, introduced by commit 36f0e883 ("add different strategy classes").
"""
from pathlib import Path
import numpy as np
import pandas as pd
from BaseUtility import Definitions
from etl.epc.settings import (
    DATA_PROCESSOR_SETTINGS,
    EARLIEST_EPC_DATE,
    # IGNORED_TRANSACTION_TYPES,
    IGNORED_FLOOR_LEVELS,
    IGNORED_PROPERTY_TYPES,
    IGNORED_TENURES,
    FULLY_GLAZED_DESCRIPTIONS,
    AVERAGE_FIXED_FEATURES,
    BUILT_FORM_REMAP,
    COLUMNS_TO_MERGE_ON,
    FIXED_FEATURES,
    COLUMNTYPES,
    RDSAP_RESPONSE,
    MAX_SAP_SCORE,
    fill_na_map,
    STARTING_SUFFIX_COMPONENT_COLS,
    NO_SUFFIX_COMPONENT_COLS,
    ENDING_SUFFIX_COMPONENT_COLS,
    POTENTIAL_COLUMNS,
    EFFICIENCY_FEATURES,
)
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from abc import ABC, abstractmethod
from typing import List

# TODO: change the setting columns to lower
STARTING_SUFFIX_COMPONENT_COLS = [x.lower() for x in STARTING_SUFFIX_COMPONENT_COLS]
NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS]
ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]

# These lookups are used to clean the construction age band.
# "l" / "u" are the inclusive lower / upper year bounds of each band.
construction_age_bounds_map = {
    "England and Wales: before 1900": {"l": 0, "u": 1899},
    "England and Wales: 1930-1949": {"l": 1930, "u": 1949},
    "England and Wales: 1900-1929": {"l": 1900, "u": 1929},
    "England and Wales: 1950-1966": {"l": 1950, "u": 1966},
    "England and Wales: 1967-1975": {"l": 1967, "u": 1975},
    "England and Wales: 1976-1982": {"l": 1976, "u": 1982},
    "England and Wales: 1983-1990": {"l": 1983, "u": 1990},
    "England and Wales: 1991-1995": {"l": 1991, "u": 1995},
    "England and Wales: 1996-2002": {"l": 1996, "u": 2002},
    "England and Wales: 2003-2006": {"l": 2003, "u": 2006},
    "England and Wales: 2007-2011": {"l": 2007, "u": 2011},
    "England and Wales: 2012 onwards": {"l": 2012, "u": 3000},
}

# Non-standard band labels that map directly onto a standard one
construction_age_remap = {
    "England and Wales: 2007 onwards": "England and Wales: 2007-2011"
}

# Year (0..3000 inclusive) -> band label, built once so per-row lookups are a plain dict access
expanded_map = {
    year: next(
        label
        for label, bounds in construction_age_bounds_map.items()
        if bounds["l"] <= year <= bounds["u"]
    )
    for year in range(0, 3001)
}


def is_int(x):
    """
    Return True when ``int(x)`` succeeds, False otherwise.
    """
    try:
        int(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / containers; ValueError: non-numeric text or NaN; OverflowError: infinity
        return False
    return True


class EPCDataProcessor(ABC):
    """
    Abstract class for data processing.

    Holds the working dataset in ``self.data`` and the per-archetype averages in
    ``self.cleaning_averages``. Subclasses implement ``prepare_data`` to chain the steps below.
    """

    # Class-level defaults: many steps read these flags, so they must exist even when neither the
    # caller nor a subclass sets them.
    violation_mode: bool = False
    run_mode: str | None = None

    def __init__(
        self,
        data: pd.DataFrame | None = None,
        cleaning_averages: pd.DataFrame | None = None,
        *,
        violation_mode: bool | None = None,
        run_mode: str | None = None,
    ) -> None:
        """
        :param data: Dataset to process; anything that is not a DataFrame becomes an empty DataFrame
        :param cleaning_averages: Pre-computed averages; anything that is not a DataFrame becomes an
            empty DataFrame
        :param violation_mode: Optional override of the class-level ``violation_mode`` flag
        :param run_mode: Optional override of the class-level ``run_mode`` (the code checks for the
            values "newdata" and "kwhdata")
        """
        self.data: pd.DataFrame = (
            data if isinstance(data, pd.DataFrame) else pd.DataFrame()
        )
        self.cleaning_averages: pd.DataFrame = (
            cleaning_averages
            if isinstance(cleaning_averages, pd.DataFrame)
            else pd.DataFrame()
        )

        if violation_mode is not None:
            self.violation_mode = violation_mode
        if run_mode is not None:
            self.run_mode = run_mode

    @abstractmethod
    def prepare_data(self, filepath: Path | str | None = None) -> None:
        """
        Main pipeline for data processing
        """

    def rename_kwhdata_columns(self):
        """
        Rename the columns for the kwh data to the epc api data, which are uppercase and underscore
        """
        self.data.columns = self.data.columns.str.upper().str.replace("-", "_")

    def cast_data_columns_to_lower(self):
        """
        Convert all column names of the data to lower case
        """
        self.data.columns = self.data.columns.str.lower()

    def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
        """
        Convert all column names of the cleaning averages to lower case
        No need in newdata mode
        """
        if ignore_step:
            return

        self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()

    def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
        """
        Add the local authority column to the cleaning averages, using the value from the first
        row of the data
        No need in newdata mode
        """
        if ignore_step:
            return

        self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[
            0
        ]

    def fill_invalid_constituency_fields(self, ignore_step: bool = False):
        """
        For some weird cases, where data has missing constituency, we fill with the most common
        constituency in the data
        """
        if self.violation_mode:
            # TODO: to fill in
            return

        if ignore_step:
            return

        self.data = self.data.fillna(
            {"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}
        )

    def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
        """
        Order data by uprn and lodgement date
        No violation mode needed
        """
        if ignore_step:
            return

        self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)

    def cast_data_column_values_to_lower(self):
        """
        For given columns, cast values to lower
        No violation mode or newdata modes required
        """
        convert_to_lower = ["TRANSACTION_TYPE"]
        for col in convert_to_lower:
            self.data[col] = self.data[col].str.lower()

    def remap_build_form(self):
        """
        Remap built form to standard values
        No violation mode or newdata modes required
        """
        self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)

    def remap_anomalies(self):
        """
        Remap anomalies to None
        No violation mode or newdata modes required
        """
        # Map all anomaly values to None
        data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

        # Use replace to map data (if it exists as a key) to its value, i.e. remove invalid values
        data = self.data.replace(data_anomaly_map)
        # np.nan is the canonical spelling of the NaN constant
        data = data.replace(np.nan, None)

        self.data = data

    def remap_floor_level(self, ignore_step: bool = False):
        """
        Remap floor level to standard values
        """
        if self.violation_mode:
            # TODO: We need to handle this case
            return

        if ignore_step:
            return

        self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)

    def load_data(self, filepath, low_memory=False) -> None:
        """
        Read a CSV into ``self.data``.

        :param filepath: Location of the CSV file
        :param low_memory: Forwarded to ``pd.read_csv``
        :raises ValueError: If no filepath is given
        """
        if not filepath:
            raise ValueError("No filepath specified")
        self.data = pd.read_csv(filepath, low_memory=low_memory)

    def insert_data(self, data: pd.DataFrame) -> None:
        """
        Replace the working dataset with ``data``
        """
        self.data = data

    @staticmethod
    def clean_construction_age_band(x):
        """
        Standardise a single construction age band value.

        :param x: Raw value: an anomaly marker, a missing value, a band label or a year
        :return: ``x`` unchanged for anomalies, missing values and known labels; otherwise the
            standard band label
        :raises NotImplementedError: If the value is none of the above
        """
        # Firstly, we check if it's an error value. pd.isnull catches every NaN object, not only
        # the np.nan singleton that a list-membership test would match by identity.
        if x in Definitions.DATA_ANOMALY_MATCHES or pd.isnull(x):
            return x

        # Next, we check if it's a value in our map
        if construction_age_bounds_map.get(x):
            return x

        # We check if it's a standard remap value
        remap_value = construction_age_remap.get(x, None)
        if remap_value:
            return remap_value

        # We check if it's a number, in which case it is treated as a year
        if is_int(x):
            return expanded_map[int(x)]

        raise NotImplementedError("Not handled the case for value %s" % x)

    def standardise_construction_age_band(self, ignore_step: bool = False):
        """
        This function will tidy up some of the non-standard values that are populated in the
        construction age band, which is useful for cleaning. Rows whose band is missing afterwards
        are dropped.
        """
        if self.violation_mode:
            # TODO: to fill in
            return

        if ignore_step:
            return

        self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply(
            self.clean_construction_age_band
        )

        self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])]

    def clean_missing_rooms(self, ignore_step: bool = False):
        """
        For the number of heated rooms and number of habitable rooms, we clean these values up
        front, based on property archetype and age. Missing values take the group median; the
        grouping is relaxed one column at a time until nothing is missing.

        TODO: We could use a model based imputation approach for possibly more accurate cleaning

        :raises NotImplementedError: If values are still missing after grouping on property type only
        """
        if self.violation_mode:
            # TODO: to fill in
            return

        if ignore_step:
            return

        # TODO: Do we want to move this out of this function? (i.e. alter the data before we do any cleaning)
        self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(
            lambda x: x.split(" ")[0]
        )

        def apply_clean(data, matching_columns, column):
            # Median of the known values per group, merged back to fill the gaps
            cleaning_data = (
                data[~pd.isnull(data[column])]
                .groupby(matching_columns)[column]
                .median()
                .reset_index()
            )

            data = data.merge(
                cleaning_data,
                how="left",
                on=matching_columns,
                suffixes=("", "_CLEANING"),
            )

            data[column] = np.where(
                pd.isnull(data[column]), data[f"{column}_CLEANING"], data[column]
            )
            return data.drop(columns=f"{column}_CLEANING")

        # Ordered from most general to most specific; we drop columns from the right when relaxing
        matching_columns = [
            "PROPERTY_TYPE",
            "BUILT_FORM",
            "CONSTRUCTION_AGE_BAND",
            "POSTAL_AREA",
        ]

        for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
            to_index = 3
            has_missings = pd.isnull(self.data[col]).sum()
            while has_missings:
                self.data = apply_clean(
                    data=self.data,
                    matching_columns=matching_columns[0 : to_index + 1],
                    column=col,
                )
                has_missings = pd.isnull(self.data[col]).sum()

                if not has_missings or to_index == 0:
                    # Check if we've gotten to index 0 and still have missings - something has gone
                    # wrong or we have a very unique property type
                    if has_missings:
                        raise NotImplementedError(
                            "Handle this edge case, we still have missings for column %s"
                            % col
                        )

                    break
                to_index -= 1

    def na_remapping(self, auto_subset_columns: bool = False):
        """
        Fill missing values column by column using ``fill_na_map``.

        :param auto_subset_columns: When True, columns of the map that are absent from the data are
            skipped instead of raising
        """
        fill_na_map_apply = (
            {k: v for k, v in fill_na_map.items() if k in self.data.columns}
            if auto_subset_columns
            else fill_na_map
        )

        for column, fill_value in fill_na_map_apply.items():
            self.data[column] = self.data[column].fillna(fill_value)

    def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
        """
        If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields
        """
        # Each uprn can fill backward from recent and forward fill from oldest.
        # The groupby changes the order and we use the index to restore the original data order.
        # bfill()/ffill() replace the deprecated fillna(method=...) spelling.
        filled_data = (
            self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
            .apply(lambda group: group.bfill().ffill())
            .reset_index()
            .set_index("level_1")
            .sort_index()
        )

        self.data[columns_to_fill] = filled_data[columns_to_fill]

        # For floor area, we also replace "" values with None
        self.data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = self.data[
            ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
        ].replace("", None)

    def make_cleaning_averages(self, ignore_step: bool = False) -> pd.DataFrame:
        """
        Create a dataset to hold averages based on property type, built form, construction age, and
        rooms. The result is stored on ``self.cleaning_averages`` and returned.
        Not required in newdata mode

        :return: The cleaning averages, or an empty DataFrame when the step is ignored
        """
        if ignore_step:
            return pd.DataFrame()

        # Define a custom function to calculate the median, excluding missing values
        def median_without_missing(group):
            return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

        cleaning_averages = (
            self.data.groupby(
                [
                    "PROPERTY_TYPE",
                    "BUILT_FORM",
                    "CONSTRUCTION_AGE_BAND",
                    "NUMBER_HABITABLE_ROOMS",
                    "NUMBER_HEATED_ROOMS",
                ],
                observed=True,
                dropna=False,
            )
            .apply(median_without_missing)
            .reset_index()
        )

        # Progressively coarser averages, each merged in under its own suffix and then used as a
        # fallback in this order.
        fallbacks = [
            (["PROPERTY_TYPE", "BUILT_FORM"], "_AVERAGE"),
            (["PROPERTY_TYPE"], "_PROPERTY_AVERAGE"),
            (["BUILT_FORM"], "_BUILT_FORM_AVERAGE"),
        ]

        cleaning_averages_filled = cleaning_averages
        for group_columns, suffix in fallbacks:
            coarse_averages = (
                self.data.groupby(group_columns, observed=True)
                .apply(median_without_missing)
                .reset_index()
            )
            cleaning_averages_filled = pd.merge(
                cleaning_averages_filled,
                coarse_averages,
                on=group_columns,
                suffixes=("", suffix),
            )

        for variable in AVERAGE_FIXED_FEATURES:
            # Replace missing values with the property type + built form average, then the property
            # type average, then the built form average. The built-form step now writes back into
            # the feature column itself (it previously created a stray column named "variable").
            for _, suffix in fallbacks:
                fallback_column = f"{variable}{suffix}"
                cleaning_averages_filled[variable] = cleaning_averages_filled[
                    variable
                ].fillna(cleaning_averages_filled[fallback_column])
                cleaning_averages_filled = cleaning_averages_filled.drop(
                    columns=fallback_column
                )

            # If there are still NA values, use the average across all rows of the averages table
            cleaning_averages_filled[variable] = cleaning_averages_filled[
                variable
            ].fillna(cleaning_averages_filled[variable].mean())

        # If the constituency is all NA values, then take UK AVERAGE VALUES
        # cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
        #     "TOTAL_FLOOR_AREA"
        # ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
        # cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
        #     "FLOOR_HEIGHT"
        # ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)

        self.cleaning_averages = cleaning_averages_filled
        return cleaning_averages_filled

    def retain_multiple_epc_properties(
        self, epc_minimum_count: int = 1, ignore_step: bool = False
    ) -> None:
        """
        Reduce the data further by keeping only properties with more than ``epc_minimum_count``
        EPCs. A "count" column holding the number of EPCs per UPRN is added to the data.
        """
        if self.violation_mode:
            # TODO: to fill in
            return

        if ignore_step:
            return

        counts = self.data.groupby("UPRN").size().reset_index()
        counts.columns = ["UPRN", "count"]

        # take UPRNS with multiple EPCs
        counts = counts[counts["count"] > epc_minimum_count]
        self.data = pd.merge(self.data, counts, on="UPRN")

    def recast_df_columns(
        self, column_mappings: dict, auto_subset_columns: bool = False
    ) -> None:
        """
        Recast columns from the dataframe to ensure the behaviour we want.

        :param column_mappings: Column name -> dtype, or -> list of dtypes applied in sequence
        :param auto_subset_columns: When True, mappings for columns absent from the data are skipped
        :raises ValueError: If a mapped column is absent and ``auto_subset_columns`` is False
        """
        if auto_subset_columns:
            column_mappings = {
                k: v for k, v in column_mappings.items() if k in self.data.columns
            }

        for key, values in column_mappings.items():
            if key not in self.data.columns:
                raise ValueError("Column mapping incorrectly specified")
            dtypes = values if isinstance(values, list) else [values]
            for dtype in dtypes:
                self.data[key] = self.data[key].astype(dtype)

    def recast_all_data(
        self, column_mappings: dict, auto_subset_columns: bool = False
    ) -> None:
        """
        Using a dictionary to recast all columns at once
        """
        if auto_subset_columns:
            column_mappings = {
                k: v for k, v in column_mappings.items() if k in self.data.columns
            }

        self.data = self.data.astype(column_mappings)

    def confine_data(self, ignore_step: bool = False):
        """
        Include all steps to reduce down the data based on assumptions.

        In violation mode, one boolean ``violation_*`` column per filter is appended to the data
        first. Unless ``ignore_step`` is set, the filters are then applied.
        """
        if self.violation_mode:
            # The transaction type filter is currently disabled (see the commented-out filter
            # below), so it has no violation column either.
            # NOTE(review): `== IGNORED_PROPERTY_TYPES` assumes a scalar setting - confirm.
            violations = {
                "violation_uprn_missing": pd.isnull(self.data["UPRN"]),
                "violation_old_lodgment_date": (
                    self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
                ),
                "violation_ignored_floor_level": self.data["FLOOR_LEVEL"].isin(
                    IGNORED_FLOOR_LEVELS
                ),
                "violation_rdsap_score_above_max": (
                    self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
                ),
                "violation_missing_windows_description": pd.isnull(
                    self.data["WINDOWS_DESCRIPTION"]
                ),
                "violation_missing_hotwater_description": pd.isnull(
                    self.data["HOTWATER_DESCRIPTION"]
                ),
                "violation_missing_roof_description": pd.isnull(
                    self.data["ROOF_DESCRIPTION"]
                ),
                "violation_invalid_property_type": (
                    self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
                ),
                "violation_invalid_tenure": self.data["TENURE"].isin(IGNORED_TENURES),
            }

            violation_df = pd.concat(
                list(violations.values()), axis=1, keys=list(violations)
            )

            self.data = pd.concat([self.data, violation_df], axis=1)

        if ignore_step:
            return

        # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
        self.data = self.data[~pd.isnull(self.data["UPRN"])]

        # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were
        # lodged before the introduction of SAP09
        self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]

        # Filter 3 (disabled): We remove EPCs that were conducted for a new build, since these are
        # performed with full SAP, which produces different results to the RdSAP methodology
        # self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]

        # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
        self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)]

        # Filter 5: Remove any EPCs with a SAP score above the maximum
        self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]

        # Filters 6 and 7: We observed 7 final records with missing windows and 2 records with
        # missing hot water so we remove them; missing roof descriptions are removed as well
        self.data = self.data[~pd.isnull(self.data["WINDOWS_DESCRIPTION"])]
        self.data = self.data[~pd.isnull(self.data["HOTWATER_DESCRIPTION"])]
        self.data = self.data[~pd.isnull(self.data["ROOF_DESCRIPTION"])]

        # Because park homes are surveyed unusually (for example, we don't have u-values to
        # look up for their different components, they need to be collected in survey and aren't
        # reflected in EPCs) we'll ignore them from the model
        self.data = self.data[self.data["PROPERTY_TYPE"] != IGNORED_PROPERTY_TYPES]

        # We remove EPCs where the tenure is unknown, but is usually an indicator of a new build
        self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)]

        # We remap zero values to None
        self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None

    def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
        """
        If there is no multi-glaze proportion but the windows are fully glazed, then we should
        assume a score of 100
        """
        if self.violation_mode:
            # TODO:
            return

        if ignore_step:
            return

        no_multi_glaze_proportion_index = pd.isnull(
            self.data["MULTI_GLAZE_PROPORTION"]
        ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))

        self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100

    def clean_photo_supply(self) -> None:
        """
        We fill photo supply with zeros where it's missing
        """
        self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)

    @staticmethod
    def apply_averages_cleaning(
        data_to_clean,
        cleaning_data,
        cols_to_merge_on,
        colnames=None,
        ignore_step: bool = False,
    ):
        """
        Clean the input DataFrame using averages from a cleaning DataFrame.

        Note that the room-count columns of ``data_to_clean`` are cast to float in place before the
        merge.

        :param data_to_clean: DataFrame to be cleaned.
        :param cleaning_data: DataFrame containing data for cleaning.
        :param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this
            function as this differs depending on where the function is being used.
        :param colnames: If specified can be used to state exactly which columns to clean
        :param ignore_step: When True nothing is done and None is returned
        :return: Cleaned DataFrame, or None when the step is ignored.
        """
        if ignore_step:
            return None

        # The desired colnames to clean - which may not be present
        if colnames is None:
            colnames = [
                "TOTAL_FLOOR_AREA",
                "FLOOR_HEIGHT",
                "FIXED_LIGHTING_OUTLETS_COUNT",
            ]

        cols_to_clean = [c for c in colnames if c in data_to_clean.columns]

        # Enforce data types
        for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
            data_to_clean[col] = data_to_clean[col].astype(float)

        # NOTE(review): dropna() removes rows, not columns, so this always yields every column in
        # cols_to_merge_on. If the intent was "only columns without missing values", it needs
        # dropna(axis=1) - confirm before changing, as that alters the merge keys.
        columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist()

        # Calculate averages
        cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
            {col: "mean" for col in cols_to_clean}
        )

        # Merge with the original data
        data_to_clean = pd.merge(
            data_to_clean,
            cleaning_averages_to_merge,
            on=columns_to_merge_on,
            suffixes=("", "_AVERAGE"),
            how="left",
        )

        global_averages = cleaning_data[cols_to_clean].mean()

        # Fill NaN values with averages. Results are assigned back rather than using
        # fillna(inplace=True) on a column selection, which is not guaranteed to update the frame.
        for col in cols_to_clean:
            data_to_clean[col] = data_to_clean[col].fillna(
                data_to_clean[f"{col}_AVERAGE"]
            )
            data_to_clean = data_to_clean.drop(columns=[f"{col}_AVERAGE"])
            # If we still have missings
            data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[col].mean())
            # Final step if we still have missings - use global mean
            data_to_clean[col] = data_to_clean[col].fillna(global_averages[col])

        return data_to_clean

    def get_component_features(self, suffix: str) -> pd.DataFrame:
        """
        This function will return the property components such as the walls, roof, heating etc
        as well as lodgement date. These are features that we expect might change from one EPC to
        the next

        :param suffix: Should be one of "_starting" or "_ending"
        :return: For "_starting": the starting component and efficiency columns with the suffix
            appended, plus the unsuffixed fixed and potential columns. For "_ending": the ending
            component and efficiency columns with the suffix appended.
        :raises Exception: If the suffix is not one of the two accepted values
        """
        if suffix not in ["_starting", "_ending"]:
            raise Exception("Suffix should be one of _starting or _ending")

        # Compare against the lowercase value accepted above; the previous uppercase comparison
        # could never match, so "_starting" wrongly returned the ending columns.
        if suffix == "_starting":
            starting_cols = (
                self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
                .copy()
                .add_suffix(suffix)
            )
            fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()

            return pd.concat([starting_cols, fixed_cols], axis=1)

        return (
            self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
            .copy()
            .add_suffix(suffix)
        )

    def get_fixed_features(self) -> pd.DataFrame:
        """
        Returns the fixed features that we don't believe should vary from one EPC to the next
        :return: Pandas dataframe containing the columns defined in FIXED_FEATURES
        """
        return self.data[FIXED_FEATURES]

    @staticmethod
    def coerce_boolean_columns(df: pd.DataFrame, cols_to_ignore: List | None = None):
        """
        Coerce columns with string 'True'/'False' values to boolean columns.

        Values are mapped explicitly because ``astype(bool)`` on strings turns every non-empty
        string, including 'False', into True. Missing values are left missing; a column containing
        them therefore keeps object dtype.

        :param df: Input DataFrame (modified in place and returned).
        :param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
        :return: DataFrame with coerced columns.
        """
        object_columns = df.select_dtypes(include=["object"]).columns
        if cols_to_ignore:
            object_columns = [c for c in object_columns if c not in cols_to_ignore]

        bool_lookup = {"True": True, "False": False, True: True, False: False}

        for column in object_columns:
            unique_values = set(df[column].dropna().unique())
            # If the unique values in the column are 'True' and 'False', convert the column to boolean
            if unique_values == {"True", "False"} or unique_values == {True, False}:
                converted = df[column].map(bool_lookup)
                df[column] = (
                    converted.astype(bool) if converted.notna().all() else converted
                )

        return df

    @staticmethod
    def calculate_days_to(lodgement_date):
        """
        Number of days between EARLIEST_EPC_DATE and the lodgement date.

        :param lodgement_date: A single date string, or a pandas Series of dates
        :return: An int for a string input; otherwise the result of ``.dt.days``
        """
        elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)

        if isinstance(lodgement_date, str):
            return elapsed.days

        return elapsed.dt.days

    @staticmethod
    def clean_missings_after_description_process(df, ignore_cols=None):
        """
        Fill the remaining missing values: False for boolean-like columns, "none" for columns that
        already contain "none", and "Unknown" otherwise.

        :param df: Input DataFrame (modified in place and returned)
        :param ignore_cols: Columns to leave untouched
        :return: The DataFrame with missing values filled
        """
        missings = pd.isnull(df).sum()
        missings = missings[missings > 0]

        if ignore_cols:
            missings = missings[~missings.index.isin(ignore_cols)]

        for col in missings.index:
            unique_values = df[col].unique()
            # TODO: confirm this behaviour
            if True in unique_values or False in unique_values:
                fill_value = False
            elif "none" in unique_values:
                fill_value = "none"
            else:
                fill_value = "Unknown"
            df[col] = df[col].fillna(fill_value)

        return df

    @staticmethod
    def clean_efficiency_variables(df):
        """
        There is scope to clean this by the model per corresponding description.
        E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
        fill in the missing values with this.
        When looking at this initially, there are a large volume of records with missing energy
        efficiency values and therefore a simpler approach was taken just to test including these
        variables

        :param df: Input DataFrame (modified in place and returned)
        :return: The DataFrame with missing efficiency values set to "NO_RATING"
        :raises ValueError: If a column without "ENERGY_EFF" in its name has missing values
        """
        missings = pd.isnull(df).sum()
        missings = missings[missings >= 1]

        if len(missings) == 0:
            return df

        # Make sure they are all efficiency columns
        if any(~missings.index.str.contains("ENERGY_EFF")):
            raise ValueError("Non efficiency columns are missing")

        for m in missings.index:
            df[m] = df[m].fillna("NO_RATING")

        return df


class TrainingDataProcessor(EPCDataProcessor):
    """
    Handle data loading and data preprocessing for training data
    """

    def prepare_data(self, filepath: Path | str | None = None) -> None:
        """
        Process the data for training.

        :param filepath: Optional CSV to load before processing; when None the data already held
            by the processor is used
        :raises Exception: If there is no data to process
        """
        if filepath is not None:
            self.load_data(
                filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
            )

        if len(self.data) == 0:
            raise Exception("No data to process - check filepath/ data being passed in")

        # Every step runs for training, so the steps are called with their default
        # ignore_step=False (the previous code passed an undefined `ignore_step` name).
        is_kwh_mode = self.run_mode == "kwhdata"

        self.confine_data()
        self.remap_anomalies()
        self.remap_floor_level()
        self.remap_build_form()
        self.cast_data_column_values_to_lower()
        self.standardise_construction_age_band()
        if not is_kwh_mode:
            self.clean_missing_rooms()
        self.recast_df_columns(
            column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
        )
        self.clean_multi_glaze_proportion()
        self.clean_photo_supply()
        if not is_kwh_mode:
            self.retain_multiple_epc_properties(
                epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"],
            )
            self.fill_na_fields()

        self.sort_data_by_uprn_lodgement_date()

        # Final re-casting after data transformed and prepared
        self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.na_remapping(auto_subset_columns=True)

        self.fill_invalid_constituency_fields()

        if not is_kwh_mode:
            self.make_cleaning_averages()
            self.add_local_authority_to_cleaning_average()

        # TODO: check if this has impact on training dataset
        # cleaned_data = self.apply_averages_cleaning(
        #     data_to_clean=self.data,
        #     cleaning_data=self.cleaning_averages,
        #     cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
        #     colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
        # )

        # When running in newdata mode, cleaning_averages has lower cases so we coerce back to upper
        cleaning_averages = self.cleaning_averages.copy()
        if self.run_mode == "newdata":
            cleaning_averages.columns = cleaning_averages.columns.str.upper()

        if is_kwh_mode:
            cleaned_data = self.data
        else:
            cleaned_data = self.apply_averages_cleaning(
                data_to_clean=self.data,
                cleaning_data=cleaning_averages,
                cols_to_merge_on=COLUMNS_TO_MERGE_ON,
            )

        self.data = self.data if cleaned_data is None else cleaned_data

        if not is_kwh_mode:
            self.cast_cleaning_averages_columns_to_lower()

        self.cast_data_columns_to_lower()


class InferenceDataProcessor(EPCDataProcessor):
    """
    Handle data loading and data preprocessing for inference data,
    There are certain steps that are not required for inference data
    """

    def prepare_data(self, filepath: Path | str | None = None) -> None:
        """
        Process the data for inference.

        :param filepath: Optional CSV to load before processing; when None the data already held
            by the processor is used
        :raises Exception: If there is no data to process
        """
        if filepath is not None:
            self.load_data(
                filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
            )

        if len(self.data) == 0:
            raise Exception("No data to process - check filepath/ data being passed in")

        self.remap_anomalies()
        self.remap_build_form()
        self.cast_data_column_values_to_lower()
        self.recast_df_columns(
            column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
        )
        self.clean_photo_supply()
        self.fill_na_fields()

        # Final re-casting after data transformed and prepared
        self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.na_remapping(auto_subset_columns=True)

        # NOTE(review): an upper-cased copy of self.cleaning_averages used to be built here in
        # "newdata" mode but was never used; confirm whether apply_averages_cleaning should run.

        self.cast_data_columns_to_lower()


class KwhDataProcessor(EPCDataProcessor):
    """
    Handle data loading and data preprocessing for kwh data
    Will have different steps to the other data processors
    """

    def prepare_data(self, filepath: Path | str | None = None) -> None:
        """
        Process the kwh data already held by the processor.

        :param filepath: Accepted for interface compatibility.
            NOTE(review): currently unused - nothing is loaded here, unlike the other processors.
        :raises Exception: If there is no data to process
        """
        self.rename_kwhdata_columns()

        if len(self.data) == 0:
            raise Exception("No data to process - check filepath/ data being passed in")

        self.confine_data()
        self.remap_anomalies()
        self.remap_floor_level()
        self.remap_build_form()
        self.cast_data_column_values_to_lower()
        self.standardise_construction_age_band()
        self.recast_df_columns(
            column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
        )
        self.clean_multi_glaze_proportion()
        self.clean_photo_supply()

        self.sort_data_by_uprn_lodgement_date()

        # Final re-casting after data transformed and prepared
        self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
        self.na_remapping(auto_subset_columns=True)

        self.fill_invalid_constituency_fields()
        self.cast_data_columns_to_lower()