# Model/backend/app/plan/router.py
# 2023-09-13 13:30:45 +01:00
#
# 558 lines
# 25 KiB
# Python

from collections import defaultdict
from fastapi import APIRouter, Depends
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.utils import read_csv_from_s3
from backend.app.config import get_settings
from backend.Property import Property
from epc_api.client import EpcClient
from utils.logger import setup_logger
from utils.s3 import read_from_s3
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from utils.uvalue_estimates import classify_decile_newvalues
from backend.app.db.utils import row2dict
from starlette.responses import Response
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from datetime import datetime
import pandas as pd
import msgpack
# model apis
from backend.ml_models.sap_change_model.api import SAPChangeModelAPI
# database interaction functions
from backend.app.db.functions.property_functions import (
create_property, create_property_targets, update_property_data, create_property_details_epc
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from backend.app.plan.columntypes import columntypes
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3
from model_data.optimiser.optimiser_functions import prepare_input_measures
from model_data.simulation_system.core.DataProcessor import DataProcessor
from model_data.simulation_system.core.Settings import (
FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON
)
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
from backend.app.plan.uvalue_estimates_floors import uvalue_estimates_floors
# Module-level logger used by the helpers and the route handler in this file.
logger = setup_logger()
# Router for the plan endpoints. Routes registered on it are served under the
# "/plan" prefix, tagged "plan", and declare validate_token as a dependency.
router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}}
)
# TODO: Load this data from db
# Placeholder coordinate records, one dict per UPRN. trigger_plan looks a
# property up here by matching the 'UPRN' key against int(p.data['uprn']) and
# passes the matching record to Property.set_coordinates.
open_uprn_data = [
    {'UPRN': 6032920, 'X_COORDINATE': 535110.0, 'Y_COORDINATE': 181819.0, 'LATITUDE': 51.5191407,
     'LONGITUDE': -0.0540506},
    {'UPRN': 6038625, 'X_COORDINATE': 535374.0, 'Y_COORDINATE': 182784.0, 'LATITUDE': 51.5277492,
     'LONGITUDE': -0.0498772},
    {'UPRN': 34153991, 'X_COORDINATE': 523238.74, 'Y_COORDINATE': 178003.02, 'LATITUDE': 51.4875579,
     'LONGITUDE': -0.226392},
    {'UPRN': 10008299676, 'X_COORDINATE': 533285.0, 'Y_COORDINATE': 184711.0, 'LATITUDE': 51.5455629,
     'LONGITUDE': -0.0792445},
    {'UPRN': 10008299677, 'X_COORDINATE': 533285.0, 'Y_COORDINATE': 184711.0, 'LATITUDE': 51.5455629,
     'LONGITUDE': -0.0792445},
    {'UPRN': 100021039066, 'X_COORDINATE': 535506.0, 'Y_COORDINATE': 185624.0, 'LATITUDE': 51.5532385,
     'LONGITUDE': -0.0468833},
    {'UPRN': 100021226060, 'X_COORDINATE': 529247.0, 'Y_COORDINATE': 187959.0, 'LATITUDE': 51.5756908,
     'LONGITUDE': -0.1362513},
    {'UPRN': 200003489276, 'X_COORDINATE': 533210.0, 'Y_COORDINATE': 179442.0, 'LATITUDE': 51.4982309,
     'LONGITUDE': -0.0823165}
]
# Placeholder conservation-area flags for the same UPRNs. trigger_plan matches
# on the lower-case 'uprn' key (note: the coordinate table above uses 'UPRN')
# and forwards the 'is_in_conservation_area' string to the Property.
in_conservation_area_data = [
    {'uprn': 6032920, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 6038625, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 34153991, 'is_in_conservation_area': 'unknown'},
    {'uprn': 10008299676, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 10008299677, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 100021039066, 'is_in_conservation_area': 'not_in_conservation_area'},
    {'uprn': 100021226060, 'is_in_conservation_area': 'in_conservation_area'},
    {'uprn': 200003489276, 'is_in_conservation_area': 'in_conservation_area'}
]
# TODO: Load the decile data from the db as well
# Decile tables handed to classify_decile_newvalues together with a property's
# "total-floor-area": ten labels and eleven boundary values each. The floors
# table is used for floor recommendations and the walls table for wall
# recommendations.
floors_decile_data = {
    'decile_labels': ['Decile 1', 'Decile 2', 'Decile 3', 'Decile 4', 'Decile 5', 'Decile 6', 'Decile 7', 'Decile 8',
                      'Decile 9', 'Decile 10'], 'decile_boundaries': [6., 50., 56., 69., 77.6, 87., 98., 112.,
                                                                      127., 150., 2279.]}
walls_decile_data = {
    'decile_labels': ['Decile 1', 'Decile 2', 'Decile 3', 'Decile 4', 'Decile 5', 'Decile 6', 'Decile 7', 'Decile 8',
                      'Decile 9', 'Decile 10'], 'decile_boundaries': [6., 49., 51., 55., 64., 71., 76., 83., 96.,
                                                                      120., 2279.]}
# A 'low-energy-lighting' value per 'lighting-description'.
# NOTE(review): not referenced anywhere in the visible part of this file -
# confirm whether it is still needed.
lighting_averages = [
    {'lighting-description': 'good lighting efficiency', 'low-energy-lighting': 99.26666666666667},
    {'lighting-description': 'excellent lighting efficiency', 'low-energy-lighting': 100.0},
    {'lighting-description': 'below average lighting efficiency', 'low-energy-lighting': 0.0}
]
def filter_materials(materials):
    """
    Group material rows by their "type" field.

    Each row is converted to a dictionary with row2dict before grouping.
    :param materials: iterable of material rows
    :return: plain dict mapping each material type to the list of material
    dictionaries of that type, in the order they were encountered
    """
    grouped = {}
    for row in materials:
        record = row2dict(row)
        # setdefault creates the bucket the first time a type is seen
        grouped.setdefault(record["type"], []).append(record)
    return grouped
def insert_temp_recommendation_id(property_recommendations):
    """
    Stamp every recommendation with a temporary, sequential "recommendation_id".

    The ids start at 0 and keep counting across the nested groups. They are
    needed after the optimiser has run, to tell the selected (default)
    recommendations apart from the rest.
    :param property_recommendations: nested list of recommendation dicts, grouped by type
    :return: the same nested list, with each recommendation dict updated in place
    """
    every_recommendation = (
        recommendation
        for group in property_recommendations
        for recommendation in group
    )
    for temp_id, recommendation in enumerate(every_recommendation):
        recommendation["recommendation_id"] = temp_id
    return property_recommendations
def get_cleaned():
    """
    Fetch the cleaned EPC descriptions dataset from s3 and decode it.

    The object is read from the environment-specific "retrofit-data-*" bucket.
    Its payload is MessagePack encoded, so it is unpacked before being returned.
    :return: the decoded dataset
    """
    environment = get_settings().ENVIRONMENT
    bucket = "retrofit-data-{environment}".format(environment=environment)
    packed = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name=bucket,
    )
    return msgpack.unpackb(packed, raw=False)
def score_measures():
    """
    Placeholder for a wrapper that prepares data for the sap model api.

    Nothing is implemented yet: the function takes no arguments and does no work.
    :return: None
    """
    return None
# Maps an uninsulated component description to the description used for the
# "ending" record once the insulation recommendation has been applied.
_INSULATED_DESCRIPTIONS = {
    'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)',
    'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)',
    'Solid, no insulation (assumed)': 'Solid, insulated (assumed)',
}


def _matching_uvalue_estimates(estimates, epc_data, component):
    """
    Select the u-value estimates that match a property's EPC record.

    :param estimates: iterable of estimate dicts
    :param epc_data: the property's EPC data (hyphenated keys)
    :param component: key prefix of the efficiency fields, "floor" or "walls"
    :return: list of estimates with the same local authority, property type and
    built form; the energy/environmental efficiency ratings must also match
    unless the property's own rating is 'N/A'
    """
    energy_key = "{}-energy-eff".format(component)
    env_key = "{}-env-eff".format(component)
    energy_eff = epc_data[energy_key]
    env_eff = epc_data[env_key]
    return [
        x for x in estimates
        if x['local-authority'] == epc_data["local-authority"]
        and x['property-type'] == epc_data["property-type"]
        and x['built-form'] == epc_data["built-form"]
        and (energy_eff == 'N/A' or x[energy_key] == energy_eff)
        and (env_eff == 'N/A' or x[env_key] == env_eff)
    ]


def _build_scoring_rows(p, property_recommendations):
    """
    Build one SAP-change scoring row per recommendation of a property.

    Each row holds the property's starting and ending component features plus
    the fixed features, and is identified by "<property id>+<recommendation id>".
    :param p: the Property the recommendations belong to
    :param property_recommendations: nested list of recommendations carrying a "recommendation_id"
    :return: list of row dictionaries
    :raises NotImplementedError: for a recommendation type other than wall or floor insulation
    """
    # TODO: We should use the cleaned data from get_components in the data rather than the raw
    # values. We should create a method in Property which takes the EPC data and inserts the cleaned
    # data
    epc_data = pd.DataFrame([p.data.copy()])
    epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns]
    component_columns = COMPONENT_FEATURES + ["LODGEMENT_DATE"]
    starting_epc_data = epc_data[component_columns].copy().add_suffix("_STARTING")
    ending_epc_data = epc_data[component_columns].copy().add_suffix("_ENDING")
    fixed_data = epc_data[FIXED_FEATURES]
    # The ending record describes the property after the work, so it is lodged today
    ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d")

    # These records are identical for every recommendation: convert them once
    starting_record = starting_epc_data.to_dict("records")[0]
    ending_record = ending_epc_data.to_dict("records")[0]
    fixed_record = fixed_data.to_dict("records")[0]

    rows = []
    for recommendations_by_type in property_recommendations:
        for rec in recommendations_by_type:
            scoring_dict = {
                "UPRN": p.data["uprn"],
                "id": "+".join([str(p.id), str(rec["recommendation_id"])]),
                "LOCAL_AUTHORITY": p.data["local-authority"],
                **starting_record,
                **ending_record,
                **fixed_record,
            }
            # We update the description to indicate it's insulated
            if rec["type"] == "wall_insulation":
                scoring_dict["WALLS_DESCRIPTION_ENDING"] = _INSULATED_DESCRIPTIONS[p.walls["clean_description"]]
            elif rec["type"] == "floor_insulation":
                scoring_dict["FLOOR_DESCRIPTION_ENDING"] = _INSULATED_DESCRIPTIONS[p.floor["clean_description"]]
            else:
                raise NotImplementedError("Implement me")
            rows.append(scoring_dict)
    return rows


def _score_recommendations(scoring_rows, portfolio_id):
    """
    Clean the scoring rows, store them in s3 and score them with the sap change api.

    :param scoring_rows: non-empty list of row dictionaries from _build_scoring_rows
    :param portfolio_id: portfolio the rows belong to; used in the s3 key and the api request
    :return: DataFrame of predictions with a rounded "RDSAP_CHANGE" column and
    string "property_id" / "recommendation_id" columns split out of "id"
    """
    logger.info("Preparing data for scoring in sap change api")
    scoring_data = pd.DataFrame(scoring_rows)
    # Clean the data
    logger.info("Reading in cleaning dataset from s3")
    cleaning_data = read_parquet_from_s3(
        bucket_name=get_settings().DATA_BUCKET,
        file_key="sap_change_model/cleaning_dataset.parquet",
    )
    cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"})
    # Blank strings are treated as missing values before the averages are applied.
    # NOTE(review): on older pandas releases replace(..., None) may be interpreted
    # as a pad-fill instead of "replace with None" - confirm against the pinned version.
    numeric_columns = ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
    scoring_data[numeric_columns] = scoring_data[numeric_columns].replace("", None)
    # Perform the same cleaning as in the model
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
    )
    scoring_data = scoring_data.drop(columns=["LOCAL_AUTHORITY"])
    # Note: We might need to perform the full pre-processing here
    data_processor = DataProcessor(filepath=None)
    data_processor.insert_data(scoring_data)
    data_processor.remap_columns()
    scoring_data = data_processor.data
    # Remap column types
    scoring_data = scoring_data.astype(columntypes)

    # Store parquet file in s3 for scoring
    created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
        portfolio_id=portfolio_id,
        timestamp=created_at
    )
    logger.info("Storing scoring data to s3")
    save_dataframe_to_s3_parquet(
        df=scoring_data,
        bucket_name=get_settings().DATA_BUCKET,
        file_key=file_location
    )

    logger.info("Making request to sap change api")
    sap_change_model_api = SAPChangeModelAPI()
    response = sap_change_model_api.predict(
        file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location,
        created_at=created_at,
        portfolio_id=portfolio_id
    )
    # Retrieve the predictions
    predictions = pd.DataFrame(read_csv_from_s3(
        bucket_name=get_settings().PREDICTIONS_BUCKET,
        filepath=response["storage_filepath"]
    ))
    # We round the predictions
    predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0)
    # Extract property_id and recommendation_id
    predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True)
    return predictions


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """
    Build and store retrofit plans for the properties listed in a trigger file.

    Reads the trigger file from s3, creates the new properties, collects their
    EPC data, generates floor and wall recommendations, scores them with the
    sap change api, runs the optimiser to pick the default recommendations and
    uploads everything in a single database transaction.

    Responds with 204 when the file contains no new properties, 200 on
    success, 400 on a ValueError and 500 on any other failure; on failure the
    transaction is rolled back.
    """
    logger.info("Connecting to db")
    Session = sessionmaker(bind=db_engine)
    session = Session()
    try:
        session.begin()
        logger.info("Getting the inputs")
        # Read in the trigger file from s3
        bucket_name = get_settings().PLAN_TRIGGER_BUCKET
        epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
        plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)
        input_properties = []
        for config in plan_input:
            # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
            # TODO: implement validation
            # Create a record in db
            property_id, is_new = create_property(
                session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
            )
            # If a new record was not created, we don't produce recommendations
            if not is_new:
                continue
            # TODO: Need to add heat demand target
            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None
            )
            input_properties.append(
                Property(
                    postcode=config['postcode'],
                    address1=config['address'],
                    epc_client=epc_client,
                    id=property_id
                )
            )

        if not input_properties:
            return Response(status_code=204)

        logger.info("Getting EPC data")
        for p in input_properties:
            p.search_address_epc()
            p.set_year_built()

        logger.info("Getting coordinates")
        # This is placeholder, until the full dataset is loaded into the database
        for p in input_properties:
            uprn = int(p.data['uprn'])
            coordinate_data = [x for x in open_uprn_data if x['UPRN'] == uprn][0]
            p.set_coordinates(coordinate_data)

        logger.info("Check if property is in conservation area")
        for p in input_properties:
            uprn = int(p.data['uprn'])
            in_conservation_area = [x for x in in_conservation_area_data if x['uprn'] == uprn][0].get(
                "is_in_conservation_area"
            )
            p.set_is_in_conservation_area(in_conservation_area)

        # The materials data could be cached or local so we don't need to make
        # repeated requests to the backend for the same data
        # TODO: It might not be the best choice to store the materials data in a database table since this
        # table probably won't be very large and won't be updated that often. It might be better to
        # store this data in s3 and load it into memory when the app starts up. We will test this
        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        materials_by_type = filter_materials(materials)
        cleaned = get_cleaned()

        logger.info("Getting components and properties recommendations")
        # TODO: Move this to a class. We probably want a Recommender class which takes the optimisers
        # in as a dependency and then the optimisers can take the input measures in as part of the setup() method
        recommendations = {}
        recommendations_scoring_data = []
        for p in input_properties:
            property_recommendations = []
            # For each property, classify the floor area decile
            total_floor_area_group_decile = classify_decile_newvalues(
                decile_boundaries=floors_decile_data["decile_boundaries"],
                decile_labels=floors_decile_data["decile_labels"],
                new_values=[float(p.data["total-floor-area"])],
            )[0]
            # Property recommendations
            p.get_components(cleaned)
            # This is placeholder, until the full dataset is loaded into the database and we just make a read to the
            # database
            floors_u_value_estimate = _matching_uvalue_estimates(uvalue_estimates_floors, p.data, "floor")
            # Floor recommendations
            floor_recommender = FloorRecommendations(
                property_instance=p,
                uvalue_estimates=floors_u_value_estimate,
                total_floor_area_group_decile=total_floor_area_group_decile,
                materials=materials_by_type["suspended_floor_insulation"] + materials_by_type["solid_floor_insulation"],
            )
            floor_recommender.recommend()
            if floor_recommender.recommendations:
                property_recommendations.append(floor_recommender.recommendations)

            # Wall recommendations
            # We would make this u-value query directly to the database
            total_floor_area_group_decile = classify_decile_newvalues(
                decile_boundaries=walls_decile_data["decile_boundaries"],
                decile_labels=walls_decile_data["decile_labels"],
                new_values=[float(p.data["total-floor-area"])],
            )[0]
            # This is placeholder, until the full dataset is loaded into the database and we just make a read to the
            # database
            walls_u_value_estimate = _matching_uvalue_estimates(uvalue_estimates_walls, p.data, "walls")
            wall_recomender = WallRecommendations(
                property_instance=p,
                uvalue_estimates=walls_u_value_estimate,
                total_floor_area_group_decile=total_floor_area_group_decile,
                materials=materials_by_type["external_wall_insulation"] + materials_by_type["internal_wall_insulation"]
            )
            wall_recomender.recommend()
            if wall_recomender.recommendations:
                property_recommendations.append(wall_recomender.recommendations)

            # We insert temporary ids into the recommendations which is important for the optimiser later
            property_recommendations = insert_temp_recommendation_id(property_recommendations)
            if not property_recommendations:
                continue
            recommendations[p.id] = property_recommendations
            # Finally, we'll prepare data for predicting the impact on SAP
            recommendations_scoring_data.extend(_build_scoring_rows(p, property_recommendations))

        if recommendations:
            predictions = _score_recommendations(recommendations_scoring_data, body.portfolio_id)

            # Insert the predictions into the recommendations and run the optimiser
            logger.info("Storing recommendations")
            properties_by_id = {p.id: p for p in input_properties}
            for property_id in list(recommendations):
                current_property = properties_by_id[property_id]
                property_predictions = predictions[predictions["property_id"] == str(property_id)]
                for recommendations_by_type in recommendations[property_id]:
                    for rec in recommendations_by_type:
                        matched = property_predictions[
                            property_predictions["recommendation_id"] == str(rec["recommendation_id"])
                        ]["RDSAP_CHANGE"].values
                        # A prediction of 0 points is a valid result; only an absent
                        # or NaN prediction counts as missing
                        if len(matched) == 0 or pd.isna(matched[0]):
                            raise ValueError("Sap points missing")
                        rec["sap_points"] = matched[0]

                input_measures = prepare_input_measures(recommendations[property_id], body.goal)
                if body.budget:
                    optimiser = GainOptimiser(input_measures, max_cost=body.budget)
                else:
                    # The minimum gain is the minimum number of SAP points required to get to the target SAP band
                    current_sap_points = int(current_property.data["current-energy-efficiency"])
                    target_sap_points = epc_to_sap_lower_bound(body.goal_value)
                    # If the gain is negative, the optimiser will return an empty solution
                    optimiser = CostOptimiser(
                        input_measures, min_gain=target_sap_points - current_sap_points
                    )
                optimiser.setup()
                optimiser.solve()
                solution = optimiser.solution
                selected_recommendations = {r["id"] for r in solution}
                # Flag the recommendations the optimiser selected as default and flatten the
                # nested groups so they're a bit easier to handle from here onwards
                recommendations[property_id] = [
                    {**rec, "default": rec["recommendation_id"] in selected_recommendations}
                    for recommendations_by_type in recommendations[property_id]
                    for rec in recommendations_by_type
                ]
        else:
            # Scoring an empty dataset would fail, and there is nothing to optimise
            logger.info("No recommendations were produced, skipping scoring and optimisation")

        # We upload:
        # 1) the property data
        # 2) the property details (epc)
        # 3) the recommendations
        logger.info("Uploading recommendations to the database")
        # Upload property data
        for p in input_properties:
            property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id,
                                                              rating_lookup=rating_lookup)
            create_property_details_epc(session, property_details_epc)
            property_data = p.get_full_property_data()
            update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
            # Upload recommendations
            recommendations_to_upload = recommendations.get(p.id, [])
            if not recommendations_to_upload:
                continue
            # Create a plan
            new_plan_id = create_plan(
                session,
                {
                    "portfolio_id": body.portfolio_id,
                    "property_id": p.id,
                    "is_default": True
                }
            )
            # Upload recommendations
            uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
            # Finally, match the recommendation to the plan
            create_plan_recommendations(
                session,
                plan_id=new_plan_id,
                recommendation_ids=uploaded_recommendation_ids
            )

        logger.info("Creating portfolio aggregations")
        # We implement this in the simplest way possible which will be just to query the database for all
        # recommendations associated to the portfolio and then aggregate them. This is not the most efficient
        # way to do this, but it's the simplest and will be a process that we can re-use since when we change a
        # recommendation from being default to not default, we'll need to re-run this process to re-calculate
        # the portfolio level impact
        aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)
        # Commit all changes at once
        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:  # General exception handling
        # Keep the traceback, like the handlers above, so the failure can be diagnosed
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()
    return Response(status_code=200)