From a3609ee055509341be47dbf09e2938e20c7c66e3 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Tue, 28 May 2024 17:39:07 +0100 Subject: [PATCH 1/5] add new builds --- etl/epc/DataProcessor.py | 236 ++++++++++++++++++++++++++------------- 1 file changed, 159 insertions(+), 77 deletions(-) diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index a77bcaa3..4ad854c1 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -5,7 +5,7 @@ from BaseUtility import Definitions from etl.epc.settings import ( DATA_PROCESSOR_SETTINGS, EARLIEST_EPC_DATE, - IGNORED_TRANSACTION_TYPES, + # IGNORED_TRANSACTION_TYPES, IGNORED_FLOOR_LEVELS, IGNORED_PROPERTY_TYPES, IGNORED_TENURES, @@ -56,8 +56,11 @@ construction_age_remap = { expanded_map = { i: [ - label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l']) - ][0] for i in range(0, 3001) + label + for label, bounds in construction_age_bounds_map.items() + if (i <= bounds["u"]) and (i >= bounds["l"]) + ][0] + for i in range(0, 3001) } @@ -74,8 +77,13 @@ class EPCDataProcessor: Handle data loading and data preprocessing """ - def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, - run_mode: str = "training", violation_mode: bool = False) -> None: + def __init__( + self, + data: pd.DataFrame | None = None, + cleaning_averages: pd.DataFrame | None = None, + run_mode: str = "training", + violation_mode: bool = False, + ) -> None: """ :param filepath: If specified, is the physical location of the data :param is_newdata: Indicates if we are processing new, testing data. @@ -86,7 +94,9 @@ class EPCDataProcessor: self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame() is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame) - self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() + self.cleaning_averages: pd.DataFrame = ( + cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() + ) # FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA self.violation_mode = violation_mode @@ -103,7 +113,9 @@ class EPCDataProcessor: ignore_step = True if self.run_mode == "newdata" else False if filepath is not None: - self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + self.load_data( + filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"] + ) if len(self.data) == 0: raise Exception("No data to process - check filepath/ data being passed in") @@ -121,7 +133,8 @@ class EPCDataProcessor: self.clean_multi_glaze_proportion(ignore_step=ignore_step) self.clean_photo_supply() self.retain_multiple_epc_properties( - epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], + ignore_step=ignore_step, ) self.fill_na_fields() @@ -188,7 +201,9 @@ class EPCDataProcessor: if ignore_step: return - self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] + self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[ + 0 + ] def fill_invalid_constituency_fields(self, ignore_step: bool = False): """ @@ -201,7 +216,9 @@ class EPCDataProcessor: if ignore_step: return - self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) + self.data = self.data.fillna( + {"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]} + ) def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False): """ @@ -301,7 +318,7 @@ class EPCDataProcessor: """ if self.violation_mode: - # TODO: to fill in + # TODO: to fill in return if ignore_step: @@ -311,9 +328,7 @@ class EPCDataProcessor: lambda x: self.clean_construction_age_band(x) ) - self.data = self.data[ - ~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"]) - ] + self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])] def clean_missing_rooms(self, ignore_step: bool = False): """ @@ -331,31 +346,45 @@ class EPCDataProcessor: return # TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning) - self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0]) + self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply( + lambda x: x.split(" ")[0] + ) def apply_clean(data, matching_columns): - cleaning_data = data[~pd.isnull(data[col])].groupby( - matching_columns - )[col].median().reset_index() - - data = data.merge( - cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING") + cleaning_data = ( + data[~pd.isnull(data[col])] + .groupby(matching_columns)[col] + .median() + .reset_index() ) - data[col] = np.where(pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col]) + data = data.merge( + cleaning_data, + how="left", + on=matching_columns, + suffixes=("", "_CLEANING"), + ) + + data[col] = np.where( + pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col] + ) data = data.drop(columns=f"{col}_CLEANING") return data for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]: to_index = 3 - matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"] + matching_columns = [ + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "POSTAL_AREA", + ] has_missings = pd.isnull(self.data[col]).sum() while has_missings: self.data = apply_clean( - data=self.data, - matching_columns=matching_columns[0:to_index + 1] + data=self.data, matching_columns=matching_columns[0 : to_index + 1] ) has_missings = pd.isnull(self.data[col]).sum() @@ -363,7 +392,10 @@ class EPCDataProcessor: # Check if we've gotten to index 0 and still have missings - something has gone wrong or # we have a very unique property type if has_missings: - raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col) + raise NotImplementedError( + "Handle this edge case, we still have missings for column %s" + % col + ) break to_index -= 1 @@ -410,7 +442,7 @@ class EPCDataProcessor: # coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else # COLUMNTYPES # for k, v in coltypes.items(): - # self.data[k] = self.data[k].astype(v) + # self.data[k] = self.data[k].astype(v) # self.data = self.data.astype(coltypes) # self.na_remapping() @@ -437,9 +469,11 @@ class EPCDataProcessor: def na_remapping(self, auto_subset_columns: bool = False): - fill_na_map_apply = { - k: v for k, v in fill_na_map.items() if k in self.data.columns - } if auto_subset_columns else fill_na_map + fill_na_map_apply = ( + {k: v for k, v in fill_na_map.items() if k in self.data.columns} + if auto_subset_columns + else fill_na_map + ) for column, fill_value in fill_na_map_apply.items(): self.data[column] = self.data[column].fillna(fill_value) @@ -535,28 +569,34 @@ class EPCDataProcessor: for variable in AVERAGE_FIXED_FEATURES: # Replace any missing NAN values with averages for the same Property type and built form - cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_AVERAGE"] - ) + cleaning_averages_filled[variable] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_AVERAGE" + ) # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope # and built form # We can use just the property type average and replace - cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"] - ) + cleaning_averages_filled[variable] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_PROPERTY_AVERAGE" + ) # If there are still NA values, use BUILT FORM averages - cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"] - ) + cleaning_averages_filled["variable"] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_BUILT_FORM_AVERAGE" + ) # If there still is na values, use average across all epc in consituecy cleaning_averages_filled[variable] = cleaning_averages_filled[ @@ -573,7 +613,9 @@ class EPCDataProcessor: self.cleaning_averages = cleaning_averages_filled - def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None: + def retain_multiple_epc_properties( + self, epc_minimum_count: int = 1, ignore_step: bool = False + ) -> None: """ Reduce the data futher by keeping only datasets with multiple epcs """ @@ -592,12 +634,16 @@ class EPCDataProcessor: counts = counts[counts["count"] > epc_minimum_count] self.data = pd.merge(self.data, counts, on="UPRN") - def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: + def recast_df_columns( + self, column_mappings: dict, auto_subset_columns: bool = False + ) -> None: """ Recast columns from the dataframe to ensure the behaviour we want """ if auto_subset_columns: - column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} + column_mappings = { + k: v for k, v in column_mappings.items() if k in self.data.columns + } for key, values in column_mappings.items(): if key not in self.data.columns: @@ -608,13 +654,17 @@ class EPCDataProcessor: else: self.data[key] = self.data[key].astype(values) - def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: + def recast_all_data( + self, column_mappings: dict, auto_subset_columns: bool = False + ) -> None: """ Using a dictionary to recast all columns at once """ if auto_subset_columns: - column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} + column_mappings = { + k: v for k, v in column_mappings.items() if k in self.data.columns + } self.data = self.data.astype(column_mappings) @@ -625,14 +675,26 @@ class EPCDataProcessor: if self.violation_mode: violation_uprn_missing = pd.isnull(self.data["UPRN"]) - violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE - violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES - violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS) + violation_old_lodgment_date = ( + self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE + ) + # violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES + violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin( + IGNORED_FLOOR_LEVELS + ) violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE - violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"]) - violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"]) - violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"]) - violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES + violation_missing_windows_description = pd.isnull( + self.data["WINDOWS_DESCRIPTION"] + ) + violation_missing_hotwater_description = pd.isnull( + self.data["HOTWATER_DESCRIPTION"] + ) + violation_missing_roof_description = pd.isnull( + self.data["ROOF_DESCRIPTION"] + ) + violation_invalid_property_type = ( + self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES + ) violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES) violation_df = pd.concat( @@ -647,7 +709,8 @@ class EPCDataProcessor: violation_missing_roof_description, violation_invalid_property_type, violation_invalid_tenure, - ], axis=1, + ], + axis=1, keys=[ "violation_uprn_missing", "violation_old_lodgment_date", @@ -658,8 +721,8 @@ class EPCDataProcessor: "violation_missing_hotwater_description", "violation_missing_roof_description", "violation_invalid_property_type", - "violation_invalid_tenure" - ] + "violation_invalid_tenure", + ], ) self.data = pd.concat([self.data, violation_df], axis=1) @@ -685,10 +748,8 @@ class EPCDataProcessor: self.data = self.data[~pd.isnull(self.data["UPRN"])] self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE] - self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES] - self.data = self.data[ - ~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS) - ] + # self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES] + self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)] self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE] # We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them @@ -705,7 +766,7 @@ class EPCDataProcessor: self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)] # We remap zero values to None - self.data.loc[self.data['FLOOR_HEIGHT'] == 0, 'FLOOR_HEIGHT'] = None + self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None: """ @@ -734,7 +795,11 @@ class EPCDataProcessor: @staticmethod def apply_averages_cleaning( - data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False + data_to_clean, + cleaning_data, + cols_to_merge_on, + colnames=None, + ignore_step: bool = False, ): """ Clean the input DataFrame using averages from a cleaning DataFrame. @@ -752,12 +817,13 @@ class EPCDataProcessor: # The desired colnames to clean - which may not be present if colnames is None: - colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"] + colnames = [ + "TOTAL_FLOOR_AREA", + "FLOOR_HEIGHT", + "FIXED_LIGHTING_OUTLETS_COUNT", + ] - cols_to_clean = [ - c for c in colnames if - c in data_to_clean.columns - ] + cols_to_clean = [c for c in colnames if c in data_to_clean.columns] # Enforce data types for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: @@ -768,7 +834,15 @@ class EPCDataProcessor: # Calculate averages cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg( - dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean))) + dict( + zip( + cols_to_clean, + [ + "mean", + ] + * len(cols_to_clean), + ) + ) ) # Merge with the original data @@ -777,7 +851,7 @@ class EPCDataProcessor: cleaning_averages_to_merge, on=columns_to_merge_on, suffixes=("", "_AVERAGE"), - how='left' + how="left", ) global_averages = cleaning_data[cols_to_clean].mean() @@ -806,14 +880,20 @@ class EPCDataProcessor: raise Exception("Suffix should be one of _starting or _ending") if suffix == "_STARTING": - starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix) + starting_cols = ( + self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES] + .copy() + .add_suffix(suffix) + ) fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy() return pd.concat([starting_cols, fixed_cols], axis=1) - return self.data[ - ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES - ].copy().add_suffix(suffix) + return ( + self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES] + .copy() + .add_suffix(suffix) + ) def get_fixed_features(self) -> pd.DataFrame: """ @@ -831,14 +911,17 @@ class EPCDataProcessor: :param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids :return: DataFrame with coerced columns. """ - object_columns = df.select_dtypes(include=['object']).columns + object_columns = df.select_dtypes(include=["object"]).columns if cols_to_ignore: object_columns = [c for c in object_columns if c not in cols_to_ignore] for column in object_columns: unique_values = df[column].dropna().unique() # If the unique values in the column are 'True' and 'False', convert the column to boolean - if set(unique_values) == {'True', 'False'} or set(unique_values) == {True, False}: + if set(unique_values) == {"True", "False"} or set(unique_values) == { + True, + False, + }: df[column] = df[column].astype(bool) return df @@ -877,7 +960,6 @@ class EPCDataProcessor: @staticmethod def clean_efficiency_variables(df): - """ These is scope to clean this by the model per corresponding description. E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and From d51eeec58d77dff99c0c03033be1d0da534fd9e2 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Tue, 28 May 2024 17:48:45 +0100 Subject: [PATCH 2/5] add thermal transmittance unit as boolean flag to signify walls from new builds - assuming only new builds have this description --- etl/epc/Dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 7d5c3ef8..36abd4ef 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -559,9 +559,9 @@ class TrainingDataset(BaseDataset): "walls": [ # We need to cleaned descriptions for pulling out u-values "original_description", - "thermal_transmittance_unit", + # "thermal_transmittance_unit", "original_description_ending", - "thermal_transmittance_unit_ending", + # "thermal_transmittance_unit_ending", "is_cavity_wall_ending", "is_solid_brick_ending", "is_system_built_ending", From c3e04d2d007f191f5b2e6c3b0fa7d1737e6749fd Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Tue, 28 May 2024 18:14:47 +0100 Subject: [PATCH 3/5] add temp fix for cleaned to allow for new builds to flag thermal unit --- etl/epc/Pipeline.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index 6abf05bd..3a078703 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -64,6 +64,21 @@ def get_cleaned_description_mapping(): clean_lookup = get_cleaned_description_mapping() +# TODO: THIS IS A TEMPORARY FIX +new_walls_description_mapping = pd.DataFrame(clean_lookup["walls-description"]) + +import numpy as np + +new_walls_description_mapping["thermal_transmittance_unit"] = np.where( + ~pd.isnull(new_walls_description_mapping["thermal_transmittance_unit"]), + "w/m-¦k", + new_walls_description_mapping["thermal_transmittance_unit"], +) + +clean_lookup["walls-description"] = new_walls_description_mapping.to_dict( + orient="records" +) + class EPCPipeline: """ From 57477907cbed9086f275f5761e1a240cbcabc726 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Tue, 28 May 2024 19:02:19 +0100 Subject: [PATCH 4/5] add check for float nan in recommendations --- etl/epc/Dataset.py | 6 +- recommendations/WallRecommendations.py | 148 +++++++++++++++------- recommendations/WindowsRecommendations.py | 36 ++++-- 3 files changed, 130 insertions(+), 60 deletions(-) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 36abd4ef..ee3e357c 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -229,7 +229,9 @@ class TrainingDataset(BaseDataset): """ # TODO: move into EPCRecord record uvalue_columns = [ - col for col in self.df.columns if "thermal_transmittance" in col + col + for col in self.df.columns + if "thermal_transmittance" in col and "_unit" not in col ] for uvalue_col in uvalue_columns: self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col]) @@ -703,6 +705,8 @@ class TrainingDataset(BaseDataset): "insulation_thickness_ending": f"{component}_insulation_thickness_ending", "thermal_transmittance": f"{component}_thermal_transmittance", "thermal_transmittance_ending": f"{component}_thermal_transmittance_ending", + "thermal_transmittance_unit": f"{component}_thermal_transmittance_unit", + "thermal_transmittance_unit_ending": f"{component}_thermal_transmittance_unit_ending", "tariff_type": f"{component}_tariff_type", "tariff_type_ending": f"{component}_tariff_type_ending", "clean_description": f"{component}_clean_description", diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index 20fc453c..8ca34bc8 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -7,8 +7,13 @@ from datatypes.enums import QuantityUnits from backend.Property import Property from BaseUtility import Definitions from recommendations.recommendation_utils import ( - r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, - get_recommended_part, get_wall_u_value, override_costs + r_value_per_mm_to_u_value, + calculate_u_value_uplift, + is_diminishing_returns, + update_lowest_selected_u_value, + get_recommended_part, + get_wall_u_value, + override_costs, ) from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION from recommendations.Costs import Costs @@ -22,7 +27,7 @@ class WallRecommendations(Definitions): # After 1930, Solid brick walls became less populate and instead, cavity walls became a # more popular choice YEARS_CAVITY_WALLS_BEGAN = 1930 - U_VALUE_UNIT = 'w/m-¦k' + U_VALUE_UNIT = "w/m-¦k" # part L building regulations indicate that any rennovations on an existing property's walls should # achieve a U-value of no higher than 0.3 @@ -53,11 +58,7 @@ class WallRecommendations(Definitions): # threshold NEW_BUILD_INSULATED = 0.75 - def __init__( - self, - property_instance: Property, - materials: List - ): + def __init__(self, property_instance: Property, materials: List): self.property = property_instance self.costs = Costs(self.property) # For audit purposes, when estimating u values we'll store it @@ -75,9 +76,10 @@ class WallRecommendations(Definitions): ] self.internal_wall_non_insulation_materials = [ - part for part in materials if part["type"] in [ - "iwi_wall_demolition", "iwi_vapour_barrier", "iwi_redecoration" - ] + part + for part in materials + if part["type"] + in ["iwi_wall_demolition", "iwi_vapour_barrier", "iwi_redecoration"] ] self.external_wall_insulation_materials = [ @@ -85,9 +87,10 @@ class WallRecommendations(Definitions): ] self.external_wall_non_insulation_materials = [ - part for part in materials if part["type"] in [ - "ewi_wall_demolition", "ewi_wall_preparation", "ewi_wall_redecoration" - ] + part + for part in materials + if part["type"] + in ["ewi_wall_demolition", "ewi_wall_preparation", "ewi_wall_redecoration"] ] @property @@ -98,7 +101,9 @@ class WallRecommendations(Definitions): # Current logic: If the property is in a conservation area/heritage building/listed building or a flat, # it is not suitable for EWI - if self.property.restricted_measures or (self.property.data["property-type"].lower() == "flat"): + if self.property.restricted_measures or ( + self.property.data["property-type"].lower() == "flat" + ): return False return True @@ -109,31 +114,43 @@ class WallRecommendations(Definitions): # recommend internal wall insulation as a possible measure u_value = self.property.walls["thermal_transmittance"] + u_value = None if math.isnan(u_value) else u_value + is_cavity_wall = self.property.walls["is_cavity_wall"] insulation_thickness = self.property.walls["insulation_thickness"] # We check if the wall is already insulated and if so, we exit - if ((insulation_thickness in ["average", "above average"]) or self.property.walls["is_filled_cavity"]) and ( - "cavity_extract_and_refill" not in self.property.non_invasive_recommendations + if ( + (insulation_thickness in ["average", "above average"]) + or self.property.walls["is_filled_cavity"] + ) and ( + "cavity_extract_and_refill" + not in self.property.non_invasive_recommendations ): return if u_value: if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT: - raise NotImplementedError("Haven't handled the case of other u value units yet") + raise NotImplementedError( + "Haven't handled the case of other u value units yet" + ) # If the property is a new build and the U-value is below 0.75, we don't recommend insulation because it's # not practical - if (self.property.data["transaction-type"] == "new dwelling") and (u_value <= self.NEW_BUILD_INSULATED): + if (self.property.data["transaction-type"] == "new dwelling") and ( + u_value <= self.NEW_BUILD_INSULATED + ): # Recommend nothing return # We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already # + it already has a U-value WORSE than the building regulations, so we recommend either internal or # external wall insulation - if (not is_cavity_wall) and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) and ( - u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE + if ( + (not is_cavity_wall) + and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) + and (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) ): # Recommend insulation self.find_insulation(u_value, phase) @@ -141,8 +158,10 @@ class WallRecommendations(Definitions): # We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already # + it already has a U-value better than the building regulations, so we don't need to recommend anything - if (not is_cavity_wall) and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) and ( - u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE + if ( + (not is_cavity_wall) + and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) + and (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) ): # Recommend nothing return @@ -205,28 +224,40 @@ class WallRecommendations(Definitions): recommendations = [] for _, material in insulation_materials.iterrows(): - part_u_value = r_value_per_mm_to_u_value(cavity_width, material["r_value_per_mm"]) + part_u_value = r_value_per_mm_to_u_value( + cavity_width, material["r_value_per_mm"] + ) _, new_u_value = calculate_u_value_uplift(u_value, part_u_value) new_u_value = math.ceil(new_u_value * 100.0) / 100.0 if is_diminishing_returns( - recommendations, new_u_value, lowest_selected_u_value, self.DIMINISHING_RETURNS_U_VALUE + recommendations, + new_u_value, + lowest_selected_u_value, + self.DIMINISHING_RETURNS_U_VALUE, ): continue if new_u_value <= self.BUILDING_REGULATIONS_PART_L_CAVITY_WALL_MAX_U_VALUE: - lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value) + lowest_selected_u_value = update_lowest_selected_u_value( + lowest_selected_u_value, new_u_value + ) - is_extraction_and_refill = "cavity_extract_and_refill" in self.property.non_invasive_recommendations + is_extraction_and_refill = ( + "cavity_extract_and_refill" + in self.property.non_invasive_recommendations + ) cost_result = self.costs.cavity_wall_insulation( wall_area=self.property.insulation_wall_area, material=material.to_dict(), - is_extraction_and_refill=is_extraction_and_refill + is_extraction_and_refill=is_extraction_and_refill, ) - already_installed = "cavity_wall_insulation" in self.property.already_installed + already_installed = ( + "cavity_wall_insulation" in self.property.already_installed + ) if already_installed: cost_result = override_costs(cost_result) @@ -246,7 +277,7 @@ class WallRecommendations(Definitions): part=material.to_dict(), quantity=self.property.insulation_wall_area, quantity_unit=QuantityUnits.m2.value, - cost_result=cost_result + cost_result=cost_result, ) ], "type": "cavity_wall_insulation", @@ -255,13 +286,15 @@ class WallRecommendations(Definitions): "new_u_value": new_u_value, "sap_points": None, "already_installed": already_installed, - **cost_result + **cost_result, } ) self.recommendations = recommendations - def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase): + def _find_insulation( + self, u_value, insulation_materials, non_insulation_materials, phase + ): lowest_selected_u_value = None recommendations = [] @@ -269,7 +302,9 @@ class WallRecommendations(Definitions): for _, material in insulation_material_group.iterrows(): - part_u_value = r_value_per_mm_to_u_value(material["depth"], material["r_value_per_mm"]) + part_u_value = r_value_per_mm_to_u_value( + material["depth"], material["r_value_per_mm"] + ) _, new_u_value = calculate_u_value_uplift(u_value, part_u_value) new_u_value = math.ceil(new_u_value * 100.0) / 100.0 @@ -280,22 +315,30 @@ class WallRecommendations(Definitions): # further into the diminishing returns threshold and can shouldn't be if is_diminishing_returns( - recommendations, new_u_value, lowest_selected_u_value, self.DIMINISHING_RETURNS_U_VALUE + recommendations, + new_u_value, + lowest_selected_u_value, + self.DIMINISHING_RETURNS_U_VALUE, ): continue # We allow a small tolerance for error so we don't discount the recommendation entirely if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE: - lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value) + lowest_selected_u_value = update_lowest_selected_u_value( + lowest_selected_u_value, new_u_value + ) if material["type"] == "internal_wall_insulation": cost_result = self.costs.internal_wall_insulation( wall_area=self.property.insulation_wall_area, material=material.to_dict(), - non_insulation_materials=non_insulation_materials + non_insulation_materials=non_insulation_materials, + ) + already_installed = ( + "internal_wall_insulation" + in self.property.already_installed ) - already_installed = "internal_wall_insulation" in self.property.already_installed if already_installed: cost_result = override_costs(cost_result) @@ -303,9 +346,12 @@ class WallRecommendations(Definitions): cost_result = self.costs.external_wall_insulation( wall_area=self.property.insulation_wall_area, material=material.to_dict(), - non_insulation_materials=non_insulation_materials + non_insulation_materials=non_insulation_materials, + ) + already_installed = ( + "external_wall_insulation" + in self.property.already_installed ) - already_installed = "external_wall_insulation" in self.property.already_installed if already_installed: cost_result = override_costs(cost_result) else: @@ -319,7 +365,7 @@ class WallRecommendations(Definitions): part=material.to_dict(), quantity=self.property.insulation_wall_area, quantity_unit=QuantityUnits.m2.value, - cost_result=cost_result + cost_result=cost_result, ) ], "type": material["type"], @@ -328,7 +374,7 @@ class WallRecommendations(Definitions): "new_u_value": new_u_value, "already_installed": already_installed, "sap_points": None, - **cost_result + **cost_result, } ) @@ -350,16 +396,18 @@ class WallRecommendations(Definitions): if self.ewi_valid: ewi_recommendations = self._find_insulation( u_value=u_value, - insulation_materials=pd.DataFrame(self.external_wall_insulation_materials), + insulation_materials=pd.DataFrame( + self.external_wall_insulation_materials + ), non_insulation_materials=self.external_wall_non_insulation_materials, - phase=phase + phase=phase, ) iwi_recommendations = self._find_insulation( u_value=u_value, insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials), non_insulation_materials=self.internal_wall_non_insulation_materials, - phase=phase + phase=phase, ) self.recommendations += ewi_recommendations + iwi_recommendations @@ -367,12 +415,16 @@ class WallRecommendations(Definitions): @staticmethod def _make_description(material): if material["type"] == "internal_wall_insulation": - return (f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on internal " - f"walls") + return ( + f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on internal " + f"walls" + ) if material["type"] == "external_wall_insulation": - return (f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on external " - f"walls") + return ( + f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on external " + f"walls" + ) if material["type"] == "cavity_wall_insulation": return f"Fill cavity with {material['description']}" diff --git a/recommendations/WindowsRecommendations.py b/recommendations/WindowsRecommendations.py index b7c2823a..8c0cc493 100644 --- a/recommendations/WindowsRecommendations.py +++ b/recommendations/WindowsRecommendations.py @@ -4,7 +4,7 @@ import numpy as np from backend.Property import Property from recommendations.Costs import Costs -from recommendation_utils import override_costs +from recommendations.recommendation_utils import override_costs class WindowsRecommendations: @@ -14,7 +14,7 @@ class WindowsRecommendations: # glazed "most": 0.33, # If glazing is partial, we assume 50/50 split between glazed and unglazed - "partial": 0.5 + "partial": 0.5, } def __init__(self, property_instance: Property, materials: List): @@ -52,14 +52,20 @@ class WindowsRecommendations: if not number_of_windows: raise ValueError("Number of windows not specified") - if self.property.windows["has_glazing"] & (self.property.windows["glazing_coverage"] == "full"): + if self.property.windows["has_glazing"] & ( + self.property.windows["glazing_coverage"] == "full" + ): return # We scale the number of windows based on the proportion of existing glazing if self.property.data["multi-glaze-proportion"] != "": - n_windows_scalar = 1 - (int(self.property.data["multi-glaze-proportion"]) / 100) + n_windows_scalar = 1 - ( + int(self.property.data["multi-glaze-proportion"]) / 100 + ) else: - n_windows_scalar = self.COVERAGE_MAP.get(self.property.windows["glazing_coverage"], 1) + n_windows_scalar = self.COVERAGE_MAP.get( + self.property.windows["glazing_coverage"], 1 + ) number_of_windows *= n_windows_scalar number_of_windows = np.ceil(number_of_windows) @@ -68,7 +74,7 @@ class WindowsRecommendations: cost_result = self.costs.window_glazing( number_of_windows=number_of_windows, material=self.glazing_material, - is_secondary_glazing=is_secondary_glazing + is_secondary_glazing=is_secondary_glazing, ) already_installed = "windows_glazing" in self.property.already_installed @@ -76,18 +82,26 @@ class WindowsRecommendations: cost_result = override_costs(cost_result) description = "The property already has double glazing installed. No further action is required." else: - glazing_type = "secondary glazing" if is_secondary_glazing else "double glazing" + glazing_type = ( + "secondary glazing" if is_secondary_glazing else "double glazing" + ) if self.property.windows["glazing_coverage"] in ["partial", "most"]: description = f"Install {glazing_type} to the remaining windows" else: description = f"Install {glazing_type} to all windows" if self.property.is_listed: - description += ". Secondary glazing recommended due to listed building status" + description += ( + ". Secondary glazing recommended due to listed building status" + ) elif self.property.is_heritage: - description += ". Secondary glazing recommended due to herigate building status" + description += ( + ". Secondary glazing recommended due to herigate building status" + ) elif self.property.in_conservation_area: - description += ". Secondary glazing recommended due to conservation area status" + description += ( + ". Secondary glazing recommended due to conservation area status" + ) self.recommendation = [ { @@ -100,6 +114,6 @@ class WindowsRecommendations: "sap_points": None, "already_installed": already_installed, **cost_result, - "is_secondary_glazing": is_secondary_glazing + "is_secondary_glazing": is_secondary_glazing, } ] From 14452dde9937d242c30ff490b8c5039a80ea6fcc Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Tue, 28 May 2024 19:07:58 +0100 Subject: [PATCH 5/5] use pandas --- etl/epc/generate_scenarios_data.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/etl/epc/generate_scenarios_data.py b/etl/epc/generate_scenarios_data.py index f9f66034..df1f9452 100644 --- a/etl/epc/generate_scenarios_data.py +++ b/etl/epc/generate_scenarios_data.py @@ -41,6 +41,15 @@ cleaning_data = read_dataframe_from_s3_parquet( materials = get_materials(session) cleaned = get_cleaned() +# TODO: THIS IS A TEMPORARY FIX +new_walls_description_mapping = pd.DataFrame(cleaned["walls-description"]) +new_walls_description_mapping.loc[ + ~new_walls_description_mapping["thermal_transmittance_unit"].isnull(), + "thermal_transmittance_unit", +] = "w/m-¦k" + +cleaned["walls-description"] = new_walls_description_mapping.to_dict(orient="records") + uprn_filenames = read_dataframe_from_s3_parquet( bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet" ) @@ -167,7 +176,7 @@ for scenario_property in scenario_properties: p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds) recommender = Recommendations(property_instance=p, materials=materials) - property_recommendations = recommender.recommend("0") + property_recommendations = recommender.recommend() wall_recommendations = recommender.wall_recomender.recommendations loft_recommendations = recommender.roof_recommender.recommendations