tell subtask_handler whether to send TaskOrchestrator to handler, defaulting to True

This commit is contained in:
Daniel Roth 2026-06-10 12:43:24 +00:00
parent 2c605f80ca
commit 8ff58bd645
4 changed files with 13 additions and 9 deletions

View file

@ -16,7 +16,7 @@ from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
@subtask_handler(task_orchestrator=False)
@subtask_handler(pass_task_orchestrator=False)
def handler(body: dict[str, Any], context: Any) -> None:
trigger = AuditGeneratorTriggerRequest.model_validate(body)

View file

@ -25,6 +25,5 @@ testpaths =
etl/epc_clean/tests
etl/hubspot/tests
etl/spatial/tests
tests/
markers =
integration: mark a test as an integration test

View file

@ -236,15 +236,21 @@ def test_subtask_handler_completes_subtask_without_orchestrator_parameter(
task_source="manual:test"
)
@subtask_handler(orchestrator_cm=harness.factory)
received: dict[str, Any] = {}
@subtask_handler(orchestrator_cm=harness.factory, pass_task_orchestrator=False)
def handler(body: dict[str, Any], context: Any) -> None:
return None
received["body"] = body
received["context"] = context
# act
handler(_direct_event(task.id, subtask.id), context=None)
handler(_direct_event(task.id, subtask.id), context="ctx-sentinel")
# assert
# assert — SubTask lifecycle completes and handler received correct args
assert harness.subtasks.get(subtask.id).status is SubTaskStatus.COMPLETE
assert harness.tasks.get(task.id).status is TaskStatus.COMPLETE
assert received["context"] == "ctx-sentinel"
assert received["body"]["sub_task_id"] == str(subtask.id)
def test_subtask_handler_leaves_cloudwatch_url_unset_outside_lambda(

View file

@ -4,7 +4,6 @@ Translates an AWS Lambda invocation (SQS-shaped or direct) into
TaskOrchestrator.run_subtask(...) calls.
"""
import inspect
import json
import logging
import os
@ -26,7 +25,7 @@ OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]]
def subtask_handler(
*,
orchestrator_cm: Optional[OrchestratorCM] = None,
task_orchestrator: Optional[bool] = True,
pass_task_orchestrator: bool = True,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Run the wrapped function as the body of an existing SubTask.
@ -39,7 +38,7 @@ def subtask_handler(
factory = orchestrator_cm or default_orchestrator
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
_wants_orchestrator = task_orchestrator
_wants_orchestrator = pass_task_orchestrator
@wraps(func)
def wrapper(event: dict[str, Any], context: Any) -> None: