applications/postcode_splitter: PostcodeSplitterOrchestrator + Lambda entrypoint slice

Wires slice 1-5 primitives into a deployable splitter:

- orchestration/postcode_splitter_orchestrator.py: PostcodeSplitterOrchestrator
  loads addresses via UserAddressRepository, groups by postcode via
  iter_postcode_grouped_batches, persists each batch under
  ara_postcode_splitter_batches/{task_id}/{subtask_id}/, creates a WAITING
  child SubTask, and publishes an address2UPRN SQS message per batch.

- applications/postcode_splitter/: Lambda entrypoint. handler.py is decorated
  with @subtask_handler() so the parent SubTask lifecycle is decorator-owned;
  PostcodeSplitterTriggerBody validates the body. Dockerfile is the
  python:3.11 Lambda base with the DDD-shaped source layers and no pandas.

- tests/orchestration/test_postcode_splitter_orchestrator.py: integration
  test using moto S3 + moto SQS + in-memory SQLite that exercises the full
  wiring against a fixture CSV spanning three postcode groups (one
  oversize) and asserts child count, persisted inputs, queue bodies, and
  dispatch order.

backend/postcode_splitter/ and .github/workflows/deploy_terraform.yml are
intentionally unchanged: the dockerfile_path flip is deferred until the
companion backend/address2UPRN/ migration is also ready.
This commit is contained in:
Jun-te Kim 2026-05-19 17:46:12 +00:00
parent 708f1b5d18
commit 0a04448217
8 changed files with 514 additions and 0 deletions

0
applications/__init__.py Normal file
View file

View file

@ -0,0 +1,21 @@
FROM public.ecr.aws/lambda/python:3.11
WORKDIR /var/task
COPY applications/postcode_splitter/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the layered source the handler imports from. The new splitter pulls
# only DDD-shaped packages — no pandas, no legacy backend/.
COPY domain/ domain/
COPY infrastructure/ infrastructure/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY utilities/ utilities/
COPY applications/ applications/
# Place the handler at the Lambda task root so the runtime can resolve
# ``main.handler`` without an extra package prefix.
COPY applications/postcode_splitter/handler.py /var/task/main.py
CMD ["main.handler"]

View file

@ -0,0 +1,70 @@
"""Lambda entrypoint for the postcode splitter slice.
The :func:`handler` function is decorated with ``@subtask_handler()`` so the
decorator owns the parent ``SubTask`` lifecycle (start/complete/fail) and
injects the decorator-owned :class:`TaskOrchestrator` as the third positional
argument. The handler itself does only two things:
1. Build a :class:`PostcodeSplitterOrchestrator` from env-driven config.
2. Delegate to ``split_and_dispatch`` and return its result so it lands in
``SubTask.outputs["result"]``.
"""
from __future__ import annotations
import os
from typing import Any
import boto3
from applications.postcode_splitter.postcode_splitter_trigger_body import (
PostcodeSplitterTriggerBody,
)
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from infrastructure.csv_s3_client import CsvS3Client
from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
@subtask_handler()
def handler(
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, list[str]]:
"""Validate the trigger body, build the splitter, dispatch children.
Reads ``S3_BUCKET_NAME`` and ``ADDRESS2UPRN_QUEUE_URL`` from the
environment to construct the typed S3/SQS clients. The return value
lands in ``SubTask.outputs["result"]`` via the decorator.
"""
trigger = PostcodeSplitterTriggerBody.model_validate(body)
bucket = os.environ["S3_BUCKET_NAME"]
queue_url = os.environ["ADDRESS2UPRN_QUEUE_URL"]
# boto3.client is overloaded per-service in the installed stubs; cast
# to Any so the strict-mode checker treats it as opaque.
boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto_s3: Any = boto3_client("s3")
boto_sqs: Any = boto3_client("sqs")
csv_client = CsvS3Client(boto_s3, bucket)
user_address_repo = UserAddressCsvS3Repository(csv_client, bucket)
queue_client = Address2UprnQueueClient(boto_sqs, queue_url)
splitter = PostcodeSplitterOrchestrator(
task_orchestrator=task_orchestrator,
user_address_repo=user_address_repo,
queue_client=queue_client,
)
child_ids = splitter.split_and_dispatch(
parent_task_id=trigger.task_id,
parent_subtask_id=trigger.sub_task_id,
input_s3_uri=trigger.s3_uri,
)
return {"child_subtask_ids": [str(cid) for cid in child_ids]}

View file

@ -0,0 +1,32 @@
"""Trigger payload model for the postcode splitter Lambda.
The decorator (``@subtask_handler``) already validates ``task_id`` and
``sub_task_id`` via :class:`SubtaskTriggerBody`; this model layers on the
splitter-specific ``s3_uri`` field while keeping ``extra="allow"`` so any
upstream-passthrough keys (e.g. ``portfolio_id``) survive untouched.
"""
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class PostcodeSplitterTriggerBody(BaseModel):
"""Validated body for the postcode splitter Lambda.
Attributes:
task_id: Parent ``Task`` id; used as the ``task_id`` input on each
child ``SubTask`` and as the ``parent_task_id`` on the fan-out
SQS messages.
sub_task_id: The splitter's own ``SubTask`` id; used as the path
segment under ``ara_postcode_splitter_batches/{task_id}/{...}``
so per-invocation outputs cannot collide.
s3_uri: ``s3://bucket/key`` URI of the uploaded address CSV the
splitter must read.
"""
model_config = ConfigDict(extra="allow")
task_id: UUID
sub_task_id: UUID
s3_uri: str

View file

@ -0,0 +1,4 @@
boto3
pydantic
sqlmodel
psycopg2-binary

View file

@ -0,0 +1,89 @@
"""Use-case orchestrator for the postcode splitter Lambda.
Wires the slice-1 domain (``iter_postcode_grouped_batches``), the slice-3
``UserAddressRepository``, the slice-2 ``Address2UprnQueueClient``, and the
slice-4 ``TaskOrchestrator.create_child_subtask`` primitive together.
``split_and_dispatch`` loads the input batch, groups it into per-postcode
chunks, writes each chunk back to S3 under a deterministic prefix, creates a
WAITING child ``SubTask`` for it, and publishes the address-to-UPRN fan-out
message that downstream consumers pick up.
"""
from __future__ import annotations
from uuid import UUID
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from orchestration.task_orchestrator import TaskOrchestrator
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
from repositories.user_address.user_address_repository import UserAddressRepository
class PostcodeSplitterOrchestrator:
"""Split an uploaded address batch into postcode-grouped child SubTasks.
The orchestrator owns the algorithm; the IO collaborators
(:class:`UserAddressRepository`, :class:`Address2UprnQueueClient`) and
the :class:`TaskOrchestrator` lifecycle primitive are injected so the
same wiring can be exercised against moto/SQLite in tests and against
real AWS in the Lambda entrypoint.
"""
def __init__(
self,
task_orchestrator: TaskOrchestrator,
user_address_repo: UserAddressRepository,
queue_client: Address2UprnQueueClient,
max_batch_size: int = 500,
) -> None:
self._task_orchestrator = task_orchestrator
self._user_address_repo = user_address_repo
self._queue_client = queue_client
self._max_batch_size = max_batch_size
def split_and_dispatch(
self,
*,
parent_task_id: UUID,
parent_subtask_id: UUID,
input_s3_uri: str,
) -> list[UUID]:
"""Split ``input_s3_uri`` into postcode batches and dispatch each.
For each yielded batch:
1. Persist it under
``ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}``.
2. Create a WAITING child ``SubTask`` with
``inputs={"task_id": str(parent_task_id), "s3_uri": batch_uri}``.
3. Publish an ``address2UPRN`` SQS message referencing the new child.
Returns:
The list of child ``SubTask`` ids, in dispatch order.
"""
addresses = self._user_address_repo.load_batch(input_s3_uri)
path_prefix = (
f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"
)
child_ids: list[UUID] = []
for batch in iter_postcode_grouped_batches(
addresses, max_batch_size=self._max_batch_size
):
batch_uri = self._user_address_repo.save_batch(batch, path_prefix)
child = self._task_orchestrator.create_child_subtask(
parent_task_id,
inputs={
"task_id": str(parent_task_id),
"s3_uri": batch_uri,
},
)
self._queue_client.publish(
parent_task_id=parent_task_id,
child_subtask_id=child.id,
s3_uri=batch_uri,
)
child_ids.append(child.id)
return child_ids

View file

@ -0,0 +1,298 @@
"""Integration test: PostcodeSplitterOrchestrator wired end-to-end.
Combines moto S3 + moto SQS + an in-memory SQLite session for the
``TaskOrchestrator`` so the full slice-6 wiring is exercised through real
infrastructure adapters (not mocks). The fixture CSV spans three postcodes
with one oversize group, which forces both the buffer-flush-then-oversize
branch and the final-flush branch of
``iter_postcode_grouped_batches`` three batches in total.
"""
from __future__ import annotations
import json
import os
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, cast
import boto3
import pytest
from moto import mock_aws
from sqlmodel import Session, SQLModel, create_engine
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from infrastructure.csv_s3_client import CsvS3Client
from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
)
BUCKET = "splitter-bucket"
REGION = "us-east-1"
def _make_boto_client(service_name: str) -> Any:
factory: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
return factory(service_name, region_name=REGION)
@pytest.fixture(autouse=True)
def _aws_creds() -> Iterator[None]: # pyright: ignore[reportUnusedFunction]
keys = (
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
"AWS_DEFAULT_REGION",
)
prev: dict[str, Any] = {k: os.environ.get(k) for k in keys}
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = REGION
try:
yield
finally:
for k, v in prev.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
@dataclass
class Harness:
splitter: PostcodeSplitterOrchestrator
task_orchestrator: TaskOrchestrator
subtasks: SubTaskPostgresRepository
csv_client: CsvS3Client
boto_sqs: Any
queue_url: str
repo: UserAddressCsvS3Repository
@pytest.fixture
def harness() -> Iterator[Harness]:
with mock_aws():
# Infra: S3 + SQS
boto_s3 = _make_boto_client("s3")
boto_s3.create_bucket(Bucket=BUCKET)
boto_sqs = _make_boto_client("sqs")
queue: dict[str, Any] = boto_sqs.create_queue(QueueName="address2uprn-queue")
queue_url = cast(str, queue["QueueUrl"])
csv_client = CsvS3Client(boto_s3, BUCKET)
repo = UserAddressCsvS3Repository(csv_client, BUCKET)
queue_client = Address2UprnQueueClient(boto_sqs, queue_url)
# DB: in-memory SQLite TaskOrchestrator
engine = create_engine("sqlite://")
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
task_repo = TaskPostgresRepository(session=session)
subtask_repo = SubTaskPostgresRepository(session=session)
task_orchestrator = TaskOrchestrator(
task_repo=task_repo, subtask_repo=subtask_repo
)
splitter = PostcodeSplitterOrchestrator(
task_orchestrator=task_orchestrator,
user_address_repo=repo,
queue_client=queue_client,
max_batch_size=3,
)
yield Harness(
splitter=splitter,
task_orchestrator=task_orchestrator,
subtasks=subtask_repo,
csv_client=csv_client,
boto_sqs=boto_sqs,
queue_url=queue_url,
repo=repo,
)
def _upload_fixture_csv(csv_client: CsvS3Client) -> str:
# Three postcode groups:
# AA1 1AA × 2 (within cap)
# BB2 2BB × 4 (oversize: > max_batch_size=3)
# CC3 3CC × 1 (final flush)
# Expected batching with cap=3 and the algorithm in
# ``iter_postcode_grouped_batches``:
# batch 1: [AA1 1AA × 2] (flushed because oversize follows)
# batch 2: [BB2 2BB × 4] (oversize own batch)
# batch 3: [CC3 3CC × 1] (final flush)
rows: list[dict[str, str]] = []
rows.extend(
{
"Address 1": f"{i} High St",
"Address 2": "",
"Address 3": "",
"Postcode": "AA1 1AA",
"Internal Reference": f"AA-{i}",
}
for i in range(1, 3)
)
rows.extend(
{
"Address 1": f"{i} Long Road",
"Address 2": "",
"Address 3": "",
"Postcode": "BB2 2BB",
"Internal Reference": f"BB-{i}",
}
for i in range(1, 5)
)
rows.append(
{
"Address 1": "1 Final Way",
"Address 2": "",
"Address 3": "",
"Postcode": "CC3 3CC",
"Internal Reference": "CC-1",
}
)
return csv_client.save_rows(rows, "uploads/input.csv")
def _drain_queue(boto_sqs: Any, queue_url: str) -> list[dict[str, Any]]:
bodies: list[dict[str, Any]] = []
while True:
received: dict[str, Any] = boto_sqs.receive_message(
QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=0
)
messages = cast(list[dict[str, Any]], received.get("Messages", []))
if not messages:
break
for message in messages:
bodies.append(cast(dict[str, Any], json.loads(message["Body"])))
boto_sqs.delete_message(
QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
)
return bodies
def test_split_and_dispatch_creates_three_children_for_fixture(
harness: Harness,
) -> None:
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
)
input_uri = _upload_fixture_csv(harness.csv_client)
child_ids = harness.splitter.split_and_dispatch(
parent_task_id=parent_task.id,
parent_subtask_id=parent_subtask.id,
input_s3_uri=input_uri,
)
assert len(child_ids) == 3
# All child ids are unique and persisted as WAITING children of the
# parent task.
assert len(set(child_ids)) == 3
for cid in child_ids:
child = harness.subtasks.get(cid)
assert child.task_id == parent_task.id
def test_split_and_dispatch_persists_child_inputs_with_task_id_and_s3_uri(
harness: Harness,
) -> None:
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
)
input_uri = _upload_fixture_csv(harness.csv_client)
child_ids = harness.splitter.split_and_dispatch(
parent_task_id=parent_task.id,
parent_subtask_id=parent_subtask.id,
input_s3_uri=input_uri,
)
for cid in child_ids:
child = harness.subtasks.get(cid)
assert child.inputs is not None
assert child.inputs["task_id"] == str(parent_task.id)
batch_uri = child.inputs["s3_uri"]
assert isinstance(batch_uri, str)
prefix = (
f"s3://{BUCKET}/ara_postcode_splitter_batches/"
f"{parent_task.id}/{parent_subtask.id}/"
)
assert batch_uri.startswith(prefix)
assert batch_uri.endswith(".csv")
def test_split_and_dispatch_publishes_one_message_per_child_with_matching_ids(
harness: Harness,
) -> None:
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
)
input_uri = _upload_fixture_csv(harness.csv_client)
child_ids = harness.splitter.split_and_dispatch(
parent_task_id=parent_task.id,
parent_subtask_id=parent_subtask.id,
input_s3_uri=input_uri,
)
bodies = _drain_queue(harness.boto_sqs, harness.queue_url)
assert len(bodies) == len(child_ids)
# Match queue messages against persisted child inputs by child_subtask_id;
# the message body's task_id/s3_uri must agree with the SubTask inputs.
bodies_by_child = {body["sub_task_id"]: body for body in bodies}
assert set(bodies_by_child.keys()) == {str(cid) for cid in child_ids}
for cid in child_ids:
child = harness.subtasks.get(cid)
body = bodies_by_child[str(cid)]
assert child.inputs is not None
assert body == {
"task_id": str(parent_task.id),
"sub_task_id": str(cid),
"s3_uri": child.inputs["s3_uri"],
}
def test_split_and_dispatch_returns_child_ids_in_dispatch_order(
harness: Harness,
) -> None:
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
)
input_uri = _upload_fixture_csv(harness.csv_client)
child_ids = harness.splitter.split_and_dispatch(
parent_task_id=parent_task.id,
parent_subtask_id=parent_subtask.id,
input_s3_uri=input_uri,
)
# Re-load each child's saved batch and inspect the postcode column to
# confirm the dispatch order matches the postcode-batching algorithm:
# AA-batch first, BB oversize batch second, CC final-flush third.
postcodes_per_batch: list[set[str]] = []
for cid in child_ids:
child = harness.subtasks.get(cid)
assert child.inputs is not None
rows = harness.csv_client.read_rows(child.inputs["s3_uri"])
postcodes_per_batch.append({row["postcode"] for row in rows})
assert postcodes_per_batch == [
{"AA11AA"},
{"BB22BB"},
{"CC33CC"},
]