From fafbf4a52f79874f2890a0648afa63595b8acdbd Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Wed, 16 Apr 2025 16:42:12 +0100 Subject: [PATCH] refactoring serverless script with sqs queue --- backend/app/plan/router.py | 1054 +----------------------------------- backend/lambda/README.md | 8 + backend/lambda/handler.py | 13 + serverless.yml | 130 +++-- 4 files changed, 114 insertions(+), 1091 deletions(-) create mode 100644 backend/lambda/README.md create mode 100644 backend/lambda/handler.py diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index bc482263..a7351534 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -1,418 +1,11 @@ -import ast import asyncio -import json -from datetime import datetime - -from tqdm import tqdm -import pandas as pd -from etl.epc.Record import EPCRecord -from backend.SearchEpc import SearchEpc -from fastapi import APIRouter, Depends -from sqlalchemy.exc import IntegrityError, OperationalError -from sqlalchemy.orm import sessionmaker -from starlette.responses import Response - -from backend.app.config import get_settings, get_prediction_buckets -from backend.app.db.connection import db_engine -from backend.app.db.functions.materials_functions import get_materials -from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations -from backend.app.db.functions.property_functions import ( - create_property, create_property_details_epc, create_property_targets, update_property_data, - update_or_create_property_spatial_details -) -from backend.app.db.functions.recommendations_functions import ( - create_plan, upload_recommendations, create_scenario -) -from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn -from backend.app.db.models.portfolio import rating_lookup +from fastapi import APIRouter, Depends, BackgroundTasks from backend.app.dependencies import validate_token from backend.app.plan.schemas import PlanTriggerRequest -from backend.app.plan.utils import get_cleaned -from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc -import backend.app.assumptions as assumptions - -from backend.ml_models.api import ModelApi -from backend.Property import Property -from backend.Funding import Funding -from backend.apis.GoogleSolarApi import GoogleSolarApi - -from recommendations.optimiser.CostOptimiser import CostOptimiser -from recommendations.optimiser.GainOptimiser import GainOptimiser -from recommendations.optimiser.optimiser_functions import prepare_input_measures -from recommendations.Recommendations import Recommendations from utils.logger import setup_logger -from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 -from backend.ml_models.Valuation import PropertyValuation - -from etl.bill_savings.KwhData import KwhData -from etl.spatial.OpenUprnClient import OpenUprnClient -from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc logger = setup_logger() -BATCH_SIZE = 5 -SCORING_BATCH_SIZE = 400 - - -def patch_epc(patch, epc_records): - """ - This utility function is useful to patch the epc data if we have data from the customer - :return: - """ - - for patch_variable, patch_value in patch.items(): - - if patch_variable in ["address", "postcode"]: - continue - - if patch_value == "": - continue - if patch_variable in epc_records["original_epc"]: - epc_records["original_epc"][patch_variable] = patch_value - - return epc_records - - -def extract_portfolio_aggregation_data( - input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges -): - # We aggregate a number of metrics for the portfolio: - # 1) A breakdown of the number of properties in each EPC band - # a) before retrofit - # b) after retrofit - # 2) Number of units - # 3) Co2/unit - # a) before retrofit - # b) after retrofit - # 4) Energy bill/unit - # a) before retrofit - # b) after retrofit - # 5) Average valuation improvement/unit - # 6) Total cost - # 7) Cost per unit - # 8) £ per CO2 saved - # 9) £ per SAP point - - # We need to construct the underlyind data for this - - # Helper function to reformat the EPC data - def reformat_epc_data(epc_counts): - # Define all possible EPC bands in the required order - epc_bands = ["G", "F", "E", "D", "C", "B", "A"] - - # Create the formatted data list by checking each band in the order - formatted_data = [] - for band in epc_bands: - # Get the count from the dictionary, defaulting to 0 if not present - count = epc_counts.get(band, 0) - # Append the formatted dictionary to the list - formatted_data.append({"name": band, band: count}) - - return formatted_data - - n_units = len(input_properties) - - agg_data = [] - for p in input_properties: - # Get the recommendations for the property - we include all properties, even ones without recommendations - property_recommendations = recommendations.get(p.id, []) - - # Get just the default recommendations - default_recommendations = [r for r in property_recommendations if r["default"]] - - has_recommendations = len(default_recommendations) > 0 - - # We can now calculate multiple outputs based on default recommendations - carbon_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations]) - - pre_retrofit_co2 = p.energy["co2_emissions"] - post_retrofit_co2 = pre_retrofit_co2 - carbon_savings - - pre_retrofit_energy_bill = sum(p.current_energy_bill.values()) - post_retrofit_energy_bill = sum(p.current_energy_bill.values()) - sum( - [r["energy_cost_savings"] for r in default_recommendations] - ) - - pre_retrofit_energy_consumption = p.current_energy_consumption - post_retrofit_energy_consumption = p.current_energy_consumption - sum( - [r["kwh_savings"] for r in default_recommendations] - ) - - # Add up energy savings - cost = sum([r["total"] for r in default_recommendations]) - sap_point_improvement = sum([r["sap_points"] for r in default_recommendations]) - - lower_bound_valuation_uplift = ( - property_value_increase_ranges[p.id]["lower_bound_increased_value"] - - property_value_increase_ranges[p.id]["current_value"] - ) - upper_bound_valuation_uplift = ( - property_value_increase_ranges[p.id]["upper_bound_increased_value"] - - property_value_increase_ranges[p.id]["current_value"] - ) - - agg_data.append({ - "pre_retrofit_epc": p.data["current-energy-rating"], - "post_retrofit_epc": new_epc_bands[p.id], - "pre_retrofit_co2": pre_retrofit_co2, - "post_retrofit_co2": post_retrofit_co2, - "pre_retrofit_energy_bill": pre_retrofit_energy_bill, - "post_retrofit_energy_bill": post_retrofit_energy_bill, - "pre_retrofit_energy_consumption": pre_retrofit_energy_consumption, - "post_retrofit_energy_consumption": post_retrofit_energy_consumption, - "cost": cost, - "sap_point_improvement": sap_point_improvement, - "lower_bound_valuation_uplift": lower_bound_valuation_uplift, - "upper_bound_valuation_uplift": upper_bound_valuation_uplift, - "has_recommendations": has_recommendations - }) - - agg_data = pd.DataFrame(agg_data) - - n_units_to_retrofit = agg_data["has_recommendations"].sum() - - valuation_improvement_lower_bound_per_unit = ( - agg_data["lower_bound_valuation_uplift"].mean() - ) - valuation_improvement_upper_bound_per_unit = ( - agg_data["upper_bound_valuation_uplift"].mean() - ) - - total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum() - total_sap_points = agg_data["sap_point_improvement"].sum() - - def format_money(amount): - return f"£{amount:,.0f}" - - valuation_improvment_per_unit = str( - format_money( - total_valuation_increase / n_units) + (f" ({format_money(valuation_improvement_lower_bound_per_unit)} - " - f"{format_money(valuation_improvement_upper_bound_per_unit)})") - ) - - valuation_return_on_investment = str( - str(round(total_valuation_increase / agg_data["cost"].sum(), 2)) + - f" (" - f"{agg_data['lower_bound_valuation_uplift'].sum() / agg_data['cost'].sum():,.2f} - " - f"{agg_data['upper_bound_valuation_uplift'].sum() / agg_data['cost'].sum():,.2f})" - ) - - aggregation_data = { - "epc_breakdown_pre_retrofit": json.dumps( - reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict()) - ), - "epc_breakdown_post_retrofit": json.dumps( - reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict()) - ), - "number_of_properties": int(n_units), - "n_units_to_retrofit": int(n_units_to_retrofit), - "co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t", - "co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t", - "energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()), - "energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()), - "energy_consumption_per_unit_pre_retrofit": str( - round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh", - "energy_consumption_per_unit_post_retrofit": str( - round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh", - "valuation_improvement_per_unit": valuation_improvment_per_unit, - "cost_per_unit": format_money(agg_data["cost"].mean()), - "cost_per_co2_saved": format_money(agg_data["cost"].sum() / total_carbon_saved), - "cost_per_sap_point": format_money(agg_data["cost"].sum() / total_sap_points), - "valuation_return_on_investment": valuation_return_on_investment, - # TODO: Could we add 10yr carbon credits value? - } - - return aggregation_data - - -def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict): - """ - This function will set up with epc_records dictionary with the newest EPC, the full SAP EPC and the older EPCs - and will factor in an energy assessment that we have performed for a client. - :param epc_searcher: An instance of the SearchEpc class - :param energy_assessment: The energy assessment we have performed. If we have not performed an energy assessment, - this should be an empty response as defined by the models's - EnergyAssessment.empty_response() method - """ - - newest_epc = epc_searcher.newest_epc.copy() - if newest_epc["uprn"] == "" and epc_searcher.uprn: - newest_epc["uprn"] = epc_searcher.uprn - - if not energy_assessment["epc"]: - energy_assessment_is_newer = False - return { - 'original_epc': newest_epc, - 'full_sap_epc': epc_searcher.full_sap_epc.copy(), - 'old_data': epc_searcher.older_epcs.copy(), - }, energy_assessment_is_newer - - epc = energy_assessment["epc"] - energy_assessment_date = epc["inspection-date"].strftime("%Y-%m-%d") - - # We insert county into the epc, since right now this isn't something that we pull out from the energy - # assessment - for col in ["county", "constituency", "constituency-label", "local-authority", "local-authority-label"]: - epc[col] = newest_epc[col] - - # We check if the energy assessment is newer than the newest EPC - if pd.to_datetime(energy_assessment_date) > pd.to_datetime(newest_epc["inspection-date"]): - # In this case, our energy assessment is newer than the EPCs available for this property - energy_assessment_is_newer = True - return { - "original_epc": epc, - "full_sap_epc": epc_searcher.full_sap_epc.copy(), - "old_data": epc_searcher.older_epcs.copy() + [newest_epc] - }, energy_assessment_is_newer - - # We check if the EPC we have produced is contained in the set of EPCs done for the property - # We do this based on inspection-date and SAP - epc_in_historicals = [ - x for x in epc_searcher.older_epcs + [newest_epc] - if x["inspection-date"] == energy_assessment_date and - x["current-energy-efficiency"] == epc["current-energy-efficiency"] - ] - energy_assessment_is_newer = False - - if epc_in_historicals: - # Then the EPC we have produced is already in the set of EPCs, and our EPC is older than the newest - return { - "original_epc": newest_epc, - "full_sap_epc": epc_searcher.full_sap_epc.copy(), - "old_data": epc_searcher.older_epcs.copy() - }, energy_assessment_is_newer - - # In this case, our EPC is older than the newest publically avaible one, but is not contained in - # the historicals, so it can't have been lodged, so we include it in the old data - return { - 'original_epc': newest_epc, - 'full_sap_epc': epc_searcher.full_sap_epc.copy(), - 'old_data': epc_searcher.older_epcs.copy() + [epc], - }, energy_assessment_is_newer - - -def get_request_property_data(body: PlanTriggerRequest): - """ - This function will read in the on-site data from the S3 bucket - :param body: The request body - :return: - """ - patches = [] - if body.patches_file_path: - patches = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.patches_file_path) - - already_installed = [] - if body.already_installed_file_path: - already_installed = read_csv_from_s3( - bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.already_installed_file_path - ) - - non_invasive_recommendations = [] - if body.non_invasive_recommendations_file_path: - non_invasive_recommendations = read_csv_from_s3( - bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.non_invasive_recommendations_file_path - ) - - valuation_data = [] - if body.valuation_file_path: - valuation_data = read_csv_from_s3( - bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.valuation_file_path - ) - - return patches, already_installed, non_invasive_recommendations, valuation_data - - -def extract_property_request_data( - config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn -): - patch_has_uprn = "uprn" in patches[0] if patches else True - if patch_has_uprn: - patch = next(( - x for x in patches if str(x["uprn"]) == str(config["uprn"]) - ), {}) - else: - patch = next(( - x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - ), {}) - - property_already_installed = next(( - x for x in already_installed if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - ), {}) - - # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN - # we need to check existence of uprn - has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False - if has_uprn: - has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None] - - if has_uprn: - property_non_invasive_recommendations = next(( - x for x in non_invasive_recommendations if - (str(x["uprn"]) == str(uprn)) - ), {}) - - # We patch the non-invasive recs that are ['cavity_extract_and_refill'] - else: - property_non_invasive_recommendations = next(( - x for x in non_invasive_recommendations if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - ), {}) - - if isinstance(property_non_invasive_recommendations.get("recommendations"), str): - property_non_invasive_recommendations["recommendations"] = ast.literal_eval( - property_non_invasive_recommendations["recommendations"] - ) - transformed = [] - for rec in property_non_invasive_recommendations["recommendations"]: - if isinstance(rec, str): - transformed.append({"type": rec, }) - else: - transformed.append(rec) - - property_non_invasive_recommendations["recommendations"] = transformed - - # Check if the valuation data has uprn - valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False - if valuation_has_uprn: - valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None] - - if valuation_has_uprn: - property_valution = next(( - float(x["valuation"]) for x in valuation_data if - (str(x["uprn"]) == str(uprn)) - ), None) - else: - property_valution = next(( - float(x["valuation"]) for x in valuation_data if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - ), None) - - return patch, property_already_installed, property_non_invasive_recommendations, property_valution - - -def get_funding_data(): - """ - This function retrieves the eco project scores matrix and the warm homes local grant funding data - :return: - """ - project_scores_matrix = read_csv_from_s3( - bucket_name=get_settings().DATA_BUCKET, - filepath="funding/ECO4 Full Project Scores Matrix.csv", - ) - project_scores_matrix = pd.DataFrame(project_scores_matrix) - project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings'] - project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float) - - whlg_eligible_postcodes = read_csv_from_s3( - bucket_name=get_settings().DATA_BUCKET, - filepath="funding/whlg eligible postcodes.csv", - ) - whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) - - return project_scores_matrix, whlg_eligible_postcodes - - router = APIRouter( prefix="/plan", tags=["plan"], @@ -422,655 +15,16 @@ router = APIRouter( @router.post("/trigger", status_code=202) -async def trigger_plan_entrypoint(body: PlanTriggerRequest): +async def trigger_plan_entrypoint(body: PlanTriggerRequest, background_tasks: BackgroundTasks): """ This function is the entrypoint for the plan trigger API. It will handle the request and call the trigger_plan function. :param body: The request body + :param background_tasks: The background tasks :return: """ logger.info("API triggered with body: %s", body) # Kick off the async background task - asyncio.create_task(model_engine(body)) + # TODO: Trigger SQS job return {"message": "Plan job accepted"} - - -async def model_engine(body: PlanTriggerRequest): - logger.info("Model Engine triggered with body: %s", body) - - logger.info("Connecting to db") - session = sessionmaker(bind=db_engine)() - created_at = datetime.now().isoformat() - - # TODO: if the measure is already installed, it should actually be the very first phase - - try: - session.begin() - logger.info("Getting the inputs") - plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) - # Check for duplicate UPRNS - input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")] - - if input_uprns: - # Check for dupes - if len(input_uprns) != len(set(input_uprns)): - raise ValueError("Duplicate UPRNs in the input data") - - # If we have patches or overrides, we should read them in here - patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body) - - cleaning_data = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", - ) - - input_properties = [] - for config in tqdm(plan_input): - # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly - uprn = config.get("uprn", None) - if uprn: - uprn = int(float(uprn)) - - epc_searcher = SearchEpc( - address1=config["address"], - postcode=config["postcode"], - uprn=uprn, - auth_token=get_settings().EPC_AUTH_TOKEN, - os_api_key="", - ) - epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) - epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) - # For the moment, our OS API access is unavailable, so we skip and interpolate - epc_searcher.find_property(skip_os=True) - - # We check for an energy assessment we have performed on this property: - energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn) - - # Create a record in db - property_id, is_new = create_property( - session=session, - portfolio_id=body.portfolio_id, - address=epc_searcher.address_clean, - postcode=epc_searcher.postcode_clean, - uprn=epc_searcher.uprn, - energy_assessment=energy_assessment - ) - if not is_new and not body.multi_plan: - continue - - if epc_searcher.newest_epc is None: - raise ValueError( - "No EPCs found for this property and did not estimate - likely need to provide a" - "property type and built form" - ) - - if is_new: - create_property_targets( - session, - property_id=property_id, - portfolio_id=body.portfolio_id, - epc_target=body.goal_value, - heat_demand_target=None - ) - - # If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that. - # Otherwise, we use the newest EPC - # energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that - # has been publically lodged - epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records( - epc_searcher, energy_assessment - ) - - patch, property_already_installed, property_non_invasive_recommendations, property_valuation = ( - extract_property_request_data( - config=config, - patches=patches, - already_installed=already_installed, - non_invasive_recommendations=non_invasive_recommendations, - valuation_data=valuation_data, - uprn=epc_searcher.uprn, - ) - ) - - # if we have a remote assment data type, we pull the additional data and include it - if body.event_type == "remote_assessment": - logger.info("Retrieving find my epc data") - for k in ["address", "address1"]: - epc_searcher.newest_epc[k] = epc_searcher.address_clean - - property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(epc_searcher.newest_epc) - # If we have a property type, this means when we pull the epc data, we might need to make a patch - - epc_records = patch_epc(patch, epc_records) - - prepared_epc = EPCRecord( - epc_records=epc_records, - run_mode="newdata", - cleaning_data=cleaning_data - ) - - input_properties.append( - Property( - id=property_id, - is_new=is_new, - address=epc_searcher.address_clean, - postcode=epc_searcher.postcode_clean, - epc_record=prepared_epc, - already_installed=property_already_installed, - property_valuation=property_valuation, - non_invasive_recommendations=property_non_invasive_recommendations, - energy_assessment=energy_assessment, - **Property.extract_kwargs(config), # TODO: Depraecate this - ) - ) - - if not input_properties: - return Response(status_code=204) - - # Set up model api and warm up the lambdas - model_api = ModelApi( - portfolio_id=body.portfolio_id, - timestamp=created_at, - prediction_buckets=get_prediction_buckets(), - max_retries=1 - ) - await model_api.async_warm_up_lambdas( - model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES - ) - - # The materials data could be cached or local so we don't need to make - # consistent requests to the backend for - # the same data - logger.info("Reading in materials and cleaned datasets") - materials = get_materials(session) - cleaned = get_cleaned() - eco_project_scores_matrix, whlg_eligible_postcodes = get_funding_data() - - kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) - - epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned) - - kwh_preds = await model_api.async_paginated_predictions( - data=epcs_for_scoring, - bucket=get_settings().DATA_BUCKET, - model_prefixes=model_api.KWH_MODEL_PREFIXES, - extract_ids=False, - batch_size=SCORING_BATCH_SIZE - ) - - # Insert the spatial data - logger.info("Getting spatial data") - input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET) - - [p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties] - - # TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour - # TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with - # extensions, since it doesn't seem to do a great job - - logger.info("Performing solar analysis") - - ofgem_consumption_averages = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, - file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet" - ) - - building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data( - input_properties=input_properties, - ofgem_consumption_averages=ofgem_consumption_averages, - body=body - ) - - input_properties = GoogleSolarApi.building_solar_analysis( - building_solar_config=building_solar_config, - input_properties=input_properties, - session=session, - google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY - ) - - input_properties = GoogleSolarApi.unit_solar_analysis( - unit_solar_config=unit_solar_config, - input_properties=input_properties, - session=session, - body=body, - google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY - ) - - logger.info("Identifying property recommendations") - recommendations = {} - recommendations_scoring_data = [] - representative_recommendations = {} - for p in tqdm(input_properties): - recommender = Recommendations( - property_instance=p, - materials=materials, - exclusions=body.exclusions, - inclusions=body.inclusions, - default_u_values=body.default_u_values - ) - property_recommendations, property_representative_recommendations = recommender.recommend() - - if not property_recommendations: - continue - - recommendations[p.id] = property_recommendations - representative_recommendations[p.id] = property_representative_recommendations - - p.create_base_difference_epc_record(cleaned_lookup=cleaned) - p.adjust_difference_record_with_recommendations( - property_recommendations, property_representative_recommendations - ) - - recommendations_scoring_data.extend(p.recommendations_scoring_data) - - # TODO: Make sure that number_habitable_rooms has been dropped - logger.info("Preparing data for scoring in sap change api") - recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) - - recommendations_scoring_data = recommendations_scoring_data.drop( - columns=[ - "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", - "carbon_ending" - ] - ) - - all_predictions = await model_api.async_paginated_predictions( - data=recommendations_scoring_data, - bucket=get_settings().DATA_BUCKET, - batch_size=SCORING_BATCH_SIZE - ) - - # Insert the predictions into the recommendations, and get the impact summary - scoring_epcs = [] # For scoring the kwh models - for property_id in recommendations.keys(): - property_instance = [p for p in input_properties if p.id == property_id][0] - - recommendations_with_impact, impact_summary = ( - Recommendations.calculate_recommendation_impact( - property_instance=property_instance, - all_predictions=all_predictions, - recommendations=recommendations, - representative_recommendations=representative_recommendations - ) - ) - - # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc - # at each phase - property_instance.update_simulation_epcs(impact_summary) - scoring_epcs.extend(property_instance.updated_simulation_epcs) - recommendations[property_id] = recommendations_with_impact - - # We call the API with the scoring epcs - scoring_epcs = pd.DataFrame(scoring_epcs) - scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned) - - kwh_simulation_predictions = await model_api.async_paginated_predictions( - data=scoring_epcs, - bucket=get_settings().DATA_BUCKET, - model_prefixes=model_api.KWH_MODEL_PREFIXES, - batch_size=SCORING_BATCH_SIZE - ) - - # We now insert kwh estimates and costs into the recommendations - logger.info("Calculating tenant savings - kwh and bills") - for property_id in tqdm([p.id for p in input_properties]): - property_recommendations = recommendations.get(property_id, []) - property_instance = [p for p in input_properties if p.id == property_id][0] - - property_current_energy_bill = ( - Recommendations.calculate_recommendation_tenant_savings( - property_instance=property_instance, - kwh_simulation_predictions=kwh_simulation_predictions, - property_recommendations=property_recommendations, - ashp_cop=body.ashp_cop - ) - ) - property_instance.current_energy_bill = property_current_energy_bill - - # Insert the predictions into the recommendations and run the optimiser - for p in input_properties: - if not recommendations.get(p.id): - continue - - # we need to double unlist because we have a list of lists - property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} - - property_required_measures = [ - m for m in recommendations[p.id] if m[0]["type"] in body.required_measures - ] - measures_to_optimise = [ - m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures - ] - - # If we have a wall insulation measure, we MUST include mechanical ventilation - # Additionally, if we have required measures, they should also be included. Therefore - # we can discount the number of points required to get to the target SAP band (or increase) - # in the case of ventilation - needs_ventilation = any(x in property_measure_types for x in assumptions.measures_needing_ventilation) - - input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation) - - if not input_measures[0]: - # This means that we have no defaults - selected_recommendations = {} - solution = [] - else: - - fixed_gain = 0 - if property_required_measures: - # We get the SAP points for the required measures - if body.goal != "Increasing EPC": - raise NotImplementedError("Only EPC optimisation is currently supported") - sap_by_type = [ - {"type": rec["type"], "sap_points": rec["sap_points"]} for recs in property_required_measures - for rec in recs - ] - # We get a MAX sap points per type - max_per_type = ( - pd.DataFrame(sap_by_type).groupby("type")["sap_points"].max().to_dict() - ) - fixed_gain = sum(max_per_type.values()) - - property_required_measure_types = {rec["type"] for rec in sap_by_type} - - # if the property needs ventilation, but the measure we optimise didn't include - # venilation we add the points for ventilation as a fixed gain - if needs_ventilation and any( - r in property_required_measure_types for r in assumptions.measures_needing_ventilation - ): - fixed_gain += next( - (r[0]["sap_points"] for r in recommendations[p.id] if - r[0]["type"] == "mechanical_ventilation"), - 0 - ) - - current_sap_points = int(p.data["current-energy-efficiency"]) - - sap_gain = CostOptimiser.calculate_sap_gain_with_slack( - epc_to_sap_lower_bound(body.goal_value) - current_sap_points - ) - fixed_gain - - if not body.optimise: - if body.goal != "Increasing EPC": - raise NotImplementedError("Only EPC optimisation is currently supported") - solution = [] - for sub_list in input_measures: - # Select the entry with the highest gain, and if tied, choose the one with the lowest cost - best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost'])) - solution.append(best_measure) - else: - - if body.budget: - optimiser = GainOptimiser( - input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0 - ) - else: - # The minimum gain is the minimum number of SAP points required to get to the target SAP band - # If the gain is negative, the optimiser will return an empty solution - optimiser = CostOptimiser( - input_measures, - min_gain=sap_gain - ) - - optimiser.setup() - optimiser.solve() - solution = optimiser.solution - - selected_recommendations = {r["id"] for r in solution} - - if property_required_measures: - # We select the cheapest of the required measures, into selected - for recs in property_required_measures: - # We select the cheapest of the required measures - cost_to_id = { - rec["recommendation_id"]: rec["total"] for rec in recs - if rec["recommendation_id"] not in selected_recommendations - } - # Take the recommendation id with the lowers cost - - selected_recommendations.add(min(cost_to_id, key=cost_to_id.get)) - # Update the solution with the selected recommendaitons - solution = [] - for recs in recommendations[p.id]: - for rec in recs: - if rec["recommendation_id"] in selected_recommendations: - solution.append( - { - "id": rec["recommendation_id"], - "cost": rec["total"], - "gain": rec["sap_points"], - "type": rec["type"] - } - ) - - # If wall insulation is selected, we also include mechanical ventilation as a best practice measure - if any(x in [r["type"] for r in solution] for x in assumptions.measures_needing_ventilation): - ventilation_rec = next( - (r[0] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"), - None - ) - - # If a matching recommendation was found, add its ID to the selected recommendations - if ventilation_rec: - selected_recommendations.add(ventilation_rec["recommendation_id"]) - - # If we have a trickle vents recommendation, we also switch it on. We don't just check the solution - trickle_vents_rec = next( - (r[0] for r in recommendations[p.id] if r[0]["type"] == "trickle_vents"), - None - ) - # If a matching recommendation was found, add its ID to the selected recommendations - if trickle_vents_rec: - selected_recommendations.add(trickle_vents_rec["recommendation_id"]) - - # We'll use the set of selected recommendations to filter the recommendations to upload - final_recommendations = [ - [ - {**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False} - for rec in recommendations_by_type - ] - for recommendations_by_type in recommendations[p.id] - ] - - # We'll also unlist the recommendations so they're a bit easier to handle from here onwards - recommendations[p.id] = [ - rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type - ] - - # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all - # of them - # TODO: We can probably do better and optimise at the building level - this is temp - logger.info("Adjusting solar PV recommendations for buildings") - building_ids = set([p.building_id for p in input_properties if p.building_id is not None]) - - for bid in building_ids: - # We check if any of them have solar PV - building = [p for p in input_properties if p.building_id == bid] - has_solar = False - for unit in building: - # Get default recommendations - has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0 - if has_solar: - break - - if has_solar: - # We adjust the units within the building - for unit in building: - for rec in recommendations[unit.id]: - if rec["type"] == "solar_pv": - # This is straightforward, we just set the default to True, since when we're at a building - # level, we only allow 1 solar PV option for each unit. If we change this, this logic will - # need to be updated - rec["default"] = True - - # ~~~~~~~~~~~~~~~~ - # Funding - # ~~~~~~~~~~~~~~~~ - - # for p in input_properties: - # funding_calulator = Funding( - # tenure=body.housing_type, - # starting_epc=p.data["current-energy-rating"], - # starting_sap=int(p.data["current-energy-efficiency"]), - # postcode=p.postcode, - # floor_area=p.floor_area, - # council_tax_band=None, # This is seemingly always None at the moment - # property_recommendations=recommendations[p.id], - # project_scores_matrix=eco_project_scores_matrix, - # whlg_eligible_postcodes=whlg_eligible_postcodes, - # gbis_abs_rate=15, - # eco4_abs_rate=15, - # ) - # funding_calulator.check_eligibiltiy() - # # Insert finding - # p.insert_funding(funding_calulator) - - logger.info("Uploading recommendations to the database") - # If we have any work to do, we create a new scenario - engine_scenario = create_scenario( - session=session, - scenario={ - "name": body.scenario_name, - "created_at": created_at, - "budget": body.budget, - "portfolio_id": body.portfolio_id, - "housing_type": body.housing_type, - "goal": body.goal, - "trigger_file_path": body.trigger_file_path, - "already_installed_file_path": body.already_installed_file_path, - "patches_file_path": body.patches_file_path, - "non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path, - "exclusions": body.exclusions, - "multi_plan": body.multi_plan - } - ) - - property_valuation_increases = [] - session.commit() - new_epc_bands = {} - property_value_increase_ranges = {} - for i in range(0, len(input_properties), BATCH_SIZE): - try: - # Take a slice of the input_properties list to make a batch - batch_properties = input_properties[i:i + BATCH_SIZE] - - for p in batch_properties: - recommendations_to_upload = recommendations.get(p.id, []) - default_recommendations = [r for r in recommendations_to_upload if r["default"]] - total_sap_points = sum([r["sap_points"] for r in default_recommendations]) - new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points - new_epc = sap_to_epc(new_sap_points) - new_epc_bands[p.id] = new_epc - - total_cost = sum([r["total"] for r in default_recommendations]) - - valuations = PropertyValuation.estimate( - property_instance=p, target_epc=new_epc, total_cost=total_cost - ) - property_value_increase_ranges[p.id] = valuations - - if p.is_new: - property_details_epc = p.get_property_details_epc( - portfolio_id=body.portfolio_id, rating_lookup=rating_lookup, - ) - create_property_details_epc(session, property_details_epc) - - update_or_create_property_spatial_details(session, p.uprn, p.spatial) - - property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) - - update_property_data( - session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data - ) - - if not recommendations_to_upload: - continue - - new_plan_id = create_plan(session, { - "portfolio_id": body.portfolio_id, - "property_id": p.id, - "scenario_id": engine_scenario.id, - "is_default": True if p.is_new else False, - "name": body.scenario_name, - "valuation_increase_lower_bound": ( - valuations["lower_bound_increased_value"] - valuations["current_value"] - ), - "valuation_increase_upper_bound": ( - valuations["upper_bound_increased_value"] - valuations["current_value"] - ), - "valuation_increase_average": ( - valuations["average_increased_value"] - valuations["current_value"] - ), - }) - - upload_recommendations( - session, recommendations_to_upload, p.id, new_plan_id - ) - - property_valuation_increases.append( - valuations["average_increased_value"] - valuations["current_value"] - ) - - # Commit the session after each batch - session.commit() - - except Exception as e: - # Rollback the session if an error occurs - session.rollback() - print("Failed i = %s" % str(i)) - logger.error(f"An error occurred during batch starting at index {i}: {e}") - - logger.info("Creating portfolio aggregations") - # We implement this in the simplest way possible which will be just to query the database for all - # recommendations associated to the portfolio and then aggregate them. This is not the most efficient - # way to do this, but it's the simplest and will be a process that we can re-use since when we change a - # recommendation from being default to not default, we'll need to re-run this process to re-calculate the - # the portfolion level impact - - total_valuation_increase = sum(property_valuation_increases) - labour_days = round(max( - [sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()] - )) - - aggregated_data = extract_portfolio_aggregation_data( - input_properties=input_properties, - total_valuation_increase=total_valuation_increase, - recommendations=recommendations, - new_epc_bands=new_epc_bands, - property_value_increase_ranges=property_value_increase_ranges - ) - - aggregate_portfolio_recommendations( - session, - portfolio_id=body.portfolio_id, - scenario_id=engine_scenario.id, - total_valuation_increase=total_valuation_increase, - labour_days=labour_days, - aggregated_data=aggregated_data - ) - - # Commit final changes - session.commit() - - except IntegrityError: - logger.error("Database integrity error occurred", exc_info=True) - session.rollback() - return Response(status_code=500, content="Database integrity error.") - except OperationalError: - logger.error("Database operational error occurred", exc_info=True) - session.rollback() - return Response(status_code=500, content="Database operational error.") - except ValueError: - logger.error("Value error - possibly due to malformed data", exc_info=True) - session.rollback() - return Response(status_code=400, content="Bad request: malformed data.") - except Exception as e: # General exception handling - logger.error(f"An error occurred: {e}") - session.rollback() - return Response(status_code=500, content="An unexpected error occurred.") - finally: - session.close() - - logger.info("Model Engine completed successfully") - - return Response(status_code=200) diff --git a/backend/lambda/README.md b/backend/lambda/README.md new file mode 100644 index 00000000..b5c42f11 --- /dev/null +++ b/backend/lambda/README.md @@ -0,0 +1,8 @@ +# Model Engine Lambda + +This repository contains the code for the Model Engine Lambda, which is responsible for serving machine learning models +in a serverless environment. The Lambda function is designed to handle requests for model inference and return the +results to the client. + +This service consumes an SQS queue and is triggered by messages sent to the queue. The Lambda function processes the +messages, performs model inference, and sends the results back to the client. \ No newline at end of file diff --git a/backend/lambda/handler.py b/backend/lambda/handler.py new file mode 100644 index 00000000..44d62d8f --- /dev/null +++ b/backend/lambda/handler.py @@ -0,0 +1,13 @@ +import json + + +def handler(event, context): + """ + This is an ascynchonous lambda handler that will run the model engine + :param event: + :param context: + :return: + """ + for record in event["Records"]: + body = json.loads(record["body"]) + asyncio.run(model_engine(body)) diff --git a/serverless.yml b/serverless.yml index abca5ade..116c2e29 100644 --- a/serverless.yml +++ b/serverless.yml @@ -1,8 +1,9 @@ -service: fastapi-lambda +service: retrofit-platform provider: name: aws region: eu-west-2 + runtime: python3.11 architecture: x86_64 environment: API_KEY: ${env:API_KEY} @@ -23,62 +24,109 @@ provider: SAP_PREDICTIONS_BUCKET: ${env:SAP_PREDICTIONS_BUCKET} CARBON_PREDICTIONS_BUCKET: ${env:CARBON_PREDICTIONS_BUCKET} HEAT_PREDICTIONS_BUCKET: ${env:HEAT_PREDICTIONS_BUCKET} - # LIGHTING_COST_PREDICTIONS_BUCKET: ${env:LIGHTING_COST_PREDICTIONS_BUCKET} - # HEATING_COST_PREDICTIONS_BUCKET: ${env:HEATING_COST_PREDICTIONS_BUCKET} - # HOT_WATER_COST_PREDICTIONS_BUCKET: ${env:HOT_WATER_COST_PREDICTIONS_BUCKET} HEATING_KWH_PREDICTIONS_BUCKET: ${env:HEATING_KWH_PREDICTIONS_BUCKET} HOTWATER_KWH_PREDICTIONS_BUCKET: ${env:HOTWATER_KWH_PREDICTIONS_BUCKET} ENERGY_ASSESSMENTS_BUCKET: ${env:ENERGY_ASSESSMENTS_BUCKET} GOOGLE_SOLAR_API_KEY: ${env:GOOGLE_SOLAR_API_KEY} - # Give lambda access to read from the bucket - iam: - role: - name: fastapi_backend_${env:PLAN_TRIGGER_BUCKET}_access - statements: - - Effect: Allow - Action: - - s3:GetObject - - s3:ListBucket - Resource: - - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET} - - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET}/* - - Effect: Allow - Action: - - s3:* - Resource: - - arn:aws:s3:::${env:PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:PREDICTIONS_BUCKET}/* - - arn:aws:s3:::${env:DATA_BUCKET} - - arn:aws:s3:::${env:DATA_BUCKET}/* - - arn:aws:s3:::${env:ENERGY_ASSESSMENTS_BUCKET} - - arn:aws:s3:::${env:ENERGY_ASSESSMENTS_BUCKET}/* - - arn:aws:s3:::${env:SAP_PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:SAP_PREDICTIONS_BUCKET}/* - - arn:aws:s3:::${env:CARBON_PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:CARBON_PREDICTIONS_BUCKET}/* - - arn:aws:s3:::${env:HEAT_PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:HEAT_PREDICTIONS_BUCKET}/* - - arn:aws:s3:::${env:HEATING_KWH_PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:HEATING_KWH_PREDICTIONS_BUCKET}/* - - arn:aws:s3:::${env:HOTWATER_KWH_PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:HOTWATER_KWH_PREDICTIONS_BUCKET}/* - + ENGINE_SQS_URL: + Ref: EngineQueue plugins: + - serverless-python-requirements - serverless-domain-manager custom: + pythonRequirements: + fileName: backend/requirements/requirements.txt + dockerizePip: true customDomain: domainName: api.${self:provider.environment.DOMAIN_NAME} createRoute53Record: true certificateArn: ${ssm:/ssl_certificate_arn} functions: - app: - image: - uri: ${env:ECR_URI}:${env:GITHUB_SHA} + + fastapi-backend: + handler: backend.app.main.handler + timeout: 30 + memorySize: 512 events: - http: path: /{proxy+} method: ANY - timeout: 900 # Max timeout to 15 mins for engine runs \ No newline at end of file + iamRoleStatements: + - Effect: Allow + Action: + - sqs:SendMessage + Resource: + - Fn::GetAtt: [ EngineQueue, Arn ] + - Effect: Allow + Action: + - s3:GetObject + - s3:ListBucket + Resource: + - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET} + - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET}/* + - Effect: Allow + Action: + - s3:GetObject + Resource: + - arn:aws:s3:::${env:DATA_BUCKET}/* + - arn:aws:s3:::${env:ENERGY_ASSESSMENTS_BUCKET}/* + - arn:aws:s3:::${env:SAP_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:CARBON_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HEAT_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HEATING_KWH_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HOTWATER_KWH_PREDICTIONS_BUCKET}/* + + model-engine-lambda: + image: + uri: ${env:ECR_URI}:${env:GITHUB_SHA} + timeout: 900 + memorySize: 2048 + events: + - sqs: + arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue + batchSize: 1 + iamRoleStatements: + - Effect: Allow + Action: + - sqs:ReceiveMessage + - sqs:DeleteMessage + - sqs:GetQueueAttributes + Resource: + - Fn::GetAtt: [ EngineQueue, Arn ] + - Effect: Allow + Action: + - s3:GetObject + - s3:ListBucket + Resource: + - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET} + - arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET}/* + - Effect: Allow + Action: + - s3:* + Resource: + - arn:aws:s3:::${env:PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:DATA_BUCKET} + - arn:aws:s3:::${env:DATA_BUCKET}/* + - arn:aws:s3:::${env:ENERGY_ASSESSMENTS_BUCKET} + - arn:aws:s3:::${env:ENERGY_ASSESSMENTS_BUCKET}/* + - arn:aws:s3:::${env:SAP_PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:SAP_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:CARBON_PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:CARBON_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HEAT_PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:HEAT_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HEATING_KWH_PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:HEATING_KWH_PREDICTIONS_BUCKET}/* + - arn:aws:s3:::${env:HOTWATER_KWH_PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:HOTWATER_KWH_PREDICTIONS_BUCKET}/* + +resources: + Resources: + EngineQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: model-engine-queue