mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
169 lines
5.3 KiB
Python
169 lines
5.3 KiB
Python
# decorators/subtask_handler.py
|
|
|
|
from functools import wraps
|
|
from typing import Callable, Any
|
|
from uuid import UUID
|
|
import json
|
|
|
|
from backend.app.db.functions.tasks.Tasks import SubTaskInterface, TasksInterface
|
|
from utils.logger import setup_logger
|
|
|
|
|
|
def subtask_handler():
|
|
"""
|
|
Decorator that wraps your existing handler and automatically:
|
|
|
|
- Extracts task_id + sub_task_id from event
|
|
- Marks subtask as in progress
|
|
- Executes handler logic
|
|
- Marks subtask complete on success
|
|
- Marks failed on exception
|
|
"""
|
|
|
|
def decorator(func: Callable[..., Any]):
|
|
|
|
@wraps(func)
|
|
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
|
|
|
|
records = event.get("Records", [event])
|
|
|
|
interface = SubTaskInterface()
|
|
|
|
for record in records:
|
|
|
|
# -------------------------------
|
|
# Parse body safely
|
|
# -------------------------------
|
|
body = {}
|
|
|
|
if isinstance(record.get("body"), str):
|
|
try:
|
|
body = json.loads(record["body"])
|
|
except Exception:
|
|
body = {}
|
|
else:
|
|
body = record.get("body", {}) or {}
|
|
|
|
task_id_raw = body.get("task_id")
|
|
subtask_id_raw = body.get("sub_task_id")
|
|
|
|
task_id = UUID(task_id_raw) if isinstance(task_id_raw, str) else None
|
|
subtask_id = (
|
|
UUID(subtask_id_raw) if isinstance(subtask_id_raw, str) else None
|
|
)
|
|
|
|
if not task_id or not subtask_id:
|
|
raise RuntimeError("task_id or sub_task_id missing")
|
|
|
|
# -------------------------------
|
|
# Mark in progress
|
|
# -------------------------------
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="in progress",
|
|
)
|
|
|
|
try:
|
|
# Pass the parsed body into your function
|
|
result = func(body, context, *args, **kwargs)
|
|
|
|
# -------------------------------
|
|
# Success → mark complete
|
|
# -------------------------------
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="complete",
|
|
outputs={"result": result} if result else None,
|
|
)
|
|
|
|
except Exception as e:
|
|
|
|
# -------------------------------
|
|
# Failure → mark failed
|
|
# -------------------------------
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="failed",
|
|
outputs={"error": str(e)},
|
|
)
|
|
|
|
raise
|
|
|
|
return None
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
def task_handler():
|
|
"""
|
|
Decorator that wraps a Lambda handler and automatically:
|
|
|
|
- Parses body from the first SQS record (or uses the event dict directly)
|
|
- Creates a fresh Task + SubTask in the database
|
|
- Marks the subtask as in progress
|
|
- Executes the handler, passing the parsed body
|
|
- Marks complete on success, failed on exception (and re-raises)
|
|
"""
|
|
|
|
def decorator(func: Callable[..., Any]):
|
|
|
|
task_source = f"{func.__module__}.{func.__qualname__}"
|
|
|
|
@wraps(func)
|
|
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
|
|
|
|
logger = setup_logger()
|
|
|
|
# Parse body: Records-style SQS or plain dict event
|
|
if "Records" in event:
|
|
raw_body = event["Records"][0].get("body", {})
|
|
if isinstance(raw_body, str):
|
|
try:
|
|
body = json.loads(raw_body)
|
|
except Exception:
|
|
body = {}
|
|
else:
|
|
body = raw_body or {}
|
|
else:
|
|
body = event
|
|
|
|
# Create fresh task + subtask
|
|
logger.info("Creating task for source: %s", task_source)
|
|
task_id, subtask_id = TasksInterface.create_task(
|
|
task_source=task_source,
|
|
inputs=body,
|
|
)
|
|
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
|
|
|
|
interface = SubTaskInterface()
|
|
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="in progress",
|
|
)
|
|
|
|
try:
|
|
result = func(body, context, *args, **kwargs)
|
|
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="complete",
|
|
outputs={"result": result} if result else None,
|
|
)
|
|
logger.info("Task %s completed successfully", task_id)
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.exception("Task %s failed: %s", task_id, e)
|
|
interface.update_subtask_status(
|
|
subtask_id=subtask_id,
|
|
status="failed",
|
|
outputs={"error": str(e)},
|
|
)
|
|
raise
|
|
|
|
return wrapper
|
|
|
|
return decorator
|