add all permutations, and fixed typo

This commit is contained in:
Michael Duong 2024-01-29 10:10:33 +00:00
parent df33ff93c3
commit 1fd976f350
4 changed files with 817 additions and 332 deletions

View file

@ -7,8 +7,12 @@ from etl.epc.settings import EARLIEST_EPC_DATE
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
estimate_number_of_floors, get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
estimate_number_of_floors,
get_wall_u_value,
get_roof_u_value,
get_floor_u_value,
estimate_perimeter,
get_wall_type,
)
@ -41,8 +45,9 @@ class TrainingDataset(BaseDataset):
A collection of EPCDifferenceRecords can be combined into a TrainingDataset.
"""
def __init__(self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict) -> None:
def __init__(
self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict
) -> None:
# self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
@ -61,14 +66,51 @@ class TrainingDataset(BaseDataset):
self._null_validation(information="Clean Missing Values")
self._remove_abnormal_change_in_floor_area()
self._ensure_numeric()
self._organise_starting_ending_columns()
def _organise_starting_ending_columns(self):
"""
Organise the starting and ending columns so that they are next to each other
"""
no_suffix_cols = [
col
for col in self.df.columns
if "_ending" not in col and "_starting" not in col
]
starting_cols = [col for col in self.df.columns if "_starting" in col]
ending_cols = [col for col in self.df.columns if "_ending" in col]
common_cols = [
col.rsplit("_", 1)[0]
for col in starting_cols
if col.replace("_starting", "_ending") in ending_cols
]
only_ending_cols = [
col
for col in ending_cols
if col.replace("_ending", "_starting") not in starting_cols
]
common_cols = [[col + "_starting", col + "_ending"] for col in common_cols]
self.df = self.df.loc[
:,
no_suffix_cols
+ only_ending_cols
+ [col for cols in common_cols for col in cols],
]
def _remove_abnormal_change_in_floor_area(self):
"""
Remove properties where the change in floor area is greater than 100%
"""
self.df["tfa_diff_abs"] = abs(self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"])
self.df["tfa_diff_prop"] = self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"]
self.df["tfa_diff_abs"] = abs(
self.df["total_floor_area_ending"] - self.df["total_floor_area_starting"]
)
self.df["tfa_diff_prop"] = (
self.df["tfa_diff_abs"] / self.df["total_floor_area_starting"]
)
self.df = self.df[self.df["tfa_diff_prop"] < 0.5]
self.df = self.df.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
@ -77,7 +119,9 @@ class TrainingDataset(BaseDataset):
Ensure that all columns are numeric
"""
# TODO: move into EPCRecord record
uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col]
uvalue_columns = [
col for col in self.df.columns if "thermal_transmittance" in col
]
for uvalue_col in uvalue_columns:
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
@ -87,7 +131,11 @@ class TrainingDataset(BaseDataset):
Using the apply method, use the get_roof_u_value method to generate the u-value
"""
col_name = "roof_insulation_thickness" if not is_end else "roof_insulation_thickness_ending"
col_name = (
"roof_insulation_thickness"
if not is_end
else "roof_insulation_thickness_ending"
)
if row["has_dwelling_above"]:
if row["roof_thermal_transmittance"] != 0:
@ -105,7 +153,7 @@ class TrainingDataset(BaseDataset):
is_flat=row["is_flat"],
is_pitched=row["is_pitched"],
is_at_rafters=row["is_at_rafters"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
)
@staticmethod
@ -113,9 +161,16 @@ class TrainingDataset(BaseDataset):
"""
Using the apply method, use the get_wall_u_value method to generate the u-value
"""
description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending"
thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else \
"walls_thermal_transmittance_ending"
description_col_name = (
"walls_clean_description"
if not is_end
else "walls_clean_description_ending"
)
thermal_transistance_col_name = (
"walls_thermal_transmittance"
if not is_end
else "walls_thermal_transmittance_ending"
)
if pd.isnull(row[thermal_transistance_col_name]):
output = get_wall_u_value(
@ -135,7 +190,11 @@ class TrainingDataset(BaseDataset):
Using the apply method, use the get_floor_u_value method to generate the u-value
"""
floor_thermal_col_name = "floor_thermal_transmittance" if not is_end else "floor_thermal_transmittance_ending"
floor_thermal_col_name = (
"floor_thermal_transmittance"
if not is_end
else "floor_thermal_transmittance_ending"
)
if row["another_property_below"]:
if row["floor_thermal_transmittance"] != 0:
@ -148,9 +207,21 @@ class TrainingDataset(BaseDataset):
uvalue = row[floor_thermal_col_name]
if pd.isnull(uvalue):
insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending"
perimeter_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending"
floor_area_col_name = "ground_floor_area_starting" if not is_end else "ground_floor_area_ending"
insulation_col_name = (
"floor_insulation_thickness"
if not is_end
else "floor_insulation_thickness_ending"
)
perimeter_col_name = (
"estimated_perimeter_starting"
if not is_end
else "estimated_perimeter_ending"
)
floor_area_col_name = (
"ground_floor_area_starting"
if not is_end
else "ground_floor_area_ending"
)
uvalue = get_floor_u_value(
floor_type=row["floor_type"],
@ -158,7 +229,7 @@ class TrainingDataset(BaseDataset):
area=row[floor_area_col_name],
insulation_thickness=row[insulation_col_name],
wall_type=row["wall_type"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
)
return uvalue
@ -173,65 +244,75 @@ class TrainingDataset(BaseDataset):
# ~~~~~~~~~~~~~~~~~~
walls_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_wall_uvalue(row),
axis=1
lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1
)
walls_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True),
axis=1
axis=1,
)
walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue)
walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df[
"walls_clean_description_ending"]
walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna(
walls_starting_uvalue
)
walls_starting_equals_ending_flag = (
self.df["walls_clean_description"]
== self.df["walls_clean_description_ending"]
)
walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[
walls_starting_equals_ending_flag]
walls_starting_equals_ending_flag
]
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
roof_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row),
axis=1
lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1
)
roof_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True),
axis=1
axis=1,
)
roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue)
roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue)
roof_starting_uvalue = self.df["roof_thermal_transmittance"].fillna(
roof_starting_uvalue
)
roof_ending_uvalue = self.df["roof_thermal_transmittance_ending"].fillna(
roof_ending_uvalue
)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
self.df['estimated_number_of_floors'] = self.df.apply(
lambda row: estimate_number_of_floors(row['property_type']),
axis=1
self.df["estimated_number_of_floors"] = self.df.apply(
lambda row: estimate_number_of_floors(row["property_type"]), axis=1
)
self.df["ground_floor_area_starting"] = (
self.df["total_floor_area_starting"] / self.df['estimated_number_of_floors']
self.df["total_floor_area_starting"] / self.df["estimated_number_of_floors"]
)
self.df["ground_floor_area_ending"] = (
self.df["total_floor_area_ending"] / self.df['estimated_number_of_floors']
self.df["total_floor_area_ending"] / self.df["estimated_number_of_floors"]
)
self.df['estimated_perimeter_starting'] = self.df.apply(
self.df["estimated_perimeter_starting"] = self.df.apply(
lambda row: estimate_perimeter(
row["ground_floor_area_starting"], row["number_habitable_rooms"] / row['estimated_number_of_floors']
row["ground_floor_area_starting"],
row["number_habitable_rooms"] / row["estimated_number_of_floors"],
),
axis=1
axis=1,
)
self.df['estimated_perimeter_ending'] = self.df.apply(
self.df["estimated_perimeter_ending"] = self.df.apply(
lambda row: estimate_perimeter(
row["ground_floor_area_starting"], row["number_habitable_rooms"] / row['estimated_number_of_floors']
row["ground_floor_area_starting"],
row["number_habitable_rooms"] / row["estimated_number_of_floors"],
),
axis=1
axis=1,
)
self.df["floor_type"] = self.df["is_suspended"].replace(
{True: "suspended", False: "solid"}
)
self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"})
self.df["wall_type"] = self.df.apply(
lambda row: get_wall_type(
is_cavity_wall=row["is_cavity_wall"],
@ -241,39 +322,56 @@ class TrainingDataset(BaseDataset):
is_cob=row["is_cob"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
is_system_built=row["is_system_built"],
is_park_home=row["is_park_home"]
is_park_home=row["is_park_home"],
),
axis=1
axis=1,
)
floor_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row),
axis=1
lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1
)
floor_ending_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True),
axis=1
lambda row: self._lambda_function_to_generate_floor_uvalue(
row, is_end=True
),
axis=1,
)
floor_starting_uvalue = self.df['floor_thermal_transmittance'].fillna(floor_starting_uvalue)
floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue)
floor_starting_uvalue = self.df["floor_thermal_transmittance"].fillna(
floor_starting_uvalue
)
floor_ending_uvalue = self.df["floor_thermal_transmittance_ending"].fillna(
floor_ending_uvalue
)
for component in ["walls", "roof", "floor"]:
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(
eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance"] = self.df[
f"{component}_thermal_transmittance"
].fillna(eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance_ending"] = self.df[
f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue"))
f"{component}_thermal_transmittance_ending"
].fillna(eval(f"{component}_ending_uvalue"))
self.df = self.df.drop(
columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending",
'estimated_number_of_floors', "ground_floor_area_starting", "ground_floor_area_ending"])
columns=[
"floor_type",
"wall_type",
"walls_clean_description",
"walls_clean_description_ending",
"estimated_number_of_floors",
"ground_floor_area_starting",
"ground_floor_area_ending",
]
)
def _adjust_assumed_values_in_wall_descriptions(self):
"""
Strip out assumed values for all wall descriptions
"""
for col in ["walls_clean_description", "walls_clean_description_ending"]:
self.df[col] = self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
self.df[col] = (
self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
)
def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
"""
@ -282,31 +380,55 @@ class TrainingDataset(BaseDataset):
if component == "walls":
expanded_df = expanded_df[
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) &
(expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"]) &
(expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"]) &
(expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"]) &
(expanded_df["is_cob"] == expanded_df["is_cob_ending"]) &
(expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"])
]
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"])
& (
expanded_df["is_solid_brick"]
== expanded_df["is_solid_brick_ending"]
)
& (
expanded_df["is_timber_frame"]
== expanded_df["is_timber_frame_ending"]
)
& (
expanded_df["is_granite_or_whinstone"]
== expanded_df["is_granite_or_whinstone_ending"]
)
& (expanded_df["is_cob"] == expanded_df["is_cob_ending"])
& (
expanded_df["is_sandstone_or_limestone"]
== expanded_df["is_sandstone_or_limestone_ending"]
)
]
elif component == "floor":
expanded_df = expanded_df[
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) &
(expanded_df["is_solid"] == expanded_df["is_solid_ending"]) &
(expanded_df["another_property_below"] == expanded_df["another_property_below_ending"]) &
(expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"]) &
(expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"])
]
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"])
& (expanded_df["is_solid"] == expanded_df["is_solid_ending"])
& (
expanded_df["another_property_below"]
== expanded_df["another_property_below_ending"]
)
& (
expanded_df["is_to_unheated_space"]
== expanded_df["is_to_unheated_space_ending"]
)
& (
expanded_df["is_to_external_air"]
== expanded_df["is_to_external_air_ending"]
)
]
elif component == "roof":
expanded_df = expanded_df[
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) &
(expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"]) &
(expanded_df["is_loft"] == expanded_df["is_loft_ending"]) &
(expanded_df["is_flat"] == expanded_df["is_flat_ending"]) &
(expanded_df["is_thatched"] == expanded_df["is_thatched_ending"]) &
(expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) &
(expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"])
]
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"])
& (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"])
& (expanded_df["is_loft"] == expanded_df["is_loft_ending"])
& (expanded_df["is_flat"] == expanded_df["is_flat_ending"])
& (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"])
& (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"])
& (
expanded_df["has_dwelling_above"]
== expanded_df["has_dwelling_above_ending"]
)
]
return expanded_df
@ -325,60 +447,108 @@ class TrainingDataset(BaseDataset):
cols_to_drop = {
"walls": [
# We need to cleaned descriptions for pulling out u-values
'original_description', 'thermal_transmittance_unit',
'original_description_ending',
'thermal_transmittance_unit_ending',
'is_cavity_wall_ending', 'is_filled_cavity_ending',
'is_solid_brick_ending', 'is_system_built_ending',
'is_timber_frame_ending', 'is_granite_or_whinstone_ending',
'is_as_built_ending', 'is_cob_ending', 'is_assumed_ending',
'is_sandstone_or_limestone_ending',
"original_description",
"thermal_transmittance_unit",
"original_description_ending",
"thermal_transmittance_unit_ending",
"is_cavity_wall_ending",
"is_filled_cavity_ending",
"is_solid_brick_ending",
"is_system_built_ending",
"is_timber_frame_ending",
"is_granite_or_whinstone_ending",
"is_as_built_ending",
"is_cob_ending",
"is_assumed_ending",
"is_sandstone_or_limestone_ending",
# Re remove the is_assumed columns
"is_assumed", "is_assumed_ending"
"is_assumed",
"is_assumed_ending",
],
"floor": [
"original_description", "clean_description", "thermal_transmittance_unit",
"no_data", "no_data_ending", "original_description_ending",
"clean_description_ending", "thermal_transmittance_unit_ending",
"is_suspended_ending", "is_solid_ending", "another_property_below_ending",
"is_to_unheated_space_ending", "is_to_external_air_ending", "is_assumed",
"is_assumed_ending"
"original_description",
"clean_description",
"thermal_transmittance_unit",
"no_data",
"no_data_ending",
"original_description_ending",
"clean_description_ending",
"thermal_transmittance_unit_ending",
"is_suspended_ending",
"is_solid_ending",
"another_property_below_ending",
"is_to_unheated_space_ending",
"is_to_external_air_ending",
"is_assumed",
"is_assumed_ending",
],
"roof": [
"original_description", "clean_description", "thermal_transmittance_unit",
"is_assumed", "is_valid", "original_description_ending", "clean_description_ending",
"thermal_transmittance_unit_ending", "is_pitched_ending", "is_roof_room_ending",
"is_loft_ending", "is_flat_ending", "is_thatched_ending", "is_at_rafters_ending",
"has_dwelling_above_ending", "is_assumed_ending", "is_valid_ending"
"original_description",
"clean_description",
"thermal_transmittance_unit",
"is_assumed",
"is_valid",
"original_description_ending",
"clean_description_ending",
"thermal_transmittance_unit_ending",
"is_pitched_ending",
"is_roof_room_ending",
"is_loft_ending",
"is_flat_ending",
"is_thatched_ending",
"is_at_rafters_ending",
"has_dwelling_above_ending",
"is_assumed_ending",
"is_valid_ending",
],
"hotwater": [
"original_description", "clean_description", "assumed", "original_description_ending",
"clean_description_ending", "assumed_ending"
"original_description",
"clean_description",
"assumed",
"original_description_ending",
"clean_description_ending",
"assumed_ending",
],
"mainheat": [
"original_description", "clean_description", "original_description_ending",
"has_assumed", "original_description_ending", "clean_description_ending",
"original_description",
"clean_description",
"original_description_ending",
"has_assumed",
"original_description_ending",
"clean_description_ending",
"has_assumed_ending",
],
"mainheatcont": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending"
"original_description",
"clean_description",
"original_description_ending",
"clean_description_ending",
],
"windows": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending",
"original_description",
"clean_description",
"original_description_ending",
"clean_description_ending",
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
"has_glazing", "glazing_coverage", "no_data", "has_glazing_ending", "glazing_coverage_ending",
"no_data_ending"
"has_glazing",
"glazing_coverage",
"no_data",
"has_glazing_ending",
"glazing_coverage_ending",
"no_data_ending",
],
"main-fuel": [
"original_description", "clean_description", "original_description_ending", "clean_description_ending"
"original_description",
"clean_description",
"original_description_ending",
"clean_description_ending",
],
}
components_to_expand = cols_to_drop.keys()
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
# TODO: change cleaned dataframe to have underscores instead of dashes
if component == "main-fuel":
cleaned_key = "main-fuel"
left_on_starting = "main_fuel_starting"
@ -388,7 +558,10 @@ class TrainingDataset(BaseDataset):
cleaned_key = f"{component}-description"
left_on_starting = f"{component}_description_starting"
left_on_ending = f"{component}_description_ending"
original_cols = [f"{component}_description_starting", f"{component}_description_ending"]
original_cols = [
f"{component}_description_starting",
f"{component}_description_ending",
]
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
@ -402,14 +575,16 @@ class TrainingDataset(BaseDataset):
how="left",
left_on=left_on_ending,
right_on="original_description",
suffixes=("", "_ending")
suffixes=("", "_ending"),
)
# Drop properties where key material types have changed
expanded_df = self._drop_inconsistent_properties(expanded_df, component)
# Drop original cols and cols to drop
expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols)
expanded_df = expanded_df.drop(
columns=cols_to_drop[component] + original_cols
)
# Rename columns to component specific names, if they have not been dropped
expanded_df = expanded_df.rename(
@ -428,7 +603,9 @@ class TrainingDataset(BaseDataset):
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"])
self.df = self.df.drop(
columns=["lighting_description_starting", "lighting_description_ending"]
)
def _clean_missing_values(self, ignore_cols=None):
missings = pd.isnull(self.df).sum()
@ -455,17 +632,22 @@ class TrainingDataset(BaseDataset):
"""
Drop features that are not needed for modelling
"""
self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])
self.df = self.df.drop(
columns=["lodgement_date_starting", "lodgement_date_ending"]
)
def _feature_generation(self):
"""
Generate features for modelling
"""
self.df["days_to_starting"] = self._calculate_days_to(self.df["lodgement_date_starting"])
self.df["day_to_ending"] = self._calculate_days_to(self.df["lodgement_date_ending"])
self.df["days_to_starting"] = self._calculate_days_to(
self.df["lodgement_date_starting"]
)
self.df["days_to_ending"] = self._calculate_days_to(
self.df["lodgement_date_ending"]
)
def _clean_efficiency_variables(self):
"""
These is scope to clean this by the model per corresponding description.
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
@ -491,7 +673,6 @@ class TrainingDataset(BaseDataset):
@staticmethod
def _calculate_days_to(lodgement_date):
if isinstance(lodgement_date, str):
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
@ -527,7 +708,9 @@ class NewDataset(BaseDataset):
def __add__(self, other) -> "NewDataset":
if not isinstance(other, NewDataset):
raise TypeError("Addition can only be performed with another instance of ScoringDataset")
raise TypeError(
"Addition can only be performed with another instance of ScoringDataset"
)
return NewDataset(self.datasets + other.datasets)
def __radd__(self, other):

View file

@ -31,9 +31,13 @@ CARBON_RESPONSE = CARBON_RESPONSE.lower()
CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
VARIABLE_DATA_FEATURES = (
COMPONENT_FEATURES
+ EFFICIENCY_FEATURES
+ POTENTIAL_COLUMNS
+ ["lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE]
)
def get_cleaned_description_mapping():
"""
@ -45,16 +49,17 @@ def get_cleaned_description_mapping():
"""
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
s3_file_name="cleaned_epc_data/cleaned.bson", bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
clean_lookup = get_cleaned_description_mapping()
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
@ -66,17 +71,17 @@ class EPCPipeline:
"""
def __init__(
self,
epc_data_processor: EPCDataProcessor,
api_epc_records: dict = None,
directories: List[Path] | None = None,
run_mode="training",
epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev",
epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet",
epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet",
epc_compiled_dataset_key="sap_change_model/dataset.parquet"
):
self,
epc_data_processor: EPCDataProcessor,
api_epc_records: dict = None,
directories: List[Path] | None = None,
run_mode="training",
epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev",
epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet",
epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet",
epc_compiled_dataset_key="sap_change_model/dataset.parquet",
):
"""
:param directories: List of directories to process
:param epc_data_processor: EPCDataProcessor object
@ -100,7 +105,7 @@ class EPCPipeline:
self.epc_cleaning_dataset_key = epc_cleaning_dataset_key
self.epc_all_equal_rows_key = epc_all_equal_rows_key
self.epc_compiled_dataset_key = epc_compiled_dataset_key
def run(self):
"""
Entrypoint to run the pipeline
@ -111,29 +116,33 @@ class EPCPipeline:
self.run_newdata_dataset_pipeline()
else:
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def run_newdata_dataset_pipeline(self):
"""
Main function to run the newdata pipeline
"""
prepared_epc = EPCRecord(self.api_epc_records, run_mode="newdata") # This uses all the epc records to clean the data
prepared_epc = EPCRecord(
self.api_epc_records, run_mode="newdata"
) # This uses all the epc records to clean the data
self.epc_data_processor.insert_data(prepared_epc)
self.epc_data_processor.prepare_data()
data = self.epc_data_processor.data
epc_records = [
EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient="records")
]
epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')]
def run_training_dataset_pipeline(self):
"""
Main function to run the training dataset generation pipeline
"""
if self.directories is None:
raise ValueError("Directories not specified - Unable to run Training pipeline")
raise ValueError(
"Directories not specified - Unable to run Training pipeline"
)
for directory in tqdm(self.directories):
self.process_directory(directory)
@ -153,8 +162,8 @@ class EPCPipeline:
df=pd.concat(self.compiled_cleaning_averages),
bucket_name=self.epc_bucket_name,
file_key=self.epc_cleaning_dataset_key,
)
)
def process_directory(self, directory: Path):
"""
Process a single directory
@ -166,17 +175,32 @@ class EPCPipeline:
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages)
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
constituency_difference_records = []
# self.check_records = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(uprn=str(uprn), property_data=property_data, directory=directory)
difference_records = self.process_uprn(
uprn=str(uprn), property_data=property_data, directory=directory
)
if difference_records is not None:
constituency_difference_records.extend(difference_records)
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=clean_lookup)
self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df])
# check_list = []
# for check_record in self.check_records:
# check_list.append(check_record["difference_record"])
# td = TrainingDataset(datasets=check_list, cleaned_lookup=clean_lookup)
constituency_dataset = TrainingDataset(
datasets=constituency_difference_records, cleaned_lookup=clean_lookup
)
self.compiled_dataset = pd.concat(
[self.compiled_dataset, constituency_dataset.df]
)
def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
"""
@ -192,25 +216,34 @@ class EPCPipeline:
):
return None
# Fixed features - these are property attributes that shouldn't change over time
# Fixed features - these are property attributes that shouldn't change over time
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together
fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
fixed_data = (
property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
)
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')]
epc_records = [
EPCRecord(uprn, **x, run_mode="training")
for x in variable_data.to_dict(orient="records")
]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records = self._generate_property_difference_records(epc_records, uprn, directory, fixed_data)
property_difference_records = self._generate_property_difference_records(
epc_records, uprn, directory, fixed_data
)
return property_difference_records
def _generate_property_difference_records(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict):
def _generate_property_difference_records(
self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict
):
"""
We can use multiple types of comparison datasets, for example:
- First vs second
@ -222,14 +255,22 @@ class EPCPipeline:
property_difference_records: list = []
property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
# property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
# property_difference_records = self._compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
property_difference_records = self._compare_all_permutation_epcs(
epc_records, uprn, directory, fixed_data, property_difference_records
)
return property_difference_records
def _compare_all_permutation_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
def _compare_all_permutation_epcs(
self,
epc_records: List[EPCRecord],
uprn: str,
directory: Path,
fixed_data: dict,
property_difference_records: list,
):
"""
Compare all permutations of EPCs for a given UPRN
:param epc_records:
@ -243,28 +284,40 @@ class EPCPipeline:
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Use method above instead of overloading operator
difference_record.append_fixed_data(fixed_data)
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
if not difference_record.ensure_adequate_data():
# Rdsap hasn't changed but we have enough data to use this record
# i.e. all fields aside from mechnical ventilation are the same]
# self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record})
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
self.compiled_all_equal_rows.append(
{"uprn": uprn, "directory_name": directory.name}
)
continue
difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records
def _compare_consecutive_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
def _compare_consecutive_epcs(
self,
epc_records: List[EPCRecord],
uprn: str,
directory: Path,
fixed_data: dict,
property_difference_records: list,
):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
@ -272,7 +325,6 @@ class EPCPipeline:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
break
@ -281,21 +333,29 @@ class EPCPipeline:
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Use method above instead of overloading operator
difference_record.append_fixed_data(fixed_data)
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
if not difference_record.ensure_adequate_data():
# Rdsap hasn't changed but we have enough data to use this record
# i.e. all fields aside from mechnical ventilation are the same]
# self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record})
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
self.compiled_all_equal_rows.append(
{"uprn": uprn, "directory_name": directory.name}
)
continue
difference_record.append_fixed_data(fixed_data)
# difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)

View file

@ -3,7 +3,7 @@ from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration
EPCDifferenceRecordFixedDataValidationConfiguration,
)
from etl.epc.DataProcessor import EPCDataProcessor
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
@ -18,20 +18,23 @@ from etl.epc.settings import (
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
COMPONENT_FEATURES,
EFFICIENCY_FEATURES
EFFICIENCY_FEATURES,
)
from recommendations.recommendation_utils import estimate_number_of_floors
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import EARLIEST_EPC_DATE
# TODO: Change these in the settings file
# TODO: Change these in the settings file
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
DATA_BUCKET = os.environ.get(
"DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None
)
@dataclass
@ -106,7 +109,7 @@ class EPCRecord:
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
# Could also have cleaning of records if needed
# Could also have cleaning of records if needed
if self.run_mode == "training":
self.validation_configuration = EPCRecordValidationConfiguration
@ -117,18 +120,17 @@ class EPCRecord:
if self.epc_records is None:
raise ValueError("Must provide epc records if running in newdata mode")
self.prepared_epc = self.epc_records['original_epc']
self.original_epc = self.epc_records['original_epc'].copy()
self.prepared_epc = self.epc_records["original_epc"]
self.original_epc = self.epc_records["original_epc"].copy()
self.full_sap_epc = self.epc_records['full_sap_epc']
self.old_data = self.epc_records['old_data']
self.full_sap_epc = self.epc_records["full_sap_epc"]
self.old_data = self.epc_records["old_data"]
if self.cleaning_data is None:
raise ValueError("Must provide cleaning data if running in newdata mode")
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._temp_uprn_catch()
self._expand_prepared_epc_to_attributes()
@ -139,7 +141,7 @@ class EPCRecord:
# selff.df = self.epc_record_as_dataframe('prepared_epc')
# self._feature_generation()
# self._drop_features()
# self._drop_features()
return
@ -156,17 +158,20 @@ class EPCRecord:
"""
Drop features that are not needed for modelling
"""
self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])
self.df = self.df.drop(
columns=["lodgement_date_starting", "lodgement_date_ending"]
)
def _feature_generation(self):
"""
Generate features for modelling
"""
self.df["days_to_lodgement_date"] = self._calculate_days_to(self.prepared_epc["lodgement_date"])
self.df["days_to_lodgement_date"] = self._calculate_days_to(
self.prepared_epc["lodgement_date"]
)
@staticmethod
def _calculate_days_to(lodgement_date):
if isinstance(lodgement_date, str):
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
@ -176,13 +181,6 @@ class EPCRecord:
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
def _temp_uprn_catch(self):
"""
Catch the case we do now have uprn
"""
if self.prepared_epc["uprn"] == "":
self.prepared_epc["uprn"] = 0
def _clean_with_data_processor(self):
"""
This method will clean the records using the data processor
@ -190,7 +188,7 @@ class EPCRecord:
epc_data_processor = EPCDataProcessor(
data=self.epc_record_as_dataframe("prepared_epc"),
run_mode="newdata",
cleaning_averages=self.cleaning_data
cleaning_averages=self.cleaning_data,
)
epc_data_processor.prepare_data()
@ -216,11 +214,21 @@ class EPCRecord:
self.secondheat_description: str = self.prepared_epc["secondheat_description"]
self.windows_description: str = self.prepared_epc["windows_description"]
self.glazed_type: str = self.prepared_epc["glazed_type"]
self.multi_glaze_proportion: float = float(self.prepared_epc["multi_glaze_proportion"])
self.low_energy_lighting: float = float(self.prepared_epc["low_energy_lighting"])
self.number_open_fireplaces: float = float(self.prepared_epc["number_open_fireplaces"])
self.mainheatcont_description: str = self.prepared_epc["mainheatcont_description"]
self.solar_water_heating_flag: str = self.prepared_epc["solar_water_heating_flag"]
self.multi_glaze_proportion: float = float(
self.prepared_epc["multi_glaze_proportion"]
)
self.low_energy_lighting: float = float(
self.prepared_epc["low_energy_lighting"]
)
self.number_open_fireplaces: float = float(
self.prepared_epc["number_open_fireplaces"]
)
self.mainheatcont_description: str = self.prepared_epc[
"mainheatcont_description"
]
self.solar_water_heating_flag: str = self.prepared_epc[
"solar_water_heating_flag"
]
self.photo_supply: float = float(self.prepared_epc["photo_supply"])
self.transaction_type: str = self.prepared_epc["transaction_type"]
self.energy_tariff: str = self.prepared_epc["energy_tariff"]
@ -236,14 +244,28 @@ class EPCRecord:
self.mainheat_energy_eff: str = self.prepared_epc["mainheat_energy_eff"]
self.mainheatc_energy_eff: str = self.prepared_epc["mainheatc_energy_eff"]
self.lighting_energy_eff: str = self.prepared_epc["lighting_energy_eff"]
self.potential_energy_efficiency: float = float(self.prepared_epc["potential_energy_efficiency"])
self.environment_impact_potential: float = float(self.prepared_epc["environment_impact_potential"])
self.energy_consumption_potential: float = float(self.prepared_epc["energy_consumption_potential"])
self.co2_emissions_potential: float = float(self.prepared_epc["co2_emissions_potential"])
self.potential_energy_efficiency: float = float(
self.prepared_epc["potential_energy_efficiency"]
)
self.environment_impact_potential: float = float(
self.prepared_epc["environment_impact_potential"]
)
self.energy_consumption_potential: float = float(
self.prepared_epc["energy_consumption_potential"]
)
self.co2_emissions_potential: float = float(
self.prepared_epc["co2_emissions_potential"]
)
self.lodgement_date: str = self.prepared_epc["lodgement_date"]
self.current_energy_efficiency: int = int(self.prepared_epc["current_energy_efficiency"])
self.energy_consumption_current: int = int(self.prepared_epc["energy_consumption_current"])
self.co2_emissions_current: float = float(self.prepared_epc["co2_emissions_current"])
self.current_energy_efficiency: int = int(
self.prepared_epc["current_energy_efficiency"]
)
self.energy_consumption_current: int = int(
self.prepared_epc["energy_consumption_current"]
)
self.co2_emissions_current: float = float(
self.prepared_epc["co2_emissions_current"]
)
def _identify_delta_between_prepared_and_original_records(self):
"""
@ -252,7 +274,11 @@ class EPCRecord:
prepared_epc_df = self.epc_record_as_dataframe("prepared_epc")
original_epc_df = self.epc_record_as_dataframe("original_epc")
df = pd.concat([prepared_epc_df, original_epc_df], keys=["prepared_epc", "original_epc"], axis=0)
df = pd.concat(
[prepared_epc_df, original_epc_df],
keys=["prepared_epc", "original_epc"],
axis=0,
)
same_index = df.apply(pd.Series.duplicated).any()
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
@ -269,7 +295,7 @@ class EPCRecord:
# This method will merge on the cleaned lookup table and ensure that the building fabric in the
# starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
# possible dataset.
# """
# """
# for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
# if component == "main-fuel":
# component = component.replace("-", "_")
@ -325,8 +351,12 @@ class EPCRecord:
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True,
replace_empty_string: bool = False):
def epc_record_as_dataframe(
self,
epc_type: str = "prepared_epc",
use_upper_columns: bool = True,
replace_empty_string: bool = False,
):
"""
This method will return the dataframe representation of the epc record
"""
@ -348,8 +378,9 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] if
self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES else None
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]]
if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES
else None
)
def _clean_number_lighting_outlets(self):
@ -360,35 +391,50 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
# We check old EPCs and the full SAP EPC
lighting_data = []
if len(self.old_data):
lighting_data.extend([
int(old_record["fixed-lighting-outlets-count"]) for old_record in self.old_data if
old_record["fixed-lighting-outlets-count"] != ""
])
lighting_data.extend(
[
int(old_record["fixed-lighting-outlets-count"])
for old_record in self.old_data
if old_record["fixed-lighting-outlets-count"] != ""
]
)
if len(self.full_sap_epc):
if self.full_sap_epc["fixed-lighting-outlets-count"] != "":
lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"]))
lighting_data.append(
int(self.full_sap_epc["fixed-lighting-outlets-count"])
)
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data))
self.prepared_epc["fixed-lighting-outlets-count"] = round(
np.median(lighting_data)
)
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age
# band and local authority
# Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
data_to_clean=self.epc_record_as_dataframe(
"prepared_epc", replace_empty_string=True
),
cleaning_data=self.cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
cols_to_merge_on=[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"LOCAL_AUTHORITY",
],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
)
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"])
self.prepared_epc["fixed-lighting-outlets-count"] = float(
self.prepared_epc["fixed-lighting-outlets-count"]
)
def _filter_property_dimensions(self, property_dimensions):
"""
@ -397,16 +443,27 @@ class EPCRecord:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])]
result = property_dimensions[
(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])
]
if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES:
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
if (
self.construction_age_band is not None
and self.construction_age_band not in DATA_ANOMALY_MATCHES
):
result = result[
(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)
]
if self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES and self.prepared_epc["built-form"] in result[
"BUILT_FORM"]:
if (
self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES
and self.prepared_epc["built-form"] in result["BUILT_FORM"]
):
result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])]
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
return result[
["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
].mean()
def _clean_property_dimensions(self):
"""
@ -417,30 +474,46 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
if not self.prepared_epc["number-habitable-rooms"] or (
self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
self.prepared_epc["floor-height"] == ""
or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet"
bucket_name=DATA_BUCKET,
file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet",
)
self.property_dimensions = self._filter_property_dimensions(
property_dimensions
)
self.property_dimensions = self._filter_property_dimensions(property_dimensions)
if not self.prepared_epc["number-habitable-rooms"]:
self.prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()
)
else:
self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"])
self.prepared_epc["number-habitable-rooms"] = float(
self.prepared_epc["number-habitable-rooms"]
)
if self.prepared_epc["property-type"] == "House":
self.number_of_floors = 2
elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]:
self.number_of_floors = 1
elif self.prepared_epc["property-type"] == "Maisonette":
self.number_of_floors = 2
else:
raise NotImplementedError("Implement me")
self.number_of_floors = estimate_number_of_floors(
self.prepared_epc["property-type"]
)
if self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
# if self.prepared_epc["property-type"] == "House":
# self.number_of_floors = 2
# elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]:
# self.number_of_floors = 1
# elif self.prepared_epc["property-type"] == "Maisonette":
# self.number_of_floors = 2
# else:
# raise NotImplementedError("Implement me")
if (
self.prepared_epc["floor-height"] == ""
or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
self.prepared_epc["floor-height"] = float(
self.property_dimensions["FLOOR_HEIGHT"].round(2)
)
else:
self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"])
@ -451,7 +524,9 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"])
self.prepared_epc["total-floor-area"] = float(
self.prepared_epc["total-floor-area"]
)
def _clean_mains_gas(self):
"""
@ -465,9 +540,14 @@ class EPCRecord:
"N": False,
}
self.prepared_epc["mains-gas-flag"] = None if (
self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
) else map[self.prepared_epc["mains-gas-flag"]]
self.prepared_epc["mains-gas-flag"] = (
None
if (
self.prepared_epc["mains-gas-flag"] == ""
or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
)
else map[self.prepared_epc["mains-gas-flag"]]
)
def _clean_heat_loss_corridor(self):
"""
@ -479,16 +559,19 @@ class EPCRecord:
map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False
"heated corridor": False,
}
self.prepared_epc["heat-loss-corridor"] = False if self.prepared_epc[
"heat-loss-corridor"] in DATA_ANOMALY_MATCHES else map[
self.prepared_epc["heat-loss-corridor"]]
self.prepared_epc["heat-loss-corridor"] = (
False
if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES
else map[self.prepared_epc["heat-loss-corridor"]]
)
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"]) if
self.prepared_epc["unheated-corridor-length"] != "" else None
float(self.prepared_epc["unheated-corridor-length"])
if self.prepared_epc["unheated-corridor-length"] != ""
else None
)
def _clean_count_variables(self):
@ -496,7 +579,7 @@ class EPCRecord:
This method will clean the count variables, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Record doesn not contain epc data")
raise ValueError("EPC Recrod doesn not contain epc data")
fields = {
"number_of_open_fireplaces": "number-open-fireplaces",
@ -508,6 +591,8 @@ class EPCRecord:
null_attributes = ["number_of_storeys", "number_of_rooms"]
for attribute, epc_field in fields.items():
# TODO: check this
# value = self.data["extension-count"]
value = self.prepared_epc[epc_field]
if value == "" or value in DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
@ -526,8 +611,11 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc[
'wind-turbine-count'] != "" else None
self.prepared_epc["wind-turbine-count"] = (
int(self.prepared_epc["wind-turbine-count"])
if self.prepared_epc["wind-turbine-count"] != ""
else None
)
def _clean_solar_hot_water(self):
"""
@ -542,7 +630,9 @@ class EPCRecord:
"": None,
}
self.prepared_epc['solar-water-heating-flag'] = value_map[self.prepared_epc['solar-water-heating-flag']]
self.prepared_epc["solar-water-heating-flag"] = value_map[
self.prepared_epc["solar-water-heating-flag"]
]
def _clean_solar_pv(self):
"""
@ -551,9 +641,11 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc[
'photo-supply'] != "" \
self.prepared_epc["photo-supply"] = (
float(self.prepared_epc["photo-supply"])
if self.prepared_epc["photo-supply"] != ""
else None
)
def _clean_energy(self):
"""
@ -562,8 +654,12 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"])
self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"])
self.prepared_epc["energy-consumption-current"] = float(
self.prepared_epc["energy-consumption-current"]
)
self.prepared_epc["co2-emissions-current"] = float(
self.prepared_epc["co2-emissions-current"]
)
def _clean_built_form(self):
"""
@ -572,8 +668,9 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(self.prepared_epc["built-form"],
self.prepared_epc["built-form"])
self.prepared_epc["built-form"] = BUILT_FORM_REMAP.get(
self.prepared_epc["built-form"], self.prepared_epc["built-form"]
)
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] == "Flat":
self.prepared_epc["built-form"] = "Semi-Detached"
@ -586,26 +683,38 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"])
self.prepared_epc["construction-age-band"]
)
if self.construction_age_band in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
max_datetime = max(
[old_record["lodgement-datetime"] for old_record in self.old_data if
old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES]
[
old_record["lodgement-datetime"]
for old_record in self.old_data
if old_record["construction-age-band"]
not in DATA_ANOMALY_MATCHES
]
)
most_recent = [old_record for old_record in self.old_data if
old_record["lodgement-datetime"] == max_datetime]
most_recent = [
old_record
for old_record in self.old_data
if old_record["lodgement-datetime"] == max_datetime
]
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
self.construction_age_band = (
EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
)
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None):
if (self.prepared_epc["transaction-type"] == "new dwelling") and (
self.age_band is None
):
self.age_band = "L"
self.construction_age_band = 'England and Wales: 2012 onwards'
self.construction_age_band = "England and Wales: 2012 onwards"
if self.age_band is None:
raise ValueError("age_band is missing")
@ -615,7 +724,9 @@ class EPCRecord:
This method will clean the year built, if empty or invalid
"""
if self.full_sap_epc:
self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
self.year_built = datetime.strptime(
self.full_sap_epc["lodgement-date"], "%Y-%m-%d"
).year
return
@ -623,7 +734,12 @@ class EPCRecord:
# Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
# more options for recommendations if that age falls before the year that insulation in walls became
# common practice
band = [int(x) for x in re.findall(r'\b\d{4}\b', self.prepared_epc["construction-age-band"])]
band = [
int(x)
for x in re.findall(
r"\b\d{4}\b", self.prepared_epc["construction-age-band"]
)
]
self.year_built = band[0]
return
@ -634,9 +750,14 @@ class EPCRecord:
"""
This method will clean the ventilation, if empty or invalid
"""
self.prepared_epc['mechanical-ventilation'] = None if (
self.mechanical_ventilation == "" or self.mechanical_ventilation in DATA_ANOMALY_MATCHES) else (
self.mechanical_ventilation)
self.prepared_epc["mechanical-ventilation"] = (
None
if (
self.mechanical_ventilation == ""
or self.mechanical_ventilation in DATA_ANOMALY_MATCHES
)
else self.mechanical_ventilation
)
def _field_validation(self):
"""
@ -647,54 +768,67 @@ class EPCRecord:
# Get the variable named record key from self
field_value = self.__dict__[record_key]
if validation_config['type'] == "string":
if validation_config["type"] == "string":
self._validate_string(record_key, field_value, validation_config)
elif validation_config['type'] == "float":
elif validation_config["type"] == "float":
self._validate_float(record_key, field_value, validation_config)
else:
raise ValueError(f"Validation type {validation_config['type']} not supported")
raise ValueError(
f"Validation type {validation_config['type']} not supported"
)
def _validate_string(self, record_key: str, field_value: Union[str, float], validation_config: dict):
def _validate_string(
self, record_key: str, field_value: Union[str, float], validation_config: dict
):
"""
Validate a string field
"""
if not isinstance(field_value, str):
raise ValueError(f"Field {record_key} has value {field_value} which is not a string")
raise ValueError(
f"Field {record_key} has value {field_value} which is not a string"
)
if 'function' in validation_config:
if "function" in validation_config:
try:
validation_config['function'](field_value)
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}"
)
if validation_config['acceptable_values'] is not None:
if field_value not in validation_config['acceptable_values']:
if validation_config["acceptable_values"] is not None:
if field_value not in validation_config["acceptable_values"]:
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable values of "
f"{validation_config['acceptable_values']}")
f"Field {record_key} has value {field_value} which is not in the acceptable values of {validation_config['acceptable_values']}"
)
def _validate_float(self, record_key: str, field_value: Union[str, float], validation_config: dict):
def _validate_float(
self, record_key: str, field_value: Union[str, float], validation_config: dict
):
"""
Validate a float field
"""
if not isinstance(field_value, float):
raise ValueError(f"Field {record_key} has value {field_value} which is not a float")
raise ValueError(
f"Field {record_key} has value {field_value} which is not a float"
)
if 'function' in validation_config:
if "function" in validation_config:
try:
validation_config['function'](field_value)
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}"
)
if validation_config['range'] is not None:
if field_value < validation_config['range'][0] or field_value > validation_config['range'][1]:
if validation_config["range"] is not None:
if (
field_value < validation_config["range"][0]
or field_value > validation_config["range"][1]
):
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable range of "
f"{validation_config['range']}")
f"Field {record_key} has value {field_value} which is not in the acceptable range of {validation_config['range']}"
)
def __sub__(self, other):
"""
@ -703,7 +837,9 @@ class EPCRecord:
if not isinstance(other, EPCRecord):
raise ValueError("Can only subtract EPCRecord from EPCRecord")
difference_record = EPCDifferenceRecord(record1=self, record2=other, auto_sort=True)
difference_record = EPCDifferenceRecord(
record1=self, record2=other, auto_sort=True
)
return difference_record
@ -743,18 +879,27 @@ class EPCRecord:
return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE]
def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any:
def get(
self,
key: Union[str, List[str]],
return_asdict: bool = False,
key_suffix: str | None = None,
) -> Any:
"""
This method will return the value of the key
"""
if return_asdict:
output_dict = {x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key}
output_dict = {
x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key
}
if key_suffix is not None:
output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()}
return output_dict
if isinstance(key, list):
return [self.__dict__[x] if x in self.__dict__.keys() else None for x in key]
return [
self.__dict__[x] if x in self.__dict__.keys() else None for x in key
]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
@ -771,12 +916,18 @@ class EPCDifferenceRecord:
"""
self.record1 = record1
self.record2 = record2
self.earliest_record = record1 if record1.lodgement_date < record2.lodgement_date else record2
self.earliest_record = (
record1 if record1.lodgement_date < record2.lodgement_date else record2
)
self.flag_fabric_consistency = False
self.difference_record = {}
self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration
self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration
self.difference_validation_configuration = (
EPCDifferenceRecordValidationConfiguration
)
self.fixed_data_validation_configuration = (
EPCDifferenceRecordFixedDataValidationConfiguration
)
if auto_sort and (self.record2 <= self.record1):
self.record1, self.record2 = self.record2, self.record1
@ -790,15 +941,27 @@ class EPCDifferenceRecord:
This method will construct the difference record between the two records
"""
rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get(RDSAP_RESPONSE)
heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get(HEAT_DEMAND_RESPONSE)
carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(CARBON_RESPONSE)
rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get(
RDSAP_RESPONSE
)
heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get(
HEAT_DEMAND_RESPONSE
)
carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(
CARBON_RESPONSE
)
component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES
ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_ending")
starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_starting")
ending_record = self.record2.get(
component_variables + ["lodgement_date"],
return_asdict=True,
key_suffix="_ending",
)
starting_record = self.record1.get(
component_variables + ["lodgement_date"],
return_asdict=True,
key_suffix="_starting",
)
self.difference_record = {
"uprn": self.record1.get("uprn"),
@ -811,12 +974,20 @@ class EPCDifferenceRecord:
"heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE),
"carbon_starting": self.record1.get(CARBON_RESPONSE),
"carbon_ending": self.record2.get(CARBON_RESPONSE),
"potential_energy_efficiency": self.earliest_record.get("potential_energy_efficiency"),
"environment_impact_potential": self.earliest_record.get("environment_impact_potential"),
"energy_consumption_potential": self.earliest_record.get("energy_consumption_potential"),
"co2_emissions_potential": self.earliest_record.get("co2_emissions_potential"),
"potential_energy_efficiency": self.earliest_record.get(
"potential_energy_efficiency"
),
"environment_impact_potential": self.earliest_record.get(
"environment_impact_potential"
),
"energy_consumption_potential": self.earliest_record.get(
"energy_consumption_potential"
),
"co2_emissions_potential": self.earliest_record.get(
"co2_emissions_potential"
),
**ending_record,
**starting_record
**starting_record,
}
def _validate_difference_record(self):
@ -849,7 +1020,11 @@ class EPCDifferenceRecord:
"""
This method will return the value of the key
"""
return self.difference_record[key] if key in self.difference_record.keys() else None
return (
self.difference_record[key]
if key in self.difference_record.keys()
else None
)
def append_fixed_data(self, fixed_data: dict):
"""
@ -863,7 +1038,70 @@ class EPCDifferenceRecord:
This method will validate the fixed data
"""
# Can have more sophisticated checks here
# Can have more sophisticated checks here
# self.fixed_data_validataion_configuration
pass
def ensure_adequate_data(self) -> bool:
"""
This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is zero
Can move into the initiation of the difference record
"""
wall_check = self.record1.walls_description == self.record2.walls_description
floor_check = self.record1.floor_description == self.record2.floor_description
roof_check = self.record1.roof_description == self.record2.roof_description
mainheat_check = (
self.record1.mainheat_description == self.record2.mainheat_description
)
windows_check = (
self.record1.windows_description == self.record2.windows_description
)
solar_water_heating_flag_check = (
self.record1.solar_water_heating_flag
== self.record2.solar_water_heating_flag
)
solar_pv_check = self.record1.photo_supply == self.record2.photo_supply
heating_control_check = (
self.record1.mainheatcont_description
== self.record2.mainheatcont_description
)
extension_count_check = (
self.record1.extension_count == self.record2.extension_count
)
floor_height_check = (
abs(1 - (self.record1.floor_height / self.record2.floor_height)) < 0.05
)
total_floor_area_check = (
abs(1 - (self.record1.total_floor_area / self.record2.total_floor_area))
< 0.05
)
if all(
[
wall_check,
floor_check,
roof_check,
mainheat_check,
windows_check,
solar_water_heating_flag_check,
extension_count_check,
floor_height_check,
total_floor_area_check,
solar_pv_check,
heating_control_check,
]
):
return True
else:
return False

View file

@ -5,23 +5,27 @@ from etl.epc.Pipeline import EPCPipeline
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
def main():
"""
Orchestration function
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
# directories = directories[125:128]
# directories = directories[0:3]
epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training"))
epc_pipeline = EPCPipeline(
directories=directories,
epc_data_processor=EPCDataProcessor(run_mode="training"),
)
epc_pipeline.run()
# For testing
# dataset_df = epc_pipeline.compiled_dataset
# dataset_df.to_parquet("refactor_datasets/dataset.parquet")
# pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet")
# pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet")
# dataset_df.to_parquet("refactor_datasets/dataset_with0perm_all.parquet")
# pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows_with0perm_all.parquet")
# pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages_with0perm_all.parquet")
# from utils.s3 import read_dataframe_from_s3_parquet
# dataset = read_dataframe_from_s3_parquet(