Model/backend/tests/test_integration.py
2025-11-13 13:50:38 +00:00

531 lines
23 KiB
Python

import ast
import json
from copy import deepcopy
from dataclasses import replace
from datetime import datetime
import random
from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, upload_recommendations, create_scenario
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import get_cleaned
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.Funding import Funding
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
# Input data (temp)
# Temporary local fixtures: lookup tables and a pre-built kWh client are read from pickle files in
# the current working directory.
# NOTE(security): pickle.load runs arbitrary code embedded in the file. Only load pickles that were
# produced locally by a trusted process - never files received from an external source.
import pickle

with open("local_data_for_deletion.pkl", "rb") as f:
    local_data = pickle.load(f)
cleaning_data = local_data["cleaning_data"]
materials = local_data["materials"]
cleaned = local_data["cleaned"]
project_scores_matrix = local_data["project_scores_matrix"]
partial_project_scores_matrix = local_data["partial_project_scores_matrix"]
whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"]

with open("kwh_client_for_deletion.pkl", "rb") as f:
    kwh_client = pickle.load(f)
# Load the raw EPC certificates.
# NOTE(review): this path only exists on one developer's machine - TODO make it configurable.
epc_data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Downloads/domestic-E06000002-Middlesbrough/certificates.csv",
    low_memory=False,
)
# Parse the lodgement dates once; both date filters below reuse the result.
lodgement_dates = pd.to_datetime(epc_data["LODGEMENT_DATE"])

# TODO: Store this for cleaning
# Mean running cost per unit of floor area for each current-energy-efficiency score, using only
# certificates lodged from 2024 onwards.
costs_by_floor_area = epc_data.loc[
    lodgement_dates >= "2024-01-01",
    [
        "TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT",
        "HOT_WATER_COST_CURRENT",
    ],
].copy()
costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns]
for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]:
    costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"]
costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[
    ["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"]
].mean().reset_index()

# Random sample of one row per UPRN from certificates lodged from 2015 onwards. The sample size is
# capped at the number of available rows, because DataFrame.sample raises when asked for more rows
# than exist (sampling without replacement).
recent_unique_epcs = epc_data[lodgement_dates >= "2015-01-01"].drop_duplicates("UPRN")
sample_epc_data = recent_unique_epcs.sample(
    n=min(10000, len(recent_unique_epcs))
).reset_index(drop=True)
# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used
# in the google solar api but is it really needed? I don't think it's super accurate. It might be better to
# just use an average energy consumption by floor area for UK households?
# Load the input properties
# Fallback values for fields that are empty on the EPC row. Filling them here avoids the data load
# inside of EPCRecord - something we should pull out.
missing_field_defaults = {
    "number-habitable-rooms": 3,
    "floor-height": 2.4,
    "number-heated-rooms": 3,
}
input_properties = []
for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)):
    # Normalise the column names (lower case, hyphenated) and turn null values into None
    epc = {}
    for raw_key, raw_value in config.items():
        epc[raw_key.lower().replace("_", "-")] = None if pd.isnull(raw_value) else raw_value
    for field_name, fallback in missing_field_defaults.items():
        if pd.isnull(epc[field_name]):
            epc[field_name] = fallback
    prepared_epc = EPCRecord(
        epc_records={"original_epc": epc, "full_sap_epc": {}, "old_data": []},
        run_mode="newdata",
        cleaning_data=cleaning_data,
    )
    new_property = Property(
        id=row_id,
        is_new=True,
        address=epc["address"],
        postcode=epc["postcode"],
        epc_record=prepared_epc,
        already_installed={},
        property_valuation={},
        non_invasive_recommendations=[],
        energy_assessment=None,
        **Property.extract_kwargs(config),  # TODO: Deprecate this
    )
    input_properties.append(new_property)
# For each property, insert the default solar configuration
for p in tqdm(input_properties):
    # Only the solar PV materials are handed to the solar API client
    solar_pv_materials = [material for material in materials if material["type"] == "solar_pv"]
    solar_api = GoogleSolarApi(api_key=None, solar_materials=solar_pv_materials, max_retries=5)
    default_configuration = {
        "insights_data": None,
        "panel_performance": solar_api.default_panel_performance(property_instance=p),
        "unit_share_of_energy": 1,
    }
    p.set_solar_panel_configuration(solar_panel_configuration=default_configuration)
# We mock kwh preds
# One random value in [100, 3000) per property for heating and for hot water, keyed by the
# property's UPRN. The result is a dict of two dataframes with columns "id" and "predictions".
heating_kwh_rows = []
hotwater_kwh_rows = []
for p in tqdm(input_properties):
    heating_kwh_rows.append({"id": p.uprn, "predictions": random.randrange(100, 3000)})
    hotwater_kwh_rows.append({"id": p.uprn, "predictions": random.randrange(100, 3000)})
mocked_kwh_predictions = {
    "heating_kwh_predictions": pd.DataFrame(heating_kwh_rows),
    "hotwater_kwh_predictions": pd.DataFrame(hotwater_kwh_rows),
}
# TODO: We might want to implement this generally, via an ETL process
# Fill in missing current running costs: take the per-floor-area averages for the
# current-energy-efficiency score closest to the property's own score and scale them by the
# property's total floor area.
cost_columns = ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]
reference_scores = costs_by_floor_area["current-energy-efficiency"]
for p in input_properties:
    missing_costs = [col for col in cost_columns if pd.isnull(p.data[col])]
    if not missing_costs:
        continue
    # The closest score is the same for every cost column, so it is looked up once per property
    score_distance = abs(reference_scores - p.data["current-energy-efficiency"])
    # head(1) breaks ties between equally distant scores by keeping the first row
    nearest = costs_by_floor_area[score_distance == score_distance.min()].head(1)
    for col in missing_costs:
        p.data[col] = (nearest[col + "_scaled"] * p.data["total-floor-area"]).values[0]

# The return values of set_features are not used, so a plain loop replaces the throwaway list
for p in input_properties:
    p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions)
# Run the recommendations
recommendations = {}
representative_recommendations = {}
recommendations_scoring_data = []
for p in tqdm(input_properties):
    # Houses with no built form recorded are treated as semi-detached
    house_without_built_form = p.data["property-type"] == "House" and pd.isnull(p.data["built-form"])
    if house_without_built_form:
        p.data["built-form"] = "Semi-Detached"

    recommender = Recommendations(
        property_instance=p, materials=materials, exclusions=[], inclusions=[], default_u_values=True
    )
    property_recommendations, property_representative_recommendations = recommender.recommend()

    # Properties with no recommendations contribute nothing to the scoring data
    if property_recommendations:
        recommendations[p.id] = property_recommendations
        representative_recommendations[p.id] = property_representative_recommendations
        p.create_base_difference_epc_record(cleaned_lookup=cleaned)
        p.adjust_difference_record_with_recommendations(
            property_recommendations, property_representative_recommendations
        )
        recommendations_scoring_data.extend(p.recommendations_scoring_data)

# Build the scoring frame and remove the change/ending columns from it
scoring_columns_to_drop = [
    "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
    "carbon_ending",
]
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data).drop(
    columns=scoring_columns_to_drop
)
# Mock the model predictions: one dataframe per prediction type, one row per scoring-data row.
# The id split and phase extraction are identical for every prediction type, so they are computed
# once and copied, instead of being recomputed for each key.
prediction_base = recommendations_scoring_data[["id"]].copy()
# The scoring id has the form "<property id>+<recommendation id>"
prediction_base[["property_id", "recommendation_id"]] = prediction_base["id"].str.split("+", expand=True)
prediction_base["phase"] = prediction_base["recommendation_id"].apply(ModelApi.extract_phase)

# Property field that each mocked series starts from
start_fields = {
    "sap_change_predictions": "current-energy-efficiency",
    "heat_demand_predictions": "energy-consumption-current",
    "carbon_change_predictions": "co2-emissions-current",
}
# Index the properties by their stringified id (first occurrence wins) so each lookup is a dict
# access rather than a scan of the whole list
properties_by_str_id = {}
for candidate in input_properties:
    properties_by_str_id.setdefault(str(candidate.id), candidate)

model_predictions_mocked = {}
for k in [
    "sap_change_predictions",
    "heat_demand_predictions",
    "carbon_change_predictions",
    "heating_kwh_predictions",
    "hotwater_kwh_predictions",
]:
    if k in ["heating_kwh_predictions", "hotwater_kwh_predictions"]:
        # kWh predictions are independent random values, left in scoring-data order
        kwh_frame = prediction_base.copy()
        kwh_frame["predictions"] = random.choices(range(100, 3000), k=len(kwh_frame))
        model_predictions_mocked[k] = kwh_frame
        continue

    ordered = prediction_base.sort_values(["property_id", "phase"], ascending=True)
    preds = []
    # sort=False keeps the groups in order of first appearance in the sorted frame
    for p_id, df in ordered.groupby("property_id", sort=False, dropna=False):
        start = properties_by_str_id[p_id].data[start_fields[k]]
        df = df.copy()
        # We add some amount each time: a running total of random steps on top of the start value
        to_add = np.cumsum(random.choices(range(0, 15), k=len(df)))
        df["predictions"] = start + to_add
        preds.append(df)
    model_predictions_mocked[k] = pd.concat(preds)
# Index the properties by id once (first occurrence wins), so the loop below does a dict lookup
# instead of scanning the whole property list on every iteration.
properties_by_id = {}
for candidate in input_properties:
    properties_by_id.setdefault(candidate.id, candidate)

# Attach the mocked prediction impact to the recommendations of every property that has some
for property_id in tqdm(list(recommendations)):
    property_instance = properties_by_id[property_id]
    recommendations_with_impact, impact_summary = (
        Recommendations.calculate_recommendation_impact(
            property_instance=property_instance,
            all_predictions=model_predictions_mocked,
            recommendations=recommendations,
            representative_recommendations=representative_recommendations
        )
    )
    # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
    # at each phase
    property_instance.update_simulation_epcs(impact_summary)
    recommendations[property_id] = recommendations_with_impact

# Store the current energy bill on every property; properties without recommendations are passed
# an empty recommendation list
for property_instance in tqdm(input_properties):
    property_recommendations = recommendations.get(property_instance.id, [])
    property_current_energy_bill = (
        Recommendations.calculate_recommendation_tenant_savings(
            property_instance=property_instance,
            kwh_simulation_predictions=model_predictions_mocked,
            property_recommendations=property_recommendations,
            ashp_cop=2.8
        )
    )
    property_instance.current_energy_bill = property_current_energy_bill
# Plan request used by the optimisation below: goal 'Increasing EPC' with goal value 'B', no
# budget and no required measures.
body = PlanTriggerRequest(
    budget=None,
    goal='Increasing EPC',
    housing_type='Social',
    goal_value='B',
    portfolio_id=0,
    trigger_file_path='',
    already_installed_file_path='',
    patches_file_path=None,
    non_invasive_recommendations_file_path=None,
    valuation_file_path='',
    required_measures=[],
    scenario_name='EPC B',
    scenario_id=None,
    multi_plan=True,
    optimise=True,
    default_u_values=True,
    ashp_cop=2.8,
    event_type='remote_assessment',
    simulate_sap_10=False,
    file_type=None,
    file_format=None,
    sheet_name=None,
    sheet_count=None,
    index_start=None,
    index_end=None,
)
# For each property with recommendations: choose the measures to install, work out the funding
# scheme and amounts, and store the funding outcome on the property via insert_funding.

# Measure types whose partial-project and uplift values are set to zero instead of being looked up
measures_without_uplift = [
    "mechanical_ventilation", "low_energy_lighting", "secondary_heating",
    "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace",
]
for p in tqdm(input_properties):
    if not recommendations.get(p.id):
        continue
    # we need to double unlist because we have a list of lists
    property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
    property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
    measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
    # If a measure requiring ventilation is selected, and the property does not have ventilation, we enforce
    # its inclusion
    needs_ventilation = any(
        x in property_measure_types for x in assumptions.measures_needing_ventilation
    ) and not p.has_ventilation
    if not measures_to_optimise:
        # Nothing to do, we just reshape the recommendations
        recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
            p.id, recommendations, set()
        )
        continue
    fixed_gain = optimiser_functions.calculate_fixed_gain(
        property_required_measures, recommendations, p, needs_ventilation
    )
    gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
    funding = Funding(
        tenure="Social",
        project_scores_matrix=project_scores_matrix,
        partial_project_scores_matrix=partial_project_scores_matrix,
        whlg_eligible_postcodes=whlg_eligible_postcodes,
        eco4_social_cavity_abs_rate=12.5,
        eco4_social_solid_abs_rate=17,
        eco4_private_cavity_abs_rate=12.5,
        eco4_private_solid_abs_rate=17,
        gbis_social_cavity_abs_rate=21,
        gbis_social_solid_abs_rate=25,
        gbis_private_cavity_abs_rate=21,
        gbis_private_solid_abs_rate=28,
    )
    li_thickness = convert_thickness_to_numeric(
        p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
    )
    # Use the recorded wall U-value when there is one, otherwise derive it from the wall description
    current_wall_u_value = p.walls["thermal_transmittance"]
    if current_wall_u_value is None:
        current_wall_u_value = get_wall_u_value(
            clean_description=p.walls["clean_description"],
            age_band=p.age_band,
            is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
            is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
        )
    # We insert the innovation uplift (on a deep copy, so the stored recommendations are untouched)
    measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
    # TODO: Turn this into a function and store the innovation uplift
    for group in measures_to_optimise_with_uplift:
        for r in group:
            if r["type"] in measures_without_uplift:
                (
                    r["partial_project_score"],
                    r["partial_project_funding"],
                    r["innovation_uplift"],
                    r["uplift_project_score"],
                ) = (0, 0, 0, 0)
                continue
            (
                r["partial_project_score"],
                r["partial_project_funding"],
                r["innovation_uplift"],
                r["uplift_project_score"],
            ) = funding.get_innovation_uplift(
                measure=r,
                starting_sap=p.data["current-energy-efficiency"],
                floor_area=p.floor_area,
                is_cavity=p.walls["is_cavity_wall"],
                current_wall_uvalue=current_wall_u_value,
                is_partial="partial" in p.walls["clean_description"].lower(),
                existing_li_thickness=li_thickness,
                mainheating=p.main_heating,
                main_fuel=p.main_fuel,
                mainheat_energy_eff=p.data["mainheat-energy-eff"],
            )
    input_measures = optimiser_functions.prepare_input_measures(
        measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
    )
    # When the goal is Increasing EPC, we can run the funding optimiser
    if body.goal == "Increasing EPC":
        solutions = optimise_with_funding_paths(
            p=p,
            input_measures=input_measures,
            housing_type=body.housing_type,
            budget=body.budget,
            target_gain=gain,
            funding=funding
        )
        # Given the solutions we select the optimal one, ranked by cost net of funding and uplift.
        # NOTE(review): the original computed a scheme-aware version of this column first (full
        # project funding for "eco4", partial project funding otherwise) and then immediately
        # overwrote it with the formula below. Only the formula that actually took effect is kept;
        # confirm whether the scheme-aware variant was the intended ranking.
        solutions["cost_less_full_project_funding"] = (
            solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"]
        )
        solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)
        if solutions["meets_upgrade_target"].any():
            # If we have a solution that meets the upgrade target, we select that one
            optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
        else:
            # Pick the cheapest
            optimal_solution = solutions.iloc[0]
        # This is the funding scheme selected
        scheme = optimal_solution["scheme"]
        # This is the list of measures that we will recommend
        funded_measures = optimal_solution["items"] if scheme != "none" else []
        solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
        # This is the total amount of funding that the project will produce (including uplifts) (£)
        project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
            optimal_solution["partial_project_funding"]
        # This is the total amount of funding associated to the uplift (£)
        total_uplift = optimal_solution["total_uplift"]
        # This is the full project ABS
        full_project_score = optimal_solution["project_score"]
        # This is the partial project ABS
        partial_project_score = optimal_solution["partial_project_score"]
        # This is the uplift score ABS
        uplift_project_score = optimal_solution["total_uplift_score"]
    else:
        # We optimise and then we determine eligibility for funding, based on the measures selected
        optimiser = (
            GainOptimiser(
                input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
            ) if body.budget else CostOptimiser(input_measures, min_gain=gain)
        )
        optimiser.setup()
        optimiser.solve()
        solution = optimiser.solution
        recommendation_types = {measure["type"] for measures in input_measures for measure in measures}
        # A measure counts as recommended under its own type or under the combined
        # "<type>+mechanical_ventilation" type. The combined name must itself be tested for
        # membership: a bare non-empty string is always truthy, which made these checks always True.
        has_wall_insulation_recommendation = any(
            m in recommendation_types or "+".join([m, "mechanical_ventilation"]) in recommendation_types
            for m in WALL_INSULATION_MEASURES
        )
        has_roof_insulation_recommendation = any(
            m in recommendation_types or "+".join([m, "mechanical_ventilation"]) in recommendation_types
            for m in ROOF_INSULATION_MEASURES
        )
        funding.check_funding(
            measures=solution,
            starting_sap=p.data["current-energy-efficiency"],
            ending_sap=p.data["current-energy-efficiency"] + sum(x["gain"] for x in solution),
            floor_area=p.floor_area,
            mainheat_description=p.main_heating["clean_description"],
            heating_control_description=p.main_heating_controls["clean_description"],
            is_cavity=p.walls["is_cavity_wall"],
            current_wall_uvalue=current_wall_u_value,
            is_partial="partial" in p.walls["clean_description"].lower(),
            existing_li_thickness=li_thickness,
            mainheating=p.main_heating,
            main_fuel=p.main_fuel,
            mainheat_energy_eff=p.data["mainheat-energy-eff"],
            has_wall_insulation_recommendation=has_wall_insulation_recommendation,
            has_roof_insulation_recommendation=has_roof_insulation_recommendation,
        )
        # Determine the scheme: eco4 takes precedence over gbis
        scheme = "none"
        if funding.eco4_eligible:
            scheme = "eco4"
        if scheme == "none" and funding.gbis_eligible:
            scheme = "gbis"
        funded_measures = solution if scheme in ["gbis", "eco4"] else []
        # Fall back to 0 only when no full project ABS is available. The original condition was
        # inverted, so it produced 0 whenever a value existed and None when it did not.
        full_project_abs = funding.full_project_abs if funding.full_project_abs is not None else 0
        project_funding = full_project_abs
        total_uplift = funding.eco4_uplift
        full_project_score = full_project_abs
        partial_project_score = funding.partial_project_abs
        uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
    selected = {r["id"] for r in solution}
    if property_required_measures:
        solution = optimiser_functions.add_required_measures(
            property_id=p.id, property_required_measures=property_required_measures,
            recommendations=recommendations, selected=selected,
        )
    # Add best practice measures (ventilation/trickle vents)
    selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
    # Final flattening - Don't do this! (i.e. do not replace recommendations[p.id] with
    # optimiser_functions.flatten_recommendations_with_defaults(p.id, recommendations, selected) here)
    # TODO: functionise
    # Strip the "+mechanical_ventilation" suffix so funded measures carry their base type
    for measure in funded_measures:
        if "+mechanical_ventilation" in measure["type"]:
            measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
    p.insert_funding(
        scheme=scheme,
        funded_measures=funded_measures,
        project_funding=project_funding,
        total_uplift=total_uplift,
        full_project_score=full_project_score,
        partial_project_score=partial_project_score,
        uplift_project_score=uplift_project_score
    )