diff --git a/backend/categorisation/processor.py b/backend/categorisation/processor.py index 02116d61..184ccac2 100644 --- a/backend/categorisation/processor.py +++ b/backend/categorisation/processor.py @@ -1,6 +1,8 @@ from collections import defaultdict from typing import Dict, List, Optional +from sqlalchemy import Tuple + from backend.app.db.functions.recommendations_functions import ( bulk_update_plans, get_plans_by_portfolio_id, @@ -33,20 +35,43 @@ def process_portfolio( def choose_cheapest_relevant_plan( - plans: List[Plan], plan_priority_order: Optional[List[int]] = [] + plans: List[Plan], plan_priority_order: Optional[List[int]] = None ) -> Plan: - plans_to_consider: List[Plan] = [p for p in plans if p.is_compliant] or plans + plan_priority_order = plan_priority_order or [] - def plan_cost(plan: Plan) -> float: - return ( + eligible_plans: List[Plan] = [plan for plan in plans if plan.is_compliant] or plans + if not eligible_plans: + raise ValueError("No plans available to choose from.") + + for plan in eligible_plans: + if plan.id is None: + # This should never actually happen, but plan.id is optional to cater + # for new plans. We are only working with already persisted plans here + raise ValueError( + f"All plans must have an ID, but found a plan with no ID: {plan}" + ) + + min_cost: float = min( + ( plan.record.cost_of_works if plan.record.cost_of_works is not None else float("inf") ) + for plan in eligible_plans + ) - cheapest_plan = min(plans_to_consider, key=plan_cost) + cheapest_plans: List[Plan] = [ + plan + for plan in eligible_plans + if (plan.record.cost_of_works or float("inf")) == min_cost + ] - return cheapest_plan + for priority_plan_id in plan_priority_order: + for plan in cheapest_plans: + if plan.id == priority_plan_id: + return plan + + return cheapest_plans[0] def _load_plans_for_portfolio(portfolio_id: int) -> List[Plan]: