mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1237 lines
56 KiB
Python
1237 lines
56 KiB
Python
import ast
|
|
import json
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import Nullable
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
import numpy as np
|
|
from etl.epc.Record import EPCRecord
|
|
from backend.SearchEpc import SearchEpc
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from sqlalchemy.orm import sessionmaker
|
|
from starlette.responses import Response
|
|
|
|
from backend.app.config import get_settings, get_prediction_buckets
|
|
from backend.app.db.connection import db_engine
|
|
from backend.app.db.functions.materials_functions import get_materials
|
|
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
|
|
from backend.app.db.functions.property_functions import (
|
|
create_property_details_epc, create_property_targets, update_property_data,
|
|
update_or_create_property_spatial_details, ensure_property_exists
|
|
)
|
|
from backend.app.db.functions.recommendations_functions import (
|
|
create_plan, upload_recommendations, create_scenario
|
|
)
|
|
from backend.app.db.functions.funding_functions import upload_funding
|
|
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
|
|
from backend.app.db.models.portfolio import rating_lookup
|
|
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
|
|
from backend.app.plan.utils import (
|
|
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error
|
|
)
|
|
from backend.app.utils import sap_to_epc
|
|
import backend.app.assumptions as assumptions
|
|
from backend.app.db.functions.inspections_functions import (
|
|
extract_inspection_data, bulk_upsert_inspections_pg
|
|
)
|
|
|
|
from backend.ml_models.api import ModelApi
|
|
from backend.Property import Property
|
|
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
|
|
|
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
|
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
|
import recommendations.optimiser.optimiser_functions as optimiser_functions
|
|
from recommendations.Recommendations import Recommendations
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
|
|
from backend.ml_models.Valuation import PropertyValuation
|
|
|
|
from etl.bill_savings.KwhData import KwhData
|
|
from etl.spatial.OpenUprnClient import OpenUprnClient
|
|
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
|
|
|
from backend.Funding import Funding
|
|
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
|
|
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
|
|
|
|
# Module-level logger, built by the project's setup_logger helper and used throughout this file.
logger = setup_logger()
|
|
|
|
# NOTE(review): BATCH_SIZE is not referenced in the visible part of this file - confirm it is still needed.
BATCH_SIZE = 5
|
|
# Passed as batch_size to model_api.async_paginated_predictions for the scoring calls in model_engine.
SCORING_BATCH_SIZE = 300
|
|
|
|
|
|
def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Roll the per-property retrofit results up into a single dictionary of portfolio metrics.

    Only recommendations flagged ``default`` count towards savings, cost, SAP points and contingency.
    Every property in ``input_properties`` gets a row, including those without recommendations, so the
    per-unit averages are taken over the whole portfolio.

    :param input_properties: The portfolio's properties. Each is read for ``id``, ``energy``,
        ``current_energy_bill``, ``current_energy_consumption``, ``data`` and ``project_funding``
    :param total_valuation_increase: Total valuation uplift across the portfolio
    :param recommendations: Mapping of property id to the recommendation dicts for that property.
        Properties missing from the mapping are treated as having no recommendations
    :param new_epc_bands: Mapping of property id to its post-retrofit EPC band
    :param property_value_increase_ranges: Mapping of property id to a dict holding ``current_value``,
        ``lower_bound_increased_value`` and ``upper_bound_increased_value``
    :return: A dictionary containing:
        - the EPC band breakdown before and after retrofit, as JSON strings
        - the number of properties and the number with at least one default recommendation
        - per-unit CO2 ("t" suffix), energy bill and energy consumption ("kWh" suffix), before and after
        - per-unit valuation improvement and cost, cost per CO2 saved and cost per SAP point, as strings
        - the valuation return on investment (valuation increase divided by cost), as a string
        - total funding and total contingency, as floats
    """
    epc_band_order = ("G", "F", "E", "D", "C", "B", "A")

    def reformat_epc_data(epc_counts):
        # One entry per band in G-to-A order, with a zero for any band that has no properties
        return [{"name": band, band: epc_counts.get(band, 0)} for band in epc_band_order]

    def format_money(amount):
        return f"£{amount:,.0f}"

    n_units = len(input_properties)

    rows = []
    for prop in input_properties:
        # Properties without an entry simply have no recommendations; they still contribute a row
        default_recs = [rec for rec in recommendations.get(prop.id, []) if rec["default"]]

        carbon_savings = sum(rec["co2_equivalent_savings"] for rec in default_recs)
        co2_before = prop.energy["co2_emissions"]
        co2_after = co2_before - carbon_savings

        bill_before = sum(prop.current_energy_bill.values())
        bill_after = bill_before - sum(rec["energy_cost_savings"] for rec in default_recs)

        consumption_before = prop.current_energy_consumption
        consumption_after = consumption_before - sum(rec["kwh_savings"] for rec in default_recs)

        cost = sum(rec["total"] for rec in default_recs)
        sap_point_improvement = sum(rec["sap_points"] for rec in default_recs)

        # Without a current valuation there is nothing to measure an uplift against
        value_range = property_value_increase_ranges[prop.id]
        current_value = value_range["current_value"]
        if pd.isnull(current_value):
            lower_uplift, upper_uplift = 0, 0
        else:
            lower_uplift = value_range["lower_bound_increased_value"] - current_value
            upper_uplift = value_range["upper_bound_increased_value"] - current_value

        funding = 0 if prop.project_funding is None else float(prop.project_funding)
        contingency = float(sum(rec.get("contingency", 0) for rec in default_recs))

        rows.append({
            "pre_retrofit_epc": prop.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[prop.id],
            "pre_retrofit_co2": co2_before,
            "post_retrofit_co2": co2_after,
            "pre_retrofit_energy_bill": bill_before,
            "post_retrofit_energy_bill": bill_after,
            "pre_retrofit_energy_consumption": consumption_before,
            "post_retrofit_energy_consumption": consumption_after,
            "cost": cost,
            "sap_point_improvement": sap_point_improvement,
            "lower_bound_valuation_uplift": lower_uplift,
            "upper_bound_valuation_uplift": upper_uplift,
            "has_recommendations": len(default_recs) > 0,
            "funding": funding,
            "contingency": contingency,
        })

    frame = pd.DataFrame(rows)

    n_units_to_retrofit = frame["has_recommendations"].sum()

    lower_uplift_per_unit = frame["lower_bound_valuation_uplift"].mean()
    upper_uplift_per_unit = frame["upper_bound_valuation_uplift"].mean()

    total_carbon_saved = frame["pre_retrofit_co2"].sum() - frame["post_retrofit_co2"].sum()
    total_sap_points = frame["sap_point_improvement"].sum()
    total_cost = frame["cost"].sum()

    # Central estimate followed by the lower/upper bounds in brackets
    valuation_improvement_per_unit = (
        format_money(total_valuation_increase / n_units)
        + f" ({format_money(lower_uplift_per_unit)} - {format_money(upper_uplift_per_unit)})"
    )

    # Return on investment is the valuation uplift per pound spent; it is zero when nothing is spent
    if total_cost == 0:
        roi_central, roi_lower, roi_upper = 0, 0, 0
    else:
        roi_central = round(total_valuation_increase / total_cost, 2)
        roi_lower = frame["lower_bound_valuation_uplift"].sum() / total_cost
        roi_upper = frame["upper_bound_valuation_uplift"].sum() / total_cost
    valuation_return_on_investment = str(roi_central) + f" ({roi_lower:,.2f} - {roi_upper:,.2f})"

    # Both ratios fall back to zero when there is no saving to divide by
    cost_per_co2_saved = format_money(total_cost / total_carbon_saved if total_carbon_saved > 0 else 0)
    cost_per_sap_point = format_money(total_cost / total_sap_points if total_sap_points > 0 else 0)

    def epc_breakdown(column):
        return json.dumps(reformat_epc_data(frame[column].value_counts().to_dict()))

    def mean_of(column):
        return frame[column].mean()

    return {
        "epc_breakdown_pre_retrofit": epc_breakdown("pre_retrofit_epc"),
        "epc_breakdown_post_retrofit": epc_breakdown("post_retrofit_epc"),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": str(round(mean_of("pre_retrofit_co2"), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(mean_of("post_retrofit_co2"), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": format_money(mean_of("pre_retrofit_energy_bill")),
        "energy_bill_per_unit_post_retrofit": format_money(mean_of("post_retrofit_energy_bill")),
        "energy_consumption_per_unit_pre_retrofit": str(round(mean_of("pre_retrofit_energy_consumption"))) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(round(mean_of("post_retrofit_energy_consumption"))) + "kWh",
        "valuation_improvement_per_unit": valuation_improvement_per_unit,
        "cost_per_unit": format_money(mean_of("cost")),
        "cost_per_co2_saved": cost_per_co2_saved,
        "cost_per_sap_point": cost_per_sap_point,
        "valuation_return_on_investment": valuation_return_on_investment,
        "funding": float(frame["funding"].sum()),
        "contingency": float(frame["contingency"].sum()),
    }
|
|
|
|
|
|
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Assemble the epc_records dictionary (newest EPC, full SAP EPC and older EPCs) for a property, taking
    into account any energy assessment we have performed ourselves.

    The assessment is placed according to its inspection date:
      - newer than the newest lodged EPC: it becomes the original EPC and the lodged one moves to old data
      - already present in the lodged EPCs (same inspection date and SAP score): the lodged data is used as is
      - older than the newest lodged EPC and not present in the lodged EPCs: it is appended to the old data

    Note that the assessment's EPC dict is updated in place with the location fields of the newest EPC.

    :param epc_searcher: An instance of the SearchEpc class, read for newest_epc, full_sap_epc, older_epcs
                         and uprn
    :param energy_assessment: The energy assessment we have performed. If we have not performed one, its
                              "epc" entry should be empty
    :return: A tuple of (epc_records, energy_assessment_is_newer), where epc_records has the keys
             "original_epc", "full_sap_epc" and "old_data"
    """

    def build_records(original, historical):
        return {
            "original_epc": original,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": historical,
        }

    # Work on a copy so a back-filled UPRN never leaks into the searcher's own record
    lodged_epc = epc_searcher.newest_epc.copy()
    if lodged_epc["uprn"] == "" and epc_searcher.uprn:
        lodged_epc["uprn"] = epc_searcher.uprn

    assessment_epc = energy_assessment["epc"]
    if not assessment_epc:
        # No assessment of our own, so the lodged EPCs are all we have
        return build_records(lodged_epc, epc_searcher.older_epcs.copy()), False

    assessment_date = assessment_epc["inspection-date"].strftime("%Y-%m-%d")

    # The energy assessment does not carry these location fields, so take them from the lodged EPC
    location_fields = ("county", "constituency", "constituency-label", "local-authority", "local-authority-label")
    for field in location_fields:
        assessment_epc[field] = lodged_epc[field]

    if pd.to_datetime(assessment_date) > pd.to_datetime(lodged_epc["inspection-date"]):
        # Our assessment supersedes every lodged EPC, so the lodged newest becomes history
        return build_records(assessment_epc, epc_searcher.older_epcs.copy() + [lodged_epc]), True

    # A lodged EPC with the same inspection date and SAP score is taken to be our assessment
    lodged_matches = [
        candidate for candidate in epc_searcher.older_epcs + [lodged_epc]
        if candidate["inspection-date"] == assessment_date
        and candidate["current-energy-efficiency"] == assessment_epc["current-energy-efficiency"]
    ]
    if lodged_matches:
        return build_records(lodged_epc, epc_searcher.older_epcs.copy()), False

    # Older than the newest lodged EPC but never lodged itself, so keep it alongside the historical data
    return build_records(lodged_epc, epc_searcher.older_epcs.copy() + [assessment_epc]), False
|
|
|
|
|
|
def get_request_property_data(body: PlanTriggerRequest):
    """
    Read the optional supporting files named in the request from the plan trigger S3 bucket.

    Each file is only read when its path is set on the request; otherwise an empty list stands in for it.

    :param body: The request body, read for patches_file_path, already_installed_file_path,
                 non_invasive_recommendations_file_path and valuation_file_path
    :return: A tuple of (patches, already_installed, non_invasive_recommendations, valuation_data)
    """

    def load_optional_csv(file_path):
        # A missing or empty path means this file was not supplied with the request
        if not file_path:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=file_path)

    patches = load_optional_csv(body.patches_file_path)
    already_installed = load_optional_csv(body.already_installed_file_path)
    non_invasive_recommendations = load_optional_csv(body.non_invasive_recommendations_file_path)
    valuation_data = load_optional_csv(body.valuation_file_path)

    return patches, already_installed, non_invasive_recommendations, valuation_data
|
|
|
|
|
|
def get_funding_data():
    """
    Load the funding reference data from the data bucket: the ECO4 full project scores matrix, the ECO4
    partial project scores matrix and the warm homes local grant eligible postcodes.

    :return: A tuple of three DataFrames
             (project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes)
    """

    def load_frame(filepath):
        records = read_csv_from_s3(bucket_name=get_settings().DATA_BUCKET, filepath=filepath)
        return pd.DataFrame(records)

    # Full project scores: rename the columns and make sure the savings are numeric
    full_matrix = load_frame("funding/ECO4 Full Project Scores Matrix.csv")
    full_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
    full_matrix["Cost Savings"] = full_matrix["Cost Savings"].astype(float)

    # Partial project scores: same treatment, plus a fix-up of the top floor area band
    partial_matrix = load_frame("funding/ECO4_Partial_Project_Scores_Matrix_v6.csv")
    partial_matrix.columns = [
        'Measure category', 'Measure_Type', 'Pre_Main_Heating_Source',
        'Post_Main_Heating_Source', 'Total Floor Area Band', 'Starting Band',
        'Average Treatable Factor', 'Cost Savings', 'SAP Savings'
    ]
    # The band is stored as "200" in the file; relabel it "200+"
    floor_area_band = partial_matrix["Total Floor Area Band"]
    partial_matrix["Total Floor Area Band"] = floor_area_band.replace({"200": "200+"})
    partial_matrix["Cost Savings"] = partial_matrix["Cost Savings"].astype(float)

    eligible_postcodes = load_frame("funding/whlg eligible postcodes.csv")

    return full_matrix, partial_matrix, eligible_postcodes
|
|
|
|
|
|
def parse_heating_system(config):
    """
    Pull the landlord-supplied heating system out of a property config, in the wording the EPC uses, so
    it can be used when estimating an EPC. This is a deliberately limited placeholder that only covers
    the cases needed so far.

    :param config: The property's input record, read for the "landlord_heating_system" key
    :return: "Electric storage heaters" when the landlord reports electric storage heaters, otherwise None
    """
    landlord_heating = config.get("landlord_heating_system", None)

    # Only a single landlord value is recognised so far; anything else (or nothing) gives no heating system
    if landlord_heating and landlord_heating == "electric storage heaters":
        # Returned in the same format as the EPC
        return "Electric storage heaters"

    return None
|
|
|
|
|
|
async def model_engine(body: PlanTriggerRequest):
|
|
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
|
|
|
|
logger.info("Connecting to db")
|
|
session = sessionmaker(bind=db_engine)()
|
|
created_at = datetime.now().isoformat()
|
|
|
|
# TODO: if the measure is already installed, it should actually be the very first phase
|
|
|
|
try:
|
|
session.begin()
|
|
logger.info("Getting the inputs")
|
|
|
|
if body.file_type == "xlsx":
|
|
plan_input = read_excel_from_s3(
|
|
bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
|
|
file_key=body.trigger_file_path,
|
|
sheet_name=body.sheet_name,
|
|
header_row=0,
|
|
)
|
|
|
|
# We now handle the case where the input data is a Domna standardised asset list
|
|
if body.file_format == "domna_asset_list":
|
|
# We rename the columns to match the expected format
|
|
plan_input = plan_input.rename(
|
|
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
|
|
)
|
|
# Where the EPC has been estimated, that is because a UPRN wasn't available, so we remove the UPRN
|
|
# This will be reflexted
|
|
plan_input["uprn"] = np.where(
|
|
plan_input["estimated"].isin([1, True]) & (
|
|
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
|
|
), None, plan_input["uprn"]
|
|
)
|
|
# We handle the landlord property type and built form
|
|
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
|
|
if "landlord_built_form" in plan_input.columns:
|
|
plan_input["built_form"] = plan_input["landlord_built_form"].copy()
|
|
else:
|
|
plan_input["built_form"] = None
|
|
plan_input["property_type"] = np.where(
|
|
plan_input["property_type"] == "unknown",
|
|
plan_input["epc_property_type"],
|
|
plan_input["property_type"]
|
|
)
|
|
|
|
if "epc_archetype" not in plan_input.columns:
|
|
plan_input["epc_archetype"] = None
|
|
|
|
plan_input["built_form"] = np.where(
|
|
plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"]
|
|
)
|
|
property_type_map = {
|
|
"house": "House",
|
|
"flat": "Flat",
|
|
"maisonette": "Maisonette",
|
|
"bungalow": "Bungalow",
|
|
"block house": "House",
|
|
"coach house": "House",
|
|
"bedsit": "Flat"
|
|
}
|
|
|
|
built_form_map = {
|
|
"mid-terrace": "Mid-Terrace",
|
|
"end-terrace": "End-Terrace",
|
|
"semi-detached": "Semi-Detached",
|
|
"detached": "Detached",
|
|
"enclosed end-terrace": "Enclosed End-Terrace",
|
|
"enclosed mid-terrace": "Enclosed Mid-Terrace",
|
|
}
|
|
# We remap the values to match the EPC expected formats
|
|
plan_input["property_type"] = plan_input["property_type"].map(property_type_map)
|
|
plan_input["built_form"] = plan_input["built_form"].map(built_form_map)
|
|
|
|
plan_input = plan_input.to_dict("records")
|
|
|
|
else:
|
|
raise ValueError("Other formats not yet supported")
|
|
|
|
else:
|
|
plan_input = read_csv_from_s3(
|
|
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path
|
|
)
|
|
|
|
# We then slice it on the indexes if they are provided
|
|
if body.index_start is not None and body.index_end is not None:
|
|
plan_input = plan_input[body.index_start:body.index_end]
|
|
|
|
# Check for duplicate UPRNS
|
|
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
|
|
|
|
if input_uprns:
|
|
# Check for dupes
|
|
if len(input_uprns) != len(set(input_uprns)):
|
|
# Find the duplicate UPRNs
|
|
duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1])
|
|
# de-dupe input_uprns
|
|
raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")
|
|
|
|
# If we have patches or overrides, we should read them in here
|
|
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
|
|
|
|
if body.file_type == "xlsx" and body.file_format == "domna_asset_list":
|
|
# We check if we have valution data
|
|
if not valuation_data and body.valuation_file_path in [None, ""]:
|
|
# We check plan_input
|
|
if "domna_valuation" in plan_input[0]:
|
|
valuation_data = [{"uprn": x["uprn"], "valuation": x["domna_valuation"]} for x in plan_input]
|
|
|
|
cleaning_data = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
|
|
)
|
|
|
|
input_properties, inspections_map, eco_packages = [], {}, {}
|
|
for config in tqdm(plan_input):
|
|
|
|
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
|
uprn = config.get("uprn", None)
|
|
if pd.isnull(uprn):
|
|
uprn = None
|
|
if uprn:
|
|
uprn = int(float(uprn))
|
|
|
|
address1 = config.get("address", None)
|
|
# Handle domna address list format
|
|
if pd.isnull(address1) and body.file_format == "domna_asset_list":
|
|
address1 = config.get("domna_full_address", None)
|
|
|
|
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
|
|
full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None
|
|
heating_system = parse_heating_system(config)
|
|
|
|
epc_searcher = SearchEpc(
|
|
address1=address1,
|
|
postcode=config["postcode"],
|
|
uprn=uprn,
|
|
auth_token=get_settings().EPC_AUTH_TOKEN,
|
|
os_api_key="",
|
|
full_address=full_address,
|
|
heating_system=heating_system
|
|
)
|
|
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
|
|
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
|
|
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
|
epc_searcher.find_property(skip_os=True)
|
|
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and (
|
|
epc_searcher.newest_epc["uprn"] < 0
|
|
):
|
|
epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED
|
|
|
|
# We check for an energy assessment we have performed on this property:
|
|
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
|
|
|
|
property_id, is_new = ensure_property_exists(
|
|
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
|
|
)
|
|
if not property_id:
|
|
continue
|
|
|
|
if not is_new and not body.multi_plan:
|
|
continue
|
|
|
|
if epc_searcher.newest_epc is None:
|
|
raise ValueError(
|
|
"No EPCs found for this property and did not estimate - likely need to provide a"
|
|
"property type and built form"
|
|
)
|
|
|
|
if is_new:
|
|
create_property_targets(
|
|
session,
|
|
property_id=property_id,
|
|
portfolio_id=body.portfolio_id,
|
|
epc_target=body.goal_value,
|
|
heat_demand_target=None
|
|
)
|
|
|
|
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
|
|
# Otherwise, we use the newest EPC
|
|
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
|
|
# has been publically lodged
|
|
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
|
|
epc_searcher, energy_assessment
|
|
)
|
|
|
|
req_data = extract_property_request_data(
|
|
config=config,
|
|
patches=patches,
|
|
already_installed=already_installed,
|
|
non_invasive_recommendations=non_invasive_recommendations,
|
|
valuation_data=valuation_data,
|
|
uprn=epc_searcher.uprn,
|
|
)
|
|
# Pull this out as it may get overwritten
|
|
property_non_invasive_recommendations = req_data.non_invasive_recommendations
|
|
patch = req_data.patch
|
|
|
|
# if we have a remote assment data type, we pull the additional data and include it
|
|
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
|
|
logger.info("Retrieving find my epc data")
|
|
try:
|
|
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
|
|
epc_searcher.newest_epc
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to retrieve without cleaning address {e}")
|
|
for k in ["address", "address1"]:
|
|
epc_searcher.newest_epc[k] = epc_searcher.address_clean
|
|
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
|
|
epc_searcher.newest_epc
|
|
)
|
|
|
|
# If we have a property type, this means when we pull the epc data, we might need to make a patch
|
|
|
|
epc_records = patch_epc(patch, epc_records)
|
|
|
|
prepared_epc = EPCRecord(
|
|
epc_records=epc_records,
|
|
run_mode="newdata",
|
|
cleaning_data=cleaning_data,
|
|
)
|
|
|
|
# If we have an ECO project, we parse the cavity/solar reasons
|
|
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
|
|
|
|
# Final step - extract inspections data, if we have it - we inject into property for usage
|
|
property_inspections = extract_inspection_data(config)
|
|
if property_inspections:
|
|
inspections_map[property_id] = property_inspections
|
|
|
|
input_properties.append(
|
|
Property(
|
|
id=property_id,
|
|
is_new=is_new,
|
|
address=epc_searcher.address_clean,
|
|
postcode=epc_searcher.postcode_clean,
|
|
epc_record=prepared_epc,
|
|
already_installed=req_data.already_installed + eco_packages[property_id][3],
|
|
property_valuation=req_data.valuation,
|
|
non_invasive_recommendations=property_non_invasive_recommendations,
|
|
energy_assessment=energy_assessment,
|
|
inspections=inspections_map.get(property_id),
|
|
**Property.extract_kwargs(config), # TODO: Depraecate this
|
|
)
|
|
)
|
|
|
|
if not input_properties:
|
|
return Response(status_code=204)
|
|
|
|
# We check if we have inspections data and store it in the database if so. We'll update or create
|
|
# aginst each property if
|
|
if inspections_map:
|
|
logger.info("Inserting inspections data")
|
|
bulk_upsert_inspections_pg(session, inspections_map)
|
|
|
|
# Set up model api and warm up the lambdas
|
|
model_api = ModelApi(
|
|
portfolio_id=body.portfolio_id,
|
|
timestamp=created_at,
|
|
prediction_buckets=get_prediction_buckets(),
|
|
max_retries=1
|
|
)
|
|
await model_api.async_warm_up_lambdas(
|
|
model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES
|
|
)
|
|
|
|
# The materials data could be cached or local so we don't need to make
|
|
# consistent requests to the backend for
|
|
# the same data
|
|
logger.info("Reading in materials and cleaned datasets")
|
|
materials = get_materials(session)
|
|
cleaned = get_cleaned()
|
|
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
|
|
|
|
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
|
|
|
|
epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned)
|
|
|
|
kwh_preds = await model_api.async_paginated_predictions(
|
|
data=epcs_for_scoring,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
extract_ids=False,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the spatial data
|
|
logger.info("Getting spatial data")
|
|
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
|
|
|
|
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
|
|
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
|
|
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
|
|
# extensions, since it doesn't seem to do a great job
|
|
|
|
logger.info("Performing solar analysis")
|
|
|
|
ofgem_consumption_averages = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET,
|
|
file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet"
|
|
)
|
|
|
|
building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data(
|
|
input_properties=input_properties,
|
|
ofgem_consumption_averages=ofgem_consumption_averages,
|
|
body=body
|
|
)
|
|
|
|
input_properties = GoogleSolarApi.building_solar_analysis(
|
|
building_solar_config=building_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
|
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
|
)
|
|
|
|
input_properties = GoogleSolarApi.unit_solar_analysis(
|
|
unit_solar_config=unit_solar_config,
|
|
input_properties=input_properties,
|
|
session=session,
|
|
body=body,
|
|
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
|
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
|
inspections_map=inspections_map
|
|
)
|
|
|
|
# We also make a tweak - if the property has been flagged for solar but doesn't contain
|
|
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
|
|
|
|
logger.info("Identifying property recommendations")
|
|
recommendations = {}
|
|
recommendations_scoring_data = []
|
|
representative_recommendations = {}
|
|
for p in tqdm(input_properties):
|
|
# We set the ECO package data, if we have it
|
|
property_eco_package = eco_packages.get(p.id, (None, None, None))
|
|
if property_eco_package[0] is not None:
|
|
inclusions = property_eco_package[0]
|
|
exclusions = []
|
|
else:
|
|
inclusions = body.inclusions
|
|
exclusions = body.exclusions
|
|
|
|
recommender = Recommendations(
|
|
property_instance=p,
|
|
materials=materials,
|
|
exclusions=exclusions,
|
|
inclusions=inclusions,
|
|
default_u_values=body.default_u_values
|
|
)
|
|
property_recommendations, property_representative_recommendations = recommender.recommend()
|
|
|
|
if not property_recommendations:
|
|
continue
|
|
|
|
recommendations[p.id] = property_recommendations
|
|
representative_recommendations[p.id] = property_representative_recommendations
|
|
|
|
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
|
|
p.adjust_difference_record_with_recommendations(
|
|
property_recommendations, property_representative_recommendations
|
|
)
|
|
|
|
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
|
|
|
logger.info("Preparing data for scoring in sap change api")
|
|
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
|
|
|
recommendations_scoring_data = recommendations_scoring_data.drop(
|
|
columns=[
|
|
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
|
|
"carbon_ending"
|
|
]
|
|
)
|
|
|
|
all_predictions = await model_api.async_paginated_predictions(
|
|
data=recommendations_scoring_data,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the predictions into the recommendations, and get the impact summary
|
|
scoring_epcs = [] # For scoring the kwh models
|
|
for property_id in recommendations.keys():
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
recommendations_with_impact, impact_summary = (
|
|
Recommendations.calculate_recommendation_impact(
|
|
property_instance=property_instance,
|
|
all_predictions=all_predictions,
|
|
recommendations=recommendations,
|
|
representative_recommendations=representative_recommendations
|
|
)
|
|
)
|
|
|
|
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
|
|
# at each phase
|
|
property_instance.update_simulation_epcs(impact_summary)
|
|
scoring_epcs.extend(property_instance.updated_simulation_epcs)
|
|
recommendations[property_id] = recommendations_with_impact
|
|
|
|
# We call the API with the scoring epcs
|
|
scoring_epcs = pd.DataFrame(scoring_epcs)
|
|
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
|
|
|
|
kwh_simulation_predictions = await model_api.async_paginated_predictions(
|
|
data=scoring_epcs,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=model_api.KWH_MODEL_PREFIXES,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# We now insert kwh estimates and costs into the recommendations
|
|
logger.info("Calculating tenant savings - kwh and bills")
|
|
for property_id in tqdm([p.id for p in input_properties]):
|
|
property_recommendations = recommendations.get(property_id, [])
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
property_current_energy_bill = (
|
|
Recommendations.calculate_recommendation_tenant_savings(
|
|
property_instance=property_instance,
|
|
kwh_simulation_predictions=kwh_simulation_predictions,
|
|
property_recommendations=property_recommendations,
|
|
ashp_cop=body.ashp_cop
|
|
)
|
|
)
|
|
property_instance.current_energy_bill = property_current_energy_bill
|
|
|
|
# Insert the predictions into the recommendations and run the optimiser
|
|
for p in input_properties:
|
|
if not recommendations.get(p.id):
|
|
continue
|
|
|
|
# we need to double unlist because we have a list of lists
|
|
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
|
|
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
|
|
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
|
|
|
|
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enforce
|
|
# its inclusion
|
|
needs_ventilation = any(
|
|
x in property_measure_types for x in assumptions.measures_needing_ventilation
|
|
) and not p.has_ventilation
|
|
|
|
if not measures_to_optimise:
|
|
# Nothing to do, we just reshape the recommendations
|
|
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
|
|
p.id, recommendations, set()
|
|
)
|
|
continue
|
|
|
|
fixed_gain = optimiser_functions.calculate_fixed_gain(
|
|
property_required_measures, recommendations, p, needs_ventilation
|
|
)
|
|
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
|
|
|
|
funding = Funding(
|
|
tenure=body.housing_type,
|
|
project_scores_matrix=project_scores_matrix,
|
|
partial_project_scores_matrix=partial_project_scores_matrix,
|
|
whlg_eligible_postcodes=whlg_eligible_postcodes,
|
|
eco4_social_cavity_abs_rate=13,
|
|
eco4_social_solid_abs_rate=17,
|
|
eco4_private_cavity_abs_rate=13,
|
|
eco4_private_solid_abs_rate=17,
|
|
gbis_social_cavity_abs_rate=21,
|
|
gbis_social_solid_abs_rate=25,
|
|
gbis_private_cavity_abs_rate=21,
|
|
gbis_private_solid_abs_rate=28,
|
|
)
|
|
|
|
li_thickness = convert_thickness_to_numeric(
|
|
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
|
|
)
|
|
current_wall_u_value = p.walls["thermal_transmittance"]
|
|
if current_wall_u_value is None:
|
|
current_wall_u_value = get_wall_u_value(
|
|
clean_description=p.walls["clean_description"],
|
|
age_band=p.age_band,
|
|
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
|
|
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
|
|
)
|
|
|
|
# We insert the innovation uplift
|
|
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
|
|
|
|
# TODO: Turn this into a function and store the innovation uplift
|
|
for group in measures_to_optimise_with_uplift:
|
|
for r in group:
|
|
|
|
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
|
|
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
|
|
(
|
|
r["partial_project_score"],
|
|
r["partial_project_funding"],
|
|
r["innovation_uplift"],
|
|
r["uplift_project_score"],
|
|
) = (
|
|
0, 0, 0, 0
|
|
)
|
|
continue
|
|
|
|
(
|
|
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
|
|
r["uplift_project_score"]
|
|
) = funding.get_innovation_uplift(
|
|
measure=r,
|
|
starting_sap=int(p.data["current-energy-efficiency"]),
|
|
floor_area=p.floor_area,
|
|
is_cavity=p.walls["is_cavity_wall"],
|
|
current_wall_uvalue=current_wall_u_value,
|
|
is_partial="partial" in p.walls["clean_description"].lower(),
|
|
existing_li_thickness=li_thickness,
|
|
mainheating=p.main_heating,
|
|
main_fuel=p.main_fuel,
|
|
mainheat_energy_eff=p.data["mainheat-energy-eff"],
|
|
)
|
|
|
|
if r["already_installed"]:
|
|
# if already installed, we zero out the uplift and funding
|
|
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
|
|
r["uplift_project_score"]) = (
|
|
0, 0, 0, 0
|
|
)
|
|
|
|
input_measures = optimiser_functions.prepare_input_measures(
|
|
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
|
|
property_eco_packages=eco_packages.get(p.id)
|
|
)
|
|
|
|
# When the goal is Increasing EPC, we can run the funding optimiser
|
|
if body.goal == "Increasing EPC":
|
|
|
|
solutions = optimise_with_funding_paths(
|
|
p=p,
|
|
input_measures=input_measures,
|
|
housing_type=body.housing_type,
|
|
budget=body.budget,
|
|
target_gain=gain,
|
|
funding=funding,
|
|
work_package=eco_packages[p.id][2]
|
|
)
|
|
|
|
# If the solution isn't eligible, we can't really consider it
|
|
solutions = solutions[
|
|
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
|
|
]
|
|
|
|
if solutions["meets_upgrade_target"].any():
|
|
# If we have a solution that meets the upgrade target, we select that one
|
|
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
|
|
else:
|
|
# Pick the cheapest
|
|
optimal_solution = solutions.iloc[0]
|
|
|
|
# This is the list of measures that we will recommend
|
|
scheme = optimal_solution["scheme"]
|
|
|
|
# We create this full list of selected measures, which is used in the next section for setting
|
|
# default measures
|
|
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
|
|
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
|
|
|
|
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
|
|
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
|
|
optimal_solution["partial_project_funding"]
|
|
# This is the total amount of funding associated to the uplift (£)
|
|
total_uplift = optimal_solution["total_uplift"]
|
|
# This is the funding scheme selected
|
|
# This is the full project ABS
|
|
full_project_score = optimal_solution["project_score"]
|
|
# This is the partial project ABS
|
|
partial_project_score = optimal_solution["partial_project_score"]
|
|
# This is the uplift score ABS
|
|
uplift_project_score = optimal_solution["total_uplift_score"]
|
|
else:
|
|
# We optimise and then we determine eligibility for funding, based on the measures selected
|
|
optimiser = (
|
|
GainOptimiser(
|
|
input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
|
|
) if body.budget else CostOptimiser(input_measures, min_gain=gain)
|
|
)
|
|
optimiser.setup()
|
|
optimiser.solve()
|
|
solution = optimiser.solution
|
|
|
|
recommendation_types = []
|
|
for measures in input_measures:
|
|
for measure in measures:
|
|
recommendation_types.append(measure["type"])
|
|
recommendation_types = set(recommendation_types)
|
|
|
|
has_wall_insulation_recommendation = any(
|
|
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
|
|
WALL_INSULATION_MEASURES
|
|
)
|
|
has_roof_insulation_recommendation = any(
|
|
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
|
|
ROOF_INSULATION_MEASURES
|
|
)
|
|
|
|
funding.check_funding(
|
|
measures=solution,
|
|
starting_sap=int(p.data["current-energy-efficiency"]),
|
|
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
|
|
floor_area=p.floor_area,
|
|
mainheat_description=p.main_heating["clean_description"],
|
|
heating_control_description=p.main_heating_controls["clean_description"],
|
|
is_cavity=p.walls["is_cavity_wall"],
|
|
current_wall_uvalue=current_wall_u_value,
|
|
is_partial="partial" in p.walls["clean_description"].lower(),
|
|
existing_li_thickness=li_thickness,
|
|
mainheating=p.main_heating,
|
|
main_fuel=p.main_fuel,
|
|
mainheat_energy_eff=p.data["mainheat-energy-eff"],
|
|
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
|
|
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
|
|
)
|
|
|
|
# Determine the scheme
|
|
scheme = "none"
|
|
if funding.eco4_eligible:
|
|
scheme = "eco4"
|
|
if scheme == "none" and funding.gbis_eligible:
|
|
scheme = "gbis"
|
|
|
|
funded_measures = solution if scheme in ["gbis", "eco4"] else []
|
|
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
|
|
total_uplift = funding.eco4_uplift
|
|
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
|
|
partial_project_score = funding.partial_project_abs
|
|
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
|
|
|
|
selected = {r["id"] for r in solution}
|
|
|
|
if property_required_measures:
|
|
solution = optimiser_functions.add_required_measures(
|
|
property_id=p.id, property_required_measures=property_required_measures,
|
|
recommendations=recommendations, selected=selected,
|
|
)
|
|
|
|
# Add best practice measures (ventilation/trickle vents)
|
|
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
|
|
# Final flattening
|
|
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
|
|
p.id, recommendations, selected
|
|
)
|
|
|
|
# TODO: functionise
|
|
for measure in funded_measures:
|
|
if "+mechanical_ventilation" in measure["type"]:
|
|
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
|
|
|
|
p.insert_funding(
|
|
scheme=scheme,
|
|
funded_measures=funded_measures,
|
|
project_funding=project_funding,
|
|
total_uplift=total_uplift,
|
|
full_project_score=full_project_score,
|
|
partial_project_score=partial_project_score,
|
|
uplift_project_score=uplift_project_score
|
|
)
|
|
|
|
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
|
|
# of them
|
|
# TODO: We can probably do better and optimise at the building level - this is temp
|
|
logger.info("Adjusting solar PV recommendations for buildings")
|
|
building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
|
|
|
|
for bid in building_ids:
|
|
# We check if any of them have solar PV
|
|
building = [p for p in input_properties if p.building_id == bid]
|
|
has_solar = False
|
|
for unit in building:
|
|
# Get default recommendations
|
|
has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
|
|
if has_solar:
|
|
break
|
|
|
|
if has_solar:
|
|
# We adjust the units within the building
|
|
for unit in building:
|
|
for rec in recommendations[unit.id]:
|
|
if rec["type"] == "solar_pv":
|
|
# This is straightforward, we just set the default to True, since when we're at a building
|
|
# level, we only allow 1 solar PV option for each unit. If we change this, this logic will
|
|
# need to be updated
|
|
rec["default"] = True
|
|
|
|
logger.info("Uploading recommendations to the database")
|
|
# If we have any work to do, we create a new scenario
|
|
if body.scenario_id:
|
|
# We don't need to create a new scenario, we just use the existing one
|
|
scenario_id = body.scenario_id
|
|
else:
|
|
engine_scenario = create_scenario(
|
|
session=session,
|
|
scenario={
|
|
"name": body.scenario_name,
|
|
"created_at": created_at,
|
|
"budget": body.budget,
|
|
"portfolio_id": body.portfolio_id,
|
|
"housing_type": body.housing_type,
|
|
"goal": body.goal,
|
|
"goal_value": body.goal_value,
|
|
"trigger_file_path": body.trigger_file_path,
|
|
"already_installed_file_path": body.already_installed_file_path,
|
|
"patches_file_path": body.patches_file_path,
|
|
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
|
|
"exclusions": body.exclusions,
|
|
"multi_plan": body.multi_plan
|
|
}
|
|
)
|
|
scenario_id = engine_scenario.id
|
|
|
|
property_valuation_increases = []
|
|
session.commit()
|
|
new_epc_bands = {}
|
|
property_value_increase_ranges = {}
|
|
for i in range(0, len(input_properties), BATCH_SIZE):
|
|
try:
|
|
# Take a slice of the input_properties list to make a batch
|
|
batch_properties = input_properties[i:i + BATCH_SIZE]
|
|
|
|
for p in batch_properties:
|
|
recommendations_to_upload = recommendations.get(p.id, [])
|
|
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
|
|
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
|
|
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
|
|
new_epc = sap_to_epc(new_sap_points)
|
|
new_epc_bands[p.id] = new_epc
|
|
|
|
total_cost = sum([r["total"] for r in default_recommendations])
|
|
|
|
valuations = PropertyValuation.estimate(
|
|
property_instance=p, target_epc=new_epc, total_cost=total_cost
|
|
)
|
|
property_value_increase_ranges[p.id] = valuations
|
|
|
|
if p.is_new:
|
|
property_details_epc = p.get_property_details_epc(
|
|
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
|
|
)
|
|
create_property_details_epc(session, property_details_epc)
|
|
|
|
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
|
|
|
|
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
|
|
|
|
update_property_data(
|
|
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
|
|
)
|
|
|
|
if not recommendations_to_upload:
|
|
continue
|
|
|
|
new_plan_id = create_plan(session, {
|
|
"portfolio_id": body.portfolio_id,
|
|
"property_id": p.id,
|
|
"scenario_id": scenario_id,
|
|
"is_default": True if p.is_new else False,
|
|
"name": body.scenario_name,
|
|
"valuation_increase_lower_bound": (
|
|
valuations["lower_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_upper_bound": (
|
|
valuations["upper_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_average": (
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
),
|
|
"plan_type": eco_packages.get(p.id, (None, None, None))[2]
|
|
})
|
|
|
|
upload_recommendations(
|
|
session, recommendations_to_upload, p.id, new_plan_id
|
|
)
|
|
|
|
upload_funding(session, p, new_plan_id, recommendations_to_upload)
|
|
|
|
if valuations["current_value"] > 0:
|
|
property_valuation_increases.append(
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
)
|
|
|
|
# Commit the session after each batch
|
|
session.commit()
|
|
|
|
except Exception as e:
|
|
# Rollback the session if an error occurs
|
|
session.rollback()
|
|
logger.warning("Failed i = %s" % str(i))
|
|
logger.error(f"An error occurred during batch starting at index {i}: {e}")
|
|
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
|
|
|
|
logger.info("Creating portfolio aggregations")
|
|
# We implement this in the simplest way possible which will be just to query the database for all
|
|
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
|
|
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
|
|
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
|
|
# portfolio-level impact
|
|
|
|
total_valuation_increase = sum(property_valuation_increases)
|
|
labour_days = round(max(
|
|
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
|
|
))
|
|
|
|
# TODO - This code only pulls in the properties that have been updated in this run, but we need to
|
|
# aggregate all properties in the portfolio. We likely need to trigger a re-aggregation
|
|
aggregated_data = extract_portfolio_aggregation_data(
|
|
input_properties=input_properties,
|
|
total_valuation_increase=total_valuation_increase,
|
|
recommendations=recommendations,
|
|
new_epc_bands=new_epc_bands,
|
|
property_value_increase_ranges=property_value_increase_ranges
|
|
)
|
|
|
|
aggregate_portfolio_recommendations(
|
|
session,
|
|
portfolio_id=body.portfolio_id,
|
|
scenario_id=scenario_id,
|
|
total_valuation_increase=total_valuation_increase,
|
|
labour_days=labour_days,
|
|
aggregated_data=aggregated_data
|
|
)
|
|
|
|
# Commit final changes
|
|
session.commit()
|
|
|
|
except IntegrityError:
|
|
return handle_error(session, "Database integrity error.", 500)
|
|
except OperationalError:
|
|
return handle_error(session, "Database operational error.", 500)
|
|
except ValueError:
|
|
return handle_error(session, "Bad request: malformed data.", 400)
|
|
except Exception as e: # General exception handling
|
|
return handle_error(session, "An unexpected error occurred.", 500)
|
|
finally:
|
|
session.close()
|
|
|
|
logger.info("Model Engine completed successfully")
|
|
|
|
return Response(status_code=200)
|