import hashlib
import json
import os

import pandas as pd
from tqdm import tqdm

from utils.s3 import read_dataframe_from_s3_parquet, save_data_to_s3, save_dataframe_to_s3_parquet
from backend.Property import Property

# This is the github pr number
MODEL_VERSION = "100"

# Environment variable that must hold the EPC API auth token used by the investigation step at the end of app().
# A token used to be committed in this file; it should be treated as compromised and rotated.
EPC_AUTH_TOKEN_ENV_VAR = "EPC_AUTH_TOKEN"

# Number of rows sent to the model in a single predict_all call
PREDICTION_CHUNK_SIZE = 200


def _stable_config_hash(property_config):
    """Return a short hex digest of ``str(property_config)`` that is identical on every run.

    The built-in ``hash()`` of a string is salted per interpreter process, so it cannot be used to build
    identifiers that are compared across runs.
    """
    return hashlib.sha256(str(property_config).encode("utf-8")).hexdigest()[:16]


def _score_recommendation(row, recommendation):
    """Build the scoring record for applying ``recommendation`` to the single-row dataframe ``row``."""
    return Property.create_recommendation_scoring_data(
        property_id=row["uprn"].values[0],
        recommendation_record=row.copy().to_dict("records")[0],
        recommendation=recommendation,
    )


def _rows_nearest_quantiles(sample, column, quantiles=(0.25, 0.5, 0.75)):
    """Yield ``(quantile_value, row)`` pairs, where ``row`` is the row of ``sample`` closest to that quantile.

    Quantile values are interpolated, so they may not exist in the data; for each distinct non-null quantile value
    we therefore pick the row whose ``column`` value is nearest to it. Nothing is yielded for an empty sample,
    because every quantile is then null.
    """
    targets = {v for v in sample[column].quantile(list(quantiles)).values if not pd.isnull(v)}
    for target in targets:
        nearest_index = sample[column].sub(target).abs().idxmin()
        yield target, sample.loc[[nearest_index]].sample(1)


def _predict_sap_change(model_api, df):
    """Score ``df`` with ``model_api`` and return the "sap_change_predictions" dataframe."""
    predictions = model_api.predict_all(
        df=df,
        bucket="retrofit-data-dev",
        prediction_buckets={
            "sap_change_predictions": "retrofit-sap-predictions-dev",
        }
    )
    return predictions["sap_change_predictions"]


def _predict_sap_change_in_chunks(model_api, df, chunk_size=PREDICTION_CHUNK_SIZE):
    """Score ``df`` in consecutive chunks of ``chunk_size`` rows and return the concatenated predictions."""
    chunk_starts = range(0, df.shape[0], chunk_size)
    chunk_predictions = [
        _predict_sap_change(model_api, df.iloc[start:start + chunk_size])
        for start in tqdm(chunk_starts, total=len(chunk_starts))
    ]
    return pd.concat(chunk_predictions)


def app():
    """Run the simulation testing pipeline for the SAP change model.

    Steps, in order:
      1. Read the model dataset from S3 and bucket properties by floor area.
      2. For every (property_type, built_form, floor_area_quantile, construction_age_band) combination, sample
         example properties and build scoring records for each measure (loft, wall, floor, glazing, solar).
      3. Save all scoring records to S3 as JSON.
      4. Score the loft insulation records against the model, save the predictions to S3 and summarise the
         predicted impact per SAP band and starting/ending insulation thickness.
      5. Investigate a single known property (uprn 10009320092) via the EPC API and the model.

    Raises:
        ValueError: if an "_ending" column has no matching starting column in the dataset.
        RuntimeError: if the EPC auth token environment variable is not set.
    """
    dataset = read_dataframe_from_s3_parquet(
        bucket_name="retrofit-data-dev", file_key="sap_change_model/dataset.parquet"
    )

    # Bucket properties by floor area, using the 30%, 60% and 90% quantiles as the bin edges
    thresholds = dataset["total_floor_area_starting"].quantile([0.3, 0.6, 0.9]).values
    dataset["floor_area_quantile"] = pd.cut(
        dataset["total_floor_area_starting"],
        bins=[0] + list(thresholds) + [float('inf')],
        labels=False,
        include_lowest=True
    )

    # We want to set up some tests to deduce the following:
    # For different property types, of various sizes, what is the impact of the various measures that we recommend
    # 1) Insulating the loft. We test the impact of bringing the loft to 270mm insulation and 300mm insulation
    property_type_columns = ["property_type", "built_form", "floor_area_quantile", "construction_age_band"]
    property_types = dataset[property_type_columns].drop_duplicates()
    property_types = property_types.sort_values(property_type_columns)

    # For each property type configuration, we take an example property with different starting loft thresholds. We
    # take the value with the lowest U-value, since when simulating, we often work with particularly low u-values

    # TODOS
    # 1) When simulating with loft insulation, make sure is_loft is definitely true, because the roof could start as
    # pitched, but is_loft false
    # TODO: We have a description: "Pitched, loft insulation", which seems to have its insulation thickness set to
    #  "none"
    # Example UPRN: 100021359753, 10001204228
    # TODO: For windows, we have glazing_type and glazed_type. When simulating, we don't set glazed_type_ending which
    #  could be set to "double glazing installed during or after 2002" (THIS HAS BEEN ADDED!)
    # TODO: When simulating external wall insulation vs internal wall insulation, I need to set the
    #  external_insulation or internal_insulation boolean values to true (THIS HAS BEEN ADDED!)
    # TODO: We could probably re-map some of the values of glazed_type_ending

    # For simulating
    # 1) loft insulation - we take the lowest u-value when loft insulation is 270mm and 300mm, the values we most
    # commonly simulate to - For loft insulation, these values are in-line with
    # NOTE(review): the sentence above is cut off in the original comment
    best_270mm_uvalue = dataset[dataset["roof_insulation_thickness"] == "270"]["roof_thermal_transmittance"].min()
    best_300mm_uvalue = dataset[dataset["roof_insulation_thickness"] == "300"]["roof_thermal_transmittance"].min()

    # 2) Internal wall insulation - we take the lowest u-value when simulating internal wall insulation
    best_internal_wall_uvalue = dataset[
        dataset["internal_insulation"] & dataset["is_solid_brick"]
    ]["walls_thermal_transmittance"].min()

    # 3) External wall insulation - we take the lowest u-value when simulating external wall insulation
    best_external_wall_uvalue = dataset[
        dataset["external_insulation"] & dataset["is_solid_brick"]
    ]["walls_thermal_transmittance"].min()

    # 4) Cavity wall insulation - we take the lowest u-value when simulating cavity wall insulation
    # This is 0.28, which is a sufficiently low value
    best_cavity_wall_uvalue = dataset[
        dataset["is_cavity_wall"] & dataset["is_filled_cavity"] & (~dataset["external_insulation"]) & (
            ~dataset["internal_insulation"])
    ]["walls_thermal_transmittance"].min()

    ending_columns = [col for col in dataset.columns if col.endswith("_ending")]

    # For the purpose of scoring, we want to simulate JUST the impact of the measure we're testing. We therefore
    # need to make sure that every "_ending" column is equal to its starting value
    column_config = {}
    for ending_col in ending_columns:
        base_col = ending_col.replace("_ending", "")
        # We check if the starting column ends with _starting or is just the base col
        if base_col + "_starting" in dataset.columns:
            column_config[ending_col] = base_col + "_starting"
        elif base_col in dataset.columns:
            column_config[ending_col] = base_col
        else:
            raise ValueError(
                f"No starting column found for {ending_col!r}: expected {base_col + '_starting'!r} or {base_col!r}"
            )

    loft_insulation_testing_data = []
    solid_wall_testing_data = []
    cavity_wall_testing_data = []
    solid_floor_testing_data = []
    suspended_floor_testing_data = []
    single_glazed_testing_data = []
    partial_double_glazed_testing_data = []
    partial_secondary_glazed_testing_data = []
    pitched_roof_solar = []
    flat_roof_solar = []

    for property_config in tqdm(property_types.itertuples(), total=property_types.shape[0]):
        # Digest (not the salted built-in hash) so that recommendation ids are the same on every run
        config_hash = _stable_config_hash(property_config)

        # Take a sample row
        population = dataset[
            (dataset["property_type"] == property_config.property_type) &
            (dataset["built_form"] == property_config.built_form) &
            (dataset["floor_area_quantile"] == property_config.floor_area_quantile) &
            (dataset["construction_age_band"] == property_config.construction_age_band)
        ].copy()

        # Re-set all of the ending columns
        for col in ending_columns:
            population[col] = population[column_config[col]]

        # 1) Loft insulation
        # For loft insulation, there are two scenarios we test.
        # 1) Loft insulation to 270mm
        # 2) Loft insulation to 300mm
        for insulation_thickness in ["none", "12", "50", "75", "100", "150", "200", "250"]:
            row = population[
                (population["roof_insulation_thickness"] == insulation_thickness) & (population["is_pitched"])
            ]
            if row.empty:
                continue

            row = row.sample(1)

            # The same sampled property is scored for both target depths so that the two are comparable
            for target_depth, target_uvalue in ((270, best_270mm_uvalue), (300, best_300mm_uvalue)):
                loft_insulation_simulation = _score_recommendation(
                    row,
                    {
                        "recommendation_id": f"loft_insulation_{insulation_thickness}_{target_depth}mm_{config_hash}",
                        "type": "loft_insulation",
                        "new_u_value": target_uvalue,
                        "parts": [
                            {"depth": target_depth}
                        ]
                    }
                )
                # Insert simulation specific configuration details at the beginning of the dictionary
                loft_insulation_testing_data.append({
                    "simulation_ending_insulation_thickness": str(target_depth),
                    "simulation_starting_insulation_thickness": insulation_thickness,
                    **loft_insulation_simulation
                })

        # 2) Solid wall insulation
        solid_wall_sample = population[
            population["is_solid_brick"] & (population["walls_insulation_thickness"] == "none")
        ]
        # We take 1 sample for each value of walls_thermal_transmittance
        for uvalue in solid_wall_sample["walls_thermal_transmittance"].unique():
            row = solid_wall_sample[
                solid_wall_sample["walls_thermal_transmittance"] == uvalue
            ].sample(1)

            # Simulated IWI
            internal_wall_insulation_simulation = _score_recommendation(
                row,
                {
                    "recommendation_id": f"internal_wall_insulation_uvalue_{uvalue}_{config_hash}",
                    "type": "internal_wall_insulation",
                    "new_u_value": best_internal_wall_uvalue,
                    "parts": []
                }
            )

            # Simulated EWI
            external_wall_insulation_simulation = _score_recommendation(
                row,
                {
                    "recommendation_id": f"external_wall_insulation_uvalue_{uvalue}_{config_hash}",
                    "type": "external_wall_insulation",
                    "new_u_value": best_external_wall_uvalue,
                    "parts": []
                }
            )

            # The iwi/ewi simulations will be next to each other, so we can see how they differ for the same
            # property
            solid_wall_testing_data.append(internal_wall_insulation_simulation)
            solid_wall_testing_data.append(external_wall_insulation_simulation)

        # 3) Cavity wall insulation
        cavity_wall_sample = population[
            population["is_cavity_wall"] & (~population["is_filled_cavity"]) & (
                ~population["external_insulation"]
            ) & (~population["internal_insulation"])
        ]
        # We take 1 sample for each value of walls_thermal_transmittance
        for uvalue in cavity_wall_sample["walls_thermal_transmittance"].unique():
            row = cavity_wall_sample[
                cavity_wall_sample["walls_thermal_transmittance"] == uvalue
            ].sample(1)

            # Simulated filled cavity
            filled_cavity_wall_insulation_simulation = _score_recommendation(
                row,
                {
                    "recommendation_id": f"cavity_wall_insulation_uvalue_{uvalue}_{config_hash}",
                    "type": "cavity_wall_insulation",
                    "new_u_value": best_cavity_wall_uvalue,
                    "parts": []
                }
            )

            cavity_wall_testing_data.append(filled_cavity_wall_insulation_simulation)

        # 4) Solid floor insulation
        solid_floor_sample = population[
            population["is_solid"] & (population["floor_insulation_thickness"] == "none")
        ]
        # We have many different values of u-value for solid floors, so we'll take a sample at the 25%, 50% and 75%
        # values
        # We must take a value that is in one of the unique values for floor_thermal_transmittance
        for uvalue, nearest_row in _rows_nearest_quantiles(solid_floor_sample, "floor_thermal_transmittance"):
            # Simulated solid floor insulation
            solid_floor_insulation_simulation = _score_recommendation(
                nearest_row,
                {
                    "recommendation_id": f"solid_floor_insulation_uvalue_{uvalue}_{config_hash}",
                    "type": "solid_floor_insulation",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": []
                }
            )

            solid_floor_testing_data.append(solid_floor_insulation_simulation)

        # 5) Suspended floor insulation
        suspended_floor_sample = population[
            population["is_suspended"] & (population["floor_insulation_thickness"] == "none")
        ]
        # We take the same approach as for solid floors
        for uvalue, nearest_row in _rows_nearest_quantiles(suspended_floor_sample, "floor_thermal_transmittance"):
            # Simulated suspended floor insulation
            suspended_floor_insulation_simulation = _score_recommendation(
                nearest_row,
                {
                    "recommendation_id": f"suspended_floor_insulation_uvalue_{uvalue}_{config_hash}",
                    "type": "suspended_floor_insulation",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": []
                }
            )

            suspended_floor_testing_data.append(suspended_floor_insulation_simulation)

        # 6) Windows - single glazing
        single_glazing_sample = population[
            (population["glazing_type"] == "single")
        ]
        if not single_glazing_sample.empty:
            row = single_glazing_sample.sample(1)
            # For single glazed windows, we can recommend double glazing or secondary glazing

            # Simulated double glazing
            double_glazing_simulation = _score_recommendation(
                row,
                {
                    "recommendation_id": f"windows_glazing_single_to_double_{config_hash}",
                    "type": "windows_glazing",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": [],
                    "is_secondary_glazing": False
                }
            )

            # Simulated secondary glazing
            secondary_glazing_simulation = _score_recommendation(
                row,
                {
                    "recommendation_id": f"windows_glazing_single_to_secondary_{config_hash}",
                    "type": "windows_glazing",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": [],
                    "is_secondary_glazing": True
                }
            )

            # Add in simulation specific details
            # Add to the beginning of the dictionary
            single_glazed_testing_data.append({
                "simulation_ending_window_finish": "double",
                **double_glazing_simulation
            })
            single_glazed_testing_data.append({
                "simulation_ending_window_finish": "secondary",
                **secondary_glazing_simulation
            })

        # 7) Windows - partial double glazed
        partial_double_glazing_sample = population[
            (population["glazing_type"] == "double") & (population["multi_glaze_proportion_starting"] > 0) & (
                population["multi_glaze_proportion_starting"] < 100
            )
        ]
        for value, nearest_row in _rows_nearest_quantiles(
                partial_double_glazing_sample, "multi_glaze_proportion_starting"
        ):
            # If we start with partial double glazing, we recommend completing the job
            # Simulated double glazing
            double_glazing_simulation = _score_recommendation(
                nearest_row,
                {
                    "recommendation_id": f"windows_glazing_partial_double_to_double_{value}_{config_hash}",
                    "type": "windows_glazing",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": [],
                    "is_secondary_glazing": False
                }
            )

            partial_double_glazed_testing_data.append(double_glazing_simulation)

        # 8) Windows - partial secondary glazed
        partial_secondary_glazing_sample = population[
            (population["glazing_type"] == "secondary") & (population["multi_glaze_proportion_starting"] > 0) & (
                population["multi_glaze_proportion_starting"] < 100
            )
        ]
        for value, nearest_row in _rows_nearest_quantiles(
                partial_secondary_glazing_sample, "multi_glaze_proportion_starting"
        ):
            # If we start with partial secondary glazing, we recommend completing the job
            # Simulated secondary glazing
            secondary_glazing_simulation = _score_recommendation(
                nearest_row,
                {
                    "recommendation_id": f"windows_glazing_partial_secondary_to_secondary_{value}_{config_hash}",
                    "type": "windows_glazing",
                    "new_u_value": None,  # This doesn't matter at the moment
                    "parts": [],
                    "is_secondary_glazing": True
                }
            )

            partial_secondary_glazed_testing_data.append(secondary_glazing_simulation)

        # 9) Solar PV
        # We only recommend solar for properties that have flat or pitched roofs, and no existing solar
        pitched_roof_no_solar = population[
            (population["is_pitched"]) & (population["photo_supply_starting"] == 0)
        ]
        if not pitched_roof_no_solar.empty:
            pitched_roof_no_solar = pitched_roof_no_solar.sample(1)

        flat_roof_no_solar = population[
            (population["is_flat"]) & (population["photo_supply_starting"] == 0)
        ]
        if not flat_roof_no_solar.empty:
            flat_roof_no_solar = flat_roof_no_solar.sample(1)

        # We simulate 30%, 40% and 50% coverage
        for coverage in [30, 40, 50]:
            if not pitched_roof_no_solar.empty:
                solar_simulation_pitched = _score_recommendation(
                    pitched_roof_no_solar,
                    {
                        "recommendation_id": f"pitched_solar_pv_coverage_{coverage}_percent_{config_hash}",
                        "type": "solar_pv",
                        "new_u_value": None,  # This doesn't matter at the moment
                        "parts": [],
                        "photo_supply": coverage
                    }
                )
                pitched_roof_solar.append(solar_simulation_pitched)

            if not flat_roof_no_solar.empty:
                solar_simulation_flat = _score_recommendation(
                    flat_roof_no_solar,
                    {
                        "recommendation_id": f"flat_solar_pv_coverage_{coverage}_percent_{config_hash}",
                        "type": "solar_pv",
                        "new_u_value": None,  # This doesn't matter at the moment
                        "parts": [],
                        "photo_supply": coverage
                    }
                )
                flat_roof_solar.append(solar_simulation_flat)

    # We store all of this data in s3, as it is
    save_data_to_s3(
        bucket_name="retrofit-datalake-dev",
        s3_file_name="sap_change_model/simulation-pipeline-data.json",
        data=json.dumps(
            {
                "loft_insulation_testing_data": loft_insulation_testing_data,
                "solid_wall_testing_data": solid_wall_testing_data,
                "cavity_wall_testing_data": cavity_wall_testing_data,
                "solid_floor_testing_data": solid_floor_testing_data,
                "suspended_floor_testing_data": suspended_floor_testing_data,
                "single_glazed_testing_data": single_glazed_testing_data,
                "partial_double_glazed_testing_data": partial_double_glazed_testing_data,
                "partial_secondary_glazed_testing_data": partial_secondary_glazed_testing_data,
                "pitched_roof_solar": pitched_roof_solar,
                "flat_roof_solar": flat_roof_solar
            }
        )
    )

    # For each simulation type, we score against the model
    from backend.ml_models.api import ModelApi
    from datetime import datetime

    created_at = datetime.now().isoformat()

    model_api = ModelApi(portfolio_id="simulation-testing-pipeline", timestamp=created_at)
    model_api.MODEL_PREFIXES = ["sap_change_predictions"]

    # 1) Loft insulation
    # We chunk up the data into PREDICTION_CHUNK_SIZE rows
    loft_insulation_testing_df = pd.DataFrame(loft_insulation_testing_data)
    loft_insulation_predictions = _predict_sap_change_in_chunks(model_api, loft_insulation_testing_df)

    # Store final parquet in s3
    save_dataframe_to_s3_parquet(
        df=loft_insulation_predictions,
        bucket_name="retrofit-datalake-dev",
        file_key=f"sap_change_model/simulation-pipeline-loft-insulation-predictions_{MODEL_VERSION}.parquet"
    )

    # We now merge the loft insulation predictions onto the scoring data and calculate exactly how much the
    # insulation is worth
    loft_insulation_comparison_matrix = loft_insulation_testing_df[
        ["simulation_starting_insulation_thickness", "simulation_ending_insulation_thickness", "uprn", "id",
         "sap_starting"]
    ].merge(
        loft_insulation_predictions.drop(columns=["recommendation_id"]),
        left_on="id",
        right_on="id",
        how="left"
    )

    loft_insulation_comparison_matrix["measure_impact"] = (
        loft_insulation_comparison_matrix["predictions"] - loft_insulation_comparison_matrix["sap_starting"]
    )

    # We create a sap band grouping, for every 10 points of sap. So 1-10, 11-20, 21-30 etc
    loft_insulation_comparison_matrix["sap_band"] = pd.cut(
        loft_insulation_comparison_matrix["sap_starting"],
        bins=range(0, 101, 10),
        labels=range(1, 11)
    )

    # Perform a group by describe
    loft_insulation_describe = loft_insulation_comparison_matrix.groupby(
        ["sap_band", "simulation_starting_insulation_thickness", "simulation_ending_insulation_thickness"]
    )[["measure_impact"]].describe().reset_index()

    # Thicknesses are stored as strings ("none", "12", ...); convert to ints so that sorting is numeric
    for col in ["simulation_starting_insulation_thickness", "simulation_ending_insulation_thickness"]:
        loft_insulation_describe[col] = loft_insulation_describe[col].str.replace('none', "0")
        loft_insulation_describe[col] = loft_insulation_describe[col].astype(int)

    loft_insulation_describe = loft_insulation_describe.sort_values(
        ["simulation_ending_insulation_thickness", "simulation_starting_insulation_thickness"], ascending=True
    )

    # In the training data, try and get just the rows that are loft insulation only
    # Things that change:
    # 1) roof_insulation_thickness
    # 3) roof_thermal_transmittance
    # 4) roof_energy_eff_ending
    loft_insulation_training_data = dataset.copy()

    columns_allowed_to_change = {
        "roof_insulation_thickness_ending",
        "roof_thermal_transmittance_ending",
        "roof_energy_eff_ending",
        "transaction_type_ending",
        "days_to_ending",
        "sap_ending",
        "heat_demand_ending",
        "carbon_ending",
        "total_floor_area_ending",
        "floor_height_ending",
        "estimated_perimeter_ending"
    }
    loft_insulation_columns_we_need_the_same = [c for c in column_config if c not in columns_allowed_to_change]

    # Keep only rows where every other ending column is unchanged from its starting column
    for ending_col in tqdm(loft_insulation_columns_we_need_the_same):
        starting_col = column_config[ending_col]
        loft_insulation_training_data = loft_insulation_training_data[
            loft_insulation_training_data[ending_col] == loft_insulation_training_data[starting_col]
        ]

    # We get rows where the insulation starts at 200mm
    insulation_200mm_starting = loft_insulation_training_data[
        (loft_insulation_training_data["roof_insulation_thickness"] == "200") &
        (loft_insulation_training_data["roof_insulation_thickness_ending"] == "300")
    ]

    # Let's use the API to find exactly the record
    from backend.SearchEpc import SearchEpc

    # The auth token is a credential and must never be committed; it is read from the environment instead
    epc_auth_token = os.environ.get(EPC_AUTH_TOKEN_ENV_VAR)
    if not epc_auth_token:
        raise RuntimeError(
            f"Set the {EPC_AUTH_TOKEN_ENV_VAR} environment variable to the EPC API auth token"
        )

    searcher = SearchEpc(
        address1="2 Darkfield Way",
        postcode="TA7 8HY",
        auth_token=epc_auth_token,
        os_api_key=""
    )
    searcher.uprn = "10009320092"
    searcher.find_property(skip_os=True)

    newest_epc = searcher.newest_epc
    older_epc = [
        epc for epc in searcher.older_epcs
        if epc["lmk-key"] == "5ae2f073004839510f9eeb1886160776a05697f8518b8b3b63d45f65686c4757"
    ][0]

    # Iterate through the keys in the newest_epc and find the values in older epc that are different to the newest
    # epc. A key that is missing from the older epc is reported with None as the older value.
    differences = {
        key: (value, older_epc.get(key))
        for key, value in newest_epc.items()
        if value != older_epc.get(key)
    }

    testing_model_api = ModelApi(portfolio_id="simulation-testing-loft-example", timestamp=created_at)
    testing_model_api.MODEL_PREFIXES = ["sap_change_predictions"]

    ############################################################################################################
    # TODO:!
    #  Findings: 1) For uprn 10009320092, the number of rooms and number of heated rooms has changed and can change
    #  from epc to epc. We should therefore include a starting and ending value for this

    # Investigation 1)
    testing_row = insulation_200mm_starting[insulation_200mm_starting["uprn"] == "10009320092"].copy()
    testing_row["id"] = "testing-200mm-loft-insulation-starting-baseline+recommendation_id_baseline"
    testing_row["recommendation_id"] = "recommendation_id_baseline"
    # The testing row has 4 rooms
    # Score in the model to see what we get
    baseline_pred_df = _predict_sap_change(testing_model_api, testing_row)
    impact = baseline_pred_df["predictions"].values[0] - testing_row["sap_starting"].values[0]

    # Changing this from 4 rooms to 5 rooms has NO impact!!
    testing_row_5_rooms = testing_row.copy()
    testing_row_5_rooms["id"] = "testing-200mm-loft-insulation-starting-baseline+recommendation_id_5_rooms"
    testing_row_5_rooms["recommendation_id"] = "recommendation_id_5_rooms"
    testing_row_5_rooms["number_habitable_rooms"] = float(5)
    testing_row_5_rooms["number_heated_rooms"] = float(5)

    pred_df_5_rooms = _predict_sap_change(testing_model_api, testing_row_5_rooms)
    impact_5_rooms = pred_df_5_rooms["predictions"].values[0] - testing_row_5_rooms["sap_starting"].values[0]