From 0a0444821756543f57625832681e28a09d264056 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 19 May 2026 17:46:12 +0000 Subject: [PATCH] applications/postcode_splitter: PostcodeSplitterOrchestrator + Lambda entrypoint slice Wires slice 1-5 primitives into a deployable splitter: - orchestration/postcode_splitter_orchestrator.py: PostcodeSplitterOrchestrator loads addresses via UserAddressRepository, groups by postcode via iter_postcode_grouped_batches, persists each batch under ara_postcode_splitter_batches/{task_id}/{subtask_id}/, creates a WAITING child SubTask, and publishes an address2UPRN SQS message per batch. - applications/postcode_splitter/: Lambda entrypoint. handler.py is decorated with @subtask_handler() so the parent SubTask lifecycle is decorator-owned; PostcodeSplitterTriggerBody validates the body. Dockerfile is the python:3.11 Lambda base with the DDD-shaped source layers and no pandas. - tests/orchestration/test_postcode_splitter_orchestrator.py: integration test using moto S3 + moto SQS + in-memory SQLite that exercises the full wiring against a fixture CSV spanning three postcode groups (one oversize) and asserts child count, persisted inputs, queue bodies, and dispatch order. backend/postcode_splitter/ and .github/workflows/deploy_terraform.yml are intentionally unchanged: the dockerfile_path flip is deferred until the companion backend/address2UPRN/ migration is also ready. --- applications/__init__.py | 0 applications/postcode_splitter/Dockerfile | 21 ++ applications/postcode_splitter/__init__.py | 0 applications/postcode_splitter/handler.py | 70 ++++ .../postcode_splitter_trigger_body.py | 32 ++ .../postcode_splitter/requirements.txt | 4 + .../postcode_splitter_orchestrator.py | 89 ++++++ .../test_postcode_splitter_orchestrator.py | 298 ++++++++++++++++++ 8 files changed, 514 insertions(+) create mode 100644 applications/__init__.py create mode 100644 applications/postcode_splitter/Dockerfile create mode 100644 applications/postcode_splitter/__init__.py create mode 100644 applications/postcode_splitter/handler.py create mode 100644 applications/postcode_splitter/postcode_splitter_trigger_body.py create mode 100644 applications/postcode_splitter/requirements.txt create mode 100644 orchestration/postcode_splitter_orchestrator.py create mode 100644 tests/orchestration/test_postcode_splitter_orchestrator.py diff --git a/applications/__init__.py b/applications/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/applications/postcode_splitter/Dockerfile b/applications/postcode_splitter/Dockerfile new file mode 100644 index 00000000..578ee7a7 --- /dev/null +++ b/applications/postcode_splitter/Dockerfile @@ -0,0 +1,21 @@ +FROM public.ecr.aws/lambda/python:3.11 + +WORKDIR /var/task + +COPY applications/postcode_splitter/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the layered source the handler imports from. The new splitter pulls +# only DDD-shaped packages — no pandas, no legacy backend/. +COPY domain/ domain/ +COPY infrastructure/ infrastructure/ +COPY orchestration/ orchestration/ +COPY repositories/ repositories/ +COPY utilities/ utilities/ +COPY applications/ applications/ + +# Place the handler at the Lambda task root so the runtime can resolve +# ``main.handler`` without an extra package prefix. +COPY applications/postcode_splitter/handler.py /var/task/main.py + +CMD ["main.handler"] diff --git a/applications/postcode_splitter/__init__.py b/applications/postcode_splitter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/applications/postcode_splitter/handler.py b/applications/postcode_splitter/handler.py new file mode 100644 index 00000000..005227a9 --- /dev/null +++ b/applications/postcode_splitter/handler.py @@ -0,0 +1,70 @@ +"""Lambda entrypoint for the postcode splitter slice. + +The :func:`handler` function is decorated with ``@subtask_handler()`` so the +decorator owns the parent ``SubTask`` lifecycle (start/complete/fail) and +injects the decorator-owned :class:`TaskOrchestrator` as the third positional +argument. The handler itself does only two things: + +1. Build a :class:`PostcodeSplitterOrchestrator` from env-driven config. +2. Delegate to ``split_and_dispatch`` and return its result so it lands in + ``SubTask.outputs["result"]``. +""" + +from __future__ import annotations + +import os +from typing import Any + +import boto3 + +from applications.postcode_splitter.postcode_splitter_trigger_body import ( + PostcodeSplitterTriggerBody, +) +from infrastructure.address2uprn_queue_client import Address2UprnQueueClient +from infrastructure.csv_s3_client import CsvS3Client +from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchestrator +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.user_address.user_address_csv_s3_repository import ( + UserAddressCsvS3Repository, +) +from utilities.aws_lambda.subtask_handler import subtask_handler + + +@subtask_handler() +def handler( + body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator +) -> dict[str, list[str]]: + """Validate the trigger body, build the splitter, dispatch children. + + Reads ``S3_BUCKET_NAME`` and ``ADDRESS2UPRN_QUEUE_URL`` from the + environment to construct the typed S3/SQS clients. The return value + lands in ``SubTask.outputs["result"]`` via the decorator. + """ + trigger = PostcodeSplitterTriggerBody.model_validate(body) + + bucket = os.environ["S3_BUCKET_NAME"] + queue_url = os.environ["ADDRESS2UPRN_QUEUE_URL"] + + # boto3.client is overloaded per-service in the installed stubs; cast + # to Any so the strict-mode checker treats it as opaque. + boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + boto_s3: Any = boto3_client("s3") + boto_sqs: Any = boto3_client("sqs") + + csv_client = CsvS3Client(boto_s3, bucket) + user_address_repo = UserAddressCsvS3Repository(csv_client, bucket) + queue_client = Address2UprnQueueClient(boto_sqs, queue_url) + + splitter = PostcodeSplitterOrchestrator( + task_orchestrator=task_orchestrator, + user_address_repo=user_address_repo, + queue_client=queue_client, + ) + + child_ids = splitter.split_and_dispatch( + parent_task_id=trigger.task_id, + parent_subtask_id=trigger.sub_task_id, + input_s3_uri=trigger.s3_uri, + ) + + return {"child_subtask_ids": [str(cid) for cid in child_ids]} diff --git a/applications/postcode_splitter/postcode_splitter_trigger_body.py b/applications/postcode_splitter/postcode_splitter_trigger_body.py new file mode 100644 index 00000000..bc983abc --- /dev/null +++ b/applications/postcode_splitter/postcode_splitter_trigger_body.py @@ -0,0 +1,32 @@ +"""Trigger payload model for the postcode splitter Lambda. + +The decorator (``@subtask_handler``) already validates ``task_id`` and +``sub_task_id`` via :class:`SubtaskTriggerBody`; this model layers on the +splitter-specific ``s3_uri`` field while keeping ``extra="allow"`` so any +upstream-passthrough keys (e.g. ``portfolio_id``) survive untouched. +""" + +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class PostcodeSplitterTriggerBody(BaseModel): + """Validated body for the postcode splitter Lambda. + + Attributes: + task_id: Parent ``Task`` id; used as the ``task_id`` input on each + child ``SubTask`` and as the ``parent_task_id`` on the fan-out + SQS messages. + sub_task_id: The splitter's own ``SubTask`` id; used as the path + segment under ``ara_postcode_splitter_batches/{task_id}/{...}`` + so per-invocation outputs cannot collide. + s3_uri: ``s3://bucket/key`` URI of the uploaded address CSV the + splitter must read. + """ + + model_config = ConfigDict(extra="allow") + + task_id: UUID + sub_task_id: UUID + s3_uri: str diff --git a/applications/postcode_splitter/requirements.txt b/applications/postcode_splitter/requirements.txt new file mode 100644 index 00000000..6a85a255 --- /dev/null +++ b/applications/postcode_splitter/requirements.txt @@ -0,0 +1,4 @@ +boto3 +pydantic +sqlmodel +psycopg2-binary diff --git a/orchestration/postcode_splitter_orchestrator.py b/orchestration/postcode_splitter_orchestrator.py new file mode 100644 index 00000000..6afa2538 --- /dev/null +++ b/orchestration/postcode_splitter_orchestrator.py @@ -0,0 +1,89 @@ +"""Use-case orchestrator for the postcode splitter Lambda. + +Wires the slice-1 domain (``iter_postcode_grouped_batches``), the slice-3 +``UserAddressRepository``, the slice-2 ``Address2UprnQueueClient``, and the +slice-4 ``TaskOrchestrator.create_child_subtask`` primitive together. + +``split_and_dispatch`` loads the input batch, groups it into per-postcode +chunks, writes each chunk back to S3 under a deterministic prefix, creates a +WAITING child ``SubTask`` for it, and publishes the address-to-UPRN fan-out +message that downstream consumers pick up. +""" + +from __future__ import annotations + +from uuid import UUID + +from infrastructure.address2uprn_queue_client import Address2UprnQueueClient +from orchestration.task_orchestrator import TaskOrchestrator +from domain.addresses.postcode_batching import iter_postcode_grouped_batches +from repositories.user_address.user_address_repository import UserAddressRepository + + +class PostcodeSplitterOrchestrator: + """Split an uploaded address batch into postcode-grouped child SubTasks. + + The orchestrator owns the algorithm; the IO collaborators + (:class:`UserAddressRepository`, :class:`Address2UprnQueueClient`) and + the :class:`TaskOrchestrator` lifecycle primitive are injected so the + same wiring can be exercised against moto/SQLite in tests and against + real AWS in the Lambda entrypoint. + """ + + def __init__( + self, + task_orchestrator: TaskOrchestrator, + user_address_repo: UserAddressRepository, + queue_client: Address2UprnQueueClient, + max_batch_size: int = 500, + ) -> None: + self._task_orchestrator = task_orchestrator + self._user_address_repo = user_address_repo + self._queue_client = queue_client + self._max_batch_size = max_batch_size + + def split_and_dispatch( + self, + *, + parent_task_id: UUID, + parent_subtask_id: UUID, + input_s3_uri: str, + ) -> list[UUID]: + """Split ``input_s3_uri`` into postcode batches and dispatch each. + + For each yielded batch: + + 1. Persist it under + ``ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}``. + 2. Create a WAITING child ``SubTask`` with + ``inputs={"task_id": str(parent_task_id), "s3_uri": batch_uri}``. + 3. Publish an ``address2UPRN`` SQS message referencing the new child. + + Returns: + The list of child ``SubTask`` ids, in dispatch order. + """ + addresses = self._user_address_repo.load_batch(input_s3_uri) + path_prefix = ( + f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}" + ) + + child_ids: list[UUID] = [] + for batch in iter_postcode_grouped_batches( + addresses, max_batch_size=self._max_batch_size + ): + batch_uri = self._user_address_repo.save_batch(batch, path_prefix) + child = self._task_orchestrator.create_child_subtask( + parent_task_id, + inputs={ + "task_id": str(parent_task_id), + "s3_uri": batch_uri, + }, + ) + self._queue_client.publish( + parent_task_id=parent_task_id, + child_subtask_id=child.id, + s3_uri=batch_uri, + ) + child_ids.append(child.id) + + return child_ids diff --git a/tests/orchestration/test_postcode_splitter_orchestrator.py b/tests/orchestration/test_postcode_splitter_orchestrator.py new file mode 100644 index 00000000..57bd2133 --- /dev/null +++ b/tests/orchestration/test_postcode_splitter_orchestrator.py @@ -0,0 +1,298 @@ +"""Integration test: PostcodeSplitterOrchestrator wired end-to-end. + +Combines moto S3 + moto SQS + an in-memory SQLite session for the +``TaskOrchestrator`` so the full slice-6 wiring is exercised through real +infrastructure adapters (not mocks). The fixture CSV spans three postcodes +with one oversize group, which forces both the buffer-flush-then-oversize +branch and the final-flush branch of +``iter_postcode_grouped_batches`` — three batches in total. +""" + +from __future__ import annotations + +import json +import os +from collections.abc import Iterator +from dataclasses import dataclass +from typing import Any, cast + +import boto3 +import pytest +from moto import mock_aws +from sqlmodel import Session, SQLModel, create_engine + +from infrastructure.address2uprn_queue_client import Address2UprnQueueClient +from infrastructure.csv_s3_client import CsvS3Client +from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchestrator +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository +from repositories.tasks.task_postgres_repository import TaskPostgresRepository +from repositories.user_address.user_address_csv_s3_repository import ( + UserAddressCsvS3Repository, +) + +BUCKET = "splitter-bucket" +REGION = "us-east-1" + + +def _make_boto_client(service_name: str) -> Any: + factory: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + return factory(service_name, region_name=REGION) + + +@pytest.fixture(autouse=True) +def _aws_creds() -> Iterator[None]: # pyright: ignore[reportUnusedFunction] + keys = ( + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + "AWS_DEFAULT_REGION", + ) + prev: dict[str, Any] = {k: os.environ.get(k) for k in keys} + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = REGION + try: + yield + finally: + for k, v in prev.items(): + if v is None: + os.environ.pop(k, None) + else: + os.environ[k] = v + + +@dataclass +class Harness: + splitter: PostcodeSplitterOrchestrator + task_orchestrator: TaskOrchestrator + subtasks: SubTaskPostgresRepository + csv_client: CsvS3Client + boto_sqs: Any + queue_url: str + repo: UserAddressCsvS3Repository + + +@pytest.fixture +def harness() -> Iterator[Harness]: + with mock_aws(): + # Infra: S3 + SQS + boto_s3 = _make_boto_client("s3") + boto_s3.create_bucket(Bucket=BUCKET) + boto_sqs = _make_boto_client("sqs") + queue: dict[str, Any] = boto_sqs.create_queue(QueueName="address2uprn-queue") + queue_url = cast(str, queue["QueueUrl"]) + + csv_client = CsvS3Client(boto_s3, BUCKET) + repo = UserAddressCsvS3Repository(csv_client, BUCKET) + queue_client = Address2UprnQueueClient(boto_sqs, queue_url) + + # DB: in-memory SQLite TaskOrchestrator + engine = create_engine("sqlite://") + SQLModel.metadata.create_all(engine) + with Session(engine) as session: + task_repo = TaskPostgresRepository(session=session) + subtask_repo = SubTaskPostgresRepository(session=session) + task_orchestrator = TaskOrchestrator( + task_repo=task_repo, subtask_repo=subtask_repo + ) + + splitter = PostcodeSplitterOrchestrator( + task_orchestrator=task_orchestrator, + user_address_repo=repo, + queue_client=queue_client, + max_batch_size=3, + ) + + yield Harness( + splitter=splitter, + task_orchestrator=task_orchestrator, + subtasks=subtask_repo, + csv_client=csv_client, + boto_sqs=boto_sqs, + queue_url=queue_url, + repo=repo, + ) + + +def _upload_fixture_csv(csv_client: CsvS3Client) -> str: + # Three postcode groups: + # AA1 1AA × 2 (within cap) + # BB2 2BB × 4 (oversize: > max_batch_size=3) + # CC3 3CC × 1 (final flush) + # Expected batching with cap=3 and the algorithm in + # ``iter_postcode_grouped_batches``: + # batch 1: [AA1 1AA × 2] (flushed because oversize follows) + # batch 2: [BB2 2BB × 4] (oversize own batch) + # batch 3: [CC3 3CC × 1] (final flush) + rows: list[dict[str, str]] = [] + rows.extend( + { + "Address 1": f"{i} High St", + "Address 2": "", + "Address 3": "", + "Postcode": "AA1 1AA", + "Internal Reference": f"AA-{i}", + } + for i in range(1, 3) + ) + rows.extend( + { + "Address 1": f"{i} Long Road", + "Address 2": "", + "Address 3": "", + "Postcode": "BB2 2BB", + "Internal Reference": f"BB-{i}", + } + for i in range(1, 5) + ) + rows.append( + { + "Address 1": "1 Final Way", + "Address 2": "", + "Address 3": "", + "Postcode": "CC3 3CC", + "Internal Reference": "CC-1", + } + ) + return csv_client.save_rows(rows, "uploads/input.csv") + + +def _drain_queue(boto_sqs: Any, queue_url: str) -> list[dict[str, Any]]: + bodies: list[dict[str, Any]] = [] + while True: + received: dict[str, Any] = boto_sqs.receive_message( + QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=0 + ) + messages = cast(list[dict[str, Any]], received.get("Messages", [])) + if not messages: + break + for message in messages: + bodies.append(cast(dict[str, Any], json.loads(message["Body"]))) + boto_sqs.delete_message( + QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"] + ) + return bodies + + +def test_split_and_dispatch_creates_three_children_for_fixture( + harness: Harness, +) -> None: + parent_task, parent_subtask = ( + harness.task_orchestrator.create_task_with_subtask( + task_source="manual:postcode-splitter-int" + ) + ) + input_uri = _upload_fixture_csv(harness.csv_client) + + child_ids = harness.splitter.split_and_dispatch( + parent_task_id=parent_task.id, + parent_subtask_id=parent_subtask.id, + input_s3_uri=input_uri, + ) + + assert len(child_ids) == 3 + # All child ids are unique and persisted as WAITING children of the + # parent task. + assert len(set(child_ids)) == 3 + for cid in child_ids: + child = harness.subtasks.get(cid) + assert child.task_id == parent_task.id + + +def test_split_and_dispatch_persists_child_inputs_with_task_id_and_s3_uri( + harness: Harness, +) -> None: + parent_task, parent_subtask = ( + harness.task_orchestrator.create_task_with_subtask( + task_source="manual:postcode-splitter-int" + ) + ) + input_uri = _upload_fixture_csv(harness.csv_client) + + child_ids = harness.splitter.split_and_dispatch( + parent_task_id=parent_task.id, + parent_subtask_id=parent_subtask.id, + input_s3_uri=input_uri, + ) + + for cid in child_ids: + child = harness.subtasks.get(cid) + assert child.inputs is not None + assert child.inputs["task_id"] == str(parent_task.id) + batch_uri = child.inputs["s3_uri"] + assert isinstance(batch_uri, str) + prefix = ( + f"s3://{BUCKET}/ara_postcode_splitter_batches/" + f"{parent_task.id}/{parent_subtask.id}/" + ) + assert batch_uri.startswith(prefix) + assert batch_uri.endswith(".csv") + + +def test_split_and_dispatch_publishes_one_message_per_child_with_matching_ids( + harness: Harness, +) -> None: + parent_task, parent_subtask = ( + harness.task_orchestrator.create_task_with_subtask( + task_source="manual:postcode-splitter-int" + ) + ) + input_uri = _upload_fixture_csv(harness.csv_client) + + child_ids = harness.splitter.split_and_dispatch( + parent_task_id=parent_task.id, + parent_subtask_id=parent_subtask.id, + input_s3_uri=input_uri, + ) + + bodies = _drain_queue(harness.boto_sqs, harness.queue_url) + assert len(bodies) == len(child_ids) + + # Match queue messages against persisted child inputs by child_subtask_id; + # the message body's task_id/s3_uri must agree with the SubTask inputs. + bodies_by_child = {body["sub_task_id"]: body for body in bodies} + assert set(bodies_by_child.keys()) == {str(cid) for cid in child_ids} + for cid in child_ids: + child = harness.subtasks.get(cid) + body = bodies_by_child[str(cid)] + assert child.inputs is not None + assert body == { + "task_id": str(parent_task.id), + "sub_task_id": str(cid), + "s3_uri": child.inputs["s3_uri"], + } + + +def test_split_and_dispatch_returns_child_ids_in_dispatch_order( + harness: Harness, +) -> None: + parent_task, parent_subtask = ( + harness.task_orchestrator.create_task_with_subtask( + task_source="manual:postcode-splitter-int" + ) + ) + input_uri = _upload_fixture_csv(harness.csv_client) + + child_ids = harness.splitter.split_and_dispatch( + parent_task_id=parent_task.id, + parent_subtask_id=parent_subtask.id, + input_s3_uri=input_uri, + ) + + # Re-load each child's saved batch and inspect the postcode column to + # confirm the dispatch order matches the postcode-batching algorithm: + # AA-batch first, BB oversize batch second, CC final-flush third. + postcodes_per_batch: list[set[str]] = [] + for cid in child_ids: + child = harness.subtasks.get(cid) + assert child.inputs is not None + rows = harness.csv_client.read_rows(child.inputs["s3_uri"]) + postcodes_per_batch.append({row["postcode"] for row in rows}) + + assert postcodes_per_batch == [ + {"AA11AA"}, + {"BB22BB"}, + {"CC33CC"}, + ]