mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1524 lines
69 KiB
Python
1524 lines
69 KiB
Python
import json
|
|
from datetime import datetime
|
|
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
from etl.epc.Record import EPCRecord
|
|
from backend.SearchEpc import SearchEpc
|
|
from fastapi import APIRouter, Depends
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from sqlalchemy.orm import sessionmaker
|
|
from starlette.responses import Response
|
|
|
|
import backend.app.assumptions as assumptions
|
|
from backend.app.config import get_settings, get_prediction_buckets
|
|
from backend.app.db.connection import db_engine
|
|
from backend.app.db.functions.materials_functions import get_materials
|
|
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
|
|
from backend.app.db.functions.property_functions import (
|
|
create_property, create_property_details_epc, create_property_targets, update_property_data,
|
|
update_or_create_property_spatial_details
|
|
)
|
|
from backend.app.db.functions.recommendations_functions import (
|
|
create_plan, upload_recommendations, create_scenario
|
|
)
|
|
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
|
|
from backend.app.db.models.portfolio import rating_lookup
|
|
from backend.app.dependencies import validate_token
|
|
from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest
|
|
from backend.app.plan.utils import get_cleaned
|
|
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
|
|
|
|
from backend.ml_models.api import ModelApi
|
|
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
|
|
from backend.Property import Property
|
|
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
|
|
|
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
|
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
|
from recommendations.optimiser.optimiser_functions import prepare_input_measures
|
|
from recommendations.Recommendations import Recommendations
|
|
from recommendations.Mds import Mds
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
|
|
from backend.ml_models.Valuation import PropertyValuation
|
|
|
|
from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel
|
|
from etl.bill_savings.KwhData import KwhData
|
|
from etl.spatial.OpenUprnClient import OpenUprnClient
|
|
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
|
|
|
|
# Module-wide logger, configured by the project's logging helper
logger = setup_logger()

# BATCH_SIZE: number of properties handled per database upload batch.
# SCORING_BATCH_SIZE: number of rows sent per request by the paginated model predictions.
BATCH_SIZE, SCORING_BATCH_SIZE = 5, 400
|
|
|
|
|
|
def patch_epc(patch, epc_records):
    """
    Overwrite fields of the newest EPC with customer-supplied values.

    Only fields that already exist in ``epc_records["original_epc"]`` are overwritten. The "address" and
    "postcode" entries of the patch, and any value that is an empty string, are ignored. ``epc_records`` is
    modified in place.

    :param patch: mapping of EPC field name -> replacement value
    :param epc_records: dict holding an "original_epc" dict (only looked up when a patch value is applicable)
    :return: the same ``epc_records`` object that was passed in
    """
    skipped_fields = ("address", "postcode")

    for field, new_value in patch.items():
        if field in skipped_fields or new_value == "":
            continue

        target_epc = epc_records["original_epc"]
        if field in target_epc:
            target_epc[field] = new_value

    return epc_records
|
|
|
|
|
|
def extract_portfolio_aggregation_data(
    input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
    """
    Aggregate per-property retrofit outcomes into display-ready portfolio metrics.

    Metrics produced:
      1) number of properties in each EPC band, before and after retrofit
      2) number of units, and number of units with at least one default recommendation
      3) CO2 per unit, before and after retrofit
      4) energy bill and energy consumption per unit, before and after retrofit
      5) average valuation improvement per unit (with a lower - upper range)
      6) cost per unit, cost per unit of CO2 saved, and cost per SAP point
      7) valuation return on investment (with a lower - upper range)

    Only recommendations flagged ``default`` count towards the post-retrofit figures. Every property is
    included, even one without recommendations; its post-retrofit values equal its pre-retrofit values.

    :param input_properties: property objects exposing ``id``, ``data`` (with "co2-emissions-current" and
        "current-energy-rating"), ``current_energy_bill`` and ``current_adjusted_energy``
    :param total_valuation_increase: total valuation increase across the portfolio
    :param recommendations: property id -> list of recommendation dicts with the keys "default",
        "co2_equivalent_savings", "energy_cost_savings", "kwh_savings", "total" and "sap_points"
    :param new_epc_bands: property id -> EPC band after retrofit
    :param property_value_increase_ranges: property id -> dict with "current_value",
        "lower_bound_increased_value" and "upper_bound_increased_value"
    :return: dict of display strings/ints; the two EPC breakdowns are JSON strings. A ratio whose denominator
        is zero (e.g. no CO2 saved because nothing is recommended) is reported as "N/A" instead of "£nan"/"£inf".
    """
    not_available = "N/A"
    epc_band_order = ["G", "F", "E", "D", "C", "B", "A"]

    def reformat_epc_data(epc_counts):
        # One entry per band, worst to best, so bands absent from the portfolio still appear with a zero count
        return [{"name": band, band: epc_counts.get(band, 0)} for band in epc_band_order]

    def format_money(amount):
        return f"£{amount:,.0f}"

    def ratio_or_none(numerator, denominator):
        # Dividing by a numpy zero yields inf/nan (with a RuntimeWarning) instead of raising, which would
        # otherwise leak into the output as "£inf"/"£nan"
        if denominator == 0:
            return None
        return numerator / denominator

    def money_ratio(numerator, denominator):
        ratio = ratio_or_none(numerator, denominator)
        return not_available if ratio is None else format_money(ratio)

    def sum_field(recs, field):
        return sum(r[field] for r in recs)

    rows = []
    for p in input_properties:
        default_recommendations = [r for r in recommendations.get(p.id, []) if r["default"]]
        value_range = property_value_increase_ranges[p.id]
        pre_retrofit_co2 = p.data["co2-emissions-current"]

        rows.append({
            "pre_retrofit_epc": p.data["current-energy-rating"],
            "post_retrofit_epc": new_epc_bands[p.id],
            "pre_retrofit_co2": pre_retrofit_co2,
            "post_retrofit_co2": pre_retrofit_co2 - sum_field(default_recommendations, "co2_equivalent_savings"),
            "pre_retrofit_energy_bill": p.current_energy_bill,
            "post_retrofit_energy_bill": (
                p.current_energy_bill - sum_field(default_recommendations, "energy_cost_savings")
            ),
            "pre_retrofit_energy_consumption": p.current_adjusted_energy,
            "post_retrofit_energy_consumption": (
                p.current_adjusted_energy - sum_field(default_recommendations, "kwh_savings")
            ),
            "cost": sum_field(default_recommendations, "total"),
            "sap_point_improvement": sum_field(default_recommendations, "sap_points"),
            "lower_bound_valuation_uplift": (
                value_range["lower_bound_increased_value"] - value_range["current_value"]
            ),
            "upper_bound_valuation_uplift": (
                value_range["upper_bound_increased_value"] - value_range["current_value"]
            ),
            "has_recommendations": len(default_recommendations) > 0,
        })

    agg_data = pd.DataFrame(rows)

    n_units = len(input_properties)
    n_units_to_retrofit = agg_data["has_recommendations"].sum()
    total_cost = agg_data["cost"].sum()
    total_carbon_saved = agg_data["pre_retrofit_co2"].sum() - agg_data["post_retrofit_co2"].sum()
    total_sap_points = agg_data["sap_point_improvement"].sum()
    lower_uplift = agg_data["lower_bound_valuation_uplift"]
    upper_uplift = agg_data["upper_bound_valuation_uplift"]

    # Headline figure is the portfolio-wide increase spread per unit; the range is the mean per-unit uplift
    valuation_improvement_per_unit = (
        f"{money_ratio(total_valuation_increase, n_units)} "
        f"({format_money(lower_uplift.mean())} - {format_money(upper_uplift.mean())})"
    )

    if total_cost == 0:
        valuation_return_on_investment = not_available
    else:
        valuation_return_on_investment = (
            str(round(total_valuation_increase / total_cost, 2)) +
            f" ({lower_uplift.sum() / total_cost:,.2f} - {upper_uplift.sum() / total_cost:,.2f})"
        )

    return {
        "epc_breakdown_pre_retrofit": json.dumps(
            reformat_epc_data(agg_data["pre_retrofit_epc"].value_counts().to_dict())
        ),
        "epc_breakdown_post_retrofit": json.dumps(
            reformat_epc_data(agg_data["post_retrofit_epc"].value_counts().to_dict())
        ),
        "number_of_properties": int(n_units),
        "n_units_to_retrofit": int(n_units_to_retrofit),
        "co2_per_unit_pre_retrofit": str(round(agg_data["pre_retrofit_co2"].mean(), 2)) + "t",
        "co2_per_unit_post_retrofit": str(round(agg_data["post_retrofit_co2"].mean(), 2)) + "t",
        "energy_bill_per_unit_pre_retrofit": format_money(agg_data["pre_retrofit_energy_bill"].mean()),
        "energy_bill_per_unit_post_retrofit": format_money(agg_data["post_retrofit_energy_bill"].mean()),
        "energy_consumption_per_unit_pre_retrofit": str(
            round(agg_data["pre_retrofit_energy_consumption"].mean())) + "kWh",
        "energy_consumption_per_unit_post_retrofit": str(
            round(agg_data["post_retrofit_energy_consumption"].mean())) + "kWh",
        "valuation_improvement_per_unit": valuation_improvement_per_unit,
        "cost_per_unit": format_money(agg_data["cost"].mean()),
        "cost_per_co2_saved": money_ratio(total_cost, total_carbon_saved),
        "cost_per_sap_point": money_ratio(total_cost, total_sap_points),
        "valuation_return_on_investment": valuation_return_on_investment,
        # TODO: Could we add 10yr carbon credits value?
    }
|
|
|
|
|
|
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
    """
    Build the ``epc_records`` bundle (newest EPC, full SAP EPC, older EPCs) for a property, folding in any
    energy assessment we have carried out for the client.

    :param epc_searcher: a SearchEpc instance providing ``newest_epc``, ``full_sap_epc`` and ``older_epcs``
    :param energy_assessment: our latest energy assessment for the property. When none has been performed, this
        is the empty response from the model's EnergyAssessment.empty_response() method, and its "epc" entry is
        expected to be falsy.
    :return: tuple ``(epc_records, energy_assessment_is_newer)``. ``epc_records`` has the keys "original_epc",
        "full_sap_epc" and "old_data". ``energy_assessment_is_newer`` is True only when the assessment's
        inspection date is strictly later than the newest public EPC's.

    When an assessment exists:
      * if it is newer than the newest public EPC, it becomes "original_epc" and the newest public EPC is
        appended to "old_data";
      * if a public EPC has the same inspection date and current-energy-efficiency, it is treated as already
        lodged and left out;
      * otherwise it is appended to "old_data" as an unlodged historical record.

    Note: the assessment's "epc" dict is mutated in place (location fields are copied onto it).
    """
    assessed_epc = energy_assessment["epc"]

    if not assessed_epc:
        # No assessment on record: the public EPC data is used as-is
        return {
            "original_epc": epc_searcher.newest_epc.copy(),
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy(),
        }, False

    assessment_date = assessed_epc["inspection-date"].strftime("%Y-%m-%d")

    # The assessment does not carry these location fields, so borrow them from the newest public EPC
    location_fields = ("county", "constituency", "constituency-label", "local-authority", "local-authority-label")
    for location_field in location_fields:
        assessed_epc[location_field] = epc_searcher.newest_epc[location_field]

    assessment_timestamp = pd.to_datetime(assessment_date)
    if assessment_timestamp > pd.to_datetime(epc_searcher.newest_epc["inspection-date"]):
        # Our assessment supersedes the public record, so the newest public EPC is demoted to history
        return {
            "original_epc": assessed_epc,
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy() + [epc_searcher.newest_epc.copy()],
        }, True

    # Same inspection date and SAP score as a public EPC means the assessment has already been lodged
    lodged_matches = [
        public_epc
        for public_epc in epc_searcher.older_epcs + [epc_searcher.newest_epc]
        if public_epc["inspection-date"] == assessment_date
        and public_epc["current-energy-efficiency"] == assessed_epc["current-energy-efficiency"]
    ]

    historical = epc_searcher.older_epcs.copy()
    if not lodged_matches:
        # Older than the newest public EPC but never lodged: keep it as an extra historical record
        historical = historical + [assessed_epc]

    return {
        "original_epc": epc_searcher.newest_epc.copy(),
        "full_sap_epc": epc_searcher.full_sap_epc.copy(),
        "old_data": historical,
    }, False
|
|
|
|
|
|
def get_on_site_data(body: PlanTriggerRequest):
    """
    Load the optional on-site survey files referenced by a plan trigger request.

    Each file path on ``body`` that is set is read with read_csv_from_s3 from the PLAN_TRIGGER_BUCKET bucket.
    A path that is unset (falsy) yields an empty list, and no read is made for it.

    :param body: the plan trigger request
    :return: tuple ``(patches, already_installed, non_invasive_recommendations)``, in that order
    """
    def load_optional(file_path):
        if not file_path:
            return []
        return read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=file_path)

    return (
        load_optional(body.patches_file_path),
        load_optional(body.already_installed_file_path),
        load_optional(body.non_invasive_recommendations_file_path),
    )
|
|
|
|
|
|
def extract_propert_on_site_recommendations(config, already_installed, non_invasive_recommendations, uprn):
    """
    Find the on-site survey rows that belong to a single property.

    :param config: the property's row from the plan trigger file; must contain "address" and "postcode"
    :param already_installed: rows from the already-installed file (possibly empty), matched on address and
        postcode
    :param non_invasive_recommendations: rows from the non-invasive recommendations file (possibly empty).
        If the rows have a "uprn" column and ``uprn`` is known, they are matched on UPRN. Otherwise they are
        matched on address and postcode.
    :param uprn: the property's UPRN, or None if unknown
    :return: tuple ``(property_already_installed, property_non_invasive_recommendations)``. Each element is
        ``{}`` when no row matches. The non-invasive row is returned as a copy whose "recommendations" entry is
        the string repr of a list of ``{"type": ...}`` dicts.
    """
    import ast

    def matches_address(row):
        return row.get("address") == config["address"] and row.get("postcode") == config["postcode"]

    def normalise_uprn(value):
        # Depending on how the CSV was parsed, UPRNs may be ints, floats or strings such as "1234.0".
        # They are compared in the same int(float(...)) form the trigger endpoint uses; blanks and NaN become None.
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return None

    property_already_installed = next((row for row in already_installed if matches_address(row)), {})

    # Some non-invasive files identify properties by UPRN, others only by address and postcode
    target_uprn = normalise_uprn(uprn)
    match_on_uprn = (
        target_uprn is not None
        and bool(non_invasive_recommendations)
        and "uprn" in non_invasive_recommendations[0]
    )
    if match_on_uprn:
        matched_row = next(
            (row for row in non_invasive_recommendations if normalise_uprn(row.get("uprn")) == target_uprn),
            None,
        )
    else:
        matched_row = next((row for row in non_invasive_recommendations if matches_address(row)), None)

    if matched_row is None:
        return property_already_installed, {}

    # Work on a copy: the row belongs to the list shared by every property in the run, and normalising it in
    # place would make a later lookup of the same row re-parse already-normalised data
    property_non_invasive_recommendations = dict(matched_row)

    recs = property_non_invasive_recommendations.get("recommendations")
    if isinstance(recs, str):
        # The file stores the list as a Python literal, e.g. "['cavity_extract_and_refill']"
        recs = ast.literal_eval(recs) if recs.strip() else []
    if not isinstance(recs, (list, tuple)):
        recs = []

    # Bare measure names become {"type": name}; entries that are already dicts are kept unchanged
    property_non_invasive_recommendations["recommendations"] = str([
        {"type": rec} if isinstance(rec, str) else rec
        for rec in recs
        if isinstance(rec, (str, dict))
    ])

    return property_already_installed, property_non_invasive_recommendations
|
|
|
|
|
|
# Router for the /plan endpoints; validate_token is attached as a router-wide dependency
router = APIRouter(
    tags=["plan"],
    prefix="/plan",
    responses={404: {"description": "Not found"}},
    dependencies=[Depends(validate_token)],
)
|
|
|
|
|
|
@router.post("/trigger")
|
|
async def trigger_plan(body: PlanTriggerRequest):
|
|
logger.info("Connecting to db")
|
|
session = sessionmaker(bind=db_engine)()
|
|
created_at = datetime.now().isoformat()
|
|
|
|
# TODO: if the measure is already installed, it should actually be the very first phase
|
|
|
|
try:
|
|
session.begin()
|
|
logger.info("Getting the inputs")
|
|
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
|
|
# If we have patches or overrides, we should read them in here
|
|
patches, already_installed, non_invasive_recommendations = get_on_site_data(body)
|
|
|
|
cleaning_data = read_dataframe_from_s3_parquet(
|
|
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
|
|
)
|
|
|
|
input_properties = []
|
|
for config in tqdm(plan_input):
|
|
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
|
uprn = config.get("uprn", None)
|
|
if uprn:
|
|
uprn = int(float(uprn))
|
|
|
|
epc_searcher = SearchEpc(
|
|
address1=config["address"],
|
|
postcode=config["postcode"],
|
|
uprn=uprn,
|
|
auth_token=get_settings().EPC_AUTH_TOKEN,
|
|
os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
|
|
)
|
|
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
|
|
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
|
|
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
|
epc_searcher.find_property(skip_os=True)
|
|
|
|
# We check for an energy assessment we have performed on this property:
|
|
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
|
|
|
|
# Create a record in db
|
|
property_id, is_new = create_property(
|
|
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
|
|
)
|
|
if not is_new and not body.multi_plan:
|
|
continue
|
|
|
|
if is_new:
|
|
create_property_targets(
|
|
session,
|
|
property_id=property_id,
|
|
portfolio_id=body.portfolio_id,
|
|
epc_target=body.goal_value,
|
|
heat_demand_target=None
|
|
)
|
|
|
|
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
|
|
# Otherwise, we use the newest EPC
|
|
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
|
|
# has been publically lodged
|
|
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
|
|
epc_searcher, energy_assessment
|
|
)
|
|
patch = next((
|
|
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
|
), {})
|
|
epc_records = patch_epc(patch, epc_records)
|
|
|
|
prepared_epc = EPCRecord(
|
|
epc_records=epc_records,
|
|
run_mode="newdata",
|
|
cleaning_data=cleaning_data
|
|
)
|
|
|
|
property_already_installed, property_non_invasive_recommendations = extract_propert_on_site_recommendations(
|
|
config, already_installed, non_invasive_recommendations, uprn
|
|
)
|
|
|
|
input_properties.append(
|
|
Property(
|
|
id=property_id,
|
|
is_new=is_new,
|
|
address=epc_searcher.address_clean,
|
|
postcode=epc_searcher.postcode_clean,
|
|
epc_record=prepared_epc,
|
|
already_installed=property_already_installed,
|
|
non_invasive_recommendations=property_non_invasive_recommendations,
|
|
energy_assessment=energy_assessment,
|
|
**Property.extract_kwargs(config), # TODO: Depraecate this
|
|
)
|
|
)
|
|
|
|
if not input_properties:
|
|
return Response(status_code=204)
|
|
|
|
# The materials data could be cached or local so we don't need to make
|
|
# consistent requests to the backend for
|
|
# the same data
|
|
logger.info("Reading in materials and cleaned datasets")
|
|
materials = get_materials(session)
|
|
cleaned = get_cleaned()
|
|
|
|
solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY)
|
|
|
|
dataset_version = "2024-07-08"
|
|
energy_consumption_client = EnergyConsumptionModel(
|
|
model_paths={
|
|
"heating_kwh": f"model_directory/energy_consumption_model/heating_kwh_{dataset_version}.pkl",
|
|
"hot_water_kwh": f"model_directory/energy_consumption_model/hot_water_kwh_{dataset_version}.pkl"
|
|
},
|
|
dummy_schema_path=f"model_directory/energy_consumption_model/{dataset_version}_dummy_schema.pkl",
|
|
consumption_average_path=f"energy_consumption/{dataset_version}/consumption_averages.parquet",
|
|
cleaned=cleaned,
|
|
environment=get_settings().ENVIRONMENT
|
|
)
|
|
|
|
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
|
|
|
|
model_api = ModelApi(
|
|
portfolio_id=body.portfolio_id,
|
|
timestamp=created_at,
|
|
prediction_buckets=get_prediction_buckets()
|
|
)
|
|
|
|
epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned)
|
|
|
|
kwh_preds = model_api.paginated_predictions(
|
|
data=epcs_for_scoring,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"],
|
|
extract_ids=False,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the spatial data
|
|
logger.info("Getting spatial data")
|
|
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
|
|
|
|
logger.info("Setting property features")
|
|
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
|
|
|
|
logger.info("Performing solar analysis")
|
|
# TODO: Tidy this up
|
|
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
|
|
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
|
|
# extensions, since it doesn't seem to do a great job
|
|
# TODO: For simple properties, we should do a comparison/check between the solar API's roof area and the
|
|
# basic estimate of roof area
|
|
building_ids = [
|
|
{
|
|
"building_id": p.building_id,
|
|
"longitude": p.spatial["longitude"],
|
|
"latitude": p.spatial["latitude"],
|
|
# Energy consumption is adjusted for the property's expected post retrofit state
|
|
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
|
|
# property to achieve post retrofit of just the fabric
|
|
"energy_consumption": energy_consumption_client.estimate_new_consumption(
|
|
current_energy_efficiency=p.data["current-energy-efficiency"],
|
|
target_efficiency="69",
|
|
current_consumption=p.estimate_electrical_consumption(
|
|
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
|
|
)
|
|
),
|
|
"property_id": p.id,
|
|
"uprn": p.uprn
|
|
} for p in input_properties if p.building_id is not None
|
|
]
|
|
individual_units = [
|
|
{
|
|
"longitude": p.spatial["longitude"],
|
|
"latitude": p.spatial["latitude"],
|
|
# Energy consumption is adjusted for the property's expected post retrofit state
|
|
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
|
|
# property to achieve post retrofit of just the fabric
|
|
"energy_consumption": energy_consumption_client.estimate_new_consumption(
|
|
current_energy_efficiency=p.data["current-energy-efficiency"],
|
|
target_efficiency="69",
|
|
current_consumption=p.estimate_electrical_consumption(
|
|
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
|
|
),
|
|
),
|
|
"property_id": p.id,
|
|
"uprn": p.uprn
|
|
} for p in input_properties if p.building_id is None
|
|
]
|
|
if building_ids:
|
|
# Find the unique longitude and latitude pairs for each building id
|
|
unique_coordinates = {}
|
|
building_uprns = {}
|
|
for entry in building_ids:
|
|
building_id = entry['building_id']
|
|
coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
|
|
|
|
if building_id not in unique_coordinates:
|
|
unique_coordinates[building_id] = []
|
|
|
|
if coordinate_pair not in unique_coordinates[building_id]:
|
|
unique_coordinates[building_id].append(coordinate_pair)
|
|
|
|
if building_id not in building_uprns:
|
|
building_uprns[building_id] = []
|
|
|
|
if entry['uprn'] not in building_uprns[building_id]:
|
|
building_uprns[building_id].append(
|
|
{
|
|
"uprn": entry['uprn'], "longitude": entry['longitude'], "latitude": entry['latitude']
|
|
}
|
|
)
|
|
|
|
solar_panel_configuration = {}
|
|
for building_id, coordinates in unique_coordinates.items():
|
|
if len(coordinates) > 1:
|
|
raise NotImplementedError("more than one coordinate for a building - handle me")
|
|
|
|
coordinates = coordinates[0]
|
|
energy_consumption = sum(
|
|
[entry['energy_consumption'] for entry in building_ids if entry['building_id'] == building_id]
|
|
)
|
|
solar_api_client.get(
|
|
longitude=coordinates["longitude"],
|
|
latitude=coordinates["latitude"],
|
|
energy_consumption=energy_consumption,
|
|
is_building=True,
|
|
session=session
|
|
)
|
|
solar_panel_configuration[building_id] = {
|
|
"insights_data": solar_api_client.insights_data,
|
|
"panel_performance": solar_api_client.panel_performance,
|
|
"n_units": len([entry for entry in building_ids if entry['building_id'] == building_id])
|
|
}
|
|
|
|
# Store the data in the database
|
|
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
|
|
# exists
|
|
solar_api_client.save_to_db(
|
|
session=session, uprns_to_location=building_uprns[building_id], scenario_type="building"
|
|
)
|
|
|
|
# Insert this into the properties that have this building id
|
|
for p in input_properties:
|
|
if p.building_id == building_id:
|
|
unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
|
|
|
|
unit_solar_panel_configuration["unit_share_of_energy"] = (
|
|
[x for x in building_ids if x["property_id"] == p.id][0]["energy_consumption"] /
|
|
energy_consumption
|
|
)
|
|
p.set_solar_panel_configuration(unit_solar_panel_configuration)
|
|
if individual_units:
|
|
# Model the solar potential at the property level
|
|
for unit in individual_units:
|
|
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
|
|
# At this level, we check if the property is suitable for solar and if now, skip
|
|
if not property_instance.is_solar_pv_valid():
|
|
continue
|
|
|
|
# We check if we have a solar non-invasive recommendation
|
|
if [r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"]:
|
|
continue
|
|
|
|
solar_api_client.get(
|
|
longitude=unit["longitude"],
|
|
latitude=unit["latitude"],
|
|
energy_consumption=unit["energy_consumption"],
|
|
is_building=False,
|
|
session=session,
|
|
uprn=unit["uprn"],
|
|
property_instance=property_instance
|
|
)
|
|
|
|
# Store the data in the database
|
|
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
|
|
# exists
|
|
solar_api_client.save_to_db(
|
|
session=session,
|
|
uprns_to_location=[
|
|
{
|
|
"uprn": property_instance.uprn,
|
|
"longitude": property_instance.spatial["longitude"],
|
|
"latitude": property_instance.spatial["latitude"]
|
|
}
|
|
],
|
|
scenario_type="unit"
|
|
)
|
|
|
|
property_instance.set_solar_panel_configuration(
|
|
solar_panel_configuration={
|
|
"insights_data": solar_api_client.insights_data,
|
|
"panel_performance": solar_api_client.panel_performance,
|
|
"unit_share_of_energy": 1
|
|
},
|
|
roof_area=solar_api_client.roof_area
|
|
)
|
|
|
|
logger.info("Getting components and epc recommendations")
|
|
recommendations = {}
|
|
recommendations_scoring_data = []
|
|
representative_recommendations = {}
|
|
for p in tqdm(input_properties):
|
|
recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions)
|
|
property_recommendations, property_representative_recommendations = recommender.recommend()
|
|
|
|
if not property_recommendations:
|
|
continue
|
|
|
|
recommendations[p.id] = property_recommendations
|
|
representative_recommendations[p.id] = property_representative_recommendations
|
|
|
|
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
|
|
p.adjust_difference_record_with_recommendations(
|
|
property_recommendations, property_representative_recommendations
|
|
)
|
|
|
|
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
|
|
|
# TODO: Make sure that number_habitable_rooms has been dropped
|
|
logger.info("Preparing data for scoring in sap change api")
|
|
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
|
|
|
recommendations_scoring_data = recommendations_scoring_data.drop(
|
|
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
|
|
"carbon_ending"]
|
|
)
|
|
|
|
all_predictions = model_api.paginated_predictions(
|
|
data=recommendations_scoring_data,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# Insert the predictions into the recommendations, and get the impact summary
|
|
scoring_epcs = [] # For scoring the kwh models
|
|
for property_id in recommendations.keys():
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
recommendations_with_impact, impact_summary = (
|
|
Recommendations.calculate_recommendation_impact(
|
|
property_instance=property_instance,
|
|
all_predictions=all_predictions,
|
|
recommendations=recommendations,
|
|
)
|
|
)
|
|
|
|
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
|
|
# at each phase
|
|
property_instance.update_simulation_epcs(impact_summary)
|
|
scoring_epcs.extend(property_instance.updated_simulation_epcs)
|
|
recommendations[property_id] = recommendations_with_impact
|
|
|
|
# We call the API with the scoring epcs
|
|
scoring_epcs = pd.DataFrame(scoring_epcs)
|
|
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
|
|
|
|
kwh_simulation_predictions = model_api.paginated_predictions(
|
|
data=scoring_epcs,
|
|
bucket=get_settings().DATA_BUCKET,
|
|
model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"],
|
|
batch_size=SCORING_BATCH_SIZE
|
|
)
|
|
|
|
# We now insert kwh estimates and costs into the recommendations
|
|
for property_id in recommendations.keys():
|
|
property_recommendations = recommendations[property_id]
|
|
property_instance = [p for p in input_properties if p.id == property_id][0]
|
|
|
|
property_current_energy_bill = Recommendations.calculate_recommendation_tenant_savings(
|
|
property_instance=property_instance,
|
|
kwh_simulation_predictions=kwh_simulation_predictions,
|
|
property_recommendations=property_recommendations
|
|
)
|
|
property_instance.current_energy_bill = property_current_energy_bill
|
|
|
|
# Insert the predictions into the recommendations and run the optimiser
|
|
# TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
|
|
# possibility with heating system
|
|
# TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
|
|
# cylinder jacket), we should add these to the recommendations as default
|
|
|
|
for p in input_properties:
|
|
if not recommendations[p.id]:
|
|
continue
|
|
input_measures = prepare_input_measures(recommendations[p.id], body.goal)
|
|
|
|
current_sap_points = int(p.data["current-energy-efficiency"])
|
|
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
|
|
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
|
|
|
|
if body.budget:
|
|
optimiser = GainOptimiser(
|
|
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
|
|
)
|
|
else:
|
|
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
|
|
# If the gain is negative, the optimiser will return an empty solution
|
|
optimiser = CostOptimiser(
|
|
input_measures,
|
|
min_gain=sap_gain
|
|
)
|
|
|
|
optimiser.setup()
|
|
optimiser.solve()
|
|
solution = optimiser.solution
|
|
|
|
selected_recommendations = {r["id"] for r in solution}
|
|
|
|
# If wall insulation is selected, we also include mechanical ventilation as a best practice measure
|
|
if any(x in [r["type"] for r in solution] for x in [
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
|
|
]):
|
|
ventilation_rec = next(
|
|
(r[0] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"),
|
|
None
|
|
)
|
|
|
|
# If a matching recommendation was found, add its ID to the selected recommendations
|
|
if ventilation_rec:
|
|
selected_recommendations.add(ventilation_rec["recommendation_id"])
|
|
|
|
# We'll use the set of selected recommendations to filter the recommendations to upload
|
|
final_recommendations = [
|
|
[
|
|
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
|
|
for rec in recommendations_by_type
|
|
]
|
|
for recommendations_by_type in recommendations[p.id]
|
|
]
|
|
|
|
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
|
|
final_recommendations = [
|
|
rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type
|
|
]
|
|
recommendations[p.id] = final_recommendations
|
|
|
|
# With that complete, we now total the kwh and cost savings for the property
|
|
# total_kwh_savings = sum([rec["kwh_savings"] for rec in final_recommendations if rec["default"]])
|
|
# total_energy_cost_savings = sum(
|
|
# [rec["energy_cost_savings"] for rec in final_recommendations if rec["default"]]
|
|
# )
|
|
|
|
logger.info("Uploading recommendations to the database")
|
|
# If we have any work to do, we create a new scenario
|
|
engine_scenario = create_scenario(
|
|
session=session,
|
|
scenario={
|
|
"name": body.scenario_name,
|
|
"created_at": created_at,
|
|
"budget": body.budget,
|
|
"portfolio_id": body.portfolio_id,
|
|
"housing_type": body.housing_type,
|
|
"goal": body.goal,
|
|
"trigger_file_path": body.trigger_file_path,
|
|
"already_installed_file_path": body.already_installed_file_path,
|
|
"patches_file_path": body.patches_file_path,
|
|
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
|
|
"exclusions": body.exclusions,
|
|
"multi_plan": body.multi_plan
|
|
}
|
|
)
|
|
|
|
property_valuation_increases = []
|
|
session.commit()
|
|
new_epc_bands = {}
|
|
property_value_increase_ranges = {}
|
|
for i in range(0, len(input_properties), BATCH_SIZE):
|
|
try:
|
|
# Take a slice of the input_properties list to make a batch
|
|
batch_properties = input_properties[i:i + BATCH_SIZE]
|
|
|
|
for p in batch_properties:
|
|
recommendations_to_upload = recommendations.get(p.id, [])
|
|
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
|
|
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
|
|
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
|
|
new_epc = sap_to_epc(new_sap_points)
|
|
new_epc_bands[p.id] = new_epc
|
|
|
|
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
|
|
property_value_increase_ranges[p.id] = valuations
|
|
|
|
if p.is_new:
|
|
property_details_epc = p.get_property_details_epc(
|
|
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
|
|
)
|
|
create_property_details_epc(session, property_details_epc)
|
|
|
|
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
|
|
|
|
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
|
|
update_property_data(
|
|
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
|
|
)
|
|
|
|
if not recommendations_to_upload:
|
|
continue
|
|
|
|
new_plan_id = create_plan(session, {
|
|
"portfolio_id": body.portfolio_id,
|
|
"property_id": p.id,
|
|
"scenario_id": engine_scenario.id,
|
|
"is_default": True if p.is_new else False,
|
|
"name": body.scenario_name,
|
|
"valuation_increase_lower_bound": (
|
|
valuations["lower_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_upper_bound": (
|
|
valuations["upper_bound_increased_value"] - valuations["current_value"]
|
|
),
|
|
"valuation_increase_average": (
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
),
|
|
})
|
|
|
|
upload_recommendations(
|
|
session, recommendations_to_upload, p.id, new_plan_id
|
|
)
|
|
|
|
property_valuation_increases.append(
|
|
valuations["average_increased_value"] - valuations["current_value"]
|
|
)
|
|
|
|
# Commit the session after each batch
|
|
session.commit()
|
|
|
|
except Exception as e:
|
|
# Rollback the session if an error occurs
|
|
session.rollback()
|
|
print("Failed i = %s" % str(i))
|
|
logger.error(f"An error occurred during batch starting at index {i}: {e}")
|
|
|
|
logger.info("Creating portfolio aggregations")
|
|
# We implement this in the simplest way possible which will be just to query the database for all
|
|
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
|
|
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
|
|
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
|
|
# the portfolion level impact
|
|
|
|
total_valuation_increase = sum(property_valuation_increases)
|
|
labour_days = round(max(
|
|
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
|
|
))
|
|
|
|
aggregated_data = extract_portfolio_aggregation_data(
|
|
input_properties=input_properties,
|
|
total_valuation_increase=total_valuation_increase,
|
|
recommendations=recommendations,
|
|
new_epc_bands=new_epc_bands,
|
|
property_value_increase_ranges=property_value_increase_ranges
|
|
)
|
|
|
|
aggregate_portfolio_recommendations(
|
|
session,
|
|
portfolio_id=body.portfolio_id,
|
|
scenario_id=engine_scenario.id,
|
|
total_valuation_increase=total_valuation_increase,
|
|
labour_days=labour_days,
|
|
aggregated_data=aggregated_data
|
|
)
|
|
|
|
# Commit final changes
|
|
session.commit()
|
|
except IntegrityError:
|
|
logger.error("Database integrity error occurred", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=500, content="Database integrity error.")
|
|
except OperationalError:
|
|
logger.error("Database operational error occurred", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=500, content="Database operational error.")
|
|
except ValueError:
|
|
logger.error("Value error - possibly due to malformed data", exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=400, content="Bad request: malformed data.")
|
|
except Exception as e: # General exception handling
|
|
logger.error(f"An error occurred: {e}")
|
|
session.rollback()
|
|
return Response(status_code=500, content="An unexpected error occurred.")
|
|
finally:
|
|
session.close()
|
|
|
|
return Response(status_code=200)
|
|
|
|
|
|
def _mds_build_has_errors(errors):
    """Return True if ``Mds.build`` reported any errors.

    ``errors`` is either a list of errors, or a mapping whose values are sized collections of
    errors (in which case any non-empty value counts as an error).
    """
    if isinstance(errors, list):
        return bool(errors)
    return any(len(x) for x in errors.values())


def _select_package_predictions(predictions, property_id, package_id):
    """Return the prediction rows for one property and one candidate measure package.

    Rows match when ``property_id`` equals ``str(property_id)`` and ``recommendation_id``
    contains ``package_id`` as a literal (non-regex) substring. The result is a copy with a fresh
    ``0..n-1`` index, mirrored into a ``row_id`` column so it can later be truncated by position.
    """
    mask = (predictions["property_id"] == str(property_id)) & (
        predictions["recommendation_id"].str.contains(package_id, regex=False)
    )
    selected = predictions[mask].copy().reset_index(drop=True)
    selected["row_id"] = selected.index
    return selected


def _index_plan_input_by_uprn(plan_input):
    """Map each trigger-file ``uprn`` value to the first row carrying it.

    Rows without a ``uprn`` key are indexed under ``None``.
    """
    by_uprn = {}
    for row in plan_input:
        by_uprn.setdefault(row.get("uprn"), row)
    return by_uprn


@router.post("/mds")
async def build_mds(body: MdsRequest):
    """Build an MDS report for every property listed in a plan trigger file.

    Reads the trigger CSV at ``body.trigger_file_path`` from the plan trigger bucket, resolves each
    row to its EPC data, builds recommendations with ``Mds`` and scores them with ``ModelApi``
    (SAP, heat demand and carbon predictions). It then assembles per-property before/after
    figures, including adjusted energy use and annual bill estimates.

    When ``body.measures`` is set (optimise mode), each property's candidate measure packages are
    compared. The cheapest package whose predicted SAP reaches ``body.goal_value`` is chosen. If
    none reaches it, the package with the highest predicted SAP is chosen. Those results are
    written to a local Excel file. Otherwise the per-row (or request-level) measures are
    summarised as a whole.

    Nothing is persisted to the database by this endpoint yet.

    Returns:
        ``Response(200)`` on success, ``Response(400)`` on a ``ValueError``, and
        ``Response(500)`` on database or any other error.
    """
    # TODO: This is a placeholder location for the MDS endpoint, which this is being assembled
    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()

    try:
        session.begin()
        settings = get_settings()

        logger.info("Getting the inputs")
        plan_input = read_csv_from_s3(bucket_name=settings.PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
        measure_set = body.measures
        optimise_measures = measure_set is not None

        cleaning_data = read_dataframe_from_s3_parquet(
            bucket_name=settings.DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
        )

        # TODO: If we productionise the creation of this MDS report, properties (and their EPC
        # targets) will need to be created in the db via create_property / create_property_targets,
        # which also report whether a property is new. Until then nothing is persisted, so every
        # property is treated as new.
        is_new = True

        input_properties = []
        for property_id, config in tqdm(enumerate(plan_input), total=len(plan_input)):
            # Normalise the UPRN: blank -> None, otherwise an int (the CSV may hold values like "123.0")
            uprn = config.get("uprn", None)
            uprn = None if uprn == "" else uprn
            if uprn:
                uprn = int(float(uprn))

            epc_searcher = SearchEpc(
                address1=config["address"],
                postcode=config["postcode"],
                uprn=uprn,
                auth_token=settings.EPC_AUTH_TOKEN,
                os_api_key=settings.ORDNANCE_SURVEY_API_KEY,
            )
            epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
            epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
            # For the moment, our OS API access is unavailable, so we skip and interpolate
            epc_searcher.find_property(skip_os=True)

            # TODO: Temporary hand patches that override the UPRN on the matched EPC(s) for
            # specific addresses.
            if config["address"] == "35b High Street":
                logger.info("Performing temporary patch on 35b High Street")
                epc_searcher.newest_epc["uprn"] = 10002911892
                epc_searcher.full_sap_epc["uprn"] = 10002911892

            if config["address"] == "Cobnut Barn":
                logger.info("Performing temporary patch on Cobnut Barn")
                epc_searcher.newest_epc["uprn"] = 10013924689

            epc_records = {
                'original_epc': epc_searcher.newest_epc.copy(),
                'full_sap_epc': epc_searcher.full_sap_epc.copy(),
                'old_data': epc_searcher.older_epcs.copy(),
            }
            # TODO: EPC patches, already-installed measures and non-invasive recommendations are
            # not yet applied for the MDS report.
            prepared_epc = EPCRecord(
                epc_records=epc_records,
                run_mode="newdata",
                cleaning_data=cleaning_data
            )

            # A request-level measure set overrides any per-row measures in the trigger file
            measures = config.get("measures") if measure_set is None else measure_set

            input_properties.append(
                Property(
                    id=property_id,
                    address=epc_searcher.address_clean,
                    postcode=epc_searcher.postcode_clean,
                    epc_record=prepared_epc,
                    measures=measures,
                    is_new=is_new,
                    **Property.extract_kwargs(config)
                )
            )

        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()

        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=settings.DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=settings.DATA_BUCKET)

        logger.info("Getting spatial data")
        for p in tqdm(input_properties):
            p.get_spatial_data(uprn_filenames)

        logger.info("Getting components and epc recommendations")
        recommendations_scoring_data = []
        representative_recommendations = {}
        recommendations = {}

        for p in tqdm(input_properties):
            p.set_features(cleaned, photo_supply_lookup, floor_area_decile_thresholds)

            mds = Mds(property_instance=p, materials=materials, optimise_measures=optimise_measures)
            mds_recommendations, property_representative_recommendations, errors = mds.build()
            if _mds_build_has_errors(errors):
                raise Exception("Errors occurred during MDS build")

            recommendations[p.id] = mds_recommendations
            representative_recommendations[p.id] = property_representative_recommendations

            # Build the scoring data
            p.create_base_difference_epc_record(cleaned_lookup=cleaned)
            if optimise_measures:
                # One simulation per candidate package. The package id is appended to each scoring
                # row id ("<id>*<package id>") so predictions can be traced back to their package.
                for _id, mds_recs in mds_recommendations.items():
                    representative_ids = {
                        r["recommendation_id"] for r in property_representative_recommendations[_id]
                    }
                    simulation_mds_recs = [
                        [r for r in recs if r["recommendation_id"] in representative_ids]
                        for recs in mds_recs
                    ]
                    p.adjust_difference_record_with_recommendations(
                        simulation_mds_recs, property_representative_recommendations[_id]
                    )

                    data = p.recommendations_scoring_data.copy()
                    for d in data:
                        d["id"] = d["id"] + "*" + _id
                    recommendations_scoring_data.extend(data)
            else:
                recommendations_scoring_data.append(
                    p.simulate_all_representative_recommendations(property_representative_recommendations)
                )

        logger.info("Preparing data for scoring in sap change api")
        recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
        recommendations_scoring_data = recommendations_scoring_data.drop(
            columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
                     "carbon_ending"]
        )

        model_api = ModelApi(
            portfolio_id=body.portfolio_id, timestamp=created_at, prediction_buckets=get_prediction_buckets()
        )

        all_predictions = {
            "sap_change_predictions": pd.DataFrame(),
            "heat_demand_predictions": pd.DataFrame(),
            "carbon_change_predictions": pd.DataFrame()
        }
        to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
        for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
            predictions_dict = model_api.predict_all(
                df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
            )
            # Append the predictions to the predictions dictionary
            for key, scored in predictions_dict.items():
                all_predictions[key] = pd.concat([all_predictions[key], scored])

        # TODO: walls_insulation_thickness_ending is not being set in the recommendations_scoring_data,
        #  insulation_thickness_ending is being set instead

        sap_predictions = all_predictions["sap_change_predictions"]
        heat_demand_predictions = all_predictions["heat_demand_predictions"]
        carbon_predictions = all_predictions["carbon_change_predictions"]

        # TODO: TEMP - normalise trigger-file UPRNs to integer strings so rows can be matched to p.uprn
        for row in plan_input:
            if row.get("uprn"):
                row["uprn"] = str(int(float(row["uprn"])))
        config_by_uprn = _index_plan_input_by_uprn(plan_input)

        if optimise_measures:
            reverse_map = {v: k for k, v in Mds.format_map.items()}
            epc_target = body.goal_value

            results = []
            for p in input_properties:
                sap_before = int(p.data["current-energy-efficiency"])
                epc_before = p.data["current-energy-rating"]
                heat_demand_before = p.data["energy-consumption-current"]
                carbon_before = p.data["co2-emissions-current"]
                current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                    epc_energy_consumption=heat_demand_before * p.floor_area,
                    current_epc_rating=epc_before,
                )
                current_energy_bill = AnnualBillSavings.calculate_annual_bill(current_adjusted_energy)

                package_comparison = []
                # A property already in the target band gets no package
                if epc_before != epc_target:
                    sap_target = epc_to_sap_lower_bound(epc_target)
                    for _id in recommendations[p.id]:
                        sap_prediction = _select_package_predictions(sap_predictions, p.id, _id)
                        heat_demand_prediction = _select_package_predictions(heat_demand_predictions, p.id, _id)
                        carbon_prediction = _select_package_predictions(carbon_predictions, p.id, _id)
                        if sap_prediction.empty:
                            # No scored rows for this package, so there is nothing to compare
                            continue

                        # Keep rows up to and including the first one whose predicted SAP reaches the
                        # target. If none does, keep the whole package. (Presumably each row is the
                        # cumulative effect of the package's measures so far - TODO confirm.)
                        reaching_target = sap_prediction[sap_prediction["predictions"] >= sap_target]
                        meets_threshold = not reaching_target.empty
                        if meets_threshold:
                            cutoff = reaching_target["row_id"].values[0]
                        else:
                            cutoff = sap_prediction["row_id"].values[-1]

                        sap_prediction = sap_prediction[sap_prediction["row_id"] <= cutoff]
                        heat_demand_prediction = heat_demand_prediction[heat_demand_prediction["row_id"] <= cutoff]
                        carbon_prediction = carbon_prediction[carbon_prediction["row_id"] <= cutoff]

                        recommendation_ids = sap_prediction["recommendation_id"].values
                        selected_measures = [reverse_map[x.split("-")[0]] for x in recommendation_ids]
                        selected_measure_ids = [x.split("*")[0] for x in recommendation_ids]
                        costs = sum(
                            r["total"] for r in representative_recommendations[p.id][_id]
                            if r["recommendation_id"] in selected_measure_ids
                        )

                        sap_after = sap_prediction["predictions"].values[-1]
                        epc_after = sap_to_epc(sap_after)
                        heat_demand_after = heat_demand_prediction["predictions"].values[-1]
                        carbon_after = carbon_prediction["predictions"].values[-1]

                        expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                            epc_energy_consumption=heat_demand_after * p.floor_area,
                            current_epc_rating=epc_before,
                        )
                        expected_energy_bill = AnnualBillSavings.calculate_annual_bill(expected_adjusted_energy)

                        package_comparison.append(
                            {
                                "id": _id,
                                "cost": costs,
                                "measures": selected_measures,
                                "sap_before": sap_before,
                                "sap_after": sap_after,
                                "epc_before": epc_before,
                                "epc_after": epc_after,
                                "heat_demand_before": heat_demand_before,
                                "heat_demand_after": heat_demand_after,
                                "carbon_before": carbon_before,
                                "carbon_after": carbon_after,
                                "bill_savings": current_energy_bill - expected_energy_bill,
                                "energy_savings": current_adjusted_energy - expected_adjusted_energy,
                                "current_energy_bill": current_energy_bill,
                                "meets_threshold": meets_threshold
                            }
                        )

                package_comparison = pd.DataFrame(package_comparison)
                if package_comparison.empty:
                    # No candidate package: report the property unchanged
                    package_comparison = {
                        "measures": [],
                        "sap_before": sap_before,
                        "sap_after": sap_before,
                        "epc_before": epc_before,
                        "epc_after": epc_before,
                        "heat_demand_before": heat_demand_before,
                        "heat_demand_after": heat_demand_before,
                        "carbon_before": carbon_before,
                        "carbon_after": carbon_before,
                        "bill_savings": 0,
                        "energy_savings": 0,
                        "current_energy_bill": current_energy_bill,
                        "meets_threshold": False
                    }
                else:
                    # Cheapest package that meets the threshold; otherwise the one that gets closest
                    if package_comparison["meets_threshold"].any():
                        package_comparison = package_comparison[package_comparison["meets_threshold"]]
                        package_comparison = package_comparison.sort_values("cost")
                    else:
                        package_comparison = package_comparison.sort_values("sap_after", ascending=False)
                    package_comparison = package_comparison.head(1).to_dict("records")[0]

                config = config_by_uprn.get(str(p.uprn), {"address": None, "postcode": None})
                chosen_measures = package_comparison["measures"]

                results.append({
                    "config_address": config["address"],
                    "config_postcode": config["postcode"],
                    "uprn": p.uprn,
                    "address": p.address,
                    "postcode": p.postcode,
                    "measures": chosen_measures,
                    "year_of_epc": p.data['lodgement-date'],
                    "sap_before": package_comparison["sap_before"],
                    "sap_after": package_comparison["sap_after"],
                    "epc_before": package_comparison["epc_before"],
                    "epc_after": package_comparison["epc_after"],
                    "heat_demand_before": package_comparison["heat_demand_before"],
                    "heat_demand_after": package_comparison["heat_demand_after"],
                    "carbon_before": package_comparison["carbon_before"],
                    "carbon_after": package_comparison["carbon_after"],
                    "bill_savings": round(package_comparison["bill_savings"], 2),
                    "energy_savings": round(package_comparison["energy_savings"], 2),
                    "current_energy_bill": round(package_comparison["current_energy_bill"], 2),
                    "EWI": "EWI" if "external_wall_insulation" in chosen_measures else None,
                    "CWI": "CWI" if "cavity_wall_insulation" in chosen_measures else None,
                    "LI": "LI" if "loft_insulation" in chosen_measures else None,
                    "ASHP Htg": "ASHP Htg" if "air_source_heat_pump" in chosen_measures else None,
                    "Elec Storage": (
                        "Elec Storage Htrs (Out of scope -Prov sum only)"
                        if "high_heat_retention_storage_heaters" in chosen_measures else None
                    ),
                    "Solar PV": "Solar PV" if "solar_pv" in chosen_measures else None,
                })

            results = pd.DataFrame(results)

            # For the different measures, we check the impact with a few debugging functions
            walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures)

            # TODO: debug output - written to the server's working directory on every request
            results.to_excel("optimised mds_results 5th June.xlsx")
        else:
            results = []
            for p in input_properties:
                measures = p.measures
                # Outside optimise mode each property's representative recommendations are a flat list
                property_recommendations = [r['type'] for r in representative_recommendations[p.id]]

                # TODO: Check high heat retention storage heaters - looks like it's excluded controls!
                sap_prediction = sap_predictions[sap_predictions["property_id"] == str(p.id)]
                heat_demand_prediction = heat_demand_predictions[heat_demand_predictions["property_id"] == str(p.id)]
                carbon_prediction = carbon_predictions[carbon_predictions["property_id"] == str(p.id)]

                # Before/after for SAP, EPC band, heat demand and CO2; unchanged when there are no measures
                sap_before = int(p.data["current-energy-efficiency"])
                sap_after = sap_prediction["predictions"].values[0] if measures else sap_before

                epc_before = p.data["current-energy-rating"]
                epc_after = sap_to_epc(sap_after) if measures else epc_before

                heat_demand_before = p.data["energy-consumption-current"]
                heat_demand_after = heat_demand_prediction["predictions"].values[0] if measures else heat_demand_before

                carbon_before = p.data["co2-emissions-current"]
                carbon_after = carbon_prediction["predictions"].values[0] if measures else carbon_before

                # Estimate bill savings
                current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                    epc_energy_consumption=heat_demand_before * p.floor_area,
                    current_epc_rating=epc_before,
                )
                # TODO: This isn't quite right as this is based on EVERY possible measure, not just the
                #  ones that are actually implemented
                expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                    epc_energy_consumption=heat_demand_after * p.floor_area,
                    current_epc_rating=epc_before,
                )

                # TODO: We should determine if the home is gas & electricity or just electricity, i.e.
                #  whether the heating and hot water was previously electric only or both
                current_energy_bill = AnnualBillSavings.calculate_annual_bill(
                    kwh=current_adjusted_energy,
                )
                expected_energy_bill = AnnualBillSavings.calculate_annual_bill(
                    kwh=expected_adjusted_energy,
                )

                bill_savings = current_energy_bill - expected_energy_bill
                energy_savings = current_adjusted_energy - expected_adjusted_energy

                config = config_by_uprn.get(str(p.uprn), {"address": None, "postcode": None})

                results.append({
                    "config_address": config["address"],
                    "config_postcode": config["postcode"],
                    "uprn": p.uprn,
                    "address": p.address,
                    "postcode": p.postcode,
                    "measures": measures,
                    "property_recommendations": property_recommendations,
                    "year_of_epc": p.data['lodgement-date'],
                    "sap_before": sap_before,
                    "sap_after": sap_after,
                    "epc_before": epc_before,
                    "epc_after": epc_after,
                    "heat_demand_before": heat_demand_before,
                    "heat_demand_after": heat_demand_after,
                    "carbon_before": carbon_before,
                    "carbon_after": carbon_after,
                    "bill_savings": round(bill_savings, 2),
                    "energy_savings": round(energy_savings, 2),
                    "current_energy_bill": round(current_energy_bill, 2),
                    "fuel_type": p.main_fuel["fuel_type"],
                })

            results = pd.DataFrame(results)
            results["sap_uplift"] = results["sap_after"] - results["sap_before"]

            walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures)

    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:  # General exception handling
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    return Response(status_code=200)
|
|
|
|
|
|
def check_mds(results, input_properties, recommendations, optimise_measures):
    """Build debugging tables of the wall-insulation and storage-heater measures chosen per property.

    Args:
        results: DataFrame with one row per property, containing at least ``uprn`` and
            ``measures``. When ``optimise_measures`` is true, ``measures`` is a list of measure
            names. Otherwise it is a list of mappings whose first key is the measure name.
        input_properties: the ``Property`` objects the results were built from.
        recommendations: recommendations keyed by ``p.id``. When ``optimise_measures`` is true,
            each value's keys are string literals of measure collections (parsed with
            ``ast.literal_eval``).
        optimise_measures: whether ``results`` came from the optimised (package) flow.

    Returns:
        ``(walls_check, hhr_check)`` DataFrames with one row per property.

    Raises:
        IndexError: if no row in ``results`` matches a property's UPRN.
        Exception: if more than one wall-insulation measure was selected for a property.
    """
    import ast

    wall_measures = ("internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation")
    walls_check = []
    hhr_check = []

    for p in input_properties:
        # ``Series == None`` is False everywhere in pandas, so missing UPRNs are matched with isna()
        if p.uprn is None:
            res = results[results["uprn"].isna()]
        else:
            res = results[results["uprn"] == p.uprn]
        if res.empty:
            raise IndexError(f"No MDS result row found for uprn {p.uprn!r}")
        selected_measures = res["measures"].values[0]

        wall = p.walls
        heating = p.main_heating
        heating_controls = p.main_heating_controls

        if optimise_measures:
            measures = selected_measures
        else:
            measures = [list(z.keys())[0] for z in selected_measures]

        wall_recommendation = [x for x in measures if x in wall_measures]
        hhr_recommendation = [x for x in measures if x == "high_heat_retention_storage_heaters"]

        if optimise_measures:
            # Flatten every candidate package's measures, de-duplicating in first-seen order so the
            # output is deterministic
            possible_measures = list(dict.fromkeys(
                measure
                for package_key in recommendations[p.id]
                for measure in ast.literal_eval(package_key)
            ))
        else:
            possible_measures = p.measures

        if len(wall_recommendation) > 1:
            raise Exception(
                f"something went wrong: multiple wall insulation measures selected for uprn {p.uprn}: "
                f"{wall_recommendation}"
            )
        wall_recommendation = wall_recommendation[0] if wall_recommendation else None
        hhr_recommendation = hhr_recommendation[0] if hhr_recommendation else None

        walls_check.append(
            {
                "uprn": p.uprn,
                "address": p.address,
                "postcode": p.postcode,
                "property_type": p.data['property-type'],
                "conservation_status": p.spatial["conservation_status"],
                "is_listed_building": p.spatial["is_listed_building"],
                "is_heritage_building": p.spatial["is_heritage_building"],
                "wall": wall["clean_description"],
                "recommendation": wall_recommendation,
                "possible_measures": possible_measures,
                "selected_measures": selected_measures,
            }
        )

        hhr_check.append(
            {
                "uprn": p.uprn,
                "address": p.address,
                "postcode": p.postcode,
                "heating": heating["clean_description"],
                "heating_controls": heating_controls["clean_description"],
                "recommendation": hhr_recommendation,
                "possible_measures": possible_measures,
                "selected_measures": selected_measures,
            }
        )

    walls_check = pd.DataFrame(walls_check)
    hhr_check = pd.DataFrame(hhr_check)

    return walls_check, hhr_check
|