removing funding

This commit is contained in:
Khalim Conn-Kowlessar 2026-01-01 12:58:14 +08:00
parent 90c5f12671
commit 6b46542d35
2 changed files with 112 additions and 183 deletions

View file

@ -45,7 +45,7 @@ from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, optimise_with_scenarios
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
from utils.logger import setup_logger
@ -1069,21 +1069,6 @@ async def model_engine(body: PlanTriggerRequest):
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
gbis_private_cavity_abs_rate=21,
gbis_private_solid_abs_rate=28,
)
li_thickness = convert_thickness_to_numeric(
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
)
@ -1102,41 +1087,8 @@ async def model_engine(body: PlanTriggerRequest):
# TODO: Turn this into a function and store the innovaiton uplift
for group in measures_to_optimise_with_uplift:
for r in group:
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
(
r["partial_project_score"],
r["partial_project_funding"],
r["innovation_uplift"],
r["uplift_project_score"],
) = (
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=int(p.data["current-energy-efficiency"]),
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
)
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (0, 0, 0, 0)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
@ -1146,62 +1098,36 @@ async def model_engine(body: PlanTriggerRequest):
# When the goal is Increasing EPC, we can run the funding optimiser
if body.goal == "Increasing EPC":
solutions = optimise_with_funding_paths(
solutions = optimise_with_scenarios(
p=p,
input_measures=input_measures,
housing_type=body.housing_type,
budget=body.budget,
target_gain=gain,
funding=funding,
work_package=eco_packages[p.id][2]
enforce_heat_pump_insulation=True,
enforce_fabric_first=False
)
# if handle the empty case
if solutions.empty:
scheme = "none"
funded_measures, solution = [], []
(
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score,
battery_sap_score
) = 0, 0, 0, 0, 0, 0
solution, battery_sap_score = [], 0
else:
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
# We re-organise, taking the solution with the most gain and then the cheapest
solutions = solutions.sort_values(
by=["total_gain", "total_cost"], ascending=[False, True]
)
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
# This is the SAP score associated to a battery
pv_size = next(
(m["array_size"] for m in optimal_solution["items"] if m["type"] == "solar_pv"), 0
)
solution = deepcopy(optimal_solution["items"])
pv_size = float(optimal_solution["array_size"])
battery_sap_score = BatterySAPScorer.score(
starting_sap=optimal_solution["ending_sap"], pv_size=pv_size
starting_sap=optimal_solution["ending_sap_without_battery"], pv_size=pv_size
)
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
@ -1216,52 +1142,6 @@ async def model_engine(body: PlanTriggerRequest):
gain = optimiser.solution_gain
post_sap = int(p.data["current-energy-efficiency"]) + gain
recommendation_types = []
for measures in input_measures:
for measure in measures:
recommendation_types.append(measure["type"])
recommendation_types = set(recommendation_types)
has_wall_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
WALL_INSULATION_MEASURES
)
has_roof_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
ROOF_INSULATION_MEASURES
)
funding.check_funding(
measures=solution,
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
)
# Determine the scheme
scheme = "none"
if funding.eco4_eligible:
scheme = "eco4"
if scheme == "none" and funding.gbis_eligible:
scheme = "gbis"
funded_measures = solution if scheme in ["gbis", "eco4"] else []
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
total_uplift = funding.eco4_uplift
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
pv_size = next(
(m["array_size"] for m in solution if m["type"] == "solar_pv"), 0
)
@ -1282,21 +1162,6 @@ async def model_engine(body: PlanTriggerRequest):
p.id, recommendations, selected, battery_sap_score
)
# TODO: functionise
for measure in funded_measures:
if "+mechanical_ventilation" in measure["type"]:
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
p.insert_funding(
scheme=scheme,
funded_measures=funded_measures,
project_funding=project_funding,
total_uplift=total_uplift,
full_project_score=full_project_score,
partial_project_score=partial_project_score,
uplift_project_score=uplift_project_score
)
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
# of them
# TODO: We can probably do better and optimise at the building level - this is temp
@ -1470,12 +1335,6 @@ async def model_engine(body: PlanTriggerRequest):
session, recommendation_payload
)
funding_payload = [
{"plan_id": plan_id_by_property[f["property_id"]], **{k: v for k, v in f.items() if k != "property_id"}}
for f in funding_to_create if f["property_id"] in plan_id_by_property
]
db_funcs.funding_functions.bulk_upload_funding_packages(session, funding_payload)
logger.info("Work completed, updating log status")
except IntegrityError as e:

View file

@ -638,6 +638,7 @@ def exclude_measure_types(input_measures, excluded_types):
def optimise_with_scenarios(
p,
input_measures,
budget=None,
target_gain=None,
@ -659,6 +660,21 @@ def optimise_with_scenarios(
all_measure_types.extend([x["type"] for x in inputs])
all_measure_types = list(set(all_measure_types))
# We modify the solar PV gain, if there is a battery, to include an estimated SAP battery uplift, should
# the property hit the upgrade target, plus 1. We add the additional 1 because the higher the starting SAP,
# the lower the battery SAP uplift, so this is a conservative approach since the true SAP score is
# re-calculated later on.
optimisation_measures = deepcopy(input_measures)
for measures in optimisation_measures:
if measures[0]["type"] == "solar_pv":
for x in measures:
if x["has_battery"]:
x["battery_gain"] = BatterySAPScorer.score(
starting_sap=int(p.data["current-energy-efficiency"]) + target_gain + 1,
pv_size=x["array_size"]
)
x["gain"] += x["battery_gain"]
if enforce_fabric_first:
# If this is true, it means we only want to consider a fabric first approach. This means that
# - We treat the fabric of the house first
@ -667,7 +683,9 @@ def optimise_with_scenarios(
# This should be wall insulation, roof insulation, floor insulation and windows
fabric_measures = WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES + ECO4_ELIGIBILE_FABRIC_MEASURES
fabric_only_measures = [[opt for opt in group if opt["type"] in fabric_measures] for group in input_measures]
fabric_only_measures = [
[opt for opt in group if opt["type"] in fabric_measures] for group in optimisation_measures
]
fabric_only_measures = [g for g in fabric_only_measures if g]
if not fabric_only_measures:
@ -685,7 +703,7 @@ def optimise_with_scenarios(
picked_fabric_types = {m["type"] for m in picked_fabric}
remaining_measures = []
for group in input_measures:
for group in optimisation_measures:
kept = [m for m in group if m["type"] not in picked_fabric_types]
if kept:
remaining_measures.append(kept)
@ -709,15 +727,21 @@ def optimise_with_scenarios(
"fixed_items": picked_fabric,
"total_cost": fabric_cost + extra_cost,
"total_gain": fabric_gain + extra_gain,
"already_installed_gain": sum([x["gain"] for x in picked_fabric + picked_extra if x["already_installed"]])
})
return solutions
return append_solution_metrics(solutions, target_gain, p)
# ------------------------------------------------------------------
# Scenario 1: Air source heat pump with required insulation
# ------------------------------------------------------------------
if enforce_heat_pump_insulation:
# Wall measures could be IWI or EWI
remaining_wall_measures = [x for x in all_measure_types if x in WALL_INSULATION_MEASURES]
remaining_wall_measures = [
x for x in all_measure_types if x in WALL_INSULATION_MEASURES + [
"internal_wall_insulation+mechanical_ventilation", "external_wall_insulation+mechanical_ventilation"
]
]
remaining_roof_measures = [x for x in all_measure_types if x in ROOF_INSULATION_MEASURES]
# Mandatory structure:
@ -728,28 +752,7 @@ def optimise_with_scenarios(
heat_pump_paths = build_heat_pump_paths(remaining_wall_measures, remaining_roof_measures)
paths.extend(heat_pump_paths)
# ------------------------------------------------------------------
# Scenario 2: Optimise without air source heat pump
# ------------------------------------------------------------------
# No special path; just exclude ASHP from options and allow us to optimise.
measures_no_heat_pump = exclude_measure_types(input_measures, ["air_source_heat_pump"])
picked, total_cost, total_gain = run_optimizer(
measures_no_heat_pump,
budget=budget,
sub_target_gain=target_gain,
)
if picked is not None:
solutions.append({
"scenario": "no_heat_pump",
"items": picked,
"fixed_items": [],
"total_cost": total_cost,
"total_gain": total_gain,
})
fixed_selections = expand_funding_path(input_measures, paths)
fixed_selections = expand_funding_path(optimisation_measures, paths)
for fixed in fixed_selections:
@ -761,7 +764,7 @@ def optimise_with_scenarios(
# Remaining measures (all other groups)
remaining_measures = [
grp for gi, grp in enumerate(input_measures)
grp for gi, grp in enumerate(optimisation_measures)
if gi not in fixed_groups
]
@ -795,9 +798,76 @@ def optimise_with_scenarios(
"fixed_items": fixed_items,
"total_cost": total_cost,
"total_gain": total_gain,
"already_installed_gain": sum([x["gain"] for x in total_items if x["already_installed"]])
})
return solutions
# ------------------------------------------------------------------
# Scenario 2: Optimise without air source heat pump
# ------------------------------------------------------------------
# No special path; just exclude ASHP from options and allow us to optimise.
measures_no_heat_pump = exclude_measure_types(optimisation_measures, ["air_source_heat_pump"])
picked, total_cost, total_gain = run_optimizer(
measures_no_heat_pump,
budget=budget,
sub_target_gain=target_gain,
)
if picked is not None:
solutions.append({
"scenario": "no_heat_pump",
"items": picked,
"fixed_items": [],
"total_cost": total_cost,
"total_gain": total_gain,
"already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
})
solutions_df = append_solution_metrics(solutions, target_gain, p)
return solutions_df
def _get_ending_sap_without_battery(x):
gain = [y["gain"] - y.get("battery_gain", 0) for y in x["items"]]
return float(sum(gain))
def append_solution_metrics(solutions, target_gain, p):
"""
Given a set of solutions, this function will return a dataframe, with cost metrics appended, to allow
the end user to select the optimal solution.
:param solutions:
:param target_gain:
:return:
"""
solutions_df = pd.DataFrame(solutions)
if solutions_df.empty:
# We return a blank dataframe
return solutions_df
# Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the
# final upgrade target, we then look to perform a final optimisation pass to meet the target gain.
solutions_df["meets_upgrade_target"] = solutions_df["total_gain"] >= target_gain - 0.1
# We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
# We flag projects that are including batteries
solutions_df["has_battery"] = solutions_df["items"].apply(has_battery)
solutions_df["array_size"] = solutions_df["items"].apply(
lambda x: sum(float(y["array_size"]) for y in x if "array_size" in y)
)
# We need the ending SAP, but we'll need to remove the battery SAP uplift first
solutions_df["ending_sap_without_battery"] = solutions_df.apply(
lambda x: int(p.data["current-energy-efficiency"]) + _get_ending_sap_without_battery(x),
axis=1
)
solutions_df = solutions_df.sort_values("total_cost", ascending=True)
return solutions_df
# ---- helpers -------------------------------------------------------------