mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
ensure all events in task handler are processed, plus error handling
This commit is contained in:
parent
4ea7ef0e80
commit
58a5c9f5d0
1 changed files with 50 additions and 33 deletions
|
|
@ -113,12 +113,17 @@ def task_handler():
|
|||
|
||||
@wraps(func)
|
||||
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# Parse body: Records-style SQS or plain dict event
|
||||
if "Records" in event:
|
||||
raw_body = event["Records"][0].get("body", {})
|
||||
records = event.get("Records", [event]) # fallback for non-SQS
|
||||
|
||||
results = []
|
||||
failures = []
|
||||
|
||||
for record in records:
|
||||
# Parse body
|
||||
raw_body = record.get("body", record)
|
||||
|
||||
if isinstance(raw_body, str):
|
||||
try:
|
||||
body = json.loads(raw_body)
|
||||
|
|
@ -126,43 +131,55 @@ def task_handler():
|
|||
body = {}
|
||||
else:
|
||||
body = raw_body or {}
|
||||
else:
|
||||
body = event
|
||||
|
||||
# Create fresh task + subtask
|
||||
logger.info("Creating task for source: %s", task_source)
|
||||
task_id, subtask_id = TasksInterface.create_task(
|
||||
task_source=task_source,
|
||||
inputs=body,
|
||||
)
|
||||
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
|
||||
# Create task per message
|
||||
logger.info("Creating task for source: %s", task_source)
|
||||
task_id, subtask_id = TasksInterface.create_task(
|
||||
task_source=task_source,
|
||||
inputs=body,
|
||||
)
|
||||
|
||||
interface = SubTaskInterface()
|
||||
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
|
||||
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="in progress",
|
||||
)
|
||||
|
||||
try:
|
||||
result = func(body, context, *args, **kwargs)
|
||||
interface = SubTaskInterface()
|
||||
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="complete",
|
||||
outputs={"result": result} if result else None,
|
||||
status="in progress",
|
||||
)
|
||||
logger.info("Task %s completed successfully", task_id)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Task %s failed: %s", task_id, e)
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="failed",
|
||||
outputs={"error": str(e)},
|
||||
)
|
||||
raise
|
||||
try:
|
||||
result = func(body, context, *args, **kwargs)
|
||||
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="complete",
|
||||
outputs={"result": result} if result else None,
|
||||
)
|
||||
|
||||
logger.info("Task %s completed successfully", task_id)
|
||||
results.append(result)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Task %s failed: %s", task_id, e)
|
||||
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="failed",
|
||||
outputs={"error": str(e)},
|
||||
)
|
||||
|
||||
if "Records" in event:
|
||||
failures.append({"itemIdentifier": record["messageId"]})
|
||||
else:
|
||||
# Handle non-SQS events
|
||||
raise
|
||||
|
||||
if "Records" in event:
|
||||
return {"batchItemFailures": failures}
|
||||
|
||||
# Handle non-SQS events
|
||||
return results
|
||||
|
||||
return wrapper
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue