Model/backend/utils/subtasks.py

186 lines
5.8 KiB
Python

# decorators/subtask_handler.py
from functools import wraps
from typing import Callable, Any
from uuid import UUID
import json
from backend.app.db.functions.tasks.Tasks import SubTaskInterface, TasksInterface
from utils.logger import setup_logger
def _subtask_message_body(record: dict[str, Any]) -> dict[str, Any]:
    """Return the message payload carried by *record* as a dict.

    A string ``body`` is decoded as JSON.  When the record has no ``body``
    key (direct, non-SQS invocation) the record itself is the payload.
    Anything that does not end up as a dict yields ``{}`` so that the caller
    reports the IDs as missing instead of failing on ``.get``.
    """
    raw = record.get("body", record)
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (ValueError, RecursionError):
            # json.JSONDecodeError subclasses ValueError; RecursionError
            # covers pathologically nested input.
            return {}
    return raw if isinstance(raw, dict) else {}


def _subtask_uuid(body: dict[str, Any], field: str) -> UUID:
    """Return ``body[field]`` as a ``UUID``.

    Raises:
        RuntimeError: the field is absent or is neither a string nor a UUID.
        ValueError: the field is a string that is not a well-formed UUID.
    """
    raw = body.get(field)
    if isinstance(raw, UUID):
        return raw
    if not isinstance(raw, str):
        raise RuntimeError("task_id or sub_task_id missing")
    try:
        return UUID(raw)
    except ValueError as exc:
        raise ValueError(f"{field} is not a valid UUID: {raw!r}") from exc


def subtask_handler():
    """Build a decorator that tracks subtask status around a handler.

    The decorated function is called as ``func(body, context, *args,
    **kwargs)`` once per record, where ``body`` is the parsed message
    payload.  For every record in ``event["Records"]`` (or for the event
    itself when that key is absent) the wrapper:

    - parses the payload and reads ``task_id`` and ``sub_task_id`` from it,
    - marks the subtask ``"in progress"``,
    - runs the handler,
    - marks the subtask ``"complete"`` (storing the result under
      ``outputs["result"]`` unless it is ``None``), or ``"failed"`` (storing
      ``str(exception)`` under ``outputs["error"]``) and re-raises.

    The wrapper always returns ``None``.

    Raises (from the wrapper):
        RuntimeError: ``task_id`` or ``sub_task_id`` is missing.
        ValueError: either ID is a malformed UUID string.
        Exception: whatever the wrapped handler raises, after the subtask
            has been marked failed.
    """

    def decorator(func: Callable[..., Any]):
        @wraps(func)
        def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
            records = event.get("Records", [event])
            # Constructed on first use so that a malformed message is
            # rejected before any status bookkeeping is attempted.
            interface = None

            for record in records:
                body = _subtask_message_body(record)
                # task_id is only validated here; status updates are keyed
                # on the subtask id alone.
                _subtask_uuid(body, "task_id")
                subtask_id = _subtask_uuid(body, "sub_task_id")

                if interface is None:
                    interface = SubTaskInterface()

                interface.update_subtask_status(
                    subtask_id=subtask_id,
                    status="in progress",
                )
                try:
                    # The handler receives the parsed body, not the raw record.
                    result = func(body, context, *args, **kwargs)
                    interface.update_subtask_status(
                        subtask_id=subtask_id,
                        status="complete",
                        # `is not None` keeps legitimate falsy results
                        # such as 0, False or an empty list.
                        outputs={"result": result} if result is not None else None,
                    )
                except Exception as exc:
                    interface.update_subtask_status(
                        subtask_id=subtask_id,
                        status="failed",
                        outputs={"error": str(exc)},
                    )
                    raise
            return None

        return wrapper

    return decorator
def task_handler():
    """Build a decorator that creates and tracks a task around a handler.

    The decorated function is called as ``func(body, context, *args,
    **kwargs)`` once per record.  For every record in ``event["Records"]``
    (or for the event itself when that key is absent) the wrapper:

    - parses the payload (a JSON string ``body``, a non-string ``body``, or
      the record itself when it has no ``body`` key),
    - creates a task/subtask pair via ``TasksInterface.create_task`` with
      the payload as ``inputs``,
    - marks the subtask ``"in progress"`` and runs the handler,
    - marks the subtask ``"complete"`` or ``"failed"``.

    Returns (from the wrapper):
        When the event has a ``"Records"`` key:
        ``{"batchItemFailures": [{"itemIdentifier": <messageId>}, ...]}``
        listing the records that failed.  NOTE(review): this is the shape of
        Lambda's SQS partial batch response; it presumably relies on
        ``ReportBatchItemFailures`` being enabled on the event source
        mapping -- confirm.
        Otherwise: a list holding the handler's result.

    Raises (from the wrapper):
        Exception: for events without ``"Records"``, any error raised while
            creating, running or recording the task is re-raised.
    """

    def decorator(func: Callable[..., Any]):
        task_source = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
            logger = setup_logger()
            is_batch = "Records" in event
            records = event.get("Records", [event])  # fallback for non-SQS
            results = []
            failures = []

            for record in records:
                raw_body = record.get("body", record)
                if isinstance(raw_body, str):
                    try:
                        body = json.loads(raw_body) or {}
                    except (ValueError, RecursionError):
                        # Best effort: keep going with empty inputs, but
                        # leave a trace instead of swallowing silently.
                        logger.warning(
                            "Message body is not valid JSON; using empty inputs"
                        )
                        body = {}
                else:
                    body = raw_body or {}

                task_id = None
                subtask_id = None
                interface = None
                try:
                    # Task creation and the first status update sit inside
                    # the try so that, for a batch, a failure here is
                    # reported for this record only instead of aborting
                    # the records already processed.
                    logger.info("Creating task for source: %s", task_source)
                    task_id, subtask_id = TasksInterface.create_task(
                        task_source=task_source,
                        inputs=body,
                    )
                    logger.info(
                        "Created task_id=%s subtask_id=%s", task_id, subtask_id
                    )
                    interface = SubTaskInterface()
                    interface.update_subtask_status(
                        subtask_id=subtask_id,
                        status="in progress",
                    )
                    result = func(body, context, *args, **kwargs)
                    interface.update_subtask_status(
                        subtask_id=subtask_id,
                        status="complete",
                        # `is not None` keeps legitimate falsy results
                        # such as 0, False or an empty list.
                        outputs={"result": result} if result is not None else None,
                    )
                except Exception as exc:
                    logger.exception("Task %s failed: %s", task_id, exc)
                    if interface is not None:
                        try:
                            interface.update_subtask_status(
                                subtask_id=subtask_id,
                                status="failed",
                                outputs={"error": str(exc)},
                            )
                        except Exception:
                            # Do not let a bookkeeping error replace the
                            # original failure.
                            logger.exception(
                                "Could not mark subtask %s as failed", subtask_id
                            )
                    if not is_batch:
                        # Handle non-SQS events
                        raise
                    failures.append({"itemIdentifier": record["messageId"]})
                else:
                    logger.info("Task %s completed successfully", task_id)
                    results.append(result)

            if is_batch:
                return {"batchItemFailures": failures}
            # Handle non-SQS events
            return results

        return wrapper

    return decorator