Model/backend/app/plan/router.py
2023-10-10 13:29:05 +08:00

341 lines
14 KiB
Python

from datetime import datetime
import pandas as pd
from epc_api.client import EpcClient
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data
)
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import (
create_recommendation_scoring_data, filter_materials, get_cleaned, insert_temp_recommendation_id
)
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3
from backend.ml_models.sap_change_model.api import SAPChangeModelAPI
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.WallRecommendations import WallRecommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
logger = setup_logger()
router = APIRouter(
prefix="/plan",
tags=["plan"],
dependencies=[Depends(validate_token)],
responses={404: {"description": "Not found"}}
)
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
try:
session.begin()
logger.info("Getting the inputs")
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implment validation
# Create a record in db
property_id, is_new = create_property(
session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
# if a new record was not created, we don't produduce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
)
)
if not input_properties:
return Response(status_code=204)
logger.info("Getting EPC, and spatial data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
p.get_spatial_data(uprn_filenames)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
materials_by_type = filter_materials(materials)
cleaned = get_cleaned()
logger.info("Getting components and epc recommendations")
# TODO: Move this to a class. We probably want a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
recommendations_scoring_data = []
for p in input_properties:
property_recommendations = []
# Property recommendations
p.get_components(cleaned)
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p,
materials=materials_by_type["floor"],
)
floor_recommender.recommend()
if floor_recommender.recommendations:
property_recommendations.append(floor_recommender.recommendations)
# Wall recommendations
wall_recomender = WallRecommendations(
property_instance=p,
materials=materials_by_type["walls"]
)
wall_recomender.recommend()
if wall_recomender.recommendations:
property_recommendations.append(wall_recomender.recommendations)
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = insert_temp_recommendation_id(property_recommendations)
if not property_recommendations:
continue
recommendations[p.id] = property_recommendations
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
scoring_dict = create_recommendation_scoring_data(
property=p,
recommendation=rec,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
recommendations_scoring_data.append(scoring_dict)
# cleanup
del data_processor
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# Perform the same cleaning as in the model
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
).drop(columns=["LOCAL_AUTHORITY"])
recommendations_scoring_data = DataProcessor.clean_missings_after_description_process(
recommendations_scoring_data, [
c for c in recommendations_scoring_data.columns if
("thermal_transmittance" in c) or ("insulation_thickness" in c)
]
)
sap_change_model_api = SAPChangeModelAPI(portfolio_id=body.portfolio_id, timestamp=created_at)
file_location = sap_change_model_api.upload_scoring_data(
df=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET
)
response = sap_change_model_api.predict(
file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location,
)
# Retrieve the predictions
predictions = pd.DataFrame(
read_csv_from_s3(
bucket_name=get_settings().PREDICTIONS_BUCKET,
filepath=response["storage_filepath"].split(get_settings().PREDICTIONS_BUCKET + "/")[1]
)
)
predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(1)
predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True)
# Insert the predictions into the recommendations and run the optimiser
logger.info("Storing recommendations")
for property_id in recommendations.keys():
property = [p for p in input_properties if p.id == property_id][0]
property_predictions = predictions[predictions["property_id"] == str(property_id)]
for recommendations_by_type in recommendations[property_id]:
for rec in recommendations_by_type:
rec["sap_points"] = property_predictions[property_predictions["recommendation_id"] == str(
rec["recommendation_id"]
)]["RDSAP_CHANGE"].values[0]
if rec["sap_points"] is None:
raise ValueError("Sap points missing")
input_measures = prepare_input_measures(recommendations[property_id], body.goal)
if body.budget:
optimiser = GainOptimiser(input_measures, max_cost=body.budget)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
current_sap_points = int(property.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures, min_gain=target_sap_points - current_sap_points
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
selected_recommendations = {r["id"] for r in solution}
# We'll use the set of selected recommendations to filter the recommendations to upload
final_recommendations = [
[
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
for rec in recommendations_by_type
]
for recommendations_by_type in recommendations[property_id]
]
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
final_recommendations = [
rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type
]
# We update recommendations[property_id]
recommendations[property_id] = final_recommendations
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id,
rating_lookup=rating_lookup)
create_property_details_epc(session, property_details_epc)
property_data = p.get_full_property_data()
update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
# Upload recommendations
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
# Create a plan
new_plan_id = create_plan(
session,
{
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
}
)
# Upload recommendations
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
# Finally, match the recommendation to the plan
create_plan_recommendations(
session,
plan_id=new_plan_id,
recommendation_ids=uploaded_recommendation_ids
)
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)
# Commit all changes at once
session.commit()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()
return Response(status_code=200)