diff --git a/recommendations/tests/test_optimisers.py b/recommendations/tests/test_optimisers.py index a875eb22..bc16eb4a 100644 --- a/recommendations/tests/test_optimisers.py +++ b/recommendations/tests/test_optimisers.py @@ -692,79 +692,6 @@ def make_funding_paths(p, input_measures, tenure): # ---- main wrapper around your optimiser ---------------------------------- -def optimise_with_funding_paths(input_measures, budget=None, target_gain=None, social=False): - """ - run_optimizer(sub_measures, budget, target_gain) -> (picked_options, sub_cost, sub_gain) - """ - # TODO: Should be EPC D only that we require only innovation measures - # Social housing: filter to innovation-only before doing anything else - # if social: - # filtered = [] - # for group in input_measures: - # opts = [o for o in group if o.get("is_innovation", False)] - # if opts: - # filtered.append(opts) - # input_measures = filtered - - # Always include a "no funding path" baseline (empty fixed) - all_paths = FUNDING_PATHS + [[]] - - solutions = [] - for path_spec in all_paths: - # 1) expand fixed selections for this path - fixed_selections = expand_funding_path(input_measures, path_spec) if path_spec else [[]] - if not fixed_selections: - continue - - for fixed in fixed_selections: - - # 2) min insulation if heating is already in fixed - fixed_variants = expand_min_insulation_if_needed(input_measures, fixed) - if not fixed_variants: - continue - - for fixed2 in fixed_variants: - # 3) compute fixed cost/gain, and strip those groups from subproblem - fixed_items = [opt for (_, _, opt) in fixed2] - fixed_ids = [opt['id'] for opt in fixed_items] - fixed_cost, fixed_gain = sum_cost_gain(fixed_items) - fixed_groups = {gi for (gi, _, _) in fixed2} - - sub_measures = strip_groups(input_measures, fixed_groups) - - # 4) run your existing optimiser for the remaining groups - # If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which - # may already be over budget) and the fixed gain (which may not be achievable) - picked, sub_cost, sub_gain = run_optimizer( - sub_measures, - budget - fixed_cost if budget is not None else None, - sub_target_gain=target_gain - fixed_gain if target_gain is not None else None - ) - - if picked is None: - continue - - total_cost = fixed_cost + sub_cost - total_gain = fixed_gain + sub_gain - total_picks = fixed_items + picked - - # you can change the objective here; I’ll use max gain under budget - if budget is not None and total_cost > budget + 1e-9: - continue - - solutions.append({ - "fixed_ids": fixed_ids, - "items": total_picks, - "total_cost": total_cost, - "total_gain": total_gain, - "path": path_spec, - }) - - solutions = pd.DataFrame(solutions) - - return solutions - - # Run inputs: target_gain = 18.5 @@ -772,25 +699,108 @@ from itertools import product import math +def violates_min_insulation(fixed): + """Return True if fixed selection includes a heating/PV measure but no required insulation.""" + picked_types = {opt["type"] for (_, _, opt) in fixed} + + def has_any(substrs): + return any(any(s in t for s in substrs) for t in picked_types) + + # heating (incl. PV) flags + is_heating = has_any([ + "air_source_heat_pump", + "high_heat_retention_storage_heater", + "boiler_upgrade", + "electric_boiler", + "time_temperature_zone_control", + "secondary_heating", + "solar_pv", # PV treated as heating for MIR + ]) + + # MIR insulation (the ones you’re using in path construction) + has_insul = has_any([ + "external_wall_insulation", + "internal_wall_insulation", + "cavity_wall_insulation", + "extension_cavity_wall_insulation", + "loft_insulation", + "flat_roof_insulation", + "room_roof_insulation", + ]) + + return is_heating and not has_insul + + +def optimise_with_funding_paths(input_measures, budget=None, target_gain=None, social=False): + """ + run_optimizer(sub_measures, budget, target_gain) -> (picked_options, sub_cost, sub_gain) + """ + + funding_paths = make_funding_paths(p, input_measures, body.housing_type) + + solutions = [] + for path_spec in funding_paths: + # TODO: If the path spec is GBIS, need to handle this differently. There is no funding associated + # with the other measures we're optimising. Instead, we fix the GBIS measure (which is funded) + # and then run the optimiser on the remaining measures which are NOT funded. The key change is all + # measures in input_measures right now have costs adjusted with innovation uplift, which we don't want + # to apply to the GBIS measures. So we need to strip the innovation uplift from the GBIS measures + # 1) expand fixed selections for this path + fixed_selections = expand_funding_path(input_measures, path_spec) if path_spec else [[]] + if not fixed_selections: + continue + + for fixed in fixed_selections: + + if violates_min_insulation(fixed): + # We log an error and skip this - we should not see any errors but we can probably get a reasonable + # outcome for the end user without a complete termination of the process + logger.error("Skipping fixed selection due to minimum insulation violation: %s", fixed) + continue + + # 3) compute fixed cost/gain, and strip those groups from subproblem + fixed_items = [opt for (_, _, opt) in fixed] + fixed_ids = [opt['id'] for opt in fixed_items] + fixed_cost, fixed_gain = sum_cost_gain(fixed_items) + fixed_groups = {gi for (gi, _, _) in fixed} + + sub_measures = [grp for gi, grp in enumerate(input_measures) if gi not in fixed_groups] + + # 4) run your existing optimiser for the remaining groups + # If we have a budget, we need to ensure the subproblem respects it so we remove the fixed cost (which + # may already be over budget) and the fixed gain (which may not be achievable) + picked, sub_cost, sub_gain = run_optimizer( + sub_measures, + budget - fixed_cost if budget is not None else None, + sub_target_gain=target_gain - fixed_gain if target_gain is not None else None + ) + + if picked is None: + continue + + total_cost = fixed_cost + sub_cost + total_gain = fixed_gain + sub_gain + total_picks = fixed_items + picked + + # you can change the objective here; I’ll use max gain under budget + if budget is not None and total_cost > budget + 1e-9: + continue + + solutions.append({ + "fixed_ids": fixed_ids, + "items": total_picks, + "total_cost": total_cost, + "total_gain": total_gain, + "path": path_spec, + }) + + solutions = pd.DataFrame(solutions) + + return solutions + + # ---- helpers ------------------------------------------------------------- -def split_types(t): - # supports "external_wall_insulation+mechanical_ventilation" - return set(part.strip() for part in str(t).split('+')) - - -def group_has_type(group, want): - # group is a list[option], all same 'type' pattern - return any(want in split_types(opt['type']) for opt in group) - - -def find_groups(input_measures, type_name): - return [(gi, g) for gi, g in enumerate(input_measures) if group_has_type(g, type_name)] - - -def strip_groups(input_measures, taken_group_indices): - return [g for gi, g in enumerate(input_measures) if gi not in taken_group_indices] - def sum_cost_gain(items): c = sum(float(x['cost']) for x in items) @@ -799,122 +809,158 @@ def sum_cost_gain(items): # ---- candidate expansion ------------------------------------------------- +def type_matches(option_type: str, required: str) -> bool: + # substring match so "external_wall_insulation+mechanical_ventilation" satisfies "external_wall_insulation" + return required in option_type -def iter_or_candidates(input_measures, type_set): - # collect all groups that match ANY type in type_set - matching = [(gi, group) for gi, group in enumerate(input_measures) - if any(group_has_type(group, t) for t in type_set)] - if not matching: - return # nothing to yield - # choose ONE option from ANY one of these groups - for gi, group in matching: + +def candidates_for_type(input_measures, required_type): + """ + Return a list of (gi, oi, opt) where opt['type'] contains required_type. + gi = group index, oi = option index inside that group. + """ + cands = [] + for gi, group in enumerate(input_measures): for oi, opt in enumerate(group): - yield {"fixed": [(gi, oi, opt)]} + if type_matches(opt["type"], required_type): + cands.append((gi, oi, opt)) + return cands -def iter_and_candidates(input_measures, type_vec): - # type_vec is like [{"types": {"solar_pv"}}, {"types": {"high_heat_retention_storage_heater"}}] - per_leg = [] - for leg in type_vec: - leg_types = leg["types"] - leg_groups = [(gi, group) for gi, group in enumerate(input_measures) - if any(group_has_type(group, t) for t in leg_types)] - if not leg_groups: - return # this AND path isn’t available in this property; skip - # options for this leg: (gi, oi, opt) - options = [] - for gi, group in leg_groups: - for oi, opt in enumerate(group): - options.append((gi, oi, opt)) - per_leg.append(options) - for combo in product(*per_leg): - yield {"fixed": list(combo)} +def iter_or_candidates(input_measures, types_list): + """ + For OR: pick exactly ONE candidate whose type matches ANY in types_list. + Return a list of dicts: {"fixed": [(gi, oi, opt)]} + """ + union = [] + seen_ids = set() + for t in types_list: + for tup in candidates_for_type(input_measures, t): + # de-dupe by the option id so the same physical option (with multi-type name) isn’t repeated + if tup[2]["id"] not in seen_ids: + seen_ids.add(tup[2]["id"]) + union.append(tup) + return [{"fixed": [t]} for t in union] + + +def iter_and_candidates(input_measures, types_list): + """ + For AND: we must cover ALL required types. + We allow a single option to satisfy multiple types. + We build a simple product but collapse duplicates by (gi, oi). + """ + # Build candidate pools per required type + pools = [candidates_for_type(input_measures, t) for t in types_list] + if any(len(pool) == 0 for pool in pools): + return [] # impossible to satisfy AND + + # Start with one empty selection; accumulate per pool + selections = [[]] # each selection is a list of (gi, oi, opt) + for pool in pools: + new_selections = [] + for sel in selections: + for cand in pool: + # Try adding cand; collapse duplicates by (gi,oi) + gi, oi, opt = cand + replaced = False + conflict = False + merged = [] + for (sgi, soi, sopt) in sel: + if (sgi, soi) == (gi, oi): + # same exact option already in selection (satisfies another required type) – keep one + replaced = True + # keep the existing one (identical) + merged.append((sgi, soi, sopt)) + else: + merged.append((sgi, soi, sopt)) + if not replaced: + merged.append(cand) + if not conflict: + new_selections.append(merged) + selections = new_selections + if not selections: + return [] + + # After accumulation, we may still have duplicate groups with different options (conflict). Drop those. + cleaned = [] + for sel in selections: + seen_by_group = {} + ok = True + for gi, oi, opt in sel: + if gi in seen_by_group and seen_by_group[gi] != oi: + # same group, different option -> conflict for AND; invalid selection + ok = False + break + seen_by_group[gi] = oi + if ok: + # ensure stable order and unique by (gi,oi) + uniq = {} + for gi, oi, opt in sel: + uniq[(gi, oi)] = opt + cleaned.append([(gi, oi, opt) for (gi, oi), opt in uniq.items()]) + return [{"fixed": c} for c in cleaned] def expand_funding_path(input_measures, path_spec): - # path_spec is a list of elements; combine all elements (they’re all required) - # Start with one empty selection; then cross-product accumulate - selections = [[]] + """ + path_spec is a list of elements; each element is either: + {"OR": [type1, type2, ...], "reference": "..."} or + {"AND": [type1, type2, ...], "reference": "..."} + We cross-product across elements (all required), and produce selections as lists of (gi, oi, opt). + """ + selections = [[]] # list[list[(gi,oi,opt)]] for elem in path_spec: - new_selections = [] if "OR" in elem: - for cand in iter_or_candidates(input_measures, elem["OR"]["types"]): - for base in selections: - new_selections.append(base + cand["fixed"]) + cands = iter_or_candidates(input_measures, elem["OR"]) elif "AND" in elem: - for cand in iter_and_candidates(input_measures, elem["AND"]): - for base in selections: - new_selections.append(base + cand["fixed"]) + cands = iter_and_candidates(input_measures, elem["AND"]) else: - raise ValueError("unknown path element") + raise ValueError("unknown path element; expected 'OR' or 'AND'") + + if not cands: + return [] + + new_selections = [] + for base in selections: + for cand in cands: + # merge base + cand["fixed"], collapsing duplicate same-option picks + combined = list(base) + # reject if combined picks two different options from the same group + groups_to_oi = {(gi,): oi for gi, oi, _ in combined} # temporary; we’ll refactor below + conflict = False + # simpler: build a dict by group -> (oi, opt), conflict if group exists with different oi + gmap = {gi: (oi, opt) for gi, oi, opt in combined} + for gi, oi, opt in cand["fixed"]: + if gi in gmap: + prev_oi, _ = gmap[gi] + if prev_oi != oi: + conflict = True + break + gmap[gi] = (oi, opt) + if conflict: + continue + # back to list + merged = [(gi, oi, opt) for gi, (oi, opt) in gmap.items()] + new_selections.append(merged) selections = new_selections if not selections: - break - # selections are lists of (gi, oi, opt) - # dedupe by group index (if users set a weird path that hits same group twice) + return [] + + # Final tidy: ensure no duplicate groups with different options (already protected), keep stable ordering deduped = [] for sel in selections: - seen = set() - clean = [] - ok = True + gmap = {} for gi, oi, opt in sel: - if gi in seen: - ok = False - break - seen.add(gi) - clean.append((gi, oi, opt)) - if ok: - deduped.append(clean) + # keep the first occurrence + if gi not in gmap: + gmap[gi] = (oi, opt) + else: + # same group, different oi would have been filtered; if same oi, ignore duplicate + pass + deduped.append([(gi, oi, opt) for gi, (oi, opt) in gmap.items()]) return deduped -# ---- minimum insulation handling ---------------------------------------- - -def expand_min_insulation_if_needed(input_measures, fixed_selection): - # If fixed contains any HEATING_TYPES, we must also include at least one of MIN_INSULATION_OR groups. - fixed_types = set() - fixed_group_idx = {gi for gi, _, _ in fixed_selection} - for _, _, opt in fixed_selection: - fixed_types |= split_types(opt['type']) - - if not (fixed_types & HEATING_TYPES): - # BUT: heating might later be picked by optimiser… If you want to be strict, - # you can also add a *feasibility check* after optimisation and reject combos - # that pick heating without min insulation. For now we enforce only when - # already in fixed set. - return [fixed_selection] - - # Build OR candidates for required insulation, but exclude groups already fixed - or_pool = [] - for alt in MIN_INSULATION_OR: - types = alt - matches = [] - for gi, group in enumerate(input_measures): - if gi in fixed_group_idx: - continue - if any(group_has_type(group, t) for t in types): - for oi, opt in enumerate(group): - matches.append((gi, oi, opt)) - if not matches: - # No feasible insulation to satisfy the rule -> invalidate this branch - return [] - or_pool.append(matches) - - # choose one from any of the alt sets (if you have more than one OR bucket, pick one from at least one; - # simplest: union first OR bucket only — or take the union and pick one) - # Here we’ll take the union across all buckets then pick exactly one. - union = {(gi, oi): (gi, oi, opt) - for matches in or_pool for (gi, oi, opt) in matches}.values() - - expanded = [] - for gi, oi, opt in union: - # avoid duplicating the same group as fixed - if gi in fixed_group_idx: - continue - expanded.append(fixed_selection + [(gi, oi, opt)]) - return expanded - - # ---- tiny utilities ---------------------------------------------------------- def parse_types(t):