from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

from model_data.BaseUtility import Definitions
from simulation_system.Settings import (
    DATA_PROCESSOR_SETTINGS,
    EARLIEST_EPC_DATE,
    FULLY_GLAZED_DESCRIPTIONS,
    AVERAGE_FIXED_FEATURES,
    FLOOR_HEIGHT_NATIONAL_AVERAGE,
    TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
    FLOOR_LEVEL_MAP,
    BUILT_FORM_REMAP,
    COLUMNS_TO_MERGE_ON
)


class DataProcessor:
    """
    Handle data loading and data preprocessing

    The working dataframe lives in ``self.data``. It is created by
    ``load_data`` and then replaced or mutated by the cleaning methods, so
    ``load_data`` (or ``pre_process``) must run before any other method.
    """

    # Populated by load_data(); deliberately not set in __init__.
    data: pd.DataFrame

    def __init__(self, filepath: Path) -> None:
        """
        Store the location of the CSV file; nothing is read until load_data
        """
        self.filepath = filepath

    def load_data(self, low_memory: bool = False) -> None:
        """
        Read the CSV at ``self.filepath`` into ``self.data``

        ``low_memory`` is forwarded unchanged to ``pandas.read_csv``.
        """
        self.data = pd.read_csv(self.filepath, low_memory=low_memory)

    def pre_process(self) -> pd.DataFrame:
        """
        Load data and begin initial cleaning

        Runs the cleaning steps in a fixed order, driven by
        DATA_PROCESSOR_SETTINGS, and returns ``self.data`` sorted by UPRN and
        LODGEMENT_DATE (ascending).
        """
        self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
        self.confine_data()
        # TODO: Clean number of heated rooms and habitable rooms
        self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
        self.clean_multi_glaze_proportion()
        self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
        self.remap_columns()

        if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1:
            # retain_multiple_epc_properties keeps UPRNs with strictly more
            # records than the minimum, so here every UPRN has several EPC
            # records and gaps can be filled from its other records
            self.fill_na_fields()

        self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
        return self.data

    def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
        """
        Fill missing values in ``columns_to_fill`` from the same property's other rows

        Within each UPRN group, values are back filled first and then forward
        filled, following the current row order of ``self.data``. Only the
        listed columns of ``self.data`` are overwritten.
        """
        if self.data.empty:
            # Nothing to group; applying over zero groups yields no columns
            return

        # group_keys=False keeps the original row labels on the result, so the
        # assignment below realigns on the index whatever order the groupby
        # produced, and whether or not the index is named.
        # .bfill()/.ffill() replace the deprecated fillna(method=...) spelling.
        filled_data = self.data.groupby("UPRN", group_keys=False)[columns_to_fill].apply(
            lambda group: group.bfill().ffill()
        )
        self.data[columns_to_fill] = filled_data[columns_to_fill]

    def remap_columns(self):
        """
        Remap all columns, replacing anomaly values and missing values with None

        Afterwards FLOOR_LEVEL is remapped through FLOOR_LEVEL_MAP and
        BUILT_FORM through BUILT_FORM_REMAP. ``self.data`` is replaced by the
        remapped frame.
        """
        # Map all anomaly values to None
        data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

        # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
        data = self.data.replace(data_anomaly_map)
        # np.nan is the canonical spelling; the upper-case aliases are legacy
        data = data.replace(np.nan, None)

        # Remap certain columns
        data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
        # Write back to BUILT_FORM itself: the previous target 'BUILT_FROM' was
        # a typo that created a stray column and left BUILT_FORM unmapped
        data['BUILT_FORM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
        self.data = data

    def make_cleaning_averages(self) -> pd.DataFrame:
        """
        Build a table of median TOTAL_FLOOR_AREA / FLOOR_HEIGHT per property group

        Medians of AVERAGE_FIXED_FEATURES are computed per (PROPERTY_TYPE,
        BUILT_FORM, CONSTRUCTION_AGE_BAND, NUMBER_HABITABLE_ROOMS,
        NUMBER_HEATED_ROOMS) group. Missing TOTAL_FLOOR_AREA and FLOOR_HEIGHT
        medians are then filled from progressively coarser fallbacks:
        property type + built form, property type, built form, the mean of the
        table's own column, and finally the national average constants.
        """
        features = ('TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT')

        # Define a custom function to calculate the median, excluding missing values
        def median_without_missing(group):
            return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

        def grouped_medians(keys, **groupby_kwargs):
            # One row per group with the group keys restored as columns
            grouped = self.data.groupby(keys, observed=True, **groupby_kwargs)
            return grouped.apply(median_without_missing).reset_index()

        cleaning_averages = grouped_medians(
            ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS",
             "NUMBER_HEATED_ROOMS"],
            dropna=False
        )

        # (fallback table, merge keys, suffix given to the fallback's columns),
        # ordered from most to least specific
        fallbacks = (
            (grouped_medians(["PROPERTY_TYPE", "BUILT_FORM"]), ['PROPERTY_TYPE', 'BUILT_FORM'], '_AVERAGE'),
            (grouped_medians(["PROPERTY_TYPE"]), ['PROPERTY_TYPE'], '_PROPERTY_AVERAGE'),
            (grouped_medians(["BUILT_FORM"]), ['BUILT_FORM'], '_BUILT_FORM_AVERAGE'),
        )

        # We can clean up any NA's in the cleaning averages with the fallback averages here
        # NOTE(review): these are inner merges and the fallback tables are
        # grouped with the default dropna=True, so detailed groups whose
        # PROPERTY_TYPE or BUILT_FORM is missing appear to be dropped here -
        # confirm that is intended
        cleaning_averages_filled = cleaning_averages
        for fallback_table, merge_keys, suffix in fallbacks:
            cleaning_averages_filled = pd.merge(cleaning_averages_filled, fallback_table,
                                                on=merge_keys, suffixes=('', suffix))

        # Replace any missing values with the fallback for the same group,
        # moving on to the next (coarser) fallback only for values still missing
        for _, _, suffix in fallbacks:
            fallback_columns = [feature + suffix for feature in features]
            for feature, fallback_column in zip(features, fallback_columns):
                cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                    cleaning_averages_filled[fallback_column])
            cleaning_averages_filled = cleaning_averages_filled.drop(columns=fallback_columns)

        national_averages = {
            'TOTAL_FLOOR_AREA': TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
            'FLOOR_HEIGHT': FLOOR_HEIGHT_NATIONAL_AVERAGE,
        }
        for feature in features:
            # If there are still NA values, use the mean of this table's column
            # NOTE(review): this is the mean of the group medians, not of the
            # individual properties
            cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                cleaning_averages_filled[feature].mean())
            # If the whole column is NA, then take the UK average values
            cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                national_averages[feature])

        return cleaning_averages_filled

    def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
        '''
        Reduce the data further by keeping only properties with multiple EPCs

        A UPRN is kept when it has strictly more than ``epc_minimum_count``
        rows. The per-UPRN row count is merged onto ``self.data`` as a new
        "count" column.
        '''
        counts = self.data.groupby("UPRN").size().reset_index()
        counts.columns = ["UPRN", "count"]

        # take UPRNs with multiple EPCs
        counts = counts[counts["count"] > epc_minimum_count]

        # Inner merge: rows whose UPRN was filtered out above are dropped
        self.data = pd.merge(self.data, counts, on='UPRN')

    def recast_df_columns(self, column_mappings: dict) -> None:
        """
        Recast columns from the dataframe to ensure the behaviour we want

        ``column_mappings`` maps a column name to a sequence of dtypes that
        are applied to that column one after another with ``astype``.

        Raises SystemExit(1) if a mapped column is not in ``self.data``.
        """
        for key, values in column_mappings.items():
            if key not in self.data.columns:
                print('Column mapping incorrectly specified')
                # Same exit status as before, without relying on the
                # interactive exit() helper that the site module injects
                raise SystemExit(1)

            for value in values:
                self.data[key] = self.data[key].astype(value)

    def confine_data(self) -> None:
        """
        Include all steps to reduce down the data based on assumptions
        """
        # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
        self.data = self.data[~pd.isnull(self.data["UPRN"])]

        # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
        # before the introduction of SAP09
        self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]

        # Filter 3: We remove EPCs that were conducted for a new build, since these are performed with
        # full SAP, which produces different results to the RdSAP methodology
        self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]

        # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
        self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]

    def clean_multi_glaze_proportion(self) -> None:
        """
        If there is no multi-glaze proportion but the windows are fully glazed, then we should assume
        a score of 100
        """
        # Rows with a missing proportion whose window description is one of
        # FULLY_GLAZED_DESCRIPTIONS
        no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & (
            self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))

        self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100