from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

from BaseUtility import Definitions
from etl.epc.settings import (
    DATA_PROCESSOR_SETTINGS,
    EARLIEST_EPC_DATE,
    FULLY_GLAZED_DESCRIPTIONS,
    AVERAGE_FIXED_FEATURES,
    BUILT_FORM_REMAP,
    COLUMNS_TO_MERGE_ON,
    FIXED_FEATURES,
    COLUMNTYPES,
    RDSAP_RESPONSE,
    MAX_SAP_SCORE,
    fill_na_map,
    STARTING_SUFFIX_COMPONENT_COLS,
    NO_SUFFIX_COMPONENT_COLS,
    ENDING_SUFFIX_COMPONENT_COLS,
    POTENTIAL_COLUMNS,
    EFFICIENCY_FEATURES,
)
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP

# These lookups are used to clean the construction age band.
# Each standard band label maps to its inclusive lower ("l") and upper ("u") year.
bounds_map = {
    "England and Wales: before 1900": {"l": 0, "u": 1899},
    "England and Wales: 1930-1949": {"l": 1930, "u": 1949},
    "England and Wales: 1900-1929": {"l": 1900, "u": 1929},
    "England and Wales: 1950-1966": {"l": 1950, "u": 1966},
    "England and Wales: 1967-1975": {"l": 1967, "u": 1975},
    "England and Wales: 1976-1982": {"l": 1976, "u": 1982},
    "England and Wales: 1983-1990": {"l": 1983, "u": 1990},
    "England and Wales: 1991-1995": {"l": 1991, "u": 1995},
    "England and Wales: 1996-2002": {"l": 1996, "u": 2002},
    "England and Wales: 2003-2006": {"l": 2003, "u": 2006},
    "England and Wales: 2007-2011": {"l": 2007, "u": 2011},
    "England and Wales: 2012 onwards": {"l": 2012, "u": 3000},
}

# Non-standard labels that are rewritten to one of the standard labels above.
remap = {
    "England and Wales: 2007 onwards": "England and Wales: 2007-2011"
}

# Year (0..3000) -> standard band label. The bands in bounds_map are disjoint and
# together cover 0..3000, so every year receives exactly one label.
expanded_map = {
    year: label
    for label, bounds in bounds_map.items()
    for year in range(bounds["l"], bounds["u"] + 1)
}


def is_int(x):
    """Return True when ``int(x)`` succeeds, False otherwise."""
    try:
        int(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (e.g. None); ValueError: non-numeric text or NaN;
        # OverflowError: infinite floats.
        return False
    return True


class DataProcessor:
    """
    Handle data loading and data preprocessing
    """

    def __init__(self, filepath: Path | None, newdata: bool = False) -> None:
        """
        :param filepath: If specified, is the physical location of the data
        :param newdata: Indicates if we are processing new, testing data. In this instance, there are some
            operations we do not want to perform, such as confine_data()
        """
        self.filepath = filepath
        self.data = None
        self.newdata = newdata

    def load_data(self, low_memory=False) -> None:
        """
        Read the CSV at ``self.filepath`` into ``self.data``.

        :param low_memory: Forwarded to ``pandas.read_csv``
        :raises ValueError: If no filepath was supplied
        """
        if not self.filepath:
            raise ValueError("No filepath specified")
        self.data = pd.read_csv(self.filepath, low_memory=low_memory)

    def insert_data(self, data: pd.DataFrame) -> None:
        """
        Use an already-loaded dataframe instead of reading from ``self.filepath``.
        """
        self.data = data

    @staticmethod
    def clean_construction_age_band(x):
        """
        Map a single raw construction age band value onto a standard band label.

        :param x: Raw value from the CONSTRUCTION_AGE_BAND column
        :return: ``x`` unchanged when it is an anomaly value, missing, or already a standard label;
            otherwise the standard label it corresponds to
        :raises NotImplementedError: If the value matches none of the handled cases
        """
        # Missing values are passed straight through. NaN is tested explicitly because
        # `x in [None, np.nan]` only matches NaN by object identity.
        is_missing = x is None or (isinstance(x, (float, np.floating)) and np.isnan(x))
        # Firstly, we check if it's an error value
        if is_missing or x in Definitions.DATA_ANOMALY_MATCHES:
            return x

        # Next, we check if it's a value in our map
        if bounds_map.get(x):
            return x

        # We check if it's a standard remap value
        remap_value = remap.get(x, None)
        if remap_value:
            return remap_value

        # We check if it's a number, in which case it is treated as a build year
        if is_int(x):
            return expanded_map[int(x)]

        raise NotImplementedError("Not handled the case for value %s" % x)

    def standardise_construction_age_band(self):
        """
        This function will tidy up some of the non-standard values that are populated in the construction
        age band, which is useful for cleaning. Rows whose band is still null afterwards are dropped.
        """
        self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply(
            self.clean_construction_age_band
        )
        self.data = self.data[self.data["CONSTRUCTION_AGE_BAND"].notnull()]

    @staticmethod
    def _fill_from_group_median(data: pd.DataFrame, col: str, matching_columns: List[str]) -> pd.DataFrame:
        """
        Fill missing values of ``col`` with the median of ``col`` over rows sharing ``matching_columns``.

        The result comes from a left merge, so its index is a fresh RangeIndex.
        """
        cleaning_col = f"{col}_CLEANING"
        cleaning_data = (
            data[data[col].notnull()]
            .groupby(matching_columns)[col]
            .median()
            .reset_index()
        )
        data = data.merge(
            cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING")
        )
        data[col] = np.where(pd.isnull(data[col]), data[cleaning_col], data[col])
        return data.drop(columns=cleaning_col)

    def clean_missing_rooms(self):
        """
        For the number of heated rooms and number of habitable rooms, we clean these values up front, based on
        property archetype and age

        Missing values are filled with the group median, starting from the most specific grouping and
        dropping the last grouping column each time values are still missing.

        TODO: We could use a model based impution approach for possibly more accurate cleaning

        :raises NotImplementedError: If values are still missing after grouping on PROPERTY_TYPE alone
        """
        # Outward part of the postcode. The .str accessor leaves missing postcodes as missing
        # rather than raising.
        self.data["POSTAL_AREA"] = self.data["POSTCODE"].str.split(" ").str[0]

        matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"]

        for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
            # Try all four grouping columns first, then three, two, and finally PROPERTY_TYPE only
            for n_keys in range(len(matching_columns), 0, -1):
                if not self.data[col].isnull().any():
                    break
                self.data = self._fill_from_group_median(
                    data=self.data, col=col, matching_columns=matching_columns[:n_keys]
                )

            # If we've used the coarsest grouping and still have missings - something has gone wrong or
            # we have a very unique property type
            if self.data[col].isnull().any():
                raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col)

    def pre_process(self) -> pd.DataFrame:
        """
        Load data and begin initial cleaning

        Several steps (confining, age band standardisation, room cleaning, glazing cleaning, multi-EPC
        retention and sorting) only run when ``self.newdata`` is False.

        :return: The processed dataframe, which is also stored on ``self.data``
        """
        if self.data is None:
            self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])

        if not self.newdata:
            self.confine_data()

        self.remap_columns()

        # We have some non-standard construction age bands which we'll clean for matching
        if not self.newdata:
            self.standardise_construction_age_band()
            self.clean_missing_rooms()

        self.recast_df_columns(
            column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
        )

        if not self.newdata:
            self.clean_multi_glaze_proportion()

        self.clean_photo_supply()

        if not self.newdata:
            self.retain_multiple_epc_properties(
                epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
            )

            if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
                # If we have multiple EPC records, we can try and do filling
                self.fill_na_fields()

            self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)

        # Final re-casting after data transformed and prepared. New data may only carry a
        # subset of the columns, so only cast the ones that are present.
        if self.newdata:
            coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns}
        else:
            coltypes = COLUMNTYPES

        self.data = self.data.astype(coltypes)

        self.na_remapping()

        return self.data

    def na_remapping(self):
        """
        Fill missing values column by column using ``fill_na_map``.

        For new data only the columns present in the dataframe are filled.
        """
        if self.newdata:
            fill_na_map_apply = {
                k: v for k, v in fill_na_map.items() if k in self.data.columns
            }
        else:
            fill_na_map_apply = fill_na_map

        for column, fill_value in fill_na_map_apply.items():
            self.data[column] = self.data[column].fillna(fill_value)

    def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
        """
        If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields

        :param columns_to_fill: Columns to fill within each UPRN group
        """
        # Each uprn can fill backward from recent and forward fill from oldest
        # The groupby changes the order and we use the index to make the original data
        filled_data = (
            self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
            .apply(lambda group: group.bfill().ffill())
            .reset_index()
            # "level_1" holds the original row index after reset_index()
            .set_index("level_1")
            .sort_index()
        )

        self.data[columns_to_fill] = filled_data[columns_to_fill]

        # For floor area, we also replace "" values with None
        self.data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = self.data[
            ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
        ].replace("", None)

    def remap_columns(self):
        """
        Remap all columns, for any non values
        """
        # Map all anomaly values to None
        data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

        # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
        data = self.data.replace(data_anomaly_map)
        # np.nan rather than the np.NAN alias, which no longer exists in NumPy 2.0
        data = data.replace(np.nan, None)

        # Remap certain columns
        if not self.newdata:
            data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
            data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)

            convert_to_lower = ["TRANSACTION_TYPE"]
            for col in convert_to_lower:
                data[col] = data[col].str.lower()

        self.data = data

    def _grouped_medians(self, keys: List[str], **groupby_kwargs) -> pd.DataFrame:
        """
        Median of each AVERAGE_FIXED_FEATURES column per group of ``keys``, with keys as columns.
        """

        def median_without_missing(group):
            # Calculate the median, excluding missing values
            return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

        return (
            self.data.groupby(keys, observed=True, **groupby_kwargs)
            .apply(median_without_missing)
            .reset_index()
        )

    def make_cleaning_averages(self) -> pd.DataFrame:
        """
        Build a table of median fixed-feature values per property archetype.

        Medians are computed per (property type, built form, age band, habitable rooms, heated rooms).
        Missing medians are filled, in order, from the property type + built form median, the property
        type median, the built form median, and finally the mean of the column in the table itself.

        :return: Dataframe with the grouping columns and one column per AVERAGE_FIXED_FEATURES entry
        """
        cleaning_averages = self._grouped_medians(
            [
                "PROPERTY_TYPE",
                "BUILT_FORM",
                "CONSTRUCTION_AGE_BAND",
                "NUMBER_HABITABLE_ROOMS",
                "NUMBER_HEATED_ROOMS",
            ],
            dropna=False,
        )
        general_averages = self._grouped_medians(["PROPERTY_TYPE", "BUILT_FORM"])
        property_averages = self._grouped_medians(["PROPERTY_TYPE"])
        built_form_averages = self._grouped_medians(["BUILT_FORM"])

        # We can clean up any NA's in the cleaning averages with the general averages here.
        # NOTE(review): these are inner joins (the pd.merge default), so a group with no matching
        # fallback row is dropped - confirm this is intended.
        cleaning_averages_filled = pd.merge(
            cleaning_averages,
            general_averages,
            on=["PROPERTY_TYPE", "BUILT_FORM"],
            suffixes=("", "_AVERAGE"),
        )
        cleaning_averages_filled = pd.merge(
            cleaning_averages_filled,
            property_averages,
            on=["PROPERTY_TYPE"],
            suffixes=("", "_PROPERTY_AVERAGE"),
        )
        cleaning_averages_filled = pd.merge(
            cleaning_averages_filled,
            built_form_averages,
            on=["BUILT_FORM"],
            suffixes=("", "_BUILT_FORM_AVERAGE"),
        )

        # Fallbacks, most specific first: property type + built form, then property type only,
        # then built form only.
        fallback_suffixes = ("_AVERAGE", "_PROPERTY_AVERAGE", "_BUILT_FORM_AVERAGE")

        for variable in AVERAGE_FIXED_FEATURES:
            for suffix in fallback_suffixes:
                fallback_col = f"{variable}{suffix}"
                # Assign back to the feature column itself so each fallback actually takes effect
                cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
                    cleaning_averages_filled[fallback_col]
                )
                cleaning_averages_filled = cleaning_averages_filled.drop(columns=fallback_col)

            # If there still is na values, use average across all epc in constituency
            cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
                cleaning_averages_filled[variable].mean()
            )

        # TODO: If the constituency is all NA values, then take UK average values, i.e. fill
        #  TOTAL_FLOOR_AREA with TOTAL_FLOOR_AREA_NATIONAL_AVERAGE and FLOOR_HEIGHT with
        #  FLOOR_HEIGHT_NATIONAL_AVERAGE (not currently implemented).

        return cleaning_averages_filled

    def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
        """
        Reduce the data further by keeping only properties with multiple epcs

        Only UPRNs with strictly more than ``epc_minimum_count`` rows are kept. The merge also adds a
        "count" column holding the number of rows for each UPRN.

        :param epc_minimum_count: A UPRN must have more rows than this to be retained
        """
        counts = self.data.groupby("UPRN").size().reset_index(name="count")

        # take UPRNS with multiple EPCs
        counts = counts[counts["count"] > epc_minimum_count]
        self.data = pd.merge(self.data, counts, on="UPRN")

    def recast_df_columns(self, column_mappings: dict) -> None:
        """
        Recast columns from the dataframe to ensure the behaviour we want

        :param column_mappings: Maps a column name to the sequence of dtypes to cast it through, in order
        :raises ValueError: If a mapped column is not present in the data
        """
        for key, values in column_mappings.items():
            if key not in self.data.columns:
                raise ValueError("Column mapping incorrectly specified")

            for value in values:
                self.data[key] = self.data[key].astype(value)

    def confine_data(self) -> None:
        """
        Include all step to reduce down the data based on assumptions
        """
        # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
        # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
        # before the introduction of SAP09
        # Filter 3: We remove EPCS that were conducted for a new build, since these are performed with
        # full SAP, which produces different results to the RdSAP methodology
        # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
        # Filter 5: Remove any EPCs with a SAP score above 100
        # Filter 6: We found a small number of cases that have missing window description so we drop these
        # Filter 7: We found a small number of cases that have missing hotwater description so we drop these
        # The filters are applied one after another so each comparison only sees rows that
        # survived the previous filters.
        self.data = self.data[self.data["UPRN"].notnull()]
        self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
        self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]
        self.data = self.data[
            ~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])
        ]
        self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]

        # We observed 7 final records with missing windows and 2 records with missing hot water so we shall
        # remove them. Records with a missing roof description are removed as well.
        for description_col in ("WINDOWS_DESCRIPTION", "HOTWATER_DESCRIPTION", "ROOF_DESCRIPTION"):
            self.data = self.data[self.data[description_col].notnull()]

        # Because park homes are surveyed unusually (for example, we don't have u-values to
        # look up for their different components, they need to be collected in survey and aren't reflected in
        # EPCs) we'll ignore them from the model
        self.data = self.data[self.data["PROPERTY_TYPE"] != "Park home"]

    def clean_multi_glaze_proportion(self) -> None:
        """
        If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
        """
        is_fully_glazed = self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
        no_multi_glaze_proportion_index = self.data["MULTI_GLAZE_PROPORTION"].isnull() & is_fully_glazed

        self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100

    def clean_photo_supply(self) -> None:
        """
        We fill photo supply with zeros where it's missing
        """
        self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)

    @staticmethod
    def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None):
        """
        Clean the input DataFrame using averages from a cleaning DataFrame.

        Each column is filled, in order, from: the mean of that column in ``cleaning_data`` for the
        matching group, the mean of the column in ``data_to_clean`` itself, and finally the overall
        mean of the column in ``cleaning_data``.

        Note that NUMBER_HABITABLE_ROOMS and NUMBER_HEATED_ROOMS are cast to float on the dataframe
        that is passed in, before the merge creates a new dataframe.

        :param data_to_clean: DataFrame to be cleaned.
        :param cleaning_data: DataFrame containing data for cleaning.
        :param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this
            differs depending on where the function is being used.
        :param colnames: If specified can be used to state exactly which columns to clean
        :return: Cleaned DataFrame.
        """
        # The desired colnames to clean - which may not be present
        if colnames is None:
            colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]

        cols_to_clean = [c for c in colnames if c in data_to_clean.columns]

        # Enforce data types
        for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
            data_to_clean[col] = data_to_clean[col].astype(float)

        # NOTE(review): dropna() drops *rows*, so this is always the full list of cols_to_merge_on.
        # If the intent was to merge only on columns without missing values, dropna(axis=1) is
        # needed - confirm before changing, as it alters the merge keys.
        columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist()

        # Calculate averages
        cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
            {col: "mean" for col in cols_to_clean}
        )

        # Merge with the original data
        data_to_clean = pd.merge(
            data_to_clean,
            cleaning_averages_to_merge,
            on=columns_to_merge_on,
            suffixes=("", "_AVERAGE"),
            how='left'
        )

        global_averages = cleaning_data[cols_to_clean].mean()

        # Fill NaN values with averages. Results are assigned back to the column rather than using
        # fillna(inplace=True) on a selected column, which does not update the frame under
        # pandas Copy-on-Write.
        for col in cols_to_clean:
            average_col = f"{col}_AVERAGE"
            data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[average_col])
            data_to_clean = data_to_clean.drop(columns=[average_col])

            # If we still have missings
            data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[col].mean())

            # Final step if we still have missings - use global mean
            data_to_clean[col] = data_to_clean[col].fillna(global_averages[col])

        return data_to_clean

    def get_component_features(self, suffix: str) -> pd.DataFrame:
        """
        This function will return the property components such as the walls, roof, heating etc as well as
        lodgement date. These are features that we expect might change from one EPC to the next

        :param suffix: Should be one of "_STARTING" or "_ENDING"
        :return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES
        :raises Exception: If ``suffix`` is not one of the two accepted values
        """
        if suffix not in ["_STARTING", "_ENDING"]:
            raise Exception("Suffix should be one of _STARTING or _ENDING")

        if suffix == "_ENDING":
            return self.data[
                ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES
            ].copy().add_suffix(suffix)

        # For "_STARTING", only the component and efficiency columns are suffixed;
        # the remaining columns keep their original names.
        starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)
        fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()
        return pd.concat([starting_cols, fixed_cols], axis=1)

    def get_fixed_features(self) -> pd.DataFrame:
        """
        Returns the fixed features that we don't believe should vary from one EPC to the next

        :return: Pandas dataframe containing the columns defined in FIXED_FEATURES
        """
        return self.data[FIXED_FEATURES]

    @staticmethod
    def coerce_boolean_columns(df: pd.DataFrame, cols_to_ignore: List | None = None):
        """
        Coerce columns with string 'True'/'False' values to boolean columns.

        A column is converted only when its non-missing values are exactly the pair 'True'/'False'
        (strings) or True/False.

        :param df: Input DataFrame. It is modified in place and also returned.
        :param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
        :return: DataFrame with coerced columns.
        """
        object_columns = df.select_dtypes(include=['object']).columns
        if cols_to_ignore:
            object_columns = [c for c in object_columns if c not in cols_to_ignore]

        string_to_bool = {"True": True, "False": False}

        for column in object_columns:
            unique_values = set(df[column].dropna().unique())

            # NOTE(review): in both branches astype(bool) turns missing values into True, as the
            # previous implementation did - confirm that is the desired treatment of missings.
            if unique_values == {'True', 'False'}:
                # Map the text to real booleans first: astype(bool) on the raw strings would turn
                # 'False' into True, because any non-empty string is truthy.
                df[column] = df[column].map(string_to_bool).astype(bool)
            elif unique_values == {True, False}:
                df[column] = df[column].astype(bool)

        return df

    @staticmethod
    def calculate_days_to(lodgement_date):
        """
        Number of days between EARLIEST_EPC_DATE and the lodgement date(s).

        :param lodgement_date: A single date (e.g. a string) or a pandas Series of dates
        :return: A single day count for a single date, or a Series of day counts for a Series
        """
        elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
        # A Series of timedeltas exposes days through the .dt accessor; scalars and
        # index-like results expose .days directly.
        if isinstance(elapsed, pd.Series):
            return elapsed.dt.days
        return elapsed.days

    @staticmethod
    def clean_missings_after_description_process(df, ignore_cols=None):
        """
        Fill the remaining missing values in every column that still has any.

        Columns containing True or False are filled with False, columns containing "none" are filled
        with "none", and all other columns are filled with "Unknown".

        :param df: Dataframe to clean. It is modified in place and also returned.
        :param ignore_cols: If specified, columns that should be left untouched
        :return: The cleaned dataframe
        """
        missings = pd.isnull(df).sum()
        missings = missings[missings > 0]
        if ignore_cols:
            missings = missings[~missings.index.isin(ignore_cols)]

        for col in missings.index:
            unique_values = df[col].unique()
            if True in unique_values or False in unique_values:
                fill_value = False
            elif "none" in unique_values:
                fill_value = "none"
            else:
                fill_value = "Unknown"

            df[col] = df[col].fillna(fill_value)

        return df

    @staticmethod
    def clean_efficiency_variables(df):
        """
        These is scope to clean this by the model per corresponding description. E.g. for WALLS_ENG_EFF
        we could look at the mode efficiency rating by description and fill in the missing values with this.
        When looking at this initially, there are a large volume of records with missing energy efficiency
        values and therefore a simpler approach was taken just to test including these variables

        Every column with missing values is filled with "NO_RATING".

        :param df: Dataframe whose only columns with missing values are expected to be efficiency columns
        :return: The dataframe with missing efficiency values filled
        :raises ValueError: If a column with missing values does not contain "ENERGY_EFF" in its name
        """
        missing_counts = pd.isnull(df).sum()
        columns_with_missings = missing_counts[missing_counts >= 1].index

        if len(columns_with_missings) == 0:
            return df

        # Make sure they are all efficiency columns
        if any(~columns_with_missings.str.contains("ENERGY_EFF")):
            raise ValueError("Non efficiency columns are missing")

        for column in columns_with_missings:
            df[column] = df[column].fillna("NO_RATING")

        return df