Model/recommendations/optimiser/funding_optimiser.py
Khalim Conn-Kowlessar 115209c84c condensed tests
2026-03-04 12:44:25 +00:00

1395 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
This script contains a number of functions which are designed to enable optimisation and selection of funding options
for individual properties to improve their energy efficiency
The main entry point to this is optimise_with_funding_paths
In the future, we will adapt this into a class-based structure to allow for more flexibility and reusability
"""
from copy import deepcopy
import pandas as pd
import numpy as np
from typing import Mapping, Union
from itertools import product
from backend.app.plan.schemas import (
WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES,
WALL_INSULATION_WITH_VENTILATION_MEASURES
)
from recommendations.optimiser.StrategicOptimiser import StrategicOptimiser
from utils.logger import setup_logger
from backend.Funding import Funding
from backend.app.BatterySapScorer import BatterySAPScorer
# Module-level logger used by the optimisation functions below.
logger = setup_logger()
# Measure types we DO NOT treat as fundable in the ECO4 'funded' pass.
# _filter_fundable_subgroups matches these by substring against an option's "type", so a composite
# type that contains one of these names is excluded as well.
_ECO4_EXCLUDE_TYPES = {
    "secondary_heating", "extension_cavity_wall_insulation", "sealing_open_fireplace", "low_energy_lighting"
}
def _path_scheme(path_spec):
    """
    Work out which funding scheme a path belongs to from its 'reference' tags.

    The first element whose string reference ends in ":gbis" or ":eco4" decides the scheme.
    Falls back to 'eco4' when no element carries such a tag (or path_spec is empty/None).
    """
    suffix_to_scheme = ((":gbis", "gbis"), (":eco4", "eco4"))
    for element in (path_spec or []):
        reference = element.get("reference")
        if not isinstance(reference, str):
            continue
        for suffix, scheme in suffix_to_scheme:
            if reference.endswith(suffix):
                return scheme
    return "eco4"
def _filter_fundable_subgroups(groups, scheme):
    """
    Reduce the option groups to what may be optimised in the funded pass of a scheme.

    - GBIS: only the fixed GBIS measure is funded, so there is nothing left to optimise -> []
    - anything else (ECO4): options whose type contains an entry of _ECO4_EXCLUDE_TYPES are dropped,
      and groups left with no options are dropped entirely
    """
    if scheme == "gbis":
        # we won't optimise 'the rest' under GBIS here
        return []

    def _is_fundable(option):
        # substring match, so composite types containing an excluded name are rejected too
        for excluded in _ECO4_EXCLUDE_TYPES:
            if excluded in option["type"]:
                return False
        return True

    fundable_groups = []
    for group in groups:
        survivors = list(filter(_is_fundable, group))
        if survivors:
            fundable_groups.append(survivors)
    return fundable_groups
def _sum_cost_gain_with_scheme(items, scheme):
    """
    Total the cost and gain of a list of option dicts, applying scheme-specific cost rules.

    Under GBIS the innovation uplift is not paid, so each item's "innovation_uplift"
    (0.0 when absent) is subtracted from its cost before it is added to the total.

    :return: (total_cost, total_gain) as floats; (0.0, 0.0) for an empty list
    """
    strip_uplift = scheme == "gbis"
    cost_acc, gain_acc = 0.0, 0.0
    for item in items:
        item_cost = float(item["cost"])
        if strip_uplift:
            item_cost = item_cost - float(item.get("innovation_uplift", 0.0))
        cost_acc = cost_acc + item_cost
        gain_acc = gain_acc + float(item["gain"])
    return cost_acc, gain_acc
def violates_min_insulation(fixed, optimisation_input_measures):
    """
    Check a fixed selection against the minimum insulation requirement (MIR).

    Returns True only when all three hold:
    - the selection includes a heating measure (solar PV is treated as heating for MIR),
    - the selection includes no insulation measure, and
    - _find_measure reports at least one insulation type in optimisation_input_measures
      (presumably meaning the property still needs that insulation - confirm against _find_measure).

    :param fixed: iterable of (group_index, option_index, option) tuples
    :param optimisation_input_measures: measure groups handed to _find_measure
    """
    heating_markers = (
        "air_source_heat_pump",
        "high_heat_retention_storage_heaters",
        "boiler_upgrade",
        "electric_boiler",
        "time_temperature_zone_control",
        "secondary_heating",
        "solar_pv",  # PV treated as heating for MIR
    )
    # MIR insulation (the ones used in path construction)
    insulation_markers = (
        "external_wall_insulation",
        "internal_wall_insulation",
        "cavity_wall_insulation",
        "extension_cavity_wall_insulation",
        "loft_insulation",
        "flat_roof_insulation",
        "room_roof_insulation",
    )
    required_insulation = (
        "external_wall_insulation",
        "internal_wall_insulation",
        "cavity_wall_insulation",
        "loft_insulation",
        "flat_roof_insulation",
        "room_roof_insulation",
    )
    selected_types = {option["type"] for (_, _, option) in fixed}

    def _selected_contains(markers):
        # substring match, so composite types such as "x+mechanical_ventilation" count
        for selected in selected_types:
            for marker in markers:
                if marker in selected:
                    return True
        return False

    includes_heating = _selected_contains(heating_markers)
    includes_insulation = _selected_contains(insulation_markers)

    property_needs_insulation = False
    for insulation_type in required_insulation:
        available = _find_measure(optimisation_input_measures, insulation_type)
        if available and not _selected_contains({insulation_type}):
            property_needs_insulation = True
            break

    return includes_heating and not includes_insulation and property_needs_insulation
# Treat "type" like "external_wall_insulation+mechanical_ventilation" → "external_wall_insulation"
def _base_type(s: str) -> str:
    """Return the part of a (possibly composite) measure type that precedes the first "+"."""
    head, _separator, _rest = s.partition("+")
    return head
def _filter_measures_by_types(input_measures, allowed_types):
    """
    Restrict the measure groups to options whose base type is allowed.

    Inside each group only options whose _base_type is in allowed_types survive;
    groups with no surviving option are dropped.
    """
    permitted = set(allowed_types)
    result = []
    for options in input_measures:
        matching = []
        for option in options:
            if _base_type(option["type"]) in permitted:
                matching.append(option)
        if matching:
            result.append(matching)
    return result
def _is_eligible_funding_package(scheme, starting_sap, total_gain):
    """
    Report whether a package satisfies the upgrade requirement of its funding scheme.

    Always returns a bool. Previously the function fell off the end and returned None for
    ECO4 properties starting below SAP 39 and for any scheme other than "eco4"/"gbis";
    those cases now return False, which keeps the same truthiness.

    :param scheme: "eco4", "gbis", or anything else (e.g. "none" for an unfunded package)
    :param starting_sap: SAP score of the property before the package
    :param total_gain: SAP points gained by the package
    :return: True when the package meets the scheme's upgrade requirement, else False
    """
    if scheme == "gbis":
        # GBIS is a fixed measure only, so we don't check the gain
        return True
    if scheme == "eco4":
        # If the property is an E or above (SAP >= 39), it must finish at a C or above (SAP >= 69).
        # NOTE(review): properties starting below SAP 39 are never reported as eligible here -
        # confirm whether a separate upgrade rule should apply to them.
        return bool(starting_sap >= 39 and starting_sap + total_gain >= 69)
    return False
def _prs_solution_ok(items, p, funding):
    """
    Check that a picked package contains solid wall insulation or a renewable heating measure.

    A package passes when it contains any of:
    - external or internal wall insulation,
    - an air source heat pump,
    - solar PV together with an ASHP or HHRSH in the same package, or
    - solar PV where funding.check_solar_eligible_heating_system accepts the existing
      main heating / heating controls descriptions of the property.

    :param items: picked option dicts; a "type" such as "x+y" is split into its tokens
    :param p: property; only read (main_heating / main_heating_controls) when solar PV is picked
    :param funding: only used when solar PV is picked
    """
    tokens = {token for option in items for token in option["type"].split("+")}

    solid_wall = not tokens.isdisjoint({"external_wall_insulation", "internal_wall_insulation"})
    ashp = "air_source_heat_pump" in tokens  # ASHP alone is renewable
    solar = "solar_pv" in tokens
    hhrsh = "high_heat_retention_storage_heaters" in tokens  # only counts *with* solar

    # solar PV qualifies if paired with eligible existing heating
    if solar:
        solar_with_existing_heating = funding.check_solar_eligible_heating_system(
            p.main_heating["clean_description"], p.main_heating_controls["clean_description"]
        )
    else:
        solar_with_existing_heating = False
    # or paired with ASHP/HHRSH in the same package
    solar_with_new_heating = solar and (ashp or hhrsh)

    renewable = ashp or solar_with_existing_heating or solar_with_new_heating
    return solid_wall or renewable
def _ensure_unfunded_costs(groups):
    """Reset option costs to their unfunded value.

    For every option that carries a non-None "cost_minus_uplift", "cost" is overwritten with "raw_cost".
    The option dicts are modified in place and the same `groups` object is returned; no copy is
    made here, so a caller that needs the originals untouched must pass in a deep copy.
    """
    for option in (opt for group in groups for opt in group):
        if option.get("cost_minus_uplift") is not None:
            option["cost"] = option["raw_cost"]
    return groups
def _get_already_installed_gain(selected_measures, needs_pre_eco_hhrsh_upgrade):
    """
    Sum the gain of the measures that count as already installed.

    :param selected_measures: List of selected measures
    :param needs_pre_eco_hhrsh_upgrade: when True, a high_heat_retention_storage_heaters measure is
        counted as well, even if it is not flagged as already installed (pre-ECO4 HHRSH upgrade case)
    :return: total gain of the counted measures (0 when none are counted)
    """
    if needs_pre_eco_hhrsh_upgrade:
        counted = (
            measure for measure in selected_measures
            if measure["already_installed"] or measure["type"] == "high_heat_retention_storage_heaters"
        )
    else:
        counted = (measure for measure in selected_measures if measure["already_installed"])
    return sum([measure["gain"] for measure in counted])
def _move_hhrsh_to_unfunded(picked, unfunded_picked, needs_pre_eco_hhrsh_upgrade):
    """
    Move HHRSH from the funded picks to the unfunded picks when it has to be installed before
    an ECO4 project as an unfunded measure.

    :param picked: List of picked measures
    :param unfunded_picked: List of unfunded picked measures (extended in place when HHRSH is moved)
    :param needs_pre_eco_hhrsh_upgrade: Boolean indicating if pre-ECO4 HHRSH upgrade is needed
    :return: (picked, unfunded_picked); both are returned unchanged when no upgrade is needed
    :raises ValueError: when the upgrade is needed but no HHRSH measure is among the picks
    """
    if not needs_pre_eco_hhrsh_upgrade:
        return picked, unfunded_picked

    # Split the picks into HHRSH and everything else, keeping the original order
    hhrsh_measures, other_measures = [], []
    for measure in picked:
        if measure["type"] == "high_heat_retention_storage_heaters":
            hhrsh_measures.append(measure)
        else:
            other_measures.append(measure)

    if not hhrsh_measures:
        raise ValueError("Expected HHRSH measure to be in total picks")

    unfunded_picked += hhrsh_measures
    return other_measures, unfunded_picked
def has_battery(items):
    """Return True when at least one item has a truthy "has_battery" flag (a missing flag counts as False)."""
    for item in items:
        if item.get("has_battery", False):
            return True
    return False
def optimise_with_funding_paths(
        p, input_measures, housing_type, funding: Funding, budget=None, target_gain=None, work_package=None
):
    """
    Build candidate packages for every funding route and rank them by cost net of funding.

    One unfunded package is produced from all measures, then one package per fixed selection of each
    funding path returned by make_funding_paths (plus an ECO4 fabric-only path for Social housing below
    EPC C). The optimiser is called as
        run_optimizer(sub_measures, budget, sub_target_gain) -> (picked_options, sub_cost, sub_gain)

    target_gain=None (the default) is treated as "no target": fixed measures never short-circuit the
    sub-optimisation on gain grounds, no unfunded top-up pass is run, and every package is flagged as
    meeting the upgrade target. Previously a None target raised TypeError as soon as a funded path was
    evaluated.

    :param p: property object; this function reads p.data["current-energy-rating"],
        p.data["current-energy-efficiency"], p.floor_area and p.walls["is_cavity_wall"]
    :param input_measures: list of groups, each group being a list of option dicts
    :param housing_type: compared against "Social" and "Private"
    :param funding: Funding helper used for SAP/floor-area banding, ABS scoring and the ECO4 rate
    :param budget: optional spend cap handed to run_optimizer; None means no cap
    :param target_gain: optional SAP gain to aim for; None means no target
    :param work_package: passed to make_funding_paths; "solar_hhrsh_eco4" on an EPC D property triggers
        the pre-ECO4 HHRSH handling
    :return: DataFrame with one row per package, sorted ascending by "cost_less_full_project_funding";
        an empty DataFrame when no package could be produced
    """
    solutions = []
    # unfunded - we utilise all measures
    # NOTE(review): .copy() is shallow, so _ensure_unfunded_costs rewrites "cost" on the option dicts that
    # input_measures also holds, and make_funding_paths below receives those same dicts. Confirm this is
    # intended before changing it.
    unfunded_measures = input_measures.copy()
    unfunded_measures = _ensure_unfunded_costs(unfunded_measures)
    picked, total_cost, total_gain = run_optimizer(
        unfunded_measures,
        budget=budget,
        sub_target_gain=target_gain
    )
    if picked is not None:
        solutions.append({
            "fixed_ids": [],
            "items": picked,
            "total_cost": total_cost,
            "total_gain": total_gain,
            "path": {"reference": "unfunded:all"},
            "scheme": "none",
            "is_eligible": False,  # no funding scheme applied
            "unfunded_items": [],
            "already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
        })
    # This function will filter down on innovation measures if we are social EPC D
    funding_paths, optimisation_input_measures = make_funding_paths(
        p, input_measures, housing_type, funding, work_package
    )
    # We now produce a fabric only path for ECO4
    # We add in generic insulation funding paths (where there is no fixed measure)
    # Heating controls are only eligible if installed as part of a heating upgrade and so we do not include them
    # here. We don't have an option if the property is a C or above
    if housing_type == "Social" and p.data["current-energy-rating"] not in ["C", "B", "A"]:
        funding_paths = (
            [
                {
                    'reference': 'fabric-only:eco4',
                    "allowed_types": WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES +
                    ECO4_ELIGIBILE_FABRIC_MEASURES
                }
            ] + funding_paths
        )
    needs_pre_eco_hhrsh_upgrade = (
        (p.data["current-energy-rating"] == "D") and work_package == "solar_hhrsh_eco4"
    )
    for path_spec in funding_paths:
        # ECO4 fabric only path = special case
        if isinstance(path_spec, dict) and path_spec.get("reference") == "fabric-only:eco4":
            sub_measures = _filter_measures_by_types(optimisation_input_measures, path_spec["allowed_types"])
            # If the property is EPC D and social, we restrict the path to just innovation measures
            if housing_type == "Social" and p.data["current-energy-rating"] == "D":
                # Keep only the options that carry an innovation uplift
                sub_measures_innovation = []
                for measures in sub_measures:
                    group = []
                    for measure in measures:
                        if measure["innovation_uplift"]:
                            group.append(measure)
                    if group:
                        sub_measures_innovation.append(group)
                sub_measures = deepcopy(sub_measures_innovation)
            if not sub_measures:
                continue
            # If the only measure is loft insulation, we skip this because you cannot do a minor measure only (LI)
            # under ECO4
            if len(sub_measures) == 1 and sub_measures[0][0]["type"] in ["loft_insulation"]:
                continue
            picked, sub_cost, sub_gain = run_optimizer(
                sub_measures,
                budget=budget,  # no fixed items; budget unchanged
                sub_target_gain=target_gain
            )
            if picked is None:
                continue
            scheme = _path_scheme([path_spec])
            # We sum the gain for already installed measures. In this, we also include HHRSH, when we have
            # an EPC D property that needs HHRSH but HHRSH isn't an eligible measure
            already_installed_gain = _get_already_installed_gain(
                picked, needs_pre_eco_hhrsh_upgrade
            )
            # If we need a pre-eco4 HHRSH upgrade, we move HHRSH to unfunded items
            picked, unfunded_picked = _move_hhrsh_to_unfunded(picked, [], needs_pre_eco_hhrsh_upgrade)
            solutions.append(
                {
                    "fixed_ids": [],
                    "items": picked,
                    "total_cost": sub_cost,
                    "total_gain": sub_gain,
                    "path": path_spec,
                    "scheme": scheme,
                    "is_eligible": _is_eligible_funding_package(
                        scheme, float(p.data["current-energy-efficiency"]), sub_gain
                    ),
                    "unfunded_items": unfunded_picked,
                    "already_installed_gain": already_installed_gain
                }
            )
            continue
        # 1) expand fixed selections for this path
        fixed_selections = expand_funding_path(optimisation_input_measures, path_spec) if path_spec else [[]]
        if not fixed_selections:
            continue
        for fixed in fixed_selections:
            if violates_min_insulation(fixed, optimisation_input_measures):
                # We log an error and skip this - we should not see any errors but we can probably get a reasonable
                # outcome for the end user without a complete termination of the process
                logger.error("Skipping fixed selection due to minimum insulation violation: %s", fixed)
                continue
            scheme = _path_scheme(path_spec)
            # 3) compute fixed cost/gain, and strip those groups from subproblem
            fixed_items = [opt for (_, _, opt) in fixed]
            if scheme == "gbis":
                # Re-set costs as the only funding we get is the PPS
                for x in fixed_items:
                    x["cost"] = x["raw_cost"]
            fixed_ids = [opt['id'] for opt in fixed_items]
            fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
            fixed_groups = {gi for (gi, _, _) in fixed}
            sub_measures = deepcopy(
                [grp for gi, grp in enumerate(optimisation_input_measures) if gi not in fixed_groups]
            )
            if scheme == "gbis":
                # Everything besides the fixed GBIS measure is unfunded under GBIS, so the remaining
                # options are priced at their raw cost
                for grp in sub_measures:
                    for opt in grp:
                        opt["cost"] = opt["raw_cost"]
            if scheme == "eco4":
                # Need to strip out any measure types that are not eligible for ECO4 funding (e.g. secondary heating)
                sub_measures = _filter_fundable_subgroups(sub_measures, scheme)
            # 4) run the existing optimiser for the remaining groups
            # If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which
            # may already be over budget) and the fixed gain (which may not be achievable)
            if target_gain is None:
                # No target to compare against: only skip when nothing is left to optimise
                skip_sub_optimisation = not sub_measures
            else:
                skip_sub_optimisation = (fixed_gain > target_gain) or (
                    fixed_gain <= target_gain and not sub_measures
                )
            if skip_sub_optimisation:
                picked, sub_cost, sub_gain = ([], 0.0, 0.0)
            else:
                picked, sub_cost, sub_gain = run_optimizer(
                    sub_measures,
                    budget - fixed_cost if budget is not None else None,
                    sub_target_gain=target_gain - fixed_gain if target_gain is not None else None
                )
            # NOTE(review): a None result from run_optimizer is not handled here (the earlier
            # "if picked is None: continue" was commented out); the additions below would fail on it.
            scheme = _path_scheme(path_spec)
            total_cost = fixed_cost + sub_cost
            total_gain = fixed_gain + sub_gain
            unfunded_picked = []
            if scheme == "gbis":
                # The fixed items are funded, everything else is unfunded
                total_picks = fixed_items
                unfunded_picked = picked
            else:
                total_picks = fixed_items + picked
            if housing_type == "Private":
                if not _prs_solution_ok(total_picks, p, funding) and scheme == "eco4":
                    logger.error(
                        "Found a solution that does not meet the PRS requirements: %s - this shouldn't be happening",
                        total_picks
                    )
                    continue
            if target_gain is not None and total_gain - target_gain < -0.1:
                # In this case, we have a funded package that does not meet the target gain, so we look at the remaining
                # measures and see if we can include them
                picked_types = {opt["type"] for opt in total_picks + unfunded_picked}
                # We find the indexes of the groups that the picked types came from
                picked_group_index = {}
                for pt in picked_types:
                    for gi, grp in enumerate(input_measures):
                        if any(pt in opt["type"] for opt in grp):
                            picked_group_index[pt] = gi
                            break
                # We get the remaining options from the groups that were not picked from
                remaining = []
                for i, grp in enumerate(input_measures):
                    if i in picked_group_index.values():
                        continue
                    keep = [x for x in grp if x["type"] not in picked_types]
                    if keep:
                        for x in keep:
                            # Adjust to raw cost (without funding)
                            x["cost"] = x["raw_cost"]
                        remaining.append(keep)
                if remaining:
                    # If we have remaining measures we can optimise, we run them down an unfunded route
                    unfunded_picked_remaining, unfunded_cost, unfunded_gain = run_optimizer(
                        remaining,
                        budget - total_cost if budget is not None else None,
                        sub_target_gain=target_gain - total_gain if target_gain is not None else None
                    )
                    if unfunded_picked_remaining is not None:
                        unfunded_picked += unfunded_picked_remaining
                        total_cost += unfunded_cost
                        total_gain += unfunded_gain
            # We now grab the "already installed gain"
            # We sum the gain for already installed measures. In this, we also include HHRSH, when we have
            # an EPC D property that needs HHRSH but HHRSH isn't an eligible measure
            already_installed_gain = _get_already_installed_gain(
                total_picks, needs_pre_eco_hhrsh_upgrade
            )
            # If we need a pre-eco4 HHRSH upgrade, we move HHRSH to unfunded items
            total_picks, unfunded_picked = _move_hhrsh_to_unfunded(
                total_picks, unfunded_picked, needs_pre_eco_hhrsh_upgrade
            )
            solutions.append({
                "fixed_ids": fixed_ids,
                "items": total_picks,
                "total_cost": total_cost,
                "total_gain": total_gain,
                "path": path_spec,
                "scheme": scheme,
                "is_eligible": _is_eligible_funding_package(
                    scheme, int(p.data["current-energy-efficiency"]), total_gain
                ),
                "unfunded_items": unfunded_picked,
                "already_installed_gain": already_installed_gain
            })
    solutions = pd.DataFrame(solutions)
    if solutions.empty:
        # We return a blank dataframe
        return solutions
    # Flag the packages that reach the target gain (within a 0.1 tolerance). With no target every package
    # trivially meets it.
    if target_gain is None:
        solutions["meets_upgrade_target"] = True
    else:
        solutions["meets_upgrade_target"] = solutions["total_gain"] >= target_gain - 0.1
    # If we have packages that are fundable, but do not meet the upgrade target, we can run a final optimisation pass
    # Turned off logging - too noisy
    # if not solutions[solutions["is_eligible"] & ~solutions["meets_upgrade_target"]].empty:
    #     logger.info("We have some packages that are fundable but do not meet the target gain")
    # We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
    solutions["starting_sap"] = int(p.data["current-energy-efficiency"])
    solutions["floor_area"] = p.floor_area
    solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"]
    # We flag projects that are including batteries
    solutions["has_battery"] = solutions["items"].apply(has_battery)
    solutions["array_size"] = solutions["items"].apply(
        lambda x: sum(float(y["array_size"]) for y in x if "array_size" in y)
    )
    # For properties that are including batteries, we need to adjust the ending SAP to include the battery SAP uplift
    # Note: We score on ending sap, as the battery SAP uplift is based on the ending SAP after fabric/heat/solar
    # upgrades of each package is applied
    # NB: The battery SAP uplift is used to potentially prioritise packages that include batteries, it does NOT impact
    # the eventual SAP score at this point. Once the package is included, the battery SAP score is re-calculated
    # outside of this function.
    solutions["battery_sap_uplift"] = solutions.apply(
        lambda x: BatterySAPScorer.score(starting_sap=x["ending_sap"], pv_size=x["array_size"])
        if x["has_battery"] else 0,
        axis=1
    )
    solutions["starting_band"] = (solutions["starting_sap"] + solutions["already_installed_gain"]).apply(
        funding.get_sap_band
    )
    solutions["ending_band"] = (solutions["ending_sap"] + solutions["battery_sap_uplift"]).apply(funding.get_sap_band)
    solutions["floor_area_band"] = solutions["floor_area"].apply(funding.get_floor_area_band)
    solutions["project_score"] = solutions.apply(
        lambda x: funding._calculate_full_project_abs(
            floor_area_band=x["floor_area_band"],
            starting_sap_band=x["starting_band"],
            ending_sap_band=x["ending_band"],
        ),
        axis=1
    )
    rate = funding.get_eco4_abs_rate(is_cavity=p.walls["is_cavity_wall"])
    # The full project funding, at this point, does NOT include any uplifts
    solutions["full_project_funding"] = solutions["project_score"] * rate
    # if the scheme is not ECO4, we set the funding to 0 with .loc
    solutions.loc[solutions["scheme"] != "eco4", "full_project_funding"] = 0.0
    solutions["partial_project_funding"] = solutions.apply(lambda x: get_gbis_pp_funding(x), axis=1)
    solutions["partial_project_score"] = solutions.apply(lambda x: get_gbis_pps(x), axis=1)
    # We pull out uplifts
    solutions["total_uplift"] = solutions.apply(lambda x: get_total_uplift(x), axis=1)
    solutions["total_uplift_score"] = solutions.apply(lambda x: get_total_innovation_score(x), axis=1)
    # Given the solutions we select the optimal one
    # 1) If the scheme is ECO4, the full project funding and uplift are deducted from the cost
    # 2) If the scheme is GBIS, the partial project funding and uplift are deducted from the cost
    # 3) Otherwise, no funding is deducted from the cost
    solutions["cost_less_full_project_funding"] = np.where(
        solutions["scheme"] == "none",
        solutions["total_cost"],
        np.where(
            solutions["scheme"] == "eco4",
            solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"],
            solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"]
        )
    )
    solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)
    return solutions
def build_heat_pump_paths(
        remaining_wall_measures,
        remaining_roof_measures,
):
    """
    Build the AND-paths for the heat pump scenario as a cartesian product.

    Every path ends with air_source_heat_pump, preceded by one wall measure (if any are given)
    and one roof measure (if any are given).

    :return: list of {"AND": [...]} dicts, one per wall/roof combination
    """
    # An empty (or None) category contributes a single "no measure" choice so the product is never empty
    wall_options = remaining_wall_measures if remaining_wall_measures else [None]
    roof_options = remaining_roof_measures if remaining_roof_measures else [None]
    return [
        {"AND": [measure for measure in (wall, roof) if measure is not None] + ["air_source_heat_pump"]}
        for wall, roof in product(wall_options, roof_options)
    ]
def exclude_measure_types(input_measures, excluded_types):
    """
    Drop every option whose "type" is exactly one of excluded_types.

    Groups that end up with no options are removed; the input groups themselves are not modified.
    """
    banned = set(excluded_types)
    remaining_groups = []
    for options in input_measures:
        survivors = []
        for option in options:
            if option["type"] in banned:
                continue
            survivors.append(option)
        if survivors:
            remaining_groups.append(survivors)
    return remaining_groups
def optimise_with_scenarios(
        p,
        input_measures,
        budget=None,
        target_gain=None,
        enforce_heat_pump_insulation=True,
        enforce_fabric_first=False,
        already_installed_sap=0
):
    """
    Scenario-based optimiser (funding-agnostic).

    Implemented scenarios:
    - "fabric_first" (only when enforce_fabric_first is True; it is then the only scenario returned)
    - "heat_pump_with_insulation": air source heat pump AND the required insulation
      (only when enforce_heat_pump_insulation is True)
    - "no_heat_pump": free optimisation with the air source heat pump excluded

    :param p: property object; p.data["current-energy-efficiency"] is read
    :param input_measures: list of groups, each group being a list of option dicts (not modified;
        the optimisation works on a deep copy)
    :param budget: optional spend cap; None means no cap
    :param target_gain: optional SAP gain to aim for; a target <= 0 returns an empty DataFrame,
        None means no target
    :param enforce_heat_pump_insulation: include the heat pump + insulation scenario
    :param enforce_fabric_first: run only the fabric-first scenario
    :param already_installed_sap: SAP points already installed, forwarded to append_solution_metrics
    :return: DataFrame produced by append_solution_metrics (empty when there is no solution)
    """
    # Universally handle zero gain
    if target_gain is not None:
        if target_gain <= 0:
            return pd.DataFrame([])
    solutions = []
    paths = []
    # Produce the unique list of measure types
    all_measure_types = []
    for inputs in input_measures:
        all_measure_types.extend([x["type"] for x in inputs])
    all_measure_types = list(set(all_measure_types))
    # We modify the solar PV gain, if there is a battery, to include an estimated SAP battery uplift, should
    # the property hit the upgrade target, plus 1. We add the additional 1 because the higher the starting SAP,
    # the lower the battery SAP uplift, so this is a conservative approach since the true SAP score is
    # re-calculated later on.
    # With no target the estimate is based on the current SAP alone (previously None + 1 raised TypeError).
    battery_target_gain = target_gain if target_gain is not None else 0
    optimisation_measures = deepcopy(input_measures)
    for measures in optimisation_measures:
        if measures[0]["type"] == "solar_pv":
            for x in measures:
                if x["has_battery"]:
                    x["battery_gain"] = BatterySAPScorer.score(
                        starting_sap=int(p.data["current-energy-efficiency"]) + battery_target_gain + 1,
                        pv_size=x["array_size"]
                    )
                    x["gain"] += x["battery_gain"]
    if enforce_fabric_first:
        # If this is true, it means we only want to consider a fabric first approach. This means that
        # - We treat the fabric of the house first
        # - Only once the fabric has been upgraded, do we consider heating upgrades
        # This should be wall insulation, roof insulation, floor insulation and windows
        fabric_measures = (
            WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES + ECO4_ELIGIBILE_FABRIC_MEASURES +
            WALL_INSULATION_WITH_VENTILATION_MEASURES
        )
        fabric_only_measures = [
            [opt for opt in group if opt["type"] in fabric_measures] for group in optimisation_measures
        ]
        fabric_only_measures = [g for g in fabric_only_measures if g]
        if not fabric_only_measures:
            # If we have no fabric measures, it means the work has already been done and we can proceed
            # straight to heating optimisation
            picked_fabric, fabric_cost, fabric_gain = [], 0, 0
        else:
            picked_fabric, fabric_cost, fabric_gain = run_optimizer(
                input_measures=fabric_only_measures,
                budget=budget,
                sub_target_gain=target_gain,
                # If we can achieve the target gain with just insulation measures, we're done
            )
            if picked_fabric is None:
                # No fabric package was produced: fall back to "nothing picked", the same way the
                # second pass below is handled, instead of failing on the None result
                picked_fabric, fabric_cost, fabric_gain = [], 0, 0
        picked_fabric_types = {m["type"] for m in picked_fabric}
        remaining_measures = []
        for group in optimisation_measures:
            kept = [m for m in group if m["type"] not in picked_fabric_types]
            if kept:
                remaining_measures.append(kept)
        remaining_budget = budget - fabric_cost if budget is not None else None
        if remaining_budget is not None:
            remaining_budget = 0 if remaining_budget < 0 else remaining_budget
        picked_extra, extra_cost, extra_gain = run_optimizer(
            remaining_measures,
            budget=remaining_budget,
            sub_target_gain=(
                target_gain - fabric_gain
                if target_gain is not None
                else None
            )
        )
        if picked_extra is None:
            picked_extra, extra_cost, extra_gain = [], 0, 0
        solutions.append({
            "scenario": "fabric_first",
            "items": picked_fabric + picked_extra,
            "fixed_items": picked_fabric,
            "total_cost": fabric_cost + extra_cost,
            "total_gain": fabric_gain + extra_gain,
            "already_installed_gain": sum([x["gain"] for x in picked_fabric + picked_extra if x["already_installed"]])
        })
        # already_installed_sap is forwarded here too, matching the final return of this function
        return append_solution_metrics(solutions, target_gain, p, already_installed_sap)
    # ------------------------------------------------------------------
    # Scenario 1: Air source heat pump with required insulation
    # ------------------------------------------------------------------
    if enforce_heat_pump_insulation:
        # Wall measures could be IWI, EWI or CWI
        remaining_wall_measures = [
            x for x in all_measure_types if x in WALL_INSULATION_MEASURES + WALL_INSULATION_WITH_VENTILATION_MEASURES
        ]
        remaining_roof_measures = [x for x in all_measure_types if x in ROOF_INSULATION_MEASURES]
        # Mandatory structure:
        # - must include ASHP
        # - must include >=1 wall insulation (if still needed)
        # - must include >=1 roof insulation (if still needed)
        # We need all of the combinations of remaining wall and remaining roof measures
        heat_pump_paths = build_heat_pump_paths(remaining_wall_measures, remaining_roof_measures)
        paths.extend(heat_pump_paths)
    fixed_selections = []
    for path in paths:
        result = expand_funding_path(input_measures, [path])
        if result:
            fixed_selections.extend(result)
    for fixed in fixed_selections:
        if target_gain is not None:
            if target_gain <= 0:
                # If we don't have any gain, we don't actually need to do this
                continue
        # fixed = [(gi, oi, opt), ...]
        fixed_items = [opt for (_, _, opt) in fixed]
        fixed_groups = {gi for (gi, _, _) in fixed}
        fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
        if budget is not None:
            # If we have a budget, we cannot exceed it via our fixed cost. If we do,
            # this is not a viable solution
            if fixed_cost > budget:
                continue
        # Remaining measures (all other groups)
        remaining_measures = [
            grp for gi, grp in enumerate(optimisation_measures)
            if gi not in fixed_groups
        ]
        # Optimise remaining measures
        if (
            target_gain is not None
            and fixed_gain >= target_gain
        ):
            picked, sub_cost, sub_gain = [], 0, 0
        else:
            picked, sub_cost, sub_gain = run_optimizer(
                remaining_measures,
                budget=budget - fixed_cost if budget is not None else None,
                sub_target_gain=(
                    target_gain - fixed_gain
                    if target_gain is not None
                    else None
                )
            )
        if picked is None:
            continue
        total_items = fixed_items + picked
        total_cost = fixed_cost + sub_cost
        total_gain = fixed_gain + sub_gain
        solutions.append({
            "scenario": "heat_pump_with_insulation",
            "items": total_items,
            "fixed_items": fixed_items,
            "total_cost": total_cost,
            "total_gain": total_gain,
            "already_installed_gain": sum([x["gain"] for x in total_items if x["already_installed"]])
        })
    # ------------------------------------------------------------------
    # Scenario 2: Optimise without air source heat pump
    # ------------------------------------------------------------------
    # No special path; just exclude ASHP from options and allow us to optimise.
    measures_no_heat_pump = exclude_measure_types(optimisation_measures, ["air_source_heat_pump"])
    # A None target means "no target", so the scenario still runs (previously None > 0 raised TypeError)
    if target_gain is None or target_gain > 0:
        # If we don't have any gain, we don't actually need to do this
        picked, total_cost, total_gain = run_optimizer(
            measures_no_heat_pump,
            budget=budget,
            sub_target_gain=target_gain,
        )
        if picked is not None:
            solutions.append({
                "scenario": "no_heat_pump",
                "items": picked,
                "fixed_items": [],
                "total_cost": total_cost,
                "total_gain": total_gain,
                "already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
            })
    solutions_df = append_solution_metrics(solutions, target_gain, p, already_installed_sap)
    return solutions_df
def _get_ending_sap_without_battery(x):
    """
    Return the summed gain of a solution's items with any estimated battery gain removed.

    :param x: mapping/row with an "items" list of option dicts; each item's "battery_gain"
        (0 when absent) is subtracted from its "gain"
    :return: the total as a float
    """
    net_gains = []
    for item in x["items"]:
        net_gains.append(item["gain"] - item.get("battery_gain", 0))
    return float(sum(net_gains))
def append_solution_metrics(solutions, target_gain, p, already_installed_sap=0):
    """
    Turn a list of solution dicts into a dataframe with selection metrics appended, sorted by
    total cost (cheapest first), so that the end user can select the optimal solution.

    Columns added: "meets_upgrade_target", "has_battery", "array_size", "ending_sap_without_battery".

    :param solutions: list of solution dicts (each with "items" and "total_gain", "total_cost")
    :param target_gain: gain each solution's total_gain is compared against
    :param p: property object; p.data["current-energy-efficiency"] is read
    :param already_installed_sap: SAP points added on top of the current SAP for the ending SAP
    :return: the dataframe, or an empty dataframe when there are no solutions
    """
    frame = pd.DataFrame(solutions)
    if frame.empty:
        # We return a blank dataframe
        return frame

    def _total_array_size(items):
        return sum(float(item["array_size"]) for item in items if "array_size" in item)

    def _ending_sap_excluding_battery(row):
        # The battery SAP uplift is removed from the gains before they are added to the starting SAP
        starting_sap = int(p.data["current-energy-efficiency"]) + already_installed_sap
        return starting_sap + _get_ending_sap_without_battery(row)

    frame["meets_upgrade_target"] = frame["total_gain"] >= target_gain
    # We flag projects that are including batteries
    frame["has_battery"] = frame["items"].apply(has_battery)
    frame["array_size"] = frame["items"].apply(_total_array_size)
    frame["ending_sap_without_battery"] = frame.apply(_ending_sap_excluding_battery, axis=1)
    return frame.sort_values("total_cost", ascending=True)
# ---- helpers -------------------------------------------------------------
def get_gbis_pp_funding(x):
    """
    Return the partial project funding of the single fixed GBIS measure of a solution.

    :param x: solution row/mapping with "scheme", "fixed_ids" and "items"
    :return: 0 for non-GBIS solutions, otherwise the fixed item's "partial_project_funding"
    :raises ValueError: when a GBIS solution does not have exactly one fixed ID
    """
    is_gbis = x["scheme"] == "gbis"
    if not is_gbis:
        return 0
    ids = x["fixed_ids"]
    if len(ids) == 1:
        fixed_items = [item for item in x["items"] if item["id"] in ids]
        return fixed_items[0]["partial_project_funding"]
    raise ValueError("More than one fixed ID for GBIS")
def get_gbis_pps(x):
    """
    Return the partial project score of the single fixed GBIS measure of a solution.

    :param x: solution row/mapping with "scheme", "fixed_ids" and "items"
    :return: 0 for non-GBIS solutions, otherwise the fixed item's "partial_project_score"
    :raises ValueError: when a GBIS solution does not have exactly one fixed ID
    """
    is_gbis = x["scheme"] == "gbis"
    if not is_gbis:
        return 0
    ids = x["fixed_ids"]
    if len(ids) == 1:
        fixed_items = [item for item in x["items"] if item["id"] in ids]
        return fixed_items[0]["partial_project_score"]
    raise ValueError("More than one fixed ID for GBIS")
def get_total_uplift(x):
    """Return the sum of "innovation_uplift" over the items of a solution row."""
    uplifts = (item["innovation_uplift"] for item in x["items"])
    return sum(uplifts)
def get_total_innovation_score(x):
    """Return the sum of "uplift_project_score" over the items of a solution row."""
    scores = (item["uplift_project_score"] for item in x["items"])
    return sum(scores)
def sum_cost_gain(items):
    """
    Total the "cost" and "gain" of a list of option dicts.

    :return: (total_cost, total_gain); each value is converted with float() before summing
    """
    costs = [float(item['cost']) for item in items]
    gains = [float(item['gain']) for item in items]
    return sum(costs), sum(gains)
# ---- candidate expansion -------------------------------------------------
def type_matches(option_type: str, required: str) -> bool:
    """
    Report whether an option type satisfies a required measure type.

    Substring match, so "external_wall_insulation+mechanical_ventilation" satisfies
    "external_wall_insulation".
    """
    is_match = required in option_type
    return is_match
def candidates_for_type(input_measures, required_type):
    """
    Collect every option whose type satisfies required_type (see type_matches).

    :return: list of (gi, oi, opt) tuples, where gi is the group index and oi the option's
        index inside that group, in group/option order
    """
    return [
        (group_index, option_index, option)
        for group_index, group in enumerate(input_measures)
        for option_index, option in enumerate(group)
        if type_matches(option["type"], required_type)
    ]
def iter_or_candidates(input_measures, types_list):
    """
    For OR: each selection fixes exactly ONE candidate whose type matches ANY entry of types_list.

    Candidates are de-duplicated by option id, so an option that matches several of the listed
    types (multi-type name) is only offered once; the first occurrence wins.

    :return: list of dicts of the form {"fixed": [(gi, oi, opt)]}
    """
    first_by_id = {}
    for required in types_list:
        for candidate in candidates_for_type(input_measures, required):
            first_by_id.setdefault(candidate[2]["id"], candidate)
    return [{"fixed": [candidate]} for candidate in first_by_id.values()]
def iter_and_candidates(input_measures, types_list):
    """
    Expand an AND requirement: every type in types_list must be covered by the selection.

    A single option may satisfy several required types (it is then kept once). A selection can hold at most one
    option per group, so combinations that would need two different options from the same group are discarded
    as soon as they arise rather than being carried through the whole cross-product.

    :param input_measures: list of groups of option dicts (each option has a "type" key)
    :param types_list: the measure types that must all be covered
    :return: list of dicts {"fixed": [(gi, oi, opt), ...]}; empty when any required type has no candidate or no
        conflict-free combination exists. An empty types_list yields a single empty selection.
    """
    # One candidate pool per required type
    pools = [candidates_for_type(input_measures, t) for t in types_list]
    if any(not pool for pool in pools):
        return []  # impossible to satisfy AND

    # Start with one empty selection and extend it pool by pool
    selections = [[]]  # each selection is a list of (gi, oi, opt)
    for pool in pools:
        extended = []
        for sel in selections:
            chosen = {sgi: soi for sgi, soi, _ in sel}  # group index -> option index already picked
            for cand in pool:
                gi, oi, _ = cand
                if gi not in chosen:
                    extended.append(sel + [cand])
                elif chosen[gi] == oi:
                    # Same option already selected (it satisfies another required type): keep it once
                    extended.append(list(sel))
                # else: same group but a different option -> conflicting pick, drop this combination
        selections = extended
        if not selections:
            return []
    return [{"fixed": sel} for sel in selections]
def expand_funding_path(input_measures, path_spec):
    """
    Expand a funding path specification into every concrete selection of options that satisfies it.

    path_spec is a list of elements; each element is either:
    {"OR": [type1, type2, ...], "reference": "..."} or
    {"AND": [type1, type2, ...], "reference": "..."}
    All elements are required, so the result is the cross-product of the per-element candidates, keeping only
    combinations that pick at most one option per group.

    :param input_measures: list of groups of option dicts
    :param path_spec: list of path elements as described above
    :return: list of selections, each a list of (gi, oi, opt) with one entry per group; empty when any element
        cannot be satisfied. An empty path_spec yields a single empty selection.
    :raises ValueError: if an element has neither an "OR" nor an "AND" key
    """
    selections = [[]]  # list[list[(gi, oi, opt)]]
    for elem in path_spec:
        if "OR" in elem:
            cands = iter_or_candidates(input_measures, elem["OR"])
        elif "AND" in elem:
            cands = iter_and_candidates(input_measures, elem["AND"])
        else:
            raise ValueError("unknown path element; expected 'OR' or 'AND'")
        if not cands:
            return []

        new_selections = []
        for base in selections:
            for cand in cands:
                # Merge base + cand["fixed"] through a group -> (oi, opt) map: a repeated pick of the same
                # option collapses, a different option from an already used group is a conflict.
                gmap = {gi: (oi, opt) for gi, oi, opt in base}
                for gi, oi, opt in cand["fixed"]:
                    if gi in gmap and gmap[gi][0] != oi:
                        break  # conflict -> this combination is rejected
                    gmap[gi] = (oi, opt)
                else:
                    new_selections.append([(gi, oi, opt) for gi, (oi, opt) in gmap.items()])
        selections = new_selections
        if not selections:
            return []
    # Every selection was rebuilt from a per-group map, so it already holds one entry per group in a stable order.
    return selections
# ---- tiny utilities ----------------------------------------------------------
def parse_types(t):
    """
    Split a "+"-joined type string into the set of its individual types.

    e.g. "external_wall_insulation+mechanical_ventilation" -> {"external_wall_insulation", "mechanical_ventilation"}

    :param t: the type string; anything that is not a str gives an empty set
    :return: set of the "+"-separated parts with surrounding whitespace stripped
    """
    if not isinstance(t, str):
        return set()
    return {part.strip() for part in t.split("+")}
def includes_heating(opt_types):
    """
    Tell whether a collection of measure types contains something treated as a heating measure.

    :param opt_types: container of measure types, tested with the "in" operator
    :return: True if any heating-like type is found in opt_types
    """
    heating_like = (
        "air_source_heat_pump",
        "high_heat_retention_storage_heaters",
        "time_temperature_zone_control",  # controls count as a heating measure in the pipeline
        "solar_pv",  # PV is treated as heating for the funding logic
    )
    for measure_type in heating_like:
        if measure_type in opt_types:
            return True
    return False
def contributes_min_insulation(opt_types):
    """
    Tell whether a collection of measure types contains a measure that satisfies the minimum insulation
    requirement (MIR).

    :param opt_types: container of measure types, tested with the "in" operator
    :return: True if any MIR-satisfying type is found in opt_types
    """
    # MIR satisfiers (extend as needed)
    mir_satisfiers = (
        "external_wall_insulation",
        "internal_wall_insulation",
        "loft_insulation",
        "cavity_wall_insulation",
    )
    for measure_type in mir_satisfiers:
        if measure_type in opt_types:
            return True
    return False
def run_optimizer(
input_measures: list[list[Mapping[str, int | float | str]]],
budget: Union[float, None] = None,
sub_target_gain: Union[float, None] = None,
allow_slack: bool = False
):
"""
Thin wrapper around the StrategicOptimiser to run it on a subset of measures with an optional budget and target
gain. Handles the cases of no input measures, and extracts the outputs for ease of use.
:param input_measures: list of groups of measures (each group is a list of measure dicts)
:param budget: optional budget to constrain the optimisation
:param sub_target_gain: optional target gain to achieve from this optimisation run
:param allow_slack: whether to allow solutions that exceed the target gain (True) or only solutions that meet it
exactly (False)
:return: tuple of (picked measures, total cost, total gain) where picked measures is a list of measure dicts
"""
if not input_measures:
return None, 0.0, 0.0
opt = StrategicOptimiser(
components=input_measures,
budget=budget,
target_gain=sub_target_gain,
allow_slack=allow_slack,
verbose=False,
)
opt.solve()
return opt.solution, opt.solution_cost, opt.solution_gain
# ---- Define optimisation paths ----------------------------------------------------------
def _find_measure(input_measures, measure_type):
for measures in input_measures:
for m in measures:
if measure_type in m["type"]:
return True
return False
def _make_solar_heating_funding_paths(
p, input_measures, funding_paths, remaining_insulation_type, housing_type, funding: Funding
):
# If a property is private and EPC D or above, it's not eligible
if housing_type == "Private" and p.data["current-energy-rating"] in ["D", "C", "B", "A"]:
return funding_paths
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Solar PV with existing eligible heating system
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
has_eligible_heating_system = funding.check_solar_eligible_heating_system(
mainheat_description=p.main_heating["clean_description"].lower(),
heating_control_description=p.main_heating_controls["clean_description"].lower()
)
if has_eligible_heating_system and _find_measure(input_measures, "solar_pv"):
single_solar_template = [{"AND": ["solar_pv"], "reference": None}]
# We now look to pair this with any lingering insulation measures
solar_paths = []
for insulation_measure in remaining_insulation_type:
new_solar_path = deepcopy(single_solar_template)
new_solar_path[0]["AND"].append(insulation_measure)
# Make a specific reference for this path
new_solar_path[0]["reference"] = "solar_pv+" + insulation_measure + ":eco4"
solar_paths.append(new_solar_path)
if solar_paths:
funding_paths.extend(solar_paths)
else:
# If we have no insulation measures, we just add the solar PV path
funding_paths.append([{"AND": ["solar_pv"], "reference": "solar_pv:eco4"}])
# For each of these, because there is a heating measure begin implemented, we check for minimum insulation
# requirements.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Solar PV + Heating Upgrade combos
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# We don't include electric boilers as they are not eligible for ECO4 funding
solar_heating_combos = [
("high_heat_retention_storage_heaters", "solar_pv+hhrsh:eco4"),
("air_source_heat_pump", "solar_pv+ashp:eco4"),
]
if _find_measure(input_measures, "solar_pv"):
for heat_type, ref in solar_heating_combos:
if _find_measure(input_measures, heat_type):
if remaining_insulation_type:
for insulation_measure in remaining_insulation_type:
funding_paths.append(
[{"AND": ["solar_pv", heat_type, insulation_measure],
"reference": f"{ref[:-5]}+{insulation_measure}:eco4"}] # keeps naming consistent
)
else:
funding_paths.append([{"AND": ["solar_pv", heat_type], "reference": ref}])
# We've actually covered all possible options where solar PV can be included in a funded package, so where
# solar PV is not in a reference, we can exclude it
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Heating Upgrades
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Must have an existing eligible heating system
# For private, HHRSH alone, or a boiler upgrade is NOT eligible for ECO4 funding. Boiler upgrade also doesn't
# count as an eligible heating system
if housing_type == "Private":
single_heating_measures = ["air_source_heat_pump"]
else:
single_heating_measures = [
"boiler_upgrade", "high_heat_retention_storage_heaters", "air_source_heat_pump"
]
measure_references = {
"boiler_upgrade": "boiler_upgrade",
"high_heat_retention_storage_heaters": "hhrsh",
"air_source_heat_pump": "ashp"
}
for heating_upgrade in single_heating_measures:
if _find_measure(input_measures, heating_upgrade):
if remaining_insulation_type:
for insulation_measure in remaining_insulation_type:
path = [
{
"AND": [heating_upgrade, insulation_measure],
"reference": f"{measure_references[heating_upgrade]}+{insulation_measure}:eco4"
}
]
funding_paths.append(path)
else:
funding_paths.append(
[{"AND": [heating_upgrade], "reference": f"{measure_references[heating_upgrade]}:eco4"}]
)
return funding_paths
def _make_generic_gbis_funding_paths(input_gbis_measures, funding_paths):
"""
For GBIS, the packages are single insulation measure.
We also have potential GBIS packages that allow heating controls as a secondary measure, however this
is not currently implemented in the optimiser due to not being certain about the heating controls pre conditions
:param input_gbis_measures:
:param funding_paths:
:return:
"""
gbis_funding_paths = []
for input_measure in input_gbis_measures:
for measure in input_measure:
# We create a path for each measure
gbis_funding_paths.append([{"AND": [measure["type"]], "reference": measure["type"] + ":gbis"}])
return funding_paths + gbis_funding_paths
def make_funding_paths(p, input_measures, housing_type, funding: Funding, work_package=None):
    """
    This function generates funding paths based on the input measures and the tenure of the property.
    It checks for the presence of specific measures and creates paths that include necessary insulation measures
    to meet minimum insulation requirements, particularly when a heating system is recommended.
    Remaining measures that are not fixed as part of the package are then optimised
    :param p: The property object containing details about the property, including main heating and controls.
        p.data["current-energy-rating"] drives the eligibility rules.
    :param input_measures: list of groups of measure dicts (each with at least a "type" key)
    :param housing_type: tenure of the property; "Social" and "Private" have dedicated rules
    :param funding: The funding object that provides methods to check eligibility and calculate funding.
    :param work_package: Optional work package information. We handle the case of an EPC D property needing a heating
        upgrade, where the heating upgrade needs to be conducted before the solar PV work
    :return: tuple (funding_paths, measures). measures is input_measures, except for Social EPC D properties
        where it is the subset of measures that remain permissible. ([], input_measures) for EPC C or better.
    """
    # If the property is currently EPC C or better, there is no funding availability
    if p.data["current-energy-rating"] in ["C", "B", "A"]:
        return [], input_measures

    # We handle the case of minimum insulation requirements. Whenever we have a heating system recommendation,
    # we *must* include an additional insulation measure, unless the property already has sufficient insulation.
    # We determine which insulation measures need to be included
    wall_insulation_measures = [
        "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
        "extension_cavity_wall_insulation"
    ]
    roof_insulation_measures = [
        "loft_insulation", "flat_roof_insulation", "room_roof_insulation"
    ]
    other_gbis_insulation_measures = [
        "suspended_floor_insulation", "solid_floor_insulation",
    ]

    # These are the insulation measures that the property still needs and so will be considered for
    # filling the minimum insulation requirements. The names above are unique, so no de-duplication is needed;
    # keeping the declaration order makes the order of the generated paths reproducible between runs.
    remaining_insulation_type = [
        insulation_measure
        for insulation_measure in wall_insulation_measures + roof_insulation_measures
        if _find_measure(input_measures, insulation_measure)
    ]
    gbis_insulation_types = remaining_insulation_type + other_gbis_insulation_measures

    funding_paths = []

    if housing_type == "Social" and p.data["current-energy-rating"] == "D":
        # If the property is currently EPC D, we can only include innovation measures or measures to meet the
        # minimum insulation requirements. We make an exception if we have a measure that is
        # already installed, specifically a heat pump
        input_measures_innovation = []
        input_gbis_measures_innovation = []
        for measures in input_measures:
            group_of_innovation_measures = []
            group_of_gbis_innovation_measures = []
            for measure in measures:
                if measure["type"] == "high_heat_retention_storage_heaters" and work_package == "solar_hhrsh_eco4":
                    # With this work type, if the property is EPC D and doesn't have an eligible heating system
                    # we install HHRSH as a pre-requisite measure, before the ECO4 project is complete.
                    group_of_innovation_measures.append(measure)
                elif (
                    measure["innovation_uplift"]
                    or measure["type"] in remaining_insulation_type
                    or measure["already_installed"]
                ):
                    # elif: a measure must appear at most once in its group, even if it qualifies twice
                    group_of_innovation_measures.append(measure)
                if measure["innovation_uplift"] and measure["type"] in gbis_insulation_types:
                    group_of_gbis_innovation_measures.append([measure])
            if group_of_innovation_measures:
                input_measures_innovation.append(group_of_innovation_measures)
            if group_of_gbis_innovation_measures:
                input_gbis_measures_innovation.extend(group_of_gbis_innovation_measures)

        funding_paths = _make_solar_heating_funding_paths(
            p, input_measures_innovation, funding_paths, remaining_insulation_type, housing_type, funding,
        )
        # Can only be innovation GBIS measures
        funding_paths = _make_generic_gbis_funding_paths(input_gbis_measures_innovation, funding_paths)
        return funding_paths, input_measures_innovation

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # EWI or IWI
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # The package must include EWI or IWI if the property is private rental sector.
    # We check if we have any EWI or IWI measures available - only for EPC E or below
    if housing_type == "Private" and p.data["current-energy-rating"] in ["E", "F", "G"]:
        or_types = []
        reference_measures = []
        for wall_type, short_name in (
            ("external_wall_insulation", "ewi"),
            ("internal_wall_insulation", "iwi"),
        ):
            if _find_measure(input_measures, wall_type):
                or_types.append(wall_type)
                reference_measures.append(short_name)
        if or_types:
            funding_paths.append([{"OR": or_types, "reference": "+".join(reference_measures) + ":eco4"}])

    funding_paths = _make_solar_heating_funding_paths(
        p, input_measures, funding_paths, remaining_insulation_type, housing_type, funding
    )

    # If we have any remaining insulation measures, we add them to the funding paths as GBIS packages.
    # A combined type ("a+b") is judged on its first component.
    input_gbis_measures = [
        [measure]
        for measures in input_measures
        for measure in measures
        if measure["type"].split("+")[0] in gbis_insulation_types
    ]
    funding_paths = _make_generic_gbis_funding_paths(input_gbis_measures, funding_paths)
    return funding_paths, input_measures