From 2ffd09bdd2668213a83f5dfa84bc8544e7f5e135 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Tue, 24 Feb 2026 15:28:18 +0000 Subject: [PATCH] update subtask when processor completes --- .devcontainer/backend/requirements.txt | 3 +- .../categorisation_trigger_request.py | 3 ++ backend/categorisation/handler/handler.py | 8 +---- backend/categorisation/local_runner.py | 11 +++++-- backend/categorisation/processor.py | 30 +++++++++++++++---- 5 files changed, 39 insertions(+), 16 deletions(-) diff --git a/.devcontainer/backend/requirements.txt b/.devcontainer/backend/requirements.txt index 9814c8d4..c84332dd 100644 --- a/.devcontainer/backend/requirements.txt +++ b/.devcontainer/backend/requirements.txt @@ -19,4 +19,5 @@ pytest==9.0.2 pytest-cov==7.0.0 ipykernel>=6.25,<7 # Formatting -black==26.1.0 \ No newline at end of file +black==26.1.0 +boto3-stubs \ No newline at end of file diff --git a/backend/categorisation/categorisation_trigger_request.py b/backend/categorisation/categorisation_trigger_request.py index 6a0c872c..17a5d916 100644 --- a/backend/categorisation/categorisation_trigger_request.py +++ b/backend/categorisation/categorisation_trigger_request.py @@ -11,5 +11,8 @@ class CategorisationTriggerRequest(BaseModel): min_property_id: Optional[int] = None max_property_id: Optional[int] = None + task_id: Optional[str] = None + subtask_id: Optional[str] = None + # {"portfolio_id": 556, "scenarios_to_consider": [1039,1041], "scenario_priority_order": [1041,1039]} diff --git a/backend/categorisation/handler/handler.py b/backend/categorisation/handler/handler.py index fea62342..eb532624 100644 --- a/backend/categorisation/handler/handler.py +++ b/backend/categorisation/handler/handler.py @@ -25,13 +25,7 @@ def handler(event: Mapping[str, Any], context: Any) -> None: logger.debug("Successfully validated request body") - process_portfolio( - payload.portfolio_id, - payload.scenarios_to_consider, - payload.scenario_priority_order, - payload.min_property_id, - payload.max_property_id, - ) + process_portfolio(payload) except Exception as e: logger.info("Handler exception") diff --git a/backend/categorisation/local_runner.py b/backend/categorisation/local_runner.py index 7de55bc0..384ce5ef 100644 --- a/backend/categorisation/local_runner.py +++ b/backend/categorisation/local_runner.py @@ -1,5 +1,8 @@ from typing import List +from backend.categorisation.categorisation_trigger_request import ( + CategorisationTriggerRequest, +) from backend.categorisation.processor import process_portfolio @@ -9,9 +12,11 @@ def main() -> None: scenario_priority_order: List[int] = [] process_portfolio( - portfolio_id=portfolio_id, - scenarios_to_consider=scenarios_to_consider, - scenario_priority_order=scenario_priority_order, + CategorisationTriggerRequest( + portfolio_id=portfolio_id, + scenarios_to_consider=scenarios_to_consider, + scenario_priority_order=scenario_priority_order, + ) ) diff --git a/backend/categorisation/processor.py b/backend/categorisation/processor.py index 00c20ec1..7a7d48ca 100644 --- a/backend/categorisation/processor.py +++ b/backend/categorisation/processor.py @@ -1,5 +1,7 @@ +import time from collections import defaultdict from typing import Dict, List, Optional +from uuid import UUID from backend.app.db.functions.recommendations_functions import ( bulk_update_plans, @@ -8,22 +10,31 @@ from backend.app.db.functions.recommendations_functions import ( get_most_recent_plans_by_scenario_ids, get_scenarios_by_portfolio_id, ) +from backend.app.db.functions.tasks.Tasks import SubTaskInterface from backend.app.db.models.recommendations import PlanModel, ScenarioModel from backend.app.domain.classes.plan import Plan from backend.app.domain.classes.scenario import Scenario +from backend.app.plan.utils import build_cloudwatch_log_url +from backend.categorisation.categorisation_trigger_request import ( + CategorisationTriggerRequest, +) from utils.logger import setup_logger logger = setup_logger() def process_portfolio( - portfolio_id: int, - scenarios_to_consider: Optional[List[int]] = None, - scenario_priority_order: Optional[List[int]] = None, - min_property_id: Optional[int] = None, - max_property_id: Optional[int] = None, + body: CategorisationTriggerRequest, ) -> None: # TODO: make this a class + portfolio_id: int = body.portfolio_id + scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider + scenario_priority_order: Optional[List[int]] = body.scenario_priority_order + min_property_id: Optional[int] = body.min_property_id + max_property_id: Optional[int] = body.max_property_id + subtask_id: Optional[str] = body.subtask_id + logger.info(f"Processing portfolio {portfolio_id}") + start_ms = int(time.time() * 1000) all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id) plans_by_id: Dict[int, Plan] = {} # TODO: make this an in-memory repository class @@ -85,6 +96,15 @@ def process_portfolio( _update_plans_in_db(list(plans_by_id.values())) logger.info(f"Successfully updated {len(plans_by_id)} Plans in database") + # Mark the subtask as successful + if subtask_id: + cloud_logs_url = build_cloudwatch_log_url(start_ms) + SubTaskInterface().update_subtask_status( + subtask_id=UUID(subtask_id), + status="complete", + cloud_logs_url=cloud_logs_url, + ) + def choose_cheapest_relevant_plan( plans: List[Plan], scenario_priority_order: Optional[List[int]] = None