migrate process code to trainingdataset

This commit is contained in:
Michael Duong 2023-12-15 18:40:23 +00:00
parent eeeea467cf
commit 8674dc415f
4 changed files with 290 additions and 188 deletions

View file

@ -360,6 +360,7 @@ class EPCDataProcessor:
)
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
self.data.columns = self.data.columns.str.lower()

View file

@ -33,7 +33,8 @@ class TrainingDataset(BaseDataset):
A collection of EPCDifferenceRecords can be combined into a TrainingDataset.
"""
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
def __init__(self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict) -> None:
# self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
@ -42,12 +43,176 @@ class TrainingDataset(BaseDataset):
self._feature_generation()
self._drop_features()
# self._clean_dataframe()
# self._clean_efficiency_variables()
# self._null_validation(information="Clean Efficiency Variables")
# # self._process_and_prune()
# self._clean_missing_values()
# self._null_validation(information="Clean Missing Values")
self._clean_efficiency_variables()
self._null_validation(information="Clean Efficiency Variables")
self._expand_description_to_features(cleaned_lookup)
self._adjust_assumed_values_in_wall_descriptions()
# TODO: For some of the features that we clean, we have either a true, false or possibly null value
# Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
# need to
self._clean_missing_values()
self._null_validation(information="Clean Missing Values")
def _adjust_assumed_values_in_wall_descriptions(self):
"""
Strip out assumed values for all wall descriptions
"""
for col in ["walls_clean_description", "walls_clean_description_ending"]:
self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip()
def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
"""
Drop properties that have inconsistent data, i.e. changing material types
"""
if component == "walls":
expanded_df = expanded_df[
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) &
(expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"]) &
(expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"]) &
(expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"]) &
(expanded_df["is_cob"] == expanded_df["is_cob_ending"]) &
(expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"])
]
elif component == "floor":
expanded_df = expanded_df[
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) &
(expanded_df["is_solid"] == expanded_df["is_solid_ending"]) &
(expanded_df["another_property_below"] == expanded_df["another_property_below_ending"]) &
(expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"]) &
(expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"])
]
elif component == "roof":
expanded_df = expanded_df[
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) &
(expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) &
(expanded_df["is_loft"] == expanded_df["is_loft_ending"]) &
(expanded_df["is_flat"] == expanded_df["is_flat_ending"]) &
(expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) &
(expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) &
(expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"])
]
return expanded_df
def _expand_description_to_features(self, cleaned_lookup: dict):
"""
This method will merge on the cleaned lookup table and ensure that the building fabric in the
starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
possible dataset.
# We look for key building fabric features that have changed from one EPC to the next.
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
"""
cols_to_drop = {
"walls": [
# We need to cleaned descriptions for pulling out u-values
'original_description', 'thermal_transmittance_unit',
'original_description_ending',
'thermal_transmittance_unit_ending',
'is_cavity_wall_ending', 'is_filled_cavity_ending',
'is_solid_brick_ending', 'is_system_built_ending',
'is_timber_frame_ending', 'is_granite_or_whinstone_ending',
'is_as_built_ending', 'is_cob_ending', 'is_assumed_ending',
'is_sandstone_or_limestone_ending',
# Re remove the is_assumed columns
"is_assumed", "is_assumed_ending"
],
"floor": [
"original_description", "clean_description", "thermal_transmittance_unit",
"no_data", "no_data_ending", "original_description_ending",
"clean_description_ending", "thermal_transmittance_unit_ending",
"is_suspended_ending", "is_solid_ending", "another_property_below_ending",
"is_to_unheated_space_ending", "is_to_external_air_ending", "is_assumed",
"is_assumed_ending"
],
"roof": [
"original_description", "clean_description", "thermal_transmittance_unit",
"is_assumed", "is_valid", "original_description_ending", "clean_description_ending",
"thermal_transmittance_unit_ending", "is_pitched_ending", "is_roof_room_ending",
"is_loft_ending", "is_flat_ending", "is_thatched_ending", "is_at_rafters_ending",
"has_dwelling_above_ending", "is_assumed_ending", "is_valid_ending"
],
"hotwater": [
"original_description", "clean_description", "assumed", "original_description_ending",
"clean_description_ending", "assumed_ending"
],
"mainheat": [
"original_description", "clean_description", "original_description_ending",
"has_assumed", "original_description_ending", "clean_description_ending",
"has_assumed_ending",
],
"mainheatcont": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending"
],
"windows": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending",
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
"has_glazing", "glazing_coverage", "no_data", "has_glazing_ending", "glazing_coverage_ending",
"no_data_ending"
],
"main-fuel": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending"
],
}
components_to_expand = cols_to_drop.keys()
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
cleaned_key, left_on_starting, left_on_ending, original_cols = (
("main-fuel", "main_fuel_starting", "main_fuel_ending", ["main_fuel_starting", "main_fuel_ending"]) if component == "main-fuel" else
(f"{component}-description", f"{component}_description_starting", f"{component}_description_ending", [f"{component}_description_starting", f"{component}_description_ending"])
)
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
expanded_df = self.df.merge(
cleaned_lookup_df_for_key,
how="left",
left_on=left_on_starting,
right_on="original_description",
).merge(
cleaned_lookup_df_for_key,
how="left",
left_on=left_on_ending,
right_on="original_description",
suffixes=("", "_ending")
)
# Drop inconsistent properties
expanded_df = self._drop_inconsistent_properties(expanded_df, component)
# Drop original cols and cols to drop
expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols)
# Rename columns to component specific names, if they have not been dropped
expanded_df = expanded_df.rename(
columns={
"insulation_thickness": f"{component}_insulation_thickness",
"insulation_thickness_ending": f"{component}_insulation_thickness_ending",
"thermal_transmittance": f"{component}_thermal_transmittance",
"thermal_transmittance_ending": f"{component}_thermal_transmittance_ending",
"tariff_type": f"{component}_tariff_type",
"tariff_type_ending": f"{component}_tariff_type_ending",
"clean_description": f"{component}_clean_description",
"clean_description_ending": f"{component}_clean_description_ending",
}
)
self.df = expanded_df
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"])
def _clean_missing_values(self, ignore_cols=None):
missings = pd.isnull(self.df).sum()
@ -66,7 +231,8 @@ class TrainingDataset(BaseDataset):
self.df[col] = self.df[col].fillna("Unknown")
def _null_validation(self, information: str = ""):
def _null_validation(self, information: str):
print(f"Null validation after {information}")
if pd.isnull(self.df).sum().sum():
raise ValueError(f"Null values found in dataset, after step {information}")
@ -103,7 +269,7 @@ class TrainingDataset(BaseDataset):
return
# Make sure they are all efficiency columns
if any(~missings.index.str.contains("ENERGY_EFF")):
if any(~missings.index.str.contains("energy_eff")):
raise ValueError("Non efficiency columns are missing")
for m in missings.index:

View file

@ -26,10 +26,6 @@ class EPCRecord:
"""
Base class for a EPC record
"""
# TODO: lower case and underscore
walls = None
floor = None
roof = None
uprn: str
walls_description: str
@ -72,6 +68,11 @@ class EPCRecord:
energy_consumption_current : int
co2_emissions_current : float
# TODO: lower case and underscore
walls = None
floor = None
roof = None
u_values_walls = None
u_values_roof = None
u_values_floor = None
@ -84,16 +85,48 @@ class EPCRecord:
# self._field_validation()
self._clean_records()
self._expand_description()
self._expand_description_to_uvalues()
# self._generate_uvalues()
# self._validate_expanded_description()
# self._validate_u_values()
# etc
pass
def _expand_description(self):
def _expand_description_to_uvalues(self):
# TODO: can be loop over all the descriptions, or done in one
pass
# def _process_and_prune(self, cleaned_lookup: dict):
# """
# This method will merge on the cleaned lookup table and ensure that the building fabric in the
# starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
# possible dataset.
# """
# for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
# if component == "main-fuel":
# component = component.replace("-", "_")
# cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description"
# left_on_starting = (
# f"{component}_starting" if component == "main-fuel" else f"{component}_description_starting"
# )
# left_on_ending = (
# f"{component}_ending" if component == "main-fuel" else f"{component}_description_ending"
# )
# self.df2 = self.df.merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_starting,
# right_on="original_description",
# ).merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_ending,
# right_on="original_description",
# suffixes=("", "_ending")
# )
def _clean_records(self):
"""
@ -120,7 +153,7 @@ class EPCRecord:
if validation_config['type'] == "string":
self._validate_string(record_key, field_value, validation_config)
elif validation_config['type'] == "float":
self._validate_float(field_value, validation_config)
self._validate_float(record_key, field_value, validation_config)
else:
raise ValueError(f"Validation type {validation_config['type']} not supported")
@ -262,7 +295,6 @@ class EPCDifferenceRecord:
ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True, key_suffix="_ending")
starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True, key_suffix="_starting")
# TODO: Take the earliest potentials
self.difference_record = {
"uprn": self.record1.get("uprn"),
"rdsap_change": rdsap_change,

View file

@ -39,6 +39,9 @@ CARBON_RESPONSE = CARBON_RESPONSE.lower()
# Normalise the feature-name constants to lower case.
CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
# Columns selected from each property's rows before they are turned into EPC
# records: component, efficiency and potential features plus the lodgement
# date and the three response variables.
# NOTE(review): assumes COMPONENT_FEATURES has already been lower-cased above
# this point, like the other constants - confirm.
VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
def get_cleaned_description_mapping():
"""
@ -236,8 +239,8 @@ def make_uvalues(df):
uvalues = []
# TODO: iterrows is the slowest way to do this, we should use a vectorised approach or itertuples
for _, x in df.iterrows():
for index_no, x in df.iterrows():
uprn = x["UPRN"]
row_index = x["row_index"]
age_band = england_wales_age_band_lookup[x["CONSTRUCTION_AGE_BAND"]]
@ -462,12 +465,67 @@ class EPCPipeline:
pass
def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_model_data: list, all_equal_rows: list):
def generate_property_difference_records(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, all_equal_rows: list):
    """
    Build the difference records for one property from its EPC records.

    Several comparison strategies are possible, for example:
    - First vs second
    - Second vs third
    - First vs third

    The consecutive-records strategy (``compare_consecutive_epcs``) is the one
    currently in use; ``compare_all_permutation_epcs`` takes the same arguments
    and can be swapped in to compare every pair instead.

    :param epc_records: the EPC records of a single property
    :param uprn: identifier of the property
    :param directory: directory the property's data came from
    :param fixed_data: data handed through to the comparison strategy
    :param all_equal_rows: running list handed through to the comparison
        strategy, which returns it alongside the new records
    :return: tuple of (difference records for this property, all_equal_rows)
    """
    # Every property starts from a fresh, empty list of difference records.
    difference_records, equal_rows = compare_consecutive_epcs(
        epc_records=epc_records,
        uprn=uprn,
        directory=directory,
        fixed_data=fixed_data,
        property_difference_records=[],
        all_equal_rows=all_equal_rows,
    )
    return difference_records, equal_rows
def compare_all_permutation_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list):
    """
    Compare every pair of EPCs for a given UPRN.

    Each record is paired with every record that follows it in ``epc_records``
    and the pair's difference record is computed as ``later - earlier``. Pairs
    with no RDSAP change are skipped. Pairs whose core component fields are all
    equal are noted in ``all_equal_rows`` and skipped. Every remaining
    difference record has the fixed data appended and is collected.

    Both list arguments are appended to in place and also returned.

    :param epc_records: the EPC records of a single property
    :param uprn: identifier of the property, stored with all-equal entries
    :param directory: directory the data came from; its name is stored with
        all-equal entries
    :param fixed_data: data appended to every retained difference record
    :param property_difference_records: list the retained records are added to
    :param all_equal_rows: list the all-equal entries are added to
    :return: tuple of (property_difference_records, all_equal_rows)
    """
    for position, earliest_record in enumerate(epc_records):
        # Pair this record with each later one, preserving input order.
        for latest_record in epc_records[position + 1:]:
            # NOTE(review): the subtraction is said to order the two records so
            # that the one with the highest RDSAP score is record1 - confirm
            # against EPCRecord.__sub__.
            difference_record = latest_record - earliest_record

            # TODO: Pull out RDSAP_CHANGE to a variable
            if difference_record.get("rdsap_change") == 0:
                continue

            core_fields = [feature.lower() for feature in CORE_COMPONENT_FEATURES]
            if difference_record.compare_fields_in_records(fields=core_fields):
                # Keep track of this for the moment so we can analyse
                all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
            else:
                difference_record.append_fixed_data(fixed_data)
                property_difference_records.append(difference_record)

    return property_difference_records, all_equal_rows
def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
@ -494,9 +552,12 @@ def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory:
difference_record.append_fixed_data(fixed_data)
property_model_data.append(difference_record)
property_difference_records.append(difference_record)
return property_model_data, all_equal_rows
return property_difference_records, all_equal_rows
from etl.epc.Dataset import TrainingDataset
def app():
# Get all the files in the directory
@ -525,7 +586,7 @@ def app():
cleaning_dataset.append(epc_data_processor.cleaning_averages)
data_by_uprn = []
constituency_difference_records = []
for uprn, property_data in df.groupby("uprn", observed=True):
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
@ -540,183 +601,25 @@ def app():
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[
COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
property_model_data = []
variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')]
# TODO: Make this part of a strategy pattern, as we can generate different training datasets
property_model_data, all_equal_rows = compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_model_data, all_equal_rows)
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records, all_equal_rows = generate_property_difference_records(epc_records, uprn, directory, fixed_data, all_equal_rows)
constituency_difference_records.extend(property_difference_records)
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=cleaned_lookup)
# for idx in range(0, len(epc_records) - 1):
# if idx >= len(epc_records) - 1:
# break
# earliest_record: EPCRecord = epc_records[idx]
# latest_record: EPCRecord = epc_records[idx + 1]
# # Auto sort the records so that the record with highest RDSAP score is always record1
# difference_record: EPCDifferenceRecord = latest_record - earliest_record
# # TODO: Pull out RDSAP_CHANGE to a variable
# if difference_record.get("RDSAP_CHANGE") == 0:
# continue
# all_equal = difference_record.compare_fields_in_records(
# fields=CORE_COMPONENT_FEATURES
# )
# if all_equal:
# # Keep track of this for the moment so we can analyse
# all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
# continue
# difference_record.append_fixed_data(fixed_data)
# property_model_data.append(difference_record)
# property_model_data.append(difference_record.difference_record)
# for idx in range(0, property_data.shape[0] - 1):
# if idx >= property_data.shape[0] - 1:
# break
# earliest_record = variable_data.iloc[idx]
# latest_record = variable_data.iloc[idx + 1]
# # Check if the sap gets better or worse
# gets_better = earliest_record[RDSAP_RESPONSE] <= latest_record[RDSAP_RESPONSE]
# component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES
# if gets_better:
# starting_sap = earliest_record[RDSAP_RESPONSE]
# starting_heat_demand = earliest_record[HEAT_DEMAND_RESPONSE]
# starting_carbon = earliest_record[CARBON_RESPONSE]
# ending_sap = latest_record[RDSAP_RESPONSE]
# ending_heat_demand = latest_record[HEAT_DEMAND_RESPONSE]
# ending_carbon = latest_record[CARBON_RESPONSE]
# rdsap_change = latest_record[RDSAP_RESPONSE] - starting_sap
# heat_demand_change = latest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
# carbon_change = latest_record[CARBON_RESPONSE] - starting_carbon
# starting_record = earliest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
# ending_record = latest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
# else:
# starting_sap = latest_record[RDSAP_RESPONSE]
# starting_heat_demand = latest_record[HEAT_DEMAND_RESPONSE]
# starting_carbon = latest_record[CARBON_RESPONSE]
# ending_sap = earliest_record[RDSAP_RESPONSE]
# ending_heat_demand = earliest_record[HEAT_DEMAND_RESPONSE]
# ending_carbon = earliest_record[CARBON_RESPONSE]
# rdsap_change = earliest_record[RDSAP_RESPONSE] - starting_sap
# heat_demand_change = earliest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
# carbon_change = earliest_record[CARBON_RESPONSE] - starting_carbon
# starting_record = latest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
# ending_record = earliest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
# if rdsap_change == 0:
# continue
# all_equal = compare_records(
# earliest_record=earliest_record,
# latest_record=latest_record,
# columns=CORE_COMPONENT_FEATURES
# )
# if all_equal:
# # Keep track of this for the moment so we can analyse
# all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
# continue
# asdasd
# features = pd.concat([starting_record, ending_record])
# property_model_data.append(
# {
# "UPRN": uprn,
# "RDSAP_CHANGE": rdsap_change,
# "HEAT_DEMAND_CHANGE": heat_demand_change,
# "CARBON_CHANGE": carbon_change,
# "SAP_STARTING": starting_sap,
# "SAP_ENDING": ending_sap,
# "HEAT_DEMAND_STARTING": starting_heat_demand,
# "HEAT_DEMAND_ENDING": ending_heat_demand,
# "CARBON_STARTING": starting_carbon,
# "CARBON_ENDING": ending_carbon,
# "POTENTIAL_ENERGY_EFFICIENCY": earliest_record["POTENTIAL_ENERGY_EFFICIENCY"],
# "ENVIRONMENT_IMPACT_POTENTIAL": earliest_record["ENVIRONMENT_IMPACT_POTENTIAL"],
# "ENERGY_CONSUMPTION_POTENTIAL": earliest_record["ENERGY_CONSUMPTION_POTENTIAL"],
# "CO2_EMISSIONS_POTENTIAL": earliest_record["CO2_EMISSIONS_POTENTIAL"],
# **fixed_data,
# **features.to_dict(),
# }
# )
# data_by_urpn.extend(property_model_data)
data_by_uprn.extend(property_model_data)
from etl.epc.Dataset import TrainingDataset
constituency_data = TrainingDataset(datasets=data_by_uprn)
# data_by_urpn_df = pd.DataFrame(data_by_urpn)
# # TODO: can we move this into the epc record?
# data_by_urpn_df["DAYS_TO_STARTING"] = DataProcessor.calculate_days_to(
# data_by_urpn_df["LODGEMENT_DATE_STARTING"]
# )
# data_by_urpn_df["DAYS_TO_ENDING"] = DataProcessor.calculate_days_to(
# data_by_urpn_df["LODGEMENT_DATE_ENDING"]
# )
# data_by_urpn_df = data_by_urpn_df.drop(columns=["LODGEMENT_DATE_STARTING", "LODGEMENT_DATE_ENDING"])
# data_by_urpn_df = DataProcessor.clean_efficiency_variables(data_by_urpn_df)
# We look for key building fabric features that have changed from one EPC to the next.
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
# if pd.isnull(data_by_urpn_df).sum().sum():
# raise ValueError("Null values found in dataset")
data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup)
constituency_dataset_df = constituency_dataset.df
# Apply u-values
# for col in ["walls_clean_description", "walls_clean_description_ENDING"]:
# data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip()
data_by_urpn_df = make_uvalues(data_by_urpn_df).drop(
data_by_urpn_df = make_uvalues(df = constituency_dataset_df).drop(
columns=["walls_clean_description", "walls_clean_description_ENDING"]
)
# TODO: For some of the features that we clean, we have either a true, false or possibly null value
# Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
# need to
# data_by_urpn_df = DataProcessor.clean_missings_after_description_process(data_by_urpn_df)
# if pd.isnull(data_by_urpn_df).sum().sum():
# raise ValueError("Null values found in dataset after process_and_prune_desriptions")
dataset.append(data_by_urpn_df)
print("Final all equal count: %s" % str(len(all_equal_rows)))