Model/backend/engine/engine.py
2025-11-27 11:25:36 +00:00

1287 lines
59 KiB
Python

import json
from copy import deepcopy
from datetime import datetime
from tqdm import tqdm
import pandas as pd
import numpy as np
from uuid import UUID
from backend.Funding import Funding
from backend.SearchEpc import SearchEpc
from etl.epc.Record import EPCRecord
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
import backend.app.db.functions as db_funcs
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import (
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error
)
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
logger = setup_logger()
BATCH_SIZE = 5
SCORING_BATCH_SIZE = 300
def extract_portfolio_aggregation_data(
input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
# We aggregate a number of metrics for the portfolio:
# 1) A breakdown of the number of properties in each EPC band
# a) before retrofit
# b) after retrofit
# 2) Number of units
# 3) Co2/unit
# a) before retrofit
# b) after retrofit
# 4) Energy bill/unit
# a) before retrofit
# b) after retrofit
# 5) Average valuation improvement/unit
# 6) Total cost
# 7) Cost per unit
# 8) £ per CO2 saved
# 9) £ per SAP point
# We need to construct the underlyind data for this
# Helper function to reformat the EPC data
def reformat_epc_data(epc_counts):
# Define all possible EPC bands in the required order
epc_bands = ["G", "F", "E", "D", "C", "B", "A"]
# Create the formatted data list by checking each band in the order
formatted_data = []
for band in epc_bands:
# Get the count from the dictionary, defaulting to 0 if not present
count = epc_counts.get(band, 0)
# Append the formatted dictionary to the list
formatted_data.append({"name": band, band: count})
return formatted_data
n_units = len(input_properties)
agg_data = []
for p in input_properties:
# Get the recommendations for the property - we include all properties, even ones without recommendations
property_recommendations = recommendations.get(p.id, [])
# Get just the default recommendations
default_recommendations = [r for r in property_recommendations if r["default"]]
has_recommendations = len(default_recommendations) > 0
# We can now calculate multiple outputs based on default recommendations
carbon_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
pre_retrofit_co2 = p.energy["co2_emissions"]
post_retrofit_co2 = pre_retrofit_co2 - carbon_savings
pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
post_retrofit_energy_bill = sum(p.current_energy_bill.values()) - sum(
[r["energy_cost_savings"] for r in default_recommendations]
)
pre_retrofit_energy_consumption = p.current_energy_consumption
post_retrofit_energy_consumption = p.current_energy_consumption - sum(
[r["kwh_savings"] for r in default_recommendations]
)
# Add up energy savings
cost = sum([r["total"] for r in default_recommendations])
sap_point_improvement = sum([r["sap_points"] for r in default_recommendations])
if not pd.isnull(property_value_increase_ranges[p.id]["current_value"]):
lower_bound_valuation_uplift = (
property_value_increase_ranges[p.id]["lower_bound_increased_value"] -
property_value_increase_ranges[p.id]["current_value"]
)
upper_bound_valuation_uplift = (
property_value_increase_ranges[p.id]["upper_bound_increased_value"] -
property_value_increase_ranges[p.id]["current_value"]
)
else:
lower_bound_valuation_uplift, upper_bound_valuation_uplift = 0, 0
agg_data.append({
"pre_retrofit_epc": p.data["current-energy-rating"],
"post_retrofit_epc": new_epc_bands[p.id],
"pre_retrofit_co2": pre_retrofit_co2,
"post_retrofit_co2": post_retrofit_co2,
"pre_retrofit_energy_bill": pre_retrofit_energy_bill,
"post_retrofit_energy_bill": post_retrofit_energy_bill,
"pre_retrofit_energy_consumption": pre_retrofit_energy_consumption,
"post_retrofit_energy_consumption": post_retrofit_energy_consumption,
"cost": cost,
"sap_point_improvement": sap_point_improvement,
"lower_bound_valuation_uplift": lower_bound_valuation_uplift,
"upper_bound_valuation_uplift": upper_bound_valuation_uplift,
"has_recommendations": has_recommendations,
"funding": float(p.project_funding) if p.project_funding is not None else 0,
"contingency": float(sum([x.get("contingency", 0) for x in default_recommendations]))
})
agg_data = pd.DataFrame(agg_data)
n_units_to_retrofit = agg_data["has_recommendations"].sum()
valuation_improvement_lower_bound_per_unit = (
agg_data["lower_bound_valuation_uplift"].mean()
)
valuation_improvement_upper_bound_per_unit = (
agg_data["upper_bound_valuation_uplift"].mean()
)
total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum()
total_sap_points = agg_data["sap_point_improvement"].sum()
def format_money(amount):
return f"£{amount:,.0f}"
valuation_improvment_per_unit = str(
format_money(
total_valuation_increase / n_units) + (f" ({format_money(valuation_improvement_lower_bound_per_unit)} - "
f"{format_money(valuation_improvement_upper_bound_per_unit)})")
)
if agg_data["cost"].sum() == 0:
valuation_percentage_increase = 0
valuation_increase_lower = 0
valuation_increase_upper = 0
else:
valuation_percentage_increase = round(total_valuation_increase / agg_data["cost"].sum(), 2)
valuation_increase_lower = agg_data['lower_bound_valuation_uplift'].sum() / agg_data['cost'].sum()
valuation_increase_upper = agg_data['upper_bound_valuation_uplift'].sum() / agg_data['cost'].sum()
valuation_return_on_investment = str(
str(valuation_percentage_increase) +
f" ("
f"{valuation_increase_lower:,.2f} - "
f"{valuation_increase_upper:,.2f})"
)
cost_per_co2_saved = agg_data["cost"].sum() / total_carbon_saved if total_carbon_saved > 0 else 0
cost_per_co2_saved = format_money(cost_per_co2_saved)
cost_per_sap_point = agg_data["cost"].sum() / total_sap_points if total_sap_points > 0 else 0
cost_per_sap_point = format_money(cost_per_sap_point)
total_funding = agg_data["funding"].sum()
total_contingency = agg_data["contingency"].sum()
aggregation_data = {
"epc_breakdown_pre_retrofit": json.dumps(
reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
),
"epc_breakdown_post_retrofit": json.dumps(
reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
),
"number_of_properties": int(n_units),
"n_units_to_retrofit": int(n_units_to_retrofit),
"co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t",
"co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t",
"energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()),
"energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()),
"energy_consumption_per_unit_pre_retrofit": str(
round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh",
"energy_consumption_per_unit_post_retrofit": str(
round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh",
"valuation_improvement_per_unit": valuation_improvment_per_unit,
"cost_per_unit": format_money(agg_data["cost"].mean()),
"cost_per_co2_saved": cost_per_co2_saved,
"cost_per_sap_point": cost_per_sap_point,
"valuation_return_on_investment": valuation_return_on_investment,
"funding": float(total_funding),
"contingency": float(total_contingency)
}
return aggregation_data
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
"""
This function will set up with epc_records dictionary with the newest EPC, the full SAP EPC and the older EPCs
and will factor in an energy assessment that we have performed for a client.
:param epc_searcher: An instance of the SearchEpc class
:param energy_assessment: The energy assessment we have performed. If we have not performed an energy assessment,
this should be an empty response as defined by the models's
EnergyAssessment.empty_response() method
"""
newest_epc = epc_searcher.newest_epc.copy()
if newest_epc["uprn"] == "" and epc_searcher.uprn:
newest_epc["uprn"] = epc_searcher.uprn
if not energy_assessment["epc"]:
energy_assessment_is_newer = False
return {
'original_epc': newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}, energy_assessment_is_newer
epc = energy_assessment["epc"]
energy_assessment_date = epc["inspection-date"].strftime("%Y-%m-%d")
# We insert county into the epc, since right now this isn't something that we pull out from the energy
# assessment
for col in ["county", "constituency", "constituency-label", "local-authority", "local-authority-label"]:
epc[col] = newest_epc[col]
# We check if the energy assessment is newer than the newest EPC
if pd.to_datetime(energy_assessment_date) > pd.to_datetime(newest_epc["inspection-date"]):
# In this case, our energy assessment is newer than the EPCs available for this property
energy_assessment_is_newer = True
return {
"original_epc": epc,
"full_sap_epc": epc_searcher.full_sap_epc.copy(),
"old_data": epc_searcher.older_epcs.copy() + [newest_epc]
}, energy_assessment_is_newer
# We check if the EPC we have produced is contained in the set of EPCs done for the property
# We do this based on inspection-date and SAP
epc_in_historicals = [
x for x in epc_searcher.older_epcs + [newest_epc]
if x["inspection-date"] == energy_assessment_date and
x["current-energy-efficiency"] == epc["current-energy-efficiency"]
]
energy_assessment_is_newer = False
if epc_in_historicals:
# Then the EPC we have produced is already in the set of EPCs, and our EPC is older than the newest
return {
"original_epc": newest_epc,
"full_sap_epc": epc_searcher.full_sap_epc.copy(),
"old_data": epc_searcher.older_epcs.copy()
}, energy_assessment_is_newer
# In this case, our EPC is older than the newest publically avaible one, but is not contained in
# the historicals, so it can't have been lodged, so we include it in the old data
return {
'original_epc': newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy() + [epc],
}, energy_assessment_is_newer
def get_request_property_data(body: PlanTriggerRequest):
"""
This function will read in the on-site data from the S3 bucket
:param body: The request body
:return:
"""
patches = []
if body.patches_file_path:
patches = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.patches_file_path)
already_installed = []
if body.already_installed_file_path:
already_installed = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.already_installed_file_path
)
non_invasive_recommendations = []
if body.non_invasive_recommendations_file_path:
non_invasive_recommendations = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.non_invasive_recommendations_file_path
)
valuation_data = []
if body.valuation_file_path:
valuation_data = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.valuation_file_path
)
return patches, already_installed, non_invasive_recommendations, valuation_data
def get_funding_data():
"""
This function retrieves the eco project scores matrix and the warm homes local grant funding data
:return:
"""
project_scores_matrix = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/ECO4 Full Project Scores Matrix.csv",
)
project_scores_matrix = pd.DataFrame(project_scores_matrix)
project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float)
partial_project_scores_matrix = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/ECO4_Partial_Project_Scores_Matrix_v6.csv",
)
partial_project_scores_matrix = pd.DataFrame(partial_project_scores_matrix)
partial_project_scores_matrix.columns = [
'Measure category', 'Measure_Type', 'Pre_Main_Heating_Source',
'Post_Main_Heating_Source', 'Total Floor Area Band', 'Starting Band',
'Average Treatable Factor', 'Cost Savings', 'SAP Savings'
]
# Replace 200 with 200+ in floor area band
partial_project_scores_matrix["Total Floor Area Band"] = partial_project_scores_matrix[
"Total Floor Area Band"
].replace({"200": "200+"})
partial_project_scores_matrix["Cost Savings"] = partial_project_scores_matrix["Cost Savings"].astype(float)
whlg_eligible_postcodes = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/whlg eligible postcodes.csv",
)
whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
return project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes
def parse_heating_system(config):
"""
Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited,
placeholder function to cover some initial immediate cases.
:return:
"""
ll_heating = config.get("landlord_heating_system", None)
if not ll_heating:
return None
if ll_heating == "electric storage heaters":
# Return with the same format at the EPC
return "Electric storage heaters"
return None
def check_duplicate_uprns(plan_input):
"""
Simple function to check if the input data contains duplicated UPRNS.
If there are duplicates, an exception will be rasied
:return:
"""
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
# Find the duplicate UPRNs
duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1])
# de-dupe input_uprns
raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")
return True
async def model_engine(body: PlanTriggerRequest):
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
# TODO: if the measure is already installed, it should actually be the very first phase
try:
session.begin()
logger.info("Getting the inputs")
if body.file_type == "xlsx":
plan_input = read_excel_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
file_key=body.trigger_file_path,
sheet_name=body.sheet_name,
header_row=0,
)
# We now handle the case where the input data is a Domna standardised assset list
if body.file_format == "domna_asset_list":
# We rename the columns to match the expected format
plan_input = plan_input.rename(
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
)
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN
# This will be reflexted
if "estimated" not in plan_input.columns:
plan_input["estimated"] = False
plan_input["uprn"] = np.where(
plan_input["estimated"].isin([1, True]) & (
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
), None, plan_input["uprn"]
)
# We handle the landlord property type and built form
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
if "landlord_built_form" in plan_input.columns:
plan_input["built_form"] = plan_input["landlord_built_form"].copy()
else:
plan_input["built_form"] = None
plan_input["property_type"] = np.where(
plan_input["property_type"] == "unknown",
plan_input["epc_property_type"],
plan_input["property_type"]
)
if "epc_archetype" not in plan_input.columns:
plan_input["epc_archetype"] = None
plan_input["built_form"] = np.where(
plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"]
)
property_type_map = {
"house": "House",
"flat": "Flat",
"maisonette": "Maisonette",
"bungalow": "Bungalow",
"block house": "House",
"coach house": "House",
"bedsit": "Flat"
}
built_form_map = {
"mid-terrace": "Mid-Terrace",
"end-terrace": "End-Terrace",
"semi-detached": "Semi-Detached",
"detached": "Detached",
"enclosed end-terrace": "Enclosed End-Terrace",
"enclosed mid-terrace": "Enclosed Mid-Terrace",
}
# We remap the values to match the EPC expected formats
plan_input["property_type"] = plan_input["property_type"].map(property_type_map)
plan_input["built_form"] = plan_input["built_form"].map(built_form_map)
plan_input = plan_input.to_dict("records")
else:
raise ValueError("Other formats not yet supported")
else:
plan_input = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path
)
# We then slide it on the indexes if they are provided
if body.index_start is not None and body.index_end is not None:
plan_input = plan_input[body.index_start:body.index_end]
# Confirm no duplicate UPRNS
check_duplicate_uprns(plan_input)
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
if body.file_type == "xlsx" and body.file_format == "domna_asset_list":
# We check if we have valution data
if not valuation_data and body.valuation_file_path in [None, ""]:
# We check plan_input
if "domna_valuation" in plan_input[0]:
valuation_data = [{"uprn": x["uprn"], "valuation": x["domna_valuation"]} for x in plan_input]
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties, inspections_map, eco_packages = [], {}, {}
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if pd.isnull(uprn):
uprn = None
if uprn:
uprn = int(float(uprn))
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
if uprn:
# if we have a UPRN, we check if we already have EPC data associated with this UPRN
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
if epc_cache["status"] == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
address1 = config.get("address", None)
# Handle domna address list format
if pd.isnull(address1) and body.file_format == "domna_asset_list":
address1 = config.get("domna_address_1", None)
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None
heating_system = parse_heating_system(config)
associated_uprns = []
if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat":
# We're running a remote assessment for a flat - we go and grab the associated
# UPRNS for other units in the same building
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
epc_searcher = SearchEpc(
address1=address1,
postcode=config["postcode"],
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key="",
full_address=full_address,
heating_system=heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True, api_data=epc_api_data)
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and (
epc_searcher.newest_epc["uprn"] < 0
):
epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED
# We check for an energy assessment we have performed on this property:
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn if uprn is not None else epc_searcher.uprn
)
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
)
if not property_id:
continue
if not is_new and not body.multi_plan:
continue
if epc_searcher.newest_epc is None:
raise ValueError(
"No EPCs found for this property and did not estimate - likely need to provide a"
"property type and built form"
)
if is_new:
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
# Otherwise, we use the newest EPC
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
# has been publically lodged
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
epc_searcher, energy_assessment
)
req_data = extract_property_request_data(
config=config,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=epc_searcher.uprn,
)
# Pull this out as it may get overwritten
property_non_invasive_recommendations = req_data.non_invasive_recommendations
patch = req_data.patch
# if we have a remote assment data type, we pull the additional data and include it
epc_page_source = {}
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
logger.info("Retrieving find my epc data")
try:
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
except Exception as e:
logger.error(f"Failed to retrieve without cleaning address {e}")
for k in ["address", "address1"]:
epc_searcher.newest_epc[k] = epc_searcher.address_clean
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
# If we have a property type, this means when we pull the epc data, we might need to make a patch
epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data,
)
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
# Final step - extract inspections data, if we have it - we inject into property for usage
property_inspections = db_funcs.inspections_functions.extract_inspection_data(config)
if property_inspections:
inspections_map[property_id] = property_inspections
input_properties.append(
Property(
id=property_id,
is_new=is_new,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=req_data.already_installed + eco_packages[property_id][3],
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
inspections=inspections_map.get(property_id),
**Property.extract_kwargs(config), # TODO: Depraecate this
)
)
# If we have:
# 1) No EPC API data
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
if not input_properties:
return Response(status_code=204)
# We check if we have inspections data and store it in the database if so. We'll update or create
# aginst each property if
if inspections_map:
logger.info("Inserting inspections data")
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
# Set up model api and warm up the lambdas
model_api = ModelApi(
portfolio_id=body.portfolio_id,
timestamp=created_at,
prediction_buckets=get_prediction_buckets(),
max_retries=1
)
await model_api.async_warm_up_lambdas(
model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES
)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
materials = db_funcs.materials_functions.get_materials(session)
cleaned = get_cleaned()
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned)
kwh_preds = await model_api.async_paginated_predictions(
data=epcs_for_scoring,
bucket=get_settings().DATA_BUCKET,
model_prefixes=model_api.KWH_MODEL_PREFIXES,
extract_ids=False,
batch_size=SCORING_BATCH_SIZE
)
# Insert the spatial data
logger.info("Getting spatial data")
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
# extensions, since it doesn't seem to do a great job
logger.info("Performing solar analysis")
ofgem_consumption_averages = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET,
file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet"
)
building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data(
input_properties=input_properties,
ofgem_consumption_averages=ofgem_consumption_averages,
body=body
)
input_properties = GoogleSolarApi.building_solar_analysis(
building_solar_config=building_solar_config,
input_properties=input_properties,
session=session,
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
)
input_properties = GoogleSolarApi.unit_solar_analysis(
unit_solar_config=unit_solar_config,
input_properties=input_properties,
session=session,
body=body,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
inspections_map=inspections_map
)
# We also make a tweak - if the property has been flagged for solar but doesn't contain
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
logger.info("Identifying property recommendations")
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
# We set the ECO package data, if we have it
property_eco_package = eco_packages.get(p.id, (None, None, None))
if property_eco_package[0] is not None:
inclusions = property_eco_package[0]
exclusions = []
else:
inclusions = body.inclusions
exclusions = body.exclusions
recommender = Recommendations(
property_instance=p,
materials=materials,
exclusions=exclusions,
inclusions=inclusions,
default_u_values=body.default_u_values
)
property_recommendations, property_representative_recommendations = recommender.recommend()
if not property_recommendations:
continue
recommendations[p.id] = property_recommendations
representative_recommendations[p.id] = property_representative_recommendations
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
p.adjust_difference_record_with_recommendations(
property_recommendations, property_representative_recommendations
)
recommendations_scoring_data.extend(p.recommendations_scoring_data)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=[
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"
]
)
all_predictions = await model_api.async_paginated_predictions(
data=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
batch_size=SCORING_BATCH_SIZE
)
# Insert the predictions into the recommendations, and get the impact summary
scoring_epcs = [] # For scoring the kwh models
for property_id in recommendations.keys():
property_instance = [p for p in input_properties if p.id == property_id][0]
recommendations_with_impact, impact_summary = (
Recommendations.calculate_recommendation_impact(
property_instance=property_instance,
all_predictions=all_predictions,
recommendations=recommendations,
representative_recommendations=representative_recommendations
)
)
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
# at each phase
property_instance.update_simulation_epcs(impact_summary)
scoring_epcs.extend(property_instance.updated_simulation_epcs)
recommendations[property_id] = recommendations_with_impact
# We call the API with the scoring epcs
scoring_epcs = pd.DataFrame(scoring_epcs)
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
kwh_simulation_predictions = await model_api.async_paginated_predictions(
data=scoring_epcs,
bucket=get_settings().DATA_BUCKET,
model_prefixes=model_api.KWH_MODEL_PREFIXES,
batch_size=SCORING_BATCH_SIZE
)
# We now insert kwh estimates and costs into the recommendations
logger.info("Calculating tenant savings - kwh and bills")
for property_id in tqdm([p.id for p in input_properties]):
property_recommendations = recommendations.get(property_id, [])
property_instance = [p for p in input_properties if p.id == property_id][0]
property_current_energy_bill = (
Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
kwh_simulation_predictions=kwh_simulation_predictions,
property_recommendations=property_recommendations,
ashp_cop=body.ashp_cop
)
)
property_instance.current_energy_bill = property_current_energy_bill
# Insert the predictions into the recommendations and run the optimiser
for p in input_properties:
if not recommendations.get(p.id):
continue
# we need to double unlist because we have a list of lists
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# its inclusion
needs_ventilation = any(
x in property_measure_types for x in assumptions.measures_needing_ventilation
) and not p.has_ventilation
if not measures_to_optimise:
# Nothing to do, we just reshape the recommendations
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
p.id, recommendations, set()
)
continue
fixed_gain = optimiser_functions.calculate_fixed_gain(
property_required_measures, recommendations, p, needs_ventilation
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
gbis_private_cavity_abs_rate=21,
gbis_private_solid_abs_rate=28,
)
li_thickness = convert_thickness_to_numeric(
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
)
current_wall_u_value = p.walls["thermal_transmittance"]
if current_wall_u_value is None:
current_wall_u_value = get_wall_u_value(
clean_description=p.walls["clean_description"],
age_band=p.age_band,
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
)
# We insert the innovation uplift
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
# TODO: Turn this into a function and store the innovaiton uplift
for group in measures_to_optimise_with_uplift:
for r in group:
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
(
r["partial_project_score"],
r["partial_project_funding"],
r["innovation_uplift"],
r["uplift_project_score"],
) = (
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=int(p.data["current-energy-efficiency"]),
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
)
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
property_eco_packages=eco_packages.get(p.id)
)
# When the goal is Increasing EPC, we can run the funding optimiser
if body.goal == "Increasing EPC":
solutions = optimise_with_funding_paths(
p=p,
input_measures=input_measures,
housing_type=body.housing_type,
budget=body.budget,
target_gain=gain,
funding=funding,
work_package=eco_packages[p.id][2]
)
# If the solution isn't eligible, we can't really consider it
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
optimiser = (
GainOptimiser(
input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
) if body.budget else CostOptimiser(input_measures, min_gain=gain)
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
recommendation_types = []
for measures in input_measures:
for measure in measures:
recommendation_types.append(measure["type"])
recommendation_types = set(recommendation_types)
has_wall_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
WALL_INSULATION_MEASURES
)
has_roof_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
ROOF_INSULATION_MEASURES
)
funding.check_funding(
measures=solution,
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
)
# Determine the scheme
scheme = "none"
if funding.eco4_eligible:
scheme = "eco4"
if scheme == "none" and funding.gbis_eligible:
scheme = "gbis"
funded_measures = solution if scheme in ["gbis", "eco4"] else []
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
total_uplift = funding.eco4_uplift
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
selected = {r["id"] for r in solution}
if property_required_measures:
solution = optimiser_functions.add_required_measures(
property_id=p.id, property_required_measures=property_required_measures,
recommendations=recommendations, selected=selected,
)
# Add best practice measures (ventilation/trickle vents)
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
# Final flattening
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
p.id, recommendations, selected
)
# TODO: functionise
for measure in funded_measures:
if "+mechanical_ventilation" in measure["type"]:
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
p.insert_funding(
scheme=scheme,
funded_measures=funded_measures,
project_funding=project_funding,
total_uplift=total_uplift,
full_project_score=full_project_score,
partial_project_score=partial_project_score,
uplift_project_score=uplift_project_score
)
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
# of them
# TODO: We can probably do better and optimise at the building level - this is temp
logger.info("Adjusting solar PV recommendations for buildings")
building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
for bid in building_ids:
# We check if any of them have solar PV
building = [p for p in input_properties if p.building_id == bid]
has_solar = False
for unit in building:
# Get default recommendations
has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
if has_solar:
break
if has_solar:
# We adjust the units within the building
for unit in building:
for rec in recommendations[unit.id]:
if rec["type"] == "solar_pv":
# This is straightforward, we just set the default to True, since when we're at a building
# level, we only allow 1 solar PV option for each unit. If we change this, this logic will
# need to be updated
rec["default"] = True
logger.info("Uploading recommendations to the database")
# If we have any work to do, we create a new scenario
if body.scenario_id:
# We don't need to create a new scenario, we just use the existing one
scenario_id = body.scenario_id
else:
engine_scenario = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"goal_value": body.goal_value,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
scenario_id = engine_scenario.id
property_valuation_increases = []
session.commit()
new_epc_bands = {}
property_value_increase_ranges = {}
for i in range(0, len(input_properties), BATCH_SIZE):
try:
# Take a slice of the input_properties list to make a batch
batch_properties = input_properties[i:i + BATCH_SIZE]
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
new_epc_bands[p.id] = new_epc
total_cost = sum([r["total"] for r in default_recommendations])
valuations = PropertyValuation.estimate(
property_instance=p, target_epc=new_epc, total_cost=total_cost
)
property_value_increase_ranges[p.id] = valuations
# TODO - this is not right, especially if the existing run failed
if p.is_new:
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
db_funcs.property_functions.update_or_create_property_spatial_details(
session, p.uprn, p.spatial
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
db_funcs.property_functions.update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
if not recommendations_to_upload:
continue
new_plan_id = db_funcs.recommendations_functions.create_plan(session, {
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"scenario_id": scenario_id,
"is_default": True if p.is_new else False,
"name": body.scenario_name,
"valuation_increase_lower_bound": (
valuations["lower_bound_increased_value"] - valuations["current_value"]
),
"valuation_increase_upper_bound": (
valuations["upper_bound_increased_value"] - valuations["current_value"]
),
"valuation_increase_average": (
valuations["average_increased_value"] - valuations["current_value"]
),
"plan_type": eco_packages.get(p.id, (None, None, None))[2]
})
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
db_funcs.funding_functions.upload_funding(session, p, new_plan_id, recommendations_to_upload)
if valuations["current_value"] > 0:
property_valuation_increases.append(
valuations["average_increased_value"] - valuations["current_value"]
)
# Commit the session after each batch
session.commit()
except Exception as e:
# Rollback the session if an error occurs
session.rollback()
logger.warning("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
total_valuation_increase = sum(property_valuation_increases)
labour_days = round(max(
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))
# TODO - This code only pulls in the properties that have been updated in this run, but we need to
# aggregate all properties in the portfolio. We likely need to trigger a re-aggregation
aggregated_data = extract_portfolio_aggregation_data(
input_properties=input_properties,
total_valuation_increase=total_valuation_increase,
recommendations=recommendations,
new_epc_bands=new_epc_bands,
property_value_increase_ranges=property_value_increase_ranges
)
db_funcs.portfolio_functions.aggregate_portfolio_recommendations(
session,
portfolio_id=body.portfolio_id,
scenario_id=scenario_id,
total_valuation_increase=total_valuation_increase,
labour_days=labour_days,
aggregated_data=aggregated_data
)
# Commit final changes
session.commit()
except IntegrityError as e:
return handle_error(session, "Database integrity error.", e, body.subtask_id, 500)
except OperationalError as e:
return handle_error(session, "Database operational error.", e, body.subtask_id, 500)
except ValueError as e:
return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400)
except Exception as e: # General exception handling
return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500)
finally:
session.close()
# Mark the subtask as successful
SubTaskInterface().update_subtask_status(subtask_id=UUID(body.subtask_id), status="failed")
logger.info("Model Engine completed successfully")
return Response(status_code=200)