import ast import json from copy import deepcopy from dataclasses import replace from datetime import datetime import random from tqdm import tqdm import pandas as pd import numpy as np from etl.epc.Record import EPCRecord from backend.SearchEpc import SearchEpc from sqlalchemy.exc import IntegrityError, OperationalError from sqlalchemy.orm import sessionmaker from starlette.responses import Response from backend.app.config import get_settings, get_prediction_buckets from backend.app.db.connection import db_engine from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.functions.property_functions import ( create_property, create_property_details_epc, create_property_targets, update_property_data, update_or_create_property_spatial_details ) from backend.app.db.functions.recommendations_functions import ( create_plan, upload_recommendations, create_scenario ) from backend.app.db.functions.funding_functions import upload_funding from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES from backend.app.plan.utils import get_cleaned from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions from backend.ml_models.api import ModelApi from backend.Property import Property from backend.apis.GoogleSolarApi import GoogleSolarApi from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser import recommendations.optimiser.optimiser_functions as optimiser_functions from recommendations.Recommendations import Recommendations from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.Funding import Funding
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value

# Input data (temp)
import pickle
import pandas as pd

# ---------------------------------------------------------------------------
# Temporary local inputs
# ---------------------------------------------------------------------------
# SECURITY NOTE: pickle.load executes arbitrary code embedded in the file. These
# two pickles must only ever be locally produced, trusted artefacts.
with open("local_data_for_deletion.pkl", "rb") as f:
    local_data = pickle.load(f)

cleaning_data = local_data["cleaning_data"]
materials = local_data["materials"]
cleaned = local_data["cleaned"]
project_scores_matrix = local_data["project_scores_matrix"]
partial_project_scores_matrix = local_data["partial_project_scores_matrix"]
whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"]

with open("kwh_client_for_deletion.pkl", "rb") as f:
    kwh_client = pickle.load(f)

epc_data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Downloads/domestic-E06000002-Middlesbrough/certificates.csv", low_memory=False
)

# The three EPC cost fields that are back-filled when missing (see below).
COST_COLUMNS = ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]

# Parse the lodgement dates once; they are used for both filters below.
lodgement_dates = pd.to_datetime(epc_data["LODGEMENT_DATE"])

# TODO: Store this for cleaning
# Mean cost per unit of floor area, per current energy efficiency rating, computed
# from certificates lodged on or after 2024-01-01.
costs_by_floor_area = epc_data[lodgement_dates >= "2024-01-01"][
    [
        "TOTAL_FLOOR_AREA",
        "CURRENT_ENERGY_EFFICIENCY",
        "LIGHTING_COST_CURRENT",
        "HEATING_COST_CURRENT",
        "HOT_WATER_COST_CURRENT",
    ]
].copy()
costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns]
for cost_column in COST_COLUMNS:
    costs_by_floor_area[cost_column + "_scaled"] = (
        costs_by_floor_area[cost_column] / costs_by_floor_area["total-floor-area"]
    )

costs_by_floor_area = (
    costs_by_floor_area.groupby("current-energy-efficiency")[
        [cost_column + "_scaled" for cost_column in COST_COLUMNS]
    ]
    .mean()
    .reset_index()
)

# Random sample of 10,000 distinct UPRNs lodged on or after 2015-01-01. The index is
# reset so that the row index can be used as a unique property id below.
sample_epc_data = (
    epc_data[lodgement_dates >= "2015-01-01"]
    .drop_duplicates("UPRN")
    .sample(10000)
    .reset_index(drop=True)
)

# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used
#   in the google solar api but is it really needed? I don't think it's super accurate. It might be better to
#   just use an average energy consumption by floor area for UK households?

# ---------------------------------------------------------------------------
# Load the input properties
# ---------------------------------------------------------------------------
# Defaults applied when the certificate has no value for these fields.
# Avoid the data load inside of EPCRecord - something we should pull out
EPC_FIELD_DEFAULTS = {
    "number-habitable-rooms": 3,
    "floor-height": 2.4,
    "number-heated-rooms": 3,
}

input_properties = []
for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)):
    # Normalise the column names (lower case, hyphenated) and turn nulls into None.
    epc = {
        k.lower().replace("_", "-"): v if not pd.isnull(v) else None for k, v in config.items()
    }

    for field_name, default_value in EPC_FIELD_DEFAULTS.items():
        if pd.isnull(epc[field_name]):
            epc[field_name] = default_value

    epc_records = {"original_epc": epc, "full_sap_epc": {}, "old_data": []}
    prepared_epc = EPCRecord(
        epc_records=epc_records,
        run_mode="newdata",
        cleaning_data=cleaning_data,
    )

    input_properties.append(
        Property(
            id=row_id,
            is_new=True,
            address=epc["address"],
            postcode=epc["postcode"],
            epc_record=prepared_epc,
            already_installed={},
            property_valuation={},
            non_invasive_recommendations=[],
            energy_assessment=None,
            **Property.extract_kwargs(config),  # TODO: Deprecate this
        )
    )

# Property ids are the (reset) row index of the sample, so they are unique and can be
# used as dictionary keys instead of scanning input_properties for every lookup.
properties_by_id = {p.id: p for p in input_properties}
properties_by_str_id = {str(p.id): p for p in input_properties}

# ---------------------------------------------------------------------------
# For each property, insert the default solar configuration
# ---------------------------------------------------------------------------
solar_materials = [m for m in materials if m["type"] == "solar_pv"]
for p in tqdm(input_properties):
    solar_api = GoogleSolarApi(
        api_key=None,
        solar_materials=solar_materials,
        max_retries=5,
    )
    panel_performance = solar_api.default_panel_performance(property_instance=p)
    p.set_solar_panel_configuration(
        solar_panel_configuration={
            "insights_data": None,
            "panel_performance": panel_performance,
            "unit_share_of_energy": 1,
        },
    )

# ---------------------------------------------------------------------------
# We mock kwh preds (one random value in [100, 3000) per property)
# ---------------------------------------------------------------------------
mocked_kwh_predictions = {"heating_kwh_predictions": [], "hotwater_kwh_predictions": []}
for p in tqdm(input_properties):
    mocked_kwh_predictions["heating_kwh_predictions"].append({
        "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0]
    })
    mocked_kwh_predictions["hotwater_kwh_predictions"].append({
        "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0]
    })

mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["heating_kwh_predictions"])
mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"])

# TODO: We might want to implement this generally, via an ETL process
# Back-fill missing cost fields: take the per-floor-area mean cost of the nearest
# energy efficiency rating in costs_by_floor_area and scale by the property's floor area.
for p in input_properties:
    for col in COST_COLUMNS:
        if not pd.isnull(p.data[col]):
            continue

        rating_gap = (
            costs_by_floor_area["current-energy-efficiency"] - p.data["current-energy-efficiency"]
        ).abs()
        # On ties, keep the first matching rating only.
        nearest = costs_by_floor_area[rating_gap == rating_gap.min()].head(1)
        p.data[col] = (nearest[col + "_scaled"] * p.data["total-floor-area"]).values[0]

for p in input_properties:
    p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions)

# ---------------------------------------------------------------------------
# Run the recommendations
# ---------------------------------------------------------------------------
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
    # Houses without a built form are treated as semi-detached.
    if p.data["property-type"] == "House" and pd.isnull(p.data["built-form"]):
        p.data["built-form"] = "Semi-Detached"

    recommender = Recommendations(
        property_instance=p,
        materials=materials,
        exclusions=[],
        inclusions=[],
        default_u_values=True,
    )
    property_recommendations, property_representative_recommendations = recommender.recommend()

    if not property_recommendations:
        continue

    recommendations[p.id] = property_recommendations
    representative_recommendations[p.id] = property_representative_recommendations

    p.create_base_difference_epc_record(cleaned_lookup=cleaned)
    p.adjust_difference_record_with_recommendations(
        property_recommendations, property_representative_recommendations
    )

    recommendations_scoring_data.extend(p.recommendations_scoring_data)

recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
    columns=[
        "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending"
    ]
)

# ---------------------------------------------------------------------------
# Mock the model predictions
# ---------------------------------------------------------------------------
model_predictions_mocked = {
    "sap_change_predictions": None,
    "heat_demand_predictions": None,
    "carbon_change_predictions": None,
    "heating_kwh_predictions": None,
    "hotwater_kwh_predictions": None,
}

KWH_PREDICTION_KEYS = ("heating_kwh_predictions", "hotwater_kwh_predictions")

# Property field that each cumulative mocked prediction starts from.
STARTING_VALUE_FIELDS = {
    "sap_change_predictions": "current-energy-efficiency",
    "heat_demand_predictions": "energy-consumption-current",
    "carbon_change_predictions": "co2-emissions-current",
}

for k in list(model_predictions_mocked):
    prediction_frame = recommendations_scoring_data[["id"]].copy()
    # Scoring ids have the form "<property_id>+<recommendation_id>".
    prediction_frame[["property_id", "recommendation_id"]] = (
        prediction_frame["id"].str.split("+", expand=True)
    )
    prediction_frame["phase"] = prediction_frame["recommendation_id"].apply(ModelApi.extract_phase)

    if k in KWH_PREDICTION_KEYS:
        prediction_frame["predictions"] = random.choices(range(100, 3000), k=len(recommendations_scoring_data))
        model_predictions_mocked[k] = prediction_frame
        continue

    prediction_frame = prediction_frame.sort_values(["property_id", "phase"], ascending=True)

    preds = []
    # sort=False keeps the groups in order of first appearance, i.e. the sorted order above.
    for p_id, property_rows in prediction_frame.groupby("property_id", sort=False):
        start = properties_by_str_id[p_id].data[STARTING_VALUE_FIELDS[k]]

        df = property_rows.copy()
        # We add some amount each time, so predictions never decrease from phase to phase
        to_add = np.cumsum(random.choices(range(0, 15), k=len(df)))
        df["predictions"] = start + to_add
        preds.append(df)

    preds = pd.concat(preds)
    model_predictions_mocked[k] = preds

# ---------------------------------------------------------------------------
# Apply the predictions to the recommendations
# ---------------------------------------------------------------------------
for property_id in tqdm(list(recommendations), total=len(recommendations)):
    property_instance = properties_by_id[property_id]

    recommendations_with_impact, impact_summary = (
        Recommendations.calculate_recommendation_impact(
            property_instance=property_instance,
            all_predictions=model_predictions_mocked,
            recommendations=recommendations,
            representative_recommendations=representative_recommendations,
        )
    )
    # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
    # at each phase
    property_instance.update_simulation_epcs(impact_summary)

    recommendations[property_id] = recommendations_with_impact

for property_instance in tqdm(input_properties):
    property_recommendations = recommendations.get(property_instance.id, [])

    property_current_energy_bill = (
        Recommendations.calculate_recommendation_tenant_savings(
            property_instance=property_instance,
            kwh_simulation_predictions=model_predictions_mocked,
            property_recommendations=property_recommendations,
            ashp_cop=2.8,
        )
    )
    property_instance.current_energy_bill = property_current_energy_bill

# ---------------------------------------------------------------------------
# Optimise the measures and attach funding
# ---------------------------------------------------------------------------
body = PlanTriggerRequest(
    budget=None,
    goal="Increasing EPC",
    housing_type="Social",
    goal_value="B",
    portfolio_id=0,
    trigger_file_path="",
    already_installed_file_path="",
    patches_file_path=None,
    non_invasive_recommendations_file_path=None,
    valuation_file_path="",
    required_measures=[],
    scenario_name="EPC B",
    scenario_id=None,
    multi_plan=True,
    optimise=True,
    default_u_values=True,
    ashp_cop=2.8,
    event_type="remote_assessment",
    simulate_sap_10=False,
    file_type=None,
    file_format=None,
    sheet_name=None,
    sheet_count=None,
    index_start=None,
    index_end=None,
)

# Measure types that are given zero partial-project score, funding and uplift
# without consulting Funding.get_innovation_uplift.
MEASURES_WITHOUT_INNOVATION_UPLIFT = {
    "mechanical_ventilation",
    "low_energy_lighting",
    "secondary_heating",
    "extension_cavity_wall_insulation",
    "draught_proofing",
    "sealing_open_fireplace",
}

for p in tqdm(input_properties):

    if not recommendations.get(p.id):
        continue

    # we need to double unlist because we have a list of lists
    property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}

    property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
    measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]

    # If a measure requiring ventilation is selected, and the property does not have ventilation, we enforce
    # its inclusion
    needs_ventilation = any(
        x in property_measure_types for x in assumptions.measures_needing_ventilation
    ) and not p.has_ventilation

    if not measures_to_optimise:
        # Nothing to do, we just reshape the recommendations
        recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
            p.id, recommendations, set()
        )
        continue

    fixed_gain = optimiser_functions.calculate_fixed_gain(
        property_required_measures, recommendations, p, needs_ventilation
    )
    gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)

    # A fresh Funding instance per property: check_funding below stores its results on the instance.
    funding = Funding(
        tenure="Social",
        project_scores_matrix=project_scores_matrix,
        partial_project_scores_matrix=partial_project_scores_matrix,
        whlg_eligible_postcodes=whlg_eligible_postcodes,
        eco4_social_cavity_abs_rate=12.5,
        eco4_social_solid_abs_rate=17,
        eco4_private_cavity_abs_rate=12.5,
        eco4_private_solid_abs_rate=17,
        gbis_social_cavity_abs_rate=21,
        gbis_social_solid_abs_rate=25,
        gbis_private_cavity_abs_rate=21,
        gbis_private_solid_abs_rate=28,
    )

    li_thickness = convert_thickness_to_numeric(
        p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
    )

    # Fall back to a looked-up wall U-value when the record does not carry one.
    current_wall_u_value = p.walls["thermal_transmittance"]
    if current_wall_u_value is None:
        current_wall_u_value = get_wall_u_value(
            clean_description=p.walls["clean_description"],
            age_band=p.age_band,
            is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
            is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
        )

    # We insert the innovation uplift (on a deep copy, so the recommendations themselves are untouched)
    measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)

    # TODO: Turn this into a function and store the innovation uplift
    for group in measures_to_optimise_with_uplift:
        for r in group:
            if r["type"] in MEASURES_WITHOUT_INNOVATION_UPLIFT:
                (
                    r["partial_project_score"],
                    r["partial_project_funding"],
                    r["innovation_uplift"],
                    r["uplift_project_score"],
                ) = (0, 0, 0, 0)
                continue

            (
                r["partial_project_score"],
                r["partial_project_funding"],
                r["innovation_uplift"],
                r["uplift_project_score"],
            ) = funding.get_innovation_uplift(
                measure=r,
                starting_sap=p.data["current-energy-efficiency"],
                floor_area=p.floor_area,
                is_cavity=p.walls["is_cavity_wall"],
                current_wall_uvalue=current_wall_u_value,
                is_partial="partial" in p.walls["clean_description"].lower(),
                existing_li_thickness=li_thickness,
                mainheating=p.main_heating,
                main_fuel=p.main_fuel,
                mainheat_energy_eff=p.data["mainheat-energy-eff"],
            )

    input_measures = optimiser_functions.prepare_input_measures(
        measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
    )

    # When the goal is Increasing EPC, we can run the funding optimiser
    if body.goal == "Increasing EPC":
        solutions = optimise_with_funding_paths(
            p=p,
            input_measures=input_measures,
            housing_type=body.housing_type,
            budget=body.budget,
            target_gain=gain,
            funding=funding,
        )

        # Given the solutions we select the optimal one.
        # Net cost is scheme-aware: eco4 paths receive the full project funding, every other
        # path the partial project funding. (Previously this column was immediately overwritten
        # with the eco4-only formula, so non-eco4 paths were ranked using the wrong funding.)
        solutions["cost_less_full_project_funding"] = np.where(
            solutions["scheme"] == "eco4",
            solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"],
            solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"],
        )

        solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)

        if solutions["meets_upgrade_target"].any():
            # If we have a solution that meets the upgrade target, we select the cheapest of those
            optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
        else:
            # Pick the cheapest
            optimal_solution = solutions.iloc[0]

        # This is the funding scheme selected
        scheme = optimal_solution["scheme"]
        # This is the list of measures that we will recommend
        funded_measures = optimal_solution["items"] if scheme != "none" else []
        solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
        # This is the total amount of funding that the project will produce (including uplifts) (£)
        project_funding = (
            optimal_solution["full_project_funding"]
            if scheme == "eco4"
            else optimal_solution["partial_project_funding"]
        )
        # This is the total amount of funding associated to the uplift (£)
        total_uplift = optimal_solution["total_uplift"]
        # This is the full project ABS
        full_project_score = optimal_solution["project_score"]
        # This is the partial project ABS
        partial_project_score = optimal_solution["partial_project_score"]
        # This is the uplift score ABS
        uplift_project_score = optimal_solution["total_uplift_score"]
    else:
        # We optimise and then we determine eligibility for funding, based on the measures selected
        optimiser = (
            GainOptimiser(
                input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
            ) if body.budget else CostOptimiser(input_measures, min_gain=gain)
        )
        optimiser.setup()
        optimiser.solve()
        solution = optimiser.solution

        recommendation_types = {measure["type"] for measures in input_measures for measure in measures}

        # A measure counts as recommended either on its own or bundled with mechanical ventilation.
        # (The bundled name must be tested for membership too; a bare non-empty string is always truthy.)
        has_wall_insulation_recommendation = any(
            m in recommendation_types or "+".join([m, "mechanical_ventilation"]) in recommendation_types
            for m in WALL_INSULATION_MEASURES
        )
        has_roof_insulation_recommendation = any(
            m in recommendation_types or "+".join([m, "mechanical_ventilation"]) in recommendation_types
            for m in ROOF_INSULATION_MEASURES
        )

        funding.check_funding(
            measures=solution,
            starting_sap=p.data["current-energy-efficiency"],
            ending_sap=p.data["current-energy-efficiency"] + sum(x["gain"] for x in solution),
            floor_area=p.floor_area,
            mainheat_description=p.main_heating["clean_description"],
            heating_control_description=p.main_heating_controls["clean_description"],
            is_cavity=p.walls["is_cavity_wall"],
            current_wall_uvalue=current_wall_u_value,
            is_partial="partial" in p.walls["clean_description"].lower(),
            existing_li_thickness=li_thickness,
            mainheating=p.main_heating,
            main_fuel=p.main_fuel,
            mainheat_energy_eff=p.data["mainheat-energy-eff"],
            has_wall_insulation_recommendation=has_wall_insulation_recommendation,
            has_roof_insulation_recommendation=has_roof_insulation_recommendation,
        )

        # Determine the scheme: eco4 takes precedence over gbis
        scheme = "none"
        if funding.eco4_eligible:
            scheme = "eco4"
        if scheme == "none" and funding.gbis_eligible:
            scheme = "gbis"

        funded_measures = solution if scheme in ["gbis", "eco4"] else []
        # Default to 0 only when no full project ABS was produced (the condition was previously
        # inverted, yielding 0 when a value existed and None when it did not).
        # NOTE(review): project_funding is fed from the ABS score here, not a £ amount - confirm.
        project_funding = funding.full_project_abs if funding.full_project_abs is not None else 0
        # NOTE(review): total_uplift uses the eco4 uplift regardless of the selected scheme - confirm.
        total_uplift = funding.eco4_uplift
        full_project_score = funding.full_project_abs if funding.full_project_abs is not None else 0
        partial_project_score = funding.partial_project_abs
        uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift

    selected = {r["id"] for r in solution}

    if property_required_measures:
        solution = optimiser_functions.add_required_measures(
            property_id=p.id,
            property_required_measures=property_required_measures,
            recommendations=recommendations,
            selected=selected,
        )

    # Add best practice measures (ventilation/trickle vents)
    selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)

    # NOTE: Final flattening - Don't do this! recommendations[p.id] is deliberately NOT passed
    # through optimiser_functions.flatten_recommendations_with_defaults at this point.

    # TODO: functionise
    # Report funded measures under their base type, without the bundled ventilation suffix.
    for measure in funded_measures:
        if "+mechanical_ventilation" in measure["type"]:
            measure["type"] = measure["type"].split("+mechanical_ventilation")[0]

    p.insert_funding(
        scheme=scheme,
        funded_measures=funded_measures,
        project_funding=project_funding,
        total_uplift=total_uplift,
        full_project_score=full_project_score,
        partial_project_score=partial_project_score,
        uplift_project_score=uplift_project_score,
    )