Merge pull request #977 from Hestia-Homes/bug/handler-tasks

Ensure all events in task handler are processed
This commit is contained in:
Daniel Roth 2026-04-10 17:46:55 +01:00 committed by GitHub
commit b0a0cfb56c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -113,12 +113,17 @@ def task_handler():
@wraps(func)
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
logger = setup_logger()
# Parse body: Records-style SQS or plain dict event
if "Records" in event:
raw_body = event["Records"][0].get("body", {})
records = event.get("Records", [event]) # fallback for non-SQS
results = []
failures = []
for record in records:
# Parse body
raw_body = record.get("body", record)
if isinstance(raw_body, str):
try:
body = json.loads(raw_body)
@ -126,43 +131,55 @@ def task_handler():
body = {}
else:
body = raw_body or {}
else:
body = event
# Create fresh task + subtask
logger.info("Creating task for source: %s", task_source)
task_id, subtask_id = TasksInterface.create_task(
task_source=task_source,
inputs=body,
)
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
# Create task per message
logger.info("Creating task for source: %s", task_source)
task_id, subtask_id = TasksInterface.create_task(
task_source=task_source,
inputs=body,
)
interface = SubTaskInterface()
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
interface.update_subtask_status(
subtask_id=subtask_id,
status="in progress",
)
try:
result = func(body, context, *args, **kwargs)
interface = SubTaskInterface()
interface.update_subtask_status(
subtask_id=subtask_id,
status="complete",
outputs={"result": result} if result else None,
status="in progress",
)
logger.info("Task %s completed successfully", task_id)
return result
except Exception as e:
logger.exception("Task %s failed: %s", task_id, e)
interface.update_subtask_status(
subtask_id=subtask_id,
status="failed",
outputs={"error": str(e)},
)
raise
try:
result = func(body, context, *args, **kwargs)
interface.update_subtask_status(
subtask_id=subtask_id,
status="complete",
outputs={"result": result} if result else None,
)
logger.info("Task %s completed successfully", task_id)
results.append(result)
except Exception as e:
logger.exception("Task %s failed: %s", task_id, e)
interface.update_subtask_status(
subtask_id=subtask_id,
status="failed",
outputs={"error": str(e)},
)
if "Records" in event:
failures.append({"itemIdentifier": record["messageId"]})
else:
# Handle non-SQS events
raise
if "Records" in event:
return {"batchItemFailures": failures}
# Handle non-SQS events
return results
return wrapper