add ignore step to data processor

This commit is contained in:
Michael Duong 2023-12-20 14:46:12 +00:00
parent 8674dc415f
commit a35c3facca
6 changed files with 813 additions and 304 deletions

View file

@ -28,7 +28,7 @@ from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from typing import List
# These lookups are used to clean the construction age band
bounds_map = {
construction_age_bounds_map = {
"England and Wales: before 1900": {"l": 0, "u": 1899},
"England and Wales: 1930-1949": {"l": 1930, "u": 1949},
"England and Wales: 1900-1929": {"l": 1900, "u": 1929},
@ -43,13 +43,13 @@ bounds_map = {
"England and Wales: 2012 onwards": {"l": 2012, "u": 3000},
}
remap = {
construction_age_remap = {
"England and Wales: 2007 onwards": "England and Wales: 2007-2011"
}
expanded_map = {
i: [
label for label, bounds in bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
][0] for i in range(0, 3001)
}
@ -67,154 +67,182 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
training_pipeline = {
"confine_data": {
"function": "confine_data",
"args": [],
"kwargs": {},
},
"remap_columns": {
"function": "remap_columns",
"args": [],
"kwargs": {},
},
"standardise_construction_age_band": {
"function": "standardise_construction_age_band",
"args": [],
"kwargs": {},
},
"clean_missing_rooms": {
"function": "clean_missing_rooms",
"args": [],
"kwargs": {},
},
"recast_df_columns": {
"function": "recast_df_columns",
"args": [],
"kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]},
},
"clean_multi_glaze_proportion": {
"function": "clean_multi_glaze_proportion",
"args": [],
"kwargs": {},
},
"clean_photo_supply": {
"function": "clean_photo_supply",
"args": [],
"kwargs": {},
},
"retain_multiple_epc_properties": {
"function": "retain_multiple_epc_properties",
"args": [],
"kwargs": {"epc_minimum_count": DATA_PROCESSOR_SETTINGS["epc_minimum_count"]},
},
"fill_na_fields": {
"function": "fill_na_fields",
"args": [],
"kwargs": {"columns_to_fill": COLUMNS_TO_MERGE_ON},
},
"cleaning_averages": {
"function": "make_cleaning_averages",
"args": [],
"kwargs": {},
},
"apply_averages_cleaning": {
"function": "apply_averages_cleaning",
"args": [],
"kwargs": {
"data_to_clean": "data",
"cleaning_data": "cleaning_averages",
"cols_to_merge_on": COLUMNS_TO_MERGE_ON,
},
},
"na_remapping": {
"function": "na_remapping",
"args": [],
"kwargs": {},
},
}
newdata_pipeline = {
"remap_columns": {
"function": "remap_columns",
"args": [],
"kwargs": {},
},
"recast_df_columns": {
"function": "recast_df_columns",
"args": [],
"kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]},
},
"clean_multi_glaze_proportion": {
"function": "clean_multi_glaze_proportion",
"args": [],
"kwargs": {},
},
"clean_photo_supply": {
"function": "clean_photo_supply",
"args": [],
"kwargs": {},
},
"na_remapping": {
"function": "na_remapping",
"args": [],
"kwargs": {},
},
}
def __init__(self, filepath: Path | None, is_newdata: bool = False) -> None:
def __init__(self, data: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.data : pd.DataFrame
self.cleaning_averages : pd.DataFrame
self.data : pd.DataFrame = data if data else pd.DataFrame()
self.cleaning_averages : pd.DataFrame = pd.DataFrame()
self.filepath = filepath
self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training")
self.is_newdata = is_newdata
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
def pipeline_factory(self, pipeline_type: str) -> dict:
"""
Determine which dataclient to use
"""
pipelines = {
"training": self.training_pipeline,
"newdata": self.newdata_pipeline,
}
if pipeline_type not in pipelines:
raise ValueError("Pipeline type specified is not in factory")
return pipelines[pipeline_type]
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
if run_mode not in ["training", "newdata"]:
raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata"
def pre_process_pipeline(self) -> None:
def prepare_data(self, filepath: str) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
For the pipeline_steps, we apply each function in turn
This will alter self.data inplace
Given the run mode, we apply the relevant pipeline steps
Ignore step is used to highlight which steps are not needed in newdata
"""
for step in self.pipeline_steps:
step_function = getattr(self, self.pipeline_steps[step]["function"])
step_args = self.pipeline_steps[step]["args"]
step_kwargs = self.pipeline_steps[step]["kwargs"]
ignore_step = True if self.run_mode == "newdata" else False
if step_args:
step_function(*step_args, **step_kwargs)
else:
step_function(**step_kwargs)
if self.data is None:
self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data(ignore_step=ignore_step)
self.remap_anomalies()
self.remap_floor_level(ignore_step=ignore_step)
self.remap_build_form()
self.cast_data_column_values_to_lower()
self.standardise_construction_age_band(ignore_step=ignore_step)
self.clean_missing_rooms(ignore_step=ignore_step)
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
self.clean_multi_glaze_proportion(ignore_step=ignore_step)
self.clean_photo_supply()
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step
)
self.fill_na_fields()
self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step)
# Final re-casting after data transformed and prepared
self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
self.na_remapping(auto_subset_columns=True)
self.fill_invalid_constituency_fields(ignore_step=ignore_step)
self.cleaning_averages = self.make_cleaning_averages(ignore_step=ignore_step)
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
ignore_step=ignore_step
)
self.data = self.data if cleaned_data is None else cleaned_data
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
self.cast_data_columns_to_lower()
return self.data, self.cleaning_averages
def cast_data_columns_to_lower(self):
"""
Convert all columns names to lower
"""
self.data.columns = self.data.columns.str.lower()
def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
"""
Convert all column names to lower
No need in newdata mode
"""
if ignore_step:
return
self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
"""
Add the Local authority column to the cleaning averages
No need in newdata mode
"""
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
"""
For some weird cases, where data has missing constituency, we add a dummy value
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
"""
Order data by uprn and lodgement data
No Violation mode needed
"""
if ignore_step:
return
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
def cast_data_column_values_to_lower(self):
"""
For given columns, cast values to lower
No Violation mode or newdata modes required
"""
convert_to_lower = ["TRANSACTION_TYPE"]
for col in convert_to_lower:
self.data[col] = self.data[col].str.lower()
def load_data(self, low_memory=False) -> None:
if not self.filepath:
def remap_build_form(self):
"""
Remap build form to standard values
No Violation mode or newdata modes required
"""
self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
def remap_anomalies(self):
"""
Remap anomalies to None
No Violation mode or newdata modes required
"""
# Map all anomaly values to None
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
)
)
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
self.data = data
def remap_floor_level(self, ignore_step: bool = False):
"""
Remap floor level to standard values
"""
if self.violation_mode:
# TODO: We need to handle this case
return
if ignore_step:
return
self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
def load_data(self, filepath, low_memory=False) -> None:
if not filepath:
raise ValueError("No filepath specified")
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
self.data = pd.read_csv(filepath, low_memory=low_memory)
def insert_data(self, data: pd.DataFrame) -> None:
self.data = data
@ -226,11 +254,11 @@ class EPCDataProcessor:
return x
# Next, we check if it's a value in our map
if bounds_map.get(x):
if construction_age_bounds_map.get(x):
return x
# We check if it's a standard remap value
remap_value = remap.get(x, None)
remap_value = construction_age_remap.get(x, None)
if remap_value:
return remap_value
@ -241,12 +269,19 @@ class EPCDataProcessor:
raise NotImplementedError("Not handled the case for value %s" % x)
def standardise_construction_age_band(self):
def standardise_construction_age_band(self, ignore_step: bool = False):
"""
This function will tidy up some of the non-standard values that are populated in the construction age
band, which is useful for cleaning
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply(
lambda x: self.clean_construction_age_band(x)
)
@ -255,7 +290,7 @@ class EPCDataProcessor:
~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
]
def clean_missing_rooms(self):
def clean_missing_rooms(self, ignore_step: bool = False):
"""
For the number of heated rooms and number of habitable rooms, we clean these values up front,
based on property archetype and age
@ -263,6 +298,13 @@ class EPCDataProcessor:
TODO: We could use a model based impution approach for possibly more accurate cleaning
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
@ -301,75 +343,78 @@ class EPCDataProcessor:
break
to_index -= 1
def pre_process(self):
"""
Load data and begin initial cleaning
"""
if self.data is None:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
# def pre_process(self, filepath: Path | None = None) -> tuple[pd.DataFrame, pd.DataFrame]:
# """
# Load data and begin initial cleaning
# """
# if self.data is None:
# self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if not self.is_newdata:
self.confine_data()
# if not self.is_newdata:
# self.confine_data()
self.remap_columns()
# self.remap_columns()
# We have some non-standard construction age bands which we'll clean for matching
if not self.is_newdata:
self.standardise_construction_age_band()
self.clean_missing_rooms()
# # We have some non-standard construction age bands which we'll clean for matching
# if not self.is_newdata:
# self.standardise_construction_age_band()
# self.clean_missing_rooms()
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
# self.recast_df_columns(
# column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
# )
if not self.is_newdata:
self.clean_multi_glaze_proportion()
# if not self.is_newdata:
# self.clean_multi_glaze_proportion()
self.clean_photo_supply()
# self.clean_photo_supply()
if not self.is_newdata:
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
# if not self.is_newdata:
# self.retain_multiple_epc_properties(
# epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
# )
if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
# if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# # If we have multiple EPC records, we can try and do filling
# self.fill_na_fields()
if not self.is_newdata:
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# if not self.is_newdata:
# self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Final re-casting after data transformed and prepared
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
for k, v in coltypes.items():
self.data[k] = self.data[k].astype(v)
self.data = self.data.astype(coltypes)
# # Final re-casting after data transformed and prepared
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
self.na_remapping()
# self.na_remapping()
if not self.is_newdata:
# We have some odd cases with missing constituency so we fill
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
# self.cleaning_averages = None
# if not self.is_newdata:
# # We have some odd cases with missing constituency so we fill
# self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
self.cleaning_averages = self.make_cleaning_averages()
# We apply averages cleaning to the data
self.data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
# self.cleaning_averages = self.make_cleaning_averages()
# # We apply averages cleaning to the data
# self.data = self.apply_averages_cleaning(
# data_to_clean=self.data,
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=COLUMNS_TO_MERGE_ON
# )
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
# self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
# self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
self.data.columns = self.data.columns.str.lower()
# self.data.columns = self.data.columns.str.lower()
# return self.data, self.cleaning_averages
def na_remapping(self):
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if self.is_newdata else fill_na_map
} if auto_subset_columns else fill_na_map
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -396,35 +441,15 @@ class EPCDataProcessor:
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
def remap_columns(self):
def make_cleaning_averages(self, ignore_step: bool = False) -> pd.DataFrame:
"""
Remap all columns, for any non values
Create a dataset to hold averages based on property type, built form, construction age, and rooms.
Not require in newdata mode
"""
# Map all anomaly values to None
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
)
)
if ignore_step:
return pd.DataFrame()
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
if not self.is_newdata:
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
convert_to_lower = ["TRANSACTION_TYPE"]
for col in convert_to_lower:
data[col] = data[col].str.lower()
self.data = data
def make_cleaning_averages(self) -> pd.DataFrame:
# Define a custom function to calculate the median, excluding missing values
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
@ -523,11 +548,18 @@ class EPCDataProcessor:
return cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
"""
Reduce the data futher by keeping only datasets with multiple epcs
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
counts = self.data.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
@ -535,22 +567,67 @@ class EPCDataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict) -> None:
def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
for key, values in column_mappings.items():
if key not in self.data.columns:
raise ValueError("Column mapping incorrectly specified")
for value in values:
self.data[key] = self.data[key].astype(value)
self.data = self.data.astype(column_mappings)
def confine_data(self) -> None:
def confine_data(self, ignore_step: bool = False):
"""
Include all step to reduce down the data based on assumptions
"""
if self.violation_mode:
violation_uprn_missing = pd.isnull(self.data["UPRN"])
violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"])
violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"])
violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"])
violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
violation_df = pd.concat(
[
violation_uprn_missing,
violation_old_lodgment_date,
violation_invalid_transaction_type,
violation_ignored_floor_level,
violation_rdsap_score_above_max,
violation_missing_windows_description,
violation_missing_hotwater_description,
violation_missing_roof_description,
violation_invalid_property_type,
], axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
"violation_invalid_transaction_type",
"violation_ignored_floor_level",
"violation_rdsap_score_above_max",
"violation_missing_windows_description",
"violation_missing_hotwater_description",
"violation_missing_roof_description",
"violation_invalid_property_type",
]
)
self.data = pd.concat([self.data, violation_df], axis=1)
if ignore_step:
return
# Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
# Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
@ -585,14 +662,22 @@ class EPCDataProcessor:
# EPCs) we'll ignore them from the model
self.data = self.data[self.data["PROPERTY_TYPE"] != IGNORED_PROPERTY_TYPES]
def clean_multi_glaze_proportion(self) -> None:
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
"""
If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
"""
if self.violation_mode:
# TODO:
return
if ignore_step:
return
no_multi_glaze_proportion_index = pd.isnull(
self.data["MULTI_GLAZE_PROPORTION"]
) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100
def clean_photo_supply(self) -> None:
@ -603,7 +688,7 @@ class EPCDataProcessor:
self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None):
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
@ -615,6 +700,9 @@ class EPCDataProcessor:
:return: Cleaned DataFrame.
"""
if ignore_step:
return None
# The desired colnames to clean - which may not be present
if colnames is None:
colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]
@ -726,6 +814,7 @@ class EPCDataProcessor:
for col in missings.index:
unique_values = df[col].unique()
# TODO: confirm this behaviour
if True in unique_values or False in unique_values:
df[col] = df[col].fillna(False)
if "none" in unique_values:

View file

@ -1,9 +1,16 @@
import numpy as np
import pandas as pd
from typing import List
from etl.epc.Record import EPCDifferenceRecord
from ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
)
class BaseDataset:
"""
@ -39,20 +46,199 @@ class TrainingDataset(BaseDataset):
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
# TODO: replace these with the pipeline steps
self._feature_generation()
self._drop_features()
# self._clean_dataframe()
self._clean_efficiency_variables()
self._null_validation(information="Clean Efficiency Variables")
self._expand_description_to_features(cleaned_lookup)
self._adjust_assumed_values_in_wall_descriptions()
self._generate_u_values_from_features()
# TODO: For some of the features that we clean, we have either a true, false or possibly null value
# Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
# need to
self._clean_missing_values()
self._null_validation(information="Clean Missing Values")
self._remove_abnormal_change_in_floor_area()
self._ensure_numeric()
def _remove_abnormal_change_in_floor_area(self):
"""
Remove properties where the change in floor area is greater than 100%
"""
self.df["tfa_diff_abs"] = abs(self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"])
self.df["tfa_diff_prop"] = self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"]
self.df = self.df[self.df["tfa_diff_prop"] < 0.5]
self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
def _ensure_numeric(self):
"""
Ensure that all columns are numeric
"""
# TODO: move into EPCRecord record
uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col]
for uvalue_col in uvalue_columns:
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
@staticmethod
def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
"""
Using the apply method, use the get_roof_u_value method to generate the u-value
"""
col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending"
if row["has_dwelling_above"]:
if row["roof_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for roof")
if row["roof_thermal_transmittance_ending"] != 0:
raise ValueError("Should have 0 u-value for roof")
return get_roof_u_value(
insulation_thickness=row[col_name],
has_dwelling_above=row["has_dwelling_above"],
is_loft=row["is_loft"],
is_roof_room=row["is_roof_room"],
is_thatched=row["is_thatched"],
is_flat=row["is_flat"],
is_pitched=row["is_pitched"],
is_at_rafters=row["is_at_rafters"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
)
@staticmethod
def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
"""
Using the apply method, use the get_wall_u_value method to generate the u-value
"""
col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending"
return get_wall_u_value(
clean_description=row[col_name],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
)
@staticmethod
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
"""
Using the apply method, use the get_floor_u_value method to generate the u-value
"""
floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending"
if row["another_property_below"]:
if row["floor_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for floor")
if row["floor_thermal_transmittance_ending"] != 0:
raise ValueError("Should have 0 u-value for floor")
return 0
else:
uvalue = row[floor_thermal_col_name]
if pd.isnull(uvalue):
insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending"
floor_area_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending"
perimeter_col_name = "total_floor_area_starting" if not is_end else "total_floor_area_ending"
uvalue = get_floor_u_value(
floor_type=row["floor_type"],
perimeter=row[floor_area_col_name],
area=row[perimeter_col_name],
insulation_thickness=row[insulation_col_name],
wall_type=row["wall_type"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
)
return uvalue
def _generate_u_values_from_features(self):
"""
Generate u-values from the features
"""
# ~~~~~~~~~~~~~~~~~~
# Walls
# ~~~~~~~~~~~~~~~~~~
walls_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_wall_uvalue(row),
axis=1
)
walls_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True),
axis=1
)
walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue)
walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df["walls_clean_description_ending"]
walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[walls_starting_equals_ending_flag]
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
roof_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row),
axis=1
)
roof_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True),
axis=1
)
roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue)
roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
self.df['estimated_perimeter_starting'] = self.df.apply(
lambda row: estimate_perimeter(row["total_floor_area_starting"], row["number_habitable_rooms"]),
axis=1
)
self.df['estimated_perimeter_ending'] = self.df.apply(
lambda row: estimate_perimeter(row["total_floor_area_ending"], row["number_habitable_rooms"]),
axis=1
)
self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"})
self.df["wall_type"] = self.df.apply(
lambda row: get_wall_type(
is_cavity_wall=row["is_cavity_wall"],
is_solid_brick=row["is_solid_brick"],
is_timber_frame=row["is_timber_frame"],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_cob=row["is_cob"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
is_system_built=row["is_system_built"],
is_park_home=row["is_park_home"]
),
axis=1
)
floor_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row),
axis=1
)
floor_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True),
axis=1
)
floor_starting_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_starting_uvalue)
floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue)
for component in ["walls", "roof", "floor"]:
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(f"{component}_starting_uvalue")
self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(f"{component}_ending_uvalue")
self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"])
def _adjust_assumed_values_in_wall_descriptions(self):

270
etl/epc/Pipeline.py Normal file
View file

@ -0,0 +1,270 @@
import msgpack
import pandas as pd
from typing import List
from pathlib import Path
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from etl.epc.Dataset import TrainingDataset
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
)
# TODO: change in setting file
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
def get_cleaned_description_mapping():
"""
This function will retrieve the cleaned dataset from s3 which has the cleaned
descriptions for the epc dataset
This data is stored in MessagePack format and therefore needs to be decoded
:return:
"""
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
- Load the data
- Pre-process the data
- Create a dataset
- Clean the dataset
- Store the dataset
"""
def __init__(
self,
directories: List[Path],
epc_data_processor: EPCDataProcessor,
run_mode="training",
epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev",
epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet",
epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet",
epc_compiled_dataset_key="sap_change_model/dataset.parquet"
):
"""
:param directories: List of directories to process
:param epc_data_processor: EPCDataProcessor object
:param run_mode: Either training or newdata
:param epc_local_file: Local file name of the EPC data
:param epc_bucket_name: S3 bucket name
:param epc_cleaning_dataset_key: S3 key for the cleaning dataset
:param epc_all_equal_rows_key: S3 key for the all equal rows dataset
:param epc_compiled_dataset_key: S3 key for the compiled dataset
"""
self.compiled_dataset: pd.DataFrame = pd.DataFrame()
self.compiled_all_equal_rows: list = []
self.compiled_cleaning_averages: list = []
self.directories = directories
self.epc_data_processor = epc_data_processor
self.run_mode = run_mode
self.epc_local_file = epc_local_file
self.epc_bucket_name = epc_bucket_name
self.epc_cleaning_dataset_key = epc_cleaning_dataset_key
self.epc_all_equal_rows_key = epc_all_equal_rows_key
self.epc_compiled_dataset_key = epc_compiled_dataset_key
def run(self):
"""
Entrypoint to run the pipeline
"""
for directory in self.directories:
self.process_directory(directory)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
bucket_name=self.epc_bucket_name,
file_key=self.epc_compiled_dataset_key,
)
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_all_equal_rows),
bucket_name=self.epc_bucket_name,
file_key=self.epc_all_equal_rows_key,
)
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_cleaning_averages),
bucket_name=self.epc_bucket_name,
file_key=self.epc_cleaning_dataset_key,
)
def process_directory(self, directory: Path):
"""
Process a single directory
:param directory:
:return:
"""
filepath = directory / self.epc_local_file
self.epc_data_processor.pre_process(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages)
constituency_difference_records = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(uprn=str(uprn), property_data=property_data, directory=directory)
if difference_records is not None:
constituency_difference_records.extend(difference_records)
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=get_cleaned_description_mapping())
self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df])
def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
"""
Process a single UPRN, which may have multiple different EPCs
:param uprn: UPRN
:param property_data: pd.DataFrame, Data for a single UPRN
:param directory: Path, Directory of the UPRN
:return:
"""
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or (
pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0
):
return None
# Fixed features - these are property attributes that shouldn't change over time
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together
fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records = self._generate_property_difference_records(epc_records, uprn, directory, fixed_data)
return property_difference_records
def _generate_property_difference_records(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict):
"""
We can use multiple types of comparison datasets, for example:
- First vs second
- Second vs third
- First vs third
:param epc_records:
:return:
"""
property_difference_records: list = []
property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
# property_difference_records = self._compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
return property_difference_records
def _compare_all_permutation_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
"""
Compare all permutations of EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
for idx2 in range(idx + 1, len(epc_records)):
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx2]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records
def _compare_consecutive_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
break
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx + 1]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records

View file

@ -68,15 +68,12 @@ class EPCRecord:
energy_consumption_current : int
co2_emissions_current : float
# TODO: lower case and underscore
walls = None
floor = None
roof = None
u_values_walls = None
u_values_roof = None
u_values_floor = None
run_mode: str = "training"
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
@ -85,13 +82,20 @@ class EPCRecord:
# self._field_validation()
self._clean_records()
self._expand_description_to_uvalues()
if self.run_mode == "newdata":
self._expand_description_to_features()
self._expand_description_to_uvalues()
# self._generate_uvalues()
# self._validate_expanded_description()
# self._validate_u_values()
# etc
pass
def _expand_description_to_features(self):
pass
def _expand_description_to_uvalues(self):
# TODO: can be loop over all the descriptions, or done in one
pass
@ -270,7 +274,6 @@ class EPCDifferenceRecord:
self.flag_fabric_consistency = False
self.difference_record = {}
self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration
self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration

View file

@ -240,10 +240,10 @@ def make_uvalues(df):
uvalues = []
# TODO: iterrows is the slowest way to do this, we should use a vectorised approach or itertuples
for index_no, x in df.iterrows():
uprn = x["UPRN"]
uprn = x["uprn"]
row_index = x["row_index"]
age_band = england_wales_age_band_lookup[x["CONSTRUCTION_AGE_BAND"]]
age_band = england_wales_age_band_lookup[x["construction_age_band"]]
# ~~~~~~~~~~~~~~~~~~
# Walls
@ -258,11 +258,11 @@ def make_uvalues(df):
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
)
ending_wall_uvalue = x["walls_thermal_transmittance_ENDING"]
ending_wall_uvalue = x["walls_thermal_transmittance_ending"]
if pd.isnull(ending_wall_uvalue):
if x["walls_clean_description"] != x["walls_clean_description_ENDING"]:
if x["walls_clean_description"] != x["walls_clean_description_ending"]:
ending_wall_uvalue = get_wall_u_value(
clean_description=x["walls_clean_description_ENDING"],
clean_description=x["walls_clean_description_ending"],
age_band=age_band,
is_granite_or_whinstone=x["is_granite_or_whinstone"],
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
@ -413,58 +413,6 @@ def make_uvalues(df):
# if all_equal:
# return True
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
- Load the data
- Pre-process the data
- Create a dataset
- Clean the dataset
- Store the dataset
"""
def __init__(self, directories: List[Path]):
self.directories = directories
self.dataset = []
self.cleaning_dataset = []
self.all_equal_rows = []
def load_data(self):
"""
Load the data from the directories
:return:
"""
for directory in self.directories:
filepath = directory / "certificates.csv"
data_processor = DataProcessor(filepath=filepath)
data_processor.pre_process()
self.dataset.append(data_processor.data)
def create_dataset(self):
"""
Create a dataset from the data
:return:
"""
pass
def clean_dataset(self):
"""
Clean the dataset
:return:
"""
pass
def store_dataset(self):
"""
Store the dataset
:return:
"""
pass
def generate_property_difference_records(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, all_equal_rows: list):
"""
We can use multiple types of comparison datasets, for example:
@ -604,7 +552,9 @@ def app():
variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')]
epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records, all_equal_rows = generate_property_difference_records(epc_records, uprn, directory, fixed_data, all_equal_rows)
@ -615,12 +565,7 @@ def app():
constituency_dataset_df = constituency_dataset.df
# Apply u-values
data_by_urpn_df = make_uvalues(df = constituency_dataset_df).drop(
columns=["walls_clean_description", "walls_clean_description_ENDING"]
)
dataset.append(data_by_urpn_df)
dataset.append(constituency_dataset_df)
print("Final all equal count: %s" % str(len(all_equal_rows)))
@ -637,15 +582,15 @@ def app():
# TODO: move into difference record
# Remove any records that have huge swings in their floor area
# Move this into TrainingDataset as this won't be run in newdata
output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"])
output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"]
output = output[output["tfa_diff_prop"] < 0.5]
output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
# output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"])
# output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"]
# output = output[output["tfa_diff_prop"] < 0.5]
# output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
# TODO: move into EPCRecord record
uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col]
for uvalue_col in uvalue_columns:
output[uvalue_col] = pd.to_numeric(output[uvalue_col])
# # TODO: move into EPCRecord record
# uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col]
# for uvalue_col in uvalue_columns:
# output[uvalue_col] = pd.to_numeric(output[uvalue_col])
save_dataframe_to_s3_parquet(
df=output,
@ -661,6 +606,21 @@ def app():
file_key="sap_change_model/all_equal_rows.parquet",
)
from etl.epc.Pipeline import EPCPipeline
def main():
"""
Orchestration function
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training"))
epc_pipeline.run()
if __name__ == "__main__":
app()
main()

View file

@ -1,5 +1,6 @@
import math
from copy import deepcopy
from typing import Union
import numpy as np
import pandas as pd
@ -497,7 +498,7 @@ def get_wall_type(
is_system_built,
is_park_home,
**kwargs
):
) -> Union[str, None]:
"""
Converts booleans to a string wall type, for querying the wall thickness table
:return: