from collections import defaultdict
from typing import Dict, List, Optional, Tuple

from backend.app.db.functions.recommendations_functions import (
    bulk_update_plans,
    get_default_scenario_ids_for_portfolio,
    get_plan_ids_by_scenario_ids,
    get_plans_by_portfolio_id,
    get_plans_by_scenario_ids,
    get_scenarios_by_portfolio_id,
    set_plan_and_scenario_default,
)
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario
from utils.logger import setup_logger

logger = setup_logger()


def process_portfolio(
    portfolio_id: int,
    scenarios_to_consider: Optional[List[int]] = None,
    scenario_priority_order: Optional[List[int]] = None,
) -> None:
    """Pick a default plan per property in a portfolio and persist the changes.

    Plans are loaded, grouped by ``plan.record.property_id`` and, for each
    property, the plan returned by ``choose_cheapest_relevant_plan`` becomes
    the default. Only plans whose default flag actually changes are passed to
    ``bulk_update_plans``.

    Args:
        portfolio_id: ID of the portfolio to process.
        scenarios_to_consider: Optional scenario IDs to restrict the run to.
            When a non-empty list is given it must contain at least two IDs.
            When it is not ``None``, default scenarios of the portfolio that
            are not in the list are unset first. An empty list therefore
            unsets every current default and then processes all plans of the
            portfolio.
        scenario_priority_order: Optional scenario IDs used to break ties
            between equally cheap plans (earlier entries win).

    Raises:
        ValueError: If exactly one scenario is supplied, or if a property
            group turns out to be empty.
    """
    logger.info(f"Processing portfolio {portfolio_id}")

    if scenarios_to_consider and len(scenarios_to_consider) < 2:
        raise ValueError("Cannot run auto categorisation for fewer than 2 scenarios")

    if scenarios_to_consider is not None:
        _unset_defaults_for_scenarios_not_being_considered(
            portfolio_id, scenarios_to_consider
        )

    plans: List[Plan] = _load_plans_for_portfolio(portfolio_id, scenarios_to_consider)
    logger.info(f"Successfully loaded {len(plans)}")

    plans_by_property: Dict[int, List[Plan]] = _group_plans_by_property(plans)
    logger.info("Successfully grouped plans by property")

    updated_plan_models: List[PlanModel] = []
    updated_scenario_models: List[ScenarioModel] = []

    for property_id, property_plans in plans_by_property.items():
        if not property_plans:
            raise ValueError(f"No plans for property {property_id}")

        cheapest_plan = choose_cheapest_relevant_plan(
            property_plans, scenario_priority_order
        )
        logger.info(f"Successfully found cheapest plan for Property {property_id}")

        updated_property_plan_models, updated_property_scenario_models = (
            _update_plan_and_scenario_objects(property_plans, cheapest_plan)
        )
        updated_plan_models.extend(updated_property_plan_models)
        updated_scenario_models.extend(updated_property_scenario_models)

    # Skip the database round-trip entirely when no default flag changed.
    if updated_plan_models:
        logger.info(f"Updating {len(updated_plan_models)} Plans in database")
        bulk_update_plans(updated_plan_models, updated_scenario_models)
        logger.info("Successfully updated Plan default values in database")


def choose_cheapest_relevant_plan(
    plans: List[Plan], scenario_priority_order: Optional[List[int]] = None
) -> Plan:
    """Return the cheapest plan, preferring plans flagged ``is_compliant``.

    If at least one plan has a truthy ``is_compliant`` only those plans are
    eligible; otherwise every plan is. Among the eligible plans, those with
    the lowest ``record.cost_of_works`` are candidates; a cost of ``None`` is
    treated as infinitely expensive, while a cost of ``0`` is a real (and
    minimal) cost. Ties are broken by ``scenario_priority_order``; if no
    candidate belongs to a prioritised scenario, the first candidate in input
    order is returned.

    Args:
        plans: Plans to choose from (expected to belong to one property).
        scenario_priority_order: Scenario IDs in descending priority, used
            only to break ties between equally cheap plans.

    Returns:
        The selected plan.

    Raises:
        ValueError: If ``plans`` is empty or an eligible plan has no ID.
    """
    scenario_priority_order = scenario_priority_order or []

    eligible_plans: List[Plan] = [plan for plan in plans if plan.is_compliant] or plans
    if not eligible_plans:
        raise ValueError("No plans available to choose from.")

    for plan in eligible_plans:
        if plan.id is None:
            # This should never actually happen, but plan.id is optional to cater
            # for new plans. We are only working with already persisted plans here
            raise ValueError(
                f"All plans must have an ID, but found a plan with no ID: {plan}"
            )

    def _effective_cost(plan: Plan) -> float:
        # Only a missing cost counts as infinite. Using `cost or inf` here
        # would also turn a genuine cost of 0 into infinity, so zero-cost
        # plans could never match a minimum of 0.
        cost = plan.record.cost_of_works
        return float("inf") if cost is None else cost

    min_cost: float = min(_effective_cost(plan) for plan in eligible_plans)
    cheapest_plans: List[Plan] = [
        plan for plan in eligible_plans if _effective_cost(plan) == min_cost
    ]

    for priority_scenario_id in scenario_priority_order:
        for plan in cheapest_plans:
            if plan.scenario.id == priority_scenario_id:
                return plan

    # cheapest_plans is never empty: the plan that produced min_cost is in it.
    return cheapest_plans[0]


def _unset_defaults_for_scenarios_not_being_considered(
    portfolio_id: int, scenarios_to_consider: List[int]
) -> None:
    """Unset the default flag for default scenarios outside the given list.

    Looks up the portfolio's current default scenario IDs, keeps those not
    present in ``scenarios_to_consider``, and calls
    ``set_plan_and_scenario_default(plan_id, False)`` for every plan ID
    returned for those scenarios.

    Args:
        portfolio_id: ID of the portfolio whose defaults are inspected.
        scenarios_to_consider: Scenario IDs that are part of the current run.
    """
    default_scenario_ids: List[int] = get_default_scenario_ids_for_portfolio(
        portfolio_id
    )

    considered = set(scenarios_to_consider)
    scenarios_to_unset_default: List[int] = [
        scenario_id
        for scenario_id in default_scenario_ids
        if scenario_id not in considered
    ]
    if not scenarios_to_unset_default:
        return

    logger.info(
        f"Unsetting {scenarios_to_unset_default} as default scenario(s) "
        "as not included in provided list of scenarios to consider"
    )

    plans_to_unset_default: List[int] = get_plan_ids_by_scenario_ids(
        scenarios_to_unset_default
    )
    for plan_id in plans_to_unset_default:
        set_plan_and_scenario_default(plan_id, False)  # TODO: do this in batch


def _load_plans_for_portfolio(
    portfolio_id: int, scenarios_to_consider: Optional[List[int]] = None
) -> List[Plan]:
    """Load plan models and convert them to domain ``Plan`` objects.

    Plans are fetched by scenario ID when a non-empty ``scenarios_to_consider``
    is given, otherwise by portfolio ID. Each plan is paired with the scenario
    of the portfolio whose ``id`` equals the plan's ``scenario_id``; plans
    without such a scenario are logged and skipped.

    Args:
        portfolio_id: ID of the portfolio being processed.
        scenarios_to_consider: Optional scenario IDs to restrict loading to.

    Returns:
        The domain plans that could be matched to a scenario.

    Raises:
        Exception: If the portfolio has no scenarios at all.
    """
    if scenarios_to_consider:
        logger.info(f"Getting plans for {len(scenarios_to_consider)} scenarios")
        plan_models: List[PlanModel] = get_plans_by_scenario_ids(scenarios_to_consider)
        logger.info(f"Got {len(plan_models)} plan models from database")
    else:
        logger.info(
            "No list of Plans to consider provided. "
            f"Getting all Plans for portfolio {portfolio_id}"
        )
        plan_models = get_plans_by_portfolio_id(portfolio_id)

    scenarios: List[ScenarioModel] = get_scenarios_by_portfolio_id(portfolio_id)
    if not scenarios:
        raise Exception(f"No scenarios found for Portfolio {portfolio_id}")

    # Index scenarios once instead of scanning the list for every plan.
    # setdefault keeps the first scenario seen for an ID, matching a
    # first-match linear search.
    scenarios_by_id: Dict[int, ScenarioModel] = {}
    for scenario in scenarios:
        scenarios_by_id.setdefault(scenario.id, scenario)

    plans: List[Plan] = []
    for model in plan_models:
        # .get returns None for an unknown scenario so the plan can be
        # skipped; a bare next() on a generator would raise StopIteration.
        scenario_model = scenarios_by_id.get(model.scenario_id)
        if scenario_model is None:
            logger.info(f"No Scenario associated with Plan of ID {model.id}")
            continue
        plans.append(
            Plan.from_sqlalchemy(model, Scenario.from_sqlalchemy(scenario_model))
        )

    logger.info(f"Got {len(plans)} Plans")
    return plans


def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
    """Group plans by ``plan.record.property_id``, preserving input order.

    Args:
        plans: Plans to group.

    Returns:
        A mapping from property ID to the plans of that property.
    """
    grouped: Dict[int, List[Plan]] = defaultdict(list)
    for plan in plans:
        grouped[plan.record.property_id].append(plan)
    return grouped


def _update_plan_and_scenario_objects(
    plans: List[Plan], cheapest_plan: Plan
) -> Tuple[List[PlanModel], List[ScenarioModel]]:
    """Flip default flags so only ``cheapest_plan`` is the default.

    A plan is touched only when its current ``record.is_default`` differs from
    the desired value; untouched plans are not included in the result.

    Args:
        plans: All plans of one property.
        cheapest_plan: The plan that should end up as the default.

    Returns:
        A ``(plan_models, scenario_models)`` tuple built from
        ``plan.to_sqlalchemy()`` for every plan whose flag changed.
    """
    plans_to_update: List[Plan] = []
    for plan in plans:
        should_be_default: bool = plan.id == cheapest_plan.id
        if plan.record.is_default != should_be_default:
            logger.info(
                f"Setting Plan {plan.id} (Scenario Name: {plan.scenario.record.name}) "
                f"to is_default: {should_be_default}"
            )
            plan.set_default(should_be_default)
            plans_to_update.append(plan)

    plan_models: List[PlanModel] = []
    scenario_models: List[ScenarioModel] = []
    for plan in plans_to_update:
        plan_model, scenario_model = plan.to_sqlalchemy()
        plan_models.append(plan_model)
        scenario_models.append(scenario_model)

    return (plan_models, scenario_models)