From 955791f49953072268a57530fdd5b1f152ca481f Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Thu, 3 Oct 2024 23:12:20 +0100 Subject: [PATCH] add temp fix for None value instead of string none - should come from cleaned lookup --- etl/epc/Dataset.py | 16 +- recommendations/recommendation_utils.py | 211 +++++++++++++++--------- 2 files changed, 145 insertions(+), 82 deletions(-) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 83a85b78..3f2e810e 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -203,11 +203,11 @@ class TrainingDataset(BaseDataset): common_cols = [[col + "_starting", col + "_ending"] for col in common_cols] self.df = self.df.loc[ - :, - no_suffix_cols - + only_ending_cols - + [col for cols in common_cols for col in cols], - ] + :, + no_suffix_cols + + only_ending_cols + + [col for cols in common_cols for col in cols], + ] def _remove_abnormal_change_in_floor_area(self): """ @@ -511,7 +511,7 @@ class TrainingDataset(BaseDataset): expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"] ) - ] + ] elif component == "floor": expanded_df = expanded_df[ (expanded_df["is_suspended"] == expanded_df["is_suspended_ending"]) @@ -528,7 +528,7 @@ class TrainingDataset(BaseDataset): expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"] ) - ] + ] elif component == "roof": expanded_df = expanded_df[ (expanded_df["is_pitched"] == expanded_df["is_pitched_ending"]) @@ -541,7 +541,7 @@ class TrainingDataset(BaseDataset): expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"] ) - ] + ] return expanded_df diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index 883a387b..82d1b9af 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -7,10 +7,18 @@ import numpy as np import pandas as pd from recommendations.rdsap_tables import ( - epc_wall_description_map, wall_uvalues_df, default_wall_thickness, table_s9 as s9, table_s10 as s10, - table_s11 as s11, table_s12 as s12 + epc_wall_description_map, + wall_uvalues_df, + default_wall_thickness, + table_s9 as s9, + table_s10 as s10, + table_s11 as s11, + table_s12 as s12, +) +from recommendations.config import ( + PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION, + PARTIAL_CAVITY_DESCRIPTIONS, ) -from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION, PARTIAL_CAVITY_DESCRIPTIONS def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float): @@ -62,7 +70,9 @@ def calculate_u_value_uplift(u_value, insulation_u_value): return u_value_uplift, new_u_value -def is_diminishing_returns(recommendations, new_u_value, lowest_selected_u_value, diminishing_returns_u_value): +def is_diminishing_returns( + recommendations, new_u_value, lowest_selected_u_value, diminishing_returns_u_value +): """ What are defines diminishing returns? 1) The new u value is lower than the lowest selected u value @@ -136,9 +146,15 @@ def apply_formula_s_5_1_1(is_granite_or_whinstone, is_sandstone_or_limestone, ag S.5.1.1 """ - stone_wall_thickness = [x for x in default_wall_thickness if x["type"] == "stone"][0] + stone_wall_thickness = [x for x in default_wall_thickness if x["type"] == "stone"][ + 0 + ] - thickness = stone_wall_thickness["J_K_L"] if age_band in ["J", "L", "L"] else stone_wall_thickness[age_band] + thickness = ( + stone_wall_thickness["J_K_L"] + if age_band in ["J", "L", "L"] + else stone_wall_thickness[age_band] + ) if is_granite_or_whinstone: return 3.3 - 0.002 * thickness @@ -146,7 +162,9 @@ def apply_formula_s_5_1_1(is_granite_or_whinstone, is_sandstone_or_limestone, ag if is_sandstone_or_limestone: return 3 - 0.002 * thickness - raise ValueError("This should only be called when is_granite_or_whinstone or is_sandstone_or_limestone is True") + raise ValueError( + "This should only be called when is_granite_or_whinstone or is_sandstone_or_limestone is True" + ) def get_wall_u_value( @@ -164,16 +182,30 @@ def get_wall_u_value( if clean_description in PARTIAL_CAVITY_DESCRIPTIONS: # If we have a partial cavity fill, we linearly interpolate the u-value. This isn't necessarily the perfect # method and how we do this should be explored, however we want to distinguish between the old - filled_uvalue = float(wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Filled cavity"][age_band].values[0]) - unfilled_uvalue = float(wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Cavity as built"][age_band].values[0]) + filled_uvalue = float( + wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Filled cavity"][ + age_band + ].values[0] + ) + unfilled_uvalue = float( + wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Cavity as built"][ + age_band + ].values[0] + ) mapped_value = str( - unfilled_uvalue - (PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION * (unfilled_uvalue - filled_uvalue)) + unfilled_uvalue + - ( + PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION + * (unfilled_uvalue - filled_uvalue) + ) ) else: mapped_description = epc_wall_description_map[clean_description] - mapped_value = wall_uvalues_df[wall_uvalues_df["Wall_type"] == mapped_description][age_band].values[0] + mapped_value = wall_uvalues_df[ + wall_uvalues_df["Wall_type"] == mapped_description + ][age_band].values[0] if pd.isnull(mapped_value) and "Park home" in mapped_description: # We don't know enough in this case so we default to 0 @@ -185,17 +217,19 @@ def get_wall_u_value( apply_formula_s_5_1_1( is_granite_or_whinstone=is_granite_or_whinstone, is_sandstone_or_limestone=is_sandstone_or_limestone, - age_band=age_band + age_band=age_band, ) ) if "b" in mapped_value: potential_uvalue = float(mapped_value.replace("b", "")) - formula_uvalue = float(apply_formula_s_5_1_1( - is_granite_or_whinstone=is_granite_or_whinstone, - is_sandstone_or_limestone=is_sandstone_or_limestone, - age_band=age_band - )) + formula_uvalue = float( + apply_formula_s_5_1_1( + is_granite_or_whinstone=is_granite_or_whinstone, + is_sandstone_or_limestone=is_sandstone_or_limestone, + age_band=age_band, + ) + ) return min(potential_uvalue, formula_uvalue) if mapped_value == "s1.1.2": @@ -205,11 +239,16 @@ def get_wall_u_value( return float(mapped_value) -def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched, is_at_rafters): +def get_u_value_from_s9( + thickness, s9, is_loft, is_roof_room, is_thatched, is_at_rafters +): """Get the U-value from table S9 based on the insulation thickness.""" # If the roof as pitched & insulated at the rafters, it's a room roof if is_roof_room or is_at_rafters: + # TODO: We get None instead of a string none, this should be fixed + if thickness is None: + thickness = "none" # We re-map the thickness thickness_map = { "below average": "50", @@ -233,10 +272,14 @@ def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched, is_at return None # Determine the column to refer based on the roof type - column = 'Thatched_roof_U_value_W_m2K' if is_thatched else 'Slates_or_tiles_U_value_W_m2K' + column = ( + "Thatched_roof_U_value_W_m2K" + if is_thatched + else "Slates_or_tiles_U_value_W_m2K" + ) # Get the correct U-value based on the insulation thickness - return s9[s9['Insulation_thickness_mm'] >= thickness][column].iloc[0] + return s9[s9["Insulation_thickness_mm"] >= thickness][column].iloc[0] def get_roof_u_value( @@ -249,7 +292,7 @@ def get_roof_u_value( is_flat, is_pitched, is_at_rafters, - **kwargs + **kwargs, ): """ Determine the U-value for a roof based on the description dictionary and age band. @@ -292,7 +335,7 @@ def get_roof_u_value( is_loft=is_loft, is_roof_room=is_roof_room, is_thatched=is_thatched, - is_at_rafters=is_at_rafters + is_at_rafters=is_at_rafters, ) if u_value is not None: @@ -302,25 +345,25 @@ def get_roof_u_value( # Define the columns to be used based on the description details if is_flat: - column = 'Flat_roof' + column = "Flat_roof" elif is_thatched: if is_roof_room: - column = 'Thatched_roof_room_in_roof' + column = "Thatched_roof_room_in_roof" else: - column = 'Thatched_roof' + column = "Thatched_roof" elif is_roof_room: - column = 'Room_in_roof_slates_or_tiles' + column = "Room_in_roof_slates_or_tiles" elif is_pitched: if is_at_rafters: - column = 'Pitched_slates_or_tiles_insulation_at_rafters' + column = "Pitched_slates_or_tiles_insulation_at_rafters" else: - column = 'Pitched_slates_or_tiles_insulation_between_joists_or_unknown' + column = "Pitched_slates_or_tiles_insulation_between_joists_or_unknown" else: # Default to pitched roof with insulation between joists or unknown - column = 'Pitched_slates_or_tiles_insulation_between_joists_or_unknown' + column = "Pitched_slates_or_tiles_insulation_between_joists_or_unknown" # Get the U-value from table S10 based on the age band and the determined column - u_value = s10.loc[s10['Age_band'].str.contains(age_band), column].values[0] + u_value = s10.loc[s10["Age_band"].str.contains(age_band), column].values[0] return float(u_value) @@ -397,10 +440,14 @@ def get_exposed_floor_uvalue(insulation_thickness_str, age_band): else: insulation_thickness = int(insulation_thickness_str.replace("mm", "")) - return s12[s12["age_band"] == age_band][f"insulation_{insulation_thickness}"].values[0] + return s12[s12["age_band"] == age_band][ + f"insulation_{insulation_thickness}" + ].values[0] -def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulation_thickness=None): +def get_floor_u_value( + floor_type, area, perimeter, age_band, wall_type, insulation_thickness=None +): """ Estimate the u-value of a suspended floor, based on RdSap methodology Default U-value for UNINSULATED suspended floor, based on RdSAP methodology @@ -446,14 +493,19 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati Rsi = 0.17 # in m²K/W Rse = 0.04 # in m²K/W lambda_ins = 0.035 # thermal conductivity of floor insulation in W/m·K - wall_thickness = [x[age_band] for x in default_wall_thickness if x["type"] == wall_type][0] + wall_thickness = [ + x[age_band] for x in default_wall_thickness if x["type"] == wall_type + ][0] if wall_thickness is None and wall_type == "park home": # We don't know enough and likely won't make recommendations return 0 wall_thickness = wall_thickness / 1000 if insulation_thickness is None: - insulation_lookup = s11[s11["Age_band"].str.contains(age_band) & s11["Floor_construction"] == floor_type] + insulation_lookup = s11[ + s11["Age_band"].str.contains(age_band) & s11["Floor_construction"] + == floor_type + ] if insulation_lookup.empty: insulation_thickness = 0 else: @@ -465,7 +517,7 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati # Calculate B B = 2 * area / perimeter - if floor_type == 'solid': + if floor_type == "solid": # Calculate dt dt = wall_thickness + lambda_g * (Rsi + Rf + Rse) @@ -475,7 +527,7 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati else: U = lambda_g / (0.457 * B + dt) - elif floor_type == 'suspended': + elif floor_type == "suspended": # Define additional constants for suspended floors h = 0.3 # height above external ground level in meters v = 5 # average wind speed at 10 m height in m/s @@ -498,7 +550,9 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati U = 1 / (2 * Rsi + Rf + 1 / (Ug + Ux)) else: - raise ValueError("Invalid floor type. Acceptable values are 'solid' or 'suspended'.") + raise ValueError( + "Invalid floor type. Acceptable values are 'solid' or 'suspended'." + ) return round(U, 2) # rounding U value to two decimal places @@ -509,7 +563,13 @@ def extract_insulation_thickness(insulation_thickness_str): :param insulation_thickness_str: :return: """ - if insulation_thickness_str in ["none", "average", "below average", "above average", None]: + if insulation_thickness_str in [ + "none", + "average", + "below average", + "above average", + None, + ]: return None if isinstance(insulation_thickness_str, (float, int)): @@ -527,7 +587,7 @@ def get_wall_type( is_cob, is_system_built, is_park_home, - **kwargs + **kwargs, ) -> Union[str, None]: """ Converts booleans to a string wall type, for querying the wall thickness table @@ -573,10 +633,10 @@ def estimate_external_wall_area(num_floors, floor_height, perimeter, built_form) total_wall_area = wall_area_one_floor * num_floors number_exposed_walls = { - 'End-Terrace': 3, - 'Mid-Terrace': 2, - 'Semi-Detached': 3, - 'Detached': 4, + "End-Terrace": 3, + "Mid-Terrace": 2, + "Semi-Detached": 3, + "Detached": 4, } exposed_wall_area = total_wall_area * (number_exposed_walls.get(built_form, 3) / 4) @@ -622,27 +682,12 @@ def convert_thickness_to_numeric(string_thickness, is_pitched, is_flat): return 0 if is_pitched: - lookup = { - "none": 0, - "below average": 50, - "average": 100, - "above average": 270 - } + lookup = {"none": 0, "below average": 50, "average": 100, "above average": 270} elif is_flat: # For a flat roof, if it's below average, we assume it's 0 and requires a re-roof - lookup = { - "none": 0, - "below average": 0, - "average": 100, - "above average": 150 - } + lookup = {"none": 0, "below average": 0, "average": 100, "above average": 150} else: - lookup = { - "none": 0, - "below average": 100, - "average": 270, - "above average": 270 - } + lookup = {"none": 0, "below average": 100, "average": 270, "above average": 270} mapped = lookup.get(string_thickness) @@ -697,11 +742,16 @@ def estimate_windows( # Assuming most houses will have at least one kitchen and one bathroom # Scale non-habitable windows with the number of habitable rooms non_habitable_base = 2 # Base for kitchen and bathroom - extra_non_habitable = max(0, (number_habitable_rooms - 3) // 2) # Extra for large houses + extra_non_habitable = max( + 0, (number_habitable_rooms - 3) // 2 + ) # Extra for large houses window_count += non_habitable_base + extra_non_habitable # Adjustments based on built form and property type - if property_type in ["House", "Bungalow"] and built_form in ["Semi-Detached", "Detached"]: + if property_type in ["House", "Bungalow"] and built_form in [ + "Semi-Detached", + "Detached", + ]: built_form_lookup = { "Semi-Detached": 3, "Detached": 4, @@ -728,7 +778,10 @@ def estimate_windows( window_count += 2 # Adjust for construction age band - if construction_age_band in ["England and Wales: before 1900", "England and Wales: 1900-1929"]: + if construction_age_band in [ + "England and Wales: before 1900", + "England and Wales: 1900-1929", + ]: # Older houses with smaller, more numerous windows window_count += 1 @@ -751,7 +804,11 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned): df = [] for x in all_epcs: # Get the cleaned mapping - mapped = [y for y in cleaned["walls-description"] if y["original_description"] == x["walls-description"]] + mapped = [ + y + for y in cleaned["walls-description"] + if y["original_description"] == x["walls-description"] + ] if not mapped: continue df.append( @@ -768,7 +825,9 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned): return cavity_age -def check_simulation_difference(old_config, new_config, prefix="", keys_with_prefix=None): +def check_simulation_difference( + old_config, new_config, prefix="", keys_with_prefix=None +): """ Given two dictionaries, that describe the heating control configurations, this method will compare the two and pick out the differences. These differences will be things that have been added and things that have been @@ -777,14 +836,17 @@ def check_simulation_difference(old_config, new_config, prefix="", keys_with_pre """ keys_with_prefix = ( - ["is_assumed", "thermal_transmittance", "insulation_thickness"] if keys_with_prefix is None + ["is_assumed", "thermal_transmittance", "insulation_thickness"] + if keys_with_prefix is None else keys_with_prefix ) differences = {} for key in new_config: if old_config[key] != new_config[key]: - new_key = prefix + key + "_ending" if key in keys_with_prefix else key + "_ending" + new_key = ( + prefix + key + "_ending" if key in keys_with_prefix else key + "_ending" + ) differences[new_key] = new_config[key] return differences @@ -811,17 +873,18 @@ def combine_recommendation_configs(recommendation_config1, recommendation_config """ # Efficiency values - keys which contain _energy_eff_ending eff_1 = { - k: v for k, v in recommendation_config1.items() if ("_energy_eff_ending" in k) or ("-energy-eff" in k) + k: v + for k, v in recommendation_config1.items() + if ("_energy_eff_ending" in k) or ("-energy-eff" in k) } eff_2 = { - k: v for k, v in recommendation_config2.items() if ("_energy_eff_ending" in k) or ("-energy-eff" in k) + k: v + for k, v in recommendation_config2.items() + if ("_energy_eff_ending" in k) or ("-energy-eff" in k) } # We combine the simulation configs - combined = { - **recommendation_config1, - **recommendation_config2 - } + combined = {**recommendation_config1, **recommendation_config2} # Find overlapping keys overlapping_keys = set(eff_1.keys()).intersection(set(eff_2.keys()))