Model/backend/categorisation/processor.py
2026-02-20 11:35:12 +00:00

197 lines
6.8 KiB
Python

from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from backend.app.db.functions.recommendations_functions import (
bulk_update_plans,
get_default_scenario_ids_for_portfolio,
get_plan_ids_by_scenario_ids,
get_plans_by_portfolio_id,
get_plans_by_scenario_ids,
get_scenarios_by_portfolio_id,
set_plan_and_scenario_default,
)
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario
from utils.logger import setup_logger
logger = setup_logger()
def process_portfolio(
portfolio_id: int,
scenarios_to_consider: Optional[List[int]] = None,
scenario_priority_order: Optional[List[int]] = None,
) -> None:
logger.info(f"Processing portfolio {portfolio_id}")
if scenarios_to_consider:
if len(scenarios_to_consider) < 2:
raise ValueError(
"Cannot run auto categorisation for fewer than 2 scenarios"
)
if scenarios_to_consider is not None:
_unset_defaults_for_scenarios_not_being_considered(
portfolio_id, scenarios_to_consider
)
plans: List[Plan] = _load_plans_for_portfolio(portfolio_id, scenarios_to_consider)
logger.info(f"Successfully loaded {len(plans)}")
plans_by_property: Dict[int, List[Plan]] = _group_plans_by_property(plans)
logger.info("Successfully grouped plans by property")
updated_plan_models: List[PlanModel] = []
updated_scenario_models: List[ScenarioModel] = []
for property_id, property_plans in plans_by_property.items():
if not property_plans:
raise ValueError(f"No plans for property {property_id}")
try:
cheapest_plan = choose_cheapest_relevant_plan(
property_plans, scenario_priority_order
)
except Exception:
logger.error(f"Failed to find cheapest plan for property {property_id}")
raise
updated_property_plan_models, updated_property_scenario_models = (
_update_plan_and_scenario_objects(property_plans, cheapest_plan)
)
updated_plan_models.extend(updated_property_plan_models)
updated_scenario_models.extend(updated_property_scenario_models)
if len(updated_plan_models) > 0:
logger.info(f"Updating {len(updated_plan_models)} Plans in database")
bulk_update_plans(updated_plan_models, updated_scenario_models)
logger.info("Successfully updated Plan default values in database")
def choose_cheapest_relevant_plan(
plans: List[Plan], scenario_priority_order: Optional[List[int]] = None
) -> Plan:
scenario_priority_order = scenario_priority_order or []
eligible_plans: List[Plan] = [plan for plan in plans if plan.is_compliant] or plans
if not eligible_plans:
raise ValueError("No plans available to choose from.")
for plan in eligible_plans:
if plan.id is None:
# This should never actually happen, but plan.id is optional to cater
# for new plans. We are only working with already persisted plans here
raise ValueError(
f"All plans must have an ID, but found a plan with no ID: {plan}"
)
min_cost: float = min(plan.cost for plan in eligible_plans)
cheapest_plans: List[Plan] = [
plan for plan in eligible_plans if plan.cost == min_cost
]
for priority_scenario_id in scenario_priority_order:
for plan in cheapest_plans:
if plan.scenario.id == priority_scenario_id:
return plan
return cheapest_plans[0]
def _unset_defaults_for_scenarios_not_being_considered(
portfolio_id: int, scenarios_to_consider: List[int]
) -> None:
default_scenario_ids: List[int] = get_default_scenario_ids_for_portfolio(
portfolio_id
)
scenarios_to_unset_default: List[int] = []
for id in default_scenario_ids:
if id not in scenarios_to_consider:
scenarios_to_unset_default.append(id)
if len(scenarios_to_unset_default) > 0:
logger.info(
f"Unsetting {scenarios_to_unset_default} as default scenario(s) as not included in provided list of scenarios to consider"
)
if len(scenarios_to_unset_default) > 0:
plans_to_unset_default: List[int] = get_plan_ids_by_scenario_ids(
scenarios_to_unset_default
)
for plan_id in plans_to_unset_default:
set_plan_and_scenario_default(plan_id, False) # TODO: do this in batch
def _load_plans_for_portfolio(
portfolio_id: int, scenarios_to_consider: Optional[List[int]] = None
) -> List[Plan]:
if scenarios_to_consider:
logger.info(f"Getting plans for {len(scenarios_to_consider)} scenarios")
plan_models: List[PlanModel] = get_plans_by_scenario_ids(scenarios_to_consider)
logger.info(f"Got {len(plan_models)} plan models from database")
else:
logger.info(
f"No list of Plans to consider provided. Getting all Plans for portfolio {portfolio_id}"
)
plan_models: List[PlanModel] = get_plans_by_portfolio_id(portfolio_id)
plans: List[Plan] = []
scenarios: List[ScenarioModel] = get_scenarios_by_portfolio_id(portfolio_id)
if not scenarios:
raise Exception(f"No scenarios found for Portfolio {portfolio_id}")
for model in plan_models:
scenario_model = next((s for s in scenarios if s.id == model.scenario_id))
if not scenario_model:
logger.info(f"No Scenario associated with Plan of ID {model.id}")
continue
plans.append(
Plan.from_sqlalchemy(model, Scenario.from_sqlalchemy(scenario_model))
)
logger.info(f"Got {len(plans)} Plans")
return plans
def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
grouped: dict[int, List[Plan]] = defaultdict(list)
for plan in plans:
grouped[plan.record.property_id].append(plan)
return grouped
def _update_plan_and_scenario_objects(
plans: List[Plan], cheapest_plan: Plan
) -> Tuple[List[PlanModel], List[ScenarioModel]]:
plans_to_update: List[Plan] = []
for plan in plans:
should_be_default: bool = plan.id == cheapest_plan.id
if plan.record.is_default != should_be_default:
logger.info(
f"Setting Plan {plan.id} (Scenario Name: {plan.scenario.record.name}) to is_default: {should_be_default}"
)
plan.set_default(should_be_default)
plans_to_update.append(plan)
plan_models: List[PlanModel] = []
scenario_models: List[ScenarioModel] = []
for plan in plans_to_update:
plan_model, scenario_model = plan.to_sqlalchemy()
plan_models.append(plan_model)
scenario_models.append(scenario_model)
return (plan_models, scenario_models)