From 51cf54577620a7ad5e0022dd840abc7ec9c29a00 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Wed, 10 Jun 2026 08:22:43 +0000 Subject: [PATCH 1/5] =?UTF-8?q?Two-parameter=20subtask=20handler=20complet?= =?UTF-8?q?es=20without=20TypeError=20=F0=9F=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- applications/audit_generator/handler.py | 5 ++++- pytest.ini | 1 + .../aws_lambda/test_subtask_handler.py | 19 +++++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/applications/audit_generator/handler.py b/applications/audit_generator/handler.py index 533f5d91..76f53538 100644 --- a/applications/audit_generator/handler.py +++ b/applications/audit_generator/handler.py @@ -15,11 +15,14 @@ from orchestration.audit_generator_orchestrator import AuditGeneratorOrchestrato from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork from utilities.aws_lambda.subtask_handler import subtask_handler + @subtask_handler() def handler(body: dict[str, Any], context: Any) -> None: trigger = AuditGeneratorTriggerRequest.model_validate(body) - boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + boto3_client: Any = ( + boto3.client + ) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] boto_s3: Any = boto3_client("s3") bucket = os.environ["S3_BUCKET_NAME"] s3_client = S3Client(boto_s3_client=boto_s3, bucket=bucket) diff --git a/pytest.ini b/pytest.ini index 2bcd6178..cb6af047 100644 --- a/pytest.ini +++ b/pytest.ini @@ -25,5 +25,6 @@ testpaths = etl/epc_clean/tests etl/hubspot/tests etl/spatial/tests + tests/ markers = integration: mark a test as an integration test diff --git a/tests/utilities/aws_lambda/test_subtask_handler.py b/tests/utilities/aws_lambda/test_subtask_handler.py index d671adc4..3b33d7fd 100644 --- a/tests/utilities/aws_lambda/test_subtask_handler.py +++ b/tests/utilities/aws_lambda/test_subtask_handler.py @@ -228,6 +228,25 @@ def test_subtask_handler_records_cloudwatch_url_on_subtask( assert "$255B$2524LATEST$255D" in saved_url +def test_subtask_handler_completes_subtask_without_orchestrator_parameter( + harness: Harness, +) -> None: + # arrange + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + + @subtask_handler(orchestrator_cm=harness.factory) + def handler(body: dict[str, Any], context: Any) -> None: + return None + + # act + handler(_direct_event(task.id, subtask.id), context=None) + + # assert + assert harness.subtasks.get(subtask.id).status is SubTaskStatus.COMPLETE + + def test_subtask_handler_leaves_cloudwatch_url_unset_outside_lambda( harness: Harness, monkeypatch: pytest.MonkeyPatch ) -> None: From bb1c8c88cea900da353c164bfb447b2b7d3808d2 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Wed, 10 Jun 2026 08:40:32 +0000 Subject: [PATCH 2/5] =?UTF-8?q?Two-parameter=20subtask=20handler=20complet?= =?UTF-8?q?es=20without=20TypeError=20=F0=9F=9F=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utilities/aws_lambda/subtask_handler.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/utilities/aws_lambda/subtask_handler.py b/utilities/aws_lambda/subtask_handler.py index 592ffebf..42c9e07a 100644 --- a/utilities/aws_lambda/subtask_handler.py +++ b/utilities/aws_lambda/subtask_handler.py @@ -4,6 +4,7 @@ Translates an AWS Lambda invocation (SQS-shaped or direct) into TaskOrchestrator.run_subtask(...) calls. """ +import inspect import json import logging import os @@ -37,6 +38,8 @@ def subtask_handler( factory = orchestrator_cm or default_orchestrator def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + _wants_orchestrator = len(inspect.signature(func).parameters) >= 3 + @wraps(func) def wrapper(event: dict[str, Any], context: Any) -> None: cloud_logs_url = _cloudwatch_url() @@ -45,12 +48,22 @@ def subtask_handler( body = _parse_body(record) trigger = SubtaskTriggerBody.model_validate(body) logger.info("Running subtask %s", trigger.sub_task_id) + def _work_with( + _body: dict[str, Any] = body, + _o: TaskOrchestrator = orchestrator, + ) -> Any: + return func(_body, context, _o) + + def _work_without(_body: dict[str, Any] = body) -> Any: + return func(_body, context) + + work: Callable[[], Any] = ( + _work_with if _wants_orchestrator else _work_without + ) try: orchestrator.run_subtask( trigger.sub_task_id, - work=lambda body=body, o=orchestrator: func( - body, context, o - ), + work=work, cloud_logs_url=cloud_logs_url, ) except Exception: From 2c605f80ca7885c3af04497ff9001918039de64e Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Wed, 10 Jun 2026 09:29:46 +0000 Subject: [PATCH 3/5] pass task orchestrator to handler --- applications/audit_generator/handler.py | 2 +- utilities/aws_lambda/subtask_handler.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/applications/audit_generator/handler.py b/applications/audit_generator/handler.py index 76f53538..72358143 100644 --- a/applications/audit_generator/handler.py +++ b/applications/audit_generator/handler.py @@ -16,7 +16,7 @@ from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork from utilities.aws_lambda.subtask_handler import subtask_handler -@subtask_handler() +@subtask_handler(task_orchestrator=False) def handler(body: dict[str, Any], context: Any) -> None: trigger = AuditGeneratorTriggerRequest.model_validate(body) diff --git a/utilities/aws_lambda/subtask_handler.py b/utilities/aws_lambda/subtask_handler.py index 42c9e07a..c53b59ca 100644 --- a/utilities/aws_lambda/subtask_handler.py +++ b/utilities/aws_lambda/subtask_handler.py @@ -26,6 +26,7 @@ OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]] def subtask_handler( *, orchestrator_cm: Optional[OrchestratorCM] = None, + task_orchestrator: Optional[bool] = True, ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Run the wrapped function as the body of an existing SubTask. @@ -38,7 +39,7 @@ def subtask_handler( factory = orchestrator_cm or default_orchestrator def decorator(func: Callable[..., Any]) -> Callable[..., Any]: - _wants_orchestrator = len(inspect.signature(func).parameters) >= 3 + _wants_orchestrator = task_orchestrator @wraps(func) def wrapper(event: dict[str, Any], context: Any) -> None: @@ -48,6 +49,7 @@ def subtask_handler( body = _parse_body(record) trigger = SubtaskTriggerBody.model_validate(body) logger.info("Running subtask %s", trigger.sub_task_id) + def _work_with( _body: dict[str, Any] = body, _o: TaskOrchestrator = orchestrator, @@ -67,9 +69,7 @@ def subtask_handler( cloud_logs_url=cloud_logs_url, ) except Exception: - logger.exception( - "Subtask %s failed", trigger.sub_task_id - ) + logger.exception("Subtask %s failed", trigger.sub_task_id) raise logger.info("Subtask %s completed", trigger.sub_task_id) From 8ff58bd645b0ff8b983341461f9db17ca5882ddf Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Wed, 10 Jun 2026 12:43:24 +0000 Subject: [PATCH 4/5] tell subtask_handler whether to send TaskOrchestrator to handler, defaulting to True --- applications/audit_generator/handler.py | 2 +- pytest.ini | 1 - tests/utilities/aws_lambda/test_subtask_handler.py | 14 ++++++++++---- utilities/aws_lambda/subtask_handler.py | 5 ++--- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/applications/audit_generator/handler.py b/applications/audit_generator/handler.py index 72358143..3bbed734 100644 --- a/applications/audit_generator/handler.py +++ b/applications/audit_generator/handler.py @@ -16,7 +16,7 @@ from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork from utilities.aws_lambda.subtask_handler import subtask_handler -@subtask_handler(task_orchestrator=False) +@subtask_handler(pass_task_orchestrator=False) def handler(body: dict[str, Any], context: Any) -> None: trigger = AuditGeneratorTriggerRequest.model_validate(body) diff --git a/pytest.ini b/pytest.ini index cb6af047..2bcd6178 100644 --- a/pytest.ini +++ b/pytest.ini @@ -25,6 +25,5 @@ testpaths = etl/epc_clean/tests etl/hubspot/tests etl/spatial/tests - tests/ markers = integration: mark a test as an integration test diff --git a/tests/utilities/aws_lambda/test_subtask_handler.py b/tests/utilities/aws_lambda/test_subtask_handler.py index 3b33d7fd..b79de3b3 100644 --- a/tests/utilities/aws_lambda/test_subtask_handler.py +++ b/tests/utilities/aws_lambda/test_subtask_handler.py @@ -236,15 +236,21 @@ def test_subtask_handler_completes_subtask_without_orchestrator_parameter( task_source="manual:test" ) - @subtask_handler(orchestrator_cm=harness.factory) + received: dict[str, Any] = {} + + @subtask_handler(orchestrator_cm=harness.factory, pass_task_orchestrator=False) def handler(body: dict[str, Any], context: Any) -> None: - return None + received["body"] = body + received["context"] = context # act - handler(_direct_event(task.id, subtask.id), context=None) + handler(_direct_event(task.id, subtask.id), context="ctx-sentinel") - # assert + # assert — SubTask lifecycle completes and handler received correct args assert harness.subtasks.get(subtask.id).status is SubTaskStatus.COMPLETE + assert harness.tasks.get(task.id).status is TaskStatus.COMPLETE + assert received["context"] == "ctx-sentinel" + assert received["body"]["sub_task_id"] == str(subtask.id) def test_subtask_handler_leaves_cloudwatch_url_unset_outside_lambda( diff --git a/utilities/aws_lambda/subtask_handler.py b/utilities/aws_lambda/subtask_handler.py index c53b59ca..e5ac086a 100644 --- a/utilities/aws_lambda/subtask_handler.py +++ b/utilities/aws_lambda/subtask_handler.py @@ -4,7 +4,6 @@ Translates an AWS Lambda invocation (SQS-shaped or direct) into TaskOrchestrator.run_subtask(...) calls. """ -import inspect import json import logging import os @@ -26,7 +25,7 @@ OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]] def subtask_handler( *, orchestrator_cm: Optional[OrchestratorCM] = None, - task_orchestrator: Optional[bool] = True, + pass_task_orchestrator: bool = True, ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Run the wrapped function as the body of an existing SubTask. @@ -39,7 +38,7 @@ def subtask_handler( factory = orchestrator_cm or default_orchestrator def decorator(func: Callable[..., Any]) -> Callable[..., Any]: - _wants_orchestrator = task_orchestrator + _wants_orchestrator = pass_task_orchestrator @wraps(func) def wrapper(event: dict[str, Any], context: Any) -> None: From 921dc5108b3ae4df1f429ac3753a774e6c9c033b Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Wed, 10 Jun 2026 12:59:06 +0000 Subject: [PATCH 5/5] revert pytest.ini --- pytest.ini | 1 - 1 file changed, 1 deletion(-) diff --git a/pytest.ini b/pytest.ini index cb6af047..2bcd6178 100644 --- a/pytest.ini +++ b/pytest.ini @@ -25,6 +25,5 @@ testpaths = etl/epc_clean/tests etl/hubspot/tests etl/spatial/tests - tests/ markers = integration: mark a test as an integration test