# # import ast # # import json # from copy import deepcopy # # from dataclasses import replace # # from datetime import datetime # # import random # from tqdm import tqdm # # import pandas as pd # import numpy as np # from etl.epc.Record import EPCRecord # # from backend.SearchEpc import SearchEpc # # from sqlalchemy.exc import IntegrityError, OperationalError # # from sqlalchemy.orm import sessionmaker # # from starlette.responses import Response # # # from backend.app.config import get_settings, get_prediction_buckets # # from backend.app.db.connection import db_engine # # from backend.app.db.functions.materials_functions import get_materials # # from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations # # from backend.app.db.functions.property_functions import ( # # create_property, create_property_details_epc, create_property_targets, update_property_data, # # update_or_create_property_spatial_details # # ) # # from backend.app.db.functions.recommendations_functions import ( # # create_plan, upload_recommendations, create_scenario # # ) # # from backend.app.db.functions.funding_functions import upload_funding # # from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn # # from backend.app.db.models.portfolio import rating_lookup # from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES # # from backend.app.plan.utils import get_cleaned # # from backend.app.utils import sap_to_epc # import backend.app.assumptions as assumptions # # from backend.ml_models.api import ModelApi # from backend.Property import Property # from backend.apis.GoogleSolarApi import GoogleSolarApi # # from recommendations.optimiser.CostOptimiser import CostOptimiser # from recommendations.optimiser.GainOptimiser import GainOptimiser # import recommendations.optimiser.optimiser_functions as optimiser_functions # from recommendations.Recommendations import Recommendations # # from utils.logger import setup_logger # # from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 # # from backend.ml_models.Valuation import PropertyValuation # # # # from etl.bill_savings.KwhData import KwhData # # from etl.spatial.OpenUprnClient import OpenUprnClient # # from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc # # from backend.Funding import Funding # from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths # from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value # # # Input data (temp) # import pickle # # import pandas as pd # # with open("local_data_for_deletion.pkl", 'rb') as f: # local_data = pickle.load(f) # # cleaning_data = local_data["cleaning_data"] # materials = local_data["materials"] # cleaned = local_data["cleaned"] # project_scores_matrix = local_data["project_scores_matrix"] # partial_project_scores_matrix = local_data["partial_project_scores_matrix"] # whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"] # # with open("kwh_client_for_deletion.pkl", "rb") as f: # kwh_client = pickle.load(f) # # epc_data = pd.read_csv( # "/Users/khalimconn-kowlessar/Downloads/domestic-E06000002-Middlesbrough/certificates.csv", # low_memory=False # ) # # # TODO: Store this for cleaning # costs_by_floor_area = epc_data[ # pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2024-01-01" # ][["TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT", # "HOT_WATER_COST_CURRENT"]].copy() # # epc_data = epc_data[ # (epc_data["MAINHEAT_DESCRIPTION"].str.contains("SAP05:") == False) & # (~epc_data["LIGHTING_COST_CURRENT"].isin([None, ""])) & # (~pd.isnull(epc_data["LIGHTING_COST_CURRENT"])) # ] # # costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns] # for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: # costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"] # # costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[ # ["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"] # ].mean().reset_index() # # epc_data = epc_data[~pd.isnull(epc_data["UPRN"])] # # sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2008-01-01"].drop_duplicates("UPRN").sample( # 50000).reset_index(drop=True) # # # TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type # # TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used # # in the google solar api but is it really needed? I don't think it's super accurate. It might be better to # # just use an average energy consumption by floor area for UK households? # # Load the input properties # input_properties = [] # for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)): # epc = { # k.lower().replace("_", "-"): v if not pd.isnull(v) else None for k, v in config.items() # } # # Avoid the data load inside of EPCRecord - something we should pull out # for x in ["number-habitable-rooms", "floor-height", "number-heated-rooms"]: # if pd.isnull(epc[x]): # if x == "floor-height": # epc[x] = 2.4 # if x == "number-habitable-rooms": # epc[x] = 3 # if x == "number-heated-rooms": # epc[x] = 3 # # epc_records = {'original_epc': epc, 'full_sap_epc': {}, 'old_data': []} # # prepared_epc = EPCRecord( # epc_records=epc_records, # run_mode="newdata", # cleaning_data=cleaning_data, # ) # # input_properties.append( # Property( # id=row_id, # is_new=True, # address=epc["address"], # postcode=epc["postcode"], # epc_record=prepared_epc, # already_installed={}, # property_valuation={}, # non_invasive_recommendations=[], # energy_assessment=None, # **Property.extract_kwargs(config), # TODO: Depraecate this # ) # ) # # # For each property, insert the default solar configuration # for p in tqdm(input_properties): # solar_api = GoogleSolarApi( # api_key=None, solar_materials=[m for m in materials if m["type"] == "solar_pv"], max_retries=5 # ) # panel_performance = solar_api.default_panel_performance(property_instance=p) # p.set_solar_panel_configuration( # solar_panel_configuration={ # "insights_data": None, "panel_performance": panel_performance, "unit_share_of_energy": 1 # }, # ) # # # We mock kwh preds # mocked_kwh_predictions = {"heating_kwh_predictions": [], "hotwater_kwh_predictions": []} # for p in tqdm(input_properties): # mocked_kwh_predictions["heating_kwh_predictions"].append({ # "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] # }) # mocked_kwh_predictions["hotwater_kwh_predictions"].append({ # "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] # }) # mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["heating_kwh_predictions"]) # mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"]) # # # TODO: We might want to implement this generally, via an ETL process # for x in cleaned["mainheat-description"]: # x["has_wood_chips"] = False # for p in input_properties: # for col in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: # if pd.isnull(p.data[col]): # min_diff = abs( # (costs_by_floor_area["current-energy-efficiency"] - p.data["current-energy-efficiency"]) # ).min() # df = costs_by_floor_area[ # abs((costs_by_floor_area["current-energy-efficiency"] - p.data[ # "current-energy-efficiency"])) == min_diff # ] # if df.shape[0] > 1: # df = df.head(1) # p.data[col] = (df[col + "_scaled"] * p.data["total-floor-area"]).values[0] # # [ # p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) for p in # input_properties # ] # # for p in input_properties: # # p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) # # # Run the recommendations # recommendations = {} # recommendations_scoring_data = [] # representative_recommendations = {} # for p in tqdm(input_properties): # if p.data["property-type"] == "House" and pd.isnull(p.data["built-form"]): # p.data["built-form"] = "Semi-Detached" # recommender = Recommendations( # property_instance=p, # materials=materials, # exclusions=[], # inclusions=[], # default_u_values=True # ) # property_recommendations, property_representative_recommendations = recommender.recommend() # # if not property_recommendations: # continue # # recommendations[p.id] = property_recommendations # representative_recommendations[p.id] = property_representative_recommendations # # p.create_base_difference_epc_record(cleaned_lookup=cleaned) # p.adjust_difference_record_with_recommendations( # property_recommendations, property_representative_recommendations # ) # # recommendations_scoring_data.extend(p.recommendations_scoring_data) # # recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) # recommendations_scoring_data = recommendations_scoring_data.drop( # columns=[ # "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", # "carbon_ending" # ] # ) # # model_predictions_mocked = { # "sap_change_predictions": None, # "heat_demand_predictions": None, # "carbon_change_predictions": None, # "heating_kwh_predictions": None, # "hotwater_kwh_predictions": None, # } # # for k in model_predictions_mocked.keys(): # model_predictions_mocked[k] = recommendations_scoring_data[["id"]].copy() # model_predictions_mocked[k][['property_id', 'recommendation_id']] = ( # model_predictions_mocked[k]['id'].str.split('+', expand=True) # ) # model_predictions_mocked[k]['phase'] = model_predictions_mocked[k]['recommendation_id'].apply( # ModelApi.extract_phase) # # if k in ["heating_kwh_predictions", "hotwater_kwh_predictions"]: # model_predictions_mocked[k]["predictions"] = random.choices(range(100, 3000), # k=len(recommendations_scoring_data)) # continue # # model_predictions_mocked[k] = model_predictions_mocked[k].sort_values(["property_id", "phase"], ascending=True) # preds = [] # for p_id in model_predictions_mocked[k]["property_id"].unique(): # # We add some amount each time # p = [p for p in input_properties if str(p.id) == p_id][0] # if k == "sap_change_predictions": # start = p.data["current-energy-efficiency"] # elif k == "heat_demand_predictions": # start = p.data["energy-consumption-current"] # else: # start = p.data["co2-emissions-current"] # df = model_predictions_mocked[k][model_predictions_mocked[k]["property_id"] == p_id].copy() # # Add some amount each time # to_add = random.choices(range(0, 15), k=len(df)) # to_add = np.cumsum(to_add) # df["predictions"] = start + to_add # preds.append(df) # preds = pd.concat(preds) # model_predictions_mocked[k] = preds # # for property_id in tqdm(recommendations.keys(), total=len(recommendations)): # property_instance = [p for p in input_properties if p.id == property_id][0] # # recommendations_with_impact, impact_summary = ( # Recommendations.calculate_recommendation_impact( # property_instance=property_instance, # all_predictions=model_predictions_mocked, # recommendations=recommendations, # representative_recommendations=representative_recommendations # ) # ) # # # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc # # at each phase # property_instance.update_simulation_epcs(impact_summary) # recommendations[property_id] = recommendations_with_impact # # for property_id in tqdm([p.id for p in input_properties]): # property_recommendations = recommendations.get(property_id, []) # property_instance = [p for p in input_properties if p.id == property_id][0] # # property_current_energy_bill = ( # Recommendations.calculate_recommendation_tenant_savings( # property_instance=property_instance, # kwh_simulation_predictions=model_predictions_mocked, # property_recommendations=property_recommendations, # ashp_cop=2.8 # ) # ) # property_instance.current_energy_bill = property_current_energy_bill # # body = PlanTriggerRequest( # **{'budget': None, 'goal': 'Increasing EPC', 'housing_type': 'Social', 'goal_value': 'B', 'portfolio_id': 0, # 'trigger_file_path': '', 'already_installed_file_path': '', # 'patches_file_path': None, 'non_invasive_recommendations_file_path': None, # 'valuation_file_path': '', # 'required_measures': [], 'scenario_name': 'EPC B', 'scenario_id': None, # 'multi_plan': True, 'optimise': True, 'default_u_values': True, 'ashp_cop': 2.8, # 'event_type': 'remote_assessment', 'simulate_sap_10': False, 'file_type': None, 'file_format': None, # 'sheet_name': None, 'sheet_count': None, 'index_start': None, 'index_end': None} # ) # # eco_packages = {} # # For testing # for p in input_properties: # eco_packages[p.id] = (None, None, None) # # for p in tqdm(input_properties): # if not recommendations.get(p.id): # continue # # # Temp allow to skip # if not isinstance(recommendations.get(p.id)[0], list): # continue # # # we need to double unlist because we have a list of lists # property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} # property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] # measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] # # # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore # # its inclusion # needs_ventilation = any( # x in property_measure_types for x in assumptions.measures_needing_ventilation # ) and not p.has_ventilation # # if not measures_to_optimise: # # Nothing to do, we just reshape the recommendations # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( # p.id, recommendations, set() # ) # continue # # fixed_gain = optimiser_functions.calculate_fixed_gain( # property_required_measures, recommendations, p, needs_ventilation # ) # gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages) # # # funding = Funding( # # tenure=body.housing_type, # # project_scores_matrix=project_scores_matrix, # # partial_project_scores_matrix=partial_project_scores_matrix, # # whlg_eligible_postcodes=whlg_eligible_postcodes, # # eco4_social_cavity_abs_rate=13, # # eco4_social_solid_abs_rate=17, # # eco4_private_cavity_abs_rate=13, # # eco4_private_solid_abs_rate=17, # # gbis_social_cavity_abs_rate=21, # # gbis_social_solid_abs_rate=25, # # gbis_private_cavity_abs_rate=21, # # gbis_private_solid_abs_rate=28, # # ) # # # # li_thickness = convert_thickness_to_numeric( # # p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"] # # ) # # current_wall_u_value = p.walls["thermal_transmittance"] # # if current_wall_u_value is None: # # current_wall_u_value = get_wall_u_value( # # clean_description=p.walls["clean_description"], # # age_band=p.age_band, # # is_granite_or_whinstone=p.walls["is_granite_or_whinstone"], # # is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"], # # ) # # # We insert the innovation uplift # measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) # # # TODO: Turn this into a function and store the innovaiton uplift # for group in measures_to_optimise_with_uplift: # for r in group: # (r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], # r["uplift_project_score"]) = ( # 0, 0, 0, 0 # ) # # # if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating", # # "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]: # # ( # # r["partial_project_score"], # # r["partial_project_funding"], # # r["innovation_uplift"], # # r["uplift_project_score"], # # ) = ( # # 0, 0, 0, 0 # # ) # # continue # # # # ( # # r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], # # r["uplift_project_score"] # # ) = funding.get_innovation_uplift( # # measure=r, # # starting_sap=int(p.data["current-energy-efficiency"]), # # floor_area=p.floor_area, # # is_cavity=p.walls["is_cavity_wall"], # # current_wall_uvalue=current_wall_u_value, # # is_partial="partial" in p.walls["clean_description"].lower(), # # existing_li_thickness=li_thickness, # # mainheating=p.main_heating, # # main_fuel=p.main_fuel, # # mainheat_energy_eff=p.data["mainheat-energy-eff"], # # ) # # if r["already_installed"]: # # if already installed, we zero out the uplift and funding # (r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], # r["uplift_project_score"]) = ( # 0, 0, 0, 0 # ) # # input_measures = optimiser_functions.prepare_input_measures( # measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True, # property_eco_packages=eco_packages.get(p.id) # ) # # # When the goal is Increasing EPC, we can run the funding optimiser # if body.goal == "Switch off": # # solutions = optimise_with_funding_paths( # p=p, # input_measures=input_measures, # housing_type=body.housing_type, # budget=body.budget, # target_gain=gain, # funding=funding, # work_package=eco_packages[p.id][2] # ) # # # If the solution isn't eligible, we can't really consider it # solutions = solutions[ # (solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none") # ] # # if solutions["meets_upgrade_target"].any(): # # If we have a solution that meets the upgrade target, we select that one # optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] # else: # # Pick the cheapest # optimal_solution = solutions.iloc[0] # # # This is the list of measures that we will recommend # scheme = optimal_solution["scheme"] # # # We create this full list of selected measures, which is used in the next section for setting # # default measures # solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"]) # funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else [] # # # This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£) # project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \ # optimal_solution["partial_project_funding"] # # This is the total amount of funding associated to the uplift (£) # total_uplift = optimal_solution["total_uplift"] # # This is the funding scheme selected # # This is the full project ABS # full_project_score = optimal_solution["project_score"] # # This is the partial project ABS # partial_project_score = optimal_solution["partial_project_score"] # # This is the uplift score ABS # uplift_project_score = optimal_solution["total_uplift_score"] # else: # # We optimise and then we determine eligibility for funding, based on the measures selected # optimiser = ( # GainOptimiser( # input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False # ) if body.budget else CostOptimiser(input_measures, min_gain=gain) # ) # optimiser.setup() # optimiser.solve() # solution = optimiser.solution # # recommendation_types = [] # for measures in input_measures: # for measure in measures: # recommendation_types.append(measure["type"]) # recommendation_types = set(recommendation_types) # # has_wall_insulation_recommendation = any( # (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in # WALL_INSULATION_MEASURES # ) # has_roof_insulation_recommendation = any( # (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in # ROOF_INSULATION_MEASURES # ) # # # funding.check_funding( # # measures=solution, # # starting_sap=int(p.data["current-energy-efficiency"]), # # ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]), # # floor_area=p.floor_area, # # mainheat_description=p.main_heating["clean_description"], # # heating_control_description=p.main_heating_controls["clean_description"], # # is_cavity=p.walls["is_cavity_wall"], # # current_wall_uvalue=current_wall_u_value, # # is_partial="partial" in p.walls["clean_description"].lower(), # # existing_li_thickness=li_thickness, # # mainheating=p.main_heating, # # main_fuel=p.main_fuel, # # mainheat_energy_eff=p.data["mainheat-energy-eff"], # # has_wall_insulation_recommendation=has_wall_insulation_recommendation, # # has_roof_insulation_recommendation=has_roof_insulation_recommendation, # # ) # # # Determine the scheme # scheme = "none" # # if funding.eco4_eligible: # # scheme = "eco4" # # if scheme == "none" and funding.gbis_eligible: # # scheme = "gbis" # # funded_measures = [] # # funded_measures = solution if scheme in ["gbis", "eco4"] else [] # # project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs # project_funding = 0 # # total_uplift = funding.eco4_uplift # total_uplift = 0 # # full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs # full_project_score = 0 # # partial_project_score = funding.partial_project_abs # partial_project_score = 0 # # uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift # uplift_project_score = 0 # # selected = {r["id"] for r in solution} # # if property_required_measures: # solution = optimiser_functions.add_required_measures( # property_id=p.id, property_required_measures=property_required_measures, # recommendations=recommendations, selected=selected, # ) # # # Add best practice measures (ventilation/trickle vents) # selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) # # Final flattening # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( # p.id, recommendations, selected # ) # # # TODO: functionise # for measure in funded_measures: # if "+mechanical_ventilation" in measure["type"]: # measure["type"] = measure["type"].split("+mechanical_ventilation")[0] # # p.insert_funding( # scheme=scheme, # funded_measures=funded_measures, # project_funding=project_funding, # total_uplift=total_uplift, # full_project_score=full_project_score, # partial_project_score=partial_project_score, # uplift_project_score=uplift_project_score # ) # # # for p in tqdm(input_properties): # # if not recommendations.get(p.id): # # continue # # # # # we need to double unlist because we have a list of lists # # property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} # # property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] # # measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] # # # # # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore # # # its inclusion # # needs_ventilation = any( # # x in property_measure_types for x in assumptions.measures_needing_ventilation # # ) and not p.has_ventilation # # # # if not measures_to_optimise: # # # Nothing to do, we just reshape the recommendations # # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( # # p.id, recommendations, set() # # ) # # continue # # # # fixed_gain = optimiser_functions.calculate_fixed_gain( # # property_required_measures, recommendations, p, needs_ventilation # # ) # # gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain) # # # # funding = Funding( # # tenure="Social", # # project_scores_matrix=project_scores_matrix, # # partial_project_scores_matrix=partial_project_scores_matrix, # # whlg_eligible_postcodes=whlg_eligible_postcodes, # # eco4_social_cavity_abs_rate=12.5, # # eco4_social_solid_abs_rate=17, # # eco4_private_cavity_abs_rate=12.5, # # eco4_private_solid_abs_rate=17, # # gbis_social_cavity_abs_rate=21, # # gbis_social_solid_abs_rate=25, # # gbis_private_cavity_abs_rate=21, # # gbis_private_solid_abs_rate=28, # # ) # # # # li_thickness = convert_thickness_to_numeric( # # p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"] # # ) # # current_wall_u_value = p.walls["thermal_transmittance"] # # if current_wall_u_value is None: # # current_wall_u_value = get_wall_u_value( # # clean_description=p.walls["clean_description"], # # age_band=p.age_band, # # is_granite_or_whinstone=p.walls["is_granite_or_whinstone"], # # is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"], # # ) # # # # # We insert the innovation uplift # # measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) # # # # # TODO: Turn this into a function and store the innovaiton uplift # # for group in measures_to_optimise_with_uplift: # # for r in group: # # # # if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating", # # "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]: # # ( # # r["partial_project_score"], # # r["partial_project_funding"], # # r["innovation_uplift"], # # r["uplift_project_score"], # # ) = ( # # 0, 0, 0, 0 # # ) # # continue # # # # ( # # r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], # # r["uplift_project_score"] # # ) = funding.get_innovation_uplift( # # measure=r, # # starting_sap=p.data["current-energy-efficiency"], # # floor_area=p.floor_area, # # is_cavity=p.walls["is_cavity_wall"], # # current_wall_uvalue=current_wall_u_value, # # is_partial="partial" in p.walls["clean_description"].lower(), # # existing_li_thickness=li_thickness, # # mainheating=p.main_heating, # # main_fuel=p.main_fuel, # # mainheat_energy_eff=p.data["mainheat-energy-eff"], # # ) # # # # input_measures = optimiser_functions.prepare_input_measures( # # measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True # # ) # # # # # When the goal is Increasing EPC, we can run the funding optimiser # # if body.goal == "Increasing EPC": # # # # solutions = optimise_with_funding_paths( # # p=p, # # input_measures=input_measures, # # housing_type=body.housing_type, # # budget=body.budget, # # target_gain=gain, # # funding=funding # # ) # # # # # Given the solutions we select the optimal one # # solutions["cost_less_full_project_funding"] = np.where( # # solutions["scheme"] == "eco4", # # solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"], # # solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"] # # ) # # # # solutions["cost_less_full_project_funding"] = ( # # solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"] # # ) # # solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True) # # # # if solutions["meets_upgrade_target"].any(): # # # If we have a solution that meets the upgrade target, we select that one # # optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] # # else: # # # Pick the cheapest # # optimal_solution = solutions.iloc[0] # # # # # This is the list of measures that we will recommend # # scheme = optimal_solution["scheme"] # # funded_measures = optimal_solution["items"] if scheme != "none" else [] # # solution = optimal_solution["items"] + optimal_solution["unfunded_items"] # # # This is the total amount of funding that the project will produce (including uplifts) (£) # # project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \ # # optimal_solution["partial_project_funding"] # # # This is the total amount of funding associated to the uplift (£) # # total_uplift = optimal_solution["total_uplift"] # # # This is the funding scheme selected # # # This is the full project ABS # # full_project_score = optimal_solution["project_score"] # # # This is the partial project ABS # # partial_project_score = optimal_solution["partial_project_score"] # # # This is the uplift score ABS # # uplift_project_score = optimal_solution["total_uplift_score"] # # else: # # # We optimise and then we determine eligibility for funding, based on the measures selected # # optimiser = ( # # GainOptimiser( # # input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False # # ) if body.budget else CostOptimiser(input_measures, min_gain=gain) # # ) # # optimiser.setup() # # optimiser.solve() # # solution = optimiser.solution # # # # recommendation_types = [] # # for measures in input_measures: # # for measure in measures: # # recommendation_types.append(measure["type"]) # # recommendation_types = set(recommendation_types) # # # # has_wall_insulation_recommendation = any( # # (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in # # WALL_INSULATION_MEASURES # # ) # # has_roof_insulation_recommendation = any( # # (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in # # ROOF_INSULATION_MEASURES # # ) # # # # funding.check_funding( # # measures=solution, # # starting_sap=p.data["current-energy-efficiency"], # # ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]), # # floor_area=p.floor_area, # # mainheat_description=p.main_heating["clean_description"], # # heating_control_description=p.main_heating_controls["clean_description"], # # is_cavity=p.walls["is_cavity_wall"], # # current_wall_uvalue=current_wall_u_value, # # is_partial="partial" in p.walls["clean_description"].lower(), # # existing_li_thickness=li_thickness, # # mainheating=p.main_heating, # # main_fuel=p.main_fuel, # # mainheat_energy_eff=p.data["mainheat-energy-eff"], # # has_wall_insulation_recommendation=has_wall_insulation_recommendation, # # has_roof_insulation_recommendation=has_roof_insulation_recommendation, # # ) # # # # # Determine the scheme # # scheme = "none" # # if funding.eco4_eligible: # # scheme = "eco4" # # if scheme == "none" and funding.gbis_eligible: # # scheme = "gbis" # # # # funded_measures = solution if scheme in ["gbis", "eco4"] else [] # # project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs # # total_uplift = funding.eco4_uplift # # full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs # # partial_project_score = funding.partial_project_abs # # uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift # # # # selected = {r["id"] for r in solution} # # # # if property_required_measures: # # solution = optimiser_functions.add_required_measures( # # property_id=p.id, property_required_measures=property_required_measures, # # recommendations=recommendations, selected=selected, # # ) # # # # # Add best practice measures (ventilation/trickle vents) # # selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) # # # Final flattening - Don't do this! # # # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( # # # p.id, recommendations, selected # # # ) # # # # # TODO: functionise # # for measure in funded_measures: # # if "+mechanical_ventilation" in measure["type"]: # # measure["type"] = measure["type"].split("+mechanical_ventilation")[0] # # # # p.insert_funding( # # scheme=scheme, # # funded_measures=funded_measures, # # project_funding=project_funding, # # total_uplift=total_uplift, # # full_project_score=full_project_score, # # partial_project_score=partial_project_score, # # uplift_project_score=uplift_project_score # # )