From 5edc8b691f0bfe312d7e23453de1f6c78883e699 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 31 Jul 2025 19:46:53 +0100 Subject: [PATCH] refactored optimisation by adding helper functions --- backend/engine/engine.py | 188 +++--------- .../optimiser/optimiser_functions.py | 281 ++++++++++++++++-- 2 files changed, 300 insertions(+), 169 deletions(-) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index db3b6d0d..032ca0b0 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -26,7 +26,7 @@ from backend.app.db.functions.energy_assessment_functions import get_latest_asse from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest from backend.app.plan.utils import get_cleaned -from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc +from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions from backend.ml_models.api import ModelApi @@ -35,7 +35,7 @@ from backend.apis.GoogleSolarApi import GoogleSolarApi from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser -from recommendations.optimiser.optimiser_functions import prepare_input_measures +import recommendations.optimiser.optimiser_functions as optimiser_functions from recommendations.Recommendations import Recommendations from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 @@ -798,165 +798,59 @@ async def model_engine(body: PlanTriggerRequest): # we need to double unlist because we have a list of lists property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} + property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] + measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] - property_required_measures = [ - m for m in recommendations[p.id] if m[0]["type"] in body.required_measures - ] - measures_to_optimise = [ - m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures - ] - - # If we have a wall insulation measure, we MUST include mechanical ventilation - # Additionally, if we have required measures, they should also be included. Therefore - # we can discount the number of points required to get to the target SAP band (or increase) - # in the case of ventilation + # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore + # its inclusion needs_ventilation = any( x in property_measure_types for x in assumptions.measures_needing_ventilation ) and not p.has_ventilation - input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation) + input_measures = optimiser_functions.prepare_input_measures( + measures_to_optimise, body.goal, needs_ventilation + ) if not input_measures[0]: - # This means that we have no defaults - selected_recommendations = {} - solution = [] + # Nothing to do, we just reshape the recommendations + recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( + p.id, recommendations, set() + ) + continue + + fixed_gain = optimiser_functions.calculate_fixed_gain( + property_required_measures, recommendations, p, needs_ventilation + ) + gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain) + + if not body.optimise: + if body.goal != "Increasing EPC": + raise NotImplementedError("Only EPC optimisation is currently supported") + solution = [max(sub_list, key=lambda x: (x['gain'], -x['cost'])) for sub_list in input_measures] else: + optimiser = ( + GainOptimiser( + input_measures, max_cost=body.budget, max_gain=gain, allow_slack=body.goal == "Increasing EPC" + ) if body.budget else CostOptimiser(input_measures, min_gain=gain) + ) + optimiser.setup() + optimiser.solve() + solution = optimiser.solution - fixed_gain = 0 - if property_required_measures: - # We get the SAP points for the required measures - if body.goal != "Increasing EPC": - raise NotImplementedError("Only EPC optimisation is currently supported") - sap_by_type = [ - {"type": rec["type"], "sap_points": rec["sap_points"]} for recs in property_required_measures - for rec in recs - ] - # We get a MAX sap points per type - max_per_type = ( - pd.DataFrame(sap_by_type).groupby("type")["sap_points"].max().to_dict() - ) - fixed_gain = sum(max_per_type.values()) - - property_required_measure_types = {rec["type"] for rec in sap_by_type} - - # if the property needs ventilation, but the measure we optimise didn't include - # venilation we add the points for ventilation as a fixed gain - if needs_ventilation and any( - r in property_required_measure_types for r in assumptions.measures_needing_ventilation - ): - fixed_gain += next( - (r[0]["sap_points"] for r in recommendations[p.id] if - r[0]["type"] == "mechanical_ventilation"), - 0 - ) - - if body.goal == "Increasing EPC": - current_sap_points = int(p.data["current-energy-efficiency"]) - gain = CostOptimiser.calculate_sap_gain_with_slack( - epc_to_sap_lower_bound(body.goal_value) - current_sap_points - ) - fixed_gain - if body.simulate_sap_10: - # We add 3 additional SAP points to the required gain to account for SAP 10 - gain += 3 - - gain = gain if gain > 0 else 0 - elif body.goal in ["Energy Savings", "Reducing CO2 emissions"]: - # We will aim to maximise these goals, while constaining by budget - gain = None - else: - raise NotImplementedError(f"Goal {body.goal} is not supported") - - if not body.optimise: - if body.goal != "Increasing EPC": - raise NotImplementedError("Only EPC optimisation is currently supported") - solution = [] - for sub_list in input_measures: - # Select the entry with the highest gain, and if tied, choose the one with the lowest cost - best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost'])) - solution.append(best_measure) - else: - - if body.budget: - optimiser = GainOptimiser( - input_measures, max_cost=body.budget, max_gain=gain, - allow_slack=body.goal == "Increasing EPC" - ) - else: - # The minimum gain is the minimum number of SAP points required to get to the target SAP band - # If the gain is negative, the optimiser will return an empty solution - optimiser = CostOptimiser( - input_measures, - min_gain=gain - ) - - optimiser.setup() - optimiser.solve() - solution = optimiser.solution - - selected_recommendations = {r["id"] for r in solution} + selected = {r["id"] for r in solution} if property_required_measures: - # We select the cheapest of the required measures, into selected - for recs in property_required_measures: - # We select the cheapest of the required measures - cost_to_id = { - rec["recommendation_id"]: rec["total"] for rec in recs - if rec["recommendation_id"] not in selected_recommendations - } - # Take the recommendation id with the lowers cost - - selected_recommendations.add(min(cost_to_id, key=cost_to_id.get)) - # Update the solution with the selected recommendaitons - solution = [] - for recs in recommendations[p.id]: - for rec in recs: - if rec["recommendation_id"] in selected_recommendations: - solution.append( - { - "id": rec["recommendation_id"], - "cost": rec["total"], - "gain": rec["sap_points"], - "type": rec["type"] - } - ) - - # If wall insulation is selected, we also include mechanical ventilation as a best practice measure - ventilation_selected = [ - r for r in solution if "+mechanical_ventilation" in r["type"] - ] - if (any(x in [r["type"] for r in solution] for x in assumptions.measures_needing_ventilation) or - len(ventilation_selected)): - ventilation_rec = next( - (r[0] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"), - None + solution = optimiser_functions.add_required_measures( + property_id=p.id, property_required_measures=property_required_measures, + recommendations=recommendations, selected=selected, ) - # If a matching recommendation was found, add its ID to the selected recommendations - if ventilation_rec: - selected_recommendations.add(ventilation_rec["recommendation_id"]) - - # If we have a trickle vents recommendation, we also switch it on. We don't just check the solution - trickle_vents_rec = next( - (r[0] for r in recommendations[p.id] if r[0]["type"] == "trickle_vents"), - None + # Add best practice measures (ventilation/trickle vents) + selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) + # Final flattening + recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( + p.id, recommendations, selected ) - # If a matching recommendation was found, add its ID to the selected recommendations - if trickle_vents_rec: - selected_recommendations.add(trickle_vents_rec["recommendation_id"]) - - # We'll use the set of selected recommendations to filter the recommendations to upload - final_recommendations = [ - [ - {**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False} - for rec in recommendations_by_type - ] - for recommendations_by_type in recommendations[p.id] - ] - - # We'll also unlist the recommendations so they're a bit easier to handle from here onwards - recommendations[p.id] = [ - rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type - ] # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all # of them diff --git a/recommendations/optimiser/optimiser_functions.py b/recommendations/optimiser/optimiser_functions.py index 45e04a1f..5fd6e0ee 100644 --- a/recommendations/optimiser/optimiser_functions.py +++ b/recommendations/optimiser/optimiser_functions.py @@ -1,15 +1,44 @@ +import pandas as pd import backend.app.assumptions as assumptions +from Property import Property +from app.plan.schemas import PlanTriggerRequest +from backend.app.utils import epc_to_sap_lower_bound +from recommendations.optimiser.CostOptimiser import CostOptimiser def prepare_input_measures(property_recommendations, goal, needs_ventilation): """ - Basic function to convert recommendations_to_upload to a format that is - suitable for the optimiser - large - :param property_recommendations: object containing the recommendations, created in the plan trigger api - :param goal: goal to be optimised for, should be one of the keys in gain_map. E.g. if the gain is SAP points, - the goal should reflect that desired gain - :param needs_ventilation: boolean to indicate if the property needs ventilation - :return: Nested list of input measures + Prepares a nested list of measure options for optimisation. + + Each sublist represents all available variants of a single measure type (e.g. all solar PV options). + Within each sublist, each measure is represented as a dictionary containing: + - id: unique recommendation identifier + - cost: total cost of the measure (including ventilation if bundled) + - gain: the relevant gain metric based on the selected goal + - type: the measure type, optionally combined with ventilation (e.g. "wall_insulation+mechanical_ventilation") + + Ventilation bundling: + - If a property needs ventilation, and a measure type requires it (as defined in + assumptions.measures_needing_ventilation), + the ventilation cost and gain are added to that measure’s values. + + Filtering: + - Measures with negative `energy_cost_savings` are excluded. + - Solar PV options with batteries are excluded (currently handled by a placeholder bitwise NOT). + + Parameters + ---------- + property_recommendations : list[list[dict]] + Nested list of recommendations for a property. Each inner list represents variations of the same measure type. + goal : str + Optimisation goal, one of: "Increasing EPC", "Energy Savings", "Reducing CO2 emissions". + needs_ventilation : bool + Whether the property requires mechanical ventilation to accompany certain measures. + + Returns + ------- + list[list[dict]] + Nested list of prepared measure options, ready for input into the optimiser. """ goal_map = { @@ -22,7 +51,6 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation): if not goal_key: raise NotImplementedError("Not implemented this gain type - investigate me") - # We ony ever have one ventilation measure with now ventilation_recommendation = next( (measure[0] for measure in property_recommendations if measure[0]["type"] == "mechanical_ventilation"), {} @@ -31,22 +59,22 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation): input_measures = [] for recs in property_recommendations: + # Skip ventilation as a standalone optimisation option (it will be bundled) if needs_ventilation and recs[0]["type"] == "mechanical_ventilation": - # If we house needs ventilation, ventilation will be packaged with the fabric measure so - # we don't need to optimise it independently continue + # Filter out solar PV with batteries if recs[0]["type"] == "solar_pv": - # if the recommendation is a solar recommendation with a battery, we exclude it from the optimisation. recs = [r for r in recs if ~r["has_battery"]] + # Only include measures with non-negative cost savings recs_to_append = [rec for rec in recs if rec["energy_cost_savings"] >= 0] if not recs_to_append: continue + # Build enriched measure data to_append = [] for rec in recs: - # We bundle the impact of ventilation with the measure total = ( rec["total"] + ventilation_recommendation["total"] if rec["type"] in assumptions.measures_needing_ventilation and needs_ventilation @@ -57,23 +85,232 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation): if rec["type"] in assumptions.measures_needing_ventilation and needs_ventilation else rec[goal_key] ) - rec_type = ( - "+".join( - [rec["type"], ventilation_recommendation["type"]] - ) if rec["type"] in assumptions.measures_needing_ventilation and needs_ventilation + f"{rec['type']}+{ventilation_recommendation['type']}" + if rec["type"] in assumptions.measures_needing_ventilation and needs_ventilation else rec["type"] ) to_append.append( - { - "id": rec["recommendation_id"], - "cost": total, - "gain": gain, - "type": rec_type - } + {"id": rec["recommendation_id"], "cost": total, "gain": gain, "type": rec_type} ) input_measures.append(to_append) return input_measures + + +def calculate_fixed_gain(property_required_measures, recommendations, p, needs_ventilation): + """ + Calculates the total "fixed gain" from required measures for a property. + + Required measures are applied regardless of optimisation. This function: + - Finds the maximum SAP points for each required measure type. + - Sums those max SAP values into a fixed gain total. + - Adds the SAP points for mechanical ventilation if: + * The property needs ventilation, and + * At least one required measure needs ventilation. + + Parameters + ---------- + property_required_measures : list[list[dict]] + Nested list of required measures for the property. + recommendations : dict + All recommendations for all properties, keyed by property id. + p : object + Property object (must have .id). + needs_ventilation : bool + Whether ventilation should be bundled with certain measures. + + Returns + ------- + float + Total fixed SAP gain from required measures (and ventilation, if applicable). + """ + if not property_required_measures: + return 0 + + sap_by_type = [ + {"type": rec["type"], "sap_points": rec["sap_points"]} + for recs in property_required_measures for rec in recs + ] + + max_per_type = pd.DataFrame(sap_by_type).groupby("type")["sap_points"].max().to_dict() + fixed_gain = sum(max_per_type.values()) + + required_types = {rec["type"] for rec in sap_by_type} + if needs_ventilation and any(r in required_types for r in assumptions.measures_needing_ventilation): + fixed_gain += next( + (r[0]["sap_points"] for r in recommendations[p.id] if r[0]["type"] == "mechanical_ventilation"), + 0 + ) + + return fixed_gain + + +def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) -> float | None: + """ + Calculates the target gain value for optimisation based on the goal. + + - For "Increasing EPC": Computes the SAP gain needed to reach the target EPC, + applies a slack adjustment (via CostOptimiser), and subtracts fixed gains from required measures. + - For "Energy Savings" or "Reducing CO2 emissions": Returns None, + which signals the optimiser to simply maximise gain under a budget. + + Parameters + ---------- + body : object + Request body object containing optimisation settings (goal, goal_value, simulate_sap_10, etc.) + p : object + Property object with EPC data (must have p.data["current-energy-efficiency"]). + fixed_gain : float + Total fixed gain from required measures (returned by calculate_fixed_gain). + + Returns + ------- + float or None + Required SAP gain for EPC, or None for non-EPC goals. + """ + if body.goal == "Increasing EPC": + current_sap = int(p.data["current-energy-efficiency"]) + gain = CostOptimiser.calculate_sap_gain_with_slack( + epc_to_sap_lower_bound(body.goal_value) - current_sap + ) - fixed_gain + if body.simulate_sap_10: + gain += 3 + return max(gain, 0) + elif body.goal in ["Energy Savings", "Reducing CO2 emissions"]: + return None + else: + raise NotImplementedError(f"Goal {body.goal} is not supported") + + +def add_required_measures(property_id, property_required_measures, recommendations, selected): + """ + Ensures the cheapest variant of each required measure is added to the selected recommendations. + + For each required measure type, this function: + - Finds the lowest-cost variant not already selected. + - Adds it to the selected recommendation IDs. + - Returns a flattened list of all selected measure details for final output. + + Parameters + ---------- + property_id : int + Unique identifier for the property. + property_required_measures : list[list[dict]] + Nested list of required measures for the property. + recommendations : dict + All recommendations for all properties, keyed by property id. + selected : set + Set of already selected recommendation IDs from the optimiser. + + Returns + ------- + list[dict] + Flat list of selected measure details, each containing: + {"id", "cost", "gain", "type"} + """ + for recs in property_required_measures: + cheapest = min( + (rec for rec in recs if rec["recommendation_id"] not in selected), + key=lambda rec: rec["total"], + ) + selected.add(cheapest["recommendation_id"]) + + return [ + {"id": rec["recommendation_id"], "cost": rec["total"], "gain": rec["sap_points"], "type": rec["type"]} + for recs in recommendations[property_id] for rec in recs + if rec["recommendation_id"] in selected + ] + + +def add_best_practice_measures(property_id, solution, recommendations, selected): + """ + Ensures best-practice measures like ventilation and trickle vents are included + in the selected recommendations when appropriate. + + Rules: + - If a measure requiring ventilation is selected AND ventilation is not already present, + add the corresponding mechanical ventilation recommendation. + - Always add trickle vents if they exist in the recommendations. + + Parameters + ---------- + property_id : int + The unique identifier for the property. + solution : list[dict] + The current list of selected measures (each containing id, type, gain, cost). + recommendations : dict + All recommendations for all properties, keyed by property id. + selected : set + Set of already selected recommendation IDs. + + Returns + ------- + set + Updated set of selected recommendation IDs, including ventilation and trickle vents if applicable. + """ + # Check if any selected measure requires ventilation + ventilation_selected = [r for r in solution if "+mechanical_ventilation" in r["type"]] + + # If ventilation has been selected, or one of the measures needs ventilation, we need to ensure ventilation is + # included + needs_ventilation = any( + x in [r["type"] for r in solution] for x in assumptions.measures_needing_ventilation + ) or len(ventilation_selected) > 0 + + if needs_ventilation: + ventilation_rec = next( + (r[0] for r in recommendations[property_id] if r[0]["type"] == "mechanical_ventilation"), + None + ) + if ventilation_rec: + selected.add(ventilation_rec["recommendation_id"]) + + # Always add trickle vents if available + trickle_vents_rec = next( + (r[0] for r in recommendations[property_id] if r[0]["type"] == "trickle_vents"), + None + ) + if trickle_vents_rec: + selected.add(trickle_vents_rec["recommendation_id"]) + + return selected + + +def flatten_recommendations_with_defaults(property_id, recommendations, selected): + """ + Flattens nested recommendation lists for a property and marks which + recommendations were selected. + + Each recommendation dict is copied and an extra key `default` is added: + - True if the recommendation ID is in `selected` + - False otherwise + + Parameters + ---------- + property_id : int + The unique identifier for the property. + recommendations : dict + All recommendations for all properties, keyed by property id. + Each value is a list of lists (grouped by measure type). + selected : set + Set of selected recommendation IDs. + + Returns + ------- + list[dict] + A flattened list of recommendation dicts for the given property, + each with an added `default` field. + """ + final_recommendations = [ + [ + {**rec, "default": rec["recommendation_id"] in selected} + for rec in recommendations_by_type + ] + for recommendations_by_type in recommendations[property_id] + ] + + # Flatten the nested list of lists into a single list + return [rec for recommendations_by_type in final_recommendations for rec in recommendations_by_type]