diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index 09d6da83..ed3fb435 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -1,5 +1,14 @@ -from typing import Any, Dict, List, Tuple -from sqlalchemy import inspect, text, insert, delete, select +from typing import Any, Dict, List, Optional +from sqlalchemy import ( + ColumnElement, + and_, + func, + inspect, + text, + insert, + delete, + select, +) from sqlalchemy.orm import Session, Mapper from sqlalchemy.exc import SQLAlchemyError from sqlmodel import Session @@ -625,11 +634,22 @@ def get_plans_by_scenario_ids(ids: List[int]) -> List[PlanModel]: return session_any.exec(stmt).scalars().all() -def get_most_recent_plans_by_portfolio_id(portfolio_id: int) -> List[PlanModel]: +def get_most_recent_plans_by_portfolio_id( + portfolio_id: int, + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, +) -> List[PlanModel]: + filters = [PlanModel.portfolio_id == portfolio_id] + + if min_property_id is not None: + filters.append(PlanModel.property_id >= min_property_id) + if max_property_id is not None: + filters.append(PlanModel.property_id <= max_property_id) + # NOTE: This statement works for Postgres only, because of the Distinct stmt = ( select(PlanModel) - .where(PlanModel.portfolio_id == portfolio_id) + .where(and_(*filters)) .distinct( PlanModel.property_id, PlanModel.scenario_id ) # one plan per property per scenario @@ -645,11 +665,27 @@ def get_most_recent_plans_by_portfolio_id(portfolio_id: int) -> List[PlanModel]: return session_any.exec(stmt).scalars().all() -def get_most_recent_plans_by_scenario_ids(scenario_ids: List[int]) -> List[PlanModel]: +def get_most_recent_plans_by_scenario_ids( + scenario_ids: List[int], + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, +) -> List[PlanModel]: + if not scenario_ids: + return [] + + # Base filter: scenario_id in provided list + filters: List[ColumnElement[bool]] = [PlanModel.scenario_id.in_(scenario_ids)] + + # Add optional property ID range filters + if min_property_id is not None: + filters.append(PlanModel.property_id >= min_property_id) + if max_property_id is not None: + filters.append(PlanModel.property_id <= max_property_id) + # NOTE: This statement works for Postgres only, because of the Distinct stmt = ( select(PlanModel) - .where(PlanModel.scenario_id.in_(scenario_ids)) + .where(and_(*filters)) .distinct( PlanModel.property_id, PlanModel.scenario_id ) # one plan per property per scenario @@ -673,16 +709,37 @@ def get_scenarios_by_portfolio_id(portfolio_id: int) -> List[ScenarioModel]: return session_any.exec(stmt).scalars().all() +def get_scenarios_count_by_portfolio_id(portfolio_id: int) -> int: + stmt = ( + select(func.count()) + .select_from(ScenarioModel) + .where(ScenarioModel.portfolio_id == portfolio_id) + ) + with db_read_session() as session: + session_any: Any = session # Typehint as Any to satisfy Pylance... + return session_any.exec(stmt).scalar_one() + + def get_default_plans( portfolio_id: int, + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, ) -> List[PlanModel]: - plan_stmt = select(PlanModel).where( - (PlanModel.portfolio_id == portfolio_id) & (PlanModel.is_default == True) - ) + filters: List[ColumnElement[bool]] = [ + PlanModel.portfolio_id == portfolio_id, + PlanModel.is_default.is_(True), + ] + + if min_property_id is not None: + filters.append(PlanModel.property_id >= min_property_id) + if max_property_id is not None: + filters.append(PlanModel.property_id <= max_property_id) + + stmt = select(PlanModel).where(and_(*filters)) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... - plans: List[PlanModel] = session_any.exec(plan_stmt).scalars().all() + plans: List[PlanModel] = session_any.exec(stmt).scalars().all() return plans diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 4a1b90fa..e9c06e40 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -1,3 +1,5 @@ +from typing import List + import boto3 import json import math @@ -18,7 +20,11 @@ from backend.categorisation.categorisation_trigger_request import ( from utils.logger import setup_logger from backend.app.db.connection import db_engine -from backend.app.db.functions.recommendations_functions import create_scenario +from backend.app.db.functions.recommendations_functions import ( + create_scenario, + get_property_ids, + get_scenarios_count_by_portfolio_id, +) from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface logger = setup_logger() @@ -33,25 +39,43 @@ router = APIRouter( sqs_client = boto3.client("sqs") -@contextmanager -def db_session(): - session = Session(db_engine) - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - session.close() - - -@router.post("/categoisation", status_code=202) -async def trigger_categorisation(body: CategorisationTriggerRequest): - payload = CategorisationTriggerRequest.model_validate(body) +@router.post("/categorisation", status_code=202) +async def trigger_categorisation( + body: CategorisationTriggerRequest, +) -> dict[str, int]: + payload: CategorisationTriggerRequest = CategorisationTriggerRequest.model_validate( + body + ) logger.info("API triggered with body: %s", payload) + property_ids: List[int] = get_property_ids(payload.portfolio_id) + property_ids.sort() + + num_scenarios: int = get_scenarios_count_by_portfolio_id(payload.portfolio_id) + batch_size: int = math.ceil(1000 / num_scenarios) + num_property_buckets: int = max(1, math.ceil(len(property_ids) / batch_size)) + + bucket_requests: List[CategorisationTriggerRequest] = [] + + for bucket_index in range(num_property_buckets): + bucket_property_ids: List[int] = [ + pid for pid in property_ids if pid % num_property_buckets == bucket_index + ] + bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest( + portfolio_id=payload.portfolio_id, + scenarios_to_consider=payload.scenarios_to_consider, + scenario_priority_order=payload.scenario_priority_order, + min_property_id=min(bucket_property_ids), + max_property_id=max(bucket_property_ids), + ) + + bucket_requests.append(bucket_request) + + # Dispatch requests to lambdas + + return {"num_buckets": len(bucket_requests)} + @router.post("/trigger", status_code=202) async def trigger_plan_entrypoint(body: PlanTriggerRequest): diff --git a/backend/categorisation/categorisation_trigger_request.py b/backend/categorisation/categorisation_trigger_request.py index 4b1b6553..6a0c872c 100644 --- a/backend/categorisation/categorisation_trigger_request.py +++ b/backend/categorisation/categorisation_trigger_request.py @@ -8,8 +8,8 @@ class CategorisationTriggerRequest(BaseModel): scenarios_to_consider: Optional[List[int]] = None scenario_priority_order: Optional[List[int]] = None - property_bucket_index: Optional[int] = None - num_property_buckets: Optional[int] = None + min_property_id: Optional[int] = None + max_property_id: Optional[int] = None # {"portfolio_id": 556, "scenarios_to_consider": [1039,1041], "scenario_priority_order": [1041,1039]} diff --git a/backend/categorisation/handler/handler.py b/backend/categorisation/handler/handler.py index 9fb235d5..fea62342 100644 --- a/backend/categorisation/handler/handler.py +++ b/backend/categorisation/handler/handler.py @@ -29,6 +29,8 @@ def handler(event: Mapping[str, Any], context: Any) -> None: payload.portfolio_id, payload.scenarios_to_consider, payload.scenario_priority_order, + payload.min_property_id, + payload.max_property_id, ) except Exception as e: diff --git a/backend/categorisation/processor.py b/backend/categorisation/processor.py index 09db2983..00c20ec1 100644 --- a/backend/categorisation/processor.py +++ b/backend/categorisation/processor.py @@ -20,6 +20,8 @@ def process_portfolio( portfolio_id: int, scenarios_to_consider: Optional[List[int]] = None, scenario_priority_order: Optional[List[int]] = None, + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, ) -> None: # TODO: make this a class logger.info(f"Processing portfolio {portfolio_id}") @@ -34,14 +36,20 @@ def process_portfolio( # first get all plans that we're interested in plans_for_consideration: List[Plan] = _load_plans_for_portfolio( - portfolio_id, all_scenarios, scenarios_to_consider + portfolio_id, + all_scenarios, + scenarios_to_consider, + min_property_id, + max_property_id, ) for plan in plans_for_consideration: if plan.id is not None: # just in case plans_by_id[plan.id] = plan # then unset existing defaults on domain objects regardless of whether they're under consideration or not - default_plans: List[Plan] = _get_default_plans(portfolio_id, all_scenarios) + default_plans: List[Plan] = _get_default_plans( + portfolio_id, all_scenarios, min_property_id, max_property_id + ) for plan in default_plans: plan.set_default(False) if plan.id is not None: # just in case @@ -109,8 +117,15 @@ def choose_cheapest_relevant_plan( return cheapest_plans[0] -def _get_default_plans(portfolio_id: int, scenarios: List[Scenario]) -> List[Plan]: - default_plan_models = get_default_plans(portfolio_id) +def _get_default_plans( + portfolio_id: int, + scenarios: List[Scenario], + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, +) -> List[Plan]: + default_plan_models = get_default_plans( + portfolio_id, min_property_id, max_property_id + ) scenario_map = {s.id: s for s in scenarios} @@ -131,12 +146,14 @@ def _load_plans_for_portfolio( portfolio_id: int, all_scenarios: List[Scenario], scenarios_to_consider: Optional[List[int]] = None, + min_property_id: Optional[int] = None, + max_property_id: Optional[int] = None, ) -> List[Plan]: if scenarios_to_consider: logger.info(f"Getting plans for {len(scenarios_to_consider)} scenarios") plan_models: List[PlanModel] = get_most_recent_plans_by_scenario_ids( - scenarios_to_consider + scenarios_to_consider, min_property_id, max_property_id ) logger.info(f"Got {len(plan_models)} plan models from database") else: @@ -144,7 +161,7 @@ def _load_plans_for_portfolio( f"No list of Plans to consider provided. Getting all Plans for portfolio {portfolio_id}" ) plan_models: List[PlanModel] = get_most_recent_plans_by_portfolio_id( - portfolio_id + portfolio_id, min_property_id, max_property_id ) plans: List[Plan] = []