add new builds

This commit is contained in:
Michael Duong 2024-05-28 17:39:07 +01:00
parent 270a682202
commit a3609ee055

View file

@ -5,7 +5,7 @@ from BaseUtility import Definitions
from etl.epc.settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
IGNORED_TRANSACTION_TYPES,
# IGNORED_TRANSACTION_TYPES,
IGNORED_FLOOR_LEVELS,
IGNORED_PROPERTY_TYPES,
IGNORED_TENURES,
@ -56,8 +56,11 @@ construction_age_remap = {
expanded_map = {
i: [
label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
][0] for i in range(0, 3001)
label
for label, bounds in construction_age_bounds_map.items()
if (i <= bounds["u"]) and (i >= bounds["l"])
][0]
for i in range(0, 3001)
}
@ -74,8 +77,13 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(
self,
data: pd.DataFrame | None = None,
cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training",
violation_mode: bool = False,
) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
@ -86,7 +94,9 @@ class EPCDataProcessor:
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
self.cleaning_averages: pd.DataFrame = (
cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
)
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
@ -103,7 +113,9 @@ class EPCDataProcessor:
ignore_step = True if self.run_mode == "newdata" else False
if filepath is not None:
self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.load_data(
filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
)
if len(self.data) == 0:
raise Exception("No data to process - check filepath/ data being passed in")
@ -121,7 +133,8 @@ class EPCDataProcessor:
self.clean_multi_glaze_proportion(ignore_step=ignore_step)
self.clean_photo_supply()
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"],
ignore_step=ignore_step,
)
self.fill_na_fields()
@ -188,7 +201,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[
0
]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
"""
@ -201,7 +216,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
self.data = self.data.fillna(
{"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}
)
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
"""
@ -301,7 +318,7 @@ class EPCDataProcessor:
"""
if self.violation_mode:
# TODO: to fill in
# TODO: to fill in
return
if ignore_step:
@ -311,9 +328,7 @@ class EPCDataProcessor:
lambda x: self.clean_construction_age_band(x)
)
self.data = self.data[
~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
]
self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])]
def clean_missing_rooms(self, ignore_step: bool = False):
"""
@ -331,31 +346,45 @@ class EPCDataProcessor:
return
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(
lambda x: x.split(" ")[0]
)
def apply_clean(data, matching_columns):
cleaning_data = data[~pd.isnull(data[col])].groupby(
matching_columns
)[col].median().reset_index()
data = data.merge(
cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING")
cleaning_data = (
data[~pd.isnull(data[col])]
.groupby(matching_columns)[col]
.median()
.reset_index()
)
data[col] = np.where(pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col])
data = data.merge(
cleaning_data,
how="left",
on=matching_columns,
suffixes=("", "_CLEANING"),
)
data[col] = np.where(
pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col]
)
data = data.drop(columns=f"{col}_CLEANING")
return data
for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
to_index = 3
matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"]
matching_columns = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"POSTAL_AREA",
]
has_missings = pd.isnull(self.data[col]).sum()
while has_missings:
self.data = apply_clean(
data=self.data,
matching_columns=matching_columns[0:to_index + 1]
data=self.data, matching_columns=matching_columns[0 : to_index + 1]
)
has_missings = pd.isnull(self.data[col]).sum()
@ -363,7 +392,10 @@ class EPCDataProcessor:
# Check if we've gotten to index 0 and still have missings - something has gone wrong or
# we have a very unique property type
if has_missings:
raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col)
raise NotImplementedError(
"Handle this edge case, we still have missings for column %s"
% col
)
break
to_index -= 1
@ -410,7 +442,7 @@ class EPCDataProcessor:
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
# COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
# self.na_remapping()
@ -437,9 +469,11 @@ class EPCDataProcessor:
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if auto_subset_columns else fill_na_map
fill_na_map_apply = (
{k: v for k, v in fill_na_map.items() if k in self.data.columns}
if auto_subset_columns
else fill_na_map
)
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -535,28 +569,34 @@ class EPCDataProcessor:
for variable in AVERAGE_FIXED_FEATURES:
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_AVERAGE"
)
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope
# and built form
# We can use just the property type average and replace
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_PROPERTY_AVERAGE"
)
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"]
)
cleaning_averages_filled["variable"] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_BUILT_FORM_AVERAGE"
)
# If there still is na values, use average across all epc in consituecy
cleaning_averages_filled[variable] = cleaning_averages_filled[
@ -573,7 +613,9 @@ class EPCDataProcessor:
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
def retain_multiple_epc_properties(
self, epc_minimum_count: int = 1, ignore_step: bool = False
) -> None:
"""
Reduce the data futher by keeping only datasets with multiple epcs
"""
@ -592,12 +634,16 @@ class EPCDataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_df_columns(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
for key, values in column_mappings.items():
if key not in self.data.columns:
@ -608,13 +654,17 @@ class EPCDataProcessor:
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_all_data(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Using a dictionary to recast all columns at once
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
self.data = self.data.astype(column_mappings)
@ -625,14 +675,26 @@ class EPCDataProcessor:
if self.violation_mode:
violation_uprn_missing = pd.isnull(self.data["UPRN"])
violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
violation_old_lodgment_date = (
self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
)
# violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(
IGNORED_FLOOR_LEVELS
)
violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"])
violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"])
violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"])
violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
violation_missing_windows_description = pd.isnull(
self.data["WINDOWS_DESCRIPTION"]
)
violation_missing_hotwater_description = pd.isnull(
self.data["HOTWATER_DESCRIPTION"]
)
violation_missing_roof_description = pd.isnull(
self.data["ROOF_DESCRIPTION"]
)
violation_invalid_property_type = (
self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
)
violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES)
violation_df = pd.concat(
@ -647,7 +709,8 @@ class EPCDataProcessor:
violation_missing_roof_description,
violation_invalid_property_type,
violation_invalid_tenure,
], axis=1,
],
axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
@ -658,8 +721,8 @@ class EPCDataProcessor:
"violation_missing_hotwater_description",
"violation_missing_roof_description",
"violation_invalid_property_type",
"violation_invalid_tenure"
]
"violation_invalid_tenure",
],
)
self.data = pd.concat([self.data, violation_df], axis=1)
@ -685,10 +748,8 @@ class EPCDataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
]
# self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)]
self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]
# We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them
@ -705,7 +766,7 @@ class EPCDataProcessor:
self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)]
# We remap zero values to None
self.data.loc[self.data['FLOOR_HEIGHT'] == 0, 'FLOOR_HEIGHT'] = None
self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
"""
@ -734,7 +795,11 @@ class EPCDataProcessor:
@staticmethod
def apply_averages_cleaning(
data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False
data_to_clean,
cleaning_data,
cols_to_merge_on,
colnames=None,
ignore_step: bool = False,
):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
@ -752,12 +817,13 @@ class EPCDataProcessor:
# The desired colnames to clean - which may not be present
if colnames is None:
colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]
colnames = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
cols_to_clean = [
c for c in colnames if
c in data_to_clean.columns
]
cols_to_clean = [c for c in colnames if c in data_to_clean.columns]
# Enforce data types
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
@ -768,7 +834,15 @@ class EPCDataProcessor:
# Calculate averages
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean)))
dict(
zip(
cols_to_clean,
[
"mean",
]
* len(cols_to_clean),
)
)
)
# Merge with the original data
@ -777,7 +851,7 @@ class EPCDataProcessor:
cleaning_averages_to_merge,
on=columns_to_merge_on,
suffixes=("", "_AVERAGE"),
how='left'
how="left",
)
global_averages = cleaning_data[cols_to_clean].mean()
@ -806,14 +880,20 @@ class EPCDataProcessor:
raise Exception("Suffix should be one of _starting or _ending")
if suffix == "_STARTING":
starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)
starting_cols = (
self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()
return pd.concat([starting_cols, fixed_cols], axis=1)
return self.data[
ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES
].copy().add_suffix(suffix)
return (
self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
def get_fixed_features(self) -> pd.DataFrame:
"""
@ -831,14 +911,17 @@ class EPCDataProcessor:
:param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
:return: DataFrame with coerced columns.
"""
object_columns = df.select_dtypes(include=['object']).columns
object_columns = df.select_dtypes(include=["object"]).columns
if cols_to_ignore:
object_columns = [c for c in object_columns if c not in cols_to_ignore]
for column in object_columns:
unique_values = df[column].dropna().unique()
# If the unique values in the column are 'True' and 'False', convert the column to boolean
if set(unique_values) == {'True', 'False'} or set(unique_values) == {True, False}:
if set(unique_values) == {"True", "False"} or set(unique_values) == {
True,
False,
}:
df[column] = df[column].astype(bool)
return df
@ -877,7 +960,6 @@ class EPCDataProcessor:
@staticmethod
def clean_efficiency_variables(df):
"""
These is scope to clean this by the model per corresponding description.
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and