pass task orchestrator to handler

This commit is contained in:
Daniel Roth 2026-06-10 09:29:46 +00:00
parent bb1c8c88ce
commit 2c605f80ca
2 changed files with 5 additions and 5 deletions

View file

@ -16,7 +16,7 @@ from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
@subtask_handler()
@subtask_handler(task_orchestrator=False)
def handler(body: dict[str, Any], context: Any) -> None:
trigger = AuditGeneratorTriggerRequest.model_validate(body)

View file

@ -26,6 +26,7 @@ OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]]
def subtask_handler(
*,
orchestrator_cm: Optional[OrchestratorCM] = None,
task_orchestrator: Optional[bool] = True,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Run the wrapped function as the body of an existing SubTask.
@ -38,7 +39,7 @@ def subtask_handler(
factory = orchestrator_cm or default_orchestrator
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
_wants_orchestrator = len(inspect.signature(func).parameters) >= 3
_wants_orchestrator = task_orchestrator
@wraps(func)
def wrapper(event: dict[str, Any], context: Any) -> None:
@ -48,6 +49,7 @@ def subtask_handler(
body = _parse_body(record)
trigger = SubtaskTriggerBody.model_validate(body)
logger.info("Running subtask %s", trigger.sub_task_id)
def _work_with(
_body: dict[str, Any] = body,
_o: TaskOrchestrator = orchestrator,
@ -67,9 +69,7 @@ def subtask_handler(
cloud_logs_url=cloud_logs_url,
)
except Exception:
logger.exception(
"Subtask %s failed", trigger.sub_task_id
)
logger.exception("Subtask %s failed", trigger.sub_task_id)
raise
logger.info("Subtask %s completed", trigger.sub_task_id)