Model/model_data/simulation_system/DataProcessor.py

188 lines
9.1 KiB
Python

from pathlib import Path
from typing import List, Optional

import numpy as np
import pandas as pd

from model_data.BaseUtility import BaseUtility
from simulation_system.Settings import (
    DATA_PROCESSOR_SETTINGS,
    EARLIEST_EPC_DATE,
    FULLY_GLAZED_DESCRIPTIONS,
    AVERAGE_FIXED_FEATURES,
    FLOOR_HEIGHT_NATIONAL_AVERAGE,
    TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
    FLOOR_LEVEL_MAP,
    BUILT_FORM_REMAP,
    COLUMNS_TO_MERGE_ON,
)
class DataProcessor:
    """
    Handle data loading and data preprocessing.

    The working DataFrame lives on ``self.data``. It is created by
    ``load_data`` and then reassigned or modified in place by the cleaning
    steps, so ``load_data`` (or ``pre_process``) must run before any of them.
    """

    def __init__(self, filepath: Path) -> None:
        """
        Remember where the CSV lives; nothing is read until ``load_data``.

        :param filepath: location of the CSV file to process.
        """
        self.filepath = filepath

    def load_data(self, low_memory: bool = False) -> None:
        """
        Read the CSV at ``self.filepath`` into ``self.data``.

        :param low_memory: forwarded unchanged to ``pandas.read_csv``.
        """
        self.data = pd.read_csv(self.filepath, low_memory=low_memory)

    def pre_process(self) -> pd.DataFrame:
        """
        Load data and begin initial cleaning.

        All options are read from ``DATA_PROCESSOR_SETTINGS``.

        :return: the cleaned data, sorted ascending by UPRN then LODGEMENT_DATE.
        """
        settings = DATA_PROCESSOR_SETTINGS

        self.load_data(low_memory=settings['low_memory'])
        self.confine_data()
        # TODO: Clean number of heated rooms and habitable rooms
        self.recast_df_columns(column_mappings=settings['column_mappings'])
        self.clean_multi_glaze_proportion()
        self.retain_multiple_epc_properties(epc_minimum_count=settings['epc_minimum_count'])
        self.remap_columns()
        if settings['epc_minimum_count'] >= 1:
            # retain_multiple_epc_properties keeps UPRNs with strictly more
            # records than the threshold, so a threshold of 1 or more means
            # every remaining UPRN has several EPCs to fill gaps from.
            self.fill_na_fields()
        self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
        return self.data

    def fill_na_fields(self, columns_to_fill: Optional[List] = None) -> None:
        """
        Fill gaps in the given columns using other records of the same UPRN.

        Within each UPRN the columns are back filled and then forward filled,
        following the current row order of ``self.data``. Values are never
        copied between different UPRNs, so a column that is empty for every
        record of a UPRN stays empty.

        :param columns_to_fill: columns to fill; defaults to ``COLUMNS_TO_MERGE_ON``.
        """
        if columns_to_fill is None:
            columns_to_fill = COLUMNS_TO_MERGE_ON
        columns_to_fill = list(columns_to_fill)
        if not columns_to_fill:
            return

        # The groupby fill methods return rows on the original index, so the
        # result can be assigned straight back without rebuilding the index.
        back_filled = self.data.groupby("UPRN")[columns_to_fill].bfill()
        filled = back_filled.groupby(self.data["UPRN"]).ffill()
        self.data[columns_to_fill] = filled[columns_to_fill]

    def remap_columns(self) -> None:
        """
        Replace anomaly markers and missing values with None, then remap
        FLOOR_LEVEL and BUILT_FORM through their lookup tables.
        """
        # Every anomaly marker maps to None, i.e. invalid values are removed
        anomaly_map = dict.fromkeys(BaseUtility.DATA_ANOMALY_MATCHES)
        data = self.data.replace(anomaly_map)
        # np.nan rather than np.NAN: the upper-case alias was removed in NumPy 2.0
        data = data.replace(np.nan, None)

        # Remap certain columns
        data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
        # Write back to BUILT_FORM itself. This used to be assigned to a
        # misspelled 'BUILT_FROM' column, which left BUILT_FORM un-remapped.
        data['BUILT_FORM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
        self.data = data

    def make_cleaning_averages(self) -> pd.DataFrame:
        """
        Build a table of median ``AVERAGE_FIXED_FEATURES`` per property group.

        Medians are computed per (PROPERTY_TYPE, BUILT_FORM,
        CONSTRUCTION_AGE_BAND, NUMBER_HABITABLE_ROOMS, NUMBER_HEATED_ROOMS).
        Missing TOTAL_FLOOR_AREA / FLOOR_HEIGHT medians are then filled from,
        in order: the (PROPERTY_TYPE, BUILT_FORM) median, the PROPERTY_TYPE
        median, the BUILT_FORM median, the mean of the remaining group medians,
        and finally the national average constants.

        :return: one row per group with the filled feature columns.
        """
        # Median of the fixed features for one group, excluding missing values
        def median_without_missing(group):
            return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

        def grouped_medians(keys, **groupby_kwargs):
            grouped = self.data.groupby(keys, observed=True, **groupby_kwargs)
            return grouped.apply(median_without_missing).reset_index()

        features = ('TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT')
        # (grouping keys, suffix given to the fallback columns), most specific first
        fallbacks = (
            (['PROPERTY_TYPE', 'BUILT_FORM'], '_AVERAGE'),
            (['PROPERTY_TYPE'], '_PROPERTY_AVERAGE'),
            (['BUILT_FORM'], '_BUILT_FORM_AVERAGE'),
        )

        cleaning_averages_filled = grouped_medians(
            ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
            dropna=False,
        )

        # Attach every fallback level as extra suffixed columns.
        # NOTE(review): these are inner joins and the fallback tables are built
        # with the default dropna, so a detailed group whose PROPERTY_TYPE or
        # BUILT_FORM is missing appears to be dropped here rather than filled -
        # confirm whether how='left' was intended.
        for keys, suffix in fallbacks:
            cleaning_averages_filled = pd.merge(
                cleaning_averages_filled, grouped_medians(keys), on=keys, suffixes=['', suffix]
            )

        # Fill from each level in turn, discarding its helper columns afterwards
        for _, suffix in fallbacks:
            for feature in features:
                cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                    cleaning_averages_filled[feature + suffix]
                )
            cleaning_averages_filled = cleaning_averages_filled.drop(
                columns=[feature + suffix for feature in features]
            )

        national_averages = {
            'TOTAL_FLOOR_AREA': TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
            'FLOOR_HEIGHT': FLOOR_HEIGHT_NATIONAL_AVERAGE,
        }
        for feature in features:
            # If there are still NA values, use the mean of the group medians in this table
            cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                cleaning_averages_filled[feature].mean()
            )
            # If the whole column is NA, fall back to the UK average value
            cleaning_averages_filled[feature] = cleaning_averages_filled[feature].fillna(
                national_averages[feature]
            )
        return cleaning_averages_filled

    def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
        """
        Reduce the data further by keeping only properties with multiple EPCs.

        A UPRN is kept when it has strictly more than ``epc_minimum_count``
        rows. The merge also adds a ``count`` column holding the number of rows
        per UPRN and resets the index.

        :param epc_minimum_count: a UPRN must have more rows than this to be kept.
        """
        counts = self.data.groupby("UPRN").size().reset_index()
        counts.columns = ["UPRN", "count"]
        # Take UPRNs with multiple EPCs
        counts = counts[counts["count"] > epc_minimum_count]
        self.data = pd.merge(self.data, counts, on='UPRN')

    def recast_df_columns(self, column_mappings: dict) -> None:
        """
        Recast columns from the dataframe to ensure the behaviour we want.

        :param column_mappings: maps a column name to an iterable of dtypes,
            applied to that column one after another in the given order.
        :raises SystemExit: with code 1 when a mapped column is not in the
            data; columns listed before it have already been recast.
        """
        for key, values in column_mappings.items():
            if key not in self.data.columns:
                print('Column mapping incorrectly specified')
                # Raise directly instead of calling exit(), which is only
                # injected by the site module and is not guaranteed to exist.
                raise SystemExit(1)
            for value in values:
                self.data[key] = self.data[key].astype(value)

    def confine_data(self) -> None:
        """
        Include all steps to reduce down the data based on assumptions.
        """
        data = self.data
        # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
        has_uprn = data["UPRN"].notna()
        # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
        # before the introduction of SAP09
        lodged_in_range = data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE
        # Filter 3: We remove EPCs that were conducted for a new build, since these are performed with
        # full SAP, which produces different results to the RdSAP methodology
        not_new_dwelling = data["TRANSACTION_TYPE"] != "new dwelling"
        # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
        unambiguous_floor = ~data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])

        # One combined mask replaces four chained filters; the explicit copy
        # stops later in-place steps from raising SettingWithCopyWarning.
        self.data = data[has_uprn & lodged_in_range & not_new_dwelling & unambiguous_floor].copy()

    def clean_multi_glaze_proportion(self) -> None:
        """
        If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100.
        """
        missing_proportion = self.data["MULTI_GLAZE_PROPORTION"].isna()
        fully_glazed = self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
        self.data.loc[missing_proportion & fully_glazed, 'MULTI_GLAZE_PROPORTION'] = 100