mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
986 lines
36 KiB
Python
986 lines
36 KiB
Python
from pathlib import Path
|
|
import numpy as np
|
|
import pandas as pd
|
|
from BaseUtility import Definitions
|
|
from etl.epc.settings import (
|
|
DATA_PROCESSOR_SETTINGS,
|
|
EARLIEST_EPC_DATE,
|
|
# IGNORED_TRANSACTION_TYPES,
|
|
IGNORED_FLOOR_LEVELS,
|
|
IGNORED_PROPERTY_TYPES,
|
|
IGNORED_TENURES,
|
|
FULLY_GLAZED_DESCRIPTIONS,
|
|
AVERAGE_FIXED_FEATURES,
|
|
BUILT_FORM_REMAP,
|
|
COLUMNS_TO_MERGE_ON,
|
|
FIXED_FEATURES,
|
|
COLUMNTYPES,
|
|
RDSAP_RESPONSE,
|
|
MAX_SAP_SCORE,
|
|
fill_na_map,
|
|
STARTING_SUFFIX_COMPONENT_COLS,
|
|
NO_SUFFIX_COMPONENT_COLS,
|
|
ENDING_SUFFIX_COMPONENT_COLS,
|
|
POTENTIAL_COLUMNS,
|
|
EFFICIENCY_FEATURES,
|
|
)
|
|
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
|
|
|
|
from typing import List
|
|
|
|
# TODO: change the setting columns to lower
# The column lists imported from the settings module are re-bound to lower-case
# copies; get_component_features uses them to index self.data, whose column
# names are lower-cased at the end of prepare_data.
STARTING_SUFFIX_COMPONENT_COLS = list(map(str.lower, STARTING_SUFFIX_COMPONENT_COLS))
NO_SUFFIX_COMPONENT_COLS = list(map(str.lower, NO_SUFFIX_COMPONENT_COLS))
ENDING_SUFFIX_COMPONENT_COLS = list(map(str.lower, ENDING_SUFFIX_COMPONENT_COLS))
POTENTIAL_COLUMNS = list(map(str.lower, POTENTIAL_COLUMNS))
|
|
|
|
# Lookups used to clean the construction age band.
# Each standard band label maps to its inclusive lower ("l") and upper ("u") year.
construction_age_bounds_map = {
    "England and Wales: before 1900": {"l": 0, "u": 1899},
    "England and Wales: 1930-1949": {"l": 1930, "u": 1949},
    "England and Wales: 1900-1929": {"l": 1900, "u": 1929},
    "England and Wales: 1950-1966": {"l": 1950, "u": 1966},
    "England and Wales: 1967-1975": {"l": 1967, "u": 1975},
    "England and Wales: 1976-1982": {"l": 1976, "u": 1982},
    "England and Wales: 1983-1990": {"l": 1983, "u": 1990},
    "England and Wales: 1991-1995": {"l": 1991, "u": 1995},
    "England and Wales: 1996-2002": {"l": 1996, "u": 2002},
    "England and Wales: 2003-2006": {"l": 2003, "u": 2006},
    "England and Wales: 2007-2011": {"l": 2007, "u": 2011},
    "England and Wales: 2012 onwards": {"l": 2012, "u": 3000},
}

# Non-standard labels that are rewritten to one of the standard bands above.
construction_age_remap = {
    "England and Wales: 2007 onwards": "England and Wales: 2007-2011"
}

# Year -> band label for every year from 0 to 3000 inclusive: the first band
# whose inclusive bounds contain the year.
expanded_map = {
    year: next(
        band
        for band, limits in construction_age_bounds_map.items()
        if limits["l"] <= year <= limits["u"]
    )
    for year in range(3001)
}
|
|
|
|
|
|
def is_int(x):
    """
    Report whether ``int(x)`` succeeds.

    :param x: Any value
    :return: True if ``int(x)`` can be evaluated, False if it raises
        TypeError, ValueError or OverflowError
    """
    try:
        int(x)
    except (TypeError, ValueError, OverflowError):
        # Only the conversion failures are treated as "not an int"; a bare
        # except would also swallow KeyboardInterrupt / SystemExit.
        return False
    return True
|
|
|
|
|
|
class EPCDataProcessor:
|
|
"""
|
|
Handle data loading and data preprocessing
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
data: pd.DataFrame | None = None,
|
|
cleaning_averages: pd.DataFrame | None = None,
|
|
run_mode: str = "training",
|
|
violation_mode: bool = False,
|
|
) -> None:
|
|
"""
|
|
:param filepath: If specified, is the physical location of the data
|
|
:param is_newdata: Indicates if we are processing new, testing data.
|
|
In this instance, there are some operations we do not
|
|
want to perform, such as confine_data()
|
|
"""
|
|
is_data_a_dataframe = isinstance(data, pd.DataFrame)
|
|
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
|
|
|
|
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
|
|
self.cleaning_averages: pd.DataFrame = (
|
|
cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
|
|
)
|
|
|
|
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
|
|
self.violation_mode = violation_mode
|
|
if run_mode not in ["training", "newdata"]:
|
|
raise ValueError("Run mode must be either training or newdata")
|
|
self.run_mode = run_mode if not violation_mode else "newdata"
|
|
|
|
def prepare_data(self, filepath: Path | str | None = None) -> None:
|
|
"""
|
|
Given the run mode, we apply the relevant pipeline steps
|
|
Ignore step is used to highlight which steps are not needed in newdata
|
|
"""
|
|
|
|
ignore_step = True if self.run_mode == "newdata" else False
|
|
|
|
if filepath is not None:
|
|
self.load_data(
|
|
filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
|
|
)
|
|
|
|
if len(self.data) == 0:
|
|
raise Exception("No data to process - check filepath/ data being passed in")
|
|
|
|
self.confine_data(ignore_step=ignore_step)
|
|
self.remap_anomalies()
|
|
self.remap_floor_level(ignore_step=ignore_step)
|
|
self.remap_build_form()
|
|
self.cast_data_column_values_to_lower()
|
|
self.standardise_construction_age_band(ignore_step=ignore_step)
|
|
self.clean_missing_rooms(ignore_step=ignore_step)
|
|
self.recast_df_columns(
|
|
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
|
|
)
|
|
self.clean_multi_glaze_proportion(ignore_step=ignore_step)
|
|
self.clean_photo_supply()
|
|
self.retain_multiple_epc_properties(
|
|
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"],
|
|
ignore_step=ignore_step,
|
|
)
|
|
|
|
self.fill_na_fields()
|
|
|
|
self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step)
|
|
|
|
# Final re-casting after data transformed and prepared
|
|
self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
|
|
self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
|
|
self.na_remapping(auto_subset_columns=True)
|
|
|
|
self.fill_invalid_constituency_fields(ignore_step=ignore_step)
|
|
|
|
self.make_cleaning_averages(ignore_step=ignore_step)
|
|
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
|
|
|
|
# TODO: check if this has impact on training dataset
|
|
# cleaned_data = self.apply_averages_cleaning(
|
|
# data_to_clean=self.data,
|
|
# cleaning_data=self.cleaning_averages,
|
|
# cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
|
|
# colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
|
|
# )
|
|
|
|
# When running in newdata mode, cleaning_averages has lower cases so we co-erce back to upper
|
|
cleaning_averages = self.cleaning_averages.copy()
|
|
if self.run_mode == "newdata":
|
|
cleaning_averages.columns = cleaning_averages.columns.str.upper()
|
|
|
|
cleaned_data = self.apply_averages_cleaning(
|
|
data_to_clean=self.data,
|
|
cleaning_data=cleaning_averages,
|
|
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
|
|
)
|
|
|
|
self.data = self.data if cleaned_data is None else cleaned_data
|
|
|
|
self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
|
|
self.cast_data_columns_to_lower()
|
|
|
|
def cast_data_columns_to_lower(self):
    """
    Lower-case every column name of ``self.data`` in place.
    """
    lowered_names = self.data.columns.str.lower()
    self.data.columns = lowered_names
|
|
|
|
def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
    """
    Lower-case every column name of ``self.cleaning_averages`` in place.

    :param ignore_step: When True the method does nothing (not needed in
        newdata mode)
    """
    if not ignore_step:
        lowered_names = self.cleaning_averages.columns.str.lower()
        self.cleaning_averages.columns = lowered_names
|
|
|
|
def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
    """
    Add a LOCAL_AUTHORITY column to the cleaning averages.

    Every row receives the LOCAL_AUTHORITY value of the first row of
    ``self.data``.

    :param ignore_step: When True the method does nothing (not needed in
        newdata mode)
    """
    if not ignore_step:
        first_local_authority = self.data["LOCAL_AUTHORITY"].values[0]
        self.cleaning_averages["LOCAL_AUTHORITY"] = first_local_authority
|
|
|
|
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
    """
    Fill missing CONSTITUENCY values with the most frequent constituency.

    Does nothing in violation mode, when ``ignore_step`` is True, or when no
    row has a constituency at all (there is then no value to fill with).

    :param ignore_step: When True the method does nothing
    """
    if self.violation_mode:
        # TODO: to fill in
        return

    if ignore_step:
        return

    most_common = self.data["CONSTITUENCY"].mode()
    if most_common.empty:
        # mode() is empty when every value is missing; indexing [0] on it
        # would raise IndexError, so leave the column untouched
        return

    self.data = self.data.fillna({"CONSTITUENCY": most_common.iloc[0]})
|
|
|
|
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
    """
    Sort ``self.data`` ascending by UPRN, then by LODGEMENT_DATE.

    No violation-mode handling is needed here.

    :param ignore_step: When True the method does nothing
    """
    if not ignore_step:
        sort_keys = ["UPRN", "LODGEMENT_DATE"]
        self.data = self.data.sort_values(by=sort_keys)
|
|
|
|
def cast_data_column_values_to_lower(self):
    """
    Lower-case the string values of selected columns of ``self.data``.

    Currently only TRANSACTION_TYPE is converted. No violation-mode or
    newdata-mode handling is required.
    """
    for column in ("TRANSACTION_TYPE",):
        lowered_values = self.data[column].str.lower()
        self.data[column] = lowered_values
|
|
|
|
def remap_build_form(self):
    """
    Replace BUILT_FORM values according to ``BUILT_FORM_REMAP``.

    No violation-mode or newdata-mode handling is required.
    """
    remapped_built_form = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
    self.data["BUILT_FORM"] = remapped_built_form
|
|
|
|
def remap_anomalies(self):
    """
    Replace anomaly markers and NaN in ``self.data`` with None.

    Every value listed in ``Definitions.DATA_ANOMALY_MATCHES`` is mapped to
    None, then NaN is mapped to None as well.
    No violation-mode or newdata-mode handling is required.
    """
    # Map all anomaly values to None
    data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

    # replace() swaps any cell matching a key for its value - i.e. removes invalid values
    data = self.data.replace(data_anomaly_map)
    # np.nan is the canonical spelling; the np.NAN alias does not exist in NumPy 2.x
    data = data.replace(np.nan, None)

    self.data = data
|
|
|
|
def remap_floor_level(self, ignore_step: bool = False):
    """
    Replace FLOOR_LEVEL values according to ``FLOOR_LEVEL_MAP``.

    :param ignore_step: When True the method does nothing
    """
    # TODO: We need to handle the violation mode case
    if self.violation_mode or ignore_step:
        return

    remapped_floor_level = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
    self.data["FLOOR_LEVEL"] = remapped_floor_level
|
|
|
|
def load_data(self, filepath, low_memory=False) -> None:
    """
    Read a CSV into ``self.data``.

    :param filepath: Location handed to ``pd.read_csv``; must be truthy
    :param low_memory: Forwarded to ``pd.read_csv``
    :raises ValueError: If filepath is falsy
    """
    if filepath:
        self.data = pd.read_csv(filepath, low_memory=low_memory)
        return
    raise ValueError("No filepath specified")
|
|
|
|
def insert_data(self, data: pd.DataFrame) -> None:
    """
    Replace ``self.data`` with the given frame (no copy is made).

    :param data: The frame to process
    """
    self.data = data
|
|
|
|
@staticmethod
def clean_construction_age_band(x):
    """
    Map one raw construction age band value to a standard band label.

    :param x: Raw value from the CONSTRUCTION_AGE_BAND column
    :return: ``x`` unchanged when it is missing, an anomaly marker or already a
        standard label; the remapped label for known non-standard labels;
        the band containing the year when ``x`` can be converted with ``int``
    :raises NotImplementedError: If the value matches none of the cases above
    :raises KeyError: If ``x`` converts to an integer outside 0-3000
    """
    # Missing values first. pd.isna recognises every NaN float (not only the
    # np.nan singleton) as well as None / pd.NA / NaT.
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return x

    # Next, we check if it's an error value
    if x in Definitions.DATA_ANOMALY_MATCHES:
        return x

    # Already one of the standard labels
    if construction_age_bounds_map.get(x):
        return x

    # A known non-standard label with a fixed replacement
    remap_value = construction_age_remap.get(x, None)
    if remap_value:
        return remap_value

    # A plain year: look up the band that contains it
    if is_int(x):
        return expanded_map[int(x)]

    raise NotImplementedError("Not handled the case for value %s" % x)
|
|
|
|
def standardise_construction_age_band(self, ignore_step: bool = False):
    """
    Normalise CONSTRUCTION_AGE_BAND and drop rows where it is missing.

    Each value is passed through ``clean_construction_age_band``; rows whose
    band is null afterwards are removed from ``self.data``.

    :param ignore_step: When True the method does nothing
    """
    # TODO: violation mode handling still to fill in
    if self.violation_mode or ignore_step:
        return

    cleaned_bands = self.data["CONSTRUCTION_AGE_BAND"].apply(
        self.clean_construction_age_band
    )
    self.data["CONSTRUCTION_AGE_BAND"] = cleaned_bands

    has_band = ~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
    self.data = self.data[has_band]
|
|
|
|
def clean_missing_rooms(self, ignore_step: bool = False):
    """
    Fill missing NUMBER_HEATED_ROOMS / NUMBER_HABITABLE_ROOMS with group medians.

    A POSTAL_AREA column (the part of POSTCODE before the first space) is
    added. Each room column is then filled from the median of rows sharing
    PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND and POSTAL_AREA; while
    values are still missing, the last grouping column is dropped and the
    fill is repeated, down to PROPERTY_TYPE alone.

    TODO: We could use a model based imputation approach for possibly more accurate cleaning

    :param ignore_step: When True the method does nothing
    :raises NotImplementedError: If values are still missing after grouping
        on PROPERTY_TYPE alone
    """
    if self.violation_mode:
        # TODO: to fill in
        return

    if ignore_step:
        return

    # TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
    # Non-string postcodes (None / NaN) are passed through instead of crashing
    # on .split; those rows are simply filled at a coarser grouping level.
    self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(
        lambda postcode: postcode.split(" ")[0]
        if isinstance(postcode, str)
        else postcode
    )

    def fill_from_group_median(data, column, group_columns):
        # Median of the known values per group, merged back onto every row
        medians = (
            data[~pd.isnull(data[column])]
            .groupby(group_columns)[column]
            .median()
            .reset_index()
        )
        merged = data.merge(
            medians,
            how="left",
            on=group_columns,
            suffixes=("", "_CLEANING"),
        )
        merged[column] = np.where(
            pd.isnull(merged[column]), merged[f"{column}_CLEANING"], merged[column]
        )
        return merged.drop(columns=f"{column}_CLEANING")

    matching_columns = [
        "PROPERTY_TYPE",
        "BUILT_FORM",
        "CONSTRUCTION_AGE_BAND",
        "POSTAL_AREA",
    ]

    for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
        # Start with all grouping columns and drop the last one each round
        for key_count in range(len(matching_columns), 0, -1):
            if not pd.isnull(self.data[col]).any():
                break
            self.data = fill_from_group_median(
                data=self.data,
                column=col,
                group_columns=matching_columns[:key_count],
            )

        # Still missing after the coarsest grouping - something has gone
        # wrong or we have a very unique property type
        if pd.isnull(self.data[col]).any():
            raise NotImplementedError(
                "Handle this edge case, we still have missings for column %s"
                % col
            )
|
|
|
|
# def pre_process(self, filepath: Path | None = None) -> tuple[pd.DataFrame, pd.DataFrame]:
|
|
# """
|
|
# Load data and begin initial cleaning
|
|
# """
|
|
# if self.data is None:
|
|
# self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
|
|
|
|
# if not self.is_newdata:
|
|
# self.confine_data()
|
|
|
|
# self.remap_columns()
|
|
|
|
# # We have some non-standard construction age bands which we'll clean for matching
|
|
# if not self.is_newdata:
|
|
# self.standardise_construction_age_band()
|
|
# self.clean_missing_rooms()
|
|
|
|
# self.recast_df_columns(
|
|
# column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
|
|
# )
|
|
|
|
# if not self.is_newdata:
|
|
# self.clean_multi_glaze_proportion()
|
|
|
|
# self.clean_photo_supply()
|
|
|
|
# if not self.is_newdata:
|
|
# self.retain_multiple_epc_properties(
|
|
# epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
|
|
# )
|
|
|
|
# if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
|
|
# # If we have multiple EPC records, we can try and do filling
|
|
# self.fill_na_fields()
|
|
|
|
# if not self.is_newdata:
|
|
# self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
|
|
|
|
# # Final re-casting after data transformed and prepared
|
|
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
|
|
# COLUMNTYPES
|
|
# for k, v in coltypes.items():
|
|
# self.data[k] = self.data[k].astype(v)
|
|
# self.data = self.data.astype(coltypes)
|
|
|
|
# self.na_remapping()
|
|
|
|
# self.cleaning_averages = None
|
|
# if not self.is_newdata:
|
|
# # We have some odd cases with missing constituency so we fill
|
|
# self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
|
|
|
|
# self.cleaning_averages = self.make_cleaning_averages()
|
|
# # We apply averages cleaning to the data
|
|
# self.data = self.apply_averages_cleaning(
|
|
# data_to_clean=self.data,
|
|
# cleaning_data=self.cleaning_averages,
|
|
# cols_to_merge_on=COLUMNS_TO_MERGE_ON
|
|
# )
|
|
|
|
# self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
|
|
# self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
|
|
|
|
# self.data.columns = self.data.columns.str.lower()
|
|
|
|
# return self.data, self.cleaning_averages
|
|
|
|
def na_remapping(self, auto_subset_columns: bool = False):
    """
    Fill missing values column by column using ``fill_na_map``.

    :param auto_subset_columns: When True, entries of ``fill_na_map`` whose
        column is absent from ``self.data`` are skipped; when False every
        entry is applied
    """
    replacements = fill_na_map
    if auto_subset_columns:
        replacements = {
            column: fill_value
            for column, fill_value in fill_na_map.items()
            if column in self.data.columns
        }

    for column, fill_value in replacements.items():
        filled_column = self.data[column].fillna(fill_value)
        self.data[column] = filled_column
|
|
|
|
def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
    """
    Fill gaps in selected columns from other records of the same UPRN.

    Within each UPRN group the columns are back-filled and then forward-filled.
    Afterwards empty strings in FLOOR_HEIGHT and TOTAL_FLOOR_AREA are replaced
    with None.

    :param columns_to_fill: Columns to fill within each UPRN group
    """
    # Each UPRN can fill backward from later rows and forward from earlier rows.
    # The groupby changes the order, so the original index (exposed as
    # "level_1" by reset_index) is restored to line the result up with self.data.
    # NOTE(review): "level_1" presumes the index of self.data is unnamed - confirm.
    filled_data = (
        self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
        # bfill()/ffill() replace the deprecated fillna(method=...) spelling
        .apply(lambda group: group.bfill().ffill())
        .reset_index()
        .set_index("level_1")
        .sort_index()
    )

    self.data[columns_to_fill] = filled_data[columns_to_fill]

    # For floor area, we also replace "" values with None
    self.data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = self.data[
        ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
    ].replace("", None)
|
|
|
|
def make_cleaning_averages(self, ignore_step: bool = False) -> pd.DataFrame:
    """
    Build the table of median values used to clean ``AVERAGE_FIXED_FEATURES``.

    Medians are computed per (PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND,
    NUMBER_HABITABLE_ROOMS, NUMBER_HEATED_ROOMS). Gaps in that table are filled,
    in order, from the (PROPERTY_TYPE, BUILT_FORM) medians, the PROPERTY_TYPE
    medians, the BUILT_FORM medians and finally the mean of the column itself.
    The result is stored on ``self.cleaning_averages``.

    :param ignore_step: When True nothing is computed or stored (not required
        in newdata mode)
    :return: The cleaning averages table, or an empty DataFrame when the step
        is ignored
    """
    if ignore_step:
        return pd.DataFrame()

    # Median of each feature within a group, excluding missing values
    def median_without_missing(group):
        return group[AVERAGE_FIXED_FEATURES].median(skipna=True)

    cleaning_averages = (
        self.data.groupby(
            [
                "PROPERTY_TYPE",
                "BUILT_FORM",
                "CONSTRUCTION_AGE_BAND",
                "NUMBER_HABITABLE_ROOMS",
                "NUMBER_HEATED_ROOMS",
            ],
            observed=True,
            dropna=False,
        )
        .apply(median_without_missing)
        .reset_index()
    )

    general_averages = (
        self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True)
        .apply(median_without_missing)
        .reset_index()
    )

    property_averages = (
        self.data.groupby(["PROPERTY_TYPE"], observed=True)
        .apply(median_without_missing)
        .reset_index()
    )

    built_form_averages = (
        self.data.groupby(["BUILT_FORM"], observed=True)
        .apply(median_without_missing)
        .reset_index()
    )

    # Attach the coarser averages as suffixed columns so they can be used as fallbacks
    cleaning_averages_filled = pd.merge(
        cleaning_averages,
        general_averages,
        on=["PROPERTY_TYPE", "BUILT_FORM"],
        suffixes=["", "_AVERAGE"],
    )
    cleaning_averages_filled = pd.merge(
        cleaning_averages_filled,
        property_averages,
        on=["PROPERTY_TYPE"],
        suffixes=["", "_PROPERTY_AVERAGE"],
    )
    cleaning_averages_filled = pd.merge(
        cleaning_averages_filled,
        built_form_averages,
        on=["BUILT_FORM"],
        suffixes=["", "_BUILT_FORM_AVERAGE"],
    )

    # Fallback order: property type + built form, then property type only,
    # then built form only
    fallback_suffixes = ("_AVERAGE", "_PROPERTY_AVERAGE", "_BUILT_FORM_AVERAGE")

    for variable in AVERAGE_FIXED_FEATURES:
        for suffix in fallback_suffixes:
            fallback_column = f"{variable}{suffix}"
            # Written back to the feature column itself; the built-form
            # fallback previously landed in a stray "variable" column.
            cleaning_averages_filled[variable] = cleaning_averages_filled[
                variable
            ].fillna(cleaning_averages_filled[fallback_column])
            cleaning_averages_filled = cleaning_averages_filled.drop(
                columns=fallback_column
            )

        # If there still are NA values, use the mean of the column across the table
        cleaning_averages_filled[variable] = cleaning_averages_filled[
            variable
        ].fillna(cleaning_averages_filled[variable].mean())

    # If the constituency is all NA values, then take UK AVERAGE VALUES
    # cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
    #     "TOTAL_FLOOR_AREA"
    # ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
    # cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
    #     "FLOOR_HEIGHT"
    # ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)

    self.cleaning_averages = cleaning_averages_filled
    return cleaning_averages_filled
|
|
|
|
def retain_multiple_epc_properties(
    self, epc_minimum_count: int = 1, ignore_step: bool = False
) -> None:
    """
    Keep only rows whose UPRN occurs more than ``epc_minimum_count`` times.

    The per-UPRN row count is merged onto the data as a ``count`` column.

    :param epc_minimum_count: A UPRN is kept when its row count is strictly
        greater than this value
    :param ignore_step: When True the method does nothing
    """
    # TODO: violation mode handling still to fill in
    if self.violation_mode or ignore_step:
        return

    uprn_counts = self.data.groupby("UPRN").size().reset_index(name="count")

    # take UPRNS with multiple EPCs
    frequent_uprns = uprn_counts[uprn_counts["count"] > epc_minimum_count]
    self.data = pd.merge(self.data, frequent_uprns, on="UPRN")
|
|
|
|
def recast_df_columns(
    self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
    """
    Cast columns of ``self.data`` one at a time.

    :param column_mappings: Column name -> dtype, or column name -> list of
        dtypes that are applied one after another
    :param auto_subset_columns: When True, mappings for columns absent from
        ``self.data`` are skipped
    :raises ValueError: If a mapped column is absent and auto_subset_columns
        is False
    """
    if auto_subset_columns:
        column_mappings = {
            column: dtypes
            for column, dtypes in column_mappings.items()
            if column in self.data.columns
        }

    for column, dtypes in column_mappings.items():
        if column not in self.data.columns:
            raise ValueError("Column mapping incorrectly specified")

        # A single dtype is treated as a one-step chain
        cast_chain = dtypes if isinstance(dtypes, list) else [dtypes]
        for dtype in cast_chain:
            self.data[column] = self.data[column].astype(dtype)
|
|
|
|
def recast_all_data(
    self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
    """
    Cast all mapped columns of ``self.data`` in a single ``astype`` call.

    :param column_mappings: Column name -> dtype
    :param auto_subset_columns: When True, mappings for columns absent from
        ``self.data`` are skipped
    """
    dtypes_to_apply = column_mappings
    if auto_subset_columns:
        present_columns = self.data.columns
        dtypes_to_apply = {
            column: dtype
            for column, dtype in column_mappings.items()
            if column in present_columns
        }

    self.data = self.data.astype(dtypes_to_apply)
|
|
|
|
def confine_data(self, ignore_step: bool = False):
    """
    Reduce the data to the records the model is built on.

    In violation mode a boolean ``violation_*`` column is first appended for
    every rule, flagging the rows that break it. Unless ``ignore_step`` is
    True, the rows breaking any rule are then removed and FLOOR_HEIGHT values
    of zero are set to None.

    :param ignore_step: When True no rows are removed
    """
    if self.violation_mode:
        violation_uprn_missing = pd.isnull(self.data["UPRN"])
        violation_old_lodgment_date = (
            self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
        )
        # The transaction type rule is currently disabled (see Filter 3 below),
        # so no row can violate it. The column is still emitted, all False, to
        # keep the set of violation columns stable; previously the name was
        # used without being defined, which raised NameError.
        # violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
        violation_invalid_transaction_type = pd.Series(
            False, index=self.data.index
        )
        violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(
            IGNORED_FLOOR_LEVELS
        )
        violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
        violation_missing_windows_description = pd.isnull(
            self.data["WINDOWS_DESCRIPTION"]
        )
        violation_missing_hotwater_description = pd.isnull(
            self.data["HOTWATER_DESCRIPTION"]
        )
        violation_missing_roof_description = pd.isnull(
            self.data["ROOF_DESCRIPTION"]
        )
        # NOTE(review): this equality presumes IGNORED_PROPERTY_TYPES is a
        # single value rather than a list - confirm against the settings.
        violation_invalid_property_type = (
            self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
        )
        violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES)

        violation_df = pd.concat(
            [
                violation_uprn_missing,
                violation_old_lodgment_date,
                violation_invalid_transaction_type,
                violation_ignored_floor_level,
                violation_rdsap_score_above_max,
                violation_missing_windows_description,
                violation_missing_hotwater_description,
                violation_missing_roof_description,
                violation_invalid_property_type,
                violation_invalid_tenure,
            ],
            axis=1,
            keys=[
                "violation_uprn_missing",
                "violation_old_lodgment_date",
                "violation_invalid_transaction_type",
                "violation_ignored_floor_level",
                "violation_rdsap_score_above_max",
                "violation_missing_windows_description",
                "violation_missing_hotwater_description",
                "violation_missing_roof_description",
                "violation_invalid_property_type",
                "violation_invalid_tenure",
            ],
        )

        self.data = pd.concat([self.data, violation_df], axis=1)

    if ignore_step:
        return

    # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one

    # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
    # before the introduction of SAP09

    # Filter 3: We remove EPCS that were conducted for a new build, since these are performed with
    # full SAP, which produces different results to the RdSAP methodology

    # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous

    # Filter 5: Remove any EPCs with a SAP score above 100

    # Filter 6: We found a small number of cases that have missing window description so we drop these

    # Filter 7: We found a small number of cases that have missing hotwater description so we drop these

    self.data = self.data[~pd.isnull(self.data["UPRN"])]
    self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
    # self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
    self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)]
    self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]

    # We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them
    self.data = self.data[~pd.isnull(self.data["WINDOWS_DESCRIPTION"])]
    self.data = self.data[~pd.isnull(self.data["HOTWATER_DESCRIPTION"])]
    self.data = self.data[~pd.isnull(self.data["ROOF_DESCRIPTION"])]

    # Because park homes are surveyed unusually (for example, we don't have u-values to
    # look up for their different components, they need to be collected in survey and aren't reflected in
    # EPCs) we'll ignore them from the model
    self.data = self.data[self.data["PROPERTY_TYPE"] != IGNORED_PROPERTY_TYPES]

    # We remove EPCs where the tenure is unknown, but is usually an indicator of a new build
    self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)]

    # We remap zero values to None
    self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None
|
|
|
|
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
    """
    Set MULTI_GLAZE_PROPORTION to 100 where it is missing and the windows
    description is one of ``FULLY_GLAZED_DESCRIPTIONS``.

    :param ignore_step: When True the method does nothing
    """
    # TODO: violation mode handling
    if self.violation_mode or ignore_step:
        return

    proportion_missing = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"])
    windows_fully_glazed = self.data["WINDOWS_DESCRIPTION"].isin(
        FULLY_GLAZED_DESCRIPTIONS
    )

    rows_to_fix = proportion_missing & windows_fully_glazed
    self.data.loc[rows_to_fix, "MULTI_GLAZE_PROPORTION"] = 100
|
|
|
|
def clean_photo_supply(self) -> None:
    """
    Replace missing PHOTO_SUPPLY values with zero.
    """
    photo_supply_filled = self.data["PHOTO_SUPPLY"].fillna(0)
    self.data["PHOTO_SUPPLY"] = photo_supply_filled
|
|
|
|
@staticmethod
|
|
def apply_averages_cleaning(
|
|
data_to_clean,
|
|
cleaning_data,
|
|
cols_to_merge_on,
|
|
colnames=None,
|
|
ignore_step: bool = False,
|
|
):
|
|
"""
|
|
Clean the input DataFrame using averages from a cleaning DataFrame.
|
|
|
|
:param data_to_clean: DataFrame to be cleaned.
|
|
:param cleaning_data: DataFrame containing data for cleaning.
|
|
:param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this
|
|
differs depending on where the function is being used.
|
|
:param colnames: If specified can be used to state exactly which columns to clean
|
|
:return: Cleaned DataFrame.
|
|
"""
|
|
|
|
if ignore_step:
|
|
return None
|
|
|
|
# The desired colnames to clean - which may not be present
|
|
if colnames is None:
|
|
colnames = [
|
|
"TOTAL_FLOOR_AREA",
|
|
"FLOOR_HEIGHT",
|
|
"FIXED_LIGHTING_OUTLETS_COUNT",
|
|
]
|
|
|
|
cols_to_clean = [c for c in colnames if c in data_to_clean.columns]
|
|
|
|
# Enforce data types
|
|
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
|
|
data_to_clean[col] = data_to_clean[col].astype(float)
|
|
|
|
# Identify columns with non-NaN values
|
|
columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist()
|
|
|
|
# Calculate averages
|
|
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
|
|
dict(
|
|
zip(
|
|
cols_to_clean,
|
|
[
|
|
"mean",
|
|
]
|
|
* len(cols_to_clean),
|
|
)
|
|
)
|
|
)
|
|
|
|
# Merge with the original data
|
|
data_to_clean = pd.merge(
|
|
data_to_clean,
|
|
cleaning_averages_to_merge,
|
|
on=columns_to_merge_on,
|
|
suffixes=("", "_AVERAGE"),
|
|
how="left",
|
|
)
|
|
|
|
global_averages = cleaning_data[cols_to_clean].mean()
|
|
|
|
# Fill NaN values with averages
|
|
for col in cols_to_clean:
|
|
data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True)
|
|
data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True)
|
|
# If we still have missings
|
|
data_to_clean[col].fillna(data_to_clean[col].mean(), inplace=True)
|
|
# Final step if we still have missings - use global mean
|
|
data_to_clean[col].fillna(global_averages[col], inplace=True)
|
|
|
|
return data_to_clean
|
|
|
|
def get_component_features(self, suffix: str) -> pd.DataFrame:
    """
    Return the property component columns with the given suffix appended.

    These are features (walls, roof, heating etc, as well as lodgement date)
    that we expect might change from one EPC to the next.

    :param suffix: Should be one of "_starting" or "_ending"
    :return: For "_starting": the STARTING_SUFFIX_COMPONENT_COLS and
        EFFICIENCY_FEATURES columns with the suffix added, followed by the
        unsuffixed NO_SUFFIX_COMPONENT_COLS and POTENTIAL_COLUMNS columns.
        For "_ending": the ENDING_SUFFIX_COMPONENT_COLS and
        EFFICIENCY_FEATURES columns with the suffix added.
    :raises Exception: If suffix is neither "_starting" nor "_ending"
    """
    if suffix not in ["_starting", "_ending"]:
        raise Exception("Suffix should be one of _starting or _ending")

    # Compared in lower case to match the validation above; the previous
    # upper-case "_STARTING" comparison could never be true.
    if suffix == "_starting":
        starting_cols = (
            self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
            .copy()
            .add_suffix(suffix)
        )
        fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()

        return pd.concat([starting_cols, fixed_cols], axis=1)

    return (
        self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
        .copy()
        .add_suffix(suffix)
    )
|
|
|
|
def get_fixed_features(self) -> pd.DataFrame:
    """
    Select the features that we don't believe should vary from one EPC to the next.

    :return: The columns of ``self.data`` listed in FIXED_FEATURES
    """
    fixed_feature_frame = self.data[FIXED_FEATURES]
    return fixed_feature_frame
|
|
|
|
@staticmethod
|
|
def coerce_boolean_columns(df: pd.DataFrame, cols_to_ignore: List | None = None):
|
|
"""
|
|
Coerce columns with string 'True'/'False' values to boolean columns.
|
|
|
|
:param df: Input DataFrame.
|
|
:param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
|
|
:return: DataFrame with coerced columns.
|
|
"""
|
|
object_columns = df.select_dtypes(include=["object"]).columns
|
|
if cols_to_ignore:
|
|
object_columns = [c for c in object_columns if c not in cols_to_ignore]
|
|
|
|
for column in object_columns:
|
|
unique_values = df[column].dropna().unique()
|
|
# If the unique values in the column are 'True' and 'False', convert the column to boolean
|
|
if set(unique_values) == {"True", "False"} or set(unique_values) == {
|
|
True,
|
|
False,
|
|
}:
|
|
df[column] = df[column].astype(bool)
|
|
|
|
return df
|
|
|
|
@staticmethod
def calculate_days_to(lodgement_date):
    """
    Number of whole days from ``EARLIEST_EPC_DATE`` to the lodgement date(s).

    :param lodgement_date: A date string, a single datetime-like value, or a
        Series of dates; anything accepted by ``pd.to_datetime``
    :return: An int for a single date, otherwise the day counts for each element
    """
    elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)

    # Only a Series exposes the .dt accessor; a single Timedelta (from a string
    # or a datetime-like scalar) carries .days directly
    if isinstance(elapsed, pd.Series):
        return elapsed.dt.days

    return elapsed.days
|
|
|
|
@staticmethod
|
|
def clean_missings_after_description_process(df, ignore_cols=None):
|
|
missings = pd.isnull(df).sum()
|
|
missings = missings[missings > 0]
|
|
|
|
if ignore_cols:
|
|
missings = missings[~missings.index.isin(ignore_cols)]
|
|
|
|
for col in missings.index:
|
|
unique_values = df[col].unique()
|
|
# TODO: confirm this behaviour
|
|
if True in unique_values or False in unique_values:
|
|
df[col] = df[col].fillna(False)
|
|
if "none" in unique_values:
|
|
df[col] = df[col].fillna("none")
|
|
else:
|
|
df[col] = df[col].fillna("Unknown")
|
|
|
|
return df
|
|
|
|
@staticmethod
|
|
def clean_efficiency_variables(df):
|
|
"""
|
|
These is scope to clean this by the model per corresponding description.
|
|
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
|
|
fill in the missing values with this.
|
|
When looking at this initially, there are a large volume of records with missing energy efficiency
|
|
values and therefore a simpler approach was taken just to test including these variables
|
|
:param df:
|
|
:return:
|
|
"""
|
|
|
|
missings = pd.isnull(df).sum()
|
|
missings = missings[missings >= 1]
|
|
|
|
if len(missings) == 0:
|
|
return df
|
|
|
|
# Make sure they are all efficiency columns
|
|
if any(~missings.index.str.contains("ENERGY_EFF")):
|
|
raise ValueError("Non efficiency columns are missing")
|
|
|
|
for m in missings.index:
|
|
df[m] = df[m].fillna("NO_RATING")
|
|
|
|
return df
|