optimisation process wip

This commit is contained in:
Khalim Conn-Kowlessar 2025-08-12 22:48:54 +01:00
parent f9f991c58b
commit bd3795eead

View file

@ -692,79 +692,6 @@ def make_funding_paths(p, input_measures, tenure):
# ---- main wrapper around your optimiser ----------------------------------
def optimise_with_funding_paths(input_measures, budget=None, target_gain=None, social=False):
"""
run_optimizer(sub_measures, budget, target_gain) -> (picked_options, sub_cost, sub_gain)
"""
# TODO: Should be EPC D only that we require only innovation measures
# Social housing: filter to innovation-only before doing anything else
# if social:
# filtered = []
# for group in input_measures:
# opts = [o for o in group if o.get("is_innovation", False)]
# if opts:
# filtered.append(opts)
# input_measures = filtered
# Always include a "no funding path" baseline (empty fixed)
all_paths = FUNDING_PATHS + [[]]
solutions = []
for path_spec in all_paths:
# 1) expand fixed selections for this path
fixed_selections = expand_funding_path(input_measures, path_spec) if path_spec else [[]]
if not fixed_selections:
continue
for fixed in fixed_selections:
# 2) min insulation if heating is already in fixed
fixed_variants = expand_min_insulation_if_needed(input_measures, fixed)
if not fixed_variants:
continue
for fixed2 in fixed_variants:
# 3) compute fixed cost/gain, and strip those groups from subproblem
fixed_items = [opt for (_, _, opt) in fixed2]
fixed_ids = [opt['id'] for opt in fixed_items]
fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
fixed_groups = {gi for (gi, _, _) in fixed2}
sub_measures = strip_groups(input_measures, fixed_groups)
# 4) run your existing optimiser for the remaining groups
# If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which
# may already be over budget) and the fixed gain (which may not be achievable)
picked, sub_cost, sub_gain = run_optimizer(
sub_measures,
budget - fixed_cost if budget is not None else None,
sub_target_gain=target_gain - fixed_gain if target_gain is not None else None
)
if picked is None:
continue
total_cost = fixed_cost + sub_cost
total_gain = fixed_gain + sub_gain
total_picks = fixed_items + picked
# you can change the objective here; Ill use max gain under budget
if budget is not None and total_cost > budget + 1e-9:
continue
solutions.append({
"fixed_ids": fixed_ids,
"items": total_picks,
"total_cost": total_cost,
"total_gain": total_gain,
"path": path_spec,
})
solutions = pd.DataFrame(solutions)
return solutions
# Run inputs:
target_gain = 18.5
@ -772,25 +699,108 @@ from itertools import product
import math
def violates_min_insulation(fixed):
"""Return True if fixed selection includes a heating/PV measure but no required insulation."""
picked_types = {opt["type"] for (_, _, opt) in fixed}
def has_any(substrs):
return any(any(s in t for s in substrs) for t in picked_types)
# heating (incl. PV) flags
is_heating = has_any([
"air_source_heat_pump",
"high_heat_retention_storage_heater",
"boiler_upgrade",
"electric_boiler",
"time_temperature_zone_control",
"secondary_heating",
"solar_pv", # PV treated as heating for MIR
])
# MIR insulation (the ones youre using in path construction)
has_insul = has_any([
"external_wall_insulation",
"internal_wall_insulation",
"cavity_wall_insulation",
"extension_cavity_wall_insulation",
"loft_insulation",
"flat_roof_insulation",
"room_roof_insulation",
])
return is_heating and not has_insul
def optimise_with_funding_paths(input_measures, budget=None, target_gain=None, social=False):
"""
run_optimizer(sub_measures, budget, target_gain) -> (picked_options, sub_cost, sub_gain)
"""
funding_paths = make_funding_paths(p, input_measures, body.housing_type)
solutions = []
for path_spec in funding_paths:
# TODO: If the path spec is GBIS, need to handle this differently. There is no funding associated
# with the other measures we're optimising. Instead, we fix the GBIS measure (which is funded)
# and then run the optimiser on the remaining measures which are NOT funded. The key change is all
# measures in input_measures right now have costs adjusted with innovation uplift, which we don't want
# to apply to the GBIS measures. So we need to strip the innovation uplift from the GBIS measures
# 1) expand fixed selections for this path
fixed_selections = expand_funding_path(input_measures, path_spec) if path_spec else [[]]
if not fixed_selections:
continue
for fixed in fixed_selections:
if violates_min_insulation(fixed):
# We log an error and skip this - we should not see any errors but we can probably get a reasonable
# outcome for the end user without a complete termination of the process
logger.error("Skipping fixed selection due to minimum insulation violation: %s", fixed)
continue
# 3) compute fixed cost/gain, and strip those groups from subproblem
fixed_items = [opt for (_, _, opt) in fixed]
fixed_ids = [opt['id'] for opt in fixed_items]
fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
fixed_groups = {gi for (gi, _, _) in fixed}
sub_measures = [grp for gi, grp in enumerate(input_measures) if gi not in fixed_groups]
# 4) run your existing optimiser for the remaining groups
# If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which
# may already be over budget) and the fixed gain (which may not be achievable)
picked, sub_cost, sub_gain = run_optimizer(
sub_measures,
budget - fixed_cost if budget is not None else None,
sub_target_gain=target_gain - fixed_gain if target_gain is not None else None
)
if picked is None:
continue
total_cost = fixed_cost + sub_cost
total_gain = fixed_gain + sub_gain
total_picks = fixed_items + picked
# you can change the objective here; Ill use max gain under budget
if budget is not None and total_cost > budget + 1e-9:
continue
solutions.append({
"fixed_ids": fixed_ids,
"items": total_picks,
"total_cost": total_cost,
"total_gain": total_gain,
"path": path_spec,
})
solutions = pd.DataFrame(solutions)
return solutions
# ---- helpers -------------------------------------------------------------
def split_types(t):
# supports "external_wall_insulation+mechanical_ventilation"
return set(part.strip() for part in str(t).split('+'))
def group_has_type(group, want):
# group is a list[option], all same 'type' pattern
return any(want in split_types(opt['type']) for opt in group)
def find_groups(input_measures, type_name):
return [(gi, g) for gi, g in enumerate(input_measures) if group_has_type(g, type_name)]
def strip_groups(input_measures, taken_group_indices):
return [g for gi, g in enumerate(input_measures) if gi not in taken_group_indices]
def sum_cost_gain(items):
c = sum(float(x['cost']) for x in items)
@ -799,122 +809,158 @@ def sum_cost_gain(items):
# ---- candidate expansion -------------------------------------------------
def type_matches(option_type: str, required: str) -> bool:
# substring match so "external_wall_insulation+mechanical_ventilation" satisfies "external_wall_insulation"
return required in option_type
def iter_or_candidates(input_measures, type_set):
# collect all groups that match ANY type in type_set
matching = [(gi, group) for gi, group in enumerate(input_measures)
if any(group_has_type(group, t) for t in type_set)]
if not matching:
return # nothing to yield
# choose ONE option from ANY one of these groups
for gi, group in matching:
def candidates_for_type(input_measures, required_type):
"""
Return a list of (gi, oi, opt) where opt['type'] contains required_type.
gi = group index, oi = option index inside that group.
"""
cands = []
for gi, group in enumerate(input_measures):
for oi, opt in enumerate(group):
yield {"fixed": [(gi, oi, opt)]}
if type_matches(opt["type"], required_type):
cands.append((gi, oi, opt))
return cands
def iter_and_candidates(input_measures, type_vec):
# type_vec is like [{"types": {"solar_pv"}}, {"types": {"high_heat_retention_storage_heater"}}]
per_leg = []
for leg in type_vec:
leg_types = leg["types"]
leg_groups = [(gi, group) for gi, group in enumerate(input_measures)
if any(group_has_type(group, t) for t in leg_types)]
if not leg_groups:
return # this AND path isnt available in this property; skip
# options for this leg: (gi, oi, opt)
options = []
for gi, group in leg_groups:
for oi, opt in enumerate(group):
options.append((gi, oi, opt))
per_leg.append(options)
for combo in product(*per_leg):
yield {"fixed": list(combo)}
def iter_or_candidates(input_measures, types_list):
"""
For OR: pick exactly ONE candidate whose type matches ANY in types_list.
Return a list of dicts: {"fixed": [(gi, oi, opt)]}
"""
union = []
seen_ids = set()
for t in types_list:
for tup in candidates_for_type(input_measures, t):
# de-dupe by the option id so the same physical option (with multi-type name) isnt repeated
if tup[2]["id"] not in seen_ids:
seen_ids.add(tup[2]["id"])
union.append(tup)
return [{"fixed": [t]} for t in union]
def iter_and_candidates(input_measures, types_list):
"""
For AND: we must cover ALL required types.
We allow a single option to satisfy multiple types.
We build a simple product but collapse duplicates by (gi, oi).
"""
# Build candidate pools per required type
pools = [candidates_for_type(input_measures, t) for t in types_list]
if any(len(pool) == 0 for pool in pools):
return [] # impossible to satisfy AND
# Start with one empty selection; accumulate per pool
selections = [[]] # each selection is a list of (gi, oi, opt)
for pool in pools:
new_selections = []
for sel in selections:
for cand in pool:
# Try adding cand; collapse duplicates by (gi,oi)
gi, oi, opt = cand
replaced = False
conflict = False
merged = []
for (sgi, soi, sopt) in sel:
if (sgi, soi) == (gi, oi):
# same exact option already in selection (satisfies another required type) keep one
replaced = True
# keep the existing one (identical)
merged.append((sgi, soi, sopt))
else:
merged.append((sgi, soi, sopt))
if not replaced:
merged.append(cand)
if not conflict:
new_selections.append(merged)
selections = new_selections
if not selections:
return []
# After accumulation, we may still have duplicate groups with different options (conflict). Drop those.
cleaned = []
for sel in selections:
seen_by_group = {}
ok = True
for gi, oi, opt in sel:
if gi in seen_by_group and seen_by_group[gi] != oi:
# same group, different option -> conflict for AND; invalid selection
ok = False
break
seen_by_group[gi] = oi
if ok:
# ensure stable order and unique by (gi,oi)
uniq = {}
for gi, oi, opt in sel:
uniq[(gi, oi)] = opt
cleaned.append([(gi, oi, opt) for (gi, oi), opt in uniq.items()])
return [{"fixed": c} for c in cleaned]
def expand_funding_path(input_measures, path_spec):
# path_spec is a list of elements; combine all elements (theyre all required)
# Start with one empty selection; then cross-product accumulate
selections = [[]]
"""
path_spec is a list of elements; each element is either:
{"OR": [type1, type2, ...], "reference": "..."} or
{"AND": [type1, type2, ...], "reference": "..."}
We cross-product across elements (all required), and produce selections as lists of (gi, oi, opt).
"""
selections = [[]] # list[list[(gi,oi,opt)]]
for elem in path_spec:
new_selections = []
if "OR" in elem:
for cand in iter_or_candidates(input_measures, elem["OR"]["types"]):
for base in selections:
new_selections.append(base + cand["fixed"])
cands = iter_or_candidates(input_measures, elem["OR"])
elif "AND" in elem:
for cand in iter_and_candidates(input_measures, elem["AND"]):
for base in selections:
new_selections.append(base + cand["fixed"])
cands = iter_and_candidates(input_measures, elem["AND"])
else:
raise ValueError("unknown path element")
raise ValueError("unknown path element; expected 'OR' or 'AND'")
if not cands:
return []
new_selections = []
for base in selections:
for cand in cands:
# merge base + cand["fixed"], collapsing duplicate same-option picks
combined = list(base)
# reject if combined picks two different options from the same group
groups_to_oi = {(gi,): oi for gi, oi, _ in combined} # temporary; well refactor below
conflict = False
# simpler: build a dict by group -> (oi, opt), conflict if group exists with different oi
gmap = {gi: (oi, opt) for gi, oi, opt in combined}
for gi, oi, opt in cand["fixed"]:
if gi in gmap:
prev_oi, _ = gmap[gi]
if prev_oi != oi:
conflict = True
break
gmap[gi] = (oi, opt)
if conflict:
continue
# back to list
merged = [(gi, oi, opt) for gi, (oi, opt) in gmap.items()]
new_selections.append(merged)
selections = new_selections
if not selections:
break
# selections are lists of (gi, oi, opt)
# dedupe by group index (if users set a weird path that hits same group twice)
return []
# Final tidy: ensure no duplicate groups with different options (already protected), keep stable ordering
deduped = []
for sel in selections:
seen = set()
clean = []
ok = True
gmap = {}
for gi, oi, opt in sel:
if gi in seen:
ok = False
break
seen.add(gi)
clean.append((gi, oi, opt))
if ok:
deduped.append(clean)
# keep the first occurrence
if gi not in gmap:
gmap[gi] = (oi, opt)
else:
# same group, different oi would have been filtered; if same oi, ignore duplicate
pass
deduped.append([(gi, oi, opt) for gi, (oi, opt) in gmap.items()])
return deduped
# ---- minimum insulation handling ----------------------------------------
def expand_min_insulation_if_needed(input_measures, fixed_selection):
# If fixed contains any HEATING_TYPES, we must also include at least one of MIN_INSULATION_OR groups.
fixed_types = set()
fixed_group_idx = {gi for gi, _, _ in fixed_selection}
for _, _, opt in fixed_selection:
fixed_types |= split_types(opt['type'])
if not (fixed_types & HEATING_TYPES):
# BUT: heating might later be picked by optimiser… If you want to be strict,
# you can also add a *feasibility check* after optimisation and reject combos
# that pick heating without min insulation. For now we enforce only when
# already in fixed set.
return [fixed_selection]
# Build OR candidates for required insulation, but exclude groups already fixed
or_pool = []
for alt in MIN_INSULATION_OR:
types = alt
matches = []
for gi, group in enumerate(input_measures):
if gi in fixed_group_idx:
continue
if any(group_has_type(group, t) for t in types):
for oi, opt in enumerate(group):
matches.append((gi, oi, opt))
if not matches:
# No feasible insulation to satisfy the rule -> invalidate this branch
return []
or_pool.append(matches)
# choose one from any of the alt sets (if you have more than one OR bucket, pick one from at least one;
# simplest: union first OR bucket only — or take the union and pick one)
# Here well take the union across all buckets then pick exactly one.
union = {(gi, oi): (gi, oi, opt)
for matches in or_pool for (gi, oi, opt) in matches}.values()
expanded = []
for gi, oi, opt in union:
# avoid duplicating the same group as fixed
if gi in fixed_group_idx:
continue
expanded.append(fixed_selection + [(gi, oi, opt)])
return expanded
# ---- tiny utilities ----------------------------------------------------------
def parse_types(t):