from datetime import datetime
from tqdm import tqdm
import pandas as pd
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
    create_property,
    create_property_details_epc,
    create_property_targets,
    update_property_data,
    update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
    create_plan,
    create_plan_recommendations,
    upload_recommendations
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.ml_models.Valuation import PropertyValuation

logger = setup_logger()

BATCH_SIZE = 5
SCORING_BATCH_SIZE = 400

# If the optimiser selects any of these measure types, mechanical ventilation is also
# marked as a default (best-practice) recommendation.
_WALL_INSULATION_TYPES = frozenset({
    "internal_wall_insulation",
    "external_wall_insulation",
    "cavity_wall_insulation",
})


def _optional_int(value):
    """
    Convert a trigger-file cell to ``int``, treating missing values as ``None``.

    ``None``, NaN and blank/whitespace-only strings are considered missing. Other values
    are parsed via ``float`` first (the same way ``uprn`` is parsed), so "3" and "3.0"
    both become 3.

    :raises ValueError: if a non-blank value cannot be parsed as a number.
    """
    if value is None:
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
    elif isinstance(value, float) and pd.isna(value):
        return None
    return int(float(value))


def patch_epc(config, epc_records):
    """
    Override room counts on the newest EPC with customer-supplied values, if any.

    :param config: one row of the plan trigger file. The optional keys
        ``number-habitable-rooms`` and ``number-heated-rooms`` replace the EPC values
        when present and non-blank.
    :param epc_records: dict holding an ``original_epc`` mapping; it is modified in place.
    :return: the same ``epc_records`` dict.
    """
    number_habitable_rooms = _optional_int(config.get("number-habitable-rooms", None))
    number_heated_rooms = _optional_int(config.get("number-heated-rooms", None))
    if number_habitable_rooms is not None:
        epc_records["original_epc"]["number-habitable-rooms"] = number_habitable_rooms
    if number_heated_rooms is not None:
        epc_records["original_epc"]["number-heated-rooms"] = number_heated_rooms
    return epc_records


def _mark_default_recommendations(recommendations_with_impact, solution):
    """
    Flag the optimiser's chosen recommendations as default and flatten the groups.

    :param recommendations_with_impact: recommendations grouped by type, i.e. a list of
        lists of dicts, each carrying at least ``recommendation_id`` and ``type``.
    :param solution: the optimiser solution; each entry carries ``id`` and ``type``.
    :return: a flat list with a copy of every recommendation plus a boolean ``default`` key.
    """
    selected_ids = {r["id"] for r in solution}
    selected_types = {r["type"] for r in solution}

    # If wall insulation is selected, we also include mechanical ventilation as a best practice measure
    if selected_types & _WALL_INSULATION_TYPES:
        ventilation_rec = next(
            (
                group[0] for group in recommendations_with_impact
                if group and group[0]["type"] == "mechanical_ventilation"
            ),
            None
        )
        if ventilation_rec:
            selected_ids.add(ventilation_rec["recommendation_id"])

    return [
        {**rec, "default": rec["recommendation_id"] in selected_ids}
        for group in recommendations_with_impact
        for rec in group
    ]


router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}}
)


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """
    Build retrofit plans for every new property listed in a plan trigger file.

    Steps:
      1. Read the trigger CSV from S3. For each row, search for the EPC and create the
         property; rows whose property record already exists are skipped.
      2. Generate candidate recommendations per property and score them with the model API.
      3. Run an optimiser per property to choose the default recommendations. This is a
         ``GainOptimiser`` bounded by ``body.budget`` when a budget is given, otherwise a
         ``CostOptimiser`` targeting the goal's SAP band.
      4. Upload property details, plans and recommendations in batches of ``BATCH_SIZE``.
         A failing batch is rolled back and logged, and the remaining batches continue.
      5. Recompute the portfolio-level aggregations from the committed batches.

    :param body: request carrying the trigger file path, portfolio id, goal, budget and exclusions.
    :return: 200 on success; 204 if the file contains no new properties; 400 on ``ValueError``;
        500 on database or unexpected errors.
    """
    # NOTE(review): this handler is ``async`` but only makes blocking calls (S3, DB, models), so it
    # blocks the event loop while it runs; consider a plain ``def`` so FastAPI uses its threadpool.
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()
    # TODO: We should store the trigger file path in the database with the plan so we can track the file that
    #  triggered the plan
    # TODO: Create the ability to configure/switch off certain measures
    try:
        session.begin()
        logger.info("Getting the inputs")
        plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        cleaning_data = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET,
            file_key="sap_change_model/cleaning_dataset.parquet",
        )

        input_properties = []
        for config in tqdm(plan_input):
            # TODO: validate each record in the file and handle invalid records accordingly
            uprn = config.get("uprn", None)
            if uprn:
                uprn = int(float(uprn))

            epc_searcher = SearchEpc(
                address1=config["address"],
                postcode=config["postcode"],
                uprn=uprn,
                auth_token=get_settings().EPC_AUTH_TOKEN,
                os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY
            )
            epc_searcher.find_property()

            # Create a record in db
            property_id, is_new = create_property(
                session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean,
                epc_searcher.uprn
            )
            # If a new record was not created, we don't produce recommendations
            if not is_new:
                continue

            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None
            )

            epc_records = {
                'original_epc': epc_searcher.newest_epc.copy(),
                'full_sap_epc': epc_searcher.full_sap_epc.copy(),
                'old_data': epc_searcher.older_epcs.copy(),
            }
            epc_records = patch_epc(config, epc_records)
            prepared_epc = EPCRecord(
                epc_records=epc_records,
                run_mode="newdata",
                cleaning_data=cleaning_data
            )
            input_properties.append(
                Property(
                    id=property_id,
                    address=epc_searcher.address_clean,
                    postcode=epc_searcher.postcode_clean,
                    epc_record=prepared_epc,
                )
            )

        if not input_properties:
            return Response(status_code=204)

        # The materials data could be cached or held locally so we don't need to make repeated
        # requests to the backend for the same data
        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)

        logger.info("Getting spatial data")
        for p in input_properties:
            p.get_spatial_data(uprn_filenames)

        logger.info("Getting components and epc recommendations")
        recommendations = {}
        recommendations_scoring_data = []
        representative_recommendations = {}
        for p in tqdm(input_properties):
            # Property recommendations
            p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
            recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions)
            property_recommendations, property_representative_recommendations = recommender.recommend()
            if not property_recommendations:
                continue
            recommendations[p.id] = property_recommendations
            representative_recommendations[p.id] = property_representative_recommendations
            p.create_base_difference_epc_record(cleaned_lookup=cleaned)
            p.adjust_difference_record_with_recommendations(
                property_recommendations, property_representative_recommendations
            )
            recommendations_scoring_data.extend(p.recommendations_scoring_data)

        # TODO: Make sure that number_habitable_rooms has been dropped
        logger.info("Preparing data for scoring in sap change api")
        recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
        # With no recommendations at all the frame has no columns, and dropping them would raise KeyError
        if not recommendations_scoring_data.empty:
            recommendations_scoring_data = recommendations_scoring_data.drop(
                columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
                         "carbon_ending"]
            )

        model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
        all_predictions = {
            "sap_change_predictions": pd.DataFrame(),
            "heat_demand_predictions": pd.DataFrame(),
            "carbon_change_predictions": pd.DataFrame()
        }
        to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
        for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
            predictions_dict = model_api.predict_all(
                df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
                bucket=get_settings().DATA_BUCKET,
                prediction_buckets={
                    "sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
                    "heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
                    "carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
                }
            )
            # Append the predictions to the predictions dictionary
            for key, scored in predictions_dict.items():
                all_predictions[key] = pd.concat([all_predictions[key], scored])

        # Insert the predictions into the recommendations and run the optimiser
        # TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
        #  possibility with heating system
        # TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
        #  cylinder jacket), we should add these to the recommendations as default
        logger.info("Optimising recommendations")
        # First match wins, mirroring a linear search over input_properties, but in O(1) per lookup
        properties_by_id = {}
        for p in input_properties:
            properties_by_id.setdefault(p.id, p)

        for property_id in recommendations.keys():
            property_instance = properties_by_id[property_id]
            recommendations_with_impact, current_adjusted_energy, expected_adjusted_energy = (
                Recommendations.calculate_recommendation_impact(
                    property_instance=property_instance,
                    all_predictions=all_predictions,
                    recommendations=recommendations
                )
            )
            # Store the resulting adjusted energy in the property instance
            property_instance.set_adjusted_energy(
                current_adjusted_energy=current_adjusted_energy,
                expected_adjusted_energy=expected_adjusted_energy
            )

            input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
            current_sap_points = int(property_instance.data["current-energy-efficiency"])
            target_sap_points = epc_to_sap_lower_bound(body.goal_value)
            sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
            if body.budget:
                optimiser = GainOptimiser(
                    input_measures,
                    max_cost=body.budget,
                    max_gain=sap_gain if sap_gain > 0 else 0
                )
            else:
                # The minimum gain is the minimum number of SAP points required to get to the target SAP band
                # If the gain is negative, the optimiser will return an empty solution
                optimiser = CostOptimiser(
                    input_measures,
                    min_gain=sap_gain
                )
            optimiser.setup()
            optimiser.solve()

            # Replace the grouped recommendations with a flat list flagged with "default"
            recommendations[property_id] = _mark_default_recommendations(
                recommendations_with_impact, optimiser.solution
            )

        # Upload, per property:
        # 1) the property data
        # 2) the property details (epc)
        # 3) the recommendations
        logger.info("Uploading recommendations to the database")
        property_valuation_increases = []
        committed_property_ids = set()
        session.commit()
        for i in range(0, len(input_properties), BATCH_SIZE):
            # Take a slice of the input_properties list to make a batch
            batch_properties = input_properties[i:i + BATCH_SIZE]
            # Results are only kept once the batch commits, so rolled-back work is not aggregated
            batch_valuation_increases = []
            try:
                for p in batch_properties:
                    recommendations_to_upload = recommendations.get(p.id, [])
                    default_recommendations = [r for r in recommendations_to_upload if r["default"]]
                    total_sap_points = sum(r["sap_points"] for r in default_recommendations)
                    new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
                    new_epc = sap_to_epc(new_sap_points)
                    valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)

                    property_details_epc = p.get_property_details_epc(
                        portfolio_id=body.portfolio_id,
                        rating_lookup=rating_lookup,
                    )
                    create_property_details_epc(session, property_details_epc)
                    update_or_create_property_spatial_details(session, p.uprn, p.spatial)

                    property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
                    update_property_data(
                        session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
                    )

                    if not recommendations_to_upload:
                        continue

                    new_plan_id = create_plan(session, {
                        "portfolio_id": body.portfolio_id,
                        "property_id": p.id,
                        "is_default": True,
                        "valuation_increase_lower_bound": (
                            valuations["lower_bound_increased_value"] - valuations["current_value"]
                        ),
                        "valuation_increase_upper_bound": (
                            valuations["upper_bound_increased_value"] - valuations["current_value"]
                        ),
                        "valuation_increase_average": (
                            valuations["average_increased_value"] - valuations["current_value"]
                        ),
                    })

                    uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)

                    create_plan_recommendations(
                        session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
                    )
                    batch_valuation_increases.append(
                        valuations["average_increased_value"] - valuations["current_value"]
                    )

                # Commit the session after each batch
                session.commit()
            except Exception as e:
                # Rollback the session if an error occurs and carry on with the next batch
                session.rollback()
                logger.error(f"An error occurred during batch starting at index {i}: {e}", exc_info=True)
                continue

            property_valuation_increases.extend(batch_valuation_increases)
            committed_property_ids.update(p.id for p in batch_properties)

        logger.info("Creating portfolio aggregations")
        # We implement this in the simplest way possible which will be just to query the database for all
        # recommendations associated to the portfolio and then aggregate them. This is not the most efficient
        # way to do this, but it's the simplest and will be a process that we can re-use since when we change a
        # recommendation from being default to not default, we'll need to re-run this process to re-calculate
        # the portfolio level impact
        total_valuation_increase = sum(property_valuation_increases)
        # default=0: there may be no committed property with recommendations
        labour_days = round(max(
            (
                sum(r["labour_days"] for r in rec_group if r["default"])
                for p_id, rec_group in recommendations.items()
                if p_id in committed_property_ids
            ),
            default=0
        ))
        aggregate_portfolio_recommendations(
            session,
            portfolio_id=body.portfolio_id,
            total_valuation_increase=total_valuation_increase,
            labour_days=labour_days
        )

        # Commit final changes
        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:
        # General exception handling
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    return Response(status_code=200)