# Two-stage image for the Model Engine Lambda.
# Stage 1 ("build-image") installs the Python dependencies and the AWS Lambda Runtime Interface Client;
# stage 2 copies the resulting site-packages plus the project sources into a clean runtime image.

# Pull base image
FROM python:3.11.10-slim-bullseye AS build-image

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# Set work directory to the root of your project (absolute, matching the runtime stage below)
WORKDIR /var/task/Model

# Install python dependencies
# NOTE(review): the pinned requirements added alongside this Dockerfile live at
# backend/app/requirements/requirements.txt - confirm that the source path below is the intended one.
COPY ./backend/requirements/requirements.txt ./requirements.txt
# Install and clean up temp caches.
# The requirements file is copied to ./requirements.txt above, so that is the path pip must read;
# no other file exists in this stage.
RUN pip install --upgrade pip \
    && pip install -r requirements.txt && rm -rf /root/.cache

# Since we are not using a base AWS image, there is some additional setup required. We need to set up the runtime
# interface client
# https://docs.aws.amazon.com/lambda/latest/dg/python-image.html#python-image-clients
# Additionally install the AWS Lambda RIC
RUN pip install awslambdaric

# Second stage: "runtime-image"
FROM python:3.11.10-slim-bullseye

# Create the extensions directory to avoid warnings with RIE
RUN mkdir -p /opt/extensions

# Set work directory to the root of your project
WORKDIR /var/task/Model

# Copy the python dependencies from the build-image
COPY --from=build-image /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/

# Copy project files
COPY ./backend/ ./backend
COPY ./recommendations/ ./recommendations
COPY ./utils/ ./utils/
COPY ./etl/epc/ ./etl/epc/
COPY ./etl/epc_clean/ ./etl/epc_clean/
COPY ./etl/bill_savings/ ./etl/bill_savings/
COPY ./etl/spatial/ ./etl/spatial/
COPY ./BaseUtility.py ./BaseUtility.py
COPY ./datatypes/ ./datatypes/
COPY ./etl/find_my_epc/ ./etl/find_my_epc/

# Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler
ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
# Define the handler location
CMD ["backend.engine.handler.handler"]
\ No newline at end of file diff --git a/backend/engine/engine.py b/backend/engine/engine.py new file mode 100644 index 00000000..35b75bd8 --- /dev/null +++ b/backend/engine/engine.py @@ -0,0 +1,1050 @@ +import ast +import json +from datetime import datetime + +from tqdm import tqdm +import pandas as pd +from etl.epc.Record import EPCRecord +from backend.SearchEpc import SearchEpc +from sqlalchemy.exc import IntegrityError, OperationalError +from sqlalchemy.orm import sessionmaker +from starlette.responses import Response + +from backend.app.config import get_settings, get_prediction_buckets +from backend.app.db.connection import db_engine +from backend.app.db.functions.materials_functions import get_materials +from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations +from backend.app.db.functions.property_functions import ( + create_property, create_property_details_epc, create_property_targets, update_property_data, + update_or_create_property_spatial_details +) +from backend.app.db.functions.recommendations_functions import ( + create_plan, upload_recommendations, create_scenario +) +from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn +from backend.app.db.models.portfolio import rating_lookup +from backend.app.plan.schemas import PlanTriggerRequest +from backend.app.plan.utils import get_cleaned +from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc +import backend.app.assumptions as assumptions + +from backend.ml_models.api import ModelApi +from backend.Property import Property +from backend.Funding import Funding +from backend.apis.GoogleSolarApi import GoogleSolarApi + +from recommendations.optimiser.CostOptimiser import CostOptimiser +from recommendations.optimiser.GainOptimiser import GainOptimiser +from recommendations.optimiser.optimiser_functions import prepare_input_measures +from recommendations.Recommendations import Recommendations +from utils.logger import 
def patch_epc(patch, epc_records):
    """
    Overlay customer-supplied values onto the original EPC record, in place.

    Only keys that already exist in ``epc_records["original_epc"]`` are overwritten. The ``address`` and
    ``postcode`` keys are never patched, and empty-string values are ignored.

    :param patch: Mapping of EPC field name to the customer-supplied value
    :param epc_records: Dictionary holding (at least) the ``original_epc`` record
    :return: The same ``epc_records`` object, after patching
    """
    for patch_variable, patch_value in patch.items():
        # Address fields are used for matching, not patching, and blank values carry no information
        if patch_variable in ("address", "postcode") or patch_value == "":
            continue

        if patch_variable in epc_records["original_epc"]:
            epc_records["original_epc"][patch_variable] = patch_value

    return epc_records


def extract_portfolio_aggregation_data(
        input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Build the portfolio-level summary from the per-property default recommendations.

    The summary contains:
        1) A breakdown of the number of properties in each EPC band (before and after retrofit)
        2) Number of units, and number of units with at least one default recommendation
        3) CO2, energy bill and energy consumption per unit (before and after retrofit)
        4) Average valuation improvement per unit, with lower/upper bounds
        5) Cost per unit, cost per CO2 saved and cost per SAP point
        6) Valuation return on investment, with lower/upper bounds

    Every property is included, even if it has no recommendations.

    :param input_properties: Property objects exposing ``id``, ``energy``, ``current_energy_bill``,
        ``current_energy_consumption`` and ``data``
    :param total_valuation_increase: Total valuation increase across the portfolio
    :param recommendations: Mapping of property id to a flat list of recommendation dictionaries
    :param new_epc_bands: Mapping of property id to the post-retrofit EPC band
    :param property_value_increase_ranges: Mapping of property id to a dictionary with ``current_value``,
        ``lower_bound_increased_value`` and ``upper_bound_increased_value``
    :return: Dictionary of formatted aggregation values
    """

    def reformat_epc_data(epc_counts):
        # One entry per EPC band, worst to best; bands with no properties are reported with a count of 0
        return [{"name": band, band: epc_counts.get(band, 0)} for band in ["G", "F", "E", "D", "C", "B", "A"]]

    def format_money(amount):
        return f"£{amount:,.0f}"

    n_units = len(input_properties)

    rows = []
    for p in input_properties:
        # Only the default recommendations contribute to the portfolio figures
        default_recommendations = [r for r in recommendations.get(p.id, []) if r["default"]]

        pre_retrofit_co2 = p.energy["co2_emissions"]
        pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
        pre_retrofit_energy_consumption = p.current_energy_consumption

        value_range = property_value_increase_ranges[p.id]

        rows.append({
            "pre_retrofit_epc": p.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[p.id],
            "pre_retrofit_co2": pre_retrofit_co2,
            "post_retrofit_co2": pre_retrofit_co2 - sum(
                r["co2_equivalent_savings"] for r in default_recommendations
            ),
            "pre_retrofit_energy_bill": pre_retrofit_energy_bill,
            "post_retrofit_energy_bill": pre_retrofit_energy_bill - sum(
                r["energy_cost_savings"] for r in default_recommendations
            ),
            "pre_retrofit_energy_consumption": pre_retrofit_energy_consumption,
            "post_retrofit_energy_consumption": pre_retrofit_energy_consumption - sum(
                r["kwh_savings"] for r in default_recommendations
            ),
            "cost": sum(r["total"] for r in default_recommendations),
            "sap_point_improvement": sum(r["sap_points"] for r in default_recommendations),
            "lower_bound_valuation_uplift": (
                value_range["lower_bound_increased_value"] - value_range["current_value"]
            ),
            "upper_bound_valuation_uplift": (
                value_range["upper_bound_increased_value"] - value_range["current_value"]
            ),
            "has_recommendations": len(default_recommendations) > 0,
        })

    agg_data = pd.DataFrame(rows)

    n_units_to_retrofit = agg_data["has_recommendations"].sum()
    total_cost = agg_data["cost"].sum()
    total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum()
    total_sap_points = agg_data["sap_point_improvement"].sum()

    # NOTE(review): the ratios below divide by portfolio totals (cost, carbon saved, SAP points); when a total
    # is zero the result is not a finite number - confirm how the consumer of this summary should display that.
    valuation_improvment_per_unit = (
        format_money(total_valuation_increase / n_units) +
        f" ({format_money(agg_data['lower_bound_valuation_uplift'].mean())} - "
        f"{format_money(agg_data['upper_bound_valuation_uplift'].mean())})"
    )

    valuation_return_on_investment = (
        str(round(total_valuation_increase / total_cost, 2)) +
        f" ("
        f"{agg_data['lower_bound_valuation_uplift'].sum() / total_cost:,.2f} - "
        f"{agg_data['upper_bound_valuation_uplift'].sum() / total_cost:,.2f})"
    )

    aggregation_data = {
        "epc_breakdown_pre_retrofit": json.dumps(
            reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()),
        "energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()),
        "energy_consumption_per_unit_pre_retrofit": str(
            round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(
            round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh",
        "valuation_improvement_per_unit": valuation_improvment_per_unit,
        "cost_per_unit": format_money(agg_data["cost"].mean()),
        "cost_per_co2_saved": format_money(total_cost / total_carbon_saved),
        "cost_per_sap_point": format_money(total_cost / total_sap_points),
        "valuation_return_on_investment": valuation_return_on_investment,
        # TODO: Could we add 10yr carbon credits value?
    }

    return aggregation_data


def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Set up the epc_records dictionary with the newest EPC, the full SAP EPC and the older EPCs, factoring in
    an energy assessment that we have performed for a client.

    The energy assessment EPC (``energy_assessment["epc"]``) is updated in place with the location fields of
    the newest EPC, since these are not extracted from the energy assessment itself.

    :param epc_searcher: An instance of the SearchEpc class; ``newest_epc``, ``full_sap_epc``, ``older_epcs``
        and ``uprn`` are read from it
    :param energy_assessment: The energy assessment we have performed. If we have not performed an energy
        assessment, its ``epc`` entry should be empty
    :return: Tuple of (epc_records, energy_assessment_is_newer) where epc_records has the keys
        ``original_epc``, ``full_sap_epc`` and ``old_data``
    """
    newest_epc = epc_searcher.newest_epc.copy()
    if newest_epc["uprn"] == "" and epc_searcher.uprn:
        # The lodged EPC has no UPRN but the search resolved one, so we carry it over
        newest_epc["uprn"] = epc_searcher.uprn

    def build_records(original_epc, old_data):
        return {
            "original_epc": original_epc,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": old_data,
        }

    epc = energy_assessment["epc"]
    if not epc:
        # No energy assessment has been performed, so only the lodged EPCs are used
        return build_records(newest_epc, epc_searcher.older_epcs.copy()), False

    energy_assessment_date = epc["inspection-date"].strftime("%Y-%m-%d")

    # We insert the location fields into the epc, since right now this isn't something that we pull out from
    # the energy assessment
    for col in ["county", "constituency", "constituency-label", "local-authority", "local-authority-label"]:
        epc[col] = newest_epc[col]

    if pd.to_datetime(energy_assessment_date) > pd.to_datetime(newest_epc["inspection-date"]):
        # Our energy assessment is newer than the EPCs available for this property, so it becomes the
        # original EPC and the newest lodged EPC joins the historical data
        return build_records(epc, epc_searcher.older_epcs.copy() + [newest_epc]), True

    # We check if the EPC we have produced is contained in the set of EPCs done for the property.
    # We do this based on inspection-date and SAP
    epc_in_historicals = [
        x for x in epc_searcher.older_epcs + [newest_epc]
        if x["inspection-date"] == energy_assessment_date and
        x["current-energy-efficiency"] == epc["current-energy-efficiency"]
    ]

    if epc_in_historicals:
        # The EPC we have produced is already in the set of EPCs, and our EPC is older than the newest
        return build_records(newest_epc, epc_searcher.older_epcs.copy()), False

    # Our EPC is older than the newest publicly available one, but is not contained in the historicals, so it
    # can't have been lodged, so we include it in the old data
    return build_records(newest_epc, epc_searcher.older_epcs.copy() + [epc]), False


def get_request_property_data(body: PlanTriggerRequest):
    """
    Read the optional on-site data files referenced by the request from the plan trigger S3 bucket.

    :param body: The request body; each ``*_file_path`` attribute is optional
    :return: Tuple of (patches, already_installed, non_invasive_recommendations, valuation_data). Each entry
        is an empty list when the corresponding file path is not set
    """

    def read_optional_csv(filepath):
        # Files that were not supplied with the request are treated as "no data"
        if not filepath:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=filepath)

    patches = read_optional_csv(body.patches_file_path)
    already_installed = read_optional_csv(body.already_installed_file_path)
    non_invasive_recommendations = read_optional_csv(body.non_invasive_recommendations_file_path)
    valuation_data = read_optional_csv(body.valuation_file_path)

    return patches, already_installed, non_invasive_recommendations, valuation_data


def extract_property_request_data(
        config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
    """
    Pick out the request data (patch, installed measures, non-invasive recommendations and valuation) that
    belongs to a single property.

    Rows are matched on UPRN where the dataset carries one, otherwise on address and postcode. If the matched
    non-invasive recommendations row stores its ``recommendations`` as a string, it is parsed and normalised
    in place so that every recommendation is a dictionary with (at least) a ``type`` key.

    :param config: The input row for the property; must contain ``address`` and ``postcode`` and may
        contain ``uprn``
    :param patches: List of patch rows
    :param already_installed: List of already-installed measure rows, matched on address and postcode
    :param non_invasive_recommendations: List of non-invasive recommendation rows
    :param valuation_data: List of valuation rows, each with a ``valuation`` value
    :param uprn: The resolved UPRN of the property
    :return: Tuple of (patch, property_already_installed, property_non_invasive_recommendations,
        property_valuation). Unmatched entries are ``{}``, and ``None`` for the valuation
    """

    def same_address(row):
        return (row["address"] == config["address"]) and (row["postcode"] == config["postcode"])

    def keyed_by_uprn(rows):
        # Some datasets match on address and postcode but not UPRN, so we check that the uprn column
        # exists and is populated on the first row
        return bool(rows) and "uprn" in rows[0] and rows[0]["uprn"] not in ["", None]

    if not patches or "uprn" in patches[0]:
        # The input row does not always carry a uprn column; in that case we fall back to the resolved UPRN
        # rather than failing with a KeyError
        config_uprn = str(config.get("uprn", uprn))
        patch = next((x for x in patches if str(x["uprn"]) == config_uprn), {})
    else:
        patch = next((x for x in patches if same_address(x)), {})

    property_already_installed = next((x for x in already_installed if same_address(x)), {})

    if keyed_by_uprn(non_invasive_recommendations):
        property_non_invasive_recommendations = next(
            (x for x in non_invasive_recommendations if str(x["uprn"]) == str(uprn)), {}
        )
    else:
        property_non_invasive_recommendations = next(
            (x for x in non_invasive_recommendations if same_address(x)), {}
        )

    raw_recommendations = property_non_invasive_recommendations.get("recommendations")
    if isinstance(raw_recommendations, str):
        # The recommendations arrive as the string form of a python list. literal_eval only accepts python
        # literals, so it cannot execute code. Bare strings are promoted to {"type": ...} dictionaries
        property_non_invasive_recommendations["recommendations"] = [
            {"type": rec} if isinstance(rec, str) else rec
            for rec in ast.literal_eval(raw_recommendations)
        ]

    if keyed_by_uprn(valuation_data):
        property_valution = next(
            (float(x["valuation"]) for x in valuation_data if str(x["uprn"]) == str(uprn)), None
        )
    else:
        property_valution = next(
            (float(x["valuation"]) for x in valuation_data if same_address(x)), None
        )

    return patch, property_already_installed, property_non_invasive_recommendations, property_valution


def get_funding_data():
    """
    Retrieve the eco project scores matrix and the warm homes local grant funding data from the data bucket.

    :return: Tuple of (project_scores_matrix, whlg_eligible_postcodes), both as pandas DataFrames
    """
    project_scores_matrix = pd.DataFrame(
        read_csv_from_s3(
            bucket_name=get_settings().DATA_BUCKET,
            filepath="funding/ECO4 Full Project Scores Matrix.csv",
        )
    )
    # The columns are renamed by position, so the file is expected to have exactly these four columns
    project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
    project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float)

    whlg_eligible_postcodes = pd.DataFrame(
        read_csv_from_s3(
            bucket_name=get_settings().DATA_BUCKET,
            filepath="funding/whlg eligible postcodes.csv",
        )
    )

    return project_scores_matrix, whlg_eligible_postcodes
record in the file. If the record is NOT valid, we need to handle this accordingly + uprn = config.get("uprn", None) + if uprn: + uprn = int(float(uprn)) + + epc_searcher = SearchEpc( + address1=config["address"], + postcode=config["postcode"], + uprn=uprn, + auth_token=get_settings().EPC_AUTH_TOKEN, + os_api_key="", + ) + epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) + epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) + # For the moment, our OS API access is unavailable, so we skip and interpolate + epc_searcher.find_property(skip_os=True) + + # We check for an energy assessment we have performed on this property: + energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn) + + # Create a record in db + property_id, is_new = create_property( + session=session, + portfolio_id=body.portfolio_id, + address=epc_searcher.address_clean, + postcode=epc_searcher.postcode_clean, + uprn=epc_searcher.uprn, + energy_assessment=energy_assessment + ) + if not is_new and not body.multi_plan: + continue + + if epc_searcher.newest_epc is None: + raise ValueError( + "No EPCs found for this property and did not estimate - likely need to provide a" + "property type and built form" + ) + + if is_new: + create_property_targets( + session, + property_id=property_id, + portfolio_id=body.portfolio_id, + epc_target=body.goal_value, + heat_demand_target=None + ) + + # If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that. 
+ # Otherwise, we use the newest EPC + # energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that + # has been publically lodged + epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records( + epc_searcher, energy_assessment + ) + + patch, property_already_installed, property_non_invasive_recommendations, property_valuation = ( + extract_property_request_data( + config=config, + patches=patches, + already_installed=already_installed, + non_invasive_recommendations=non_invasive_recommendations, + valuation_data=valuation_data, + uprn=epc_searcher.uprn, + ) + ) + + # if we have a remote assment data type, we pull the additional data and include it + if body.event_type == "remote_assessment": + logger.info("Retrieving find my epc data") + for k in ["address", "address1"]: + epc_searcher.newest_epc[k] = epc_searcher.address_clean + + property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(epc_searcher.newest_epc) + # If we have a property type, this means when we pull the epc data, we might need to make a patch + + epc_records = patch_epc(patch, epc_records) + + prepared_epc = EPCRecord( + epc_records=epc_records, + run_mode="newdata", + cleaning_data=cleaning_data + ) + + input_properties.append( + Property( + id=property_id, + is_new=is_new, + address=epc_searcher.address_clean, + postcode=epc_searcher.postcode_clean, + epc_record=prepared_epc, + already_installed=property_already_installed, + property_valuation=property_valuation, + non_invasive_recommendations=property_non_invasive_recommendations, + energy_assessment=energy_assessment, + **Property.extract_kwargs(config), # TODO: Depraecate this + ) + ) + + if not input_properties: + return Response(status_code=204) + + # Set up model api and warm up the lambdas + model_api = ModelApi( + portfolio_id=body.portfolio_id, + timestamp=created_at, + prediction_buckets=get_prediction_buckets(), + max_retries=1 + ) + await 
model_api.async_warm_up_lambdas( + model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES + ) + + # The materials data could be cached or local so we don't need to make + # consistent requests to the backend for + # the same data + logger.info("Reading in materials and cleaned datasets") + materials = get_materials(session) + cleaned = get_cleaned() + eco_project_scores_matrix, whlg_eligible_postcodes = get_funding_data() + + kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) + + epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned) + + kwh_preds = await model_api.async_paginated_predictions( + data=epcs_for_scoring, + bucket=get_settings().DATA_BUCKET, + model_prefixes=model_api.KWH_MODEL_PREFIXES, + extract_ids=False, + batch_size=SCORING_BATCH_SIZE + ) + + # Insert the spatial data + logger.info("Getting spatial data") + input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET) + + [p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties] + + # TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour + # TODO: If we can't get high image quality, should we use the solar API? 
Maybe just for semi-detached units with + # extensions, since it doesn't seem to do a great job + + logger.info("Performing solar analysis") + + ofgem_consumption_averages = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, + file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet" + ) + + building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data( + input_properties=input_properties, + ofgem_consumption_averages=ofgem_consumption_averages, + body=body + ) + + input_properties = GoogleSolarApi.building_solar_analysis( + building_solar_config=building_solar_config, + input_properties=input_properties, + session=session, + google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY + ) + + input_properties = GoogleSolarApi.unit_solar_analysis( + unit_solar_config=unit_solar_config, + input_properties=input_properties, + session=session, + body=body, + google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY + ) + + logger.info("Identifying property recommendations") + recommendations = {} + recommendations_scoring_data = [] + representative_recommendations = {} + for p in tqdm(input_properties): + recommender = Recommendations( + property_instance=p, + materials=materials, + exclusions=body.exclusions, + inclusions=body.inclusions, + default_u_values=body.default_u_values + ) + property_recommendations, property_representative_recommendations = recommender.recommend() + + if not property_recommendations: + continue + + recommendations[p.id] = property_recommendations + representative_recommendations[p.id] = property_representative_recommendations + + p.create_base_difference_epc_record(cleaned_lookup=cleaned) + p.adjust_difference_record_with_recommendations( + property_recommendations, property_representative_recommendations + ) + + recommendations_scoring_data.extend(p.recommendations_scoring_data) + + # TODO: Make sure that number_habitable_rooms has been dropped + logger.info("Preparing data for scoring in sap 
change api") + recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) + + recommendations_scoring_data = recommendations_scoring_data.drop( + columns=[ + "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", + "carbon_ending" + ] + ) + + all_predictions = await model_api.async_paginated_predictions( + data=recommendations_scoring_data, + bucket=get_settings().DATA_BUCKET, + batch_size=SCORING_BATCH_SIZE + ) + + # Insert the predictions into the recommendations, and get the impact summary + scoring_epcs = [] # For scoring the kwh models + for property_id in recommendations.keys(): + property_instance = [p for p in input_properties if p.id == property_id][0] + + recommendations_with_impact, impact_summary = ( + Recommendations.calculate_recommendation_impact( + property_instance=property_instance, + all_predictions=all_predictions, + recommendations=recommendations, + representative_recommendations=representative_recommendations + ) + ) + + # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc + # at each phase + property_instance.update_simulation_epcs(impact_summary) + scoring_epcs.extend(property_instance.updated_simulation_epcs) + recommendations[property_id] = recommendations_with_impact + + # We call the API with the scoring epcs + scoring_epcs = pd.DataFrame(scoring_epcs) + scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned) + + kwh_simulation_predictions = await model_api.async_paginated_predictions( + data=scoring_epcs, + bucket=get_settings().DATA_BUCKET, + model_prefixes=model_api.KWH_MODEL_PREFIXES, + batch_size=SCORING_BATCH_SIZE + ) + + # We now insert kwh estimates and costs into the recommendations + logger.info("Calculating tenant savings - kwh and bills") + for property_id in tqdm([p.id for p in input_properties]): + property_recommendations = recommendations.get(property_id, []) + property_instance = [p for p in 
input_properties if p.id == property_id][0] + + property_current_energy_bill = ( + Recommendations.calculate_recommendation_tenant_savings( + property_instance=property_instance, + kwh_simulation_predictions=kwh_simulation_predictions, + property_recommendations=property_recommendations, + ashp_cop=body.ashp_cop + ) + ) + property_instance.current_energy_bill = property_current_energy_bill + + # Insert the predictions into the recommendations and run the optimiser + for p in input_properties: + if not recommendations.get(p.id): + continue + + # we need to double unlist because we have a list of lists + property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} + + property_required_measures = [ + m for m in recommendations[p.id] if m[0]["type"] in body.required_measures + ] + measures_to_optimise = [ + m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures + ] + + # If we have a wall insulation measure, we MUST include mechanical ventilation + # Additionally, if we have required measures, they should also be included. 
Therefore + # we can discount the number of points required to get to the target SAP band (or increase) + # in the case of ventilation + needs_ventilation = any(x in property_measure_types for x in assumptions.measures_needing_ventilation) + + input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation) + + if not input_measures[0]: + # This means that we have no defaults + selected_recommendations = {} + solution = [] + else: + + fixed_gain = 0 + if property_required_measures: + # We get the SAP points for the required measures + if body.goal != "Increasing EPC": + raise NotImplementedError("Only EPC optimisation is currently supported") + sap_by_type = [ + {"type": rec["type"], "sap_points": rec["sap_points"]} for recs in property_required_measures + for rec in recs + ] + # We get a MAX sap points per type + max_per_type = ( + pd.DataFrame(sap_by_type).groupby("type")["sap_points"].max().to_dict() + ) + fixed_gain = sum(max_per_type.values()) + + property_required_measure_types = {rec["type"] for rec in sap_by_type} + + # if the property needs ventilation, but the measure we optimise didn't include + # venilation we add the points for ventilation as a fixed gain + if needs_ventilation and any( + r in property_required_measure_types for r in assumptions.measures_needing_ventilation + ): + fixed_gain += next( + (r[0]["sap_points"] for r in recommendations[p.id] if + r[0]["type"] == "mechanical_ventilation"), + 0 + ) + + current_sap_points = int(p.data["current-energy-efficiency"]) + + sap_gain = CostOptimiser.calculate_sap_gain_with_slack( + epc_to_sap_lower_bound(body.goal_value) - current_sap_points + ) - fixed_gain + + if not body.optimise: + if body.goal != "Increasing EPC": + raise NotImplementedError("Only EPC optimisation is currently supported") + solution = [] + for sub_list in input_measures: + # Select the entry with the highest gain, and if tied, choose the one with the lowest cost + best_measure = max(sub_list, key=lambda 
x: (x['gain'], -x['cost'])) + solution.append(best_measure) + else: + + if body.budget: + optimiser = GainOptimiser( + input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0 + ) + else: + # The minimum gain is the minimum number of SAP points required to get to the target SAP band + # If the gain is negative, the optimiser will return an empty solution + optimiser = CostOptimiser( + input_measures, + min_gain=sap_gain + ) + + optimiser.setup() + optimiser.solve() + solution = optimiser.solution + + selected_recommendations = {r["id"] for r in solution} + + if property_required_measures: + # We select the cheapest of the required measures, into selected + for recs in property_required_measures: + # We select the cheapest of the required measures + cost_to_id = { + rec["recommendation_id"]: rec["total"] for rec in recs + if rec["recommendation_id"] not in selected_recommendations + } + # Take the recommendation id with the lowers cost + + selected_recommendations.add(min(cost_to_id, key=cost_to_id.get)) + # Update the solution with the selected recommendaitons + solution = [] + for recs in recommendations[p.id]: + for rec in recs: + if rec["recommendation_id"] in selected_recommendations: + solution.append( + { + "id": rec["recommendation_id"], + "cost": rec["total"], + "gain": rec["sap_points"], + "type": rec["type"] + } + ) + + # If wall insulation is selected, we also include mechanical ventilation as a best practice measure + if any(x in [r["type"] for r in solution] for x in assumptions.measures_needing_ventilation): + ventilation_rec = next( + (r[0] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"), + None + ) + + # If a matching recommendation was found, add its ID to the selected recommendations + if ventilation_rec: + selected_recommendations.add(ventilation_rec["recommendation_id"]) + + # If we have a trickle vents recommendation, we also switch it on. 
We don't just check the solution + trickle_vents_rec = next( + (r[0] for r in recommendations[p.id] if r[0]["type"] == "trickle_vents"), + None + ) + # If a matching recommendation was found, add its ID to the selected recommendations + if trickle_vents_rec: + selected_recommendations.add(trickle_vents_rec["recommendation_id"]) + + # We'll use the set of selected recommendations to filter the recommendations to upload + final_recommendations = [ + [ + {**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False} + for rec in recommendations_by_type + ] + for recommendations_by_type in recommendations[p.id] + ] + + # We'll also unlist the recommendations so they're a bit easier to handle from here onwards + recommendations[p.id] = [ + rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type + ] + + # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all + # of them + # TODO: We can probably do better and optimise at the building level - this is temp + logger.info("Adjusting solar PV recommendations for buildings") + building_ids = set([p.building_id for p in input_properties if p.building_id is not None]) + + for bid in building_ids: + # We check if any of them have solar PV + building = [p for p in input_properties if p.building_id == bid] + has_solar = False + for unit in building: + # Get default recommendations + has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0 + if has_solar: + break + + if has_solar: + # We adjust the units within the building + for unit in building: + for rec in recommendations[unit.id]: + if rec["type"] == "solar_pv": + # This is straightforward, we just set the default to True, since when we're at a building + # level, we only allow 1 solar PV option for each unit. 
If we change this, this logic will + # need to be updated + rec["default"] = True + + # ~~~~~~~~~~~~~~~~ + # Funding + # ~~~~~~~~~~~~~~~~ + + # for p in input_properties: + # funding_calulator = Funding( + # tenure=body.housing_type, + # starting_epc=p.data["current-energy-rating"], + # starting_sap=int(p.data["current-energy-efficiency"]), + # postcode=p.postcode, + # floor_area=p.floor_area, + # council_tax_band=None, # This is seemingly always None at the moment + # property_recommendations=recommendations[p.id], + # project_scores_matrix=eco_project_scores_matrix, + # whlg_eligible_postcodes=whlg_eligible_postcodes, + # gbis_abs_rate=15, + # eco4_abs_rate=15, + # ) + # funding_calulator.check_eligibiltiy() + # # Insert finding + # p.insert_funding(funding_calulator) + + logger.info("Uploading recommendations to the database") + # If we have any work to do, we create a new scenario + engine_scenario = create_scenario( + session=session, + scenario={ + "name": body.scenario_name, + "created_at": created_at, + "budget": body.budget, + "portfolio_id": body.portfolio_id, + "housing_type": body.housing_type, + "goal": body.goal, + "trigger_file_path": body.trigger_file_path, + "already_installed_file_path": body.already_installed_file_path, + "patches_file_path": body.patches_file_path, + "non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path, + "exclusions": body.exclusions, + "multi_plan": body.multi_plan + } + ) + + property_valuation_increases = [] + session.commit() + new_epc_bands = {} + property_value_increase_ranges = {} + for i in range(0, len(input_properties), BATCH_SIZE): + try: + # Take a slice of the input_properties list to make a batch + batch_properties = input_properties[i:i + BATCH_SIZE] + + for p in batch_properties: + recommendations_to_upload = recommendations.get(p.id, []) + default_recommendations = [r for r in recommendations_to_upload if r["default"]] + total_sap_points = sum([r["sap_points"] for r in 
default_recommendations]) + new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points + new_epc = sap_to_epc(new_sap_points) + new_epc_bands[p.id] = new_epc + + total_cost = sum([r["total"] for r in default_recommendations]) + + valuations = PropertyValuation.estimate( + property_instance=p, target_epc=new_epc, total_cost=total_cost + ) + property_value_increase_ranges[p.id] = valuations + + if p.is_new: + property_details_epc = p.get_property_details_epc( + portfolio_id=body.portfolio_id, rating_lookup=rating_lookup, + ) + create_property_details_epc(session, property_details_epc) + + update_or_create_property_spatial_details(session, p.uprn, p.spatial) + + property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) + + update_property_data( + session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data + ) + + if not recommendations_to_upload: + continue + + new_plan_id = create_plan(session, { + "portfolio_id": body.portfolio_id, + "property_id": p.id, + "scenario_id": engine_scenario.id, + "is_default": True if p.is_new else False, + "name": body.scenario_name, + "valuation_increase_lower_bound": ( + valuations["lower_bound_increased_value"] - valuations["current_value"] + ), + "valuation_increase_upper_bound": ( + valuations["upper_bound_increased_value"] - valuations["current_value"] + ), + "valuation_increase_average": ( + valuations["average_increased_value"] - valuations["current_value"] + ), + }) + + upload_recommendations( + session, recommendations_to_upload, p.id, new_plan_id + ) + + property_valuation_increases.append( + valuations["average_increased_value"] - valuations["current_value"] + ) + + # Commit the session after each batch + session.commit() + + except Exception as e: + # Rollback the session if an error occurs + session.rollback() + print("Failed i = %s" % str(i)) + logger.error(f"An error occurred during batch starting at index {i}: {e}") + + logger.info("Creating 
portfolio aggregations") + # We implement this in the simplest way possible which will be just to query the database for all + # recommendations associated to the portfolio and then aggregate them. This is not the most efficient + # way to do this, but it's the simplest and will be a process that we can re-use since when we change a + # recommendation from being default to not default, we'll need to re-run this process to re-calculate the + # the portfolion level impact + + total_valuation_increase = sum(property_valuation_increases) + labour_days = round(max( + [sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()] + )) + + aggregated_data = extract_portfolio_aggregation_data( + input_properties=input_properties, + total_valuation_increase=total_valuation_increase, + recommendations=recommendations, + new_epc_bands=new_epc_bands, + property_value_increase_ranges=property_value_increase_ranges + ) + + aggregate_portfolio_recommendations( + session, + portfolio_id=body.portfolio_id, + scenario_id=engine_scenario.id, + total_valuation_increase=total_valuation_increase, + labour_days=labour_days, + aggregated_data=aggregated_data + ) + + # Commit final changes + session.commit() + + except IntegrityError: + logger.error("Database integrity error occurred", exc_info=True) + session.rollback() + return Response(status_code=500, content="Database integrity error.") + except OperationalError: + logger.error("Database operational error occurred", exc_info=True) + session.rollback() + return Response(status_code=500, content="Database operational error.") + except ValueError: + logger.error("Value error - possibly due to malformed data", exc_info=True) + session.rollback() + return Response(status_code=400, content="Bad request: malformed data.") + except Exception as e: # General exception handling + logger.error(f"An error occurred: {e}") + session.rollback() + return Response(status_code=500, content="An unexpected error 
def handler(event, context):
    """
    Lambda handler that triggers the model engine for each SQS message.

    Records are processed one at a time and independently: a failure in one
    record does not stop the remaining records from being processed.

    Instead of silently swallowing failures, the handler returns the SQS
    "partial batch response" shape. When the event source mapping has
    ``ReportBatchItemFailures`` enabled, Lambda re-delivers only the listed
    messages (and eventually routes them to a DLQ if one is configured).
    When that setting is not enabled, Lambda ignores the return value, so the
    behaviour is the same as before.

    :param event: Lambda event; SQS records are read from ``event["Records"]``.
        A missing ``Records`` key is treated as an empty batch.
    :param context: Lambda context object (unused).
    :return: ``{"batchItemFailures": [{"itemIdentifier": <messageId>}, ...]}``
        listing the records that raised while being parsed, validated or run.
    """
    batch_item_failures = []

    for record in event.get("Records", []):
        # Resolve the id up front so it is available for logging and reporting
        # even when parsing the body fails.
        message_id = record.get("messageId") if isinstance(record, dict) else None

        try:
            body_dict = json.loads(record["body"])
            payload = PlanTriggerRequest.model_validate(body_dict)
            response = asyncio.run(model_engine(payload))
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error log drops.
            logger.exception("Failed to process record %s: %s", message_id, e)
            # A failure can only be reported for a record that has an id; an
            # empty/None itemIdentifier would make Lambda fail the whole batch.
            if message_id is not None:
                batch_item_failures.append({"itemIdentifier": message_id})
            continue

        # The engine reports its own handled errors through the status code of
        # the response it returns rather than by raising, so surface those in
        # the logs. They are deliberately not reported as batch failures here:
        # the engine may already have committed part of its work, and a
        # re-delivery would run the whole job again.
        status_code = getattr(response, "status_code", None)
        if status_code is not None and status_code >= 400:
            logger.error(
                "Model engine returned status %s for record %s", status_code, message_id
            )

    return {"batchItemFailures": batch_item_failures}