mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
tests working for optimiser
This commit is contained in:
parent
50de86e379
commit
dd488f1857
4 changed files with 1399 additions and 1182 deletions
|
|
@ -715,9 +715,6 @@ class Funding:
|
|||
- all other measures are insulation (can be non-innovation)
|
||||
"""
|
||||
|
||||
raise ValueError(
|
||||
"THis isnt quite right. Band D homes must be pre-insulated OR it should include one of the"
|
||||
)
|
||||
# The condition is:
|
||||
# one of the following insulation measures must be installed as part of the
|
||||
# same ECO4 project:
|
||||
|
|
|
|||
767
recommendations/optimiser/funding_optimiser.py
Normal file
767
recommendations/optimiser/funding_optimiser.py
Normal file
|
|
@ -0,0 +1,767 @@
|
|||
"""
|
||||
This script contains a number of functions which are designed to enable optimisation and selection of funding options
|
||||
for individual properties to improve their energy efficiency
|
||||
|
||||
The main entry point to this is optimise_with_funding_paths
|
||||
|
||||
In the future, we will adapt this into a class-based structure to allow for more flexibility and reusability
|
||||
"""
|
||||
|
||||
from copy import deepcopy
|
||||
import pandas as pd
|
||||
|
||||
from backend.app.plan.schemas import (
|
||||
WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES
|
||||
)
|
||||
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
||||
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
||||
from utils.logger import setup_logger
|
||||
from backend.Funding import Funding
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# measures we DO NOT treat as fundable in the ECO4 'funded' pass
|
||||
_ECO4_EXCLUDE_TYPES = {"secondary_heating"}
|
||||
|
||||
|
||||
def _path_scheme(path_spec):
|
||||
"""
|
||||
Infer scheme from any 'reference' tag in the path.
|
||||
Defaults to 'eco4' if not specified.
|
||||
"""
|
||||
for elem in path_spec or []:
|
||||
ref = elem.get("reference")
|
||||
if isinstance(ref, str):
|
||||
if ref.endswith(":gbis"):
|
||||
return "gbis"
|
||||
if ref.endswith(":eco4"):
|
||||
return "eco4"
|
||||
return "eco4"
|
||||
|
||||
|
||||
def _filter_fundable_subgroups(groups, scheme):
|
||||
"""
|
||||
Keep only options eligible for the funded pass of the given scheme.
|
||||
- ECO4: drop excluded types (e.g., secondary_heating)
|
||||
- GBIS: funded pass is the GBIS fixed measure only, so return empty sub-groups
|
||||
"""
|
||||
if scheme == "gbis":
|
||||
return [] # we won't optimise 'the rest' under GBIS here
|
||||
|
||||
# ECO4 case
|
||||
filtered = []
|
||||
for grp in groups:
|
||||
kept = [opt for opt in grp
|
||||
if not any(ex in opt["type"] for ex in _ECO4_EXCLUDE_TYPES)]
|
||||
if kept:
|
||||
filtered.append(kept)
|
||||
return filtered
|
||||
|
||||
|
||||
def _sum_cost_gain_with_scheme(items, scheme):
|
||||
"""
|
||||
Sum cost/gain of fixed items, adjusting for scheme rules.
|
||||
- GBIS: strip innovation uplift from GBIS-funded fixed measures only.
|
||||
"""
|
||||
total_cost = 0.0
|
||||
total_gain = 0.0
|
||||
for it in items:
|
||||
cost = float(it["cost"])
|
||||
if scheme == "gbis":
|
||||
# innovation uplifts are not paid under GBIS
|
||||
cost -= float(it.get("innovation_uplift", 0.0))
|
||||
total_cost += cost
|
||||
total_gain += float(it["gain"])
|
||||
return total_cost, total_gain
|
||||
|
||||
|
||||
def violates_min_insulation(fixed):
|
||||
"""Return True if fixed selection includes a heating/PV measure but no required insulation."""
|
||||
picked_types = {opt["type"] for (_, _, opt) in fixed}
|
||||
|
||||
def has_any(substrs):
|
||||
return any(any(s in t for s in substrs) for t in picked_types)
|
||||
|
||||
# heating (incl. PV) flags
|
||||
is_heating = has_any([
|
||||
"air_source_heat_pump",
|
||||
"high_heat_retention_storage_heater",
|
||||
"boiler_upgrade",
|
||||
"electric_boiler",
|
||||
"time_temperature_zone_control",
|
||||
"secondary_heating",
|
||||
"solar_pv", # PV treated as heating for MIR
|
||||
])
|
||||
|
||||
# MIR insulation (the ones you’re using in path construction)
|
||||
has_insul = has_any([
|
||||
"external_wall_insulation",
|
||||
"internal_wall_insulation",
|
||||
"cavity_wall_insulation",
|
||||
"extension_cavity_wall_insulation",
|
||||
"loft_insulation",
|
||||
"flat_roof_insulation",
|
||||
"room_roof_insulation",
|
||||
])
|
||||
|
||||
return is_heating and not has_insul
|
||||
|
||||
|
||||
# Treat "type" like "external_wall_insulation+mechanical_ventilation" → "external_wall_insulation"
|
||||
def _base_type(s: str) -> str:
|
||||
return s.split("+", 1)[0]
|
||||
|
||||
|
||||
def _filter_measures_by_types(input_measures, allowed_types):
|
||||
"""
|
||||
Keep only groups that have ≥1 allowed option; inside each group keep only allowed options.
|
||||
"""
|
||||
allowed_set = set(allowed_types)
|
||||
filtered = []
|
||||
for group in input_measures:
|
||||
kept_opts = [opt for opt in group if _base_type(opt["type"]) in allowed_set]
|
||||
if kept_opts:
|
||||
filtered.append(kept_opts)
|
||||
return filtered
|
||||
|
||||
|
||||
def _is_eligible_funding_package(scheme, starting_sap, total_gain):
|
||||
if scheme == "eco4":
|
||||
# We check if we meet the upgrade requirements
|
||||
# If the property is an E or above, we need to upgrade to a C or above
|
||||
if starting_sap >= 39: # ie. EPC C or above
|
||||
return starting_sap + total_gain >= 69
|
||||
|
||||
if scheme == "gbis":
|
||||
# GBIS is a fixed measure only, so we don't check the gain
|
||||
return True
|
||||
|
||||
|
||||
def _prs_solution_ok(items, p, funding):
|
||||
# items: list of picked option dicts (after optimisation)
|
||||
# treat "type" possibly like "x+y" -> split and look at base tokens
|
||||
types = set()
|
||||
for opt in items:
|
||||
for t in opt["type"].split("+"):
|
||||
types.add(t)
|
||||
|
||||
has_solid_wall = ("external_wall_insulation" in types) or ("internal_wall_insulation" in types)
|
||||
|
||||
# renewable set:
|
||||
has_ashp = ("air_source_heat_pump" in types) # ASHP alone is renewable
|
||||
has_solar = ("solar_pv" in types)
|
||||
has_hhrsh = ("high_heat_retention_storage_heater" in types) # only counts *with* solar
|
||||
|
||||
# solar PV qualifies if paired with eligible existing heating
|
||||
solar_ok_existing = has_solar and funding.check_solar_eligible_heating_system(
|
||||
p.main_heating["clean_description"], p.main_heating_controls["clean_description"]
|
||||
)
|
||||
|
||||
# or paired with ASHP/HHRSH in the same package
|
||||
solar_ok_with_installed = has_solar and (has_ashp or has_hhrsh)
|
||||
|
||||
renewable_ok = has_ashp or solar_ok_existing or solar_ok_with_installed
|
||||
|
||||
return has_solid_wall or renewable_ok
|
||||
|
||||
|
||||
def _ensure_unfunded_costs(groups):
|
||||
"""Make sure each option’s cost is base+uplift (i.e., no funding).
|
||||
Safe if fields already match; works on a deepcopy.
|
||||
"""
|
||||
for grp in groups:
|
||||
for opt in grp:
|
||||
base = opt.get("cost_minus_uplift")
|
||||
upl = opt.get("innovation_uplift", 0.0)
|
||||
if base is not None:
|
||||
opt["cost"] = float(base) + float(upl)
|
||||
# else: assume opt["cost"] already includes uplift
|
||||
return groups
|
||||
|
||||
|
||||
def optimise_with_funding_paths(p, input_measures, housing_type, funding: Funding, budget=None, target_gain=None):
|
||||
"""
|
||||
run_optimizer(sub_measures, budget, target_gain) -> (picked_options, sub_cost, sub_gain)
|
||||
"""
|
||||
|
||||
solutions = []
|
||||
|
||||
# unfunded - we utilise all measures
|
||||
unfunded_measures = input_measures.copy()
|
||||
unfunded_measures = _ensure_unfunded_costs(unfunded_measures)
|
||||
picked, total_cost, total_gain = run_optimizer(
|
||||
unfunded_measures,
|
||||
budget=budget,
|
||||
sub_target_gain=target_gain
|
||||
)
|
||||
if picked is not None:
|
||||
solutions.append({
|
||||
"fixed_ids": [],
|
||||
"items": picked,
|
||||
"total_cost": total_cost,
|
||||
"total_gain": total_gain,
|
||||
"path": {"reference": "unfunded:all"},
|
||||
"scheme": "none",
|
||||
"is_eligible": False, # no funding scheme applied
|
||||
})
|
||||
|
||||
# This function will filter down on innovation measures if we are social EPC D
|
||||
funding_paths, optimisation_input_measures = make_funding_paths(p, input_measures, housing_type, funding)
|
||||
|
||||
# We now produce a fabric only path for ECO4
|
||||
# We add in generic insulation funding paths (where there is no fixed measure)
|
||||
# Heating controls are only eligible if installed as part of a heating upgrade and so we do not include them
|
||||
# here
|
||||
if housing_type == "Social":
|
||||
funding_paths = (
|
||||
[
|
||||
{
|
||||
'reference': 'fabric-only:eco4',
|
||||
"allowed_types": WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES +
|
||||
ECO4_ELIGIBILE_FABRIC_MEASURES
|
||||
}
|
||||
] + funding_paths
|
||||
)
|
||||
|
||||
for path_spec in funding_paths:
|
||||
|
||||
# ECO4 fabric only path = special case
|
||||
if isinstance(path_spec, dict) and path_spec.get("reference") == "fabric-only:eco4":
|
||||
sub_measures = _filter_measures_by_types(optimisation_input_measures, path_spec["allowed_types"])
|
||||
if not sub_measures:
|
||||
continue
|
||||
|
||||
picked, sub_cost, sub_gain = run_optimizer(
|
||||
sub_measures,
|
||||
budget=budget, # no fixed items; budget unchanged
|
||||
sub_target_gain=target_gain
|
||||
)
|
||||
|
||||
if picked is None:
|
||||
continue
|
||||
|
||||
scheme = _path_scheme([path_spec])
|
||||
|
||||
solutions.append(
|
||||
{
|
||||
"fixed_ids": [],
|
||||
"items": picked,
|
||||
"total_cost": sub_cost,
|
||||
"total_gain": sub_gain,
|
||||
"path": path_spec,
|
||||
"scheme": scheme,
|
||||
"is_eligible": _is_eligible_funding_package(scheme, p.data["current-energy-efficiency"], sub_gain)
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
# 1) expand fixed selections for this path
|
||||
fixed_selections = expand_funding_path(optimisation_input_measures, path_spec) if path_spec else [[]]
|
||||
if not fixed_selections:
|
||||
continue
|
||||
|
||||
for fixed in fixed_selections:
|
||||
|
||||
if violates_min_insulation(fixed):
|
||||
# We log an error and skip this - we should not see any errors but we can probably get a reasonable
|
||||
# outcome for the end user without a complete termination of the process
|
||||
logger.error("Skipping fixed selection due to minimum insulation violation: %s", fixed)
|
||||
continue
|
||||
|
||||
scheme = _path_scheme(path_spec)
|
||||
|
||||
# 3) compute fixed cost/gain, and strip those groups from subproblem
|
||||
fixed_items = [opt for (_, _, opt) in fixed]
|
||||
fixed_ids = [opt['id'] for opt in fixed_items]
|
||||
fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
|
||||
fixed_groups = {gi for (gi, _, _) in fixed}
|
||||
|
||||
sub_measures = deepcopy(
|
||||
[grp for gi, grp in enumerate(optimisation_input_measures) if gi not in fixed_groups]
|
||||
)
|
||||
|
||||
if scheme == "gbis":
|
||||
# Then for the sub-measures, we need to strip the innovation uplift from the GBIS fixed measures. We
|
||||
# do this by adding innovation back onto the cost
|
||||
for grp in sub_measures:
|
||||
for opt in grp:
|
||||
opt["cost"] = opt["cost_minus_uplift"] + opt.get("innovation_uplift", 0.0)
|
||||
|
||||
if scheme == "eco4":
|
||||
# Need to strip out any measure types that are not eligible for ECO4 funding (e.g. secondary heating)
|
||||
raise ValueError()
|
||||
|
||||
# 4) run your existing optimiser for the remaining groups
|
||||
# If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which
|
||||
# may already be over budget) and the fixed gain (which may not be achievable)
|
||||
picked, sub_cost, sub_gain = run_optimizer(
|
||||
sub_measures,
|
||||
budget - fixed_cost if budget is not None else None,
|
||||
sub_target_gain=target_gain - fixed_gain if target_gain is not None else None
|
||||
)
|
||||
|
||||
if picked is None:
|
||||
continue
|
||||
|
||||
total_cost = fixed_cost + sub_cost
|
||||
total_gain = fixed_gain + sub_gain
|
||||
total_picks = fixed_items + picked
|
||||
|
||||
if housing_type == "Private":
|
||||
if not _prs_solution_ok(total_picks, p, funding):
|
||||
logger.error(
|
||||
"Found a solution that does not meet the PRS requirements: %s - this shouldn't be happening",
|
||||
total_picks
|
||||
)
|
||||
continue
|
||||
|
||||
scheme = _path_scheme(path_spec)
|
||||
|
||||
solutions.append({
|
||||
"fixed_ids": fixed_ids,
|
||||
"items": total_picks,
|
||||
"total_cost": total_cost,
|
||||
"total_gain": total_gain,
|
||||
"path": path_spec,
|
||||
"scheme": scheme,
|
||||
"is_eligible": _is_eligible_funding_package(scheme, p.data["current-energy-efficiency"], total_gain)
|
||||
})
|
||||
|
||||
solutions = pd.DataFrame(solutions)
|
||||
|
||||
# Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the
|
||||
# final upgrade target, we then look to perform a final optimisation pass to meet the target gain.
|
||||
solutions["meets_upgrade_target"] = solutions["total_gain"] >= target_gain
|
||||
|
||||
# If we have packages that are fundable, but do not meet the upgrade target, we can run a final optimisation pass
|
||||
if not solutions[solutions["is_eligible"] & ~solutions["meets_upgrade_target"]].empty:
|
||||
raise NotImplementedError("Implement me")
|
||||
|
||||
return solutions
|
||||
|
||||
|
||||
# ---- helpers -------------------------------------------------------------
|
||||
|
||||
|
||||
def sum_cost_gain(items):
|
||||
c = sum(float(x['cost']) for x in items)
|
||||
g = sum(float(x['gain']) for x in items)
|
||||
return c, g
|
||||
|
||||
|
||||
# ---- candidate expansion -------------------------------------------------
|
||||
def type_matches(option_type: str, required: str) -> bool:
|
||||
# substring match so "external_wall_insulation+mechanical_ventilation" satisfies "external_wall_insulation"
|
||||
return required in option_type
|
||||
|
||||
|
||||
def candidates_for_type(input_measures, required_type):
|
||||
"""
|
||||
Return a list of (gi, oi, opt) where opt['type'] contains required_type.
|
||||
gi = group index, oi = option index inside that group.
|
||||
"""
|
||||
cands = []
|
||||
for gi, group in enumerate(input_measures):
|
||||
for oi, opt in enumerate(group):
|
||||
if type_matches(opt["type"], required_type):
|
||||
cands.append((gi, oi, opt))
|
||||
return cands
|
||||
|
||||
|
||||
def iter_or_candidates(input_measures, types_list):
|
||||
"""
|
||||
For OR: pick exactly ONE candidate whose type matches ANY in types_list.
|
||||
Return a list of dicts: {"fixed": [(gi, oi, opt)]}
|
||||
"""
|
||||
union = []
|
||||
seen_ids = set()
|
||||
for t in types_list:
|
||||
for tup in candidates_for_type(input_measures, t):
|
||||
# de-dupe by the option id so the same physical option (with multi-type name) isn’t repeated
|
||||
if tup[2]["id"] not in seen_ids:
|
||||
seen_ids.add(tup[2]["id"])
|
||||
union.append(tup)
|
||||
return [{"fixed": [t]} for t in union]
|
||||
|
||||
|
||||
def iter_and_candidates(input_measures, types_list):
|
||||
"""
|
||||
For AND: we must cover ALL required types.
|
||||
We allow a single option to satisfy multiple types.
|
||||
We build a simple product but collapse duplicates by (gi, oi).
|
||||
"""
|
||||
# Build candidate pools per required type
|
||||
pools = [candidates_for_type(input_measures, t) for t in types_list]
|
||||
if any(len(pool) == 0 for pool in pools):
|
||||
return [] # impossible to satisfy AND
|
||||
|
||||
# Start with one empty selection; accumulate per pool
|
||||
selections = [[]] # each selection is a list of (gi, oi, opt)
|
||||
for pool in pools:
|
||||
new_selections = []
|
||||
for sel in selections:
|
||||
for cand in pool:
|
||||
# Try adding cand; collapse duplicates by (gi,oi)
|
||||
gi, oi, opt = cand
|
||||
replaced = False
|
||||
conflict = False
|
||||
merged = []
|
||||
for (sgi, soi, sopt) in sel:
|
||||
if (sgi, soi) == (gi, oi):
|
||||
# same exact option already in selection (satisfies another required type) – keep one
|
||||
replaced = True
|
||||
# keep the existing one (identical)
|
||||
merged.append((sgi, soi, sopt))
|
||||
else:
|
||||
merged.append((sgi, soi, sopt))
|
||||
if not replaced:
|
||||
merged.append(cand)
|
||||
if not conflict:
|
||||
new_selections.append(merged)
|
||||
selections = new_selections
|
||||
if not selections:
|
||||
return []
|
||||
|
||||
# After accumulation, we may still have duplicate groups with different options (conflict). Drop those.
|
||||
cleaned = []
|
||||
for sel in selections:
|
||||
seen_by_group = {}
|
||||
ok = True
|
||||
for gi, oi, opt in sel:
|
||||
if gi in seen_by_group and seen_by_group[gi] != oi:
|
||||
# same group, different option -> conflict for AND; invalid selection
|
||||
ok = False
|
||||
break
|
||||
seen_by_group[gi] = oi
|
||||
if ok:
|
||||
# ensure stable order and unique by (gi,oi)
|
||||
uniq = {}
|
||||
for gi, oi, opt in sel:
|
||||
uniq[(gi, oi)] = opt
|
||||
cleaned.append([(gi, oi, opt) for (gi, oi), opt in uniq.items()])
|
||||
return [{"fixed": c} for c in cleaned]
|
||||
|
||||
|
||||
def expand_funding_path(input_measures, path_spec):
|
||||
"""
|
||||
path_spec is a list of elements; each element is either:
|
||||
{"OR": [type1, type2, ...], "reference": "..."} or
|
||||
{"AND": [type1, type2, ...], "reference": "..."}
|
||||
We cross-product across elements (all required), and produce selections as lists of (gi, oi, opt).
|
||||
"""
|
||||
selections = [[]] # list[list[(gi,oi,opt)]]
|
||||
for elem in path_spec:
|
||||
if "OR" in elem:
|
||||
cands = iter_or_candidates(input_measures, elem["OR"])
|
||||
elif "AND" in elem:
|
||||
cands = iter_and_candidates(input_measures, elem["AND"])
|
||||
else:
|
||||
raise ValueError("unknown path element; expected 'OR' or 'AND'")
|
||||
|
||||
if not cands:
|
||||
return []
|
||||
|
||||
new_selections = []
|
||||
for base in selections:
|
||||
for cand in cands:
|
||||
# merge base + cand["fixed"], collapsing duplicate same-option picks
|
||||
combined = list(base)
|
||||
# reject if combined picks two different options from the same group
|
||||
groups_to_oi = {(gi,): oi for gi, oi, _ in combined} # temporary; we’ll refactor below
|
||||
conflict = False
|
||||
# simpler: build a dict by group -> (oi, opt), conflict if group exists with different oi
|
||||
gmap = {gi: (oi, opt) for gi, oi, opt in combined}
|
||||
for gi, oi, opt in cand["fixed"]:
|
||||
if gi in gmap:
|
||||
prev_oi, _ = gmap[gi]
|
||||
if prev_oi != oi:
|
||||
conflict = True
|
||||
break
|
||||
gmap[gi] = (oi, opt)
|
||||
if conflict:
|
||||
continue
|
||||
# back to list
|
||||
merged = [(gi, oi, opt) for gi, (oi, opt) in gmap.items()]
|
||||
new_selections.append(merged)
|
||||
selections = new_selections
|
||||
if not selections:
|
||||
return []
|
||||
|
||||
# Final tidy: ensure no duplicate groups with different options (already protected), keep stable ordering
|
||||
deduped = []
|
||||
for sel in selections:
|
||||
gmap = {}
|
||||
for gi, oi, opt in sel:
|
||||
# keep the first occurrence
|
||||
if gi not in gmap:
|
||||
gmap[gi] = (oi, opt)
|
||||
else:
|
||||
# same group, different oi would have been filtered; if same oi, ignore duplicate
|
||||
pass
|
||||
deduped.append([(gi, oi, opt) for gi, (oi, opt) in gmap.items()])
|
||||
return deduped
|
||||
|
||||
|
||||
# ---- tiny utilities ----------------------------------------------------------
|
||||
|
||||
def parse_types(t):
|
||||
# e.g. "external_wall_insulation+mechanical_ventilation" -> {"external_wall_insulation","mechanical_ventilation"}
|
||||
return set(map(str.strip, t.split("+"))) if isinstance(t, str) else set()
|
||||
|
||||
|
||||
def includes_heating(opt_types):
|
||||
return any(x in opt_types for x in {
|
||||
"air_source_heat_pump",
|
||||
"high_heat_retention_storage_heater",
|
||||
"time_temperature_zone_control", # controls count as a heating measure in your pipeline
|
||||
"solar_pv" # you treat PV as heating for funding logic
|
||||
})
|
||||
|
||||
|
||||
def contributes_min_insulation(opt_types):
|
||||
# MIR satisfiers you mentioned (extend as needed)
|
||||
return any(x in opt_types for x in {
|
||||
"external_wall_insulation",
|
||||
"internal_wall_insulation",
|
||||
"loft_insulation",
|
||||
"cavity_wall_insulation",
|
||||
})
|
||||
|
||||
|
||||
def run_optimizer(input_measures, budget=None, sub_target_gain=None, allow_slack=False):
|
||||
"""
|
||||
Thin wrapper over your optimisers.
|
||||
Returns: list[dict] selected_options
|
||||
"""
|
||||
if budget is not None:
|
||||
opt = GainOptimiser(
|
||||
input_measures, max_cost=budget, max_gain=(sub_target_gain or float("inf")),
|
||||
allow_slack=allow_slack
|
||||
)
|
||||
else:
|
||||
if sub_target_gain is None:
|
||||
raise ValueError("Either budget or target_gain must be provided.")
|
||||
opt = CostOptimiser(input_measures, min_gain=sub_target_gain)
|
||||
|
||||
opt.setup()
|
||||
opt.solve()
|
||||
cost = sum([x["cost"] for x in opt.solution])
|
||||
return opt.solution, cost, opt.solution_gain
|
||||
|
||||
|
||||
# ---- Define optimisation paths ----------------------------------------------------------
|
||||
|
||||
def _find_measure(input_measures, measure_type):
|
||||
for measures in input_measures:
|
||||
for m in measures:
|
||||
if measure_type in m["type"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _make_solar_heating_funding_paths(
|
||||
p, input_measures, funding_paths, remaining_insulation_type, housing_type, funding: Funding
|
||||
):
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# Solar PV with existing eligible heating system
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
has_eligible_heating_system = funding.check_solar_eligible_heating_system(
|
||||
mainheat_description=p.main_heating["clean_description"],
|
||||
heating_control_description=p.main_heating_controls["clean_description"]
|
||||
)
|
||||
|
||||
if has_eligible_heating_system and _find_measure(input_measures, "solar_pv"):
|
||||
single_solar_template = [{"AND": ["solar_pv"], "reference": None}]
|
||||
# We now look to pair this with any lingering insulation measures
|
||||
solar_paths = []
|
||||
for insulation_measure in remaining_insulation_type:
|
||||
new_solar_path = deepcopy(single_solar_template)
|
||||
new_solar_path[0]["AND"].append(insulation_measure)
|
||||
# Make a specific reference for this path
|
||||
new_solar_path[0]["reference"] = "solar_pv+" + insulation_measure + ":eco4"
|
||||
solar_paths.append(new_solar_path)
|
||||
|
||||
if solar_paths:
|
||||
funding_paths.extend(solar_paths)
|
||||
else:
|
||||
# If we have no insulation measures, we just add the solar PV path
|
||||
funding_paths.append([{"AND": ["solar_pv"], "reference": "solar_pv:eco4"}])
|
||||
|
||||
# For each of these, because there is a heating measure begin implemented, we check for minimum insulation
|
||||
# requirements.
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# Solar PV + Heating Upgrade combos
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# We don't include electric boilers as they are not eligible for ECO4 funding
|
||||
solar_heating_combos = [
|
||||
("high_heat_retention_storage_heater", "solar_pv+hhrsh:eco4"),
|
||||
("air_source_heat_pump", "solar_pv+ashp:eco4"),
|
||||
]
|
||||
if _find_measure(input_measures, "solar_pv"):
|
||||
for heat_type, ref in solar_heating_combos:
|
||||
if _find_measure(input_measures, heat_type):
|
||||
if remaining_insulation_type:
|
||||
for insulation_measure in remaining_insulation_type:
|
||||
funding_paths.append(
|
||||
[{"AND": ["solar_pv", heat_type, insulation_measure],
|
||||
"reference": f"{ref[:-5]}+{insulation_measure}:eco4"}] # keeps naming consistent
|
||||
)
|
||||
else:
|
||||
funding_paths.append([{"AND": ["solar_pv", heat_type], "reference": ref}])
|
||||
|
||||
# We've actually covered all possible options where solar PV can be included in a funded package, so where
|
||||
# solar PV is not in a reference, we can exclude it
|
||||
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# Heating Upgrades
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# Must have an existing eligible heating system
|
||||
|
||||
# For private, HHRSH alone, or a boiler upgrade is NOT eligible for ECO4 funding. Boiler upgrade also doesn't
|
||||
# count as an eligible heating system
|
||||
if housing_type == "Private":
|
||||
single_heating_measures = ["air_source_heat_pump"]
|
||||
else:
|
||||
single_heating_measures = [
|
||||
"boiler_upgrade", "high_heat_retention_storage_heater", "air_source_heat_pump"
|
||||
]
|
||||
measure_references = {
|
||||
"boiler_upgrade": "boiler_upgrade",
|
||||
"high_heat_retention_storage_heater": "hhrsh",
|
||||
"air_source_heat_pump": "ashp"
|
||||
}
|
||||
for heating_upgrade in single_heating_measures:
|
||||
if _find_measure(input_measures, heating_upgrade):
|
||||
if remaining_insulation_type:
|
||||
for insulation_measure in remaining_insulation_type:
|
||||
path = [
|
||||
{
|
||||
"AND": [heating_upgrade, insulation_measure],
|
||||
"reference": f"{measure_references[heating_upgrade]}+{insulation_measure}:eco4"
|
||||
}
|
||||
]
|
||||
funding_paths.append(path)
|
||||
else:
|
||||
funding_paths.append(
|
||||
[{"AND": [heating_upgrade], "reference": f"{measure_references[heating_upgrade]}:eco4"}]
|
||||
)
|
||||
|
||||
return funding_paths
|
||||
|
||||
|
||||
def _make_generic_gbis_funding_paths(input_gbis_measures, funding_paths):
|
||||
"""
|
||||
For GBIS, the packages are single insulation measure.
|
||||
|
||||
We also have potential GBIS packages that allow heating controls as a secondary measure, however this
|
||||
is not currently implemented in the optimiser due to not being certain about the heating controls pre conditions
|
||||
:param input_gbis_measures:
|
||||
:param funding_paths:
|
||||
:return:
|
||||
"""
|
||||
|
||||
gbis_funding_paths = []
|
||||
for input_measure in input_gbis_measures:
|
||||
for measure in input_measure:
|
||||
# We create a path for each measure
|
||||
gbis_funding_paths.append([{"AND": [measure["type"]], "reference": measure["type"] + ":gbis"}])
|
||||
|
||||
return funding_paths + gbis_funding_paths
|
||||
|
||||
|
||||
def make_funding_paths(p, input_measures, housing_type, funding: Funding):
|
||||
"""
|
||||
This function generates funding paths based on the input measures and the tenure of the property.
|
||||
It checks for the presence of specific measures and creates paths that include necessary insulation measures
|
||||
to meet minimum insulation requirements, particularly when a heating system is recommended.
|
||||
|
||||
Remaining measures that are not fixed as part of the package are then optimised
|
||||
:param p: The property object containing details about the property, including main heating and controls.
|
||||
:param input_measures:
|
||||
:param housing_type:
|
||||
:return:
|
||||
"""
|
||||
# We handle the case of minimum insulation requirements. Whenever we have a heating system recommendation,
|
||||
# we *must* include an additional insulation measure, unless the property already has sufficient insulation.
|
||||
|
||||
# We determine which insulation measures need to be included
|
||||
wall_insulation_measures = [
|
||||
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
||||
"extension_cavity_wall_insulation"
|
||||
]
|
||||
roof_insulation_measures = [
|
||||
"loft_insulation", "flat_roof_insulation", "room_roof_insulation"
|
||||
]
|
||||
other_gbis_insulation_measures = [
|
||||
"suspended_floor_insulation", "solid_floor_insulation",
|
||||
]
|
||||
# These are the insulation measures that the property still needs and so will be considered for
|
||||
# filling the minimum insulation requirements
|
||||
remaining_insulation_type = []
|
||||
for insulation_measure in wall_insulation_measures + roof_insulation_measures:
|
||||
if _find_measure(input_measures, insulation_measure):
|
||||
remaining_insulation_type.append(insulation_measure)
|
||||
|
||||
remaining_insulation_type = list(set(remaining_insulation_type))
|
||||
|
||||
funding_paths = []
|
||||
|
||||
if housing_type == "Social" and p.data["current-energy-rating"] == "D":
|
||||
# If the property is currently EPC D, we can only include innovation measures or measures to meet the
|
||||
# minimum insulation requirements
|
||||
input_measures_innovation = []
|
||||
input_gbis_measures_innovation = []
|
||||
for measures in input_measures:
|
||||
for measure in measures:
|
||||
if measure["innovation_uplift"] or measure["type"] in remaining_insulation_type:
|
||||
input_measures_innovation.append([measure])
|
||||
|
||||
if measure["innovation_uplift"] and measure["type"] in (
|
||||
remaining_insulation_type + other_gbis_insulation_measures
|
||||
):
|
||||
input_gbis_measures_innovation.append([measure])
|
||||
|
||||
funding_paths = _make_solar_heating_funding_paths(
|
||||
p, input_measures_innovation, funding_paths, remaining_insulation_type, housing_type, funding
|
||||
)
|
||||
|
||||
# Can only be innovation GBIS measures
|
||||
funding_paths = _make_generic_gbis_funding_paths(input_gbis_measures_innovation, funding_paths)
|
||||
return funding_paths, input_measures_innovation
|
||||
|
||||
if housing_type == "Private":
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# EWI or IWI
|
||||
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
# 1) The package must include EWI or IWI if the property is private rental sector
|
||||
# We check if we have any EWI or IWI measures available
|
||||
ewi_or_iwi = [{"OR": []}]
|
||||
reference_measures = []
|
||||
# If we have EWI we add it in
|
||||
if _find_measure(input_measures, "external_wall_insulation"):
|
||||
ewi_or_iwi[0]["OR"].append("external_wall_insulation")
|
||||
reference_measures.append("ewi")
|
||||
|
||||
if _find_measure(input_measures, "internal_wall_insulation"):
|
||||
ewi_or_iwi[0]["OR"].append("internal_wall_insulation")
|
||||
reference_measures.append("iwi")
|
||||
|
||||
if ewi_or_iwi[0]["OR"]:
|
||||
ewi_or_iwi[0]["reference"] = "+".join(reference_measures) + ":eco4"
|
||||
funding_paths.append(ewi_or_iwi)
|
||||
|
||||
funding_paths = _make_solar_heating_funding_paths(
|
||||
p, input_measures, funding_paths, remaining_insulation_type, housing_type, funding
|
||||
)
|
||||
|
||||
# If we have any remaining insulation measures, we add them to the funding paths
|
||||
input_gbis_measures = []
|
||||
for measures in input_measures:
|
||||
for measure in measures:
|
||||
if measure["type"] in remaining_insulation_type + other_gbis_insulation_measures:
|
||||
input_gbis_measures.append([measure])
|
||||
|
||||
funding_paths = _make_generic_gbis_funding_paths(input_gbis_measures, funding_paths)
|
||||
|
||||
return funding_paths, input_measures
|
||||
|
|
@ -12,7 +12,7 @@ class TestPrepareInputMeasures:
|
|||
recs = [
|
||||
[ # loft insulation measure
|
||||
{"recommendation_id": "loft1", "type": "loft_insulation", "total": 100, "kwh_savings": 200,
|
||||
"energy_cost_savings": 10, "has_battery": False},
|
||||
"energy_cost_savings": 10, "has_battery": False, "measure_type": "loft_insulation"},
|
||||
],
|
||||
]
|
||||
measures = optimiser_functions.prepare_input_measures(recs, goal="Energy Savings", needs_ventilation=False)
|
||||
|
|
@ -27,9 +27,9 @@ class TestPrepareInputMeasures:
|
|||
["internal_wall_insulation"])
|
||||
recs = [
|
||||
[{"recommendation_id": "wall1", "type": "internal_wall_insulation", "total": 500, "kwh_savings": 300,
|
||||
"energy_cost_savings": 5, "has_battery": False}],
|
||||
"energy_cost_savings": 5, "has_battery": False, "measure_type": "internal_wall_insulation"}],
|
||||
[{"recommendation_id": "vent1", "type": "mechanical_ventilation", "total": 50, "kwh_savings": 30,
|
||||
"energy_cost_savings": 5, "has_battery": False}]
|
||||
"energy_cost_savings": 5, "has_battery": False, "measure_type": "mechanical_ventilation"}],
|
||||
]
|
||||
measures = optimiser_functions.prepare_input_measures(recs, goal="Energy Savings", needs_ventilation=True)
|
||||
wall_option = measures[0][0]
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue