import time
import json
from collections import Counter
from copy import deepcopy
from datetime import datetime
import pandas as pd
from uuid import UUID
from typing import List
from tqdm import tqdm
from sqlalchemy.exc import IntegrityError, OperationalError
from starlette.responses import Response
from backend.SearchEpc import SearchEpc
from etl.epc.Record import EPCRecord
from backend.app.BatterySapScorer import BatterySAPScorer
from etl.epc.PredictionMatrix import PredictionMatrix
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_session, db_read_session
import backend.app.db.functions as db_funcs
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import (
    get_cleaned,
    patch_epc,
    extract_property_request_data,
    handle_error,
    build_cloudwatch_log_url
)
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.ml_models.Valuation import PropertyValuation
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from recommendations.optimiser.funding_optimiser import optimise_with_scenarios
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.app.plan.plan_input_processor import PlanInputProcessor

logger = setup_logger()

BATCH_SIZE = 5
SCORING_BATCH_SIZE = 300


def _is_missing_value(value) -> bool:
    """
    Return True when *value* is None/NaN, or a pandas Series/DataFrame whose cells are all null.

    Series/DataFrames are reduced over every cell via the underlying ndarray, which avoids the
    "truth value is ambiguous" error and also works for multi-column DataFrames.
    """
    if isinstance(value, (pd.Series, pd.DataFrame)):
        return bool(value.isnull().to_numpy().all())
    return bool(pd.isnull(value))


def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Aggregate retrofit metrics across a portfolio of properties into display-ready values.

    Only each property's *default* recommendations (those with a truthy "default" key) count towards the
    post-retrofit figures. Properties without recommendations are still included with zero savings.

    :param input_properties: The properties in the portfolio (must be non-empty - the per-unit valuation divides by
        its length). Each is read for `id`, `energy["co2_emissions"]`, `current_energy_bill` (a mapping whose values
        are summed), `current_energy_consumption`, `epc_record.current_energy_rating` and `project_funding`.
    :param total_valuation_increase: Total valuation increase for the portfolio. It is divided by the number of
        units for the per-unit figure and by the total cost for the return on investment.
    :param recommendations: Mapping of property id -> list of recommendation dicts.
    :param new_epc_bands: Mapping of property id -> post-retrofit EPC band.
    :param property_value_increase_ranges: Mapping of property id -> dict with "current_value",
        "lower_bound_increased_value" and "upper_bound_increased_value".
    :return: Dict of aggregated metrics:
        - EPC band breakdowns before/after retrofit, as JSON strings
        - unit counts
        - CO2 / energy bill / energy consumption per unit before/after retrofit
        - valuation improvement per unit
        - cost per unit, per tonne of CO2 saved and per SAP point
        - valuation return on investment
        - total funding and contingency

        Money values are strings such as "£1,234".
    """
    # EPC bands in display order, worst to best
    epc_bands = ["G", "F", "E", "D", "C", "B", "A"]

    def reformat_epc_data(epc_counts):
        # One {"name": band, band: count} entry per band; bands with no properties are reported as 0
        return [{"name": band, band: epc_counts.get(band, 0)} for band in epc_bands]

    def format_money(amount):
        return f"£{amount:,.0f}"

    n_units = len(input_properties)

    rows = []
    for p in input_properties:
        default_recommendations = [r for r in recommendations.get(p.id, []) if r["default"]]

        pre_retrofit_co2 = p.energy["co2_emissions"]
        carbon_savings = sum(r["co2_equivalent_savings"] for r in default_recommendations)

        pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
        bill_savings = sum(r["energy_cost_savings"] for r in default_recommendations)

        pre_retrofit_energy_consumption = p.current_energy_consumption
        kwh_savings = sum(r["kwh_savings"] for r in default_recommendations)

        # Valuation uplift is only meaningful when we know the property's current value
        value_range = property_value_increase_ranges[p.id]
        current_value = value_range["current_value"]
        if _is_missing_value(current_value):
            lower_bound_valuation_uplift, upper_bound_valuation_uplift = 0, 0
        else:
            lower_bound_valuation_uplift = value_range["lower_bound_increased_value"] - current_value
            upper_bound_valuation_uplift = value_range["upper_bound_increased_value"] - current_value

        rows.append({
            "pre_retrofit_epc": p.epc_record.current_energy_rating,
            "post_retrofit_epc": new_epc_bands[p.id],
            "pre_retrofit_co2": pre_retrofit_co2,
            "post_retrofit_co2": pre_retrofit_co2 - carbon_savings,
            "pre_retrofit_energy_bill": pre_retrofit_energy_bill,
            "post_retrofit_energy_bill": pre_retrofit_energy_bill - bill_savings,
            "pre_retrofit_energy_consumption": pre_retrofit_energy_consumption,
            "post_retrofit_energy_consumption": pre_retrofit_energy_consumption - kwh_savings,
            "cost": sum(r["total"] for r in default_recommendations),
            "sap_point_improvement": sum(r["sap_points"] for r in default_recommendations),
            "lower_bound_valuation_uplift": lower_bound_valuation_uplift,
            "upper_bound_valuation_uplift": upper_bound_valuation_uplift,
            "has_recommendations": len(default_recommendations) > 0,
            "funding": float(p.project_funding) if p.project_funding is not None else 0,
            "contingency": float(sum(x.get("contingency", 0) for x in default_recommendations)),
        })

    agg_data = pd.DataFrame(rows)

    total_cost = agg_data["cost"].sum()
    n_units_to_retrofit = agg_data["has_recommendations"].sum()
    total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum()
    total_sap_points = agg_data["sap_point_improvement"].sum()

    valuation_improvement_per_unit = (
        format_money(total_valuation_increase / n_units)
        + f" ({format_money(agg_data['lower_bound_valuation_uplift'].mean())} - "
        f"{format_money(agg_data['upper_bound_valuation_uplift'].mean())})"
    )

    # Return on investment: valuation gained per pound spent (reported as 0 when nothing is spent)
    if total_cost == 0:
        valuation_percentage_increase = 0
        valuation_increase_lower = 0
        valuation_increase_upper = 0
    else:
        valuation_percentage_increase = round(total_valuation_increase / total_cost, 2)
        valuation_increase_lower = agg_data["lower_bound_valuation_uplift"].sum() / total_cost
        valuation_increase_upper = agg_data["upper_bound_valuation_uplift"].sum() / total_cost
    valuation_return_on_investment = (
        str(valuation_percentage_increase)
        + f" ({valuation_increase_lower:,.2f} - {valuation_increase_upper:,.2f})"
    )

    cost_per_co2_saved = format_money(total_cost / total_carbon_saved if total_carbon_saved > 0 else 0)
    cost_per_sap_point = format_money(total_cost / total_sap_points if total_sap_points > 0 else 0)

    return {
        "epc_breakdown_pre_retrofit": json.dumps(
            reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()),
        "energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()),
        "energy_consumption_per_unit_pre_retrofit": str(
            round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(
            round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh",
        "valuation_improvement_per_unit": valuation_improvement_per_unit,
        "cost_per_unit": format_money(agg_data["cost"].mean()),
        "cost_per_co2_saved": cost_per_co2_saved,
        "cost_per_sap_point": cost_per_sap_point,
        "valuation_return_on_investment": valuation_return_on_investment,
        "funding": float(agg_data["funding"].sum()),
        "contingency": float(agg_data["contingency"].sum()),
    }


def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Build the EPC records for a property, folding in any energy assessment we performed for a client.

    The newest public EPC is used as "original_epc" unless our energy assessment is dated after it. In that case the
    assessment's EPC takes its place and the newest public EPC moves into "old_data". An assessment dated on or
    before the newest public EPC is appended to "old_data" unless a public EPC with the same inspection date and
    current energy efficiency already exists (i.e. it has already been lodged).

    :param epc_searcher: An instance of the SearchEpc class
    :param energy_assessment: The energy assessment we have performed. If we have not performed one, this may be
        None, an empty dict, or the model's EnergyAssessment.empty_response() - anything without a truthy "epc".
        When present, energy_assessment["epc"]["inspection-date"] is expected to be a date/datetime (it is
        formatted with strftime).
        NOTE: the county / constituency / local-authority fields of energy_assessment["epc"] are overwritten in
        place with the newest EPC's values.
    :return: Tuple (epc_records, energy_assessment_is_newer).
        - epc_records has keys "original_epc", "full_sap_epc" and "old_data".
        - energy_assessment_is_newer is True only when the assessment is dated after the newest public EPC.
    """
    newest_epc = epc_searcher.newest_epc.copy()
    if newest_epc["uprn"] == "" and epc_searcher.uprn:
        newest_epc["uprn"] = epc_searcher.uprn

    # Callers pass {} when no assessment exists, so the "epc" key may be absent
    epc = energy_assessment.get("epc") if energy_assessment else None
    if not epc:
        return {
            "original_epc": newest_epc,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy(),
        }, False

    energy_assessment_date = epc["inspection-date"].strftime("%Y-%m-%d")

    # These location fields aren't extracted from the energy assessment, so copy them from the newest EPC
    for col in ["county", "constituency", "constituency-label", "local-authority", "local-authority-label"]:
        epc[col] = newest_epc[col]

    if pd.to_datetime(energy_assessment_date) > pd.to_datetime(newest_epc["inspection-date"]):
        # Our assessment is newer than every publicly available EPC for this property
        return {
            "original_epc": epc,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy() + [newest_epc],
        }, True

    # Has our assessment already been lodged? Match on inspection date and SAP score.
    epc_in_historicals = any(
        x["inspection-date"] == energy_assessment_date
        and x["current-energy-efficiency"] == epc["current-energy-efficiency"]
        for x in epc_searcher.older_epcs + [newest_epc]
    )

    old_data = epc_searcher.older_epcs.copy()
    if not epc_in_historicals:
        # Older than the newest public EPC but never lodged, so keep it as historical data
        old_data.append(epc)

    return {
        "original_epc": newest_epc,
        "full_sap_epc": epc_searcher.full_sap_epc.copy(),
        "old_data": old_data,
    }, False


def get_request_property_data(body: PlanTriggerRequest):
    """
    Read the optional on-site input files referenced by the request from the plan trigger S3 bucket.

    :param body: The request body
    :return: Tuple (patches, already_installed, non_invasive_recommendations, valuation_data).
        Each item is what `read_csv_from_s3` returns, or [] when the corresponding file path is not set.
    """
    def _read_optional(filepath):
        if not filepath:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=filepath)

    patches = _read_optional(body.patches_file_path)
    already_installed = _read_optional(body.already_installed_file_path)
    non_invasive_recommendations = _read_optional(body.non_invasive_recommendations_file_path)
    valuation_data = _read_optional(body.valuation_file_path)

    return patches, already_installed, non_invasive_recommendations, valuation_data


def get_funding_data():
    """
    Retrieve the ECO4 project score matrices and the Warm Homes Local Grant (WHLG) eligible postcodes.

    :return: Tuple of three DataFrames, in this order:
        - project_scores_matrix: the ECO4 full project scores matrix, with "Cost Savings" as float
        - partial_project_scores_matrix: the ECO4 partial project scores matrix (v6), with the "200" floor area
          band relabelled "200+" and "Cost Savings" as float
        - whlg_eligible_postcodes: the WHLG eligible postcodes
    """
    data_bucket = get_settings().DATA_BUCKET

    project_scores_matrix = pd.DataFrame(
        read_csv_from_s3(bucket_name=data_bucket, filepath="funding/ECO4 Full Project Scores Matrix.csv")
    )
    # Columns are renamed positionally, so the file's column order matters
    project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
    project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float)

    partial_project_scores_matrix = pd.DataFrame(
        read_csv_from_s3(bucket_name=data_bucket, filepath="funding/ECO4_Partial_Project_Scores_Matrix_v6.csv")
    )
    partial_project_scores_matrix.columns = [
        'Measure category', 'Measure_Type', 'Pre_Main_Heating_Source', 'Post_Main_Heating_Source',
        'Total Floor Area Band', 'Starting Band', 'Average Treatable Factor', 'Cost Savings', 'SAP Savings'
    ]
    # Replace 200 with 200+ in floor area band
    partial_project_scores_matrix["Total Floor Area Band"] = (
        partial_project_scores_matrix["Total Floor Area Band"].replace({"200": "200+"})
    )
    partial_project_scores_matrix["Cost Savings"] = partial_project_scores_matrix["Cost Savings"].astype(float)

    whlg_eligible_postcodes = pd.DataFrame(
        read_csv_from_s3(bucket_name=data_bucket, filepath="funding/whlg eligible postcodes.csv")
    )

    return project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes


def _find_duplicates(values):
    """Return the set of values that occur more than once in *values*."""
    return {value for value, count in Counter(values).items() if count > 1}


def check_duplicate_uprns(input_uprns: List[int]):
    """
    Check that the input data contains no duplicated UPRNs.

    Missing UPRNs (None) are ignored: properties without a UPRN are identified by their landlord property id
    instead, so several of them may legitimately be present.

    :param input_uprns: The UPRNs from the input data (may contain None)
    :return: True when there are no duplicates
    :raises ValueError: If any UPRN appears more than once
    """
    duplicates = _find_duplicates(uprn for uprn in (input_uprns or []) if uprn is not None)
    if duplicates:
        raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")
    return True


def check_duplicate_property_ids(input_properties):
    """
    Check that the input properties contain no duplicated property IDs or (non-null) UPRNs.

    Duplicates only happen in rare cases. For example, properties may be spread across different servers, or an
    input UPRN may be incorrect so that the right property is found via an address search rather than a UPRN
    search. Either way, the same property can end up in the input twice.

    :param input_properties: Objects exposing `id` and `uprn` attributes
    :return: True when there are no duplicates
    :raises ValueError: If any property ID, or any non-None UPRN, appears more than once
    """
    duplicate_ids = _find_duplicates(x.id for x in input_properties)
    if duplicate_ids:
        raise ValueError(f"Duplicate property IDs in the input data: {duplicate_ids}")

    duplicate_uprns = _find_duplicates(x.uprn for x in input_properties if x.uprn is not None)
    if duplicate_uprns:
        raise ValueError(f"Duplicate UPRNs in the input properties: {duplicate_uprns}")

    return True


def extract_address_data(config, body):
    """
    Pull the UPRN, first address line and full address out of a single input row.

    :param config: Mapping for one input row (read with `.get`).
    :param body: The request body; only `file_format` is read.
    :return: Tuple (uprn, address1, full_address).
        - uprn: an int, or None when missing or not numeric.
        - address1: the "address" value, falling back to "domna_address_1" for domna asset lists.
          A float (e.g. a bare house number read as 12.0) is rendered without its decimal part.
          Other values are converted with str(). A missing value gives None.
        - full_address: "domna_full_address" for domna asset lists when it is a string, otherwise None.
    """
    uprn = config.get("uprn", None)
    try:
        uprn = int(float(uprn)) if uprn is not None and pd.notnull(uprn) else None
    except (TypeError, ValueError, OverflowError):
        # Non-numeric UPRNs are treated as missing
        uprn = None

    is_domna = body.file_format == "domna_asset_list"

    address1 = config.get("address", None)
    # Handle domna address list format
    if pd.isnull(address1) and is_domna:
        address1 = config.get("domna_address_1", None)

    if pd.isnull(address1):
        # Previously int(nan) crashed here and None became the literal string "None"
        address1 = None
    elif isinstance(address1, float):
        address1 = str(int(address1))
    else:
        address1 = str(address1)

    full_address = config.get("domna_full_address", "") if is_domna else None
    if not isinstance(full_address, str):
        # Catch for when the full address is nan
        full_address = None

    return uprn, address1, full_address


def keep_max_sap_per_measure_type(items):
    """
    Keep one item per measure type: the one with the highest "sap_points".

    :param items: Dicts with at least "measure_type" and "sap_points" keys
    :return: One item per measure type, ordered by the first appearance of each type in *items*. When several
        items share the maximum, the earliest one is kept.
    """
    # First pass: find max sap_points per measure_type
    max_by_type = {}
    for item in items:
        t = item["measure_type"]
        max_by_type[t] = max(max_by_type.get(t, float("-inf")), item["sap_points"])
    # Second pass: keep only items matching the max for their type
    output = []
    for measure_type, points in max_by_type.items():
        to_consider = [x for x in items if x["measure_type"] == measure_type and x["sap_points"] == points]
        output.append(to_consider[0])  # pick the first one in
case of ties return output async def model_engine(body: PlanTriggerRequest): created_at = datetime.now().isoformat() start_ms = int(time.time() * 1000) logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json())) if body.subtask_id: SubTaskInterface().update_subtask_status( subtask_id=UUID(body.subtask_id), status="in progress", cloud_logs_url=build_cloudwatch_log_url(start_ms) ) try: logger.info("Getting the inputs") # Use PlanInputProcessor for all plan input processing plan_input_processor = PlanInputProcessor(body) addresses = plan_input_processor.process() valuation_data = plan_input_processor.valuation_data # Confirm no duplicate UPRNS check_duplicate_uprns([a.uprn for a in addresses]) # If we have patches or overrides, we should read them in here patches, already_installed, non_invasive_recommendations, _ = get_request_property_data(body) logger.info("Getting cleaning_data") cleaning_data = read_dataframe_from_s3_parquet( bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", ) logger.info("Checking database for existing properties") uprns = addresses.get_uprns() landlord_ids = addresses.get_landlord_ids() postcodes = addresses.get_postcodes_for_flats() # Check if we've seen these properties before with db_read_session() as session: existing_properties = db_funcs.property_functions.get_existing_properties( session, body.portfolio_id, uprns, landlord_ids ) property_lookup = {} for prop in existing_properties: if prop.uprn: property_lookup[("uprn", prop.uprn)] = prop.id if prop.landlord_property_id: property_lookup[("landlord_property_id", prop.landlord_property_id)] = prop.id # List of properties that need to be created in the db to_create = [] for addr in addresses: key = ("uprn", addr.uprn) if addr.uprn else ("landlord_property_id", addr.landlord_property_id) if key not in property_lookup: to_create.append(addr) logger.info("Checking database for EPC cache") # Pre-requests to the db with 
db_read_session() as session: epc_cache_by_uprn = db_funcs.epc_functions.EpcStoreService.get_epcs_for_uprns(session, uprns) postcode_searches = db_funcs.address_functions.get_by_postcodes(session, list(postcodes)) energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns( session, uprns ) already_installed_by_uprn = db_funcs.already_installed_functions.get_installed_measure_types_by_uprns( session, uprns ) # If we have properties that need to be created, we cerate them in bulk logger.info("Determine new properties to be created") new_property_ids = set() if to_create: logger.info("Creating %d new properties", len(to_create)) with db_session() as session: inserted = db_funcs.property_functions.bulk_create_properties( session, body, to_create, energy_assessments_by_uprn ) for prop_id, uprn, landlord_property_id in inserted: new_property_ids.add(prop_id) # We append the newly created properties to property_lookup for prop_id, uprn, landlord_property_id in inserted: if uprn is not None: property_lookup[("uprn", uprn)] = prop_id if landlord_property_id: property_lookup[("landlord_property_id", landlord_property_id)] = prop_id logger.info("Processing each property for model input preparation") input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, [] for addr in tqdm( addresses, total=len(addresses), desc="Processing properties", ): # ---------- 1) filter fetched data ---------- epc_cache = epc_cache_by_uprn[addr.uprn] epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] # Extract from EPC cache if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH: epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] # Extract associated UPRNs from the database response associated_uprns = db_funcs.address_functions.get_associated_uprns( postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn ) 
energy_assessment = energy_assessments_by_uprn.get(addr.uprn) property_already_installed = list(already_installed_by_uprn[addr.uprn]) epc_searcher = SearchEpc( address1=addr.address_1, postcode=addr.postcode, uprn=addr.uprn, auth_token=get_settings().EPC_AUTH_TOKEN, os_api_key="", full_address=addr.full_address, heating_system=addr.landlord_heating_system, associated_uprns=associated_uprns, lmk_key=addr.lmk_key ) epc_searcher.ordnance_survey_client.built_form = addr.landlord_built_form epc_searcher.ordnance_survey_client.property_type = addr.landlord_property_type # For the moment, our OS API access is unavailable, so we skip and interpolate epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True) epc_searcher.set_uprn_source(file_format=body.file_format) lookup_key = ( ("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id) ) property_id = property_lookup[lookup_key] if not property_id: logger.error("Could not find property ID for address: %s", addr.request_data) # Should not happen unless input data is inconsistent continue is_new = property_id in new_property_ids if not is_new and not body.multi_plan: continue # If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that. 
# Otherwise, we use the newest EPC # energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that # has been publically lodged if energy_assessment is None: energy_assessment = {} epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records( epc_searcher, energy_assessment ) req_data = extract_property_request_data( address=addr, patches=patches, non_invasive_recommendations=non_invasive_recommendations, valuation_data=valuation_data, uprn=addr.uprn, ) # Pull this out as it may get overwritten property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch # if we have a remote assment data type, we pull the additional data and include it epc_page_source, find_my_epc_components = {}, [] if ((body.event_type == "remote_assessment") and not ( epc_searcher.newest_epc.get("estimated")) ) or addr.epc_certificate_number: if addr.epc_certificate_number: rrn = addr.epc_certificate_number property_non_invasive_recommendations, patch, epc_page_source, find_my_epc_components = ( RetrieveFindMyEpc.get_from_epc_with_fallback( epc=epc_searcher.newest_epc, epc_page=epc_page, rrn=rrn, cleaned_address=epc_searcher.address_clean, config_address=addr.address_1, address_postal_town=epc_searcher.address_postal_town ) ) epc_records = patch_epc(patch, epc_records) # Hack - temp while we're planning to rebuild backend if addr.epc_certificate_number is not None and epc_records["original_epc"].get("estimated"): epc_records["original_epc"]["estimated"] = False prepared_epc = EPCRecord( epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, # address_metadata=addr Switched off to remove injecting landlord inputs ) input_properties.append( Property( id=property_id, uprn=addr.uprn, is_new=is_new, address=epc_searcher.address_clean, postcode=epc_searcher.postcode_clean, epc_record=prepared_epc, already_installed=property_already_installed, 
find_my_epc_components=find_my_epc_components, property_valuation=req_data.valuation, non_invasive_recommendations=property_non_invasive_recommendations, energy_assessment=energy_assessment, inspections=inspections_map.get(property_id), ) ) # If we have: # 1) No EPC API data # 2) A real EPC # 3) A UPRN (meaning that a UPRN could be fetched against that property) # We store this data uprn_to_check_against = addr.uprn if addr.uprn is not None else epc_searcher.uprn # Until we enforce uprn if db_funcs.epc_functions.EpcStoreService.check_insert_needed( epc_cache, epc_searcher.newest_epc.get("estimated"), uprn_to_check_against, ): epc_upserts.append({ "uprn": uprn_to_check_against, "epc_api": epc_searcher.data, "epc_page": epc_page_source.get("page_source"), "epc_page_rrn": epc_page_source.get("rrn"), }) if not input_properties: return Response(status_code=204) check_duplicate_property_ids(input_properties) logger.info("Inserting property data") # We now bulk upload all the EPC data with db_session() as session: db_funcs.epc_functions.EpcStoreService.bulk_upsert_epc_data(session, epc_upserts) # We check if we have inspections data and store it in the database if so. 
We'll update or create # aginst each property if with db_session() as session: db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map) # Set up model api and warm up the lambdas model_api = ModelApi( portfolio_id=body.portfolio_id, timestamp=created_at, prediction_buckets=get_prediction_buckets(), max_retries=1 ) await model_api.async_warm_up_lambdas(model_prefies=model_api.models_for_warm_up) logger.info("Reading in materials and cleaned datasets") cleaned = get_cleaned() with db_read_session() as session: materials = db_funcs.materials_functions.get_materials(session) logger.info("Preparing rebaselining") rebaselining_scoring_data = [] for p in tqdm(input_properties): # 1) EPC expired 2) Missing EPC 3) Different information from landlord vs EPC needs_rebaselining = p.epc_is_expired | p.epc_is_estimated | (len(p.epc_record.landlord_differences) > 0) # Hack - skip if "SAP05" in p.epc_record.walls_description: continue if needs_rebaselining: p.create_base_difference_epc_record(cleaned_lookup=cleaned) scoring_data = p.base_difference_record.df.copy() rebaselining_scoring_data.append(scoring_data) rebaselining_scoring_data = ( pd.concat(rebaselining_scoring_data) if len(rebaselining_scoring_data) else pd.DataFrame([]) ) predictions_by_model_and_uprn = {} if not rebaselining_scoring_data.empty: logger.info(f"{rebaselining_scoring_data.shape[0]} properties require re-baselineing") # Trigger re-scoring rebaselining_scoring_data["is_post_sap10_starting"] = True rebaselining_response = model_api.predict_all( df=rebaselining_scoring_data, bucket=get_settings().DATA_BUCKET, model_prefixes=model_api.BASELINE_MODEL_PREFIXES, extract_ids=False, extract_uprn=True ) # Update EPC records with new model predictions input_properties_by_uprn = {int(p.uprn): p for p in input_properties if p.uprn is not None} # Pre-index predictions for each model by UPRN model_names = [ "retrofit_sap_baseline_predictions", "retrofit_carbon_baseline_predictions", 
"retrofit_heat_baseline_predictions", ] for model in model_names: df = rebaselining_response[model] predictions_by_model_and_uprn[model] = dict(zip(df["uprn"].astype(int), df["predictions"])) for uprn_int in rebaselining_scoring_data["uprn"].unique().astype(int): try: property_instance = input_properties_by_uprn[uprn_int] if property_instance is None: logger.warning(f"No property found for UPRN {uprn_int} during rebaselining update.") continue # Gather predictions for this UPRN try: new_sap = predictions_by_model_and_uprn["retrofit_sap_baseline_predictions"][uprn_int] new_carbon = predictions_by_model_and_uprn["retrofit_carbon_baseline_predictions"][uprn_int] new_heat_demand = predictions_by_model_and_uprn["retrofit_heat_baseline_predictions"][uprn_int] except KeyError as e: logger.warning(f"Missing prediction for UPRN {uprn_int}: {e}") continue # Update EPC record property_instance.epc_record.insert_new_performance_values( new_sap=new_sap, new_epc=sap_to_epc(new_sap), new_carbon=new_carbon, new_heat_demand=new_heat_demand, ) except Exception as e: logger.error(f"Error updating EPC record for UPRN {uprn_int}: {e}") kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned) kwh_preds = await model_api.async_paginated_predictions( data=epcs_for_scoring, bucket=get_settings().DATA_BUCKET, model_prefixes=model_api.KWH_MODEL_PREFIXES, extract_ids=False, batch_size=SCORING_BATCH_SIZE ) # Insert the spatial data logger.info("Getting spatial data") input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET) [p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties] logger.info("Performing solar analysis") ofgem_consumption_averages = read_dataframe_from_s3_parquet( bucket_name=get_settings().DATA_BUCKET, 
file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet" ) building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data( input_properties=input_properties, ofgem_consumption_averages=ofgem_consumption_averages, body=body ) with db_session() as session: input_properties = GoogleSolarApi.building_solar_analysis( building_solar_config=building_solar_config, input_properties=input_properties, session=session, google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY, solar_materials=[m for m in materials if m["type"] == "solar_pv"], ) with db_session() as session: input_properties = GoogleSolarApi.unit_solar_analysis( unit_solar_config=unit_solar_config, input_properties=input_properties, session=session, body=body, solar_materials=[m for m in materials if m["type"] == "solar_pv"], google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY, inspections_map=inspections_map ) # We also make a tweak - if the property has been flagged for solar but doesn't contain # any panel performance, we ensure that we have a 3kWp and 4kWp option for the property logger.info("Identifying property recommendations") recommendations, recommendations_scoring_data, representative_recommendations = {}, [], {} for p in tqdm(input_properties): # We set the ECO package data, if we have it property_eco_package = eco_packages.get(p.id, (None, None, None)) if property_eco_package[0] is not None: inclusions = property_eco_package[0] exclusions = [] else: inclusions = body.inclusions exclusions = body.exclusions recommender = Recommendations( property_instance=p, materials=materials, exclusions=exclusions, inclusions=inclusions, default_u_values=body.default_u_values ) property_recommendations, property_representative_recommendations = recommender.recommend() if not property_recommendations: continue # Perform a check for properties (temp) where we've remodelled if p.epc_record.has_been_remodelled: for x in property_recommendations: if any(y.get("survey") for y in x): 
raise ValueError("Should not have survey true for remodelled properties") recommendations[p.id] = property_recommendations representative_recommendations[p.id] = property_representative_recommendations p.create_base_difference_epc_record(cleaned_lookup=cleaned) p.adjust_difference_record_with_recommendations( property_recommendations, property_representative_recommendations ) recommendations_scoring_data.extend(p.recommendations_scoring_data) logger.info("Preparing data for scoring in sap change api") recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) if not recommendations_scoring_data.empty: recommendations_scoring_data = recommendations_scoring_data.drop( columns=[ "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending" ] ) recommendations_scoring_data["is_post_sap10_ending"] = True all_predictions = await model_api.async_paginated_predictions( data=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET, batch_size=SCORING_BATCH_SIZE ) # Insert the predictions into the recommendations, and get the impact summary scoring_epcs = [] # For scoring the kwh models for property_id in recommendations.keys(): property_instance = [p for p in input_properties if p.id == property_id][0] recommendations_with_impact, impact_summary = ( Recommendations.calculate_recommendation_impact( property_instance=property_instance, all_predictions=all_predictions, recommendations=recommendations, representative_recommendations=representative_recommendations ) ) # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc # at each phase property_instance.update_simulation_epcs(impact_summary) scoring_epcs.extend(property_instance.updated_simulation_epcs) recommendations[property_id] = recommendations_with_impact # We call the API with the scoring epcs scoring_epcs = pd.DataFrame(scoring_epcs) scoring_epcs = kwh_client.transform(data=scoring_epcs, 
cleaned=cleaned) kwh_simulation_predictions = await model_api.async_paginated_predictions( data=scoring_epcs, bucket=get_settings().DATA_BUCKET, model_prefixes=model_api.KWH_MODEL_PREFIXES, batch_size=SCORING_BATCH_SIZE ) # We now insert kwh estimates and costs into the recommendations logger.info("Calculating tenant savings - kwh and bills") for p in tqdm(input_properties): property_id = p.id property_recommendations = recommendations.get(property_id, []) property_current_energy_bill = ( Recommendations.calculate_recommendation_tenant_savings( property_instance=p, kwh_simulation_predictions=kwh_simulation_predictions, property_recommendations=property_recommendations, ashp_cop=body.ashp_cop ) ) p.current_energy_bill = property_current_energy_bill # Create matrix of all predictions for debug: - any rebaselining and measure level predictions # Insert the predictions into the recommendations and run the optimiser logger.info("Optimising measures") for p in input_properties: if not recommendations.get(p.id): continue # we need to double unlist because we have a list of lists property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] # TODO - formalise property measure types into an enum ventilation_included = ( "ventilation" in property_measure_types or "mechanical_ventilation" in property_measure_types ) # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore # its inclusion needs_ventilation = optimiser_functions.check_needs_ventilation( property_measure_types, assumptions.measures_needing_ventilation, p.has_ventilation, ventilation_included ) if not measures_to_optimise: # Nothing to do, we just reshape the recommendations recommendations[p.id] = 
optimiser_functions.flatten_recommendations_with_defaults( p.id, recommendations, set() ) continue already_installed_measures = [] for measures in measures_to_optimise: for m in measures: # A) We're going to make the already installed measures default # B) We need to SAP points for all already installed measures to avoid double counting if m["already_installed"]: already_installed_measures.append( { "id": m["recommendation_id"], "measure_type": m["measure_type"], "sap_points": m["sap_points"], } ) # We get the ones with the highest SAP default_already_installed = keep_max_sap_per_measure_type(already_installed_measures) already_installed_sap = float(sum(d["sap_points"] for d in default_already_installed)) fixed_gain = optimiser_functions.calculate_fixed_gain( property_required_measures, recommendations, p, needs_ventilation ) gain = optimiser_functions.calculate_gain( body=body, p=p, fixed_gain=fixed_gain, already_installed_gain=already_installed_sap ) # We insert the innovation uplift measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) for group in measures_to_optimise_with_uplift: for r in group: (r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], r["uplift_project_score"]) = (0, 0, 0, 0) # Remove them from the optimisation pool finalised_measures_to_optimise = [] for m in measures_to_optimise_with_uplift: filtered = [x for x in m if not x["already_installed"]] if filtered: finalised_measures_to_optimise.append(filtered) input_measures = optimiser_functions.prepare_input_measures( finalised_measures_to_optimise, body.goal, needs_ventilation, funding=True, property_eco_packages=eco_packages.get(p.id) ) # When the goal is Increasing EPC, we can run the funding optimiser if body.goal == "Increasing EPC": solutions = optimise_with_scenarios( p=p, input_measures=input_measures, budget=body.budget, target_gain=gain, enforce_heat_pump_insulation=True, enforce_fabric_first=body.enforce_fabric_first, 
already_installed_sap=already_installed_sap, # To be passed to output ) # if handle the empty case if solutions.empty: solution, battery_sap_score = [], 0 else: if solutions["meets_upgrade_target"].any(): # If we have a solution that meets the upgrade target, we select that one optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] else: # We re-organise, taking the solution with the most gain and then the cheapest solutions = solutions.sort_values( by=["total_gain", "total_cost"], ascending=[False, True] ) optimal_solution = solutions.iloc[0] # We create this full list of selected measures, which is used in the next section for setting # default measures solution = deepcopy(optimal_solution["items"]) pv_size = float(optimal_solution["array_size"]) battery_sap_score = BatterySAPScorer.score( starting_sap=optimal_solution["ending_sap_without_battery"], pv_size=pv_size ) else: # We optimise and then we determine eligibility for funding, based on the measures selected optimiser = ( GainOptimiser( input_measures, max_cost=body.budget, max_gain=float(gain) if gain is not None else 0, allow_slack=False ) if body.budget else CostOptimiser(input_measures, min_gain=float(gain) if gain is not None else 0) ) optimiser.setup() optimiser.solve() solution = optimiser.solution gain = optimiser.solution_gain post_sap = p.epc_record.current_energy_efficiency + gain pv_size = next( (m["array_size"] for m in solution if m["type"] == "solar_pv"), 0 ) battery_sap_score = BatterySAPScorer.score(starting_sap=post_sap, pv_size=pv_size) # We add the defaulty already installed measures to the solution selected = {r["id"] for r in solution + default_already_installed} if property_required_measures: solution = optimiser_functions.add_required_measures( property_id=p.id, property_required_measures=property_required_measures, recommendations=recommendations, selected=selected, ) # Add best practice measures (ventilation/trickle vents) - pass needs_ventilation flag selected = 
optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) # Final flattening - we pass what the battery SAP score would be, regardless if the battery was selected recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( p.id, recommendations, selected, battery_sap_score ) # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all # of them # TODO: We can probably do better and optimise at the building level - this is temp # Idea: - optimise all measures except solar at the unit level. Then, test with and without solar for # all units at the same time logger.info("Adjusting solar PV recommendations for buildings") building_ids = set([p.building_id for p in input_properties if p.building_id is not None]) for bid in building_ids: # We check if any of them have solar PV building = [p for p in input_properties if p.building_id == bid] has_solar = False for unit in building: # Get default recommendations has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0 if has_solar: break if has_solar: # We adjust the units within the building for unit in building: for rec in recommendations[unit.id]: if rec["type"] == "solar_pv": # This is straightforward, we just set the default to True, since when we're at a building # level, we only allow 1 solar PV option for each unit. 
If we change this, this logic will # need to be updated rec["default"] = True logger.info("Uploading recommendations to the database") # If we have any work to do, we create a new scenario if body.scenario_id: # We don't need to create a new scenario, we just use the existing one scenario_id = body.scenario_id else: with db_session() as session: scenario_id = db_funcs.recommendations_functions.create_scenario( session=session, scenario={ "name": body.scenario_name, "created_at": created_at, "budget": body.budget, "portfolio_id": body.portfolio_id, "housing_type": body.housing_type, "goal": body.goal, "goal_value": body.goal_value, "trigger_file_path": body.trigger_file_path, "already_installed_file_path": body.already_installed_file_path, "patches_file_path": body.patches_file_path, "non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path, "exclusions": body.exclusions, "multi_plan": body.multi_plan } ) property_updates, property_epc_details, property_spatial_updates = [], [], [] plans_to_create, recommendations_to_create = [], [] # Prepare the data that will need to be uploaded in bulk for p in input_properties: recommendations_for_property = recommendations.get(p.id, []) default_recommendations = [r for r in recommendations_for_property if r["default"]] # We need to: # Get already installed measures already_installed_default = [r for r in default_recommendations if r["already_installed"]] # Property should be have increased SAP needs_rebaselining = bool(len(already_installed_default)) rebaselining_sap = float(sum([r["sap_points"] for r in already_installed_default])) rebaselining_carbon = float(sum([r["co2_equivalent_savings"] for r in already_installed_default])) rebaselining_heat_demand = float(sum([r["heat_demand"] for r in already_installed_default])) rebaselining_kwh = float(sum([r["kwh_savings"] for r in already_installed_default])) rebaselining_bills = float(sum([r["energy_cost_savings"] for r in already_installed_default])) # 
This will include everything, including already installed total_sap_points = sum([r["sap_points"] for r in default_recommendations]) new_sap_points = p.epc_record.current_energy_efficiency + total_sap_points new_epc = sap_to_epc(new_sap_points) # Already installed measures do not have a cost but we remove anyway total_cost = sum([r["total"] for r in default_recommendations if not r["already_installed"]]) valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost) # --- property-level updates (always) --- property_updates.append({ "property_id": p.id, "portfolio_id": body.portfolio_id, "data": p.get_full_property_data( current_valuation=valuations["current_value"], needs_rebaselining=needs_rebaselining, rebaselining_sap=rebaselining_sap, ) }) property_epc_details.append( p.get_property_details_epc( portfolio_id=body.portfolio_id, needs_rebaselining=needs_rebaselining, rebaselining_carbon=rebaselining_carbon, rebaselining_heat_demand=rebaselining_heat_demand, rebaselining_kwh=rebaselining_kwh, rebaselining_bills=rebaselining_bills, ) ) property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial}) # --- skip plan creation if no recommendations --- if not recommendations_for_property: continue plan_data = db_funcs.recommendations_functions.prepare_plan_data( p=p, body=body, scenario_id=scenario_id, eco_packages=eco_packages, valuations=valuations, new_sap_points=new_sap_points, new_epc=new_epc, default_recommendations=default_recommendations, rebaselining_carbon=rebaselining_carbon, rebaselining_heat_demand=rebaselining_heat_demand, rebaselining_kwh=rebaselining_kwh, rebaselining_bills=rebaselining_bills, ) plans_to_create.append({"property_id": p.id, "plan_data": plan_data}) # store recommendations keyed by property for r in recommendations_for_property: recommendations_to_create.append({ "property_id": p.id, # ---- Recommendation core ---- "type": r["type"], "measure_type": r["measure_type"], "description": 
r["description"], "estimated_cost": float(r["total"]), "default": r["default"], "starting_u_value": float(r["starting_u_value"]) if r.get("starting_u_value") else None, "new_u_value": float(r["new_u_value"]) if r.get("new_u_value") else None, "sap_points": float(r["sap_points"]), "energy_savings": float(r["heat_demand"]), "kwh_savings": float(r["kwh_savings"]), "co2_equivalent_savings": float(r["co2_equivalent_savings"]), "total_work_hours": float(r["labour_hours"]), "energy_cost_savings": float(r["energy_cost_savings"]), "labour_days": float(r["labour_days"]), "already_installed": r["already_installed"], "heat_demand": float(r["heat_demand"]), # ---- parts ---- "parts": [ { "material_id": part["id"], "depth": int(part["depth"]) if part.get("depth") else None, "quantity": float(part["quantity"]) if part.get("quantity") else None, "quantity_unit": part.get("quantity_unit"), "estimated_cost": float(part.get("total", part.get("total_cost"))), } for part in r.get("parts", []) ], }) # Bulk upload property data logger.info("Uploading property data in bulk") with db_session() as session: db_funcs.property_functions.bulk_update_properties(session, property_updates) db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details) db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates) # # Bulk create plans plan_id_by_property = db_funcs.recommendations_functions.bulk_create_plans(session, plans_to_create) recommendation_payload = [ { "plan_id": plan_id_by_property[r["property_id"]], **{k: v for k, v in r.items() if k not in ["parts"]}, "parts": r["parts"], } for r in recommendations_to_create if r["property_id"] in plan_id_by_property ] db_funcs.recommendations_functions.bulk_upload_recommendations_and_materials( session, recommendation_payload ) logger.info("Work completed, updating log status") except IntegrityError as e: return handle_error("Database integrity error.", e, body.subtask_id, 500, start_ms) 
except OperationalError as e: return handle_error("Database operational error.", e, body.subtask_id, 500, start_ms) except ValueError as e: return handle_error("Bad request: malformed data.", e, body.subtask_id, 400, start_ms) except Exception as e: # General exception handling return handle_error("An unexpected error occurred.", e, body.subtask_id, 500, start_ms) # Mark the subtask as successful SubTaskInterface().update_subtask_status(subtask_id=UUID(body.subtask_id), status="complete") logger.info("Model Engine completed successfully") prediction_matrix = PredictionMatrix() # --- Add rebaselining and measure-level predictions to PredictionMatrix --- for p in input_properties: # Add rebaselined predictions if available uprn = p.uprn if uprn is None: continue # Rebaselined SAP prediction rebaselined_sap = None if uprn in predictions_by_model_and_uprn.get("retrofit_sap_baseline_predictions", {}): rebaselined_sap = predictions_by_model_and_uprn["retrofit_sap_baseline_predictions"][uprn] # Add original EPC and landlord differences for comparison prediction_matrix.set_original_epc( uprn=uprn, original_epc=p.epc_record.original_epc, landlord_differences=p.epc_record.landlord_differences, lodgement_date=p.epc_record.lodgement_date, ) prediction_matrix.set_rebaselined_prediction(uprn, rebaselined_sap) # Add measure-level predictions property_recommendations = recommendations.get(p.id, []) for rec in property_recommendations: prediction_matrix.add_recommendation( uprn=uprn, measure_id=rec.get("recommendation_id", rec.get("id", rec.get("type", "unknown"))), prediction=rec.get("sap_points"), metadata={k: v for k, v in rec.items() if k not in ("sap_points", "recommendation_id", "id")} ) # --- End PredictionMatrix population --- return Response(status_code=200)