mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge branch 'etl-michael-recommend' of github.com:Hestia-Homes/Model into etl-michael-recommend
This commit is contained in:
commit
856fd5d32c
5 changed files with 446 additions and 22 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -268,4 +268,6 @@ adhoc
|
|||
adhoc/*
|
||||
|
||||
etl-router-venv/
|
||||
refactor_datasets/
|
||||
refactor_datasets/
|
||||
etl-router-*/
|
||||
.vscode/
|
||||
|
|
@ -355,7 +355,12 @@ class Property:
|
|||
output["internal_insulation_ending"] = True
|
||||
|
||||
if recommendation["type"] == "cavity_wall_insulation":
|
||||
output["is_filled_cavity_ending"] = True
|
||||
output['is_filled_cavity_ending'] = True
|
||||
|
||||
# TODO: perhaps detrimental
|
||||
# When making a recommendation for the wall, we will also update the ventilation
|
||||
# if output["mechanical_ventilation_ending"] == 'natural':
|
||||
# output["mechanical_ventilation_ending"] = 'mechanical, extract only'
|
||||
|
||||
else:
|
||||
if output["walls_thermal_transmittance_ending"] is None:
|
||||
|
|
|
|||
|
|
@ -810,27 +810,381 @@ class TrainingDataset(BaseDataset):
|
|||
# return self.__add__(other)
|
||||
|
||||
|
||||
class NewDataset(BaseDataset):
|
||||
class RecordDataset(BaseDataset):
|
||||
"""
|
||||
A collection of EPCDifferenceRecords can be combined into a ScoringDataset.
|
||||
A collection of EPCRecrods can be combined into a Dataset.
|
||||
"""
|
||||
|
||||
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
|
||||
def __init__(self, datasets: pd.DataFrame, cleaned_lookup: dict) -> None:
|
||||
# self.pipeline_steps = self.pipeline_factory("newdata")
|
||||
self.datasets = datasets
|
||||
self.df = datasets
|
||||
|
||||
def __add__(self, other) -> "NewDataset":
|
||||
if not isinstance(other, NewDataset):
|
||||
raise TypeError(
|
||||
"Addition can only be performed with another instance of ScoringDataset"
|
||||
self._clean_efficiency_variables()
|
||||
self._null_validation(information="Clean Efficiency Variables")
|
||||
self._expand_description_to_features(cleaned_lookup)
|
||||
self._adjust_assumed_values_in_wall_descriptions()
|
||||
self._generate_u_values_from_features()
|
||||
# # # TODO: For some of the features that we clean, we have either a true, false or possibly null value
|
||||
# # # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
|
||||
# # # need to
|
||||
self._clean_missing_values()
|
||||
self._null_validation(information="Clean Missing Values")
|
||||
# # self._remove_abnormal_change_in_floor_area()
|
||||
self._ensure_numeric()
|
||||
|
||||
def _ensure_numeric(self):
|
||||
"""
|
||||
Ensure that all columns are numeric
|
||||
"""
|
||||
# TODO: move into EPCRecord record
|
||||
uvalue_columns = [
|
||||
col for col in self.df.columns if "thermal_transmittance" in col
|
||||
]
|
||||
for uvalue_col in uvalue_columns:
|
||||
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
|
||||
|
||||
def _clean_missing_values(self, ignore_cols=None):
|
||||
missings = pd.isnull(self.df).sum()
|
||||
missings = missings[missings > 0]
|
||||
|
||||
if ignore_cols:
|
||||
missings = missings[~missings.index.isin(ignore_cols)]
|
||||
|
||||
for col in missings.index:
|
||||
unique_values = self.df[col].unique()
|
||||
if True in unique_values or False in unique_values:
|
||||
self.df[col] = self.df[col].fillna(False)
|
||||
if "none" in unique_values:
|
||||
self.df[col] = self.df[col].fillna("none")
|
||||
else:
|
||||
self.df[col] = self.df[col].fillna("Unknown")
|
||||
|
||||
@staticmethod
|
||||
def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
|
||||
"""
|
||||
Using the apply method, use the get_roof_u_value method to generate the u-value
|
||||
"""
|
||||
|
||||
col_name = (
|
||||
"roof_insulation_thickness"
|
||||
if not is_end
|
||||
else "roof_insulation_thickness_ending"
|
||||
)
|
||||
|
||||
if row["has_dwelling_above"]:
|
||||
if (row["roof_thermal_transmittance"] != 0) & (
|
||||
not pd.isnull(row["roof_thermal_transmittance"])
|
||||
):
|
||||
raise ValueError("Should have 0 u-value for roof")
|
||||
|
||||
return get_roof_u_value(
|
||||
insulation_thickness=row[col_name],
|
||||
has_dwelling_above=row["has_dwelling_above"],
|
||||
is_loft=row["is_loft"],
|
||||
is_roof_room=row["is_roof_room"],
|
||||
is_thatched=row["is_thatched"],
|
||||
is_flat=row["is_flat"],
|
||||
is_pitched=row["is_pitched"],
|
||||
is_at_rafters=row["is_at_rafters"],
|
||||
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
|
||||
"""
|
||||
Using the apply method, use the get_wall_u_value method to generate the u-value
|
||||
"""
|
||||
description_col_name = (
|
||||
"walls_clean_description"
|
||||
if not is_end
|
||||
else "walls_clean_description_ending"
|
||||
)
|
||||
thermal_transistance_col_name = (
|
||||
"walls_thermal_transmittance"
|
||||
if not is_end
|
||||
else "walls_thermal_transmittance_ending"
|
||||
)
|
||||
|
||||
if pd.isnull(row[thermal_transistance_col_name]):
|
||||
output = get_wall_u_value(
|
||||
clean_description=row[description_col_name],
|
||||
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
||||
is_granite_or_whinstone=row["is_granite_or_whinstone"],
|
||||
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
|
||||
)
|
||||
return NewDataset(self.datasets + other.datasets)
|
||||
|
||||
def __radd__(self, other):
|
||||
"""
|
||||
Required for sum() to work
|
||||
"""
|
||||
if isinstance(other, int):
|
||||
return self
|
||||
else:
|
||||
return self.__add__(other)
|
||||
output = row[thermal_transistance_col_name]
|
||||
|
||||
return output
|
||||
|
||||
@staticmethod
|
||||
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
|
||||
"""
|
||||
Using the apply method, use the get_floor_u_value method to generate the u-value
|
||||
"""
|
||||
|
||||
floor_thermal_col_name = (
|
||||
"floor_thermal_transmittance"
|
||||
if not is_end
|
||||
else "floor_thermal_transmittance_ending"
|
||||
)
|
||||
|
||||
if row["another_property_below"]:
|
||||
if (row["floor_thermal_transmittance"] != 0) & (
|
||||
not pd.isnull(row["floor_thermal_transmittance"])
|
||||
):
|
||||
raise ValueError("Should have 0 u-value for floor")
|
||||
|
||||
return 0
|
||||
else:
|
||||
uvalue = row[floor_thermal_col_name]
|
||||
|
||||
if pd.isnull(uvalue):
|
||||
|
||||
insulation_col_name = (
|
||||
"floor_insulation_thickness"
|
||||
if not is_end
|
||||
else "floor_insulation_thickness_ending"
|
||||
)
|
||||
floor_area_col_name = (
|
||||
"estimated_perimeter" if not is_end else "estimated_perimeter_ending"
|
||||
)
|
||||
perimeter_col_name = (
|
||||
"total_floor_area" if not is_end else "total_floor_area_ending"
|
||||
)
|
||||
|
||||
uvalue = get_floor_u_value(
|
||||
floor_type=row["floor_type"],
|
||||
perimeter=row[floor_area_col_name],
|
||||
area=row[perimeter_col_name],
|
||||
insulation_thickness=row[insulation_col_name],
|
||||
wall_type=row["wall_type"],
|
||||
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
|
||||
)
|
||||
|
||||
return uvalue
|
||||
|
||||
def _generate_u_values_from_features(self):
|
||||
"""
|
||||
Generate u-values from the features
|
||||
"""
|
||||
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
# Walls
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
|
||||
walls_uvalue = self.df.apply(
|
||||
lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1
|
||||
)
|
||||
|
||||
walls_uvalue = self.df["walls_thermal_transmittance"].fillna(walls_uvalue)
|
||||
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
# Roof
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
|
||||
roof_uvalue = self.df.apply(
|
||||
lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1
|
||||
)
|
||||
|
||||
roof_uvalue = self.df["roof_thermal_transmittance"].fillna(roof_uvalue)
|
||||
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
# Floor
|
||||
# ~~~~~~~~~~~~~~~~~~
|
||||
|
||||
self.df["estimated_perimeter"] = self.df.apply(
|
||||
lambda row: estimate_perimeter(
|
||||
row["total_floor_area"], row["number_habitable_rooms"]
|
||||
),
|
||||
axis=1,
|
||||
)
|
||||
|
||||
self.df["floor_type"] = self.df["is_suspended"].replace(
|
||||
{True: "suspended", False: "solid"}
|
||||
)
|
||||
self.df["wall_type"] = self.df.apply(
|
||||
lambda row: get_wall_type(
|
||||
is_cavity_wall=row["is_cavity_wall"],
|
||||
is_solid_brick=row["is_solid_brick"],
|
||||
is_timber_frame=row["is_timber_frame"],
|
||||
is_granite_or_whinstone=row["is_granite_or_whinstone"],
|
||||
is_cob=row["is_cob"],
|
||||
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
|
||||
is_system_built=row["is_system_built"],
|
||||
is_park_home=row["is_park_home"],
|
||||
),
|
||||
axis=1,
|
||||
)
|
||||
|
||||
floor_uvalue = self.df.apply(
|
||||
lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1
|
||||
)
|
||||
|
||||
floor_uvalue = self.df["floor_thermal_transmittance"].fillna(floor_uvalue)
|
||||
|
||||
for component in ["walls", "roof", "floor"]:
|
||||
self.df[f"{component}_thermal_transmittance"] = self.df[
|
||||
f"{component}_thermal_transmittance"
|
||||
].fillna(eval(f"{component}_uvalue"))
|
||||
|
||||
self.df = self.df.drop(
|
||||
columns=["floor_type", "wall_type", "walls_clean_description"]
|
||||
)
|
||||
|
||||
def _adjust_assumed_values_in_wall_descriptions(self):
|
||||
"""
|
||||
Strip out assumed values for all wall descriptions
|
||||
"""
|
||||
for col in ["walls_clean_description"]:
|
||||
self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip()
|
||||
|
||||
def _clean_efficiency_variables(self):
|
||||
"""
|
||||
These is scope to clean this by the model per corresponding description.
|
||||
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
|
||||
fill in the missing values with this.
|
||||
When looking at this initially, there are a large volume of records with missing energy efficiency
|
||||
values and therefore a simpler approach was taken just to test including these variables
|
||||
:param df:
|
||||
:return:
|
||||
"""
|
||||
|
||||
missings = pd.isnull(self.df).sum()
|
||||
missings = missings[missings >= 1]
|
||||
|
||||
if len(missings) == 0:
|
||||
return
|
||||
|
||||
# Make sure they are all efficiency columns
|
||||
if any(~missings.index.str.contains("energy_eff")):
|
||||
raise ValueError("Non efficiency columns are missing")
|
||||
|
||||
for m in missings.index:
|
||||
column_index = self.df[m].isna()
|
||||
self.df.loc[column_index, m] = "NO_RATING"
|
||||
|
||||
def _null_validation(self, information: str):
|
||||
print(f"Null validation after {information}")
|
||||
if pd.isnull(self.df).sum().sum():
|
||||
raise ValueError(f"Null values found in dataset, after step {information}")
|
||||
|
||||
def _expand_description_to_features(self, cleaned_lookup: dict):
|
||||
"""
|
||||
This method will merge on the cleaned lookup table and ensure that the building fabric in the
|
||||
starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
|
||||
possible dataset.
|
||||
# We look for key building fabric features that have changed from one EPC to the next.
|
||||
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
|
||||
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
|
||||
# is low
|
||||
# We also replace descriptions with their cleaned variants
|
||||
"""
|
||||
|
||||
cols_to_drop = {
|
||||
"walls": [
|
||||
# We need to cleaned descriptions for pulling out u-values
|
||||
"original_description",
|
||||
"thermal_transmittance_unit",
|
||||
# Re remove the is_assumed columns
|
||||
"is_assumed",
|
||||
],
|
||||
"floor": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
"thermal_transmittance_unit",
|
||||
"no_data",
|
||||
"is_assumed",
|
||||
],
|
||||
"roof": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
"thermal_transmittance_unit",
|
||||
"is_assumed",
|
||||
"is_valid",
|
||||
],
|
||||
"hotwater": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
"assumed",
|
||||
],
|
||||
"mainheat": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
"has_assumed",
|
||||
],
|
||||
"mainheatcont": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
],
|
||||
"windows": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
|
||||
"has_glazing",
|
||||
"glazing_coverage",
|
||||
"no_data",
|
||||
],
|
||||
"main-fuel": [
|
||||
"original_description",
|
||||
"clean_description",
|
||||
],
|
||||
}
|
||||
|
||||
components_to_expand = cols_to_drop.keys()
|
||||
|
||||
for component in components_to_expand:
|
||||
|
||||
# TODO: change cleaned dataframe to have underscores instead of dashes
|
||||
if component == "main-fuel":
|
||||
cleaned_key = "main-fuel"
|
||||
left_on_key = "main_fuel"
|
||||
original_cols = ["main_fuel"]
|
||||
else:
|
||||
cleaned_key = f"{component}-description"
|
||||
left_on_key = f"{component}_description"
|
||||
original_cols = [f"{component}_description"]
|
||||
|
||||
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
|
||||
|
||||
expanded_df = self.df.merge(
|
||||
cleaned_lookup_df_for_key,
|
||||
how="left",
|
||||
left_on=left_on_key,
|
||||
right_on="original_description",
|
||||
)
|
||||
|
||||
# Drop original cols and cols to drop
|
||||
expanded_df = expanded_df.drop(
|
||||
columns=cols_to_drop[component] + original_cols
|
||||
)
|
||||
|
||||
# Rename columns to component specific names, if they have not been dropped
|
||||
expanded_df = expanded_df.rename(
|
||||
columns={
|
||||
"insulation_thickness": f"{component}_insulation_thickness",
|
||||
"thermal_transmittance": f"{component}_thermal_transmittance",
|
||||
"tariff_type": f"{component}_tariff_type",
|
||||
"clean_description": f"{component}_clean_description",
|
||||
}
|
||||
)
|
||||
self.df = expanded_df
|
||||
|
||||
# We don't need any lighting specific cleaning, we just drop the original description as we use
|
||||
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
|
||||
self.df = self.df.drop(columns=["lighting_description"])
|
||||
|
||||
# def __add__(self, other) -> "NewDataset":
|
||||
# if not isinstance(other, NewDataset):
|
||||
# raise TypeError("Addition can only be performed with another instance of ScoringDataset")
|
||||
# return NewDataset(self.datasets + other.datasets)
|
||||
|
||||
# def __radd__(self, other):
|
||||
# """
|
||||
# Required for sum() to work
|
||||
# """
|
||||
# if isinstance(other, int):
|
||||
# return self
|
||||
# else:
|
||||
# return self.__add__(other)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import multiprocessing as mp
|
|||
|
||||
from etl.epc.DataProcessor import EPCDataProcessor
|
||||
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
|
||||
from etl.epc.Dataset import TrainingDataset
|
||||
from etl.epc.Dataset import TrainingDataset, RecordDataset
|
||||
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
|
||||
from etl.epc.settings import (
|
||||
MANDATORY_FIXED_FEATURES,
|
||||
|
|
@ -26,8 +26,8 @@ from etl.epc.settings import (
|
|||
|
||||
# TODO: change in setting file
|
||||
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
|
||||
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
|
||||
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
|
||||
LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
|
||||
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
|
||||
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
|
||||
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
|
||||
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
|
||||
|
|
@ -64,6 +64,12 @@ def get_cleaned_description_mapping():
|
|||
|
||||
clean_lookup = get_cleaned_description_mapping()
|
||||
|
||||
# import pickle
|
||||
# with open("./clean_lookup.pkl", "wb") as f:
|
||||
# pickle.dump(clean_lookup, f)
|
||||
|
||||
# clean_lookup = pickle.load(open("./clean_lookup.pkl", "rb"))
|
||||
|
||||
|
||||
class EPCPipeline:
|
||||
"""
|
||||
|
|
@ -130,9 +136,66 @@ class EPCPipeline:
|
|||
self.run_training_dataset_pipeline()
|
||||
elif self.run_mode == "newdata":
|
||||
self.run_newdata_dataset_pipeline()
|
||||
elif self.run_mode == "record":
|
||||
self.run_record_dataset_pipeline()
|
||||
else:
|
||||
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
|
||||
|
||||
def run_record_dataset_pipeline(self):
|
||||
"""
|
||||
Running pipeline with just the EPCRecords
|
||||
"""
|
||||
|
||||
if self.directories is None:
|
||||
raise ValueError(
|
||||
"Directories not specified - Unable to run Training pipeline"
|
||||
)
|
||||
|
||||
for directory in tqdm(self.directories):
|
||||
|
||||
filepath = directory / self.epc_local_file
|
||||
self.epc_data_processor.prepare_data(filepath=filepath)
|
||||
|
||||
constituency_data = self.epc_data_processor.data
|
||||
self.compiled_cleaning_averages.append(
|
||||
self.epc_data_processor.cleaning_averages
|
||||
)
|
||||
|
||||
# TODO: integrate with EPCRecord
|
||||
record_dataset = constituency_data[
|
||||
["uprn"]
|
||||
+ [RDSAP_RESPONSE]
|
||||
+ VARIABLE_DATA_FEATURES
|
||||
+ MANDATORY_FIXED_FEATURES
|
||||
+ LATEST_FIELD
|
||||
].rename(columns={RDSAP_RESPONSE: "sap"})
|
||||
|
||||
constituency_dataset = RecordDataset(
|
||||
datasets=record_dataset, cleaned_lookup=clean_lookup
|
||||
)
|
||||
|
||||
self.compiled_dataset = pd.concat(
|
||||
[self.compiled_dataset, constituency_dataset.df]
|
||||
)
|
||||
|
||||
save_dataframe_to_s3_parquet(
|
||||
df=self.compiled_dataset,
|
||||
bucket_name=self.epc_bucket_name,
|
||||
file_key=self.epc_compiled_dataset_key,
|
||||
)
|
||||
|
||||
save_dataframe_to_s3_parquet(
|
||||
df=pd.DataFrame(self.compiled_all_equal_rows),
|
||||
bucket_name=self.epc_bucket_name,
|
||||
file_key=self.epc_all_equal_rows_key,
|
||||
)
|
||||
|
||||
save_dataframe_to_s3_parquet(
|
||||
df=pd.concat(self.compiled_cleaning_averages),
|
||||
bucket_name=self.epc_bucket_name,
|
||||
file_key=self.epc_cleaning_dataset_key,
|
||||
)
|
||||
|
||||
def run_newdata_dataset_pipeline(self):
|
||||
"""
|
||||
Main function to run the newdata pipeline
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ def main():
|
|||
"""
|
||||
|
||||
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
|
||||
# directories = directories[0:3]
|
||||
# directories = directories[202:203]
|
||||
|
||||
epc_pipeline = EPCPipeline(
|
||||
directories=directories,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue