diff --git a/.gitignore b/.gitignore index 63884ad7..4a204ac3 100644 --- a/.gitignore +++ b/.gitignore @@ -268,4 +268,6 @@ adhoc adhoc/* etl-router-venv/ -refactor_datasets/ \ No newline at end of file +refactor_datasets/ +etl-router-*/ +.vscode/ \ No newline at end of file diff --git a/backend/Property.py b/backend/Property.py index 2e6cbbb6..12e2dbb1 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -355,7 +355,12 @@ class Property: output["internal_insulation_ending"] = True if recommendation["type"] == "cavity_wall_insulation": - output["is_filled_cavity_ending"] = True + output['is_filled_cavity_ending'] = True + + # TODO: perhaps detrimental + # When making a recommendation for the wall, we will also update the ventilation + # if output["mechanical_ventilation_ending"] == 'natural': + # output["mechanical_ventilation_ending"] = 'mechanical, extract only' else: if output["walls_thermal_transmittance_ending"] is None: diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index e897da78..23f5a371 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -810,27 +810,381 @@ class TrainingDataset(BaseDataset): # return self.__add__(other) -class NewDataset(BaseDataset): +class RecordDataset(BaseDataset): """ - A collection of EPCDifferenceRecords can be combined into a ScoringDataset. + A collection of EPCRecrods can be combined into a Dataset. """ - def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: + def __init__(self, datasets: pd.DataFrame, cleaned_lookup: dict) -> None: # self.pipeline_steps = self.pipeline_factory("newdata") self.datasets = datasets + self.df = datasets - def __add__(self, other) -> "NewDataset": - if not isinstance(other, NewDataset): - raise TypeError( - "Addition can only be performed with another instance of ScoringDataset" + self._clean_efficiency_variables() + self._null_validation(information="Clean Efficiency Variables") + self._expand_description_to_features(cleaned_lookup) + self._adjust_assumed_values_in_wall_descriptions() + self._generate_u_values_from_features() + # # # TODO: For some of the features that we clean, we have either a true, false or possibly null value + # # # Those nulls should be False. clean_missings_after_description_process handles this but shouldn't + # # # need to + self._clean_missing_values() + self._null_validation(information="Clean Missing Values") + # # self._remove_abnormal_change_in_floor_area() + self._ensure_numeric() + + def _ensure_numeric(self): + """ + Ensure that all columns are numeric + """ + # TODO: move into EPCRecord record + uvalue_columns = [ + col for col in self.df.columns if "thermal_transmittance" in col + ] + for uvalue_col in uvalue_columns: + self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) + + def _clean_missing_values(self, ignore_cols=None): + missings = pd.isnull(self.df).sum() + missings = missings[missings > 0] + + if ignore_cols: + missings = missings[~missings.index.isin(ignore_cols)] + + for col in missings.index: + unique_values = self.df[col].unique() + if True in unique_values or False in unique_values: + self.df[col] = self.df[col].fillna(False) + if "none" in unique_values: + self.df[col] = self.df[col].fillna("none") + else: + self.df[col] = self.df[col].fillna("Unknown") + + @staticmethod + def _lambda_function_to_generate_roof_uvalue(row, is_end=False): + """ + Using the apply method, use the get_roof_u_value method to generate the u-value + """ + + col_name = ( + "roof_insulation_thickness" + if not is_end + else "roof_insulation_thickness_ending" + ) + + if row["has_dwelling_above"]: + if (row["roof_thermal_transmittance"] != 0) & ( + not pd.isnull(row["roof_thermal_transmittance"]) + ): + raise ValueError("Should have 0 u-value for roof") + + return get_roof_u_value( + insulation_thickness=row[col_name], + has_dwelling_above=row["has_dwelling_above"], + is_loft=row["is_loft"], + is_roof_room=row["is_roof_room"], + is_thatched=row["is_thatched"], + is_flat=row["is_flat"], + is_pitched=row["is_pitched"], + is_at_rafters=row["is_at_rafters"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + ) + + @staticmethod + def _lambda_function_to_generate_wall_uvalue(row, is_end=False): + """ + Using the apply method, use the get_wall_u_value method to generate the u-value + """ + description_col_name = ( + "walls_clean_description" + if not is_end + else "walls_clean_description_ending" + ) + thermal_transistance_col_name = ( + "walls_thermal_transmittance" + if not is_end + else "walls_thermal_transmittance_ending" + ) + + if pd.isnull(row[thermal_transistance_col_name]): + output = get_wall_u_value( + clean_description=row[description_col_name], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], ) - return NewDataset(self.datasets + other.datasets) - - def __radd__(self, other): - """ - Required for sum() to work - """ - if isinstance(other, int): - return self else: - return self.__add__(other) + output = row[thermal_transistance_col_name] + + return output + + @staticmethod + def _lambda_function_to_generate_floor_uvalue(row, is_end=False): + """ + Using the apply method, use the get_floor_u_value method to generate the u-value + """ + + floor_thermal_col_name = ( + "floor_thermal_transmittance" + if not is_end + else "floor_thermal_transmittance_ending" + ) + + if row["another_property_below"]: + if (row["floor_thermal_transmittance"] != 0) & ( + not pd.isnull(row["floor_thermal_transmittance"]) + ): + raise ValueError("Should have 0 u-value for floor") + + return 0 + else: + uvalue = row[floor_thermal_col_name] + + if pd.isnull(uvalue): + + insulation_col_name = ( + "floor_insulation_thickness" + if not is_end + else "floor_insulation_thickness_ending" + ) + floor_area_col_name = ( + "estimated_perimeter" if not is_end else "estimated_perimeter_ending" + ) + perimeter_col_name = ( + "total_floor_area" if not is_end else "total_floor_area_ending" + ) + + uvalue = get_floor_u_value( + floor_type=row["floor_type"], + perimeter=row[floor_area_col_name], + area=row[perimeter_col_name], + insulation_thickness=row[insulation_col_name], + wall_type=row["wall_type"], + age_band=england_wales_age_band_lookup[row["construction_age_band"]], + ) + + return uvalue + + def _generate_u_values_from_features(self): + """ + Generate u-values from the features + """ + + # ~~~~~~~~~~~~~~~~~~ + # Walls + # ~~~~~~~~~~~~~~~~~~ + + walls_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_wall_uvalue(row), axis=1 + ) + + walls_uvalue = self.df["walls_thermal_transmittance"].fillna(walls_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Roof + # ~~~~~~~~~~~~~~~~~~ + + roof_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_roof_uvalue(row), axis=1 + ) + + roof_uvalue = self.df["roof_thermal_transmittance"].fillna(roof_uvalue) + + # ~~~~~~~~~~~~~~~~~~ + # Floor + # ~~~~~~~~~~~~~~~~~~ + + self.df["estimated_perimeter"] = self.df.apply( + lambda row: estimate_perimeter( + row["total_floor_area"], row["number_habitable_rooms"] + ), + axis=1, + ) + + self.df["floor_type"] = self.df["is_suspended"].replace( + {True: "suspended", False: "solid"} + ) + self.df["wall_type"] = self.df.apply( + lambda row: get_wall_type( + is_cavity_wall=row["is_cavity_wall"], + is_solid_brick=row["is_solid_brick"], + is_timber_frame=row["is_timber_frame"], + is_granite_or_whinstone=row["is_granite_or_whinstone"], + is_cob=row["is_cob"], + is_sandstone_or_limestone=row["is_sandstone_or_limestone"], + is_system_built=row["is_system_built"], + is_park_home=row["is_park_home"], + ), + axis=1, + ) + + floor_uvalue = self.df.apply( + lambda row: self._lambda_function_to_generate_floor_uvalue(row), axis=1 + ) + + floor_uvalue = self.df["floor_thermal_transmittance"].fillna(floor_uvalue) + + for component in ["walls", "roof", "floor"]: + self.df[f"{component}_thermal_transmittance"] = self.df[ + f"{component}_thermal_transmittance" + ].fillna(eval(f"{component}_uvalue")) + + self.df = self.df.drop( + columns=["floor_type", "wall_type", "walls_clean_description"] + ) + + def _adjust_assumed_values_in_wall_descriptions(self): + """ + Strip out assumed values for all wall descriptions + """ + for col in ["walls_clean_description"]: + self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip() + + def _clean_efficiency_variables(self): + """ + These is scope to clean this by the model per corresponding description. + E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and + fill in the missing values with this. + When looking at this initially, there are a large volume of records with missing energy efficiency + values and therefore a simpler approach was taken just to test including these variables + :param df: + :return: + """ + + missings = pd.isnull(self.df).sum() + missings = missings[missings >= 1] + + if len(missings) == 0: + return + + # Make sure they are all efficiency columns + if any(~missings.index.str.contains("energy_eff")): + raise ValueError("Non efficiency columns are missing") + + for m in missings.index: + column_index = self.df[m].isna() + self.df.loc[column_index, m] = "NO_RATING" + + def _null_validation(self, information: str): + print(f"Null validation after {information}") + if pd.isnull(self.df).sum().sum(): + raise ValueError(f"Null values found in dataset, after step {information}") + + def _expand_description_to_features(self, cleaned_lookup: dict): + """ + This method will merge on the cleaned lookup table and ensure that the building fabric in the + starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest + possible dataset. + # We look for key building fabric features that have changed from one EPC to the next. + # if, for example, we see that a home has gone from being a cavity wall to a solid wall, we + # remove this record, as it indicates that the quality of the EPC conducted in the first instance + # is low + # We also replace descriptions with their cleaned variants + """ + + cols_to_drop = { + "walls": [ + # We need to cleaned descriptions for pulling out u-values + "original_description", + "thermal_transmittance_unit", + # Re remove the is_assumed columns + "is_assumed", + ], + "floor": [ + "original_description", + "clean_description", + "thermal_transmittance_unit", + "no_data", + "is_assumed", + ], + "roof": [ + "original_description", + "clean_description", + "thermal_transmittance_unit", + "is_assumed", + "is_valid", + ], + "hotwater": [ + "original_description", + "clean_description", + "assumed", + ], + "mainheat": [ + "original_description", + "clean_description", + "has_assumed", + ], + "mainheatcont": [ + "original_description", + "clean_description", + ], + "windows": [ + "original_description", + "clean_description", + # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature + "has_glazing", + "glazing_coverage", + "no_data", + ], + "main-fuel": [ + "original_description", + "clean_description", + ], + } + + components_to_expand = cols_to_drop.keys() + + for component in components_to_expand: + + # TODO: change cleaned dataframe to have underscores instead of dashes + if component == "main-fuel": + cleaned_key = "main-fuel" + left_on_key = "main_fuel" + original_cols = ["main_fuel"] + else: + cleaned_key = f"{component}-description" + left_on_key = f"{component}_description" + original_cols = [f"{component}_description"] + + cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) + + expanded_df = self.df.merge( + cleaned_lookup_df_for_key, + how="left", + left_on=left_on_key, + right_on="original_description", + ) + + # Drop original cols and cols to drop + expanded_df = expanded_df.drop( + columns=cols_to_drop[component] + original_cols + ) + + # Rename columns to component specific names, if they have not been dropped + expanded_df = expanded_df.rename( + columns={ + "insulation_thickness": f"{component}_insulation_thickness", + "thermal_transmittance": f"{component}_thermal_transmittance", + "tariff_type": f"{component}_tariff_type", + "clean_description": f"{component}_clean_description", + } + ) + self.df = expanded_df + + # We don't need any lighting specific cleaning, we just drop the original description as we use + # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING + self.df = self.df.drop(columns=["lighting_description"]) + + # def __add__(self, other) -> "NewDataset": + # if not isinstance(other, NewDataset): + # raise TypeError("Addition can only be performed with another instance of ScoringDataset") + # return NewDataset(self.datasets + other.datasets) + + # def __radd__(self, other): + # """ + # Required for sum() to work + # """ + # if isinstance(other, int): + # return self + # else: + # return self.__add__(other) diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index dcd5f6a7..7ba4d58f 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -9,7 +9,7 @@ import multiprocessing as mp from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.Record import EPCRecord, EPCDifferenceRecord -from etl.epc.Dataset import TrainingDataset +from etl.epc.Dataset import TrainingDataset, RecordDataset from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 from etl.epc.settings import ( MANDATORY_FIXED_FEATURES, @@ -26,8 +26,8 @@ from etl.epc.settings import ( # TODO: change in setting file MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] -# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] -LATEST_FIELD = [x.lower() for x in LATEST_FIELD] +LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES] +# LATEST_FIELD = [x.lower() for x in LATEST_FIELD] COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] RDSAP_RESPONSE = RDSAP_RESPONSE.lower() HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() @@ -64,6 +64,12 @@ def get_cleaned_description_mapping(): clean_lookup = get_cleaned_description_mapping() +# import pickle +# with open("./clean_lookup.pkl", "wb") as f: +# pickle.dump(clean_lookup, f) + +# clean_lookup = pickle.load(open("./clean_lookup.pkl", "rb")) + class EPCPipeline: """ @@ -130,9 +136,66 @@ class EPCPipeline: self.run_training_dataset_pipeline() elif self.run_mode == "newdata": self.run_newdata_dataset_pipeline() + elif self.run_mode == "record": + self.run_record_dataset_pipeline() else: raise ValueError("Run mode defined needs to be in 'training' or 'newdata'") + def run_record_dataset_pipeline(self): + """ + Running pipeline with just the EPCRecords + """ + + if self.directories is None: + raise ValueError( + "Directories not specified - Unable to run Training pipeline" + ) + + for directory in tqdm(self.directories): + + filepath = directory / self.epc_local_file + self.epc_data_processor.prepare_data(filepath=filepath) + + constituency_data = self.epc_data_processor.data + self.compiled_cleaning_averages.append( + self.epc_data_processor.cleaning_averages + ) + + # TODO: integrate with EPCRecord + record_dataset = constituency_data[ + ["uprn"] + + [RDSAP_RESPONSE] + + VARIABLE_DATA_FEATURES + + MANDATORY_FIXED_FEATURES + + LATEST_FIELD + ].rename(columns={RDSAP_RESPONSE: "sap"}) + + constituency_dataset = RecordDataset( + datasets=record_dataset, cleaned_lookup=clean_lookup + ) + + self.compiled_dataset = pd.concat( + [self.compiled_dataset, constituency_dataset.df] + ) + + save_dataframe_to_s3_parquet( + df=self.compiled_dataset, + bucket_name=self.epc_bucket_name, + file_key=self.epc_compiled_dataset_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.DataFrame(self.compiled_all_equal_rows), + bucket_name=self.epc_bucket_name, + file_key=self.epc_all_equal_rows_key, + ) + + save_dataframe_to_s3_parquet( + df=pd.concat(self.compiled_cleaning_averages), + bucket_name=self.epc_bucket_name, + file_key=self.epc_cleaning_dataset_key, + ) + def run_newdata_dataset_pipeline(self): """ Main function to run the newdata pipeline diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index b6e0f6e6..99519f8d 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -12,7 +12,7 @@ def main(): """ directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - # directories = directories[0:3] + # directories = directories[202:203] epc_pipeline = EPCPipeline( directories=directories,