Added a DataProcessor class

This commit is contained in:
Michael Duong 2023-08-10 20:10:50 +00:00
parent bac9c2e6ae
commit 387a19d7cf

View file

@ -3,8 +3,8 @@ import os
import pandas as pd
from tqdm import tqdm
from model_data.BaseUtility import BaseUtility
# from BaseUtility import BaseUtility # I need this import as working in different folder
from pathlib import Path
from typing import Tuple
def list_subdirectories(directory_path):
    """Return a list of the entries directly inside ``directory_path`` that are directories."""
    subdirectories = []
    for child in directory_path.iterdir():
        if child.is_dir():
            subdirectories.append(child)
    return subdirectories
@ -91,56 +91,34 @@ RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
def make_cleaning_averages(df):
    """
    Build the two median lookup tables derived from ``df``.

    Both tables hold, per group, the median of every column listed in
    ``AVERAGE_FIXED_FEATURES``:

    * ``cleaning_averages`` is grouped by PROPERTY_TYPE, BUILT_FORM,
      CONSTRUCTION_AGE_BAND, NUMBER_HABITABLE_ROOMS and NUMBER_HEATED_ROOMS.
    * ``general_averages`` is grouped by PROPERTY_TYPE and BUILT_FORM only.

    Returns the tuple ``(cleaning_averages, general_averages)``; the grouping
    keys are restored as ordinary columns via ``reset_index``.
    """
    detailed_keys = [
        "PROPERTY_TYPE",
        "BUILT_FORM",
        "CONSTRUCTION_AGE_BAND",
        "NUMBER_HABITABLE_ROOMS",
        "NUMBER_HEATED_ROOMS",
    ]
    general_keys = ["PROPERTY_TYPE", "BUILT_FORM"]

    # Define a custom function to calculate the median, excluding missing values
    def median_without_missing(group):
        # Missing values are skipped column by column. Calling dropna() on the
        # frame first would drop a whole row as soon as one feature is missing,
        # throwing away the valid values that row holds for the other features.
        return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

    cleaning_averages = (
        df.groupby(detailed_keys, observed=True)
        .apply(median_without_missing)
        .reset_index()
    )
    general_averages = (
        df.groupby(general_keys, observed=True)
        .apply(median_without_missing)
        .reset_index()
    )

    return cleaning_averages, general_averages
def iterative_filtering(cleaning_averages, property_data):
# Select the rows of cleaning_averages that match property_data on the columns listed below
# and return them as filtered_data.
# NOTE(review): this body contains two strategies - a single pd.merge on all five columns
# and a column-by-column filter - plus a commented-out copy of the column-by-column filter.
# It reads like two revisions interleaved; confirm which one is intended before relying on it.
# Define the columns to filter on
columns_to_filter = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS"]
# Merge datasets together on columns
# NOTE(review): the merge uses the LAST row of property_data (iloc[[-1]]). As written, its
# result is overwritten by the copy() assignment below and is never used.
filtered_data = pd.merge(cleaning_averages, property_data.iloc[[-1]], on=columns_to_filter)
# Start with the entire cleaning_averages DataFrame
filtered_data = cleaning_averages.copy()
# # Start with the entire cleaning_averages DataFrame
# filtered_data = cleaning_averages.copy()
# Iterate through the columns and apply filters one by one
for column in columns_to_filter:
# Apply the filter using the value from property_data
# (the comparison value is taken from the FIRST row of property_data, iloc[0])
new_filtered_data = filtered_data[filtered_data[column] == property_data[column].iloc[0]]
# # Iterate through the columns and apply filters one by one
# for column in columns_to_filter:
# # Apply the filter using the value from property_data
# new_filtered_data = filtered_data[filtered_data[column] == property_data[column].iloc[0]]
# If the filter results in no data, return the previous result
# (`continue` does not return: it skips this column's filter, keeps the previous
# filtered_data and moves on to the next column)
if new_filtered_data.empty:
continue
# # If the filter results in no data, return the previous result
# if new_filtered_data.empty:
# continue
# If the filter is successful, update the filtered data
filtered_data = new_filtered_data
# # If the filter is successful, update the filtered data
# filtered_data = new_filtered_data
return filtered_data
def clean_multi_glaze_proportion(df: pd.DataFrame) -> pd.DataFrame:
    """
    If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100

    Rows are selected when MULTI_GLAZE_PROPORTION is null and WINDOWS_DESCRIPTION is one of
    ``FULLY_GLAZED_DESCRIPTIONS``. The value is written in place on ``df``, and the same
    frame is returned so the call can be used in an assignment.
    """
    no_multi_glaze_proportion_index = (
        df["MULTI_GLAZE_PROPORTION"].isnull()
        & df["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
    )

    # A plain assignment is required here. The chained form `df = df.loc[...] = 100`
    # assigns its targets left to right, so `df` became the integer 100 before the
    # `.loc` target was evaluated and the call failed with AttributeError.
    df.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100

    return df
def ordinal(n):
if 10 <= n % 100 <= 20:
suffix = 'th'
@ -167,57 +145,107 @@ BUILT_FORM_REMAP = {
"Enclosed Mid-Terrace": "Mid-Terrace",
}
# Settings read by DataProcessor.process() and forwarded to the individual steps.
DATA_PROCESSOR_SETTINGS = {
    # Forwarded to load_data(), which passes it on to pd.read_csv(low_memory=...).
    'low_memory': False,
    # Forwarded to retain_multiple_epc_properties(): a UPRN is kept only when its
    # number of rows is strictly greater than this value.
    'epc_minimum_count': 1,
    # Forwarded to recast_df_columns(): column name -> list of types applied one
    # after another with astype (here UPRN is cast to int and then to str).
    'column_mappings': {'UPRN': [int, str]}
}
def _confinement_mask(df: pd.DataFrame) -> pd.Series:
    """
    Build the boolean row mask used by both ``confine_data`` entry points.

    A row is selected only when it passes all four filters below. The conditions
    are combined into a single mask so the frame is indexed once: indexing
    repeatedly with masks built from the unfiltered frame forces pandas to
    re-align each mask to the already-filtered frame.
    """
    # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
    has_uprn = df["UPRN"].notna()

    # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
    # before the introduction of SAP09
    lodged_late_enough = df["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE

    # Filter 3: We remove EPCS that were conducted for a new build, since these are performed with
    # full SAP, which produces different results to the RdSAP methodology
    not_new_dwelling = df["TRANSACTION_TYPE"] != "new dwelling"

    # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
    unambiguous_floor = ~df["FLOOR_LEVEL"].isin(["top floor", "mid floor"])

    return has_uprn & lodged_late_enough & not_new_dwelling & unambiguous_floor


def _keep_multiple_epc_rows(df: pd.DataFrame, minimum_count: int) -> pd.DataFrame:
    """Return the rows of ``df`` whose UPRN occurs more than ``minimum_count`` times, plus a ``count`` column."""
    counts = df.groupby("UPRN").size().reset_index()
    counts.columns = ["UPRN", "count"]

    # take UPRNS with multiple EPCs
    counts = counts[counts["count"] > minimum_count]

    # The default inner merge both drops the UPRNs filtered out above and
    # attaches the per-UPRN count to every remaining row.
    return pd.merge(df, counts, on="UPRN")


def _recast_columns(df: pd.DataFrame, column_mappings: dict) -> pd.DataFrame:
    """
    Cast columns of ``df`` in place, applying each listed type in order, and return ``df``.

    Raises:
        KeyError: if any key of ``column_mappings`` is not a column of ``df``.
            All keys are checked before any cast so a bad mapping never leaves
            the frame half converted.
    """
    missing = [column for column in column_mappings if column not in df.columns]
    if missing:
        # Raise instead of print + exit(1): the caller decides how to handle the
        # error, and `exit` is a helper intended for the interactive interpreter.
        raise KeyError(f"Column mapping incorrectly specified, missing columns: {missing}")

    for column, dtypes in column_mappings.items():
        for dtype in dtypes:
            df[column] = df[column].astype(dtype)

    return df


def confine_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Include all steps to reduce down the data based on assumptions

    Returns a new frame holding only the rows that pass every filter described
    in ``_confinement_mask``.
    """
    return df[_confinement_mask(df)]


def retain_multiple_epc_properties(df: pd.DataFrame, minimum_count: int = 1) -> pd.DataFrame:
    '''
    Reduce the data further by keeping only properties with multiple EPCs

    A UPRN is kept when its number of rows is strictly greater than
    ``minimum_count``; the returned frame carries an extra ``count`` column.
    '''
    return _keep_multiple_epc_rows(df, minimum_count)


def recast_df_columns(df: pd.DataFrame, column_mappings: dict) -> pd.DataFrame:
    """
    Recast columns from the dataframe to ensure the behaviour we want

    ``column_mappings`` maps a column name to a list of types that are applied
    one after another with ``astype``. Raises ``KeyError`` when a mapped column
    is not present in ``df``.
    """
    return _recast_columns(df, column_mappings)


class DataProcessor:
    """
    Handle data loading and data preprocessing
    """

    def __init__(self, filepath: Path) -> None:
        """Remember the CSV location; nothing is read until ``load_data`` or ``process`` is called."""
        self.filepath = filepath
        # Populated by load_data(); defined here so the attribute always exists.
        self.data = None

    def load_data(self, low_memory=False) -> None:
        """Read ``self.filepath`` with ``pd.read_csv`` and store the frame on ``self.data``."""
        self.data = pd.read_csv(self.filepath, low_memory=low_memory)

    def process(self) -> pd.DataFrame:
        """
        Load the data and run every preprocessing step in order

        The steps take their arguments from ``DATA_PROCESSOR_SETTINGS``. The
        processed frame, sorted by UPRN and LODGEMENT_DATE, is stored on
        ``self.data`` and returned.
        """
        self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
        # Confinement runs before the recast, so rows without a UPRN are already
        # gone when the UPRN column is cast.
        self.confine_data()
        self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
        self.clean_multi_glaze_proportion()
        self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
        self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)

        return self.data

    def make_cleaning_averages(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Build the two median lookup tables derived from ``self.data``.

        Returns ``(cleaning_averages, general_averages)``: the per-group medians
        of the ``AVERAGE_FIXED_FEATURES`` columns, grouped by the five property
        attributes and by PROPERTY_TYPE / BUILT_FORM respectively.
        """
        # Define a custom function to calculate the median, excluding missing values
        def median_without_missing(group):
            return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

        cleaning_averages = self.data.groupby(
            ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
            observed=True
        ).apply(median_without_missing).reset_index()

        general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply(
            median_without_missing).reset_index()

        return cleaning_averages, general_averages

    def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
        '''
        Reduce the data further by keeping only properties with multiple EPCs

        A UPRN is kept when its number of rows is strictly greater than
        ``epc_minimum_count``; ``self.data`` gains a ``count`` column.
        '''
        self.data = _keep_multiple_epc_rows(self.data, epc_minimum_count)

    def recast_df_columns(self, column_mappings: dict) -> None:
        """
        Recast columns from the dataframe to ensure the behaviour we want

        Raises ``KeyError`` when a mapped column is not present in ``self.data``.
        """
        self.data = _recast_columns(self.data, column_mappings)

    def confine_data(self) -> None:
        """
        Include all steps to reduce down the data based on assumptions

        ``self.data`` is replaced by the rows that pass every filter described
        in ``_confinement_mask``.
        """
        self.data = self.data[_confinement_mask(self.data)]

    def clean_multi_glaze_proportion(self) -> None:
        """
        If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
        """
        no_multi_glaze_proportion_index = (
            pd.isnull(self.data["MULTI_GLAZE_PROPORTION"])
            & self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
        )
        self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100
@ -233,22 +261,17 @@ def app():
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
df = confine_data(df)
df = recast_df_columns(df, {'UPRN': [int, str]})
data_processor = DataProcessor(filepath=filepath)
df = clean_multi_glaze_proportion(df)
df = retain_multiple_epc_properties(df, minimum_count=1)
df = df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
cleaning_averages, general_averages = make_cleaning_averages(df)
df = data_processor.process()
cleaning_averages, general_averages = data_processor.make_cleaning_averages()
for uprn, property_data in df.groupby("UPRN", observed=True):
# Fixed features - these are property attributes that shouldn't change over time
ignore_epc = False
fixed_data = {}
for field in FIXED_FEATURES: