"""FastAPI routes for generating retrofit plans (mounted under ``/plan``)."""
from collections import defaultdict

from fastapi import APIRouter, Depends

from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.utils import read_csv_from_s3
from backend.app.config import get_settings
from backend.Property import Property
from epc_api.client import EpcClient
from utils.logger import setup_logger
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from utils.uvalue_estimates import classify_decile_newvalues
from backend.app.db.utils import row2dict
from starlette.responses import Response
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError

# database interaction functions
from backend.app.db.functions.property_functions import (
    create_property, create_property_targets, update_property_data, create_property_details_epc
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
    create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations,
    upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound
from model_data.optimiser.optimiser_functions import prepare_input_measures

# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
from backend.app.plan.uvalue_estimates_floors import uvalue_estimates_floors
from backend.app.plan.temp_cleaned_data import cleaned

logger = setup_logger()

# Every route on this router is prefixed with /plan and runs the validate_token dependency first.
router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}}
)

# TODO: Load this data from db
# Placeholder UPRN -> coordinates table. The trigger endpoint looks up each property's record here by
# matching 'UPRN' against int(property.data['uprn']) and passes the whole dict to set_coordinates().
# NOTE(review): X_COORDINATE / Y_COORDINATE look like British National Grid easting/northing - confirm.
# Note the two 10008299676/7 entries share identical coordinates.
open_uprn_data = [
    {'UPRN': 6032920, 'X_COORDINATE': 535110.0, 'Y_COORDINATE': 181819.0, 'LATITUDE': 51.5191407,
     'LONGITUDE': -0.0540506},
    {'UPRN': 6038625, 'X_COORDINATE': 535374.0, 'Y_COORDINATE': 182784.0, 'LATITUDE': 51.5277492,
     'LONGITUDE': -0.0498772},
    {'UPRN': 34153991, 'X_COORDINATE': 523238.74, 'Y_COORDINATE': 178003.02, 'LATITUDE': 51.4875579,
     'LONGITUDE': -0.226392},
    {'UPRN': 10008299676, 'X_COORDINATE': 533285.0, 'Y_COORDINATE': 184711.0, 'LATITUDE': 51.5455629,
     'LONGITUDE': -0.0792445},
    {'UPRN': 10008299677, 'X_COORDINATE': 533285.0, 'Y_COORDINATE': 184711.0, 'LATITUDE': 51.5455629,
     'LONGITUDE': -0.0792445},
    {'UPRN': 100021039066, 'X_COORDINATE': 535506.0, 'Y_COORDINATE': 185624.0, 'LATITUDE': 51.5532385,
     'LONGITUDE': -0.0468833},
    {'UPRN': 100021226060, 'X_COORDINATE': 529247.0, 'Y_COORDINATE': 187959.0, 'LATITUDE': 51.5756908,
     'LONGITUDE': -0.1362513},
    {'UPRN': 200003489276, 'X_COORDINATE': 533210.0, 'Y_COORDINATE': 179442.0, 'LATITUDE': 51.4982309,
     'LONGITUDE': -0.0823165}
]

# Placeholder UPRN -> conservation-area status table. Unlike open_uprn_data the key here is lowercase 'uprn'.
# Values seen: 'in_conservation_area', 'not_in_conservation_area', 'unknown'.
in_conservation_area_data = [
    {'uprn': 6032920, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 6038625, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 34153991, 'is_in_conservation_area': 'unknown'},
    {'uprn': 10008299676, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 10008299677, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 100021039066, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 100021226060, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 200003489276, 'is_in_conservation_area': 'in_conservation_area'}
]

# TODO: db
# Decile table used to bucket a property's total floor area for floor recommendations: the 11 boundaries
# delimit the 10 labels. NOTE(review): units presumably m^2 (EPC 'total-floor-area') - confirm.
floors_decile_data = {
    'decile_labels': ['Decile 1', 'Decile 2', 'Decile 3', 'Decile 4', 'Decile 5', 'Decile 6', 'Decile 7',
                      'Decile 8', 'Decile 9', 'Decile 10'],
    'decile_boundaries': [6., 50., 56., 69., 77.6, 87., 98., 112., 127., 150., 2279.]}
# Decile table used to bucket a property's total floor area for wall recommendations: the 11 boundaries
# delimit the 10 labels.
walls_decile_data = {
    'decile_labels': ['Decile 1', 'Decile 2', 'Decile 3', 'Decile 4', 'Decile 5', 'Decile 6', 'Decile 7',
                      'Decile 8', 'Decile 9', 'Decile 10'],
    'decile_boundaries': [6., 49., 51., 55., 64., 71., 76., 83., 96., 120., 2279.]}

# NOTE(review): not used by the code visible in this module.
lighting_averages = [
    {'lighting-description': 'good lighting efficiency', 'low-energy-lighting': 99.26666666666667},
    {'lighting-description': 'excellent lighting efficiency', 'low-energy-lighting': 100.0},
    {'lighting-description': 'below average lighting efficiency', 'low-energy-lighting': 0.0}
]


def filter_materials(materials):
    """
    Group material rows by their "type" field.

    :param materials: iterable of material DB rows (as returned by get_materials)
    :return: plain dict mapping material type -> list of material dicts (converted with row2dict)
    """
    materials_by_type = defaultdict(list)
    for material in materials:
        material = row2dict(material)
        materials_by_type[material["type"]].append(material)
    # A plain dict makes a lookup of an unknown type raise KeyError instead of silently inserting []
    return dict(materials_by_type)


def insert_temp_recommendation_id(property_recommendations):
    """
    Give every recommendation a temporary integer id, unique within the property.

    The id is needed to tell default and non-default recommendations apart after the optimiser has run.

    :param property_recommendations: nested list of recommendations, grouped by recommendation type
    :return: the same nested list, mutated in place so every recommendation has a "recommendation_id"
        assigned sequentially from 0 in iteration order
    """
    idx = 0
    for recs in property_recommendations:
        for rec in recs:
            rec["recommendation_id"] = idx
            idx += 1
    return property_recommendations


def _find_by_uprn(records, uprn_key, uprn):
    """
    Return the first record in ``records`` whose ``uprn_key`` field equals ``uprn``.

    Placeholder lookup over the in-memory tables until the data is loaded into the database.

    :raises LookupError: if no record matches, naming the missing UPRN so the failure is diagnosable
    """
    match = next((record for record in records if record[uprn_key] == uprn), None)
    if match is None:
        raise LookupError(f"No record with {uprn_key}={uprn} found")
    return match


def _classify_floor_area_decile(decile_data, total_floor_area):
    """Return the decile label for ``total_floor_area`` using a decile table such as ``walls_decile_data``."""
    return classify_decile_newvalues(
        decile_boundaries=decile_data["decile_boundaries"],
        decile_labels=decile_data["decile_labels"],
        new_values=[float(total_floor_area)],
    )[0]


def _filter_uvalue_estimates(uvalue_estimates, property_data, energy_eff_key, env_eff_key):
    """
    Select the u-value estimate rows that match a property.

    A row matches when its local authority, property type and built form equal the property's, and its
    ``energy_eff_key`` / ``env_eff_key`` ratings equal the property's. A property rating of 'N/A' acts as a
    wildcard: that field is not compared at all.
    """
    criteria = {key: property_data[key] for key in ("local-authority", "property-type", "built-form")}
    for key in (energy_eff_key, env_eff_key):
        if property_data[key] != 'N/A':
            criteria[key] = property_data[key]
    return [
        estimate for estimate in uvalue_estimates
        if all(estimate[key] == value for key, value in criteria.items())
    ]


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """
    Generate retrofit plans for every new property listed in a trigger file.

    Reads the trigger CSV from S3 and creates property and target records. Each new property is enriched
    with EPC, coordinate and conservation-area data. Floor and wall recommendations are then generated, and
    an optimiser marks the default ones. Finally the property details, recommendations and plans are
    persisted and the portfolio aggregates are refreshed. All writes share one session and are committed
    together at the end.

    :param body: request carrying the trigger file path, portfolio id, goal and optional budget
    :return: 200 on success; 204 if the file contained no new properties; 400 on ValueError;
        500 on database or unexpected errors (the session is rolled back in every error case)
    """
    # NOTE(review): this is an ``async def`` but S3, EPC API, DB and optimiser calls below are synchronous,
    # so the event loop is blocked for the whole request. Consider a plain ``def`` endpoint (FastAPI runs
    # those in a threadpool) or offloading the work.
    logger.info("Connecting to db")
    Session = sessionmaker(bind=db_engine)
    session = Session()
    try:
        session.begin()

        logger.info("Getting the inputs")
        # Read in the trigger file from s3
        bucket_name = get_settings().PLAN_TRIGGER_BUCKET
        epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
        plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)

        input_properties = []
        for config in plan_input:
            # We should validate each record in the file and handle invalid records accordingly.
            # TODO: implement validation

            # Create a record in db
            property_id, is_new = create_property(
                session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
            )

            # If a new record was not created, we don't produce recommendations
            if not is_new:
                continue

            # TODO: Need to add heat demand target
            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None
            )

            input_properties.append(
                Property(
                    postcode=config['postcode'], address1=config['address'], epc_client=epc_client, id=property_id
                )
            )

        if not input_properties:
            return Response(status_code=204)

        logger.info("Getting EPC data")
        for p in input_properties:
            p.search_address_epc()
            p.set_year_built()

        logger.info("Getting coordinates")
        # This is placeholder, until the full dataset is loaded into the database
        for p in input_properties:
            p.set_coordinates(_find_by_uprn(open_uprn_data, 'UPRN', int(p.data['uprn'])))

        logger.info("Check if property is in conservation area")
        for p in input_properties:
            conservation_record = _find_by_uprn(in_conservation_area_data, 'uprn', int(p.data['uprn']))
            p.set_is_in_conservation_area(conservation_record.get("is_in_conservation_area"))

        # The materials data could be cached or kept locally so we don't need to make repeated requests to
        # the backend for the same data.
        # TODO: It might not be the best choice to store the materials data in a database table since this
        #  table probably won't be very large and won't be updated that often. It might be better to store
        #  this data in s3 and load it into memory when the app starts up. We will test this
        materials = get_materials(session)
        materials_by_type = filter_materials(materials)

        logger.info("Getting components and properties recommendations")
        # TODO: Move this to a class. We probably want a Recommender class which takes the optimisers in as an
        #  injected dependency; the optimisers can then take the input measures in as part of setup()
        recommendations = {}
        for p in input_properties:
            property_recommendations = []

            # For each property, classify its total floor area against the floor decile table
            floor_area_decile = _classify_floor_area_decile(floors_decile_data, p.data["total-floor-area"])

            # Property recommendations
            p.get_components(cleaned)

            # This is placeholder, until the full dataset is loaded into the database and we just make a read
            # to the database
            floors_u_value_estimate = _filter_uvalue_estimates(
                uvalue_estimates_floors, p.data, "floor-energy-eff", "floor-env-eff"
            )

            # Floor recommendations
            floor_recommender = FloorRecommendations(
                property_instance=p,
                uvalue_estimates=floors_u_value_estimate,
                total_floor_area_group_decile=floor_area_decile,
                materials=materials_by_type["suspended_floor_insulation"]
                + materials_by_type["solid_floor_insulation"],
            )
            floor_recommender.recommend()
            if floor_recommender.recommendations:
                property_recommendations.append(floor_recommender.recommendations)

            # Wall recommendations use their own decile table for the same floor area
            wall_area_decile = _classify_floor_area_decile(walls_decile_data, p.data["total-floor-area"])

            # This is placeholder, until the full dataset is loaded into the database and we just make a read
            # to the database
            walls_u_value_estimate = _filter_uvalue_estimates(
                uvalue_estimates_walls, p.data, "walls-energy-eff", "walls-env-eff"
            )

            wall_recommender = WallRecommendations(
                property_instance=p,
                uvalue_estimates=walls_u_value_estimate,
                total_floor_area_group_decile=wall_area_decile,
                materials=materials_by_type["external_wall_insulation"]
                + materials_by_type["internal_wall_insulation"]
            )
            wall_recommender.recommend()
            if wall_recommender.recommendations:
                property_recommendations.append(wall_recommender.recommendations)

            # Use the optimiser to pick the default recommendations and decide if we need certain
            # recommendations to get to the goal
            property_recommendations = insert_temp_recommendation_id(property_recommendations)
            if not property_recommendations:
                continue

            input_measures = prepare_input_measures(property_recommendations, body.goal)
            if body.budget:
                optimiser = GainOptimiser(input_measures, max_cost=body.budget)
            else:
                # The minimum gain is the number of SAP points required to reach the target SAP band
                current_sap_points = int(p.data["current-energy-efficiency"])
                target_sap_points = epc_to_sap_lower_bound(body.goal_value)
                # If the gain is negative, the optimiser will return an empty solution
                optimiser = CostOptimiser(input_measures, min_gain=target_sap_points - current_sap_points)
            optimiser.setup()
            optimiser.solve()

            selected_recommendations = {r["id"] for r in optimiser.solution}

            # Flatten the per-type groups and flag the optimiser's picks as the default recommendations
            recommendations[p.id] = [
                {**rec, "default": rec["recommendation_id"] in selected_recommendations}
                for recommendations_by_type in property_recommendations
                for rec in recommendations_by_type
            ]

        # Once we're done, we'll store:
        # 1) the property data
        # 2) the property details (epc)
        # 3) the recommendations
        logger.info("Uploading recommendations to the database")
        for p in input_properties:
            # Upload property data
            property_details_epc = p.get_property_details_epc(
                portfolio_id=body.portfolio_id, rating_lookup=rating_lookup
            )
            create_property_details_epc(session, property_details_epc)

            property_data = p.get_full_property_data()
            update_property_data(
                session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
            )

            # Upload recommendations
            recommendations_to_upload = recommendations.get(p.id, [])
            if not recommendations_to_upload:
                continue

            # Create a plan
            new_plan_id = create_plan(
                session,
                {
                    "portfolio_id": body.portfolio_id,
                    "property_id": p.id,
                    "is_default": True
                }
            )

            uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)

            # Finally, match the recommendations to the plan
            create_plan_recommendations(
                session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
            )

        logger.info("Creating portfolio aggregations")
        # We implement this in the simplest way possible: query the database for all recommendations associated
        # to the portfolio and aggregate them. This is not the most efficient approach, but it is reusable: when a
        # recommendation changes between default and non-default, this process must be re-run to re-calculate
        # the portfolio-level impact.
        aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)

        # Commit all changes at once
        session.commit()

    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:
        # General exception handling; exc_info keeps the traceback, as in the handlers above
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    return Response(status_code=200)