still testing non null stuff

This commit is contained in:
Michael Duong 2024-04-26 10:13:09 +01:00
parent bddad3fab3
commit 7125eb413c
5 changed files with 518 additions and 11 deletions

View file

@ -644,7 +644,7 @@ class EPCDataProcessor:
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(
self, epc_minimum_count: int = 1, ignore_step: bool = False
self, epc_minimum_count: int = 2, ignore_step: bool = False
) -> None:
"""
Reduce the data futher by keeping only datasets with multiple epcs
@ -661,7 +661,7 @@ class EPCDataProcessor:
counts.columns = ["UPRN", "count"]
# take UPRNS with multiple EPCs
counts = counts[counts["count"] > epc_minimum_count]
counts = counts[counts["count"] >= epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(
@ -802,6 +802,9 @@ class EPCDataProcessor:
# We remap zero values to None
self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None
# We remove EPCs with zero floor area
self.data = self.data.loc[self.data["TOTAL_FLOOR_AREA"] != 0]
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
"""
If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
@ -825,7 +828,8 @@ class EPCDataProcessor:
We fill photo supply with zeros where it's missing
"""
self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
self.data.loc[self.data["PHOTO_SUPPLY"].isnull(), "PHOTO_SUPPLY"] = 0
# self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
@staticmethod
def apply_averages_cleaning(

View file

@ -834,3 +834,383 @@ class NewDataset(BaseDataset):
return self
else:
return self.__add__(other)
class RecordDataset(BaseDataset):
"""
A collection of EPCRecrods can be combined into a Dataset.
"""
def __init__(self, datasets: pd.DataFrame, cleaned_lookup: dict) -> None:
# self.pipeline_steps = self.pipeline_factory("newdata")
self.datasets = datasets
self.df = datasets
self._clean_efficiency_variables()
self._null_validation(information="Clean Efficiency Variables")
self._expand_description_to_features(cleaned_lookup)
self._adjust_assumed_values_in_wall_descriptions()
self._generate_u_values_from_features()
# # # TODO: For some of the features that we clean, we have either a true, false or possibly null value
# # # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
# # # need to
self._clean_missing_values()
self._null_validation(information="Clean Missing Values")
# # self._remove_abnormal_change_in_floor_area()
self._ensure_numeric()
def _ensure_numeric(self):
"""
Ensure that all columns are numeric
"""
# TODO: move into EPCRecord record
uvalue_columns = [
col for col in self.df.columns if "thermal_transmittance" in col
]
for uvalue_col in uvalue_columns:
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
def _clean_missing_values(self, ignore_cols=None):
missings = pd.isnull(self.df).sum()
missings = missings[missings > 0]
if ignore_cols:
missings = missings[~missings.index.isin(ignore_cols)]
for col in missings.index:
unique_values = self.df[col].unique()
if True in unique_values or False in unique_values:
self.df[col] = self.df[col].fillna(False)
if "none" in unique_values:
self.df[col] = self.df[col].fillna("none")
else:
self.df[col] = self.df[col].fillna("Unknown")
@staticmethod
def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
"""
Using the apply method, use the get_roof_u_value method to generate the u-value
"""
col_name = (
"roof_insulation_thickness"
if not is_end
else "roof_insulation_thickness_ending"
)
if row["has_dwelling_above"]:
if (row["roof_thermal_transmittance"] != 0) & (
not pd.isnull(row["roof_thermal_transmittance"])
):
raise ValueError("Should have 0 u-value for roof")
return get_roof_u_value(
insulation_thickness=row[col_name],
has_dwelling_above=row["has_dwelling_above"],
is_loft=row["is_loft"],
is_roof_room=row["is_roof_room"],
is_thatched=row["is_thatched"],
is_flat=row["is_flat"],
is_pitched=row["is_pitched"],
is_at_rafters=row["is_at_rafters"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
)
@staticmethod
def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
"""
Using the apply method, use the get_wall_u_value method to generate the u-value
"""
description_col_name = (
"walls_clean_description"
if not is_end
else "walls_clean_description_ending"
)
thermal_transistance_col_name = (
"walls_thermal_transmittance"
if not is_end
else "walls_thermal_transmittance_ending"
)
if pd.isnull(row[thermal_transistance_col_name]):
output = get_wall_u_value(
clean_description=row[description_col_name],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
)
else:
output = row[thermal_transistance_col_name]
return output
@staticmethod
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
"""
Using the apply method, use the get_floor_u_value method to generate the u-value
"""
floor_thermal_col_name = (
"floor_thermal_transmittance"
if not is_end
else "floor_thermal_transmittance_ending"
)
if row["another_property_below"]:
if (row["floor_thermal_transmittance"] != 0) & (
not pd.isnull(row["floor_thermal_transmittance"])
):
raise ValueError("Should have 0 u-value for floor")
return 0
else:
uvalue = row[floor_thermal_col_name]
if pd.isnull(uvalue):
insulation_col_name = (
"floor_insulation_thickness"
if not is_end
else "floor_insulation_thickness_ending"
)
floor_area_col_name = (
"estimated_perimeter" if not is_end else "estimated_perimeter_ending"
)
perimeter_col_name = (
"total_floor_area" if not is_end else "total_floor_area_ending"
)
uvalue = get_floor_u_value(
floor_type=row["floor_type"],
perimeter=row[floor_area_col_name],
area=row[perimeter_col_name],
insulation_thickness=row[insulation_col_name],
wall_type=row["wall_type"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
)
return uvalue
def _generate_u_values_from_features(self):
"""
Generate u-values from the features
"""
# ~~~~~~~~~~~~~~~~~~
# Walls
# ~~~~~~~~~~~~~~~~~~
walls_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1
)
walls_uvalue = self.df["walls_thermal_transmittance"].fillna(walls_uvalue)
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
roof_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1
)
roof_uvalue = self.df["roof_thermal_transmittance"].fillna(roof_uvalue)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
self.df["estimated_perimeter"] = self.df.apply(
lambda row: estimate_perimeter(
row["total_floor_area"], row["number_habitable_rooms"]
),
axis=1,
)
self.df["floor_type"] = self.df["is_suspended"].replace(
{True: "suspended", False: "solid"}
)
self.df["wall_type"] = self.df.apply(
lambda row: get_wall_type(
is_cavity_wall=row["is_cavity_wall"],
is_solid_brick=row["is_solid_brick"],
is_timber_frame=row["is_timber_frame"],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_cob=row["is_cob"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
is_system_built=row["is_system_built"],
is_park_home=row["is_park_home"],
),
axis=1,
)
floor_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1
)
floor_uvalue = self.df["floor_thermal_transmittance"].fillna(floor_uvalue)
for component in ["walls", "roof", "floor"]:
self.df[f"{component}_thermal_transmittance"] = self.df[
f"{component}_thermal_transmittance"
].fillna(eval(f"{component}_uvalue"))
self.df = self.df.drop(
columns=["floor_type", "wall_type", "walls_clean_description"]
)
def _adjust_assumed_values_in_wall_descriptions(self):
"""
Strip out assumed values for all wall descriptions
"""
for col in ["walls_clean_description"]:
self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip()
def _clean_efficiency_variables(self):
"""
These is scope to clean this by the model per corresponding description.
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
fill in the missing values with this.
When looking at this initially, there are a large volume of records with missing energy efficiency
values and therefore a simpler approach was taken just to test including these variables
:param df:
:return:
"""
missings = pd.isnull(self.df).sum()
missings = missings[missings >= 1]
if len(missings) == 0:
return
# Make sure they are all efficiency columns
if any(~missings.index.str.contains("energy_eff")):
raise ValueError("Non efficiency columns are missing")
for m in missings.index:
column_index = self.df[m].isna()
self.df.loc[column_index, m] = "NO_RATING"
def _null_validation(self, information: str):
print(f"Null validation after {information}")
if pd.isnull(self.df).sum().sum():
raise ValueError(f"Null values found in dataset, after step {information}")
def _expand_description_to_features(self, cleaned_lookup: dict):
"""
This method will merge on the cleaned lookup table and ensure that the building fabric in the
starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
possible dataset.
# We look for key building fabric features that have changed from one EPC to the next.
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
"""
cols_to_drop = {
"walls": [
# We need to cleaned descriptions for pulling out u-values
"original_description",
"thermal_transmittance_unit",
# Re remove the is_assumed columns
"is_assumed",
],
"floor": [
"original_description",
"clean_description",
"thermal_transmittance_unit",
"no_data",
"is_assumed",
],
"roof": [
"original_description",
"clean_description",
"thermal_transmittance_unit",
"is_assumed",
"is_valid",
],
"hotwater": [
"original_description",
"clean_description",
"assumed",
],
"mainheat": [
"original_description",
"clean_description",
"has_assumed",
],
"mainheatcont": [
"original_description",
"clean_description",
],
"windows": [
"original_description",
"clean_description",
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
"has_glazing",
"glazing_coverage",
"no_data",
],
"main-fuel": [
"original_description",
"clean_description",
],
}
components_to_expand = cols_to_drop.keys()
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
if component == "main-fuel":
cleaned_key = "main-fuel"
left_on_key = "main_fuel"
original_cols = ["main_fuel"]
else:
cleaned_key = f"{component}-description"
left_on_key = f"{component}_description"
original_cols = [f"{component}_description"]
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
expanded_df = self.df.merge(
cleaned_lookup_df_for_key,
how="left",
left_on=left_on_key,
right_on="original_description",
)
# Drop original cols and cols to drop
expanded_df = expanded_df.drop(
columns=cols_to_drop[component] + original_cols
)
# Rename columns to component specific names, if they have not been dropped
expanded_df = expanded_df.rename(
columns={
"insulation_thickness": f"{component}_insulation_thickness",
"thermal_transmittance": f"{component}_thermal_transmittance",
"tariff_type": f"{component}_tariff_type",
"clean_description": f"{component}_clean_description",
}
)
self.df = expanded_df
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
self.df = self.df.drop(columns=["lighting_description"])
# def __add__(self, other) -> "NewDataset":
# if not isinstance(other, NewDataset):
# raise TypeError("Addition can only be performed with another instance of ScoringDataset")
# return NewDataset(self.datasets + other.datasets)
# def __radd__(self, other):
# """
# Required for sum() to work
# """
# if isinstance(other, int):
# return self
# else:
# return self.__add__(other)

View file

@ -9,7 +9,7 @@ import multiprocessing as mp
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from etl.epc.Dataset import TrainingDataset
from etl.epc.Dataset import TrainingDataset, RecordDataset
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
@ -26,8 +26,8 @@ from etl.epc.settings import (
# TODO: change in setting file
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
@ -83,9 +83,9 @@ class EPCPipeline:
run_mode="training",
epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev",
epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_no_cleaning.parquet",
epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_no_cleaning.parquet",
epc_compiled_dataset_key="sap_change_model/{}/dataset_no_cleaning.parquet",
epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_no_cleaning_records.parquet",
epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_no_cleaning_records.parquet",
epc_compiled_dataset_key="sap_change_model/{}/dataset_no_cleaning_records.parquet",
use_parallel=False,
):
"""
@ -124,9 +124,131 @@ class EPCPipeline:
self.run_training_dataset_pipeline()
elif self.run_mode == "newdata":
self.run_newdata_dataset_pipeline()
elif self.run_mode == "record":
self.run_record_dataset_pipeline()
else:
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def task(self, directory: str) -> pd.DataFrame:
"""
Dummy task to enable parallel processing
"""
filepath = directory / self.epc_local_file
print(filepath)
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
# self.compiled_cleaning_averages.append(
# self.epc_data_processor.cleaning_averages
# )
if len(constituency_data) == 0:
return None
cleaned_data = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
property_data = property_data.sort_values("lodgement_date")
property_data["sap_starting"] = property_data[
"current_energy_efficiency"
].shift(1)
output = property_data[~property_data["sap_starting"].isnull()]
cleaned_data.append(output)
constituency_data = pd.concat(cleaned_data).reset_index(drop=True)
# TODO: integrate with EPCRecord
record_dataset = constituency_data[
["uprn"]
+ VARIABLE_DATA_FEATURES
+ MANDATORY_FIXED_FEATURES
+ LATEST_FIELD
+ ["sap_starting"]
].rename(columns={RDSAP_RESPONSE: "sap_ending"})
constituency_dataset = RecordDataset(
datasets=record_dataset, cleaned_lookup=clean_lookup
)
return constituency_dataset
self.compiled_dataset = pd.concat(
[self.compiled_dataset, constituency_dataset.df]
)
def run_record_dataset_pipeline(self):
"""
Running pipeline with just the EPCRecords
"""
if self.directories is None:
raise ValueError(
"Directories not specified - Unable to run Training pipeline"
)
with mp.Pool() as pool:
results = list(
tqdm(
pool.imap(self.task, self.directories), total=len(self.directories)
),
)
# for directory in tqdm(self.directories):
# self.task(directory)
for result in tqdm(results):
if result is None:
continue
self.compiled_dataset = pd.concat([self.compiled_dataset, result.df])
self.compiled_dataset = self.compiled_dataset.reset_index(drop=True)
# for directory in tqdm(self.directories):
# filepath = directory / self.epc_local_file
# self.epc_data_processor.prepare_data(filepath=filepath)
# constituency_data = self.epc_data_processor.data
# self.compiled_cleaning_averages.append(
# self.epc_data_processor.cleaning_averages
# )
# # TODO: integrate with EPCRecord
# record_dataset = constituency_data[
# ["uprn"]
# + [RDSAP_RESPONSE]
# + VARIABLE_DATA_FEATURES
# + MANDATORY_FIXED_FEATURES
# + LATEST_FIELD
# ].rename(columns={RDSAP_RESPONSE: "sap"})
# constituency_dataset = RecordDataset(
# datasets=record_dataset, cleaned_lookup=clean_lookup
# )
# self.compiled_dataset = pd.concat(
# [self.compiled_dataset, constituency_dataset.df]
# )
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
bucket_name=self.epc_bucket_name,
file_key=self.epc_compiled_dataset_key,
)
# save_dataframe_to_s3_parquet(
# df=pd.DataFrame(self.compiled_all_equal_rows),
# bucket_name=self.epc_bucket_name,
# file_key=self.epc_all_equal_rows_key,
# )
# save_dataframe_to_s3_parquet(
# df=pd.concat(self.compiled_cleaning_averages),
# bucket_name=self.epc_bucket_name,
# file_key=self.epc_cleaning_dataset_key,
# )
def run_newdata_dataset_pipeline(self):
"""
Main function to run the newdata pipeline

View file

@ -12,10 +12,11 @@ def main():
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
# directories = directories[76:85]
# directories = directories[40:50]
epc_pipeline = EPCPipeline(
directories=directories,
run_mode="record",
use_parallel=True,
epc_data_processor=EPCDataProcessor(run_mode="training"),
)

View file

@ -234,7 +234,7 @@ BUILT_FORM_REMAP = {
DATA_PROCESSOR_SETTINGS = {
"low_memory": False,
"epc_minimum_count": 1,
"epc_minimum_count": 2,
"column_mappings": {"UPRN": [int, str]},
}