"""Select the default plan for each property in a portfolio.

For every property, the cheapest plan (preferring compliant plans when any
exist) is flagged as the default, and plans whose flag changed are written
back through ``bulk_update_plans``.
"""

from collections import defaultdict
from typing import Dict, List

from backend.app.db.functions.recommendations_functions import (
    bulk_update_plans,
    get_plans_by_portfolio_id,
    get_scenarios_by_portfolio_id,
)
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario
from utils.logger import setup_logger

logger = setup_logger()


def process_portfolio(portfolio_id: int) -> None:
    """Recompute the default plan for every property in a portfolio.

    Args:
        portfolio_id: Identifier of the portfolio whose plans are processed.

    Raises:
        ValueError: If a property ends up with an empty list of plans.
        Exception: If the portfolio has no scenarios (raised while loading).
    """
    logger.info(f"Processing portfolio {portfolio_id}")

    plans: List[Plan] = _load_plans_for_portfolio(portfolio_id)
    plans_by_property: Dict[int, List[Plan]] = _group_plans_by_property(plans)

    for uprn, property_plans in plans_by_property.items():
        if not property_plans:
            raise ValueError(f"No plans for property {uprn}")

        cheapest_plan = _choose_cheapest_relevant_plan(property_plans)
        _update_default_flags(property_plans, cheapest_plan)


def _load_plans_for_portfolio(portfolio_id: int) -> List[Plan]:
    """Load the portfolio's plans and pair each with its scenario.

    Plans whose ``scenario_id`` matches none of the portfolio's scenarios are
    logged and skipped rather than aborting the whole run.

    Args:
        portfolio_id: Identifier of the portfolio to load plans for.

    Returns:
        Domain ``Plan`` objects built from the plan and scenario models.

    Raises:
        Exception: If no scenarios exist for the portfolio.
    """
    plan_models = get_plans_by_portfolio_id(portfolio_id)
    scenarios: List[ScenarioModel] = get_scenarios_by_portfolio_id(portfolio_id)

    if not scenarios:
        raise Exception(f"No scenarios found for Portfolio {portfolio_id}")

    # Index scenarios once instead of scanning the list for every plan.
    # setdefault keeps the first scenario seen for an id, matching the
    # first-match behaviour of the previous linear search.
    scenarios_by_id: Dict[int, ScenarioModel] = {}
    for scenario in scenarios:
        scenarios_by_id.setdefault(scenario.id, scenario)

    plans: List[Plan] = []
    for model in plan_models:
        # .get returns None on a miss; the previous bare next(...) raised
        # StopIteration here, so the skip branch below could never run.
        scenario_model = scenarios_by_id.get(model.scenario_id)
        if scenario_model is None:
            logger.info(f"No Scenario associated with Plan of ID {model.id}")
            continue

        plans.append(
            Plan.from_sqlalchemy(model, Scenario.from_sqlalchemy(scenario_model))
        )
        logger.info("Successfully mapped plan and scenario to domain object")

    logger.info(f"Got {len(plans)} plans from database")
    return plans


def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
    """Group plans by ``plan.record.property_id``.

    Args:
        plans: Plans to group.

    Returns:
        Mapping of property id to the plans for that property, in input order.
    """
    grouped: Dict[int, List[Plan]] = defaultdict(list)
    for plan in plans:
        grouped[plan.record.property_id].append(plan)
    return grouped


def _choose_cheapest_relevant_plan(plans: List[Plan]) -> Plan:
    """Return the cheapest plan, preferring compliant plans when any exist.

    Only compliant plans are considered if at least one is compliant;
    otherwise every plan is considered. Ties go to the earliest plan in the
    list, because ``min`` returns the first minimal item.

    Args:
        plans: Candidate plans; must be non-empty.

    Returns:
        The plan with the lowest ``record.cost_of_works``.

    Raises:
        ValueError: If ``plans`` is empty (raised by ``min``).
    """
    plans_to_consider: List[Plan] = [p for p in plans if p.is_compliant] or plans

    def plan_cost(plan: Plan) -> float:
        # A missing cost sorts last so that any priced plan wins over it.
        return (
            plan.record.cost_of_works
            if plan.record.cost_of_works is not None
            else float("inf")
        )

    return min(plans_to_consider, key=plan_cost)


def _update_default_flags(plans: List[Plan], cheapest_plan: Plan) -> None:
    """Flag ``cheapest_plan`` as the default and persist changed plans.

    Only plans whose ``record.is_default`` differs from the desired value are
    modified and passed to ``bulk_update_plans``; nothing is written when no
    flag changes.

    Args:
        plans: All plans being considered together.
        cheapest_plan: The plan that should end up as the default.
    """
    plans_to_update: List[Plan] = []
    for plan in plans:
        should_be_default: bool = plan.id == cheapest_plan.id
        if plan.record.is_default != should_be_default:
            plan.set_default(should_be_default)
            plans_to_update.append(plan)

    if not plans_to_update:
        return

    plan_models: List[PlanModel] = []
    scenario_models: List[ScenarioModel] = []
    for plan in plans_to_update:
        plan_model, scenario_model = plan.to_sqlalchemy()
        plan_models.append(plan_model)
        scenario_models.append(scenario_model)

    bulk_update_plans(plan_models, scenario_models)