mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1058 lines
46 KiB
Python
1058 lines
46 KiB
Python
import ast
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
from etl.epc.Record import EPCRecord
|
|
from backend.SearchEpc import SearchEpc
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from sqlalchemy.orm import sessionmaker
|
|
from starlette.responses import Response
|
|
|
|
from backend.app.config import get_settings, get_prediction_buckets
|
|
from backend.app.db.connection import db_engine
|
|
from backend.app.db.functions.materials_functions import get_materials
|
|
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
|
|
from backend.app.db.functions.property_functions import (
|
|
create_property, create_property_details_epc, create_property_targets, update_property_data,
|
|
update_or_create_property_spatial_details
|
|
)
|
|
from backend.app.db.functions.recommendations_functions import (
|
|
create_plan, upload_recommendations, create_scenario
|
|
)
|
|
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
|
|
from backend.app.db.models.portfolio import rating_lookup
|
|
from backend.app.plan.schemas import PlanTriggerRequest
|
|
from backend.app.plan.utils import get_cleaned
|
|
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
|
|
import backend.app.assumptions as assumptions
|
|
|
|
from backend.ml_models.api import ModelApi
|
|
from backend.Property import Property
|
|
from backend.Funding import Funding
|
|
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
|
|
|
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
|
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
|
from recommendations.optimiser.optimiser_functions import prepare_input_measures
|
|
from recommendations.Recommendations import Recommendations
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
|
|
from backend.ml_models.Valuation import PropertyValuation
|
|
|
|
from etl.bill_savings.KwhData import KwhData
|
|
from etl.spatial.OpenUprnClient import OpenUprnClient
|
|
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
|
|
|
# Module-level logger shared by the functions in this module.
logger = setup_logger()

# NOTE(review): BATCH_SIZE is not referenced in the visible part of this module - confirm it is still needed.
BATCH_SIZE = 5

# Passed as ``batch_size`` to ``ModelApi.async_paginated_predictions`` when scoring.
SCORING_BATCH_SIZE = 400
|
|
|
|
|
|
def patch_epc(patch, epc_records):
    """
    Overwrite fields of the current EPC with customer-supplied values.

    Only keys that already exist in ``epc_records["original_epc"]`` are patched. ``address`` and
    ``postcode`` are never overwritten: they identify the property rather than describe it. Blank values
    (``""`` or ``None``) are ignored, so an empty cell in the customer data cannot wipe real EPC data.

    :param patch: mapping of EPC field name -> replacement value; ``None`` or empty means "no patch"
    :param epc_records: dict holding an ``"original_epc"`` mapping, which is modified in place
    :return: the same ``epc_records`` object
    """
    if not patch:
        return epc_records

    for patch_variable, patch_value in patch.items():
        # Address/postcode are used for matching, not EPC attributes to overwrite
        if patch_variable in ("address", "postcode"):
            continue

        # Blank values mean "no override" rather than "set this field to nothing"
        if patch_value is None or patch_value == "":
            continue

        if patch_variable in epc_records["original_epc"]:
            epc_records["original_epc"][patch_variable] = patch_value

    return epc_records
|
|
|
|
|
|
def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Aggregate pre- and post-retrofit metrics across a portfolio into display-ready values.

    The returned dict contains:
      * the EPC band breakdown before and after retrofit (JSON strings, one entry per band G..A)
      * the number of properties, and the number with at least one default recommendation
      * CO2 (suffixed "t"), energy bill and energy consumption (suffixed "kWh") per unit, before and after
      * the average valuation improvement per unit, with a lower/upper bound range
      * cost per unit, cost per unit of CO2 saved and cost per SAP point
      * the valuation return on investment, with a lower/upper bound range

    Only recommendations flagged ``default`` count towards the post-retrofit figures. Every property is
    included in the averages, even those with no recommendations.

    :param input_properties: property objects; reads ``id``, ``data["current-energy-rating"]``,
        ``energy["co2_emissions"]``, ``current_energy_bill`` (a dict whose values are summed) and
        ``current_energy_consumption``
    :param total_valuation_increase: total valuation uplift across the portfolio
    :param recommendations: mapping of property id -> list of recommendation dicts with keys ``default``,
        ``co2_equivalent_savings``, ``energy_cost_savings``, ``kwh_savings``, ``total`` and ``sap_points``
    :param new_epc_bands: mapping of property id -> post-retrofit EPC band
    :param property_value_increase_ranges: mapping of property id -> dict with ``current_value``,
        ``lower_bound_increased_value`` and ``upper_bound_increased_value``
    :return: dict of aggregated metrics. Any average or ratio that is undefined is reported as ``"N/A"``
        instead of "inf"/"nan" or an exception. This covers no properties, zero total cost, no carbon
        saved and no SAP points gained.
    """
    epc_band_order = ["G", "F", "E", "D", "C", "B", "A"]
    columns = [
        "pre_retrofit_epc", "post_retrofit_epc",
        "pre_retrofit_co2", "post_retrofit_co2",
        "pre_retrofit_energy_bill", "post_retrofit_energy_bill",
        "pre_retrofit_energy_consumption", "post_retrofit_energy_consumption",
        "cost", "sap_point_improvement",
        "lower_bound_valuation_uplift", "upper_bound_valuation_uplift",
        "has_recommendations",
    ]

    def reformat_epc_data(epc_counts):
        # One entry per band (worst to best); bands with no properties are reported as zero.
        # int() keeps json.dumps happy whatever scalar type value_counts hands back.
        return [{"name": band, band: int(epc_counts.get(band, 0))} for band in epc_band_order]

    def is_missing(value):
        return value is None or pd.isna(value)

    def safe_ratio(numerator, denominator):
        # None marks an undefined ratio; plain division would yield inf/nan (or raise for Python ints)
        if is_missing(numerator) or is_missing(denominator) or denominator == 0:
            return None
        return numerator / denominator

    def format_money(amount):
        return "N/A" if is_missing(amount) else f"£{amount:,.0f}"

    def format_rounded(value, suffix, ndigits=None):
        # round(x, None) behaves exactly like round(x)
        return "N/A" if is_missing(value) else str(round(value, ndigits)) + suffix

    def format_ratio(value):
        return "N/A" if is_missing(value) else f"{value:,.2f}"

    n_units = len(input_properties)

    rows = []
    for p in input_properties:
        # Properties without recommendations are still included (with zero savings/cost)
        default_recommendations = [r for r in recommendations.get(p.id, []) if r["default"]]

        pre_retrofit_co2 = p.energy["co2_emissions"]
        pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
        pre_retrofit_energy_consumption = p.current_energy_consumption
        value_range = property_value_increase_ranges[p.id]

        rows.append({
            "pre_retrofit_epc": p.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[p.id],
            "pre_retrofit_co2": pre_retrofit_co2,
            "post_retrofit_co2": pre_retrofit_co2 - sum(
                r["co2_equivalent_savings"] for r in default_recommendations
            ),
            "pre_retrofit_energy_bill": pre_retrofit_energy_bill,
            "post_retrofit_energy_bill": pre_retrofit_energy_bill - sum(
                r["energy_cost_savings"] for r in default_recommendations
            ),
            "pre_retrofit_energy_consumption": pre_retrofit_energy_consumption,
            "post_retrofit_energy_consumption": pre_retrofit_energy_consumption - sum(
                r["kwh_savings"] for r in default_recommendations
            ),
            "cost": sum(r["total"] for r in default_recommendations),
            "sap_point_improvement": sum(r["sap_points"] for r in default_recommendations),
            "lower_bound_valuation_uplift": (
                value_range["lower_bound_increased_value"] - value_range["current_value"]
            ),
            "upper_bound_valuation_uplift": (
                value_range["upper_bound_increased_value"] - value_range["current_value"]
            ),
            "has_recommendations": len(default_recommendations) > 0,
        })

    # Explicit columns so an empty portfolio still yields a frame with the expected columns
    agg_data = pd.DataFrame(rows, columns=columns)

    def column_sum(column):
        return agg_data[column].sum() if rows else 0

    def column_mean(column):
        # The mean of an empty portfolio is undefined
        return agg_data[column].mean() if rows else None

    n_units_to_retrofit = sum(1 for row in rows if row["has_recommendations"])

    total_cost = column_sum("cost")
    total_carbon_saved = column_sum("pre_retrofit_co2") - column_sum("post_retrofit_co2")
    total_sap_points = column_sum("sap_point_improvement")

    average_valuation_increase = safe_ratio(total_valuation_increase, n_units)
    if average_valuation_increase is None:
        valuation_improvement_per_unit = "N/A"
    else:
        valuation_improvement_per_unit = (
            f"{format_money(average_valuation_increase)} "
            f"({format_money(column_mean('lower_bound_valuation_uplift'))} - "
            f"{format_money(column_mean('upper_bound_valuation_uplift'))})"
        )

    return_on_investment = safe_ratio(total_valuation_increase, total_cost)
    if return_on_investment is None:
        valuation_return_on_investment = "N/A"
    else:
        lower_roi = safe_ratio(column_sum("lower_bound_valuation_uplift"), total_cost)
        upper_roi = safe_ratio(column_sum("upper_bound_valuation_uplift"), total_cost)
        valuation_return_on_investment = (
            str(round(return_on_investment, 2)) + f" ({format_ratio(lower_roi)} - {format_ratio(upper_roi)})"
        )

    return {
        "epc_breakdown_pre_retrofit": json.dumps(
            reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": format_rounded(column_mean("pre_retrofit_co2"), "t", 2),
        "co2_per_unit_post_retrofit": format_rounded(column_mean("post_retrofit_co2"), "t", 2),
        "energy_bill_per_unit_pre_retrofit": format_money(column_mean("pre_retrofit_energy_bill")),
        "energy_bill_per_unit_post_retrofit": format_money(column_mean("post_retrofit_energy_bill")),
        "energy_consumption_per_unit_pre_retrofit": format_rounded(
            column_mean("pre_retrofit_energy_consumption"), "kWh"
        ),
        "energy_consumption_per_unit_post_retrofit": format_rounded(
            column_mean("post_retrofit_energy_consumption"), "kWh"
        ),
        "valuation_improvement_per_unit": valuation_improvement_per_unit,
        "cost_per_unit": format_money(column_mean("cost")),
        "cost_per_co2_saved": format_money(safe_ratio(total_cost, total_carbon_saved)),
        "cost_per_sap_point": format_money(safe_ratio(total_cost, total_sap_points)),
        "valuation_return_on_investment": valuation_return_on_investment,
        # TODO: Could we add 10yr carbon credits value?
    }
|
|
|
|
|
|
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Assemble the ``epc_records`` dict for a property from its public EPCs plus, if present, our own
    energy assessment.

    The returned dict always has three keys:
      * ``original_epc`` - the EPC treated as the property's current one
      * ``full_sap_epc`` - a copy of ``epc_searcher.full_sap_epc``
      * ``old_data`` - historical EPCs

    When the assessment has an EPC, one of three cases applies:
      1. It was inspected after the newest public EPC. It becomes ``original_epc`` and the newest public EPC
         moves into ``old_data``.
      2. It matches a public EPC on inspection date and current energy efficiency. It is treated as already
         lodged, and the public data is used unchanged.
      3. Otherwise it is older and unlodged, so it is appended to ``old_data``.

    Side effect: whenever an assessment EPC exists, the location fields (county, constituency, local
    authority) are copied from the newest public EPC onto ``energy_assessment["epc"]`` in place.

    :param epc_searcher: SearchEpc instance; its ``newest_epc``, ``full_sap_epc``, ``older_epcs`` and
        ``uprn`` are read
    :param energy_assessment: our energy assessment. ``energy_assessment["epc"]`` is falsy when no
        assessment was performed, i.e. the models' ``EnergyAssessment.empty_response()``.
    :return: tuple ``(epc_records, energy_assessment_is_newer)``
    """
    current = epc_searcher.newest_epc.copy()
    # Backfill a blank UPRN on the EPC with the one the searcher resolved
    if current["uprn"] == "" and epc_searcher.uprn:
        current["uprn"] = epc_searcher.uprn

    def build_records(original_epc, old_data):
        return {
            "original_epc": original_epc,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": old_data,
        }

    assessment_epc = energy_assessment["epc"]
    if not assessment_epc:
        return build_records(current, epc_searcher.older_epcs.copy()), False

    assessed_on = assessment_epc["inspection-date"].strftime("%Y-%m-%d")

    # The assessment does not capture these location fields, so take them from the newest public EPC
    location_fields = ("county", "constituency", "constituency-label", "local-authority", "local-authority-label")
    for field in location_fields:
        assessment_epc[field] = current[field]

    # Case 1: our assessment is more recent than anything publicly lodged
    if pd.to_datetime(assessed_on) > pd.to_datetime(current["inspection-date"]):
        return build_records(assessment_epc, epc_searcher.older_epcs.copy() + [current]), True

    # Case 2: same inspection date and SAP score as a public EPC, so it has already been lodged
    lodged_matches = [
        candidate
        for candidate in epc_searcher.older_epcs + [current]
        if candidate["inspection-date"] == assessed_on
        and candidate["current-energy-efficiency"] == assessment_epc["current-energy-efficiency"]
    ]
    if lodged_matches:
        return build_records(current, epc_searcher.older_epcs.copy()), False

    # Case 3: older than the newest public EPC but never lodged, so keep it as history
    return build_records(current, epc_searcher.older_epcs.copy() + [assessment_epc]), False
|
|
|
|
|
|
def get_request_property_data(body: PlanTriggerRequest):
    """
    Load the optional per-request input files referenced by a plan trigger request.

    Each file is read from the ``PLAN_TRIGGER_BUCKET`` only when its path is set on ``body``. A file that
    was not supplied yields an empty list.

    :param body: the request body; reads ``patches_file_path``, ``already_installed_file_path``,
        ``non_invasive_recommendations_file_path`` and ``valuation_file_path``
    :return: tuple ``(patches, already_installed, non_invasive_recommendations, valuation_data)``
    """
    def load_optional(filepath):
        if not filepath:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=filepath)

    patches = load_optional(body.patches_file_path)
    already_installed = load_optional(body.already_installed_file_path)
    non_invasive_recommendations = load_optional(body.non_invasive_recommendations_file_path)
    valuation_data = load_optional(body.valuation_file_path)

    return patches, already_installed, non_invasive_recommendations, valuation_data
|
|
|
|
|
|
def extract_property_request_data(
    config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
    """
    Pick out the rows of the per-request input files that belong to a single property.

    Matching rules:
      * Patches are matched by UPRN when the patch file has a ``uprn`` column, otherwise by address +
        postcode. The request row's own ``config["uprn"]`` is used when present. If it is missing or blank,
        the resolved ``uprn`` argument is used instead. If neither is available, matching falls back to
        address + postcode instead of failing.
      * Already-installed measures are always matched by address + postcode.
      * Non-invasive recommendations and valuations are matched by the resolved ``uprn`` when the file's
        first row carries a non-blank UPRN, otherwise by address + postcode.

    :param config: the property's row from the plan input; reads ``address``, ``postcode`` and, optionally,
        ``uprn``
    :param patches: list of patch row dicts (empty when no file was supplied)
    :param already_installed: list of already-installed row dicts
    :param non_invasive_recommendations: list of non-invasive recommendation row dicts
    :param valuation_data: list of valuation row dicts with a ``valuation`` column
    :param uprn: the property's resolved UPRN (may be ``None``)
    :return: tuple ``(patch, already_installed, non_invasive_recommendations, valuation)``.
        Unmatched dicts are ``{}``; an unmatched or blank valuation is ``None``. The non-invasive
        recommendations dict is a copy, so the input rows are not modified. Its ``recommendations`` value,
        if present, is parsed (when stored as a string literal) and normalised to a list in which bare
        measure names become ``{"type": name}``.
    """

    def is_blank(value):
        return value in ("", None)

    def same_address(row):
        return (row["address"] == config["address"]) and (row["postcode"] == config["postcode"])

    def same_uprn(row):
        return str(row["uprn"]) == str(uprn)

    def first_match(rows, predicate, default):
        return next((row for row in rows if predicate(row)), default)

    def first_row_has_uprn(rows):
        # NOTE(review): only the first row decides the matching strategy for the whole file
        return bool(rows) and not is_blank(rows[0].get("uprn"))

    # --- Patches ---
    patch_has_uprn = "uprn" in patches[0] if patches else True
    # config["uprn"] is optional in the plan input, so fall back to the resolved UPRN when it's missing/blank
    patch_uprn = config.get("uprn")
    if is_blank(patch_uprn):
        patch_uprn = uprn

    if patch_has_uprn and not is_blank(patch_uprn):
        patch = first_match(patches, lambda row: str(row["uprn"]) == str(patch_uprn), {})
    else:
        patch = first_match(patches, same_address, {})

    # --- Already installed measures ---
    property_already_installed = first_match(already_installed, same_address, {})

    # --- Non-invasive recommendations ---
    # Some non-invasive recommendation files match on address and postcode but carry no UPRN
    non_invasive_predicate = same_uprn if first_row_has_uprn(non_invasive_recommendations) else same_address
    # Copy so the shared input row is not mutated by the parsing below
    property_non_invasive_recommendations = dict(
        first_match(non_invasive_recommendations, non_invasive_predicate, {})
    )

    recs = property_non_invasive_recommendations.get("recommendations")
    if isinstance(recs, str):
        # Stored as a Python-literal list in the CSV; a blank cell means "no recommendations"
        recs = ast.literal_eval(recs) if recs.strip() else []
        property_non_invasive_recommendations["recommendations"] = recs

    if isinstance(recs, (list, tuple)):
        # Bare measure names (e.g. 'cavity_extract_and_refill') become {"type": name} for a uniform shape
        property_non_invasive_recommendations["recommendations"] = [
            {"type": rec} if isinstance(rec, str) else rec for rec in recs
        ]

    # --- Valuation ---
    valuation_predicate = same_uprn if first_row_has_uprn(valuation_data) else same_address
    valuation_row = first_match(valuation_data, valuation_predicate, None)

    property_valuation = None
    if valuation_row is not None:
        raw_valuation = valuation_row["valuation"]
        # A blank valuation cell means "no valuation supplied"; float("") would raise
        if raw_valuation is not None and str(raw_valuation).strip():
            property_valuation = float(raw_valuation)

    return patch, property_already_installed, property_non_invasive_recommendations, property_valuation
|
|
|
|
|
|
def get_funding_data():
    """
    Load the funding reference tables from the data bucket.

    :return: tuple ``(project_scores_matrix, whlg_eligible_postcodes)``:
        * ``project_scores_matrix`` is the ECO4 full project scores matrix. Its columns are renamed to
          'Floor Area Segment', 'Starting Band', 'Finishing Band' and 'Cost Savings', and 'Cost Savings' is
          cast to float. The source CSV must therefore have exactly four columns.
        * ``whlg_eligible_postcodes`` is the Warm Homes Local Grant eligible postcodes table, as loaded.
    """
    def load_frame(filepath):
        # The bucket name is looked up from the settings on every read
        rows = read_csv_from_s3(
            bucket_name=get_settings().DATA_BUCKET,
            filepath=filepath,
        )
        return pd.DataFrame(rows)

    scores = load_frame("funding/ECO4 Full Project Scores Matrix.csv")
    scores.columns = ["Floor Area Segment", "Starting Band", "Finishing Band", "Cost Savings"]
    scores["Cost Savings"] = scores["Cost Savings"].astype(float)

    eligible_postcodes = load_frame("funding/whlg eligible postcodes.csv")

    return scores, eligible_postcodes
|
|
|
|
|
|
async def model_engine(body: PlanTriggerRequest):
|
|
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
|
|
|
|
logger.info("Connecting to db")
|
|
session = sessionmaker(bind=db_engine)()
|
|
created_at = datetime.now().isoformat()
|
|
|
|
# TODO: if the measure is already installed, it should actually be the very first phase
|
|
|
|
try:
|
|
session.begin()
|
|
logger.info("Getting the inputs")
|
|
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
|
|
# Check for duplicate UPRNS
|
|
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
|
|
|
|
if input_uprns:
|
|
# Check for dupes
|
|
if len(input_uprns) != len(set(input_uprns)):
|
|
raise ValueError("Duplicate UPRNs in the input data")
|
|
|
|
# If we have patches or overrides, we should read them in here
|
|
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
|
|
|
|
cleaning_data = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
|
|
)
|
|
|
|
input_properties = []
|
|
for config in tqdm(plan_input):
|
|
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
|
uprn = config.get("uprn", None)
|
|
if uprn:
|
|
uprn = int(float(uprn))
|
|
|
|
epc_searcher = SearchEpc(
|
|
address1=config["address"],
|
|
postcode=config["postcode"],
|
|
uprn=uprn,
|
|
auth_token=get_settings().EPC_AUTH_TOKEN,
|
|
os_api_key="",
|
|
)
|
|
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
|
|
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
|
|
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
|
epc_searcher.find_property(skip_os=True)
|
|
|
|
# We check for an energy assessment we have performed on this property:
|
|
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
|
|
|
|
# Create a record in db
|
|
property_id, is_new = create_property(
|
|
session=session,
|
|
portfolio_id=body.portfolio_id,
|
|
address=epc_searcher.address_clean,
|
|
postcode=epc_searcher.postcode_clean,
|
|
uprn=epc_searcher.uprn,
|
|
energy_assessment=energy_assessment
|
|
)
|
|
if not is_new and not body.multi_plan:
|
|
continue
|
|
|
|
if epc_searcher.newest_epc is None:
|
|
raise ValueError(
|
|
"No EPCs found for this property and did not estimate - likely need to provide a"
|
|
"property type and built form"
|
|
)
|
|
|
|
if is_new:
|
|
create_property_targets(
|
|
session,
|
|
property_id=property_id,
|
|
portfolio_id=body.portfolio_id,
|
|
epc_target=body.goal_value,
|
|
heat_demand_target=None
|
|
)
|
|
|
|
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
|
|
# Otherwise, we use the newest EPC
|
|
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
|
|
# has been publically lodged
|
|
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
|
|
epc_searcher, energy_assessment
|
|
)
|
|
|
|
patch, property_already_installed, property_non_invasive_recommendations, property_valuation = (
|
|
extract_property_request_data(
|
|
config=config,
|
|
patches=patches,
|
|
already_installed=already_installed,
|
|
non_invasive_recommendations=non_invasive_recommendations,
|
|
valuation_data=valuation_data,
|
|
uprn=epc_searcher.uprn,
|
|
)
|
|
)
|
|
|
|
# if we have a remote assment data type, we pull the additional data and include it
|
|
if body.event_type == "remote_assessment":
|
|
logger.info("Retrieving find my epc data")
|
|
try:
|
|
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
|
|
epc_searcher.newest_epc
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to retrieve without cleaning address {e}")
|
|
for k in ["address", "address1"]:
|
|
epc_searcher.newest_epc[k] = epc_searcher.address_clean
|
|
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
|
|
epc_searcher.newest_epc
|
|
)
|
|
|
|
# If we have a property type, this means when we pull the epc data, we might need to make a patch
|
|
|
|
epc_records = patch_epc(patch, epc_records)
|
|
|
|
prepared_epc = EPCRecord(
|
|
epc_records=epc_records,
|
|
run_mode="newdata",
|
|
cleaning_data=cleaning_data
|
|
)
|
|
|
|
input_properties.append(
|
|
Property(
|
|
id=property_id,
|
|
is_new=is_new,
|
|
address=epc_searcher.address_clean,
|
|
postcode=epc_searcher.postcode_clean,
|
|
epc_record=prepared_epc,
|
|
already_installed=property_already_installed,
|
|
property_valuation=property_valuation,
|
|
non_invasive_recommendations=property_non_invasive_recommendations,
|
|
energy_assessment=energy_assessment,
|
|
**Property.extract_kwargs(config), # TODO: Depraecate this
|
|
)
|
|
)
|
|
|
|
if not input_properties:
|
|
return Response(status_code=204)
|
|
|
|
# Set up model api and warm up the lambdas
|
|
model_api = ModelApi(
|
|
portfolio_id=body.portfolio_id,
|
|
timestamp=created_at,
|
|
prediction_buckets=get_prediction_buckets(),
|
|
max_retries=1
|
|
)
|
|
await model_api.async_warm_up_lambdas(
|
|
model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES
|
|
)
|
|
|
|
# The materials data could be cached or local so we don't need to make
|
|
# consistent requests to the backend for
|
|
# the same data
|
|
logger.info("Reading in materials and cleaned datasets")
|
|
materials = get_materials(session)
|
|
cleaned = get_cleaned()
|
|
eco_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
|
|
|
|
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
|
|
|
|
epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned)
|
|
|
|
kwh_preds = await model_api.async_paginated_predictions(
|
|
data=epcs_for_scoring,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
extract_ids=False,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the spatial data
|
|
logger.info("Getting spatial data")
|
|
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
|
|
|
|
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
|
|
|
|
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
|
|
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
|
|
# extensions, since it doesn't seem to do a great job
|
|
|
|
logger.info("Performing solar analysis")
|
|
|
|
ofgem_consumption_averages = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET,
|
|
file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet"
|
|
)
|
|
|
|
building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data(
|
|
input_properties=input_properties,
|
|
ofgem_consumption_averages=ofgem_consumption_averages,
|
|
body=body
|
|
)
|
|
|
|
input_properties = GoogleSolarApi.building_solar_analysis(
|
|
building_solar_config=building_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY
|
|
)
|
|
|
|
input_properties = GoogleSolarApi.unit_solar_analysis(
|
|
unit_solar_config=unit_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
body=body,
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY
|
|
)
|
|
|
|
logger.info("Identifying property recommendations")
|
|
recommendations = {}
|
|
recommendations_scoring_data = []
|
|
representative_recommendations = {}
|
|
for p in tqdm(input_properties):
|
|
recommender = Recommendations(
|
|
property_instance=p,
|
|
materials=materials,
|
|
exclusions=body.exclusions,
|
|
inclusions=body.inclusions,
|
|
default_u_values=body.default_u_values
|
|
)
|
|
property_recommendations, property_representative_recommendations = recommender.recommend()
|
|
|
|
if not property_recommendations:
|
|
continue
|
|
|
|
recommendations[p.id] = property_recommendations
|
|
representative_recommendations[p.id] = property_representative_recommendations
|
|
|
|
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
|
|
p.adjust_difference_record_with_recommendations(
|
|
property_recommendations, property_representative_recommendations
|
|
)
|
|
|
|
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
|
|
|
# TODO: Make sure that number_habitable_rooms has been dropped
|
|
logger.info("Preparing data for scoring in sap change api")
|
|
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
|
|
|
recommendations_scoring_data = recommendations_scoring_data.drop(
|
|
columns=[
|
|
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
|
|
"carbon_ending"
|
|
]
|
|
)
|
|
|
|
all_predictions = await model_api.async_paginated_predictions(
|
|
data=recommendations_scoring_data,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the predictions into the recommendations, and get the impact summary
|
|
scoring_epcs = [] # For scoring the kwh models
|
|
for property_id in recommendations.keys():
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
recommendations_with_impact, impact_summary = (
|
|
Recommendations.calculate_recommendation_impact(
|
|
property_instance=property_instance,
|
|
all_predictions=all_predictions,
|
|
recommendations=recommendations,
|
|
representative_recommendations=representative_recommendations
|
|
)
|
|
)
|
|
|
|
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
|
|
# at each phase
|
|
property_instance.update_simulation_epcs(impact_summary)
|
|
scoring_epcs.extend(property_instance.updated_simulation_epcs)
|
|
recommendations[property_id] = recommendations_with_impact
|
|
|
|
# We call the API with the scoring epcs
|
|
scoring_epcs = pd.DataFrame(scoring_epcs)
|
|
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
|
|
|
|
kwh_simulation_predictions = await model_api.async_paginated_predictions(
|
|
data=scoring_epcs,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# We now insert kwh estimates and costs into the recommendations
|
|
logger.info("Calculating tenant savings - kwh and bills")
|
|
for property_id in tqdm([p.id for p in input_properties]):
|
|
property_recommendations = recommendations.get(property_id, [])
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
property_current_energy_bill = (
|
|
Recommendations.calculate_recommendation_tenant_savings(
|
|
property_instance=property_instance,
|
|
kwh_simulation_predictions=kwh_simulation_predictions,
|
|
property_recommendations=property_recommendations,
|
|
ashp_cop=body.ashp_cop
|
|
)
|
|
)
|
|
property_instance.current_energy_bill = property_current_energy_bill
|
|
|
|
# Insert the predictions into the recommendations and run the optimiser
|
|
for p in input_properties:
|
|
if not recommendations.get(p.id):
|
|
continue
|
|
|
|
# we need to double unlist because we have a list of lists
|
|
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
|
|
|
|
property_required_measures = [
|
|
m for m in recommendations[p.id] if m[0]["type"] in body.required_measures
|
|
]
|
|
measures_to_optimise = [
|
|
m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures
|
|
]
|
|
|
|
# If we have a wall insulation measure, we MUST include mechanical ventilation
|
|
# Additionally, if we have required measures, they should also be included. Therefore
|
|
# we can discount the number of points required to get to the target SAP band (or increase)
|
|
# in the case of ventilation
|
|
needs_ventilation = any(x in property_measure_types for x in assumptions.measures_needing_ventilation)
|
|
|
|
input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation)
|
|
|
|
if not input_measures[0]:
|
|
# This means that we have no defaults
|
|
selected_recommendations = {}
|
|
solution = []
|
|
else:
|
|
|
|
fixed_gain = 0
|
|
if property_required_measures:
|
|
# We get the SAP points for the required measures
|
|
if body.goal != "Increasing EPC":
|
|
raise NotImplementedError("Only EPC optimisation is currently supported")
|
|
sap_by_type = [
|
|
{"type": rec["type"], "sap_points": rec["sap_points"]} for recs in property_required_measures
|
|
for rec in recs
|
|
]
|
|
# We get a MAX sap points per type
|
|
max_per_type = (
|
|
pd.DataFrame(sap_by_type).groupby("type")["sap_points"].max().to_dict()
|
|
)
|
|
fixed_gain = sum(max_per_type.values())
|
|
|
|
property_required_measure_types = {rec["type"] for rec in sap_by_type}
|
|
|
|
# if the property needs ventilation, but the measure we optimise didn't include
|
|
# venilation we add the points for ventilation as a fixed gain
|
|
if needs_ventilation and any(
|
|
r in property_required_measure_types for r in assumptions.measures_needing_ventilation
|
|
):
|
|
fixed_gain += next(
|
|
(r[0]["sap_points"] for r in recommendations[p.id] if
|
|
r[0]["type"] == "mechanical_ventilation"),
|
|
0
|
|
)
|
|
|
|
current_sap_points = int(p.data["current-energy-efficiency"])
|
|
|
|
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(
|
|
epc_to_sap_lower_bound(body.goal_value) - current_sap_points
|
|
) - fixed_gain
|
|
|
|
if not body.optimise:
|
|
if body.goal != "Increasing EPC":
|
|
raise NotImplementedError("Only EPC optimisation is currently supported")
|
|
solution = []
|
|
for sub_list in input_measures:
|
|
# Select the entry with the highest gain, and if tied, choose the one with the lowest cost
|
|
best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost']))
|
|
solution.append(best_measure)
|
|
else:
|
|
|
|
if body.budget:
|
|
optimiser = GainOptimiser(
|
|
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
|
|
)
|
|
else:
|
|
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
|
|
# If the gain is negative, the optimiser will return an empty solution
|
|
optimiser = CostOptimiser(
|
|
input_measures,
|
|
min_gain=sap_gain
|
|
)
|
|
|
|
optimiser.setup()
|
|
optimiser.solve()
|
|
solution = optimiser.solution
|
|
|
|
selected_recommendations = {r["id"] for r in solution}
|
|
|
|
if property_required_measures:
|
|
# We select the cheapest of the required measures, into selected
|
|
for recs in property_required_measures:
|
|
# We select the cheapest of the required measures
|
|
cost_to_id = {
|
|
rec["recommendation_id"]: rec["total"] for rec in recs
|
|
if rec["recommendation_id"] not in selected_recommendations
|
|
}
|
|
# Take the recommendation id with the lowers cost
|
|
|
|
selected_recommendations.add(min(cost_to_id, key=cost_to_id.get))
|
|
# Update the solution with the selected recommendations
|
|
solution = []
|
|
for recs in recommendations[p.id]:
|
|
for rec in recs:
|
|
if rec["recommendation_id"] in selected_recommendations:
|
|
solution.append(
|
|
{
|
|
"id": rec["recommendation_id"],
|
|
"cost": rec["total"],
|
|
"gain": rec["sap_points"],
|
|
"type": rec["type"]
|
|
}
|
|
)
|
|
|
|
# If any measure that needs ventilation (e.g. wall insulation) is selected, we also include mechanical ventilation as a best practice measure
|
|
if any(x in [r["type"] for r in solution] for x in assumptions.measures_needing_ventilation):
|
|
ventilation_rec = next(
|
|
(r[0] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"),
|
|
None
|
|
)
|
|
|
|
# If a matching recommendation was found, add its ID to the selected recommendations
|
|
if ventilation_rec:
|
|
selected_recommendations.add(ventilation_rec["recommendation_id"])
|
|
|
|
# If we have a trickle vents recommendation, we also switch it on. We look it up among all of the property's recommendations, not just the solution
|
|
trickle_vents_rec = next(
|
|
(r[0] for r in recommendations[p.id] if r[0]["type"] == "trickle_vents"),
|
|
None
|
|
)
|
|
# If a matching recommendation was found, add its ID to the selected recommendations
|
|
if trickle_vents_rec:
|
|
selected_recommendations.add(trickle_vents_rec["recommendation_id"])
|
|
|
|
# We'll use the set of selected recommendations to filter the recommendations to upload
|
|
final_recommendations = [
|
|
[
|
|
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
|
|
for rec in recommendations_by_type
|
|
]
|
|
for recommendations_by_type in recommendations[p.id]
|
|
]
|
|
|
|
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
|
|
recommendations[p.id] = [
|
|
rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type
|
|
]
|
|
|
|
# When we have buildings, we tweak our solar PV recommendations: if one unit needs it, we apply it to all
|
|
# of them
|
|
# TODO: We can probably do better and optimise at the building level - this is temp
|
|
logger.info("Adjusting solar PV recommendations for buildings")
|
|
building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
|
|
|
|
for bid in building_ids:
|
|
# We check if any of them have solar PV
|
|
building = [p for p in input_properties if p.building_id == bid]
|
|
has_solar = False
|
|
for unit in building:
|
|
# Get default recommendations
|
|
has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
|
|
if has_solar:
|
|
break
|
|
|
|
if has_solar:
|
|
# We adjust the units within the building
|
|
for unit in building:
|
|
for rec in recommendations[unit.id]:
|
|
if rec["type"] == "solar_pv":
|
|
# This is straightforward, we just set the default to True, since when we're at a building
|
|
# level, we only allow 1 solar PV option for each unit. If we change this, this logic will
|
|
# need to be updated
|
|
rec["default"] = True
|
|
|
|
# ~~~~~~~~~~~~~~~~
|
|
# Funding
|
|
# ~~~~~~~~~~~~~~~~
|
|
|
|
# for p in input_properties:
|
|
# funding_calulator = Funding(
|
|
# tenure=body.housing_type,
|
|
# starting_epc=p.data["current-energy-rating"],
|
|
# starting_sap=int(p.data["current-energy-efficiency"]),
|
|
# postcode=p.postcode,
|
|
# floor_area=p.floor_area,
|
|
# council_tax_band=None, # This is seemingly always None at the moment
|
|
# property_recommendations=recommendations[p.id],
|
|
# project_scores_matrix=eco_project_scores_matrix,
|
|
# whlg_eligible_postcodes=whlg_eligible_postcodes,
|
|
# gbis_abs_rate=15,
|
|
# eco4_abs_rate=15,
|
|
# )
|
|
# funding_calulator.check_eligibiltiy()
|
|
# # Insert finding
|
|
# p.insert_funding(funding_calulator)
|
|
|
|
logger.info("Uploading recommendations to the database")
|
|
# If we have any work to do, we create a new scenario
|
|
engine_scenario = create_scenario(
|
|
session=session,
|
|
scenario={
|
|
"name": body.scenario_name,
|
|
"created_at": created_at,
|
|
"budget": body.budget,
|
|
"portfolio_id": body.portfolio_id,
|
|
"housing_type": body.housing_type,
|
|
"goal": body.goal,
|
|
"trigger_file_path": body.trigger_file_path,
|
|
"already_installed_file_path": body.already_installed_file_path,
|
|
"patches_file_path": body.patches_file_path,
|
|
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
|
|
"exclusions": body.exclusions,
|
|
"multi_plan": body.multi_plan
|
|
}
|
|
)
|
|
|
|
property_valuation_increases = []
|
|
session.commit()
|
|
new_epc_bands = {}
|
|
property_value_increase_ranges = {}
|
|
for i in range(0, len(input_properties), BATCH_SIZE):
|
|
try:
|
|
# Take a slice of the input_properties list to make a batch
|
|
batch_properties = input_properties[i:i + BATCH_SIZE]
|
|
|
|
for p in batch_properties:
|
|
recommendations_to_upload = recommendations.get(p.id, [])
|
|
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
|
|
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
|
|
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
|
|
new_epc = sap_to_epc(new_sap_points)
|
|
new_epc_bands[p.id] = new_epc
|
|
|
|
total_cost = sum([r["total"] for r in default_recommendations])
|
|
|
|
valuations = PropertyValuation.estimate(
|
|
property_instance=p, target_epc=new_epc, total_cost=total_cost
|
|
)
|
|
property_value_increase_ranges[p.id] = valuations
|
|
|
|
if p.is_new:
|
|
property_details_epc = p.get_property_details_epc(
|
|
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
|
|
)
|
|
create_property_details_epc(session, property_details_epc)
|
|
|
|
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
|
|
|
|
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
|
|
|
|
update_property_data(
|
|
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
|
|
)
|
|
|
|
if not recommendations_to_upload:
|
|
continue
|
|
|
|
new_plan_id = create_plan(session, {
|
|
"portfolio_id": body.portfolio_id,
|
|
"property_id": p.id,
|
|
"scenario_id": engine_scenario.id,
|
|
"is_default": True if p.is_new else False,
|
|
"name": body.scenario_name,
|
|
"valuation_increase_lower_bound": (
|
|
valuations["lower_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_upper_bound": (
|
|
valuations["upper_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_average": (
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
),
|
|
})
|
|
|
|
upload_recommendations(
|
|
session, recommendations_to_upload, p.id, new_plan_id
|
|
)
|
|
|
|
property_valuation_increases.append(
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
)
|
|
|
|
# Commit the session after each batch
|
|
session.commit()
|
|
|
|
except Exception as e:
|
|
# Rollback the session if an error occurs
|
|
session.rollback()
|
|
print("Failed i = %s" % str(i))
|
|
logger.error(f"An error occurred during batch starting at index {i}: {e}")
|
|
|
|
logger.info("Creating portfolio aggregations")
|
|
# We implement this in the simplest way possible which will be just to query the database for all
|
|
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
|
|
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
|
|
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
|
|
# portfolio-level impact
|
|
|
|
total_valuation_increase = sum(property_valuation_increases)
|
|
labour_days = round(max(
|
|
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
|
|
))
|
|
|
|
aggregated_data = extract_portfolio_aggregation_data(
|
|
input_properties=input_properties,
|
|
total_valuation_increase=total_valuation_increase,
|
|
recommendations=recommendations,
|
|
new_epc_bands=new_epc_bands,
|
|
property_value_increase_ranges=property_value_increase_ranges
|
|
)
|
|
|
|
aggregate_portfolio_recommendations(
|
|
session,
|
|
portfolio_id=body.portfolio_id,
|
|
scenario_id=engine_scenario.id,
|
|
total_valuation_increase=total_valuation_increase,
|
|
labour_days=labour_days,
|
|
aggregated_data=aggregated_data
|
|
)
|
|
|
|
# Commit final changes
|
|
session.commit()
|
|
|
|
except IntegrityError:
|
|
logger.error("Database integrity error occurred", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=500, content="Database integrity error.")
|
|
except OperationalError:
|
|
logger.error("Database operational error occurred", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=500, content="Database operational error.")
|
|
except ValueError:
|
|
logger.error("Value error - possibly due to malformed data", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=400, content="Bad request: malformed data.")
|
|
except Exception as e: # General exception handling
|
|
logger.error(f"An error occurred: {e}")
|
|
session.rollback()
|
|
return Response(status_code=500, content="An unexpected error occurred.")
|
|
finally:
|
|
session.close()
|
|
|
|
logger.info("Model Engine completed successfully")
|
|
|
|
return Response(status_code=200)
|