"""Plan generation endpoint: builds retrofit recommendations for a portfolio from a trigger file."""
from datetime import datetime

import pandas as pd
from epc_api.client import EpcClient
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response

from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
    create_property, create_property_details_epc, create_property_targets, update_property_data
)
from backend.app.db.functions.recommendations_functions import (
    create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import (
    create_recommendation_scoring_data, filter_materials, get_cleaned, insert_temp_recommendation_id
)
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3
from backend.ml_models.sap_change_model.api import SAPChangeModelAPI
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.WallRecommendations import WallRecommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet

logger = setup_logger()

# Number of properties whose details/plans are written and committed together in the upload phase.
BATCH_SIZE = 5

router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}}
)

# Substrings identifying scoring columns that must NOT be touched by the missing-value cleaning step.
_MISSING_CLEAN_IGNORE_TOKENS = ("thermal_transmittance", "insulation_thickness", "ENERGY_EFF")


def _create_input_properties(session, body: PlanTriggerRequest, plan_input, epc_client) -> list:
    """Create DB records for each trigger-file row and return `Property` objects for the NEW ones only.

    Rows whose address/postcode already exist in the portfolio (``is_new`` is False) are skipped,
    so no recommendations are produced for them.
    """
    input_properties = []
    for config in plan_input:
        # TODO: implement validation of each record, and handle invalid records accordingly.
        # We should also standardise postcode and address in some fashion, as a postcode of
        # "abcdef" would currently be considered different to "ABCDEF".
        property_id, is_new = create_property(
            session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
        )
        # If a new record was not created, we don't produce recommendations.
        if not is_new:
            continue

        # TODO: Need to add heat demand target
        create_property_targets(
            session,
            property_id=property_id,
            portfolio_id=body.portfolio_id,
            epc_target=body.goal_value,
            heat_demand_target=None
        )
        input_properties.append(
            Property(postcode=config['postcode'], address1=config['address'], epc_client=epc_client, id=property_id)
        )
    return input_properties


def _recommend_measures(p, materials_by_type) -> list:
    """Run every measure recommender for one property; return its grouped recommendations with temp ids.

    The result is a list of per-measure-type recommendation lists (empty groups are omitted).
    """
    # TODO: Move this to a class. We probably want a Recommender class which injects the optimisers
    # as a dependency, and the optimisers can then take the input measures in their setup() method.
    property_recommendations = []

    # Floor, wall and roof recommenders expose their output as `.recommendations`.
    for recommender_cls, material_key in (
        (FloorRecommendations, "floor"),
        (WallRecommendations, "walls"),
        (RoofRecommendations, "roof"),
    ):
        recommender = recommender_cls(property_instance=p, materials=materials_by_type[material_key])
        recommender.recommend()
        if recommender.recommendations:
            property_recommendations.append(recommender.recommendations)

    # Ventilation and fireplace recommenders expose their output as `.recommendation` (singular).
    ventilation_recommender = VentilationRecommendations(
        property_instance=p, materials=materials_by_type["ventilation"]
    )
    ventilation_recommender.recommend()
    if ventilation_recommender.recommendation:
        property_recommendations.append(ventilation_recommender.recommendation)

    fireplace_recommender = FireplaceRecommendations(property_instance=p)
    fireplace_recommender.recommend()
    if fireplace_recommender.recommendation:
        property_recommendations.append(fireplace_recommender.recommendation)

    # Temporary ids are what the SAP predictions and the optimiser are later matched on.
    return insert_temp_recommendation_id(property_recommendations)


def _build_scoring_rows(p, property_recommendations: list, created_at: str) -> list:
    """Build one SAP-change-model scoring record per recommendation of property ``p``."""
    data_processor = DataProcessor(None, newdata=True)
    data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
    # TODO: Temp - blank UPRNs are replaced with 0 before pre-processing.
    if data_processor.data["UPRN"].values[0] == "":
        data_processor.data["UPRN"] = 0
    data_processor.pre_process()

    starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
    ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
    fixed_data = data_processor.get_fixed_features()
    # The "ending" record represents the property after the measure, lodged today (created_at).
    ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)

    return [
        create_recommendation_scoring_data(
            property=p,
            recommendation=rec,
            starting_epc_data=starting_epc_data,
            ending_epc_data=ending_epc_data,
            fixed_data=fixed_data,
        )
        for recommendations_by_type in property_recommendations
        for rec in recommendations_by_type
    ]


def _collect_recommendations(input_properties: list, materials_by_type, cleaned, created_at: str):
    """Generate recommendations for every property.

    Returns ``(recommendations, scoring_rows)``. ``recommendations`` maps property id to that
    property's grouped recommendations; only properties with at least one recommendation are
    included. ``scoring_rows`` is the flat list of SAP scoring records for all of them.
    """
    recommendations = {}
    scoring_rows = []
    for p in input_properties:
        p.get_components(cleaned)
        property_recommendations = _recommend_measures(p, materials_by_type)
        if not property_recommendations:
            continue
        recommendations[p.id] = property_recommendations
        scoring_rows.extend(_build_scoring_rows(p, property_recommendations, created_at))
    return recommendations, scoring_rows


def _clean_scoring_data(scoring_data: pd.DataFrame, cleaning_data) -> pd.DataFrame:
    """Apply the same cleaning the SAP change model was trained with to the scoring data."""
    # Room-count variables are cleaned first, using the narrower grouping.
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
        colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
    )
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
    ).drop(columns=["LOCAL_AUTHORITY"])
    ignore_cols = [
        c for c in scoring_data.columns
        if any(token in c for token in _MISSING_CLEAN_IGNORE_TOKENS)
    ]
    scoring_data = DataProcessor.clean_missings_after_description_process(scoring_data, ignore_cols=ignore_cols)
    return DataProcessor.clean_efficiency_variables(scoring_data)


def _predict_sap(portfolio_id, created_at: str, scoring_data: pd.DataFrame) -> pd.DataFrame:
    """Score ``scoring_data`` with the SAP change model and return its predictions.

    The returned frame has a float ``predictions`` column (rounded to 1 dp), plus
    ``property_id`` / ``recommendation_id`` string columns parsed from the ``"<property>+<rec>"`` id.
    """
    settings = get_settings()
    sap_change_model_api = SAPChangeModelAPI(portfolio_id=portfolio_id, timestamp=created_at)
    file_location = sap_change_model_api.upload_scoring_data(df=scoring_data, bucket=settings.DATA_BUCKET)
    response = sap_change_model_api.predict(
        file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=settings.DATA_BUCKET) + file_location,
    )

    predictions = pd.DataFrame(
        read_parquet_from_s3(
            bucket_name=settings.PREDICTIONS_BUCKET,
            file_key=response["storage_filepath"].split(settings.PREDICTIONS_BUCKET + "/")[1]
        )
    )
    predictions["predictions"] = predictions["predictions"].astype(float).round(1)
    # n=1 guarantees exactly two columns even if the recommendation part contains a '+'.
    predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', n=1, expand=True)
    return predictions


def _apply_predictions_and_optimise(body: PlanTriggerRequest, input_properties: list, recommendations: dict,
                                    predictions: pd.DataFrame) -> None:
    """Attach SAP gains to each recommendation, run the optimiser, and flatten the results in place.

    After this call, ``recommendations[property_id]`` is a flat list of recommendation dicts. Each
    dict carries ``sap_points`` and a boolean ``default`` flag that marks the optimiser's selection.

    Raises:
        ValueError: if a recommendation has no (or a NaN) SAP prediction.
    """
    # Keep the first Property per id, matching the original "first match" lookup.
    properties_by_id = {}
    for p in input_properties:
        properties_by_id.setdefault(p.id, p)

    for property_id, property_recommendations in recommendations.items():
        prop = properties_by_id[property_id]
        current_sap = float(prop.data["current-energy-efficiency"])
        property_predictions = predictions[predictions["property_id"] == str(property_id)]

        for recommendations_by_type in property_recommendations:
            for rec in recommendations_by_type:
                matches = property_predictions.loc[
                    property_predictions["recommendation_id"] == str(rec["recommendation_id"]), "predictions"
                ]
                if matches.empty or pd.isna(matches.iloc[0]):
                    raise ValueError(
                        f"Sap points missing for property {property_id}, "
                        f"recommendation {rec['recommendation_id']}"
                    )
                rec["sap_points"] = matches.iloc[0] - current_sap

        input_measures = prepare_input_measures(property_recommendations, body.goal)
        if body.budget:
            optimiser = GainOptimiser(input_measures, max_cost=body.budget)
        else:
            # The minimum gain is the number of SAP points needed to reach the target band's lower bound.
            current_sap_points = int(prop.data["current-energy-efficiency"])
            target_sap_points = epc_to_sap_lower_bound(body.goal_value)
            # If the gain is negative, the optimiser will return an empty solution.
            optimiser = CostOptimiser(input_measures, min_gain=target_sap_points - current_sap_points)
        optimiser.setup()
        optimiser.solve()
        selected_recommendations = {r["id"] for r in optimiser.solution}

        # Flag the selected measures as default and flatten the per-type groups into one list.
        # Assigning an existing key during iteration is safe (the dict does not change size).
        recommendations[property_id] = [
            {**rec, "default": rec["recommendation_id"] in selected_recommendations}
            for recommendations_by_type in property_recommendations
            for rec in recommendations_by_type
        ]


def _upload_in_batches(session, portfolio_id, input_properties: list, recommendations: dict) -> None:
    """Write property details, property data, plans and recommendations, committing per batch.

    This is deliberately best-effort. A failing batch is rolled back and logged, and the
    remaining batches still run.
    """
    for start in range(0, len(input_properties), BATCH_SIZE):
        batch_properties = input_properties[start:start + BATCH_SIZE]
        try:
            for p in batch_properties:
                property_details_epc = p.get_property_details_epc(
                    portfolio_id=portfolio_id, rating_lookup=rating_lookup
                )
                create_property_details_epc(session, property_details_epc)

                # TODO: TEMP - blank UPRNs are stored as 0.
                if p.data["uprn"] == "":
                    p.data["uprn"] = 0

                update_property_data(
                    session,
                    property_id=p.id,
                    portfolio_id=portfolio_id,
                    property_data=p.get_full_property_data()
                )

                recommendations_to_upload = recommendations.get(p.id, [])
                if not recommendations_to_upload:
                    continue

                new_plan_id = create_plan(session, {
                    "portfolio_id": portfolio_id,
                    "property_id": p.id,
                    "is_default": True
                })
                uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
                create_plan_recommendations(
                    session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
                )
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"An error occurred during batch starting at index {start}: {e}", exc_info=True)


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """Generate retrofit plans for the new properties listed in ``body.trigger_file_path``.

    Pipeline steps:
    1. Create the property records and targets.
    2. Fetch EPC and spatial data.
    3. Generate measure recommendations.
    4. Score them with the SAP change model.
    5. Optimise towards ``body.budget`` or ``body.goal_value``.
    6. Upload everything in batches.
    7. Aggregate at portfolio level.

    Returns:
        204 if the trigger file contains no new properties.
        200 on success; individual upload batches may still have failed and been logged.
        400 on ``ValueError`` (malformed data or missing SAP predictions).
        500 on database or other unexpected errors.
    """
    # NOTE(review): every call below is blocking (S3, DB, HTTP). Inside an `async def` this blocks the
    # event loop for the whole run; consider a plain `def` endpoint so FastAPI runs it in a threadpool.
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()
    try:
        session.begin()
        settings = get_settings()

        logger.info("Getting the inputs")
        epc_client = EpcClient(auth_token=settings.EPC_AUTH_TOKEN)
        plan_input = read_csv_from_s3(bucket_name=settings.PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=settings.DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        cleaning_data = read_parquet_from_s3(
            bucket_name=settings.DATA_BUCKET,
            file_key="sap_change_model/cleaning_dataset.parquet",
        )

        input_properties = _create_input_properties(session, body, plan_input, epc_client)
        if not input_properties:
            return Response(status_code=204)

        logger.info("Getting EPC, and spatial data")
        for p in input_properties:
            p.search_address_epc()
            p.set_year_built()
            p.get_spatial_data(uprn_filenames)

        # The materials data could be cached or held locally, so that we don't make repeated
        # requests to the backend for the same data.
        logger.info("Reading in materials and cleaned datasets")
        materials_by_type = filter_materials(get_materials(session))
        cleaned = get_cleaned()

        logger.info("Getting components and epc recommendations")
        recommendations, scoring_rows = _collect_recommendations(
            input_properties, materials_by_type, cleaned, created_at
        )

        if recommendations:
            logger.info("Preparing data for scoring in sap change api")
            scoring_data = _clean_scoring_data(pd.DataFrame(scoring_rows), cleaning_data)
            predictions = _predict_sap(body.portfolio_id, created_at, scoring_data)

            logger.info("Optimising recommendations")
            _apply_predictions_and_optimise(body, input_properties, recommendations, predictions)
        else:
            # Nothing to score: still upload the property details below.
            logger.info("No recommendations generated; skipping SAP scoring and optimisation")

        # Upload 1) the property data, 2) the property details (epc), 3) the recommendations.
        logger.info("Uploading recommendations to the database")
        session.commit()
        _upload_in_batches(session, body.portfolio_id, input_properties, recommendations)

        # The simplest approach: query every recommendation of the portfolio and aggregate it. It is
        # not the most efficient, but it is reusable: when a recommendation's default flag changes we
        # must re-run it to recalculate the portfolio-level impact.
        logger.info("Creating portfolio aggregations")
        aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)

        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:
        # General exception handling
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    return Response(status_code=200)