diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 2494497d..5497d671 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -644,7 +644,7 @@ class EPCDataProcessor: self.cleaning_averages = cleaning_averages_filled def retain_multiple_epc_properties( - self, epc_minimum_count: int = 1, ignore_step: bool = False + self, epc_minimum_count: int = 2, ignore_step: bool = False ) -> None: """ Reduce the data futher by keeping only datasets with multiple epcs @@ -661,7 +661,7 @@ class EPCDataProcessor: counts.columns = ["UPRN", "count"] # take UPRNS with multiple EPCs - counts = counts[counts["count"] > epc_minimum_count] + counts = counts[counts["count"] >= epc_minimum_count] self.data = pd.merge(self.data, counts, on="UPRN") def recast_df_columns( @@ -802,6 +802,9 @@ class EPCDataProcessor: # We remap zero values to None self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None + # We remove EPCs with zero floor area + self.data = self.data.loc[self.data["TOTAL_FLOOR_AREA"] != 0] + def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None: """ If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100 @@ -825,7 +828,8 @@ class EPCDataProcessor: We fill photo supply with zeros where it's missing """ - self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0) + self.data.loc[self.data["PHOTO_SUPPLY"].isnull(), "PHOTO_SUPPLY"] = 0 + # self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0) @staticmethod def apply_averages_cleaning( diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 7f989633..6368497c 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -834,3 +834,383 @@ class NewDataset(BaseDataset): return self else: return self.__add__(other) + + +class RecordDataset(BaseDataset): + """ + A collection of EPCRecrods can be combined into a Dataset. + """ + + def __init__(self, datasets: pd.DataFrame, cleaned_lookup: dict) -> None: + # self.pipeline_steps = self.pipeline_factory("newdata") + self.datasets = datasets + self.df = datasets + + self._clean_efficiency_variables() + self._null_validation(information="Clean Efficiency Variables") + self._expand_description_to_features(cleaned_lookup) + self._adjust_assumed_values_in_wall_descriptions() + self._generate_u_values_from_features() + # # # TODO: For some of the features that we clean, we have either a true, false or possibly null value + # # # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't + # # # need to + self._clean_missing_values() + self._null_validation(information="Clean Missing Values") + # # self._remove_abnormal_change_in_floor_area() + self._ensure_numeric() + + def _ensure_numeric(self): + """ + Ensure that all columns are numeric + """ + # TODO: move into EPCRecord record + uvalue_columns = [ + col for col in self.df.columns if "thermal_transmittance" in col + ] + for uvalue_col in uvalue_columns: + self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) + + def _clean_missing_values(self, ignore_cols=None): + missings = pd.isnull(self.df).sum() + missings = missings[missings > 0] + + if ignore_cols: + missings = missings[~missings.index.isin(ignore_cols)] + + for col in missings.index: + unique_values = self.df[col].unique() + if True in unique_values or False in unique_values: + self.df[col] = self.df[col].fillna(False) + if "none" in unique_values: + self.df[col] = self.df[col].fillna("none") + else: + self.df[col] = self.df[col].fillna("Unknown") + + @staticmethod + def _lambda_function_to_generate_roof_uvalue(row, is_end=False): + """ + Using the apply method, use the get_roof_u_value method to generate the u-value + """ + + col_name = ( + "roof_insulation_thickness" + if not is_end + else "roof_insulation_thickness_ending" + ) + + if row["has_dwelling_above"]: + if (row["roof_thermal_transmittance"] != 0) & ( + not pd.isnull(row["roof_thermal_transmittance"]) + ): + raise ValueError("Should have 0 u-value for roof") + + return get_roof_u_value( + insulation_thickness=row[col_name], + has_dwelling_above=row["has_dwelling_above"], + is_loft=row["is_loft"], + is_roof_room=row["is_roof_room"], + is_thatched=row["is_thatched"], + is_flat=row["is_flat"], + is_pitched=row["is_pitched"], + is_at_rafters=row["is_at_rafters"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + ) + + @staticmethod + def _lambda_function_to_generate_wall_uvalue(row, is_end=False): + """ + Using the apply method, use the get_wall_u_value method to generate the u-value + """ + description_col_name = ( + "walls_clean_description" + if not is_end + else "walls_clean_description_ending" + ) + thermal_transistance_col_name = ( + "walls_thermal_transmittance" + if not is_end + else "walls_thermal_transmittance_ending" + ) + + if pd.isnull(row[thermal_transistance_col_name]): + output = get_wall_u_value( + clean_description=row[description_col_name], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + ) + else: + output = row[thermal_transistance_col_name] + + return output + + @staticmethod + def _lambda_function_to_generate_floor_uvalue(row, is_end=False): + """ + Using the apply method, use the get_floor_u_value method to generate the u-value + """ + + floor_thermal_col_name = ( + "floor_thermal_transmittance" + if not is_end + else "floor_thermal_transmittance_ending" + ) + + if row["another_property_below"]: + if (row["floor_thermal_transmittance"] != 0) & ( + not pd.isnull(row["floor_thermal_transmittance"]) + ): + raise ValueError("Should have 0 u-value for floor") + + return 0 + else: + uvalue = row[floor_thermal_col_name] + + if pd.isnull(uvalue): + + insulation_col_name = ( + "floor_insulation_thickness" + if not is_end + else "floor_insulation_thickness_ending" + ) + floor_area_col_name = ( + "estimated_perimeter" if not is_end else "estimated_perimeter_ending" + ) + perimeter_col_name = ( + "total_floor_area" if not is_end else "total_floor_area_ending" + ) + + uvalue = get_floor_u_value( + floor_type=row["floor_type"], + perimeter=row[floor_area_col_name], + area=row[perimeter_col_name], + insulation_thickness=row[insulation_col_name], + wall_type=row["wall_type"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + ) + + return uvalue + + def _generate_u_values_from_features(self): + """ + Generate u-values from the features + """ + + # ~~~~~~~~~~~~~~~~~~ + # Walls + # ~~~~~~~~~~~~~~~~~~ + + walls_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1 + ) + + walls_uvalue = self.df["walls_thermal_transmittance"].fillna(walls_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Roof + # ~~~~~~~~~~~~~~~~~~ + + roof_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1 + ) + + roof_uvalue = self.df["roof_thermal_transmittance"].fillna(roof_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Floor + # ~~~~~~~~~~~~~~~~~~ + + self.df["estimated_perimeter"] = self.df.apply( + lambda row: estimate_perimeter( + row["total_floor_area"], row["number_habitable_rooms"] + ), + axis=1, + ) + + self.df["floor_type"] = self.df["is_suspended"].replace( + {True: "suspended", False: "solid"} + ) + self.df["wall_type"] = self.df.apply( + lambda row: get_wall_type( + is_cavity_wall=row["is_cavity_wall"], + is_solid_brick=row["is_solid_brick"], + is_timber_frame=row["is_timber_frame"], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_cob=row["is_cob"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + is_system_built=row["is_system_built"], + is_park_home=row["is_park_home"], + ), + axis=1, + ) + + floor_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1 + ) + + floor_uvalue = self.df["floor_thermal_transmittance"].fillna(floor_uvalue) + + for component in ["walls", "roof", "floor"]: + self.df[f"{component}_thermal_transmittance"] = self.df[ + f"{component}_thermal_transmittance" + ].fillna(eval(f"{component}_uvalue")) + + self.df = self.df.drop( + columns=["floor_type", "wall_type", "walls_clean_description"] + ) + + def _adjust_assumed_values_in_wall_descriptions(self): + """ + Strip out assumed values for all wall descriptions + """ + for col in ["walls_clean_description"]: + self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip() + + def _clean_efficiency_variables(self): + """ + These is scope to clean this by the model per corresponding description. + E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and + fill in the missing values with this. + When looking at this initially, there are a large volume of records with missing energy efficiency + values and therefore a simpler approach was taken just to test including these variables + :param df: + :return: + """ + + missings = pd.isnull(self.df).sum() + missings = missings[missings >= 1] + + if len(missings) == 0: + return + + # Make sure they are all efficiency columns + if any(~missings.index.str.contains("energy_eff")): + raise ValueError("Non efficiency columns are missing") + + for m in missings.index: + column_index = self.df[m].isna() + self.df.loc[column_index, m] = "NO_RATING" + + def _null_validation(self, information: str): + print(f"Null validation after {information}") + if pd.isnull(self.df).sum().sum(): + raise ValueError(f"Null values found in dataset, after step {information}") + + def _expand_description_to_features(self, cleaned_lookup: dict): + """ + This method will merge on the cleaned lookup table and ensure that the building fabric in the + starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest + possible dataset. + # We look for key building fabric features that have changed from one EPC to the next. + # if, for example, we see that a home has gone from being a cavity wall to a solid wall, we + # remove this record, as it indicates that the quality of the EPC conducted in the first instance + # is low + # We also replace descriptions with their cleaned variants + """ + + cols_to_drop = { + "walls": [ + # We need to cleaned descriptions for pulling out u-values + "original_description", + "thermal_transmittance_unit", + # Re remove the is_assumed columns + "is_assumed", + ], + "floor": [ + "original_description", + "clean_description", + "thermal_transmittance_unit", + "no_data", + "is_assumed", + ], + "roof": [ + "original_description", + "clean_description", + "thermal_transmittance_unit", + "is_assumed", + "is_valid", + ], + "hotwater": [ + "original_description", + "clean_description", + "assumed", + ], + "mainheat": [ + "original_description", + "clean_description", + "has_assumed", + ], + "mainheatcont": [ + "original_description", + "clean_description", + ], + "windows": [ + "original_description", + "clean_description", + # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature + "has_glazing", + "glazing_coverage", + "no_data", + ], + "main-fuel": [ + "original_description", + "clean_description", + ], + } + + components_to_expand = cols_to_drop.keys() + + for component in components_to_expand: + + # TODO: change cleaned dataframe to have underscores instead of dashes + if component == "main-fuel": + cleaned_key = "main-fuel" + left_on_key = "main_fuel" + original_cols = ["main_fuel"] + else: + cleaned_key = f"{component}-description" + left_on_key = f"{component}_description" + original_cols = [f"{component}_description"] + + cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) + + expanded_df = self.df.merge( + cleaned_lookup_df_for_key, + how="left", + left_on=left_on_key, + right_on="original_description", + ) + + # Drop original cols and cols to drop + expanded_df = expanded_df.drop( + columns=cols_to_drop[component] + original_cols + ) + + # Rename columns to component specific names, if they have not been dropped + expanded_df = expanded_df.rename( + columns={ + "insulation_thickness": f"{component}_insulation_thickness", + "thermal_transmittance": f"{component}_thermal_transmittance", + "tariff_type": f"{component}_tariff_type", + "clean_description": f"{component}_clean_description", + } + ) + self.df = expanded_df + + # We don't need any lighting specific cleaning, we just drop the original description as we use + # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING + self.df = self.df.drop(columns=["lighting_description"]) + + # def __add__(self, other) -> "NewDataset": + # if not isinstance(other, NewDataset): + # raise TypeError("Addition can only be performed with another instance of ScoringDataset") + # return NewDataset(self.datasets + other.datasets) + + # def __radd__(self, other): + # """ + # Required for sum() to work + # """ + # if isinstance(other, int): + # return self + # else: + # return self.__add__(other) diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index 665d9320..01749046 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -9,7 +9,7 @@ import multiprocessing as mp from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.Record import EPCRecord, EPCDifferenceRecord -from etl.epc.Dataset import TrainingDataset +from etl.epc.Dataset import TrainingDataset, RecordDataset from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 from etl.epc.settings import ( MANDATORY_FIXED_FEATURES, @@ -26,8 +26,8 @@ from etl.epc.settings import ( # TODO: change in setting file MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] -# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] -LATEST_FIELD = [x.lower() for x in LATEST_FIELD] +LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] +# LATEST_FIELD = [x.lower() for x in LATEST_FIELD] COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] RDSAP_RESPONSE = RDSAP_RESPONSE.lower() HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() @@ -83,9 +83,9 @@ class EPCPipeline: run_mode="training", epc_local_file="certificates.csv", epc_bucket_name="retrofit-data-dev", - epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_no_cleaning.parquet", - epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_no_cleaning.parquet", - epc_compiled_dataset_key="sap_change_model/{}/dataset_no_cleaning.parquet", + epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_no_cleaning_records.parquet", + epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_no_cleaning_records.parquet", + epc_compiled_dataset_key="sap_change_model/{}/dataset_no_cleaning_records.parquet", use_parallel=False, ): """ @@ -124,9 +124,131 @@ class EPCPipeline: self.run_training_dataset_pipeline() elif self.run_mode == "newdata": self.run_newdata_dataset_pipeline() + elif self.run_mode == "record": + self.run_record_dataset_pipeline() else: raise ValueError("Run mode defined needs to be in 'training' or 'newdata'") + def task(self, directory: str) -> pd.DataFrame: + """ + Dummy task to enable parallel processing + """ + filepath = directory / self.epc_local_file + + print(filepath) + + self.epc_data_processor.prepare_data(filepath=filepath) + + constituency_data = self.epc_data_processor.data + # self.compiled_cleaning_averages.append( + # self.epc_data_processor.cleaning_averages + # ) + + if len(constituency_data) == 0: + return None + + cleaned_data = [] + + for uprn, property_data in constituency_data.groupby("uprn", observed=True): + property_data = property_data.sort_values("lodgement_date") + property_data["sap_starting"] = property_data[ + "current_energy_efficiency" + ].shift(1) + output = property_data[~property_data["sap_starting"].isnull()] + cleaned_data.append(output) + + constituency_data = pd.concat(cleaned_data).reset_index(drop=True) + + # TODO: integrate with EPCRecord + record_dataset = constituency_data[ + ["uprn"] + + VARIABLE_DATA_FEATURES + + MANDATORY_FIXED_FEATURES + + LATEST_FIELD + + ["sap_starting"] + ].rename(columns={RDSAP_RESPONSE: "sap_ending"}) + + constituency_dataset = RecordDataset( + datasets=record_dataset, cleaned_lookup=clean_lookup + ) + + return constituency_dataset + + self.compiled_dataset = pd.concat( + [self.compiled_dataset, constituency_dataset.df] + ) + + def run_record_dataset_pipeline(self): + """ + Running pipeline with just the EPCRecords + """ + + if self.directories is None: + raise ValueError( + "Directories not specified - Unable to run Training pipeline" + ) + + with mp.Pool() as pool: + results = list( + tqdm( + pool.imap(self.task, self.directories), total=len(self.directories) + ), + ) + # for directory in tqdm(self.directories): + # self.task(directory) + + for result in tqdm(results): + if result is None: + continue + self.compiled_dataset = pd.concat([self.compiled_dataset, result.df]) + + self.compiled_dataset = self.compiled_dataset.reset_index(drop=True) + + # for directory in tqdm(self.directories): + + # filepath = directory / self.epc_local_file + # self.epc_data_processor.prepare_data(filepath=filepath) + + # constituency_data = self.epc_data_processor.data + # self.compiled_cleaning_averages.append( + # self.epc_data_processor.cleaning_averages + # ) + + # # TODO: integrate with EPCRecord + # record_dataset = constituency_data[ + # ["uprn"] + # + [RDSAP_RESPONSE] + # + VARIABLE_DATA_FEATURES + # + MANDATORY_FIXED_FEATURES + # + LATEST_FIELD + # ].rename(columns={RDSAP_RESPONSE: "sap"}) + + # constituency_dataset = RecordDataset( + # datasets=record_dataset, cleaned_lookup=clean_lookup + # ) + + # self.compiled_dataset = pd.concat( + # [self.compiled_dataset, constituency_dataset.df] + # ) + + save_dataframe_to_s3_parquet( + df=self.compiled_dataset, + bucket_name=self.epc_bucket_name, + file_key=self.epc_compiled_dataset_key, + ) + + # save_dataframe_to_s3_parquet( + # df=pd.DataFrame(self.compiled_all_equal_rows), + # bucket_name=self.epc_bucket_name, + # file_key=self.epc_all_equal_rows_key, + # ) + + # save_dataframe_to_s3_parquet( + # df=pd.concat(self.compiled_cleaning_averages), + # bucket_name=self.epc_bucket_name, + # file_key=self.epc_cleaning_dataset_key, + # ) + def run_newdata_dataset_pipeline(self): """ Main function to run the newdata pipeline diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 506c32b3..e25c73a4 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -12,10 +12,11 @@ def main(): """ directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - # directories = directories[76:85] + # directories = directories[40:50] epc_pipeline = EPCPipeline( directories=directories, + run_mode="record", use_parallel=True, epc_data_processor=EPCDataProcessor(run_mode="training"), ) diff --git a/etl/epc/settings.py b/etl/epc/settings.py index 18dbaa7c..d5937062 100644 --- a/etl/epc/settings.py +++ b/etl/epc/settings.py @@ -234,7 +234,7 @@ BUILT_FORM_REMAP = { DATA_PROCESSOR_SETTINGS = { "low_memory": False, - "epc_minimum_count": 1, + "epc_minimum_count": 2, "column_mappings": {"UPRN": [int, str]}, }