mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
168 lines
5.6 KiB
Python
168 lines
5.6 KiB
Python
from collections import defaultdict
|
|
from typing import Dict, List, Optional
|
|
|
|
from backend.app.db.functions.recommendations_functions import (
|
|
bulk_update_plans,
|
|
get_default_plan_ids_for_property,
|
|
get_plans_by_portfolio_id,
|
|
get_plans_by_scenario_ids,
|
|
get_scenarios_by_portfolio_id,
|
|
set_plan_and_scenario_default,
|
|
)
|
|
from backend.app.db.models.recommendations import PlanModel, ScenarioModel
|
|
from backend.app.domain.classes.plan import Plan
|
|
from backend.app.domain.classes.scenario import Scenario
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger used by the functions in this file.
logger = setup_logger()
|
|
|
|
|
|
def process_portfolio(
    portfolio_id: int,
    scenarios_to_consider: Optional[List[int]] = None,
    scenario_priority_order: Optional[List[int]] = None,
) -> None:
    """Choose and persist the default plan for every property in a portfolio.

    Loads the portfolio's plans, groups them by property and, for each
    property, picks a plan with ``choose_cheapest_relevant_plan`` and updates
    the default flags so that the picked plan is the default one.

    Args:
        portfolio_id: ID of the portfolio to process.
        scenarios_to_consider: Optional list of scenario IDs, forwarded to
            ``_load_plans_for_portfolio`` to restrict which plans are loaded.
        scenario_priority_order: Optional tie-break order, forwarded to
            ``choose_cheapest_relevant_plan``.

    Raises:
        ValueError: If a property is grouped with an empty list of plans.
            Errors raised by the helpers called here propagate unchanged.
    """
    logger.info(f"Processing portfolio {portfolio_id}")

    plans: List[Plan] = _load_plans_for_portfolio(portfolio_id, scenarios_to_consider)
    plans_by_property: Dict[int, List[Plan]] = _group_plans_by_property(plans)

    for property_id, property_plans in plans_by_property.items():
        if not property_plans:
            raise ValueError(f"No plans for property {property_id}")

        cheapest_plan = choose_cheapest_relevant_plan(
            property_plans, scenario_priority_order
        )

        # Unset existing default(s) in case they are outside the plans to consider.
        default_plan_ids: List[int] = get_default_plan_ids_for_property(property_id)
        for default_plan_id in default_plan_ids:
            # Leave the chosen plan alone. The in-memory plans were loaded
            # before this loop and the unset call only receives an ID, so if
            # the chosen plan were unset here `_update_default_flags` would
            # still see it flagged as default, treat it as already correct,
            # and the property would end up with no default plan at all.
            if default_plan_id == cheapest_plan.id:
                continue
            set_plan_and_scenario_default(default_plan_id, False)

        # TODO: other defaults are already unset above, so
        # `_update_default_flags` can probably be made simpler.
        _update_default_flags(property_plans, cheapest_plan)
|
|
|
|
|
|
def choose_cheapest_relevant_plan(
    plans: List[Plan], scenario_priority_order: Optional[List[int]] = None
) -> Plan:
    """Return the cheapest plan, preferring compliant plans.

    Only plans whose ``is_compliant`` is truthy are considered; if there are
    none, every plan is considered instead. Among those, the plans with the
    lowest ``record.cost_of_works`` win. A cost of ``None`` is treated as
    infinitely expensive, while a cost of ``0`` is a genuine (free) cost.

    Ties are broken by walking ``scenario_priority_order`` and returning the
    first tied plan whose ``id`` equals an entry; if no entry matches, the
    first tied plan in input order is returned.

    NOTE(review): the tie-break list is named after scenarios but is compared
    against ``plan.id`` - confirm which identifier callers pass.

    Args:
        plans: Candidate plans; each must already have an ID.
        scenario_priority_order: Optional IDs in descending priority, used
            only to break cost ties.

    Returns:
        The selected plan.

    Raises:
        ValueError: If ``plans`` is empty, or a considered plan has no ID.
    """
    scenario_priority_order = scenario_priority_order or []

    eligible_plans: List[Plan] = [plan for plan in plans if plan.is_compliant] or plans
    if not eligible_plans:
        raise ValueError("No plans available to choose from.")

    for plan in eligible_plans:
        if plan.id is None:
            # This should never actually happen, but plan.id is optional to cater
            # for new plans. We are only working with already persisted plans here
            raise ValueError(
                f"All plans must have an ID, but found a plan with no ID: {plan}"
            )

    # Normalise each cost exactly once so the minimum and the tie filter agree.
    # Testing `is None` (rather than truthiness) keeps a legitimate cost of 0
    # from being mistaken for "unknown" and turned into infinity.
    costs: List[float] = [
        float("inf") if plan.record.cost_of_works is None else plan.record.cost_of_works
        for plan in eligible_plans
    ]
    min_cost: float = min(costs)

    cheapest_plans: List[Plan] = [
        plan for plan, cost in zip(eligible_plans, costs) if cost == min_cost
    ]

    for priority_plan_id in scenario_priority_order:
        for plan in cheapest_plans:
            if plan.id == priority_plan_id:
                return plan

    return cheapest_plans[0]
|
|
|
|
|
|
def _load_plans_for_portfolio(
    portfolio_id: int, scenarios_to_consider: Optional[List[int]] = None
) -> List[Plan]:
    """Load plans for a portfolio and map them to ``Plan`` domain objects.

    When ``scenarios_to_consider`` is non-empty, plans are fetched with
    ``get_plans_by_scenario_ids``; otherwise all plans of the portfolio are
    fetched with ``get_plans_by_portfolio_id``. Each plan model is paired with
    the portfolio scenario whose ``id`` equals the plan's ``scenario_id``.
    Plans without a matching scenario are logged and skipped.

    Args:
        portfolio_id: ID of the portfolio whose plans and scenarios are loaded.
        scenarios_to_consider: Optional scenario IDs to restrict the plans to.
            ``None`` or an empty list means "all plans of the portfolio".

    Returns:
        The mapped plans, in the order the plan models were returned.

    Raises:
        ValueError: If ``scenarios_to_consider`` has exactly one entry.
        Exception: If the portfolio has no scenarios.
    """
    plan_models: List[PlanModel]

    if scenarios_to_consider:
        if len(scenarios_to_consider) < 2:
            raise ValueError(
                "Cannot run auto categorisation for fewer than 2 scenarios"
            )

        logger.info(f"Getting {len(scenarios_to_consider)} plans")
        plan_models = get_plans_by_scenario_ids(scenarios_to_consider)
    else:
        logger.info(
            f"No list of Plans to consider provided. Getting all Plans for portfolio {portfolio_id}"
        )
        plan_models = get_plans_by_portfolio_id(portfolio_id)

    scenarios: List[ScenarioModel] = get_scenarios_by_portfolio_id(portfolio_id)
    if not scenarios:
        raise Exception(f"No scenarios found for Portfolio {portfolio_id}")

    # Index scenarios by ID once instead of scanning the list for every plan.
    # `setdefault` keeps the first scenario seen for an ID, matching what a
    # first-match linear search would return.
    scenarios_by_id: Dict[int, ScenarioModel] = {}
    for scenario in scenarios:
        scenarios_by_id.setdefault(scenario.id, scenario)

    plans: List[Plan] = []
    for model in plan_models:
        # `.get` yields None for an unmatched plan so it can be skipped below;
        # a bare `next(generator)` would raise StopIteration here instead.
        scenario_model = scenarios_by_id.get(model.scenario_id)
        if scenario_model is None:
            logger.info(f"No Scenario associated with Plan of ID {model.id}")
            continue

        plans.append(
            Plan.from_sqlalchemy(model, Scenario.from_sqlalchemy(scenario_model))
        )
        logger.debug(
            f"Successfully mapped plan {model.id} and scenario {scenario_model.id} to domain object"
        )

    logger.debug(f"Got {len(plans)} plans from database")
    return plans
|
|
|
|
|
|
def _group_plans_by_property(plans: List[Plan]) -> Dict[int, List[Plan]]:
    """Bucket plans by the ``property_id`` stored on each plan's record.

    Args:
        plans: Plans to group.

    Returns:
        A ``defaultdict(list)`` mapping each property ID to its plans. Keys
        appear in first-seen order and each list keeps the input order.
    """
    buckets: dict[int, List[Plan]] = defaultdict(list)

    for candidate in plans:
        owner_id = candidate.record.property_id
        buckets[owner_id].append(candidate)

    return buckets
|
|
|
|
|
|
def _update_default_flags(plans: List[Plan], cheapest_plan: Plan) -> None:
    """Make ``cheapest_plan`` the only default among ``plans`` and persist it.

    Each plan whose ``record.is_default`` differs from the wanted value (true
    only when its ID equals ``cheapest_plan.id``) is flipped via
    ``set_default``. The changed plans are then converted with
    ``to_sqlalchemy`` and written in one ``bulk_update_plans`` call. If
    nothing needs changing, no update is issued.

    Args:
        plans: The plans to reconcile.
        cheapest_plan: The plan that should be the default.
    """
    changed: List[Plan] = []

    for candidate in plans:
        wanted: bool = candidate.id == cheapest_plan.id
        if candidate.record.is_default != wanted:
            logger.info(
                f"Setting Plan {candidate.id} (Scenario Name: {candidate.scenario.record.name}) to is_default: {wanted}"
            )
            candidate.set_default(wanted)
            changed.append(candidate)

    # Nothing differs from the wanted state, so skip the database write.
    if not changed:
        logger.info("All plan default values already correct. Not udpating")
        return

    # Convert each changed plan once, then split the pairs into parallel lists.
    converted = [candidate.to_sqlalchemy() for candidate in changed]
    plan_models: List[PlanModel] = [plan_model for plan_model, _ in converted]
    scenario_models: List[ScenarioModel] = [
        scenario_model for _, scenario_model in converted
    ]

    bulk_update_plans(plan_models, scenario_models)
    logger.info("Successfully updated Plan default values")
|