Model/backend/categorisation/processor.py

90 lines
2.8 KiB
Python

from collections import defaultdict
from typing import Dict, List, Tuple, cast
from backend.app.db.functions.recommendations_functions import (
get_plans_by_portfolio_id,
get_scenario,
update_plan,
)
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario
from utils.logger import setup_logger
logger = setup_logger()
def process_portfolio(portfolio_id: int) -> None:
print(f"Processing portfolio {portfolio_id}")
plans: List[Plan] = _load_plans_for_portfolio(portfolio_id)
plans_by_property: Dict[int, List[Plan]] = _group_plans_by_property(plans)
for uprn, property_plans in plans_by_property.items():
if not property_plans:
raise ValueError(f"No plans for property {uprn}")
cheapest_plan = _choose_cheapest_relevant_plan(property_plans)
_update_default_flags(property_plans, cheapest_plan)
def _load_plans_for_portfolio(portfolio_id: int) -> List[Plan]:
plan_models = get_plans_by_portfolio_id(portfolio_id)
print(f"Got {len(plan_models)} plans from database")
plans: List[Plan] = []
for model in plan_models:
if not model.scenario_id:
logger.info(f"No Scenario associated with Plan of ID {model.id}")
continue
scenario_model = get_scenario(model.scenario_id)
plans.append(
Plan.from_sqlalchemy(model, Scenario.from_sqlalchemy(scenario_model))
)
print("Successfully mapped plan and scenario to domain object")
return plans
def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
grouped: dict[int, List[Plan]] = defaultdict(list)
for plan in plans:
grouped[plan.record.property_id].append(plan)
return grouped
def _choose_cheapest_relevant_plan(plans: List[Plan]) -> Plan:
plans_to_consider: List[Plan] = [p for p in plans if p.is_compliant] or plans
def plan_cost(plan: Plan) -> float:
return (
plan.record.cost_of_works
if plan.record.cost_of_works is not None
else float("inf")
)
cheapest_plan = min(plans_to_consider, key=plan_cost)
return cheapest_plan
def _update_default_flags(plans: List[Plan], cheapest_plan: Plan) -> None:
for plan in plans:
if plan.id is None:
raise ValueError("Cannot update Plan with missing ID")
plan.set_default(plan.id == cheapest_plan.id)
print(
f"Setting plan of id {plan.id}, scenario name {plan.scenario.record.name} to is_default value {plan.id == cheapest_plan.id}"
)
plan_model, scenario_model = cast(
Tuple[PlanModel, ScenarioModel],
plan.to_sqlalchemy(),
)
update_plan(plan_model, scenario_model)