# Mirror of https://github.com/Hestia-Homes/Model.git
# Synced 2026-06-08 11:17:27 +00:00
# 1480 lines, 67 KiB, Python
import time
|
|
import json
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
import numpy as np
|
|
from uuid import UUID
|
|
|
|
from backend.Funding import Funding
|
|
from backend.SearchEpc import SearchEpc
|
|
from contextlib import contextmanager
|
|
from sqlmodel import Session
|
|
|
|
from etl.epc.Record import EPCRecord
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from starlette.responses import Response
|
|
from backend.app.BatterySapScorer import BatterySAPScorer
|
|
|
|
from backend.app.config import get_settings, get_prediction_buckets
|
|
from backend.app.db.connection import db_engine
|
|
import backend.app.db.functions as db_funcs
|
|
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
|
|
|
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
|
|
from backend.app.plan.utils import (
|
|
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url
|
|
)
|
|
from backend.app.utils import sap_to_epc
|
|
import backend.app.assumptions as assumptions
|
|
|
|
from backend.ml_models.api import ModelApi
|
|
from backend.Property import Property
|
|
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
|
from backend.addresses.Addresses import Addresses
|
|
|
|
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
|
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
|
import recommendations.optimiser.optimiser_functions as optimiser_functions
|
|
from recommendations.Recommendations import Recommendations
|
|
from backend.ml_models.Valuation import PropertyValuation
|
|
|
|
from etl.bill_savings.KwhData import KwhData
|
|
from etl.spatial.OpenUprnClient import OpenUprnClient
|
|
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
|
|
|
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
|
|
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
|
|
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
|
|
|
|
logger = setup_logger()

# Module-level batch sizes (plain integers); whatever consumes them lies outside the visible part of this file.
BATCH_SIZE: int = 5
SCORING_BATCH_SIZE: int = 300
|
|
|
|
|
|
def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Summarise a retrofit plan at portfolio level.

    Every property in ``input_properties`` contributes one row, whether or not it has recommendations.
    Only recommendations flagged ``"default"`` count towards the retrofit. The returned dict holds:

    * EPC band breakdowns before / after retrofit (JSON strings, bands ordered G -> A, zero-filled)
    * the number of properties, and how many have at least one default recommendation
    * mean CO2, energy bill and energy consumption per unit, before and after retrofit
    * mean valuation uplift per unit, with a lower - upper range
    * mean cost per unit, cost per unit of CO2 saved and cost per SAP point gained
      (the latter two are reported as £0 when nothing was saved / gained)
    * valuation return on investment (valuation increase / total cost), with a range
    * total funding and total contingency

    :param input_properties: properties in the portfolio; each is read for ``id``, ``energy``,
        ``current_energy_bill``, ``current_energy_consumption``, ``data`` and ``project_funding``
    :param total_valuation_increase: total valuation increase across the portfolio
    :param recommendations: mapping of property id -> list of recommendation dicts
    :param new_epc_bands: mapping of property id -> post-retrofit EPC band
    :param property_value_increase_ranges: mapping of property id -> dict with ``current_value``,
        ``lower_bound_increased_value`` and ``upper_bound_increased_value``
    :return: dict of formatted aggregate metrics
    """
    bands_worst_to_best = ["G", "F", "E", "D", "C", "B", "A"]

    def band_breakdown(counts_by_band):
        # Every band is listed, in order, even when no property falls into it
        return [{"name": band, band: counts_by_band.get(band, 0)} for band in bands_worst_to_best]

    def money(value):
        return f"£{value:,.0f}"

    def uplift_range(property_id):
        # Properties with no current valuation contribute a zero uplift at both ends of the range
        value_range = property_value_increase_ranges[property_id]
        if pd.isnull(value_range["current_value"]):
            return 0, 0
        return (
            value_range["lower_bound_increased_value"] - value_range["current_value"],
            value_range["upper_bound_increased_value"] - value_range["current_value"],
        )

    def summarise_property(prop):
        # Properties without recommendations still get a row, with zero savings and zero cost
        chosen = [rec for rec in recommendations.get(prop.id, []) if rec["default"]]
        co2_before = prop.energy["co2_emissions"]
        bill_before = sum(prop.current_energy_bill.values())
        consumption_before = prop.current_energy_consumption
        lower_uplift, upper_uplift = uplift_range(prop.id)
        return {
            "pre_retrofit_epc": prop.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[prop.id],
            "pre_retrofit_co2": co2_before,
            "post_retrofit_co2": co2_before - sum(rec["co2_equivalent_savings"] for rec in chosen),
            "pre_retrofit_energy_bill": bill_before,
            "post_retrofit_energy_bill": bill_before - sum(rec["energy_cost_savings"] for rec in chosen),
            "pre_retrofit_energy_consumption": consumption_before,
            "post_retrofit_energy_consumption": consumption_before - sum(rec["kwh_savings"] for rec in chosen),
            "cost": sum(rec["total"] for rec in chosen),
            "sap_point_improvement": sum(rec["sap_points"] for rec in chosen),
            "lower_bound_valuation_uplift": lower_uplift,
            "upper_bound_valuation_uplift": upper_uplift,
            "has_recommendations": bool(chosen),
            "funding": 0 if prop.project_funding is None else float(prop.project_funding),
            "contingency": float(sum(rec.get("contingency", 0) for rec in chosen)),
        }

    n_units = len(input_properties)
    table = pd.DataFrame([summarise_property(prop) for prop in input_properties])

    total_cost = table["cost"].sum()
    total_carbon_saved = table["pre_retrofit_co2"].sum() - table["post_retrofit_co2"].sum()
    total_sap_points = table["sap_point_improvement"].sum()

    valuation_per_unit = (
        money(total_valuation_increase / n_units)
        + f" ({money(table['lower_bound_valuation_uplift'].mean())} - "
        + f"{money(table['upper_bound_valuation_uplift'].mean())})"
    )

    # Return on investment is expressed as valuation increase per pound spent
    if total_cost == 0:
        roi, roi_low, roi_high = 0, 0, 0
    else:
        roi = round(total_valuation_increase / total_cost, 2)
        roi_low = table["lower_bound_valuation_uplift"].sum() / total_cost
        roi_high = table["upper_bound_valuation_uplift"].sum() / total_cost
    roi_text = str(roi) + f" ({roi_low:,.2f} - {roi_high:,.2f})"

    cost_per_co2_saved = money(total_cost / total_carbon_saved if total_carbon_saved > 0 else 0)
    cost_per_sap_point = money(total_cost / total_sap_points if total_sap_points > 0 else 0)

    return {
        "epc_breakdown_pre_retrofit": json.dumps(
            band_breakdown(table["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            band_breakdown(table["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(table["has_recommendations"].sum()),
        "co2_per_unit_pre_retrofit": str(round(table["pre_retrofit_co2"].mean(), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(table["post_retrofit_co2"].mean(), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": money(table["pre_retrofit_energy_bill"].mean()),
        "energy_bill_per_unit_post_retrofit": money(table["post_retrofit_energy_bill"].mean()),
        "energy_consumption_per_unit_pre_retrofit": str(
            round(table["pre_retrofit_energy_consumption"].mean())) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(
            round(table["post_retrofit_energy_consumption"].mean())) + "kWh",
        "valuation_improvement_per_unit": valuation_per_unit,
        "cost_per_unit": money(table["cost"].mean()),
        "cost_per_co2_saved": cost_per_co2_saved,
        "cost_per_sap_point": cost_per_sap_point,
        "valuation_return_on_investment": roi_text,
        "funding": float(table["funding"].sum()),
        "contingency": float(table["contingency"].sum()),
    }
|
|
|
|
|
|
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Assemble the EPC records bundle for a property, folding in an energy assessment we performed for a client.

    The bundle is built from the searcher's newest EPC, its full SAP EPC and its older EPCs:

    * no assessment EPC: the newest EPC is the primary record and the older EPCs are the history;
    * assessment inspected after the newest EPC: the assessment becomes the primary record and the newest
      EPC joins the history;
    * assessment matching a historical EPC on inspection date and current energy efficiency: it is treated
      as already lodged and the history is left unchanged;
    * otherwise (older and not lodged): the assessment is appended to the history.

    Note: whenever an assessment EPC is present, its county / constituency / local-authority fields are
    overwritten in place with the newest EPC's values.

    :param epc_searcher: An instance of the SearchEpc class
    :param energy_assessment: The energy assessment we have performed. If we have not performed an energy
                              assessment, this should be an empty response as defined by the models's
                              EnergyAssessment.empty_response() method
    :return: ``(records, energy_assessment_is_newer)`` where ``records`` has the keys ``original_epc``,
             ``full_sap_epc`` and ``old_data``
    """
    latest = epc_searcher.newest_epc.copy()
    if latest["uprn"] == "" and epc_searcher.uprn:
        # Backfill a blank UPRN on our copy of the newest EPC from the searcher
        latest["uprn"] = epc_searcher.uprn

    assessed = energy_assessment["epc"]
    if not assessed:
        return {
            "original_epc": latest,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy(),
        }, False

    assessed_on = assessed["inspection-date"].strftime("%Y-%m-%d")

    # The assessment does not supply these location fields, so they are borrowed from the newest EPC
    location_fields = ("county", "constituency", "constituency-label", "local-authority", "local-authority-label")
    for field in location_fields:
        assessed[field] = latest[field]

    if pd.to_datetime(assessed_on) > pd.to_datetime(latest["inspection-date"]):
        # Our assessment is more recent than anything publicly lodged: it takes over as the primary EPC
        return {
            "original_epc": assessed,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy() + [latest],
        }, True

    # Identify whether the assessment was lodged, by matching inspection date and SAP score
    matches = [
        record
        for record in epc_searcher.older_epcs + [latest]
        if record["inspection-date"] == assessed_on
        and record["current-energy-efficiency"] == assessed["current-energy-efficiency"]
    ]

    lodged_history = epc_searcher.older_epcs.copy()
    return {
        "original_epc": latest,
        "full_sap_epc": epc_searcher.full_sap_epc.copy(),
        # An older assessment that was never lodged is kept as part of the history
        "old_data": lodged_history if matches else lodged_history + [assessed],
    }, False
|
|
|
|
|
|
def get_request_property_data(body: PlanTriggerRequest):
    """
    Load the optional per-request (on-site) CSV inputs from the plan trigger bucket.

    Each input is read only when its path on ``body`` is set (truthy); otherwise an empty list stands in for it.

    :param body: The request body
    :return: ``(patches, already_installed, non_invasive_recommendations, valuation_data)``
    """
    def read_optional(filepath):
        # Settings are only looked up when there is actually something to read
        if not filepath:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=filepath)

    return (
        read_optional(body.patches_file_path),
        read_optional(body.already_installed_file_path),
        read_optional(body.non_invasive_recommendations_file_path),
        read_optional(body.valuation_file_path),
    )
|
|
|
|
|
|
def get_funding_data():
    """
    Load the funding reference tables from the data bucket.

    :return: ``(project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes)`` as DataFrames:
             the ECO4 full project scores matrix, the ECO4 partial project scores matrix (both with
             ``Cost Savings`` cast to float), and the Warm Homes Local Grant eligible postcodes
    """
    def load(filepath):
        return pd.DataFrame(read_csv_from_s3(bucket_name=get_settings().DATA_BUCKET, filepath=filepath))

    full_matrix = load("funding/ECO4 Full Project Scores Matrix.csv")
    full_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
    full_matrix["Cost Savings"] = full_matrix["Cost Savings"].astype(float)

    partial_matrix = load("funding/ECO4_Partial_Project_Scores_Matrix_v6.csv")
    partial_matrix.columns = [
        'Measure category', 'Measure_Type', 'Pre_Main_Heating_Source',
        'Post_Main_Heating_Source', 'Total Floor Area Band', 'Starting Band',
        'Average Treatable Factor', 'Cost Savings', 'SAP Savings',
    ]
    # Relabel the floor-area band "200" as "200+"
    band_column = partial_matrix["Total Floor Area Band"]
    partial_matrix["Total Floor Area Band"] = band_column.replace({"200": "200+"})
    partial_matrix["Cost Savings"] = partial_matrix["Cost Savings"].astype(float)

    whlg_postcodes = load("funding/whlg eligible postcodes.csv")

    return full_matrix, partial_matrix, whlg_postcodes
|
|
|
|
|
|
def check_duplicate_uprns(plan_input):
    """
    Check that the raw plan input rows do not contain duplicated UPRNs.

    Rows without a usable UPRN are ignored: a missing ``uprn`` key, a falsy value (``None``, ``""``, ``0``),
    or a null such as NaN (which pandas produces for empty cells). Nulls are skipped explicitly because they
    are not real UPRNs and, if the same NaN object appeared twice, it would otherwise be reported as a duplicate.

    :param plan_input: iterable of input row dicts
    :return: True when no duplicates are found
    :raises ValueError: if any UPRN appears more than once
    """
    from collections import Counter

    uprn_counts = Counter(
        uprn
        for uprn in (row.get("uprn") for row in plan_input)
        if uprn and not pd.isnull(uprn)
    )
    # Single O(n) pass instead of calling list.count() for every element
    duplicates = {uprn for uprn, count in uprn_counts.items() if count > 1}
    if duplicates:
        raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")

    return True
|
|
|
|
|
|
def check_duplicate_property_ids(input_properties):
    """
    Check that the resolved input properties contain no duplicated property IDs or UPRNs.

    Duplicate property IDs happen in very rare cases. One example is properties across different servers where
    the input UPRN is possibly incorrect. We then find the right property via an address search instead of a
    UPRN search, and so end up with the same property twice.

    :param input_properties: properties exposing ``id`` and ``uprn``; properties whose ``uprn`` is None are
                             ignored for the UPRN check
    :return: True when no duplicates are found
    :raises ValueError: if a property ID, or a non-None UPRN, appears more than once
    """
    from collections import Counter

    # Counter gives a single O(n) pass instead of calling list.count() for every element
    id_counts = Counter(prop.id for prop in input_properties)
    duplicate_ids = {prop_id for prop_id, count in id_counts.items() if count > 1}
    if duplicate_ids:
        raise ValueError(f"Duplicate property IDs in the input data: {duplicate_ids}")

    uprn_counts = Counter(prop.uprn for prop in input_properties if prop.uprn is not None)
    duplicate_uprns = {uprn for uprn, count in uprn_counts.items() if count > 1}
    if duplicate_uprns:
        raise ValueError(f"Duplicate UPRNs in the input properties: {duplicate_uprns}")

    return True
|
|
|
|
|
|
def averages_cleaning(prepared_epc: EPCRecord, cleaning_data: pd.DataFrame):
    """
    Fill missing room counts and floor height on a prepared EPC with medians from comparable properties.

    This is a placeholder cleaning step that was born out of the Peabody project. Only
    ``number_habitable_rooms``, ``number_heated_rooms`` and ``floor_height`` are filled, and only when they are
    null. Each filled value is written to both ``prepared_epc.prepared_epc[<name>]`` and ``prepared_epc.<name>``.

    How comparable rows are chosen from ``cleaning_data``:

    1. Keep rows with the same ``property_type``.
    2. Narrow to the same ``local_authority``, when that authority appears in those rows.
    3. Narrow to rows whose ``total_floor_area`` is within +/-10% of this property's, when any such rows exist.

    Room-count medians are rounded to integers, and heated rooms are capped at the habitable-rooms median.
    A median that is NaN (no comparable rows, or only null values) leaves that field unfilled. Previously this
    case raised ValueError in ``int(round(nan))``.

    :param prepared_epc: the EPC record to clean; modified in place
    :param cleaning_data: reference rows with ``property_type``, ``local_authority``, ``total_floor_area`` and
                          the three columns being filled
    :return: ``prepared_epc`` (the same object)
    """
    variables_to_clean = [
        "number_habitable_rooms",
        "number_heated_rooms",
        "floor_height",
    ]

    if not any(pd.isnull(prepared_epc.prepared_epc[k]) for k in variables_to_clean):
        # Nothing to do
        return prepared_epc

    # NOTE(review): this filter used to AND the identical property_type condition twice. A second criterion
    # (perhaps built_form) may have been intended; confirm before adding one.
    clean_with = cleaning_data[cleaning_data["property_type"] == prepared_epc.prepared_epc["property_type"]]
    if prepared_epc.prepared_epc["local_authority"] in clean_with["local_authority"].values:
        clean_with = clean_with[
            clean_with["local_authority"] == prepared_epc.prepared_epc["local_authority"]
        ]

    floor_area_clean_with = clean_with[
        (clean_with["total_floor_area"] <= prepared_epc.prepared_epc["total_floor_area"] * 1.1) &
        (clean_with["total_floor_area"] >= prepared_epc.prepared_epc["total_floor_area"] * 0.9)
    ]

    if not floor_area_clean_with.empty:
        clean_with = floor_area_clean_with

    # Medians are NaN when there are no comparable rows (or only nulls); keep them as None instead of crashing
    habitable_median = clean_with["number_habitable_rooms"].median()
    heated_median = clean_with["number_heated_rooms"].median()
    clean_n_habitable_rooms = None if pd.isnull(habitable_median) else int(round(habitable_median))
    clean_n_heated_rooms = None if pd.isnull(heated_median) else int(round(heated_median))
    if (
        clean_n_heated_rooms is not None
        and clean_n_habitable_rooms is not None
        and clean_n_heated_rooms > clean_n_habitable_rooms
    ):
        clean_n_heated_rooms = clean_n_habitable_rooms

    clean_floor_height = clean_with["floor_height"].median()

    # We now fill only the fields that are missing and for which a median is available
    if not pd.isnull(clean_n_habitable_rooms) and pd.isnull(
            prepared_epc.prepared_epc["number_habitable_rooms"]):
        prepared_epc.prepared_epc["number_habitable_rooms"] = clean_n_habitable_rooms
        prepared_epc.number_habitable_rooms = clean_n_habitable_rooms

    if not pd.isnull(clean_n_heated_rooms) and pd.isnull(
            prepared_epc.prepared_epc["number_heated_rooms"]):
        prepared_epc.prepared_epc["number_heated_rooms"] = clean_n_heated_rooms
        prepared_epc.number_heated_rooms = clean_n_heated_rooms

    if not pd.isnull(clean_floor_height) and pd.isnull(
            prepared_epc.prepared_epc["floor_height"]):
        prepared_epc.prepared_epc["floor_height"] = clean_floor_height
        prepared_epc.floor_height = clean_floor_height

    # Disabled cost/consumption fallbacks, kept for reference:
    # if pd.isnull(prepared_epc.lighting_cost_current):
    #     # This is a basic assumption as an average
    #     prepared_epc.prepared_epc["lighting_cost_current"] = assumptions.AVERAGE_LIGHTING_COST
    #     prepared_epc.lighting_cost_current = assumptions.AVERAGE_LIGHTING_COST
    #
    # if pd.isnull(prepared_epc.heating_cost_current):
    #     # This is a basic assumption as an average
    #     appliance_cost = AnnualBillSavings.estimate_appliances_energy_use(
    #         total_floor_area=prepared_epc.total_floor_area
    #     ) * AnnualBillSavings.ELECTRICITY_PRICE_CAP
    #     heating_cleaned_value = assumptions.AVERAGE_HEATING_AND_APPLIANCE_COST - appliance_cost
    #     prepared_epc.prepared_epc["heating_cost_current"] = heating_cleaned_value
    #     prepared_epc.heating_cost_current = heating_cleaned_value
    #
    # if pd.isnull(prepared_epc.hot_water_cost_current):
    #     # This is a basic assumption as an average
    #     prepared_epc.prepared_epc["hot_water_cost_current"] = assumptions.AVERAGE_HOT_WATER_COST
    #     prepared_epc.hot_water_cost_current = assumptions.AVERAGE_HOT_WATER_COST
    #
    # if pd.isnull(prepared_epc.energy_consumption_potential):
    #     # Set to current
    #     prepared_epc.prepared_epc["energy_consumption_potential"] = prepared_epc.energy_consumption_current
    #     prepared_epc.energy_consumption_potential = prepared_epc.energy_consumption_current

    return prepared_epc
|
|
|
|
|
|
def extract_address_data(config, body):
    """
    Simple helper to grab the UPRN and address fields for one input row.

    :param config: one input row (dict-like)
    :param body: the request; only ``body.file_format`` is read
    :return: ``(uprn, address1, full_address)`` where
             - ``uprn`` is an int, or None when missing/null (other falsy values are returned unchanged);
             - ``address1`` is a string, or None when no address is available (missing, None or NaN). A
               whole-number float, such as a house number parsed from a spreadsheet as ``12.0``, is rendered
               without its decimal part;
             - ``full_address`` is the ``domna_full_address`` string for Domna asset lists, otherwise None.
    """
    uprn = config.get("uprn", None)
    if pd.isnull(uprn):
        uprn = None
    if uprn:
        uprn = int(float(uprn))

    is_domna = body.file_format == "domna_asset_list"

    address1 = config.get("address", None)
    # Handle domna address list format
    if pd.isnull(address1) and is_domna:
        address1 = config.get("domna_address_1", None)

    if pd.isnull(address1):
        # No address at all: return None rather than the string "None" or failing in int(nan)
        address1 = None
    elif isinstance(address1, float) and address1.is_integer():
        address1 = str(int(address1))
    else:
        # Non-integral floats keep their decimals instead of being truncated
        address1 = str(address1)

    full_address = config.get("domna_full_address", "") if is_domna else None
    if not isinstance(full_address, str):  # Catch for when the full address is nan
        full_address = None

    return uprn, address1, full_address
|
|
|
|
|
|
@contextmanager
def db_session():
    """
    Provide a read-write database session for a ``with`` block.

    The session is committed when the block finishes without error. If the block or the commit raises an
    ``Exception``, the session is rolled back and the exception re-raised. The session is always closed.
    """
    # Session's own context manager closes the session on exit
    with Session(db_engine) as session:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
|
|
|
|
|
|
@contextmanager
def db_read_session():
    """
    Provide a database session for read-only use in a ``with`` block.

    The session is created with ``expire_on_commit=False`` and is never committed or rolled back here. It is
    always closed when the block exits.
    """
    # Session's own context manager closes the session on exit
    with Session(db_engine, expire_on_commit=False) as session:
        yield session
|
|
|
|
|
|
async def model_engine(body: PlanTriggerRequest):
|
|
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
|
|
|
|
created_at = datetime.now().isoformat()
|
|
start_ms = int(time.time() * 1000)
|
|
|
|
try:
|
|
logger.info("Getting the inputs")
|
|
|
|
if body.file_type == "xlsx":
|
|
plan_input = read_excel_from_s3(
|
|
bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
|
|
file_key=body.trigger_file_path,
|
|
sheet_name=body.sheet_name,
|
|
header_row=0,
|
|
)
|
|
|
|
# We now handle the case where the input data is a Domna standardised assset list
|
|
if body.file_format == "domna_asset_list":
|
|
# We rename the columns to match the expected format
|
|
plan_input = plan_input.rename(
|
|
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
|
|
)
|
|
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN
|
|
# This will be reflexted
|
|
if "estimated" not in plan_input.columns:
|
|
plan_input["estimated"] = False
|
|
|
|
plan_input["uprn"] = np.where(
|
|
plan_input["estimated"].isin([1, True]) & (
|
|
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
|
|
), None, plan_input["uprn"]
|
|
)
|
|
# We handle the landlord property type and built form
|
|
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
|
|
if "landlord_built_form" in plan_input.columns:
|
|
plan_input["built_form"] = plan_input["landlord_built_form"].copy()
|
|
else:
|
|
plan_input["built_form"] = None
|
|
|
|
if "epc_property_type" not in plan_input.columns:
|
|
plan_input["epc_property_type"] = None
|
|
|
|
plan_input["property_type"] = np.where(
|
|
plan_input["property_type"] == "unknown",
|
|
plan_input["epc_property_type"],
|
|
plan_input["property_type"]
|
|
)
|
|
|
|
if "epc_archetype" not in plan_input.columns:
|
|
plan_input["epc_archetype"] = None
|
|
|
|
plan_input["built_form"] = np.where(
|
|
plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"]
|
|
)
|
|
property_type_map = {
|
|
"house": "House",
|
|
"flat": "Flat",
|
|
"maisonette": "Maisonette",
|
|
"bungalow": "Bungalow",
|
|
"block house": "House",
|
|
"coach house": "House",
|
|
"bedsit": "Flat",
|
|
}
|
|
|
|
built_form_map = {
|
|
"mid-terrace": "Mid-Terrace",
|
|
"end-terrace": "End-Terrace",
|
|
"semi-detached": "Semi-Detached",
|
|
"detached": "Detached",
|
|
"enclosed end-terrace": "Enclosed End-Terrace",
|
|
"enclosed mid-terrace": "Enclosed Mid-Terrace",
|
|
}
|
|
# We remap the values to match the EPC expected formats
|
|
|
|
# This syntax will actually retain any original values, if they don't get mapped
|
|
plan_input["property_type"] = (
|
|
plan_input["property_type"]
|
|
.map(property_type_map)
|
|
.fillna(plan_input["property_type"])
|
|
)
|
|
|
|
plan_input["built_form"] = (
|
|
plan_input["built_form"]
|
|
.map(built_form_map)
|
|
.fillna(plan_input["built_form"])
|
|
)
|
|
|
|
plan_input = plan_input.to_dict("records")
|
|
|
|
else:
|
|
raise ValueError("Other formats not yet supported")
|
|
|
|
else:
|
|
plan_input = read_csv_from_s3(
|
|
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path
|
|
)
|
|
|
|
# We then slide it on the indexes if they are provided
|
|
if body.index_start is not None and body.index_end is not None:
|
|
plan_input = plan_input[body.index_start:body.index_end]
|
|
|
|
# Confirm no duplicate UPRNS
|
|
check_duplicate_uprns(plan_input)
|
|
|
|
# If we have patches or overrides, we should read them in here
|
|
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
|
|
|
|
if body.file_type == "xlsx" and body.file_format == "domna_asset_list":
|
|
# We check if we have valution data
|
|
if not valuation_data and body.valuation_file_path in [None, ""]:
|
|
# We check plan_input
|
|
if "domna_valuation" in plan_input[0]:
|
|
valuation_data = [{"uprn": x["uprn"], "valuation": x["domna_valuation"]} for x in plan_input]
|
|
|
|
cleaning_data = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
|
|
)
|
|
|
|
# Prepare input data
|
|
addresses = Addresses.from_plan_input(plan_input, body)
|
|
|
|
uprns = addresses.get_uprns()
|
|
landlord_ids = addresses.get_landlord_ids()
|
|
postcodes = addresses.get_postcodes_for_flats()
|
|
|
|
# Check if we've seen these properties before
|
|
with db_read_session() as session:
|
|
existing_properties = db_funcs.property_functions.get_existing_properties(
|
|
session, body.portfolio_id, uprns, landlord_ids
|
|
)
|
|
property_lookup = {}
|
|
for prop in existing_properties:
|
|
if prop.uprn:
|
|
property_lookup[("uprn", prop.uprn)] = prop.id
|
|
if prop.landlord_property_id:
|
|
property_lookup[("landlord_property_id", prop.landlord_property_id)] = prop.id
|
|
|
|
# List of properties that need to be created in the db
|
|
to_create = []
|
|
for addr in addresses:
|
|
key = ("uprn", addr.uprn) if addr.uprn else ("landlord_property_id", addr.landlord_property_id)
|
|
if key not in property_lookup:
|
|
to_create.append(addr)
|
|
|
|
# Pre-requests to the db
|
|
with db_read_session() as session:
|
|
epc_cache_by_uprn = db_funcs.epc_functions.EpcStoreService.get_epcs_for_uprns(session, uprns)
|
|
postcode_searches = db_funcs.address_functions.get_by_postcodes(session, list(postcodes))
|
|
energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns(
|
|
session, uprns
|
|
)
|
|
|
|
# If we have properties that need to be created, we cerate them in bulk
|
|
new_property_ids = set()
|
|
if to_create:
|
|
logger.info("Creating %d new properties", len(to_create))
|
|
with db_session() as session:
|
|
inserted = db_funcs.property_functions.bulk_create_properties(
|
|
session, body, to_create, energy_assessments_by_uprn
|
|
)
|
|
for prop_id, uprn, landlord_property_id in inserted:
|
|
new_property_ids.add(prop_id)
|
|
|
|
# We append the newly created properties to property_lookup
|
|
for prop_id, uprn, landlord_property_id in inserted:
|
|
if uprn is not None:
|
|
property_lookup[("uprn", uprn)] = prop_id
|
|
if landlord_property_id:
|
|
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
|
|
|
|
input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, []
|
|
for addr, config in tqdm(
|
|
zip(addresses, plan_input),
|
|
total=len(addresses),
|
|
desc="Processing properties",
|
|
):
|
|
# ---------- 1) filter fetched data ----------
|
|
epc_cache = epc_cache_by_uprn[addr.uprn]
|
|
epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
|
|
# Extract from EPC cache
|
|
if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH:
|
|
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
|
|
|
|
# Extract associated UPRNs from the database response
|
|
associated_uprns = db_funcs.address_functions.get_associated_uprns(
|
|
postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn
|
|
)
|
|
|
|
energy_assessment = energy_assessments_by_uprn.get(addr.uprn)
|
|
|
|
epc_searcher = SearchEpc(
|
|
address1=addr.address1,
|
|
postcode=addr.postcode,
|
|
uprn=addr.uprn,
|
|
auth_token=get_settings().EPC_AUTH_TOKEN,
|
|
os_api_key="",
|
|
full_address=addr.full_address,
|
|
heating_system=addr.heating_system,
|
|
associated_uprns=associated_uprns
|
|
)
|
|
epc_searcher.ordnance_survey_client.built_form = addr.built_form
|
|
epc_searcher.ordnance_survey_client.property_type = addr.property_type
|
|
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
|
|
|
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
|
|
epc_searcher.set_uprn_source(file_format=body.file_format)
|
|
|
|
lookup_key = (
|
|
("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id)
|
|
)
|
|
property_id = property_lookup[lookup_key]
|
|
|
|
if not property_id:
|
|
logger.error("Could not find property ID for address: %s", addr.request_data)
|
|
# Should not happen unless input data is inconsistent
|
|
continue
|
|
|
|
is_new = property_id in new_property_ids
|
|
if not is_new and not body.multi_plan:
|
|
continue
|
|
|
|
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
|
|
# Otherwise, we use the newest EPC
|
|
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
|
|
# has been publically lodged
|
|
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
|
|
epc_searcher, energy_assessment
|
|
)
|
|
|
|
req_data = extract_property_request_data(
|
|
address=addr,
|
|
patches=patches,
|
|
already_installed=already_installed,
|
|
non_invasive_recommendations=non_invasive_recommendations,
|
|
valuation_data=valuation_data,
|
|
uprn=addr.uprn,
|
|
)
|
|
# Pull this out as it may get overwritten
|
|
property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch
|
|
|
|
# if we have a remote assment data type, we pull the additional data and include it
|
|
epc_page_source = {}
|
|
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
|
|
property_non_invasive_recommendations, patch, epc_page_source = (
|
|
RetrieveFindMyEpc.get_from_epc_with_fallback(
|
|
epc=epc_searcher.newest_epc,
|
|
epc_page=epc_page,
|
|
rrn=rrn,
|
|
cleaned_address=epc_searcher.address_clean,
|
|
config_address=addr.address,
|
|
address_postal_town=epc_searcher.address_postal_town
|
|
)
|
|
)
|
|
|
|
epc_records = patch_epc(patch, epc_records)
|
|
|
|
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
|
|
|
|
# TODO: This is a temp function to handle a specific edge case with Peabody. We should
|
|
# factor this into EPCRecord as part of the cleaning however we need some more testing
|
|
prepared_epc = averages_cleaning(prepared_epc, cleaning_data)
|
|
|
|
# If we have an ECO project, we parse the cavity/solar reasons
|
|
eco_packages[property_id] = parse_eco_packages(addr, prepared_epc)
|
|
|
|
# Final step - extract inspections data, if we have it - we inject into property for usage
|
|
property_inspections = db_funcs.inspections_functions.extract_inspection_data(config)
|
|
if property_inspections:
|
|
inspections_map[property_id] = property_inspections
|
|
|
|
input_properties.append(
|
|
Property(
|
|
id=property_id,
|
|
uprn=addr.uprn,
|
|
is_new=is_new,
|
|
address=epc_searcher.address_clean,
|
|
postcode=epc_searcher.postcode_clean,
|
|
epc_record=prepared_epc,
|
|
already_installed=req_data.already_installed + eco_packages.get(property_id)[3],
|
|
property_valuation=req_data.valuation,
|
|
non_invasive_recommendations=property_non_invasive_recommendations,
|
|
energy_assessment=energy_assessment,
|
|
inspections=inspections_map.get(property_id),
|
|
**Property.extract_kwargs(config), # TODO: Depraecate this
|
|
)
|
|
)
|
|
|
|
# If we have:
|
|
# 1) No EPC API data
|
|
# 2) A real EPC
|
|
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
|
|
# We store this data
|
|
uprn_to_check_against = addr.uprn if addr.uprn is not None else epc_searcher.uprn # Until we enforce uprn
|
|
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
|
|
epc_cache, epc_searcher.newest_epc.get("estimated"), uprn_to_check_against,
|
|
):
|
|
epc_upserts.append({
|
|
"uprn": uprn_to_check_against,
|
|
"epc_api": epc_searcher.data,
|
|
"epc_page": epc_page_source.get("page_source"),
|
|
"epc_page_rrn": epc_page_source.get("rrn"),
|
|
})
|
|
|
|
if not input_properties:
|
|
return Response(status_code=204)
|
|
|
|
check_duplicate_property_ids(input_properties)
|
|
|
|
logger.info("Inserting property data")
|
|
# We now bulk upload all of the EPC data
|
|
with db_session() as session:
|
|
db_funcs.epc_functions.EpcStoreService.bulk_upsert_epc_data(session, epc_upserts)
|
|
|
|
# We check if we have inspections data and store it in the database if so. We'll update or create
|
|
# aginst each property if
|
|
with db_session() as session:
|
|
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
|
|
|
|
# Set up model api and warm up the lambdas
|
|
model_api = ModelApi(
|
|
portfolio_id=body.portfolio_id,
|
|
timestamp=created_at,
|
|
prediction_buckets=get_prediction_buckets(),
|
|
max_retries=1
|
|
)
|
|
await model_api.async_warm_up_lambdas(
|
|
model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES
|
|
)
|
|
|
|
# The materials data could be cached or local so we don't need to make
|
|
# consistent requests to the backend for
|
|
# the same data
|
|
logger.info("Reading in materials and cleaned datasets")
|
|
with db_read_session() as session:
|
|
materials = db_funcs.materials_functions.get_materials(session)
|
|
cleaned = get_cleaned()
|
|
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
|
|
|
|
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
|
|
|
|
epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned)
|
|
|
|
kwh_preds = await model_api.async_paginated_predictions(
|
|
data=epcs_for_scoring,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
extract_ids=False,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the spatial data
|
|
logger.info("Getting spatial data")
|
|
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
|
|
|
|
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
|
|
logger.info("Performing solar analysis")
|
|
|
|
ofgem_consumption_averages = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET,
|
|
file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet"
|
|
)
|
|
|
|
building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data(
|
|
input_properties=input_properties,
|
|
ofgem_consumption_averages=ofgem_consumption_averages,
|
|
body=body
|
|
)
|
|
with db_session() as session:
|
|
input_properties = GoogleSolarApi.building_solar_analysis(
|
|
building_solar_config=building_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
|
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
|
)
|
|
with db_session() as session:
|
|
input_properties = GoogleSolarApi.unit_solar_analysis(
|
|
unit_solar_config=unit_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
body=body,
|
|
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
|
inspections_map=inspections_map
|
|
)
|
|
|
|
# We also make a tweak - if the property has been flagged for solar but doesn't contain
|
|
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
|
|
|
|
logger.info("Identifying property recommendations")
|
|
recommendations = {}
|
|
recommendations_scoring_data = []
|
|
representative_recommendations = {}
|
|
for p in tqdm(input_properties):
|
|
# We set the ECO package data, if we have it
|
|
property_eco_package = eco_packages.get(p.id, (None, None, None))
|
|
if property_eco_package[0] is not None:
|
|
inclusions = property_eco_package[0]
|
|
exclusions = []
|
|
else:
|
|
inclusions = body.inclusions
|
|
exclusions = body.exclusions
|
|
|
|
recommender = Recommendations(
|
|
property_instance=p,
|
|
materials=materials,
|
|
exclusions=exclusions,
|
|
inclusions=inclusions,
|
|
default_u_values=body.default_u_values
|
|
)
|
|
property_recommendations, property_representative_recommendations = recommender.recommend()
|
|
|
|
if not property_recommendations:
|
|
continue
|
|
|
|
recommendations[p.id] = property_recommendations
|
|
representative_recommendations[p.id] = property_representative_recommendations
|
|
|
|
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
|
|
p.adjust_difference_record_with_recommendations(
|
|
property_recommendations, property_representative_recommendations
|
|
)
|
|
|
|
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
|
|
|
logger.info("Preparing data for scoring in sap change api")
|
|
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
|
|
|
recommendations_scoring_data = recommendations_scoring_data.drop(
|
|
columns=[
|
|
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
|
|
"carbon_ending"
|
|
]
|
|
)
|
|
|
|
all_predictions = await model_api.async_paginated_predictions(
|
|
data=recommendations_scoring_data,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the predictions into the recommendations, and get the impact summary
|
|
scoring_epcs = [] # For scoring the kwh models
|
|
for property_id in recommendations.keys():
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
recommendations_with_impact, impact_summary = (
|
|
Recommendations.calculate_recommendation_impact(
|
|
property_instance=property_instance,
|
|
all_predictions=all_predictions,
|
|
recommendations=recommendations,
|
|
representative_recommendations=representative_recommendations
|
|
)
|
|
)
|
|
|
|
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
|
|
# at each phase
|
|
property_instance.update_simulation_epcs(impact_summary)
|
|
scoring_epcs.extend(property_instance.updated_simulation_epcs)
|
|
recommendations[property_id] = recommendations_with_impact
|
|
|
|
# We call the API with the scoring epcs
|
|
scoring_epcs = pd.DataFrame(scoring_epcs)
|
|
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
|
|
|
|
kwh_simulation_predictions = await model_api.async_paginated_predictions(
|
|
data=scoring_epcs,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# We now insert kwh estimates and costs into the recommendations
|
|
logger.info("Calculating tenant savings - kwh and bills")
|
|
for property_id in tqdm([p.id for p in input_properties]):
|
|
property_recommendations = recommendations.get(property_id, [])
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
property_current_energy_bill = (
|
|
Recommendations.calculate_recommendation_tenant_savings(
|
|
property_instance=property_instance,
|
|
kwh_simulation_predictions=kwh_simulation_predictions,
|
|
property_recommendations=property_recommendations,
|
|
ashp_cop=body.ashp_cop
|
|
)
|
|
)
|
|
property_instance.current_energy_bill = property_current_energy_bill
|
|
|
|
# Insert the predictions into the recommendations and run the optimiser
|
|
for p in input_properties:
|
|
if not recommendations.get(p.id):
|
|
continue
|
|
|
|
# we need to double unlist because we have a list of lists
|
|
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
|
|
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
|
|
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
|
|
|
|
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
|
|
# its inclusion
|
|
needs_ventilation = any(
|
|
x in property_measure_types for x in assumptions.measures_needing_ventilation
|
|
) and not p.has_ventilation
|
|
|
|
if not measures_to_optimise:
|
|
# Nothing to do, we just reshape the recommendations
|
|
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
|
|
p.id, recommendations, set()
|
|
)
|
|
continue
|
|
|
|
fixed_gain = optimiser_functions.calculate_fixed_gain(
|
|
property_required_measures, recommendations, p, needs_ventilation
|
|
)
|
|
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
|
|
|
|
funding = Funding(
|
|
tenure=body.housing_type,
|
|
project_scores_matrix=project_scores_matrix,
|
|
partial_project_scores_matrix=partial_project_scores_matrix,
|
|
whlg_eligible_postcodes=whlg_eligible_postcodes,
|
|
eco4_social_cavity_abs_rate=13,
|
|
eco4_social_solid_abs_rate=17,
|
|
eco4_private_cavity_abs_rate=13,
|
|
eco4_private_solid_abs_rate=17,
|
|
gbis_social_cavity_abs_rate=21,
|
|
gbis_social_solid_abs_rate=25,
|
|
gbis_private_cavity_abs_rate=21,
|
|
gbis_private_solid_abs_rate=28,
|
|
)
|
|
|
|
li_thickness = convert_thickness_to_numeric(
|
|
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
|
|
)
|
|
current_wall_u_value = p.walls["thermal_transmittance"]
|
|
if current_wall_u_value is None:
|
|
current_wall_u_value = get_wall_u_value(
|
|
clean_description=p.walls["clean_description"],
|
|
age_band=p.age_band,
|
|
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
|
|
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
|
|
)
|
|
|
|
# We insert the innovation uplift
|
|
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
|
|
|
|
# TODO: Turn this into a function and store the innovaiton uplift
|
|
for group in measures_to_optimise_with_uplift:
|
|
for r in group:
|
|
|
|
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
|
|
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
|
|
(
|
|
r["partial_project_score"],
|
|
r["partial_project_funding"],
|
|
r["innovation_uplift"],
|
|
r["uplift_project_score"],
|
|
) = (
|
|
0, 0, 0, 0
|
|
)
|
|
continue
|
|
|
|
(
|
|
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
|
|
r["uplift_project_score"]
|
|
) = funding.get_innovation_uplift(
|
|
measure=r,
|
|
starting_sap=int(p.data["current-energy-efficiency"]),
|
|
floor_area=p.floor_area,
|
|
is_cavity=p.walls["is_cavity_wall"],
|
|
current_wall_uvalue=current_wall_u_value,
|
|
is_partial="partial" in p.walls["clean_description"].lower(),
|
|
existing_li_thickness=li_thickness,
|
|
mainheating=p.main_heating,
|
|
main_fuel=p.main_fuel,
|
|
mainheat_energy_eff=p.data["mainheat-energy-eff"],
|
|
)
|
|
|
|
if r["already_installed"]:
|
|
# if already installed, we zero out the uplift and funding
|
|
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
|
|
r["uplift_project_score"]) = (
|
|
0, 0, 0, 0
|
|
)
|
|
|
|
input_measures = optimiser_functions.prepare_input_measures(
|
|
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
|
|
property_eco_packages=eco_packages.get(p.id)
|
|
)
|
|
|
|
# When the goal is Increasing EPC, we can run the funding optimiser
|
|
if body.goal == "Increasing EPC":
|
|
|
|
solutions = optimise_with_funding_paths(
|
|
p=p,
|
|
input_measures=input_measures,
|
|
housing_type=body.housing_type,
|
|
budget=body.budget,
|
|
target_gain=gain,
|
|
funding=funding,
|
|
work_package=eco_packages[p.id][2]
|
|
)
|
|
|
|
# if handle the empty case
|
|
if solutions.empty:
|
|
scheme = "none"
|
|
funded_measures, solution = [], []
|
|
(
|
|
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score,
|
|
battery_sap_score
|
|
) = 0, 0, 0, 0, 0, 0
|
|
else:
|
|
solutions = solutions[
|
|
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
|
|
]
|
|
|
|
if solutions["meets_upgrade_target"].any():
|
|
# If we have a solution that meets the upgrade target, we select that one
|
|
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
|
|
else:
|
|
# Pick the cheapest
|
|
optimal_solution = solutions.iloc[0]
|
|
|
|
# This is the list of measures that we will recommend
|
|
scheme = optimal_solution["scheme"]
|
|
|
|
# We create this full list of selected measures, which is used in the next section for setting
|
|
# default measures
|
|
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
|
|
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
|
|
|
|
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
|
|
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
|
|
optimal_solution["partial_project_funding"]
|
|
# This is the total amount of funding associated to the uplift (£)
|
|
total_uplift = optimal_solution["total_uplift"]
|
|
# This is the funding scheme selected
|
|
# This is the full project ABS
|
|
full_project_score = optimal_solution["project_score"]
|
|
# This is the partial project ABS
|
|
partial_project_score = optimal_solution["partial_project_score"]
|
|
# This is the uplift score ABS
|
|
uplift_project_score = optimal_solution["total_uplift_score"]
|
|
# This is the SAP score associated to a battery
|
|
pv_size = next(
|
|
(m["array_size"] for m in optimal_solution["items"] if m["type"] == "solar_pv"), 0
|
|
)
|
|
battery_sap_score = BatterySAPScorer.score(
|
|
starting_sap=optimal_solution["ending_sap"], pv_size=pv_size
|
|
)
|
|
else:
|
|
# We optimise and then we determine eligibility for funding, based on the measures selected
|
|
optimiser = (
|
|
GainOptimiser(
|
|
input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
|
|
) if body.budget else CostOptimiser(input_measures, min_gain=gain)
|
|
)
|
|
optimiser.setup()
|
|
optimiser.solve()
|
|
solution = optimiser.solution
|
|
gain = optimiser.solution_gain
|
|
post_sap = int(p.data["current-energy-efficiency"]) + gain
|
|
|
|
recommendation_types = []
|
|
for measures in input_measures:
|
|
for measure in measures:
|
|
recommendation_types.append(measure["type"])
|
|
recommendation_types = set(recommendation_types)
|
|
|
|
has_wall_insulation_recommendation = any(
|
|
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
|
|
WALL_INSULATION_MEASURES
|
|
)
|
|
has_roof_insulation_recommendation = any(
|
|
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
|
|
ROOF_INSULATION_MEASURES
|
|
)
|
|
|
|
funding.check_funding(
|
|
measures=solution,
|
|
starting_sap=int(p.data["current-energy-efficiency"]),
|
|
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
|
|
floor_area=p.floor_area,
|
|
mainheat_description=p.main_heating["clean_description"],
|
|
heating_control_description=p.main_heating_controls["clean_description"],
|
|
is_cavity=p.walls["is_cavity_wall"],
|
|
current_wall_uvalue=current_wall_u_value,
|
|
is_partial="partial" in p.walls["clean_description"].lower(),
|
|
existing_li_thickness=li_thickness,
|
|
mainheating=p.main_heating,
|
|
main_fuel=p.main_fuel,
|
|
mainheat_energy_eff=p.data["mainheat-energy-eff"],
|
|
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
|
|
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
|
|
)
|
|
|
|
# Determine the scheme
|
|
scheme = "none"
|
|
if funding.eco4_eligible:
|
|
scheme = "eco4"
|
|
if scheme == "none" and funding.gbis_eligible:
|
|
scheme = "gbis"
|
|
|
|
funded_measures = solution if scheme in ["gbis", "eco4"] else []
|
|
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
|
|
total_uplift = funding.eco4_uplift
|
|
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
|
|
partial_project_score = funding.partial_project_abs
|
|
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
|
|
pv_size = next(
|
|
(m["array_size"] for m in solution if m["type"] == "solar_pv"), 0
|
|
)
|
|
battery_sap_score = BatterySAPScorer.score(starting_sap=post_sap, pv_size=pv_size)
|
|
|
|
selected = {r["id"] for r in solution}
|
|
|
|
if property_required_measures:
|
|
solution = optimiser_functions.add_required_measures(
|
|
property_id=p.id, property_required_measures=property_required_measures,
|
|
recommendations=recommendations, selected=selected,
|
|
)
|
|
|
|
# Add best practice measures (ventilation/trickle vents)
|
|
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
|
|
# Final flattening
|
|
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
|
|
p.id, recommendations, selected, battery_sap_score
|
|
)
|
|
|
|
# TODO: functionise
|
|
for measure in funded_measures:
|
|
if "+mechanical_ventilation" in measure["type"]:
|
|
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
|
|
|
|
p.insert_funding(
|
|
scheme=scheme,
|
|
funded_measures=funded_measures,
|
|
project_funding=project_funding,
|
|
total_uplift=total_uplift,
|
|
full_project_score=full_project_score,
|
|
partial_project_score=partial_project_score,
|
|
uplift_project_score=uplift_project_score
|
|
)
|
|
|
|
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
|
|
# of them
|
|
# TODO: We can probably do better and optimise at the building level - this is temp
|
|
# Idea: - optimise all measures except solar at the unit level. Then, test with and without solar for
|
|
# all units at the same time
|
|
logger.info("Adjusting solar PV recommendations for buildings")
|
|
building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
|
|
|
|
for bid in building_ids:
|
|
# We check if any of them have solar PV
|
|
building = [p for p in input_properties if p.building_id == bid]
|
|
has_solar = False
|
|
for unit in building:
|
|
# Get default recommendations
|
|
has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
|
|
if has_solar:
|
|
break
|
|
|
|
if has_solar:
|
|
# We adjust the units within the building
|
|
for unit in building:
|
|
for rec in recommendations[unit.id]:
|
|
if rec["type"] == "solar_pv":
|
|
# This is straightforward, we just set the default to True, since when we're at a building
|
|
# level, we only allow 1 solar PV option for each unit. If we change this, this logic will
|
|
# need to be updated
|
|
rec["default"] = True
|
|
|
|
logger.info("Uploading recommendations to the database")
|
|
# If we have any work to do, we create a new scenario
|
|
if body.scenario_id:
|
|
# We don't need to create a new scenario, we just use the existing one
|
|
scenario_id = body.scenario_id
|
|
else:
|
|
with db_session() as session:
|
|
scenario_id = db_funcs.recommendations_functions.create_scenario(
|
|
session=session,
|
|
scenario={
|
|
"name": body.scenario_name,
|
|
"created_at": created_at,
|
|
"budget": body.budget,
|
|
"portfolio_id": body.portfolio_id,
|
|
"housing_type": body.housing_type,
|
|
"goal": body.goal,
|
|
"goal_value": body.goal_value,
|
|
"trigger_file_path": body.trigger_file_path,
|
|
"already_installed_file_path": body.already_installed_file_path,
|
|
"patches_file_path": body.patches_file_path,
|
|
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
|
|
"exclusions": body.exclusions,
|
|
"multi_plan": body.multi_plan
|
|
}
|
|
)
|
|
|
|
# TODO: New
|
|
property_updates, property_epc_details, property_spatial_updates = [], [], []
|
|
# plans_to_create = [{property_id, plan_data}]
|
|
# recommendations_to_create = [{plan_ref, recommendation_data}]
|
|
# funding_to_create = [{plan_ref, funding_data}]
|
|
plans_to_create, recommendations_to_create, funding_to_create = [], [], []
|
|
|
|
# Prepare the data that will need to be uploaded in bulk
|
|
for p in input_properties:
|
|
recommendations_for_property = recommendations.get(p.id, [])
|
|
default_recommendations = [r for r in recommendations_for_property if r["default"]]
|
|
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
|
|
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
|
|
new_epc = sap_to_epc(new_sap_points)
|
|
total_cost = sum([r["total"] for r in default_recommendations])
|
|
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost)
|
|
|
|
# --- property-level updates (always) ---
|
|
property_updates.append({
|
|
"property_id": p.id,
|
|
"portfolio_id": body.portfolio_id,
|
|
"data": p.get_full_property_data(current_valuation=valuations["current_value"])
|
|
})
|
|
|
|
property_epc_details.append(p.get_property_details_epc(portfolio_id=body.portfolio_id))
|
|
|
|
property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial})
|
|
|
|
# --- skip plan creation if no recommendations ---
|
|
if not recommendations_for_property:
|
|
continue
|
|
|
|
plan_data = db_funcs.recommendations_functions.prepare_plan_data(
|
|
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations
|
|
)
|
|
plans_to_create.append({"property_id": p.id, "plan_data": plan_data})
|
|
|
|
# store recommendations keyed by property
|
|
for r in recommendations_for_property:
|
|
recommendations_to_create.append({"property_id": p.id, "data": r})
|
|
|
|
# Bulk upload property data
|
|
logger.info("Uploading property data in bulk")
|
|
with db_session() as session:
|
|
db_funcs.property_functions.bulk_update_properties(session, property_updates)
|
|
db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details)
|
|
db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates)
|
|
|
|
# TODO: End New
|
|
|
|
for i in tqdm(
|
|
range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))
|
|
):
|
|
try:
|
|
# Take a slice of the input_properties list to make a batch
|
|
batch_properties = input_properties[i:i + BATCH_SIZE]
|
|
with db_session() as session:
|
|
for p in batch_properties:
|
|
recommendations_to_upload = recommendations.get(p.id, [])
|
|
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
|
|
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
|
|
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
|
|
new_epc = sap_to_epc(new_sap_points)
|
|
|
|
total_cost = sum([r["total"] for r in default_recommendations])
|
|
|
|
valuations = PropertyValuation.estimate(
|
|
property_instance=p, target_epc=new_epc, total_cost=total_cost
|
|
)
|
|
|
|
property_plan_data = db_funcs.recommendations_functions.prepare_plan_data(
|
|
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc,
|
|
default_recommendations
|
|
)
|
|
|
|
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id)
|
|
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
|
|
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
|
|
|
|
db_funcs.property_functions.update_or_create_property_spatial_details(
|
|
session, p.uprn, p.spatial
|
|
)
|
|
|
|
db_funcs.property_functions.update_property_data(
|
|
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
|
|
)
|
|
|
|
if not recommendations_to_upload:
|
|
continue
|
|
|
|
new_plan_id = db_funcs.recommendations_functions.create_plan(
|
|
session, plan=property_plan_data
|
|
)
|
|
|
|
db_funcs.recommendations_functions.upload_recommendations(
|
|
session, recommendations_to_upload, p.id, new_plan_id
|
|
)
|
|
db_funcs.funding_functions.upload_funding(
|
|
session, p, new_plan_id, recommendations_to_upload
|
|
)
|
|
|
|
except Exception as e:
|
|
# Rollback the session if an error occurs
|
|
logger.warning("Failed i = %s" % str(i))
|
|
logger.error(f"An error occurred during batch starting at index {i}: {e}")
|
|
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
|
|
|
|
logger.info("Work completed, updating log status")
|
|
|
|
except IntegrityError as e:
|
|
return handle_error("Database integrity error.", e, body.subtask_id, 500, start_ms)
|
|
except OperationalError as e:
|
|
return handle_error("Database operational error.", e, body.subtask_id, 500, start_ms)
|
|
except ValueError as e:
|
|
return handle_error("Bad request: malformed data.", e, body.subtask_id, 400, start_ms)
|
|
except Exception as e: # General exception handling
|
|
return handle_error("An unexpected error occurred.", e, body.subtask_id, 500, start_ms)
|
|
|
|
cloud_logs_url = build_cloudwatch_log_url(start_ms)
|
|
# Mark the subtask as successful
|
|
SubTaskInterface().update_subtask_status(
|
|
subtask_id=UUID(body.subtask_id), status="complete", cloud_logs_url=cloud_logs_url
|
|
)
|
|
|
|
logger.info("Model Engine completed successfully")
|
|
|
|
return Response(status_code=200)
|