update subtask when processor completes

This commit is contained in:
Daniel Roth 2026-02-24 15:28:18 +00:00
parent 76b648c861
commit 2ffd09bdd2
5 changed files with 39 additions and 16 deletions

View file

@ -19,4 +19,5 @@ pytest==9.0.2
pytest-cov==7.0.0 pytest-cov==7.0.0
ipykernel>=6.25,<7 ipykernel>=6.25,<7
# Formatting # Formatting
black==26.1.0 black==26.1.0
boto3-stubs

View file

@ -11,5 +11,8 @@ class CategorisationTriggerRequest(BaseModel):
min_property_id: Optional[int] = None min_property_id: Optional[int] = None
max_property_id: Optional[int] = None max_property_id: Optional[int] = None
task_id: Optional[str] = None
subtask_id: Optional[str] = None
# {"portfolio_id": 556, "scenarios_to_consider": [1039,1041], "scenario_priority_order": [1041,1039]} # {"portfolio_id": 556, "scenarios_to_consider": [1039,1041], "scenario_priority_order": [1041,1039]}

View file

@ -25,13 +25,7 @@ def handler(event: Mapping[str, Any], context: Any) -> None:
logger.debug("Successfully validated request body") logger.debug("Successfully validated request body")
process_portfolio( process_portfolio(payload)
payload.portfolio_id,
payload.scenarios_to_consider,
payload.scenario_priority_order,
payload.min_property_id,
payload.max_property_id,
)
except Exception as e: except Exception as e:
logger.info("Handler exception") logger.info("Handler exception")

View file

@ -1,5 +1,8 @@
from typing import List from typing import List
from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest,
)
from backend.categorisation.processor import process_portfolio from backend.categorisation.processor import process_portfolio
@ -9,9 +12,11 @@ def main() -> None:
scenario_priority_order: List[int] = [] scenario_priority_order: List[int] = []
process_portfolio( process_portfolio(
portfolio_id=portfolio_id, CategorisationTriggerRequest(
scenarios_to_consider=scenarios_to_consider, portfolio_id=portfolio_id,
scenario_priority_order=scenario_priority_order, scenarios_to_consider=scenarios_to_consider,
scenario_priority_order=scenario_priority_order,
)
) )

View file

@ -1,5 +1,7 @@
import time
from collections import defaultdict from collections import defaultdict
from typing import Dict, List, Optional from typing import Dict, List, Optional
from uuid import UUID
from backend.app.db.functions.recommendations_functions import ( from backend.app.db.functions.recommendations_functions import (
bulk_update_plans, bulk_update_plans,
@ -8,22 +10,31 @@ from backend.app.db.functions.recommendations_functions import (
get_most_recent_plans_by_scenario_ids, get_most_recent_plans_by_scenario_ids,
get_scenarios_by_portfolio_id, get_scenarios_by_portfolio_id,
) )
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.recommendations import PlanModel, ScenarioModel from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario from backend.app.domain.classes.scenario import Scenario
from backend.app.plan.utils import build_cloudwatch_log_url
from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest,
)
from utils.logger import setup_logger from utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
def process_portfolio( def process_portfolio(
portfolio_id: int, body: CategorisationTriggerRequest,
scenarios_to_consider: Optional[List[int]] = None,
scenario_priority_order: Optional[List[int]] = None,
min_property_id: Optional[int] = None,
max_property_id: Optional[int] = None,
) -> None: # TODO: make this a class ) -> None: # TODO: make this a class
portfolio_id: int = body.portfolio_id
scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider
scenario_priority_order: Optional[List[int]] = body.scenario_priority_order
min_property_id: Optional[int] = body.min_property_id
max_property_id: Optional[int] = body.max_property_id
subtask_id: Optional[str] = body.subtask_id
logger.info(f"Processing portfolio {portfolio_id}") logger.info(f"Processing portfolio {portfolio_id}")
start_ms = int(time.time() * 1000)
all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id) all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id)
plans_by_id: Dict[int, Plan] = {} # TODO: make this an in-memory repository class plans_by_id: Dict[int, Plan] = {} # TODO: make this an in-memory repository class
@ -85,6 +96,15 @@ def process_portfolio(
_update_plans_in_db(list(plans_by_id.values())) _update_plans_in_db(list(plans_by_id.values()))
logger.info(f"Successfully updated {len(plans_by_id)} Plans in database") logger.info(f"Successfully updated {len(plans_by_id)} Plans in database")
# Mark the subtask as successful
if subtask_id:
cloud_logs_url = build_cloudwatch_log_url(start_ms)
SubTaskInterface().update_subtask_status(
subtask_id=UUID(subtask_id),
status="complete",
cloud_logs_url=cloud_logs_url,
)
def choose_cheapest_relevant_plan( def choose_cheapest_relevant_plan(
plans: List[Plan], scenario_priority_order: Optional[List[int]] = None plans: List[Plan], scenario_priority_order: Optional[List[int]] = None