From 58a5c9f5d0a88f924405ef8801db61f71b16d0f8 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Fri, 10 Apr 2026 16:20:00 +0000 Subject: [PATCH] ensure all events in task handler are processed, plus error handling --- backend/utils/subtasks.py | 83 +++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/backend/utils/subtasks.py b/backend/utils/subtasks.py index e5668c53..6be3a742 100644 --- a/backend/utils/subtasks.py +++ b/backend/utils/subtasks.py @@ -113,12 +113,17 @@ def task_handler(): @wraps(func) def wrapper(event: dict[str, Any], context: Any, *args, **kwargs): - logger = setup_logger() - # Parse body: Records-style SQS or plain dict event - if "Records" in event: - raw_body = event["Records"][0].get("body", {}) + records = event.get("Records", [event]) # fallback for non-SQS + + results = [] + failures = [] + + for record in records: + # Parse body + raw_body = record.get("body", record) + if isinstance(raw_body, str): try: body = json.loads(raw_body) @@ -126,43 +131,55 @@ def task_handler(): body = {} else: body = raw_body or {} - else: - body = event - # Create fresh task + subtask - logger.info("Creating task for source: %s", task_source) - task_id, subtask_id = TasksInterface.create_task( - task_source=task_source, - inputs=body, - ) - logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id) + # Create task per message + logger.info("Creating task for source: %s", task_source) + task_id, subtask_id = TasksInterface.create_task( + task_source=task_source, + inputs=body, + ) - interface = SubTaskInterface() + logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id) - interface.update_subtask_status( - subtask_id=subtask_id, - status="in progress", - ) - - try: - result = func(body, context, *args, **kwargs) + interface = SubTaskInterface() interface.update_subtask_status( subtask_id=subtask_id, - status="complete", - outputs={"result": result} if result else None, + status="in progress", ) - logger.info("Task %s completed successfully", task_id) - return result - except Exception as e: - logger.exception("Task %s failed: %s", task_id, e) - interface.update_subtask_status( - subtask_id=subtask_id, - status="failed", - outputs={"error": str(e)}, - ) - raise + try: + result = func(body, context, *args, **kwargs) + + interface.update_subtask_status( + subtask_id=subtask_id, + status="complete", + outputs={"result": result} if result else None, + ) + + logger.info("Task %s completed successfully", task_id) + results.append(result) + + except Exception as e: + logger.exception("Task %s failed: %s", task_id, e) + + interface.update_subtask_status( + subtask_id=subtask_id, + status="failed", + outputs={"error": str(e)}, + ) + + if "Records" in event: + failures.append({"itemIdentifier": record["messageId"]}) + else: + # Handle non-SQS events + raise + + if "Records" in event: + return {"batchItemFailures": failures} + + # Handle non-SQS events + return results return wrapper