from datetime import datetime

import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response

from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
    create_property,
    create_property_details_epc,
    create_property_targets,
    update_property_data,
    update_or_create_property_spatial_details,
)
from backend.app.db.functions.recommendations_functions import (
    create_plan,
    create_plan_recommendations,
    upload_recommendations,
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from backend.ml_models.Valuation import PropertyValuation
from backend.ml_models.AnnualBillSavings import AnnualBillSavings

logger = setup_logger()

BATCH_SIZE = 5

router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}},
)

# Measure types that count as wall insulation: selecting any of them pulls in mechanical ventilation.
_WALL_INSULATION_TYPES = frozenset(
    {"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"}
)
# Solid-wall alternatives: only one of these is ever defaulted for a property.
_SOLID_WALL_INSULATION_TYPES = ("internal_wall_insulation", "external_wall_insulation")
# Merge keys used when imputing per-property averages (room counts etc.) from the cleaning dataset.
# Stored as a tuple and copied into a fresh list per call, so a callee can never mutate the shared value.
_ROOM_CLEANING_MERGE_COLUMNS = ("PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "LOCAL_AUTHORITY")


def _clean_scoring_data(scoring_rows, cleaning_data):
    """Build a scoring DataFrame from *scoring_rows* and apply the model's cleaning steps.

    Args:
        scoring_rows: list of dicts produced by ``create_recommendation_scoring_data``.
        cleaning_data: the averages dataset read from ``sap_change_model/cleaning_dataset.parquet``.

    Returns:
        The cleaned DataFrame, with the ``LOCAL_AUTHORITY`` merge column dropped.
    """
    scoring_data = pd.DataFrame(scoring_rows)
    # Perform the same cleaning as in the model - the number-of-rooms variables are cleaned first
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=list(_ROOM_CLEANING_MERGE_COLUMNS),
        colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
    )
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
    ).drop(columns=["LOCAL_AUTHORITY"])
    ignore_cols = [
        c for c in scoring_data.columns
        if ("thermal_transmittance" in c) or ("insulation_thickness" in c) or ("ENERGY_EFF" in c)
    ]
    scoring_data = DataProcessor.clean_missings_after_description_process(scoring_data, ignore_cols=ignore_cols)
    return DataProcessor.clean_efficiency_variables(scoring_data)


def _predict_all(scoring_data, portfolio_id, created_at):
    """Run SAP, heat-demand and carbon predictions for *scoring_data* via ``ModelApi``.

    Returns whatever ``ModelApi.predict_all`` returns; callers index it by the
    ``"sap_change_predictions"``, ``"heat_demand_predictions"`` and ``"carbon_change_predictions"`` keys.
    """
    settings = get_settings()
    model_api = ModelApi(portfolio_id=portfolio_id, timestamp=created_at)
    return model_api.predict_all(
        df=scoring_data,
        bucket=settings.DATA_BUCKET,
        prediction_buckets={
            "sap_change_predictions": settings.SAP_PREDICTIONS_BUCKET,
            "heat_demand_predictions": settings.HEAT_PREDICTIONS_BUCKET,
            "carbon_change_predictions": settings.CARBON_PREDICTIONS_BUCKET,
        },
    )


def _prepare_property_scoring_data(property_instance, created_at):
    """Pre-process one property's model data into the datasets used to score its recommendations.

    Returns a dict with ``starting_epc_data``, ``ending_epc_data`` and ``fixed_data``. The ending
    record's ``DAYS_TO_ENDING`` is computed relative to *created_at*, i.e. the lodgement date is set to today.
    """
    data_processor = DataProcessor(None, newdata=True)
    data_processor.insert_data(pd.DataFrame([property_instance.get_model_data()]))
    # TODO: Temp - an empty UPRN is replaced with 0 before pre-processing
    if data_processor.data["UPRN"].values[0] == "":
        data_processor.data["UPRN"] = 0
    data_processor.pre_process()
    starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
    ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
    fixed_data = data_processor.get_fixed_features()
    ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
    return {
        "starting_epc_data": starting_epc_data,
        "ending_epc_data": ending_epc_data,
        "fixed_data": fixed_data,
    }


def _optimise_property_recommendations(property_instance, recommendations_with_impact, body):
    """Run the optimiser for one property and flag the chosen measures as defaults.

    With a budget, a ``GainOptimiser`` maximises gain under ``body.budget``. Otherwise a
    ``CostOptimiser`` looks for the cheapest set that reaches the lower SAP bound of
    ``body.goal_value``; a negative required gain yields an empty solution.
    If any wall insulation is selected, mechanical ventilation (when available) is also
    defaulted as a best-practice measure.

    Returns:
        A flat list of recommendation dicts (copies), each with a boolean ``"default"`` key.
    """
    input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
    if body.budget:
        optimiser = GainOptimiser(input_measures, max_cost=body.budget)
    else:
        # The minimum gain is the number of SAP points required to reach the target EPC band
        current_sap_points = int(property_instance.data["current-energy-efficiency"])
        target_sap_points = epc_to_sap_lower_bound(body.goal_value)
        optimiser = CostOptimiser(
            input_measures,
            min_gain=CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points),
        )
    optimiser.setup()
    optimiser.solve()
    solution = optimiser.solution

    selected_recommendations = {r["id"] for r in solution}
    if {r["type"] for r in solution} & _WALL_INSULATION_TYPES:
        ventilation_group = next(
            (
                group for group in recommendations_with_impact
                if group and group[0]["type"] == "mechanical_ventilation"
            ),
            None,
        )
        # Previously an unconditional [0] raised IndexError when no ventilation measure existed
        if ventilation_group is not None:
            selected_recommendations.add(ventilation_group[0]["recommendation_id"])

    # Flatten the per-type groups, flagging the optimiser's selection as the defaults
    return [
        {**rec, "default": rec["recommendation_id"] in selected_recommendations}
        for recommendations_by_type in recommendations_with_impact
        for rec in recommendations_by_type
    ]


def _select_representative_recommendations(property_recommendations):
    """Return the default recommendations plus the cheapest recommendation of each non-default type.

    This gives one measure per type so the combined impact of a full package can be scored.
    For solid walls only one of IWI/EWI is represented: if either is a default, the other is skipped.
    If neither is a default but both are available, IWI is used, because it is usually cheaper.
    """
    default_recommendations = [r for r in property_recommendations if r["default"]]
    default_types = {r["type"] for r in default_recommendations}
    # sorted() makes the order in which measures are applied deterministic (set order is not)
    missing_types = sorted({r["type"] for r in property_recommendations} - default_types)

    if default_types.intersection(_SOLID_WALL_INSULATION_TYPES):
        missing_types = [t for t in missing_types if t not in _SOLID_WALL_INSULATION_TYPES]
    elif "internal_wall_insulation" in missing_types and "external_wall_insulation" in missing_types:
        missing_types = [t for t in missing_types if t != "external_wall_insulation"]

    for missed_type in missing_types:
        candidates = [r for r in property_recommendations if r["type"] == missed_type]
        # The cheapest candidate *of this type* represents it (the search used to span every type)
        default_recommendations.append(min(candidates, key=lambda r: r["total"]))
    return default_recommendations


def _build_combined_scoring_row(property_instance, representative_recommendations, scoring_datasets):
    """Build one scoring row with all *representative_recommendations* applied cumulatively.

    The ending EPC data is updated after each recommendation, so the returned row reflects the whole package.
    Returns ``{}`` if there are no representative recommendations.
    """
    starting_epc_data = scoring_datasets["starting_epc_data"].copy()
    ending_epc_data = scoring_datasets["ending_epc_data"].copy()
    fixed_data = scoring_datasets["fixed_data"].copy()
    scoring_dict = {}
    for rec in representative_recommendations:
        scoring_dict = create_recommendation_scoring_data(
            property=property_instance,
            recommendation=rec,
            starting_epc_data=starting_epc_data,
            ending_epc_data=ending_epc_data,
            fixed_data=fixed_data,
        )
        for column, value in scoring_dict.items():
            if column in ending_epc_data.columns:
                ending_epc_data[column] = value
    return scoring_dict


def _apply_combined_impact(
    property_instance, property_recommendations, representative_recommendations, combined_predictions
):
    """Share the package-level carbon and heat-demand change across a property's recommendations.

    The combined carbon/heat predictions for the property are converted into totals (heat demand
    adjusted to metered via ``AnnualBillSavings.adjust_energy_to_metered``). Each total is then split
    across the representative measures in proportion to their individual estimates. Every recommendation
    in *property_recommendations* is updated **in place**: IWI/EWI share the solid-wall row, and
    mechanical ventilation gets zero savings. Finally the property's adjusted energy figures are set.
    """
    property_key = str(property_instance.id)
    combined_heat_demand = combined_predictions["heat_demand_predictions"]
    combined_heat_demand = combined_heat_demand[combined_heat_demand["property_id"] == property_key]
    combined_carbon = combined_predictions["carbon_change_predictions"]
    combined_carbon = combined_carbon[combined_carbon["property_id"] == property_key]

    carbon_change = (
        float(property_instance.data["co2-emissions-current"]) - combined_carbon["predictions"].values[0]
    )
    starting_heat_demand = float(property_instance.data["energy-consumption-current"]) * property_instance.floor_area
    expected_heat_demand = starting_heat_demand - (
        combined_heat_demand["predictions"].values[0] * property_instance.floor_area
    )

    # Align the heat demand figures to metered consumption (UCL paper adjustment)
    current_epc_rating = property_instance.data["current-energy-rating"]
    current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
        epc_energy_consumption=starting_heat_demand,
        current_epc_rating=current_epc_rating,
    )
    expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
        epc_energy_consumption=expected_heat_demand,
        current_epc_rating=current_epc_rating,
    )
    heat_demand_change = current_adjusted_energy - expected_adjusted_energy

    representative_rec_data = pd.DataFrame([
        {
            "recommendation_id": r["recommendation_id"],
            "co2_equivalent_savings": r.get("co2_equivalent_savings"),
            "heat_demand": r.get("heat_demand"),
            "type": r["type"],
        }
        for r in representative_recommendations
    ])
    # Each measure's share of the summed individual estimates decides its share of the package change
    co2_share = (
        representative_rec_data["co2_equivalent_savings"] / representative_rec_data["co2_equivalent_savings"].sum()
    )
    heat_share = representative_rec_data["heat_demand"] / representative_rec_data["heat_demand"].sum()
    representative_rec_data["co2_equivalent_savings"] = carbon_change * co2_share
    representative_rec_data["heat_demand"] = heat_demand_change * heat_share

    solid_wall_rows = representative_rec_data[
        representative_rec_data["type"].isin(list(_SOLID_WALL_INSULATION_TYPES))
    ]
    for rec in property_recommendations:
        if rec["type"] == "mechanical_ventilation":
            rec["co2_equivalent_savings"] = 0
            rec["heat_demand"] = 0
            rec["energy_cost_savings"] = 0
            continue
        if rec["type"] in _SOLID_WALL_INSULATION_TYPES:
            change_data = solid_wall_rows
        else:
            change_data = representative_rec_data[representative_rec_data["type"] == rec["type"]]
        rec["co2_equivalent_savings"] = change_data["co2_equivalent_savings"].values[0]
        rec["heat_demand"] = change_data["heat_demand"].values[0]
        rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])

    # Mechanical ventilation's share is excluded from the expected energy, so it is added back on
    mechanical_ventilation_rec = representative_rec_data[representative_rec_data["type"] == "mechanical_ventilation"]
    if not mechanical_ventilation_rec.empty:
        expected_adjusted_energy = expected_adjusted_energy + mechanical_ventilation_rec["heat_demand"].values[0]

    property_instance.set_adjusted_energy(
        current_adjusted_energy=current_adjusted_energy,
        expected_adjusted_energy=expected_adjusted_energy,
    )


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """Generate, score, optimise and store retrofit plans for the properties in a trigger file.

    Steps:
    1. Read the trigger CSV from ``PLAN_TRIGGER_BUCKET`` and create property records and targets.
    2. Generate recommendations and score them with ``ModelApi``.
    3. Optimise the default set per property.
    4. Re-score the combined package to estimate carbon and heat-demand impact.
    5. Upload everything in batches of ``BATCH_SIZE``.
    6. Aggregate the portfolio.

    Returns:
        204 if the file contains no new properties, 200 on success, 400 on ``ValueError``,
        500 on database or unexpected errors. A failing upload batch is rolled back and
        logged without aborting the request.
    """
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()
    try:
        session.begin()
        settings = get_settings()

        logger.info("Getting the inputs")
        epc_client = EpcClient(auth_token=settings.EPC_AUTH_TOKEN)
        plan_input = read_csv_from_s3(bucket_name=settings.PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=settings.DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        cleaning_data = read_parquet_from_s3(
            bucket_name=settings.DATA_BUCKET,
            file_key="sap_change_model/cleaning_dataset.parquet",
        )

        input_properties = []
        for config in plan_input:
            # TODO: implement validation of each record, and standardise postcode and address
            # (a postcode of abcdef is currently considered different to ABCDEF)
            property_id, is_new = create_property(
                session, portfolio_id=body.portfolio_id, address=config["address"], postcode=config["postcode"]
            )
            # If a new record was not created, we don't produce recommendations
            if not is_new:
                continue
            # TODO: Need to add heat demand target
            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None,
            )
            input_properties.append(
                Property(
                    postcode=config["postcode"],
                    address1=config["address"],
                    epc_client=epc_client,
                    id=property_id,
                )
            )

        if not input_properties:
            return Response(status_code=204)
        properties_by_id = {p.id: p for p in input_properties}

        logger.info("Getting EPC, and spatial data")
        for p in input_properties:
            p.search_address_epc()
            p.set_year_built()
            p.get_spatial_data(uprn_filenames)

        # The materials data could be cached or local so we don't need to make
        # repeated requests to the backend for the same data
        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()

        logger.info("Getting components and epc recommendations")
        recommendations = {}
        recommendations_scoring_data = []
        property_scoring_data = {}
        for p in input_properties:
            p.get_components(cleaned)
            # This is temp - this should happen after scoring
            cleaned_property_data = DataProcessor.apply_averages_cleaning(
                data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
                cleaning_data=cleaning_data,
                cols_to_merge_on=list(_ROOM_CLEANING_MERGE_COLUMNS),
            )
            p.set_number_lighting_outlets(cleaned_property_data)

            property_recommendations = Recommendations(property_instance=p, materials=materials).recommend()
            if not property_recommendations:
                continue
            recommendations[p.id] = property_recommendations

            # Prepare the data for predicting each recommendation's impact on SAP
            scoring_datasets = _prepare_property_scoring_data(p, created_at)
            property_scoring_data[p.id] = scoring_datasets
            for recommendations_by_type in property_recommendations:
                for rec in recommendations_by_type:
                    recommendations_scoring_data.append(
                        create_recommendation_scoring_data(
                            property=p,
                            recommendation=rec,
                            starting_epc_data=scoring_datasets["starting_epc_data"],
                            ending_epc_data=scoring_datasets["ending_epc_data"],
                            fixed_data=scoring_datasets["fixed_data"],
                        )
                    )

        if recommendations:
            logger.info("Preparing data for scoring in sap change api")
            all_predictions = _predict_all(
                _clean_scoring_data(recommendations_scoring_data, cleaning_data), body.portfolio_id, created_at
            )

            # Insert the predictions into the recommendations and run the optimiser
            logger.info("Optimising recommendations")
            for property_id in list(recommendations):
                property_instance = properties_by_id[property_id]
                recommendations_with_impact = Recommendations.calculate_recommendation_impact(
                    property_instance=property_instance,
                    all_predictions=all_predictions,
                    recommendations=recommendations,
                )
                recommendations[property_id] = _optimise_property_recommendations(
                    property_instance, recommendations_with_impact, body
                )

            # Temporary step to estimate the impact of the measures on heat demand and carbon
            # TODO: This needs to be cleaned up, if it happens to be kept
            representative_recs = {}
            combined_recommendations_scoring_data = []
            for property_id, property_recommendations in recommendations.items():
                representative_recs[property_id] = _select_representative_recommendations(property_recommendations)
                combined_recommendations_scoring_data.append(
                    _build_combined_scoring_row(
                        properties_by_id[property_id],
                        representative_recs[property_id],
                        property_scoring_data[property_id],
                    )
                )
            all_combined_predictions = _predict_all(
                _clean_scoring_data(combined_recommendations_scoring_data, cleaning_data),
                body.portfolio_id,
                created_at,
            )

            # Update the carbon and heat demand figures in place
            for property_id, property_recommendations in recommendations.items():
                _apply_combined_impact(
                    properties_by_id[property_id],
                    property_recommendations,
                    representative_recs[property_id],
                    all_combined_predictions,
                )
        else:
            logger.info("No recommendations were generated; skipping scoring and optimisation")

        # Upload 1) the property data, 2) the property details (epc), 3) the recommendations
        logger.info("Uploading recommendations to the database")
        property_valuation_increases = []
        # Commit the created properties/targets so a failing batch's rollback does not undo them
        session.commit()
        for batch_start in range(0, len(input_properties), BATCH_SIZE):
            batch_properties = input_properties[batch_start:batch_start + BATCH_SIZE]
            batch_valuation_increases = []
            try:
                for p in batch_properties:
                    property_details_epc = p.get_property_details_epc(
                        portfolio_id=body.portfolio_id, rating_lookup=rating_lookup
                    )
                    create_property_details_epc(session, property_details_epc)
                    update_or_create_property_spatial_details(session, p.uprn, p.spatial)
                    # TODO: TEMP - empty UPRNs are stored as 0
                    if p.data["uprn"] == "":
                        logger.warning(f"Property {p.id} has an empty UPRN; storing 0 instead")
                        p.data["uprn"] = 0
                    update_property_data(
                        session,
                        property_id=p.id,
                        portfolio_id=body.portfolio_id,
                        property_data=p.get_full_property_data(),
                    )

                    recommendations_to_upload = recommendations.get(p.id, [])
                    if not recommendations_to_upload:
                        continue
                    new_plan_id = create_plan(session, {
                        "portfolio_id": body.portfolio_id,
                        "property_id": p.id,
                        "is_default": True,
                    })
                    uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
                    create_plan_recommendations(
                        session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
                    )

                    # Value the property at the EPC band reached by applying all default measures
                    total_sap_points = sum(r["sap_points"] for r in recommendations_to_upload if r["default"])
                    new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
                    valuations = PropertyValuation.estimate(property_instance=p, target_epc=sap_to_epc(new_sap_points))
                    batch_valuation_increases.append(valuations["average_increase"])
                session.commit()
            except Exception as e:
                session.rollback()
                logger.error(f"An error occurred during batch starting at index {batch_start}: {e}", exc_info=True)
            else:
                # Only count valuations from batches that were actually committed
                property_valuation_increases.extend(batch_valuation_increases)

        # The aggregation queries the database for all of the portfolio's recommendations.
        # This is not the most efficient approach, but it can be re-run whenever a recommendation's
        # default flag changes to re-calculate the portfolio-level impact
        logger.info("Creating portfolio aggregations")
        total_valuation_increase = sum(property_valuation_increases)
        # Largest per-property sum of default-measure labour days; 0 when nothing was recommended
        labour_days = round(max(
            (sum(r["labour_days"] for r in rec_group if r["default"]) for rec_group in recommendations.values()),
            default=0,
        ))
        aggregate_portfolio_recommendations(
            session,
            portfolio_id=body.portfolio_id,
            total_valuation_increase=total_valuation_increase,
            labour_days=labour_days,
        )
        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:
        # General exception handling; keep the traceback like the handlers above
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    return Response(status_code=200)