# Model/backend/app/plan/router.py
# 2024-01-02 11:44:18 +00:00
#
# 608 lines
# 28 KiB
# Python

from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from backend.ml_models.Valuation import PropertyValuation
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
logger = setup_logger()

# Every endpoint registered on this router is mounted under /plan, tagged "plan",
# guarded by the validate_token dependency and documents a generic 404 response.
router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}},
)

# Number of properties whose plan results are written to the database per commit.
BATCH_SIZE = 5
# Columns on which property records are joined to the averages in the cleaning dataset.
_PROPERTY_AVERAGES_MERGE_COLUMNS = ("PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "LOCAL_AUTHORITY")
# Solid wall insulation options: a property may be offered both, but only one should count as the default.
_SOLID_WALL_INSULATION_TYPES = ("internal_wall_insulation", "external_wall_insulation")
# Selecting any of these measures also selects mechanical ventilation as a best-practice measure.
_WALL_INSULATION_TYPES = _SOLID_WALL_INSULATION_TYPES + ("cavity_wall_insulation",)


def _clean_scoring_data(scoring_records, cleaning_data):
    """Turn a list of scoring records into the cleaned DataFrame passed to ``ModelApi``.

    Performs the same cleaning as the model: room counts are filled from averages first, then the
    remaining averages-based cleaning is applied (after which LOCAL_AUTHORITY is dropped), followed
    by missing-value and efficiency-variable cleaning.

    :param scoring_records: list of dicts as produced by ``create_recommendation_scoring_data``
    :param cleaning_data: the averages dataset read from the data bucket
    :return: the cleaned pandas DataFrame
    """
    scoring_data = pd.DataFrame(scoring_records)
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=list(_PROPERTY_AVERAGES_MERGE_COLUMNS),
        colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
    )
    scoring_data = DataProcessor.apply_averages_cleaning(
        data_to_clean=scoring_data,
        cleaning_data=cleaning_data,
        cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
    ).drop(columns=["LOCAL_AUTHORITY"])
    # These columns are passed as ignore_cols to the missing-value cleaning
    ignore_cols = [
        c for c in scoring_data.columns
        if "thermal_transmittance" in c or "insulation_thickness" in c or "ENERGY_EFF" in c
    ]
    scoring_data = DataProcessor.clean_missings_after_description_process(scoring_data, ignore_cols=ignore_cols)
    return DataProcessor.clean_efficiency_variables(scoring_data)


def _predict_all(portfolio_id, created_at, scoring_data):
    """Score *scoring_data* with ``ModelApi`` and return its predictions keyed by prediction type."""
    settings = get_settings()
    model_api = ModelApi(portfolio_id=portfolio_id, timestamp=created_at)
    return model_api.predict_all(
        df=scoring_data,
        bucket=settings.DATA_BUCKET,
        prediction_buckets={
            "sap_change_predictions": settings.SAP_PREDICTIONS_BUCKET,
            "heat_demand_predictions": settings.HEAT_PREDICTIONS_BUCKET,
            "carbon_change_predictions": settings.CARBON_PREDICTIONS_BUCKET,
        },
    )


def _optimise_property_recommendations(property_instance, recommendations, all_predictions, body):
    """Attach predicted impact to one property's recommendations and flag the optimiser's picks as defaults.

    Uses ``GainOptimiser`` capped at ``body.budget`` when a budget is given, otherwise ``CostOptimiser``
    with the SAP gain needed to reach ``body.goal_value``.

    :return: flat list of recommendation dicts, each carrying a boolean ``"default"`` key
    """
    recommendations_with_impact = Recommendations.calculate_recommendation_impact(
        property_instance=property_instance,
        all_predictions=all_predictions,
        recommendations=recommendations,
    )
    input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
    if body.budget:
        optimiser = GainOptimiser(input_measures, max_cost=body.budget)
    else:
        # The minimum gain is the minimum number of SAP points required to get to the target SAP band.
        # If the gain is negative, the optimiser will return an empty solution
        current_sap_points = int(property_instance.data["current-energy-efficiency"])
        target_sap_points = epc_to_sap_lower_bound(body.goal_value)
        optimiser = CostOptimiser(
            input_measures,
            min_gain=CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points),
        )
    optimiser.setup()
    optimiser.solve()
    solution = optimiser.solution

    selected_recommendations = {r["id"] for r in solution}
    # If wall insulation is selected, we also include mechanical ventilation as a best practice measure
    if any(r["type"] in _WALL_INSULATION_TYPES for r in solution):
        ventilation_groups = [
            group for group in recommendations_with_impact
            if group and group[0]["type"] == "mechanical_ventilation"
        ]
        if ventilation_groups:
            selected_recommendations.add(ventilation_groups[0][0]["recommendation_id"])
        else:
            logger.warning(
                "Wall insulation selected for property %s but no mechanical ventilation option exists",
                property_instance.id,
            )

    # Unlist the per-type groups (easier to handle from here onwards) and flag the selected defaults
    return [
        {**rec, "default": rec["recommendation_id"] in selected_recommendations}
        for recommendations_by_type in recommendations_with_impact
        for rec in recommendations_by_type
    ]


def _representative_recommendations(property_recommendations):
    """Return the default recommendations plus the cheapest recommendation of each type with no default.

    A solid wall may offer both internal and external wall insulation, but only one of them should be
    represented: if either is a default the other is skipped, and if neither is a default only internal
    wall insulation is represented because it's usually cheaper.
    """
    representatives = [r for r in property_recommendations if r["default"]]
    default_types = {r["type"] for r in representatives}
    missing_types = sorted({r["type"] for r in property_recommendations} - default_types)
    if default_types.intersection(_SOLID_WALL_INSULATION_TYPES):
        missing_types = [t for t in missing_types if t not in _SOLID_WALL_INSULATION_TYPES]
    elif all(t in missing_types for t in _SOLID_WALL_INSULATION_TYPES):
        missing_types = [t for t in missing_types if t != "external_wall_insulation"]
    for missing_type in missing_types:
        # Pick the cheapest recommendation OF THIS TYPE as its representative
        candidates = [r for r in property_recommendations if r["type"] == missing_type]
        representatives.append(min(candidates, key=lambda r: r["total"]))
    return representatives


def _combined_scoring_record(property_instance, representative_recommendations, scoring_datasets):
    """Build one scoring record for the property with all *representative_recommendations* applied together."""
    starting_epc_data = scoring_datasets["starting_epc_data"].copy()
    ending_epc_data = scoring_datasets["ending_epc_data"].copy()
    fixed_data = scoring_datasets["fixed_data"].copy()
    scoring_dict = {}
    for rec in representative_recommendations:
        scoring_dict = create_recommendation_scoring_data(
            property=property_instance,
            recommendation=rec,
            starting_epc_data=starting_epc_data,
            ending_epc_data=ending_epc_data,
            fixed_data=fixed_data,
        )
        # Carry each recommendation's updates into ending_epc_data, so the final record contains all of them
        for column in scoring_dict:
            if column in ending_epc_data.columns:
                ending_epc_data[column] = scoring_dict[column]
    return scoring_dict


def _apply_combined_impact(property_instance, property_recommendations, representative_recommendations,
                           combined_predictions):
    """Spread a property's combined carbon and heat demand predictions over its recommendations.

    Sets ``co2_equivalent_savings``, ``heat_demand`` and ``energy_cost_savings`` on every dict in
    *property_recommendations* (in place) and records the adjusted energy figures on *property_instance*.
    """
    property_key = str(property_instance.id)
    heat_predictions = combined_predictions["heat_demand_predictions"]
    heat_predictions = heat_predictions[heat_predictions["property_id"] == property_key]
    carbon_predictions = combined_predictions["carbon_change_predictions"]
    carbon_predictions = carbon_predictions[carbon_predictions["property_id"] == property_key]

    carbon_change = (
        float(property_instance.data["co2-emissions-current"]) - carbon_predictions["predictions"].values[0]
    )
    floor_area = property_instance.floor_area
    starting_heat_demand = float(property_instance.data["energy-consumption-current"]) * floor_area
    expected_heat_demand = starting_heat_demand - heat_predictions["predictions"].values[0] * floor_area

    # We adjust the heat demand figures to align to the UCL paper
    current_rating = property_instance.data["current-energy-rating"]
    current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
        epc_energy_consumption=starting_heat_demand,
        current_epc_rating=current_rating,
    )
    # NOTE(review): the expected figure is adjusted with the CURRENT EPC rating, not the post-works rating
    expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
        epc_energy_consumption=expected_heat_demand,
        current_epc_rating=current_rating,
    )
    heat_demand_change = current_adjusted_energy - expected_adjusted_energy

    # Apportion the combined changes across the representative recommendations in proportion to each
    # one's individually predicted share.
    # NOTE(review): if the individual values sum to zero the shares become NaN/inf
    rep_data = pd.DataFrame([
        {
            "recommendation_id": r["recommendation_id"],
            "co2_equivalent_savings": r.get("co2_equivalent_savings"),
            "heat_demand": r.get("heat_demand"),
            "type": r["type"],
        }
        for r in representative_recommendations
    ])
    rep_data["co2_equivalent_savings"] = carbon_change * (
        rep_data["co2_equivalent_savings"] / rep_data["co2_equivalent_savings"].sum()
    )
    rep_data["heat_demand"] = heat_demand_change * (rep_data["heat_demand"] / rep_data["heat_demand"].sum())

    # Insert the apportioned values into the final recommendations. Both solid wall options share the
    # single solid-wall representative.
    solid_wall_rows = rep_data[rep_data["type"].isin(_SOLID_WALL_INSULATION_TYPES)]
    for rec in property_recommendations:
        if rec["type"] == "mechanical_ventilation":
            rec["co2_equivalent_savings"] = 0
            rec["heat_demand"] = 0
            rec["energy_cost_savings"] = 0
            continue
        if rec["type"] in _SOLID_WALL_INSULATION_TYPES:
            change_data = solid_wall_rows
        else:
            change_data = rep_data[rep_data["type"] == rec["type"]]
        rec["co2_equivalent_savings"] = change_data["co2_equivalent_savings"].values[0]
        rec["heat_demand"] = change_data["heat_demand"].values[0]
        rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])

    # We don't want the heat demand adjusted for mechanical ventilation, so its apportioned share is
    # added back onto the expected adjusted energy
    ventilation_rows = rep_data[rep_data["type"] == "mechanical_ventilation"]
    if not ventilation_rows.empty:
        expected_adjusted_energy = expected_adjusted_energy + ventilation_rows["heat_demand"].values[0]
    property_instance.set_adjusted_energy(
        current_adjusted_energy=current_adjusted_energy,
        expected_adjusted_energy=expected_adjusted_energy,
    )


@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """Create properties for a portfolio and generate, score and store their retrofit plans.

    Reads the trigger CSV at ``body.trigger_file_path``. For every address for which ``create_property``
    reports a new record, it:
      - creates the property's targets;
      - generates recommendations;
      - predicts SAP, heat demand and carbon impact with ``ModelApi``;
      - selects default recommendations with a gain or cost optimiser.

    Results are uploaded in batches of ``BATCH_SIZE`` properties, and the portfolio aggregations are
    then refreshed.

    Returns:
        204 if the trigger file produced no new properties, 200 on success, 400 on ``ValueError`` and
        500 on database or unexpected errors (the session is rolled back). An error inside an upload
        batch is logged, only that batch is rolled back, and it is excluded from the aggregations.
    """
    # NOTE(review): this handler performs blocking I/O (S3, EPC search, database) inside ``async def``,
    # which blocks the event loop while it runs; a plain ``def`` would let FastAPI run it in a threadpool.
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()
    try:
        session.begin()
        settings = get_settings()
        logger.info("Getting the inputs")
        plan_input = read_csv_from_s3(bucket_name=settings.PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=settings.DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        cleaning_data = read_parquet_from_s3(
            bucket_name=settings.DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
        )

        input_properties = []
        for config in plan_input:
            # TODO: validate each record in the file and handle invalid ones. Postcode and address should
            # also be standardised, as a postcode of abcdef would be considered different to ABCDEF
            epc_searcher = SearchEpc(
                address1=config["address"],
                postcode=config["postcode"],
                auth_token=settings.EPC_AUTH_TOKEN,
                os_api_key=settings.ORDNANCE_SURVEY_API_KEY,
            )
            epc_searcher.find_property()
            # TODO: create this record using the EPC address and postcode and validate it with the UPRN
            property_id, is_new = create_property(
                session, portfolio_id=body.portfolio_id, address=config["address"], postcode=config["postcode"]
            )
            # If a new record was not created, we don't produce recommendations
            if not is_new:
                continue
            # TODO: need to add a heat demand target
            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None,
            )
            input_properties.append(
                Property(
                    id=property_id,
                    data=epc_searcher.newest_epc,
                    old_data=epc_searcher.older_epcs,
                    full_sap_epc=epc_searcher.full_sap_epc,
                )
            )
        if not input_properties:
            return Response(status_code=204)
        properties_by_id = {p.id: p for p in input_properties}

        logger.info("Getting EPC, and spatial data")
        for p in input_properties:
            p.search_address_epc()
            p.set_year_built()
            p.get_spatial_data(uprn_filenames)

        # The materials data could be cached or held locally so we don't need to make repeated
        # requests to the backend for the same data
        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()

        logger.info("Getting components and epc recommendations")
        recommendations = {}
        scoring_records = []
        property_scoring_data = {}
        for p in input_properties:
            p.get_components(cleaned)
            # TODO: temporary - this should happen after scoring
            cleaned_property_data = DataProcessor.apply_averages_cleaning(
                data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
                cleaning_data=cleaning_data,
                cols_to_merge_on=list(_PROPERTY_AVERAGES_MERGE_COLUMNS),
            )
            p.set_number_lighting_outlets(cleaned_property_data)
            property_recommendations = Recommendations(property_instance=p, materials=materials).recommend()
            if not property_recommendations:
                continue
            recommendations[p.id] = property_recommendations

            # Prepare the data for predicting the impact of each recommendation on SAP
            data_processor = DataProcessor(None, newdata=True)
            data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
            # TODO: temporary placeholder for a missing UPRN
            if data_processor.data["UPRN"].values[0] == "":
                data_processor.data["UPRN"] = 0
            data_processor.pre_process()
            starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
            ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
            fixed_data = data_processor.get_fixed_features()
            # The ending record's DAYS_TO_ENDING is measured to created_at (now)
            ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
            property_scoring_data[p.id] = {
                "starting_epc_data": starting_epc_data,
                "ending_epc_data": ending_epc_data,
                "fixed_data": fixed_data,
            }
            scoring_records.extend(
                create_recommendation_scoring_data(
                    property=p,
                    recommendation=rec,
                    starting_epc_data=starting_epc_data,
                    ending_epc_data=ending_epc_data,
                    fixed_data=fixed_data,
                )
                for recommendations_by_type in property_recommendations
                for rec in recommendations_by_type
            )
            del data_processor

        # Scoring and optimisation only apply when at least one property received recommendations
        if recommendations:
            logger.info("Preparing data for scoring in sap change api")
            all_predictions = _predict_all(
                body.portfolio_id, created_at, _clean_scoring_data(scoring_records, cleaning_data)
            )

            # Insert the predictions into the recommendations and run the optimiser
            logger.info("Optimising recommendations")
            for property_id in list(recommendations):
                recommendations[property_id] = _optimise_property_recommendations(
                    properties_by_id[property_id], recommendations, all_predictions, body
                )

            # TODO: temporary step to estimate the combined impact of the measures on heat demand and
            # carbon. It scores each property's representative recommendations together; clean up if kept
            representative_recs = {}
            combined_records = []
            for property_id, property_recommendations in recommendations.items():
                representative_recs[property_id] = _representative_recommendations(property_recommendations)
                combined_records.append(
                    _combined_scoring_record(
                        properties_by_id[property_id],
                        representative_recs[property_id],
                        property_scoring_data[property_id],
                    )
                )
            combined_predictions = _predict_all(
                body.portfolio_id, created_at, _clean_scoring_data(combined_records, cleaning_data)
            )
            for property_id, property_recommendations in recommendations.items():
                _apply_combined_impact(
                    properties_by_id[property_id],
                    property_recommendations,
                    representative_recs[property_id],
                    combined_predictions,
                )

        # Upload 1) the property data, 2) the property details (epc) and 3) the recommendations
        logger.info("Uploading recommendations to the database")
        # Commit the properties and targets created above, so a failed upload batch cannot roll them back
        session.commit()
        property_valuation_increases = []
        property_labour_days = []
        for batch_start in range(0, len(input_properties), BATCH_SIZE):
            batch_properties = input_properties[batch_start:batch_start + BATCH_SIZE]
            batch_valuation_increases = []
            batch_labour_days = []
            try:
                for p in batch_properties:
                    property_details_epc = p.get_property_details_epc(
                        portfolio_id=body.portfolio_id, rating_lookup=rating_lookup
                    )
                    create_property_details_epc(session, property_details_epc)
                    update_or_create_property_spatial_details(session, p.uprn, p.spatial)
                    # TODO: temporary placeholder for a missing UPRN
                    if p.data["uprn"] == "":
                        logger.warning("Property %s has no UPRN; storing 0 as a placeholder", p.id)
                        p.data["uprn"] = 0
                    update_property_data(
                        session,
                        property_id=p.id,
                        portfolio_id=body.portfolio_id,
                        property_data=p.get_full_property_data(),
                    )
                    recommendations_to_upload = recommendations.get(p.id, [])
                    if not recommendations_to_upload:
                        continue
                    new_plan_id = create_plan(session, {
                        "portfolio_id": body.portfolio_id,
                        "property_id": p.id,
                        "is_default": True,
                    })
                    uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
                    create_plan_recommendations(
                        session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
                    )
                    # Value the property at the EPC band reached by applying all of its default recommendations
                    default_recommendations = [r for r in recommendations_to_upload if r["default"]]
                    new_sap_points = float(p.data["current-energy-efficiency"]) + sum(
                        r["sap_points"] for r in default_recommendations
                    )
                    valuations = PropertyValuation.estimate(property_instance=p, target_epc=sap_to_epc(new_sap_points))
                    batch_valuation_increases.append(valuations["average_increase"])
                    batch_labour_days.append(sum(r["labour_days"] for r in default_recommendations))
                session.commit()
            except Exception:
                session.rollback()
                logger.exception("An error occurred during batch starting at index %s", batch_start)
                continue
            # Only committed batches contribute to the portfolio aggregations
            property_valuation_increases.extend(batch_valuation_increases)
            property_labour_days.extend(batch_labour_days)

        logger.info("Creating portfolio aggregations")
        # We implement this in the simplest way possible: query the database for all recommendations
        # associated with the portfolio and aggregate them. This is not the most efficient approach, but it
        # is simple and reusable: when a recommendation changes from default to non-default, we'll need to
        # re-run this process to re-calculate the portfolio-level impact.
        # The portfolio's labour days are the largest default-package labour days of any single property.
        aggregate_portfolio_recommendations(
            session,
            portfolio_id=body.portfolio_id,
            total_valuation_increase=sum(property_valuation_increases),
            labour_days=round(max(property_labour_days, default=0)),
        )
        # Commit final changes
        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:  # General exception handling
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()
    return Response(status_code=200)