Model/backend/app/plan/router.py
2024-07-09 19:27:21 +01:00

1277 lines
57 KiB
Python

import json
from datetime import datetime
from tqdm import tqdm
import pandas as pd
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from recommendations.Mds import Mds
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel
# Module-level logger used by the endpoints and helpers in this router.
logger = setup_logger()
# Number of properties processed per database commit when trigger_plan uploads results.
BATCH_SIZE = 5
# Number of scoring rows passed to ModelApi.predict_all in a single call.
SCORING_BATCH_SIZE = 400
def patch_epc(patch, epc_records):
    """
    Overlay customer-supplied values onto the original EPC record.

    Only fields that already exist in ``epc_records["original_epc"]`` are overwritten. The
    ``address`` and ``postcode`` entries of the patch are never applied, and neither are
    empty-string values.

    :param patch: mapping of EPC field name to the replacement value
    :param epc_records: dictionary holding the ``original_epc`` record; updated in place
    :return: the same ``epc_records`` object that was passed in
    """
    locator_fields = ("address", "postcode")
    for field_name, new_value in patch.items():
        # The locator columns and blank cells carry no patch information
        if field_name in locator_fields or new_value == "":
            continue
        original_epc = epc_records["original_epc"]
        # Never introduce fields that the EPC record does not already have
        if field_name in original_epc:
            original_epc[field_name] = new_value
    return epc_records
def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Build the portfolio-level summary from the default recommendations of every property.

    The summary covers:
      1) the number of properties in each EPC band, before and after retrofit
      2) the number of units, and how many of them have default recommendations
      3) CO2, energy bill and energy consumption per unit, before and after retrofit
      4) the valuation improvement per unit (average plus lower/upper bound)
      5) cost per unit, cost per CO2 saved and cost per SAP point
      6) the valuation return on investment

    Properties without recommendations are included in the aggregation. When the default
    recommendations have no cost, no carbon saving or no SAP gain, the dependent ratios are
    reported as zero instead of dividing by zero.

    :param input_properties: property objects exposing ``id``, ``data``, ``current_energy_bill`` and
        ``current_adjusted_energy``
    :param total_valuation_increase: summed valuation increase across the portfolio
    :param recommendations: mapping of property id to its list of recommendation dictionaries
    :param new_epc_bands: mapping of property id to the EPC band expected after retrofit
    :param property_value_increase_ranges: mapping of property id to a dictionary with the keys
        ``current_value``, ``lower_bound_increased_value`` and ``upper_bound_increased_value``
    :return: dictionary of aggregated metrics, with most values formatted as display strings
    :raises KeyError: if a property id is missing from ``new_epc_bands`` or
        ``property_value_increase_ranges``
    """

    def reformat_epc_data(epc_counts):
        # One entry per EPC band in a fixed G -> A order; bands that are absent count as 0.
        # The count is coerced to a plain int so that it is always JSON serialisable.
        epc_bands = ["G", "F", "E", "D", "C", "B", "A"]
        return [{"name": band, band: int(epc_counts.get(band, 0))} for band in epc_bands]

    def format_money(amount):
        return f"£{amount:,.0f}"

    def safe_ratio(numerator, denominator):
        # A zero denominator means nothing was spent / saved / gained, so the ratio is reported as 0
        # rather than producing inf or nan (which would be rendered as "£nan" or "£inf").
        if denominator == 0:
            return 0.0
        return numerator / denominator

    n_units = len(input_properties)

    # We need to construct the underlying data for the aggregation: one row per property
    agg_rows = []
    for p in input_properties:
        # Get the recommendations for the property - we include all properties, even ones without recommendations
        property_recommendations = recommendations.get(p.id, [])
        # Only the default recommendations contribute to the portfolio figures
        default_recommendations = [r for r in property_recommendations if r["default"]]

        pre_retrofit_co2 = p.data["co2-emissions-current"]
        carbon_savings = sum(r["co2_equivalent_savings"] for r in default_recommendations)
        energy_bill_savings = sum(r["energy_cost_savings"] for r in default_recommendations)
        energy_consumption_savings = sum(r["adjusted_heat_demand"] for r in default_recommendations)

        value_range = property_value_increase_ranges[p.id]
        current_value = value_range["current_value"]

        agg_rows.append({
            "pre_retrofit_epc": p.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[p.id],
            "pre_retrofit_co2": pre_retrofit_co2,
            "post_retrofit_co2": pre_retrofit_co2 - carbon_savings,
            "pre_retrofit_energy_bill": p.current_energy_bill,
            "post_retrofit_energy_bill": p.current_energy_bill - energy_bill_savings,
            "pre_retrofit_energy_consumption": p.current_adjusted_energy,
            "post_retrofit_energy_consumption": p.current_adjusted_energy - energy_consumption_savings,
            "cost": sum(r["total"] for r in default_recommendations),
            "sap_point_improvement": sum(r["sap_points"] for r in default_recommendations),
            "lower_bound_valuation_uplift": value_range["lower_bound_increased_value"] - current_value,
            "upper_bound_valuation_uplift": value_range["upper_bound_increased_value"] - current_value,
            "has_recommendations": bool(default_recommendations)
        })
    agg_data = pd.DataFrame(agg_rows)

    n_units_to_retrofit = agg_data["has_recommendations"].sum()
    valuation_improvement_lower_bound_per_unit = agg_data["lower_bound_valuation_uplift"].mean()
    valuation_improvement_upper_bound_per_unit = agg_data["upper_bound_valuation_uplift"].mean()
    total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum()
    total_sap_points = agg_data["sap_point_improvement"].sum()
    total_cost = agg_data["cost"].sum()

    valuation_improvment_per_unit = (
        f"{format_money(total_valuation_increase / n_units)} "
        f"({format_money(valuation_improvement_lower_bound_per_unit)} - "
        f"{format_money(valuation_improvement_upper_bound_per_unit)})"
    )
    valuation_return_on_investment = (
        str(round(safe_ratio(total_valuation_increase, total_cost), 2)) +
        f" ("
        f"{safe_ratio(agg_data['lower_bound_valuation_uplift'].sum(), total_cost):,.2f} - "
        f"{safe_ratio(agg_data['upper_bound_valuation_uplift'].sum(), total_cost):,.2f})"
    )

    aggregation_data = {
        "epc_breakdown_pre_retrofit": json.dumps(
            reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()),
        "energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()),
        "energy_consumption_per_unit_pre_retrofit": str(
            round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(
            round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh",
        "valuation_improvement_per_unit": valuation_improvment_per_unit,
        "cost_per_unit": format_money(agg_data["cost"].mean()),
        "cost_per_co2_saved": format_money(safe_ratio(total_cost, total_carbon_saved)),
        "cost_per_sap_point": format_money(safe_ratio(total_cost, total_sap_points)),
        "valuation_return_on_investment": valuation_return_on_investment,
        # TODO: Could we add 10yr carbon credits value?
    }
    return aggregation_data
# Router for the plan endpoints: routes are registered under the "/plan" prefix, tagged "plan",
# and every route declares validate_token as a dependency.
router = APIRouter(
    prefix="/plan",
    tags=["plan"],
    dependencies=[Depends(validate_token)],
    responses={404: {"description": "Not found"}}
)
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
    """
    Build a retrofit plan for every new property listed in the trigger file.

    The endpoint works through the following stages:
      1) read the trigger file and the optional patches / already-installed / non-invasive files from S3
      2) look up the EPC for each row, create the property and its targets in the database
      3) load the materials, cleaned data, spatial metadata and solar / energy consumption models
      4) request solar data per building (or per property when no building ids are present)
      5) generate recommendations, score them in batches via the model API and run the optimiser
      6) upload property details, plans and recommendations in batches of BATCH_SIZE
      7) aggregate the portfolio level figures and store them

    :param body: the plan trigger request (file paths, portfolio id, goal, budget and exclusions)
    :return: 204 when the file contains no new properties, 200 on success, 400 when a ValueError is raised,
        500 for database errors and any other unexpected error
    """
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()
    # TODO: We should store the trigger file path in the database with the plan so we can track the file that
    # triggered the plan
    # TODO: if the measure is already installed, it should actually be the very first phase
    try:
        session.begin()
        logger.info("Getting the inputs")
        plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        # If we have patches or overrides, we should read them in here
        patches = []
        if body.patches_file_path:
            patches = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.patches_file_path)
        already_installed = []
        if body.already_installed_file_path:
            already_installed = read_csv_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.already_installed_file_path
            )
        non_invasive_recommendations = []
        if body.non_invasive_recommendations_file_path:
            non_invasive_recommendations = read_csv_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.non_invasive_recommendations_file_path
            )
        cleaning_data = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
        )
        input_properties = []
        for config in tqdm(plan_input):
            # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
            uprn = config.get("uprn", None)
            if uprn:
                # The uprn may arrive as a float-formatted string, so go via float before int
                uprn = int(float(uprn))
            epc_searcher = SearchEpc(
                address1=config["address"],
                postcode=config["postcode"],
                uprn=uprn,
                auth_token=get_settings().EPC_AUTH_TOKEN,
                os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
            )
            epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
            epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
            # For the moment, our OS API access is unavailable, so we skip and interpolate
            epc_searcher.find_property(skip_os=True)
            # Create a record in db
            property_id, is_new = create_property(
                session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
            )
            # Properties that already exist are not planned again
            if not is_new:
                continue
            create_property_targets(
                session,
                property_id=property_id,
                portfolio_id=body.portfolio_id,
                epc_target=body.goal_value,
                heat_demand_target=None
            )
            epc_records = {
                'original_epc': epc_searcher.newest_epc.copy(),
                'full_sap_epc': epc_searcher.full_sap_epc.copy(),
                'old_data': epc_searcher.older_epcs.copy(),
            }
            # The supplementary files are matched to the trigger row on the raw address and postcode
            patch = next((
                x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            ), {})
            epc_records = patch_epc(patch, epc_records)
            prepared_epc = EPCRecord(
                epc_records=epc_records,
                run_mode="newdata",
                cleaning_data=cleaning_data
            )
            property_already_installed = next((
                x for x in already_installed if
                (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            ), {})
            property_non_invasive_recommendations = next((
                x for x in non_invasive_recommendations if
                (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            ), {})
            input_properties.append(
                Property(
                    id=property_id,
                    address=epc_searcher.address_clean,
                    postcode=epc_searcher.postcode_clean,
                    epc_record=prepared_epc,
                    already_installed=property_already_installed,
                    non_invasive_recommendations=property_non_invasive_recommendations,
                    **Property.extract_kwargs(config)
                )
            )
        # Nothing new to plan for
        if not input_properties:
            return Response(status_code=204)
        # The materials data could be cached or local so we don't need to make
        # consistent requests to the backend for
        # the same data
        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
        solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY)
        dataset_version = "2024-07-08"
        energy_consumption_client = EnergyConsumptionModel(
            model_paths={
                "heating_kwh": f"model_directory/energy_consumption_model/heating_kwh_{dataset_version}.pkl",
                "hot_water_kwh": f"model_directory/energy_consumption_model/hot_water_kwh_{dataset_version}.pkl"
            },
            dummy_schema_path=f"model_directory/energy_consumption_model/{dataset_version}_dummy_schema.pkl",
            consumption_average_path=f"energy_consumption/{dataset_version}/consumption_averages.parquet",
            cleaned=cleaned,
            environment=get_settings().ENVIRONMENT
        )
        logger.info("Getting spatial data")
        for p in input_properties:
            p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds, energy_consumption_client)
            p.get_spatial_data(uprn_filenames)
        # TODO: Handle the case of modelling some units as buildings and some as properties individually
        building_ids = [
            {
                "building_id": p.building_id,
                "longitude": p.spatial["longitude"],
                "latitude": p.spatial["latitude"],
                # Energy consumption is adjusted for the property's expected post retrofit state
                "energy_consumption": energy_consumption_client.estimate_new_consumption(
                    current_rating=p.data["current-energy-rating"],
                    target_rating=body.goal_value,
                    current_consumption=p.current_adjusted_energy
                ),
                "property_id": p.id
            } for p in input_properties if p.building_id is not None
        ]
        if building_ids:
            # Find the unique longitude and latitude pairs for each building id
            unique_coordinates = {}
            for entry in building_ids:
                building_id = entry['building_id']
                coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
                if building_id not in unique_coordinates:
                    unique_coordinates[building_id] = []
                if coordinate_pair not in unique_coordinates[building_id]:
                    unique_coordinates[building_id].append(coordinate_pair)
            solar_panel_configuration = {}
            for building_id, coordinates in unique_coordinates.items():
                if len(coordinates) > 1:
                    raise NotImplementedError("more than one coordinate for a building - handle me")
                coordinates = coordinates[0]
                # The solar request is made once per building, with the summed consumption of its units
                energy_consumption = sum(
                    [entry['energy_consumption'] for entry in building_ids if entry['building_id'] == building_id]
                )
                solar_api_client.get(
                    longitude=coordinates["longitude"],
                    latitude=coordinates["latitude"],
                    energy_consumption=energy_consumption,
                    is_building=True,
                )
                solar_panel_configuration[building_id] = {
                    "insights_data": solar_api_client.insights_data,
                    "panel_performance": solar_api_client.panel_performance,
                    "n_units": len([entry for entry in building_ids if entry['building_id'] == building_id])
                }
                # Insert this into the properties that have this building id
                for p in input_properties:
                    if p.building_id == building_id:
                        unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
                        # Each unit is allocated the building's solar output in proportion to its consumption
                        unit_solar_panel_configuration["unit_share_of_energy"] = (
                            [x for x in building_ids if x["property_id"] == p.id][0]["energy_consumption"] /
                            energy_consumption
                        )
                        p.set_solar_panel_configuration(unit_solar_panel_configuration)
        else:
            # Model the solar potential at the property level
            for p in input_properties:
                # TODO: Complete me! - we probably won't do this for individual flats
                solar_performance = solar_api_client.get(
                    longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]
                )
        logger.info("Getting components and epc recommendations")
        recommendations = {}
        recommendations_scoring_data = []
        representative_recommendations = {}
        for p in tqdm(input_properties):
            recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions)
            property_recommendations, property_representative_recommendations = recommender.recommend()
            # Properties without recommendations are left out of scoring and optimisation
            if not property_recommendations:
                continue
            recommendations[p.id] = property_recommendations
            representative_recommendations[p.id] = property_representative_recommendations
            p.create_base_difference_epc_record(cleaned_lookup=cleaned)
            p.adjust_difference_record_with_recommendations(
                property_recommendations, property_representative_recommendations
            )
            recommendations_scoring_data.extend(p.recommendations_scoring_data)
        # TODO: Make sure that number_habitable_rooms has been dropped
        logger.info("Preparing data for scoring in sap change api")
        recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
        # When no property produced recommendations the frame has no columns, so there is nothing to drop
        # (dropping missing columns would raise a KeyError)
        if not recommendations_scoring_data.empty:
            recommendations_scoring_data = recommendations_scoring_data.drop(
                columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
                         "carbon_ending"]
            )
        model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
        all_predictions = model_api.predictions_template()
        # Score in chunks of SCORING_BATCH_SIZE rows
        to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
        for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
            predictions_dict = model_api.predict_all(
                df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
                bucket=get_settings().DATA_BUCKET,
                prediction_buckets=get_prediction_buckets()
            )
            # Append the predictions to the predictions dictionary
            for key, scored in predictions_dict.items():
                all_predictions[key] = pd.concat([all_predictions[key], scored])
        # Insert the predictions into the recommendations and run the optimiser
        # TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
        # possibility with heating system
        # TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
        # cylinder jacket), we should add these to the recommendations as default
        logger.info("Optimising recommendations")
        # Only existing keys are reassigned inside the loop, so iterating the dictionary directly is safe
        for property_id in recommendations:
            property_instance = [p for p in input_properties if p.id == property_id][0]
            (
                recommendations_with_impact,
                expected_adjusted_energy,
                expected_energy_bill
            ) = (
                Recommendations.calculate_recommendation_impact(
                    property_instance=property_instance,
                    all_predictions=all_predictions,
                    recommendations=recommendations
                )
            )
            # Store the resulting adjusted energy in the property instance
            property_instance.set_adjusted_energy(
                expected_adjusted_energy=expected_adjusted_energy,
                expected_energy_bill=expected_energy_bill
            )
            input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
            current_sap_points = int(property_instance.data["current-energy-efficiency"])
            target_sap_points = epc_to_sap_lower_bound(body.goal_value)
            sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
            if body.budget:
                optimiser = GainOptimiser(
                    input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
                )
            else:
                # The minimum gain is the minimum number of SAP points required to get to the target SAP band
                # If the gain is negative, the optimiser will return an empty solution
                optimiser = CostOptimiser(
                    input_measures,
                    min_gain=sap_gain
                )
            optimiser.setup()
            optimiser.solve()
            solution = optimiser.solution
            selected_recommendations = {r["id"] for r in solution}
            # If wall insulation is selected, we also include mechanical ventilation as a best practice measure
            if any(x in [r["type"] for r in solution] for x in [
                "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
            ]):
                ventilation_rec = next(
                    (r[0] for r in recommendations_with_impact if r[0]["type"] == "mechanical_ventilation"),
                    None
                )
                # If a matching recommendation was found, add its ID to the selected recommendations
                if ventilation_rec:
                    selected_recommendations.add(ventilation_rec["recommendation_id"])
            # We'll use the set of selected recommendations to flag the default recommendations, and unlist the
            # per-type groups so they're a bit easier to handle from here onwards
            final_recommendations = [
                {**rec, "default": rec["recommendation_id"] in selected_recommendations}
                for recommendations_by_type in recommendations_with_impact
                for rec in recommendations_by_type
            ]
            recommendations[property_id] = final_recommendations
        # We upload:
        # 1) the property data
        # 2) the property details (epc)
        # 3) the recommendations
        logger.info("Uploading recommendations to the database")
        property_valuation_increases = []
        # Persist the properties and targets created above before the batched uploads start
        session.commit()
        new_epc_bands = {}
        property_value_increase_ranges = {}
        for i in range(0, len(input_properties), BATCH_SIZE):
            try:
                # Take a slice of the input_properties list to make a batch
                batch_properties = input_properties[i:i + BATCH_SIZE]
                for p in batch_properties:
                    recommendations_to_upload = recommendations.get(p.id, [])
                    default_recommendations = [r for r in recommendations_to_upload if r["default"]]
                    total_sap_points = sum([r["sap_points"] for r in default_recommendations])
                    new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
                    new_epc = sap_to_epc(new_sap_points)
                    new_epc_bands[p.id] = new_epc
                    valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
                    property_value_increase_ranges[p.id] = valuations
                    property_details_epc = p.get_property_details_epc(
                        portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
                    )
                    create_property_details_epc(session, property_details_epc)
                    update_or_create_property_spatial_details(session, p.uprn, p.spatial)
                    property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
                    update_property_data(
                        session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
                    )
                    # Property details are stored for every property; a plan only when there are recommendations
                    if not recommendations_to_upload:
                        continue
                    new_plan_id = create_plan(session, {
                        "portfolio_id": body.portfolio_id,
                        "property_id": p.id,
                        "is_default": True,
                        "valuation_increase_lower_bound": (
                            valuations["lower_bound_increased_value"] - valuations["current_value"]
                        ),
                        "valuation_increase_upper_bound": (
                            valuations["upper_bound_increased_value"] - valuations["current_value"]
                        ),
                        "valuation_increase_average": (
                            valuations["average_increased_value"] - valuations["current_value"]
                        ),
                    })
                    uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
                    create_plan_recommendations(
                        session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
                    )
                    property_valuation_increases.append(
                        valuations["average_increased_value"] - valuations["current_value"]
                    )
                # Commit the session after each batch
                session.commit()
            except Exception as e:
                # Rollback the session if an error occurs and carry on with the next batch
                # NOTE(review): properties of a failed batch may be missing from new_epc_bands /
                # property_value_increase_ranges, which the aggregation below looks up for every property -
                # confirm how a partially failed upload should be handled
                session.rollback()
                logger.error(f"An error occurred during batch starting at index {i}: {e}", exc_info=True)
        logger.info("Creating portfolio aggregations")
        # We implement this in the simplest way possible which will be just to query the database for all
        # recommendations associated to the portfolio and then aggregate them. This is not the most efficient
        # way to do this, but it's the simplest and will be a process that we can re-use since when we change a
        # recommendation from being default to not default, we'll need to re-run this process to re-calculate the
        # the portfolio level impact
        total_valuation_increase = sum(property_valuation_increases)
        # The largest per-property total of default labour days; 0 when no property has recommendations
        labour_days = round(max(
            (sum(r["labour_days"] for r in rec_group if r["default"]) for rec_group in recommendations.values()),
            default=0
        ))
        aggregated_data = extract_portfolio_aggregation_data(
            input_properties=input_properties,
            total_valuation_increase=total_valuation_increase,
            recommendations=recommendations,
            new_epc_bands=new_epc_bands,
            property_value_increase_ranges=property_value_increase_ranges
        )
        aggregate_portfolio_recommendations(
            session,
            portfolio_id=body.portfolio_id,
            total_valuation_increase=total_valuation_increase,
            labour_days=labour_days,
            aggregated_data=aggregated_data
        )
        # Commit final changes
        session.commit()
    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:  # General exception handling
        # Include the traceback, as the more specific handlers above do
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()
    return Response(status_code=200)
@router.post("/mds")
async def build_mds(body: MdsRequest):
# TODO: This is a placeholder location for the MDS endpoint, which this is being assembled
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
try:
session.begin()
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
measure_set = body.measures
optimise_measures = measure_set is not None
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
for property_id, config in tqdm(enumerate(plan_input), total=len(plan_input)):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
uprn = None if uprn == "" else uprn
if uprn:
uprn = int(float(uprn))
epc_searcher = SearchEpc(
address1=config["address"],
postcode=config["postcode"],
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
if config["address"] == "35b High Street":
print("Performing temporary patch on 35b High Street")
epc_searcher.newest_epc["uprn"] = 10002911892
epc_searcher.full_sap_epc["uprn"] = 10002911892
if config["address"] == "Cobnut Barn":
print("Performing temporary patch on Cobnut Barn")
epc_searcher.newest_epc["uprn"] = 10013924689
# Create a record in db
# TODO: If we productionise the creation of this mds report, we will need to store this in the db
# property_id, is_new = create_property(
# session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
# )
# if not is_new:
# continue
#
# create_property_targets(
# session,
# property_id=property_id,
# portfolio_id=body.portfolio_id,
# epc_target=body.goal_value,
# heat_demand_target=None
# )
epc_records = {
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
# patch = next((
# x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
# epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
# property_already_installed = next((
# x for x in already_installed if
# (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
#
# property_non_invasive_recommendations = next((
# x for x in non_invasive_recommendations if
# (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
if measure_set is None:
measures = config["measures"] if "measures" in config else None
else:
measures = measure_set
input_properties.append(
Property(
id=property_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
# already_installed=property_already_installed,
# non_invasive_recommendations=property_non_invasive_recommendations,
measures=measures,
**Property.extract_kwargs(config)
)
)
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
cleaned = get_cleaned()
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
logger.info("Getting spatial data")
for p in tqdm(input_properties):
p.get_spatial_data(uprn_filenames)
logger.info("Getting components and epc recommendations")
recommendations_scoring_data = []
representative_recommendations = {}
recommendations = {}
for p in tqdm(input_properties):
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
mds = Mds(property_instance=p, materials=materials, optimise_measures=optimise_measures)
mds_recommendations, property_representative_recommendations, errors = mds.build()
if isinstance(errors, list):
if errors:
raise Exception("Errors occurred during MDS build")
else:
if any([len(x) for x in errors.values()]):
raise Exception("Errors occurred during MDS build")
recommendations[p.id] = mds_recommendations
representative_recommendations[p.id] = property_representative_recommendations
# Build the scoring data
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
if optimise_measures:
for _id, mds_recs in mds_recommendations.items():
representative_ids = [r["recommendation_id"] for r in property_representative_recommendations[_id]]
simulation_mds_recs = []
for recs in mds_recs:
simulation_mds_recs.append(
[r for r in recs if r["recommendation_id"] in representative_ids]
)
p.adjust_difference_record_with_recommendations(
simulation_mds_recs, property_representative_recommendations[_id]
)
data = p.recommendations_scoring_data.copy()
for d in data:
d["id"] = d["id"] + "*" + _id
recommendations_scoring_data.extend(data)
else:
recommendations_scoring_data.append(
p.simulate_all_representative_recommendations(property_representative_recommendations)
)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = {
"sap_change_predictions": pd.DataFrame(),
"heat_demand_predictions": pd.DataFrame(),
"carbon_change_predictions": pd.DataFrame()
}
to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
predictions_dict = model_api.predict_all(
df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
bucket=get_settings().DATA_BUCKET,
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# Append the predictions to the predictions dictionary
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
# TODO: 1) walls_insulation_thickness_ending is not being set in the recommendations_scoring_data,
# insulation_thickness_ending is being set instead
# 2)
# TODO: TEMP
for p in plan_input:
if p["uprn"]:
p["uprn"] = str(int(float(p["uprn"])))
import re
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
if optimise_measures:
results = []
for p in input_properties:
sap_before = int(p.data["current-energy-efficiency"])
epc_before = p.data["current-energy-rating"]
heat_demand_before = p.data["energy-consumption-current"]
carbon_before = p.data["co2-emissions-current"]
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_before * p.floor_area,
current_epc_rating=epc_before,
)
current_energy_bill = AnnualBillSavings.calculate_annual_bill(current_adjusted_energy)
package_comparison = []
for _id in recommendations[p.id].keys():
sap_prediction = all_predictions["sap_change_predictions"][
(all_predictions["sap_change_predictions"]["property_id"] == str(p.id)) &
(all_predictions["sap_change_predictions"]["recommendation_id"].str.contains(re.escape(_id)))
].copy().reset_index(drop=True)
sap_prediction["row_id"] = sap_prediction.index
heat_demand_prediction = all_predictions["heat_demand_predictions"][
(all_predictions["heat_demand_predictions"]["property_id"] == str(p.id)) &
(all_predictions["heat_demand_predictions"]["recommendation_id"].str.contains(re.escape(_id)))
].copy().reset_index(drop=True)
heat_demand_prediction["row_id"] = heat_demand_prediction.index
carbon_prediction = all_predictions["carbon_change_predictions"][
(all_predictions["carbon_change_predictions"]["property_id"] == str(p.id)) &
(all_predictions["carbon_change_predictions"]["recommendation_id"].str.contains(re.escape(_id)))
].copy().reset_index(drop=True)
carbon_prediction["row_id"] = carbon_prediction.index
epc_target = body.goal_value
if epc_before == epc_target:
continue
sap_target = epc_to_sap_lower_bound(epc_target)
# Define the measures
sap_threshold_barrier = sap_prediction[sap_prediction["predictions"] >= sap_target]
meets_threshold = True
if sap_threshold_barrier.empty:
sap_threshold_barrier = sap_prediction.tail(1)
meets_threshold = False
sap_threshold_barrier = sap_threshold_barrier.head(1)
sap_prediction = sap_prediction[
sap_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0]
]
heat_demand_prediction = heat_demand_prediction[
heat_demand_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0]
]
carbon_prediction = carbon_prediction[
carbon_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0]
]
reverse_map = {v: k for k, v in Mds.format_map.items()}
selected_measures = [
reverse_map[x.split("-")[0]] for x in sap_prediction["recommendation_id"].values
]
selected_measure_ids = [x.split("*")[0] for x in sap_prediction["recommendation_id"].values]
costs = [
r["total"] for r in representative_recommendations[p.id][_id] if
r["recommendation_id"] in selected_measure_ids
]
costs = sum(costs)
sap_after = sap_prediction["predictions"].values[-1]
epc_after = sap_to_epc(sap_after)
heat_demand_after = heat_demand_prediction["predictions"].values[-1]
carbon_after = carbon_prediction["predictions"].values[-1]
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_after * p.floor_area,
current_epc_rating=epc_before,
)
expected_energy_bill = AnnualBillSavings.calculate_annual_bill(expected_adjusted_energy)
bill_savings = current_energy_bill - expected_energy_bill
energy_savings = current_adjusted_energy - expected_adjusted_energy
package_comparison.append(
{
"id": _id,
"cost": costs,
"measures": selected_measures,
"sap_before": sap_before,
"sap_after": sap_after,
"epc_before": epc_before,
"epc_after": epc_after,
"heat_demand_before": heat_demand_before,
"heat_demand_after": heat_demand_after,
"carbon_before": carbon_before,
"carbon_after": carbon_after,
"bill_savings": bill_savings,
"energy_savings": energy_savings,
"current_energy_bill": current_energy_bill,
"meets_threshold": meets_threshold
}
)
package_comparison = pd.DataFrame(package_comparison)
# Find the smallest cost package
if not package_comparison.empty:
# We check if any of the packages meet the threshold
# If none of them do, take the one that gets closest to the target
if package_comparison["meets_threshold"].any():
package_comparison = package_comparison[package_comparison["meets_threshold"]]
package_comparison = package_comparison.sort_values("cost")
else:
package_comparison = package_comparison.sort_values("sap_after", ascending=False)
package_comparison = package_comparison.head(1).to_dict("records")[0]
else:
package_comparison = {
"measures": [],
"sap_before": sap_before,
"sap_after": sap_before,
"epc_before": epc_before,
"epc_after": epc_before,
"heat_demand_before": heat_demand_before,
"heat_demand_after": heat_demand_before,
"carbon_before": carbon_before,
"carbon_after": carbon_before,
"bill_savings": 0,
"energy_savings": 0,
"current_energy_bill": current_energy_bill,
"meets_threshold": False
}
config = [c for c in plan_input if c["uprn"] == str(p.uprn)]
if not config:
config = {"address": None, "postcode": None}
else:
config = config[0]
results.append({
"config_address": config["address"],
"config_postcode": config["postcode"],
"uprn": p.uprn,
"address": p.address,
"postcode": p.postcode,
"measures": package_comparison["measures"],
"year_of_epc": p.data['lodgement-date'],
"sap_before": package_comparison["sap_before"],
"sap_after": package_comparison["sap_after"],
"epc_before": package_comparison["epc_before"],
"epc_after": package_comparison["epc_after"],
"heat_demand_before": package_comparison["heat_demand_before"],
"heat_demand_after": package_comparison["heat_demand_after"],
"carbon_before": package_comparison["carbon_before"],
"carbon_after": package_comparison["carbon_after"],
"bill_savings": round(package_comparison["bill_savings"], 2),
"energy_savings": round(package_comparison["energy_savings"], 2),
"current_energy_bill": round(package_comparison["current_energy_bill"], 2),
"EWI": "EWI" if "external_wall_insulation" in package_comparison["measures"] else None,
"CWI": "CWI" if "cavity_wall_insulation" in package_comparison["measures"] else None,
"LI": "LI" if "loft_insulation" in package_comparison["measures"] else None,
"ASHP Htg": "ASHP Htg" if "air_source_heat_pump" in package_comparison["measures"] else None,
"Elec Storage": (
"Elec Storage Htrs (Out of scope -Prov sum only)" if "high_heat_retention_storage_heaters" in
package_comparison["measures"] else None
),
"Solar PV": "Solar PV" if "solar_pv" in package_comparison["measures"] else None,
})
results = pd.DataFrame(results)
# For the different measures, we check the impact with a few debugging functions
walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures)
results.to_excel("optimised mds_results 5th June.xlsx")
results = []
for p in input_properties:
measures = p.measures
property_recommendations = [r['type'] for r in representative_recommendations[p.id]]
# TODO: Check high heat retention storage heaters - looks like it's excluded controls!
sap_prediction = all_predictions["sap_change_predictions"][
all_predictions["sap_change_predictions"]["property_id"] == str(p.id)
]
heat_demand_prediction = all_predictions["heat_demand_predictions"][
all_predictions["heat_demand_predictions"]["property_id"] == str(p.id)
]
carbon_prediction = all_predictions["carbon_change_predictions"][
all_predictions["carbon_change_predictions"]["property_id"] == str(p.id)
]
# Get a before and after for SAP, heat demand, CO2 and also calculate energy bill and energy savings
sap_before = int(p.data["current-energy-efficiency"])
sap_after = sap_prediction["predictions"].values[0] if measures else sap_before
epc_before = p.data["current-energy-rating"]
epc_after = sap_to_epc(sap_after) if measures else epc_before
heat_demand_before = p.data["energy-consumption-current"]
heat_demand_after = heat_demand_prediction["predictions"].values[0] if measures else heat_demand_before
carbon_before = p.data["co2-emissions-current"]
carbon_after = carbon_prediction["predictions"].values[0] if measures else carbon_before
# Estimate bill savings
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_before * p.floor_area,
current_epc_rating=epc_before,
)
# TODO: This isn't quite right as this is based on EVERY possible measure, not just the ones that are
# actually implemented
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_after * p.floor_area,
current_epc_rating=epc_before,
)
# TODO: We should determine if the home is gas & electricity or just electricity
# Determine if the heating and hotwater was previously electric only or both
current_energy_bill = AnnualBillSavings.calculate_annual_bill(
kwh=current_adjusted_energy,
)
expected_energy_bill = AnnualBillSavings.calculate_annual_bill(
kwh=expected_adjusted_energy,
)
bill_savings = current_energy_bill - expected_energy_bill
energy_savings = current_adjusted_energy - expected_adjusted_energy
config = [c for c in plan_input if c["uprn"] == str(p.uprn)]
if not config:
config = {"address": None, "postcode": None}
else:
config = config[0]
to_append = {
"config_address": config["address"],
"config_postcode": config["postcode"],
"uprn": p.uprn,
"address": p.address,
"postcode": p.postcode,
"measures": measures,
"property_recommendations": property_recommendations,
"year_of_epc": p.data['lodgement-date'],
"sap_before": sap_before,
"sap_after": sap_after,
"epc_before": epc_before,
"epc_after": epc_after,
"heat_demand_before": heat_demand_before,
"heat_demand_after": heat_demand_after,
"carbon_before": carbon_before,
"carbon_after": carbon_after,
"bill_savings": round(bill_savings, 2),
"energy_savings": round(energy_savings, 2),
"current_energy_bill": round(current_energy_bill, 2),
"fuel_type": p.main_fuel["fuel_type"],
}
results.append(to_append)
results = pd.DataFrame(results)
results["sap_uplift"] = results["sap_after"] - results["sap_before"]
# results.to_excel("mds_results 5th June.xlsx")
walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures)
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()
def check_mds(results, input_properties, recommendations, optimise_measures):
    """Build two debugging tables for wall and storage-heater recommendations.

    For every property, the row of ``results`` with the matching ``uprn`` is
    looked up and its ``measures`` cell is inspected for wall-insulation
    measures and high heat retention storage heaters. The outcome is reported
    next to the property's wall / heating descriptions so the selection can be
    eyeballed.

    Args:
        results: DataFrame with at least ``uprn`` and ``measures`` columns.
            The first row matching a property's ``uprn`` is used.
        input_properties: Iterable of property objects. The attributes read
            are ``uprn``, ``id``, ``address``, ``postcode``, ``data``,
            ``spatial``, ``walls``, ``main_heating``,
            ``main_heating_controls`` and ``measures``.
        recommendations: Mapping of property id to a mapping whose keys are
            string-encoded collections of measure names (parsed with
            ``ast.literal_eval``). Only read when ``optimise_measures`` is
            truthy.
        optimise_measures: When truthy, the ``measures`` cell is used as a
            list of measure names. Otherwise it is treated as a list of
            mappings and the first key of each is taken as the measure name
            (presumably single-key dicts -- confirm against the caller).

    Returns:
        A ``(walls_check, hhr_check)`` tuple of DataFrames with one row per
        property.

    Raises:
        Exception: If more than one wall-insulation measure was selected for
            a single property.
        IndexError: If a property has no matching row in ``results``.
    """
    # Kept as a function-scope import: the module does not import ``ast``.
    import ast

    wall_measure_names = (
        "internal_wall_insulation",
        "external_wall_insulation",
        "cavity_wall_insulation",
    )
    hhr_measure_names = ("high_heat_retention_storage_heaters",)

    walls_check = []
    hhr_check = []
    for p in input_properties:
        res = results[results["uprn"] == p.uprn]
        selected_measures = res["measures"].values[0]

        # A property may legitimately have no measures: the calling code
        # guards with ``if measures``. A missing cell (None, or NaN after a
        # pandas round-trip) is treated as "nothing selected" instead of
        # failing with a TypeError when iterated.
        is_missing = selected_measures is None or (
            isinstance(selected_measures, float) and pd.isna(selected_measures)
        )
        if is_missing:
            measures = []
        elif optimise_measures:
            measures = selected_measures
        else:
            measures = [list(z.keys())[0] for z in selected_measures]

        wall_matches = [m for m in measures if m in wall_measure_names]
        hhr_matches = [m for m in measures if m in hhr_measure_names]

        if optimise_measures:
            # Every key encodes one candidate package; flatten all packages
            # and de-duplicate in first-seen order so the output is the same
            # on every run (a plain set() has arbitrary ordering for strings).
            flattened = [
                name
                for package_key in recommendations[p.id]
                for name in ast.literal_eval(package_key)
            ]
            possible_measures = list(dict.fromkeys(flattened))
        else:
            possible_measures = p.measures

        # Wall insulation types are mutually exclusive; more than one means
        # the upstream selection is inconsistent.
        if len(wall_matches) > 1:
            raise Exception("something went wrong")
        wall_recommendation = wall_matches[0] if wall_matches else None
        hhr_recommendation = hhr_matches[0] if hhr_matches else None

        walls_check.append(
            {
                "uprn": p.uprn,
                "address": p.address,
                "postcode": p.postcode,
                "property_type": p.data['property-type'],
                "conservation_status": p.spatial["conservation_status"],
                "is_listed_building": p.spatial["is_listed_building"],
                "is_heritage_building": p.spatial["is_heritage_building"],
                "wall": p.walls["clean_description"],
                "recommendation": wall_recommendation,
                "possible_measures": possible_measures,
                "selected_measures": selected_measures,
            }
        )
        hhr_check.append(
            {
                "uprn": p.uprn,
                "address": p.address,
                "postcode": p.postcode,
                "heating": p.main_heating["clean_description"],
                "heating_controls": p.main_heating_controls["clean_description"],
                "recommendation": hhr_recommendation,
                "possible_measures": possible_measures,
                "selected_measures": selected_measures,
            }
        )

    return pd.DataFrame(walls_check), pd.DataFrame(hhr_check)