Model/backend/categorisation/processor.py
2026-05-12 17:03:16 +00:00

262 lines
8.9 KiB
Python

import time
from collections import defaultdict
from typing import Dict, List, Optional
from uuid import UUID
from starlette.responses import Response
from backend.app.db.functions.recommendations_functions import (
bulk_update_plans,
get_default_plans,
get_most_recent_plans_by_portfolio_id,
get_most_recent_plans_by_scenario_ids,
get_scenarios_by_portfolio_id,
)
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario
from backend.app.plan.utils import handle_error
from backend.utils.cloudwatch import build_cloudwatch_log_url
from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest,
)
from utils.logger import setup_logger
# Module-level logger used by every function in this file.
logger = setup_logger()
def process_portfolio(
    body: CategorisationTriggerRequest,
) -> Response:  # TODO: make this a class
    """Re-pick the default Plan for every property in a portfolio.

    Steps:
      1. Load the portfolio's scenarios and the plans under consideration
         (restricted to ``scenarios_to_consider`` when given, otherwise every
         most-recent plan in the portfolio), optionally bounded by
         ``min_property_id`` / ``max_property_id``.
      2. Clear the default flag on every currently-default plan in range,
         whether or not it is under consideration.
      3. For each property, mark the plan returned by
         ``choose_cheapest_relevant_plan`` as default and every other
         considered plan of that property as non-default.
      4. Write every touched plan back in one bulk update.

    When ``body.subtask_id`` is set, the subtask is set to "in progress"
    before processing and to "complete" on success; any exception raised
    inside processing is passed to ``handle_error`` and its result returned.
    Without a subtask id the exception propagates to the caller.

    Args:
        body: Trigger request carrying the portfolio id, the optional scenario
            filter and priority order, optional property-id bounds and an
            optional subtask id (parsed with ``UUID(...)``).

    Returns:
        ``Response(status_code=200)`` on success, or the value returned by
        ``handle_error`` when processing fails and a subtask id was given.

    Raises:
        ValueError: (only without a subtask id) if ``scenarios_to_consider``
            names fewer than two *distinct* scenarios, or if a plan cannot be
            chosen for some property.
        Exception: anything raised while loading or persisting, again only
            when no subtask id was given.
    """
    portfolio_id: int = body.portfolio_id
    scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider
    scenario_priority_order: Optional[List[int]] = body.scenario_priority_order
    min_property_id: Optional[int] = body.min_property_id
    max_property_id: Optional[int] = body.max_property_id
    subtask_id: Optional[str] = body.subtask_id
    logger.info(f"Processing portfolio {portfolio_id}")
    start_ms = int(time.time() * 1000)
    cloud_logs_url = build_cloudwatch_log_url(start_ms)
    if subtask_id:
        SubTaskInterface().update_subtask_status(
            subtask_id=UUID(subtask_id),
            status="in progress",
            cloud_logs_url=cloud_logs_url,
        )
    try:
        all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id)
        # Every Plan to be written back, keyed by id so each row is persisted
        # once. TODO: make this an in-memory repository class
        plans_by_id: Dict[int, Plan] = {}
        # Count distinct ids: a request like [5, 5] names only one scenario
        # and so gives nothing to choose between.
        if scenarios_to_consider and len(set(scenarios_to_consider)) < 2:
            raise ValueError(
                "Cannot run auto categorisation for fewer than 2 scenarios"
            )
        # First get all plans that we're interested in.
        plans_for_consideration: List[Plan] = _load_plans_for_portfolio(
            portfolio_id,
            all_scenarios,
            scenarios_to_consider,
            min_property_id,
            max_property_id,
        )
        for plan in plans_for_consideration:
            if plan.id is not None:  # just in case
                plans_by_id[plan.id] = plan
        # Then unset existing defaults on domain objects, regardless of
        # whether they're under consideration or not.
        default_plans: List[Plan] = _get_default_plans(
            portfolio_id, all_scenarios, min_property_id, max_property_id
        )
        for plan in default_plans:
            plan.set_default(False)
            if plan.id is not None:  # just in case
                plans_by_id[plan.id] = plan
        logger.info(f"Successfully unset {len(default_plans)} default plan(s)")
        # Then set new defaults on the domain objects under consideration.
        # Re-inserting them by id replaces any default-plan copy stored above,
        # so a plan that is both default and considered ends with the new flag.
        plans_for_consideration_by_property: Dict[int, List[Plan]] = (
            _group_plans_by_property(plans_for_consideration)
        )
        for property_id, property_plans in plans_for_consideration_by_property.items():
            if not property_plans:
                raise ValueError(f"No plans for property {property_id}")
            try:
                cheapest_plan = choose_cheapest_relevant_plan(
                    property_plans, scenario_priority_order
                )
            except Exception:
                logger.error(f"Failed to find cheapest plan for property {property_id}")
                raise
            property_plans = _update_plan_objects(property_plans, cheapest_plan)
            for plan in property_plans:
                if plan.id is not None:  # just in case
                    plans_by_id[plan.id] = plan
        logger.info("Successfully set defaults on Plan objects in memory")
        # Then pass all domain objects to the database (changed or not).
        _update_plans_in_db(list(plans_by_id.values()))
        logger.info(f"Successfully updated {len(plans_by_id)} Plans in database")
        # Mark the subtask as successful.
        if subtask_id:
            SubTaskInterface().update_subtask_status(
                subtask_id=UUID(subtask_id),
                status="complete",
                cloud_logs_url=cloud_logs_url,
            )
        return Response(status_code=200)
    except Exception as e:
        if subtask_id:
            return handle_error(
                "Exception during Categorisation processing.",
                e,
                subtask_id,
                500,
                start_ms,
            )
        raise
def choose_cheapest_relevant_plan(
    plans: List[Plan], scenario_priority_order: Optional[List[int]] = None
) -> Plan:
    """Pick the plan to mark as default for a single property.

    Candidates are the plans whose ``is_compliant`` is truthy; if there are
    none, every plan in ``plans`` is a candidate. The candidate(s) with the
    lowest ``cost`` win. Ties are broken by walking ``scenario_priority_order``
    and returning the first tied plan whose ``scenario.id`` matches; if no
    priority matches, the first tied plan (in input order) is returned.

    Raises:
        ValueError: if ``plans`` is empty, or any candidate has ``id is None``.
    """
    priorities = scenario_priority_order if scenario_priority_order else []
    compliant = [candidate for candidate in plans if candidate.is_compliant]
    candidates: List[Plan] = compliant if compliant else plans
    if not candidates:
        raise ValueError("No plans available to choose from.")
    unsaved = next((c for c in candidates if c.id is None), None)
    if unsaved is not None:
        # Only already-persisted plans should reach this point; ``Plan.id`` is
        # Optional solely so that not-yet-saved plans can be represented.
        raise ValueError(
            f"All plans must have an ID, but found a plan with no ID: {unsaved}"
        )
    lowest_cost: float = min(c.cost for c in candidates)
    tied: List[Plan] = [c for c in candidates if c.cost == lowest_cost]
    for wanted_scenario_id in priorities:
        hit = next((c for c in tied if c.scenario.id == wanted_scenario_id), None)
        if hit is not None:
            return hit
    return tied[0]
def _get_default_plans(
    portfolio_id: int,
    scenarios: List[Scenario],
    min_property_id: Optional[int] = None,
    max_property_id: Optional[int] = None,
) -> List[Plan]:
    """Return the portfolio's currently-default plans as domain Plans.

    Rows come from ``get_default_plans`` (optionally bounded by property id)
    and each is paired with the Scenario in ``scenarios`` whose id matches the
    row's ``scenario_id``. Rows with no matching scenario are skipped silently.
    """
    default_rows = get_default_plans(portfolio_id, min_property_id, max_property_id)
    scenario_lookup = {}
    for scenario in scenarios:
        scenario_lookup[scenario.id] = scenario
    defaults: List[Plan] = []
    for row in default_rows:
        if row.scenario_id not in scenario_lookup:
            continue
        defaults.append(Plan.from_sqlalchemy(row, scenario_lookup[row.scenario_id]))
    return defaults
def _load_scenarios_for_portfolio(portfolio_id: int) -> List[Scenario]:
    """Fetch every Scenario row of ``portfolio_id`` as a domain Scenario."""
    rows: List[ScenarioModel] = get_scenarios_by_portfolio_id(portfolio_id)
    return list(map(Scenario.from_sqlalchemy, rows))
def _load_plans_for_portfolio(
    portfolio_id: int,
    all_scenarios: List[Scenario],
    scenarios_to_consider: Optional[List[int]] = None,
    min_property_id: Optional[int] = None,
    max_property_id: Optional[int] = None,
) -> List[Plan]:
    """Load the most recent plans to consider and wrap them as domain Plans.

    When ``scenarios_to_consider`` is non-empty, plans are fetched for those
    scenario ids; otherwise every most-recent plan of ``portfolio_id`` is
    fetched. Both queries are optionally bounded by property id. Each row is
    paired with its Scenario from ``all_scenarios``; rows whose
    ``scenario_id`` is not among them are logged and skipped.

    Raises:
        Exception: if ``all_scenarios`` is empty. This is checked before any
            plan query is issued.
    """
    if not all_scenarios:
        raise Exception(f"No scenarios found for Portfolio {portfolio_id}")
    plan_models: List[PlanModel]
    if scenarios_to_consider:
        logger.info(f"Getting plans for {len(scenarios_to_consider)} scenarios")
        plan_models = get_most_recent_plans_by_scenario_ids(
            scenarios_to_consider, min_property_id, max_property_id
        )
        logger.info(f"Got {len(plan_models)} plan models from database")
    else:
        logger.info(
            f"No list of Plans to consider provided. Getting all Plans for portfolio {portfolio_id}"
        )
        plan_models = get_most_recent_plans_by_portfolio_id(
            portfolio_id, min_property_id, max_property_id
        )
    # Build the id -> Scenario lookup once instead of scanning the list per
    # plan. setdefault keeps the first scenario for a repeated id, which is
    # what a first-match linear search would return.
    scenario_by_id: Dict[int, Scenario] = {}
    for scenario in all_scenarios:
        scenario_by_id.setdefault(scenario.id, scenario)
    plans: List[Plan] = []
    for model in plan_models:
        # .get() returns None for an unknown scenario id. The previous
        # next(generator) call had no default and raised StopIteration here.
        scenario = scenario_by_id.get(model.scenario_id)
        if scenario is None:
            logger.info(f"No Scenario associated with Plan of ID {model.id}")
            continue
        plans.append(Plan.from_sqlalchemy(model, scenario))
    logger.info(f"Got {len(plans)} Plans")
    return plans
def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
grouped: dict[int, List[Plan]] = defaultdict(list)
for plan in plans:
grouped[plan.record.property_id].append(plan)
return grouped
def _update_plan_objects(plans: List[Plan], cheapest_plan: Plan) -> List[Plan]:
for plan in plans:
should_be_default: bool = plan.id == cheapest_plan.id
plan.set_default(should_be_default)
if should_be_default:
logger.debug(
f"Setting Plan {plan.id} (Scenario Name: {plan.scenario.record.name}) to default"
)
return plans
def _update_plans_in_db(plans: List[Plan]) -> None:
    """Convert each Plan back to ORM models and persist them in one bulk call.

    ``Plan.to_sqlalchemy`` yields a (plan model, scenario model) pair per plan.
    The two halves go to ``bulk_update_plans`` as parallel lists, both in the
    same order as ``plans``.
    """
    # The generator keeps conversion and strict 2-tuple unpacking interleaved
    # plan by plan.
    pairs = [(pm, sm) for pm, sm in (plan.to_sqlalchemy() for plan in plans)]
    plan_models: List[PlanModel] = [pm for pm, _ in pairs]
    scenario_models: List[ScenarioModel] = [sm for _, sm in pairs]
    bulk_update_plans(plan_models, scenario_models)