From 7b00a33cd242e9959ac47e4e207d67477d53b8a2 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 19 May 2026 17:12:21 +0000 Subject: [PATCH] infrastructure: typed S3/SQS clients (S3Client, CsvS3Client, SqsClient, Address2UprnQueueClient) Slice 3/6 of the postcode_splitter refactor (Hestia-Homes/Model#1101). Introduces a thin typed infrastructure layer wrapping boto3 for the AWS side of the splitter. S3Client/SqsClient are bucket-/queue-bound byte adapters; CsvS3Client subclasses S3Client to round-trip CSV row dicts via the existing parse_s3_uri helper in utils/s3.py; Address2UprnQueueClient subclasses SqsClient to publish the typed {task_id, sub_task_id, s3_uri} fan-out body the downstream consumer expects. moto[s3,sqs] is pulled into test.requirements.txt and the new tests/infrastructure/ suite exercises each client against the moto backend (S3 round-trip, CSV round-trip, SQS send + body inspection, typed publish + body inspection). pyright --strict is clean on the new modules. Co-Authored-By: Claude Opus 4.7 (1M context) --- infrastructure/address2uprn_queue_client.py | 27 ++++++++ infrastructure/csv_s3_client.py | 46 +++++++++++++ infrastructure/s3_client.py | 31 +++++++++ infrastructure/sqs_client.py | 28 ++++++++ test.requirements.txt | 3 +- tests/infrastructure/__init__.py | 17 +++++ tests/infrastructure/conftest.py | 32 +++++++++ .../test_address2uprn_queue_client.py | 65 +++++++++++++++++++ tests/infrastructure/test_csv_s3_client.py | 43 ++++++++++++ tests/infrastructure/test_s3_client.py | 31 +++++++++ tests/infrastructure/test_sqs_client.py | 38 +++++++++++ 11 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 infrastructure/address2uprn_queue_client.py create mode 100644 infrastructure/csv_s3_client.py create mode 100644 infrastructure/s3_client.py create mode 100644 infrastructure/sqs_client.py create mode 100644 tests/infrastructure/__init__.py create mode 100644 tests/infrastructure/conftest.py create mode 100644 tests/infrastructure/test_address2uprn_queue_client.py create mode 100644 tests/infrastructure/test_csv_s3_client.py create mode 100644 tests/infrastructure/test_s3_client.py create mode 100644 tests/infrastructure/test_sqs_client.py diff --git a/infrastructure/address2uprn_queue_client.py b/infrastructure/address2uprn_queue_client.py new file mode 100644 index 00000000..d81e2dd1 --- /dev/null +++ b/infrastructure/address2uprn_queue_client.py @@ -0,0 +1,27 @@ +from uuid import UUID + +from infrastructure.sqs_client import SqsClient + + +class Address2UprnQueueClient(SqsClient): + """SQS client that publishes Address-to-UPRN fan-out messages. + + The body shape is fixed by the downstream consumer: + ``{"task_id": str, "sub_task_id": str, "s3_uri": str}`` + """ + + def publish( + self, + *, + parent_task_id: UUID, + child_subtask_id: UUID, + s3_uri: str, + ) -> str: + """Send a typed Address-to-UPRN message. Returns the SQS ``MessageId``.""" + return self.send( + { + "task_id": str(parent_task_id), + "sub_task_id": str(child_subtask_id), + "s3_uri": s3_uri, + } + ) diff --git a/infrastructure/csv_s3_client.py b/infrastructure/csv_s3_client.py new file mode 100644 index 00000000..5163705b --- /dev/null +++ b/infrastructure/csv_s3_client.py @@ -0,0 +1,46 @@ +import csv +from io import StringIO + +from infrastructure.s3_client import S3Client +from utils.s3 import parse_s3_uri + + +class CsvS3Client(S3Client): + """:class:`S3Client` subclass that round-trips CSV row dictionaries. + + Rows are represented as ``list[dict[str, str]]`` — the same shape used by + :func:`csv.DictReader`/``DictWriter`` — which keeps the API trivially + compatible with existing CSV helpers in ``utils/s3.py``. + """ + + def read_rows(self, s3_uri: str) -> list[dict[str, str]]: + """Fetch the object at ``s3_uri`` and decode it as a CSV. + + The bucket portion of the URI is validated against this client's + configured bucket so cross-bucket reads fail loudly rather than + silently fetching from the wrong place. + """ + bucket, key = parse_s3_uri(s3_uri) + if bucket != self.bucket: + raise ValueError( + f"s3_uri bucket {bucket!r} does not match client bucket {self.bucket!r}" + ) + raw = self.get_object(key) + text = raw.decode("utf-8-sig") + reader = csv.DictReader(StringIO(text)) + return [dict(row) for row in reader] + + def save_rows(self, rows: list[dict[str, str]], key: str) -> str: + """Serialise ``rows`` to CSV under ``key`` and return the ``s3://`` URI. + + An empty ``rows`` list is rejected because we cannot otherwise infer + a header row. + """ + if not rows: + raise ValueError("Cannot save an empty rows list: header is unknown") + buffer = StringIO() + fieldnames = list(rows[0].keys()) + writer = csv.DictWriter(buffer, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + return self.put_object(key, buffer.getvalue().encode("utf-8")) diff --git a/infrastructure/s3_client.py b/infrastructure/s3_client.py new file mode 100644 index 00000000..9e772881 --- /dev/null +++ b/infrastructure/s3_client.py @@ -0,0 +1,31 @@ +from typing import Any + + +class S3Client: + """Thin typed wrapper around a boto3 S3 client bound to a single bucket. + + The class is deliberately small: it exposes only the byte-level + operations needed by the wider infrastructure layer. Serialisation + (CSV, JSON, etc.) lives in subclasses such as :class:`CsvS3Client`. + """ + + def __init__(self, boto_s3_client: Any, bucket: str) -> None: + self._client = boto_s3_client + self._bucket = bucket + + @property + def bucket(self) -> str: + return self._bucket + + def get_object(self, key: str) -> bytes: + """Return the raw bytes stored at ``key`` in this client's bucket.""" + response: dict[str, Any] = self._client.get_object( + Bucket=self._bucket, Key=key + ) + body: bytes = response["Body"].read() + return body + + def put_object(self, key: str, body: bytes) -> str: + """Write ``body`` to ``key`` and return the canonical ``s3://`` URI.""" + self._client.put_object(Bucket=self._bucket, Key=key, Body=body) + return f"s3://{self._bucket}/{key}" diff --git a/infrastructure/sqs_client.py b/infrastructure/sqs_client.py new file mode 100644 index 00000000..fb053680 --- /dev/null +++ b/infrastructure/sqs_client.py @@ -0,0 +1,28 @@ +import json +from typing import Any + + +class SqsClient: + """Thin typed wrapper around a boto3 SQS client bound to one queue URL. + + The body is JSON-serialised here so callers can pass plain dictionaries + instead of constructing message strings themselves. Typed publish + helpers (e.g. :class:`Address2UprnQueueClient`) build on this contract. + """ + + def __init__(self, boto_sqs_client: Any, queue_url: str) -> None: + self._client = boto_sqs_client + self._queue_url = queue_url + + @property + def queue_url(self) -> str: + return self._queue_url + + def send(self, body: dict[str, Any]) -> str: + """JSON-serialise ``body`` and send it. Returns the SQS ``MessageId``.""" + response: dict[str, Any] = self._client.send_message( + QueueUrl=self._queue_url, + MessageBody=json.dumps(body), + ) + message_id: str = response["MessageId"] + return message_id diff --git a/test.requirements.txt b/test.requirements.txt index 7fdd7dc4..26125034 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -9,4 +9,5 @@ hubspot-api-client fuzzywuzzy pymupdf playwright==1.58.0 -msal \ No newline at end of file +msal +moto[s3,sqs] \ No newline at end of file diff --git a/tests/infrastructure/__init__.py b/tests/infrastructure/__init__.py new file mode 100644 index 00000000..3478bda9 --- /dev/null +++ b/tests/infrastructure/__init__.py @@ -0,0 +1,17 @@ +from typing import Any + +import boto3 + +REGION = "us-east-1" + + +def make_boto_client(service_name: str) -> Any: + """Construct a boto3 client typed as ``Any``. + + boto3's overloaded ``client`` signature uses ``Literal[...]`` per service + in the installed stubs, which forces every call site to satisfy + ``reportArgumentType`` and ``reportUnknownMemberType`` under strict + pyright. Centralising the cast keeps each test file clean. + """ + factory: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + return factory(service_name, region_name=REGION) diff --git a/tests/infrastructure/conftest.py b/tests/infrastructure/conftest.py new file mode 100644 index 00000000..7ed2fdd6 --- /dev/null +++ b/tests/infrastructure/conftest.py @@ -0,0 +1,32 @@ +import os +from collections.abc import Iterator +from typing import Optional + +import pytest + + +@pytest.fixture(autouse=True) +def _aws_creds() -> Iterator[None]: # pyright: ignore[reportUnusedFunction] + """Stub AWS creds so botocore doesn't probe the host environment. + + Applied automatically to every test in ``tests/infrastructure/``. + """ + keys = ( + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + "AWS_DEFAULT_REGION", + ) + prev: dict[str, Optional[str]] = {k: os.environ.get(k) for k in keys} + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + try: + yield + finally: + for k, v in prev.items(): + if v is None: + os.environ.pop(k, None) + else: + os.environ[k] = v diff --git a/tests/infrastructure/test_address2uprn_queue_client.py b/tests/infrastructure/test_address2uprn_queue_client.py new file mode 100644 index 00000000..b4114742 --- /dev/null +++ b/tests/infrastructure/test_address2uprn_queue_client.py @@ -0,0 +1,65 @@ +import json +from collections.abc import Iterator +from typing import Any, cast +from uuid import uuid4 + +import pytest +from moto import mock_aws + +from infrastructure.address2uprn_queue_client import Address2UprnQueueClient +from tests.infrastructure import make_boto_client + + +@pytest.fixture +def queue_setup() -> Iterator[tuple[Address2UprnQueueClient, Any, str]]: + with mock_aws(): + boto_client = make_boto_client("sqs") + queue: dict[str, Any] = boto_client.create_queue( + QueueName="address2uprn-queue" + ) + queue_url = cast(str, queue["QueueUrl"]) + yield ( + Address2UprnQueueClient(boto_client, queue_url), + boto_client, + queue_url, + ) + + +def test_publish_returns_message_id( + queue_setup: tuple[Address2UprnQueueClient, Any, str], +) -> None: + client, _boto, _url = queue_setup + message_id = client.publish( + parent_task_id=uuid4(), + child_subtask_id=uuid4(), + s3_uri="s3://my-bucket/path/to/chunk.csv", + ) + assert isinstance(message_id, str) + assert message_id + + +def test_publish_body_uses_typed_shape( + queue_setup: tuple[Address2UprnQueueClient, Any, str], +) -> None: + client, boto_client, queue_url = queue_setup + parent_id = uuid4() + child_id = uuid4() + s3_uri = "s3://my-bucket/path/to/chunk.csv" + + client.publish( + parent_task_id=parent_id, + child_subtask_id=child_id, + s3_uri=s3_uri, + ) + + received: dict[str, Any] = boto_client.receive_message( + QueueUrl=queue_url, MaxNumberOfMessages=1 + ) + messages: list[dict[str, Any]] = received["Messages"] + assert len(messages) == 1 + body = json.loads(messages[0]["Body"]) + assert body == { + "task_id": str(parent_id), + "sub_task_id": str(child_id), + "s3_uri": s3_uri, + } diff --git a/tests/infrastructure/test_csv_s3_client.py b/tests/infrastructure/test_csv_s3_client.py new file mode 100644 index 00000000..4b9fc199 --- /dev/null +++ b/tests/infrastructure/test_csv_s3_client.py @@ -0,0 +1,43 @@ +from collections.abc import Iterator + +import pytest +from moto import mock_aws + +from infrastructure.csv_s3_client import CsvS3Client +from tests.infrastructure import make_boto_client + +BUCKET = "csv-bucket" + + +@pytest.fixture +def csv_client() -> Iterator[CsvS3Client]: + with mock_aws(): + boto_client = make_boto_client("s3") + boto_client.create_bucket(Bucket=BUCKET) + yield CsvS3Client(boto_client, BUCKET) + + +def test_save_rows_returns_s3_uri(csv_client: CsvS3Client) -> None: + rows = [{"address": "1 High St", "postcode": "AB1 2CD"}] + uri = csv_client.save_rows(rows, "uploads/addresses.csv") + assert uri == f"s3://{BUCKET}/uploads/addresses.csv" + + +def test_round_trip_preserves_rows(csv_client: CsvS3Client) -> None: + rows = [ + {"address": "1 High St", "postcode": "AB1 2CD"}, + {"address": "2 Low St", "postcode": "XY9 8ZW"}, + ] + uri = csv_client.save_rows(rows, "uploads/addresses.csv") + fetched = csv_client.read_rows(uri) + assert fetched == rows + + +def test_save_rows_rejects_empty_list(csv_client: CsvS3Client) -> None: + with pytest.raises(ValueError, match="empty"): + csv_client.save_rows([], "uploads/empty.csv") + + +def test_read_rows_rejects_wrong_bucket(csv_client: CsvS3Client) -> None: + with pytest.raises(ValueError, match="does not match client bucket"): + csv_client.read_rows("s3://other-bucket/uploads/addresses.csv") diff --git a/tests/infrastructure/test_s3_client.py b/tests/infrastructure/test_s3_client.py new file mode 100644 index 00000000..7ed4c30b --- /dev/null +++ b/tests/infrastructure/test_s3_client.py @@ -0,0 +1,31 @@ +from collections.abc import Iterator + +import pytest +from moto import mock_aws + +from infrastructure.s3_client import S3Client +from tests.infrastructure import make_boto_client + +BUCKET = "test-bucket" + + +@pytest.fixture +def s3_client() -> Iterator[S3Client]: + with mock_aws(): + boto_client = make_boto_client("s3") + boto_client.create_bucket(Bucket=BUCKET) + yield S3Client(boto_client, BUCKET) + + +def test_put_object_returns_s3_uri(s3_client: S3Client) -> None: + uri = s3_client.put_object("folder/data.bin", b"payload") + assert uri == f"s3://{BUCKET}/folder/data.bin" + + +def test_get_object_returns_bytes_written_by_put_object(s3_client: S3Client) -> None: + s3_client.put_object("round/trip.bin", b"hello world") + assert s3_client.get_object("round/trip.bin") == b"hello world" + + +def test_bucket_property_exposes_configured_bucket(s3_client: S3Client) -> None: + assert s3_client.bucket == BUCKET diff --git a/tests/infrastructure/test_sqs_client.py b/tests/infrastructure/test_sqs_client.py new file mode 100644 index 00000000..7f1e8f78 --- /dev/null +++ b/tests/infrastructure/test_sqs_client.py @@ -0,0 +1,38 @@ +import json +from collections.abc import Iterator +from typing import Any, cast + +import pytest +from moto import mock_aws + +from infrastructure.sqs_client import SqsClient +from tests.infrastructure import make_boto_client + + +@pytest.fixture +def sqs_setup() -> Iterator[tuple[SqsClient, Any, str]]: + with mock_aws(): + boto_client = make_boto_client("sqs") + queue: dict[str, Any] = boto_client.create_queue(QueueName="test-queue") + queue_url = cast(str, queue["QueueUrl"]) + yield SqsClient(boto_client, queue_url), boto_client, queue_url + + +def test_send_returns_message_id(sqs_setup: tuple[SqsClient, Any, str]) -> None: + client, _boto, _url = sqs_setup + message_id = client.send({"hello": "world"}) + assert isinstance(message_id, str) + assert message_id + + +def test_send_json_serialises_body(sqs_setup: tuple[SqsClient, Any, str]) -> None: + client, boto_client, queue_url = sqs_setup + body = {"hello": "world", "count": 3} + client.send(body) + + received: dict[str, Any] = boto_client.receive_message( + QueueUrl=queue_url, MaxNumberOfMessages=1 + ) + messages: list[dict[str, Any]] = received["Messages"] + assert len(messages) == 1 + assert json.loads(messages[0]["Body"]) == body