repositories: UserAddressRepository + UserAddressCsvS3Repository (CSV-on-S3 adapter)

Adds the persistence layer for UserAddress batches:

- Abstract UserAddressRepository with load_batch / save_batch.
- Concrete UserAddressCsvS3Repository over CsvS3Client:
  - load_batch reads canonical upload columns (Address 1/2/3, Postcode,
    Internal Reference), comma-joins non-empty address parts, and
    passes Internal Reference through (None when missing/empty).
  - save_batch writes a 3-column CSV (user_address,postcode,
    internal_reference) to {path_prefix}/{ISO datetime}_{uuid8}.csv
    and returns the s3://bucket/key URI.
- Postcode sanitisation flows through UserAddress.__post_init__; the
  repo never calls sanitise_postcode directly.

Tests (moto-backed) cover: three-line address load, Address-1-only
load, missing Internal Reference, save->reload round trip, and
unique-filename-per-save. pyright --strict clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jun-te Kim 2026-05-19 17:37:02 +00:00
parent d70e8a9e53
commit 708f1b5d18
6 changed files with 324 additions and 0 deletions

View file

View file

@ -0,0 +1,87 @@
"""CSV-on-S3 adapter for :class:`UserAddressRepository`.
Reads canonical upload CSVs (``Address 1``, ``Address 2``, ``Address 3``,
``Postcode``, ``Internal Reference``) and writes the splitter's compact
3-column form (``user_address``, ``postcode``, ``internal_reference``).
The frontend pre-applies the user's column mapping at upload time, so this
adapter does NOT consult any ``BulkAddressUpload.column_mapping``: it always
expects the canonical column names listed above.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Optional
from domain.addresses.user_address import UserAddress
from infrastructure.csv_s3_client import CsvS3Client
from repositories.user_address.user_address_repository import UserAddressRepository
# Canonical upload-CSV column names read by ``load_batch``. The address columns
# are listed in the order their values are joined into ``user_address``.
_ADDRESS_COLUMNS: tuple[str, str, str] = ("Address 1", "Address 2", "Address 3")
_POSTCODE_COLUMN: str = "Postcode"
_INTERNAL_REFERENCE_COLUMN: str = "Internal Reference"
class UserAddressCsvS3Repository(UserAddressRepository):
    """Persist :class:`UserAddress` batches as CSV objects in S3.

    The repo owns the unique-filename-within-prefix convention
    (``{ISO datetime}_{8-char uuid}.csv``); callers own the directory
    hierarchy supplied as ``path_prefix``.
    """

    def __init__(self, csv_client: CsvS3Client, bucket: str) -> None:
        """Keep the CSV client used for every read/write, plus the bucket name.

        Args:
            csv_client: Client exposing ``read_rows`` and ``save_rows``.
            bucket: Bucket name. Stored on the instance; the methods below
                delegate all S3 addressing to ``csv_client``.
        """
        self._csv_client = csv_client
        self._bucket = bucket

    def load_batch(self, s3_uri: str) -> list[UserAddress]:
        """Load canonical upload CSV rows into :class:`UserAddress` objects.

        Concatenates ``Address 1``/``Address 2``/``Address 3`` with ``", "``,
        skipping missing or empty parts, into ``user_address``. Falls back to
        just ``Address 1`` when 2 and 3 are absent. Passes ``Internal Reference``
        through to :attr:`UserAddress.internal_reference` (``None`` when the
        column is missing or empty).

        A cell whose value is ``None`` is treated exactly like an empty cell
        instead of raising ``AttributeError`` on ``.strip()``.

        Args:
            s3_uri: Location handed unchanged to ``csv_client.read_rows``.

        Returns:
            One :class:`UserAddress` per row, in row order.
        """
        addresses: list[UserAddress] = []
        for row in self._csv_client.read_rows(s3_uri):
            # ``row.get(col) or ""`` covers both an absent column and a None
            # cell. NOTE(review): None cells are what csv.DictReader produces
            # for short rows -- confirm whether CsvS3Client can emit them.
            stripped_parts = (
                (row.get(col) or "").strip() for col in _ADDRESS_COLUMNS
            )
            parts = [part for part in stripped_parts if part]
            raw_ref = (row.get(_INTERNAL_REFERENCE_COLUMN) or "").strip()
            # Blank references collapse to None rather than "".
            internal_reference: Optional[str] = raw_ref or None
            addresses.append(
                UserAddress(
                    user_address=", ".join(parts),
                    # Passed through as-is (no strip here); a missing or None
                    # postcode becomes "".
                    postcode=row.get(_POSTCODE_COLUMN) or "",
                    internal_reference=internal_reference,
                )
            )
        return addresses

    def save_batch(self, addresses: list[UserAddress], path_prefix: str) -> str:
        """Write a 3-column CSV under a unique key beneath ``path_prefix``.

        The key is ``{path_prefix}/{ISO-8601 datetime}_{8-char uuid}.csv``.
        Trailing slashes on ``path_prefix`` are dropped; when nothing is left
        (empty or all-slash prefix) the key is just the filename, so it never
        starts with ``/``.

        Args:
            addresses: Batch to write; ``internal_reference`` of ``None`` is
                written as an empty string.
            path_prefix: Key prefix ("directory") chosen by the caller.

        Returns:
            Whatever ``csv_client.save_rows`` returns for the written key
            (the full ``s3://bucket/key`` URI).
        """
        rows: list[dict[str, str]] = [
            {
                "user_address": addr.user_address,
                "postcode": addr.postcode,
                "internal_reference": addr.internal_reference or "",
            }
            for addr in addresses
        ]
        # NOTE(review): an aware-UTC isoformat() contains ":" and "+00:00".
        # Both are valid in S3 keys but are listed by AWS as characters that
        # may need special handling (URL encoding) -- confirm consumers cope.
        timestamp = datetime.now(timezone.utc).isoformat()
        filename = f"{timestamp}_{uuid.uuid4().hex[:8]}.csv"
        prefix = path_prefix.rstrip("/")
        key = f"{prefix}/{filename}" if prefix else filename
        return self._csv_client.save_rows(rows, key)

View file

@ -0,0 +1,30 @@
"""Abstract repository for :class:`UserAddress` batches.
Persistence-agnostic interface for loading and saving batches of
:class:`domain.addresses.user_address.UserAddress`. Concrete adapters --
e.g. :class:`UserAddressCsvS3Repository` -- live alongside this module.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.addresses.user_address import UserAddress
class UserAddressRepository(ABC):
    """Storage-agnostic contract for reading and writing :class:`UserAddress` batches.

    Concrete subclasses pick the backing store (S3 CSV, Postgres,
    in-memory, ...). Whatever the store, they must keep the canonical
    column semantics intact: the address text, the postcode (sanitised by
    ``UserAddress.__post_init__``), and an optional internal reference.
    """

    @abstractmethod
    def load_batch(self, s3_uri: str) -> list[UserAddress]:
        """Return the batch stored at ``s3_uri`` as a list of domain objects."""

    @abstractmethod
    def save_batch(self, addresses: list[UserAddress], path_prefix: str) -> str:
        """Write ``addresses`` beneath ``path_prefix`` and return the URI that was written."""

View file

@ -0,0 +1,32 @@
import os
from collections.abc import Iterator
from typing import Optional
import pytest
@pytest.fixture(autouse=True)
def _aws_creds() -> Iterator[None]:  # pyright: ignore[reportUnusedFunction]
    """Install placeholder AWS credentials for the duration of each test.

    Keeps botocore from probing the host environment. Being ``autouse``, it
    is applied automatically to every test in
    ``tests/repositories/user_address/``. The previous value of each
    variable is put back afterwards; variables that were unset are removed.
    """
    stub_values: dict[str, str] = {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
    # Snapshot before overwriting so teardown can restore the exact prior state.
    saved: dict[str, Optional[str]] = {
        name: os.environ.get(name) for name in stub_values
    }
    os.environ.update(stub_values)
    try:
        yield
    finally:
        for name, previous in saved.items():
            if previous is not None:
                os.environ[name] = previous
            else:
                os.environ.pop(name, None)

View file

@ -0,0 +1,175 @@
from collections.abc import Iterator
import pytest
from moto import mock_aws
from infrastructure.csv_s3_client import CsvS3Client
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
)
from tests.infrastructure import make_boto_client
# Name of the mocked S3 bucket created by the ``repo`` fixture and checked in returned URIs.
BUCKET = "user-address-bucket"
@pytest.fixture
def repo() -> Iterator[UserAddressCsvS3Repository]:
    """Yield a repository backed by a moto-mocked S3 in which ``BUCKET`` already exists."""
    with mock_aws():
        s3 = make_boto_client("s3")
        s3.create_bucket(Bucket=BUCKET)
        # The mock stays active while the test body runs, because we yield inside it.
        yield UserAddressCsvS3Repository(CsvS3Client(s3, BUCKET), BUCKET)
def _upload_csv(
    repo: UserAddressCsvS3Repository, rows: list[dict[str, str]], key: str
) -> str:
    """Write ``rows`` to ``key`` through the repo's own CSV client and return the resulting URI."""
    client = repo._csv_client  # pyright: ignore[reportPrivateUsage]
    return client.save_rows(rows, key)
def test_load_batch_concatenates_three_address_lines(
    repo: UserAddressCsvS3Repository,
) -> None:
    """All three address lines are joined with ", "; postcode and reference come through."""
    upload_row = {
        "Address 1": "1 High Street",
        "Address 2": "Flat 2",
        "Address 3": "Townville",
        "Postcode": "sw1a 1aa",
        "Internal Reference": "REF-001",
    }
    loaded = repo.load_batch(_upload_csv(repo, [upload_row], "uploads/full.csv"))

    assert len(loaded) == 1
    only = loaded[0]
    assert (only.user_address, only.postcode, only.internal_reference) == (
        "1 High Street, Flat 2, Townville",
        "SW1A1AA",
        "REF-001",
    )
def test_load_batch_uses_only_address_1_when_others_missing(
    repo: UserAddressCsvS3Repository,
) -> None:
    """With Address 2/3 present but empty, ``user_address`` is just Address 1."""
    upload_row = {
        "Address 1": "10 Cardiff Road",
        "Address 2": "",
        "Address 3": "",
        "Postcode": "CF10 1AA",
        "Internal Reference": "REF-002",
    }
    source_uri = _upload_csv(repo, [upload_row], "uploads/address1-only.csv")
    loaded = repo.load_batch(source_uri)

    assert len(loaded) == 1
    only = loaded[0]
    assert (only.user_address, only.postcode, only.internal_reference) == (
        "10 Cardiff Road",
        "CF101AA",
        "REF-002",
    )
def test_load_batch_handles_missing_internal_reference(
    repo: UserAddressCsvS3Repository,
) -> None:
    """An empty ``Internal Reference`` cell is loaded as ``None``."""
    upload_row = {
        "Address 1": "5 Park Lane",
        "Address 2": "",
        "Address 3": "",
        "Postcode": "M1 1AA",
        "Internal Reference": "",
    }
    source_uri = _upload_csv(repo, [upload_row], "uploads/no-ref.csv")
    loaded = repo.load_batch(source_uri)

    assert len(loaded) == 1
    only = loaded[0]
    assert (only.user_address, only.postcode) == ("5 Park Lane", "M11AA")
    assert only.internal_reference is None
def test_save_batch_returns_uri_under_path_prefix(
    repo: UserAddressCsvS3Repository,
) -> None:
    """The returned URI sits under ``s3://BUCKET/<path_prefix>/`` and ends in ``.csv``."""
    from domain.addresses.user_address import UserAddress

    batch = [
        UserAddress(
            user_address="1 High Street, Flat 2, Townville",
            postcode="SW1A 1AA",
            internal_reference="REF-001",
        ),
    ]
    written_uri = repo.save_batch(batch, "tasks/abc/batches")

    expected_prefix = f"s3://{BUCKET}/tasks/abc/batches/"
    assert written_uri.startswith(expected_prefix)
    assert written_uri.endswith(".csv")
def test_save_then_reload_round_trip_preserves_values(
    repo: UserAddressCsvS3Repository,
) -> None:
    """Values written by ``save_batch`` come back equal from ``load_batch``.

    ``save_batch`` emits the compact schema
    (user_address/postcode/internal_reference) while ``load_batch`` reads the
    canonical upload schema, so the saved rows are re-keyed to the upload
    column names and re-uploaded before reloading.
    """
    from domain.addresses.user_address import UserAddress

    with_reference = UserAddress(
        user_address="1 High Street",
        postcode="SW1A 1AA",
        internal_reference="REF-001",
    )
    without_reference = UserAddress(
        user_address="2 Low Street",
        postcode="XY9 8ZW",
        internal_reference=None,
    )
    original = [with_reference, without_reference]

    saved_uri = repo.save_batch(original, "tasks/round-trip")
    saved_rows = repo._csv_client.read_rows(saved_uri)  # pyright: ignore[reportPrivateUsage]

    # Translate compact-schema rows into the canonical upload schema.
    upload_rows: list[dict[str, str]] = []
    for saved in saved_rows:
        upload_rows.append(
            {
                "Address 1": saved["user_address"],
                "Address 2": "",
                "Address 3": "",
                "Postcode": saved["postcode"],
                "Internal Reference": saved["internal_reference"],
            }
        )

    upload_uri = _upload_csv(repo, upload_rows, "uploads/round-trip.csv")
    reloaded = repo.load_batch(upload_uri)
    assert reloaded == original
def test_save_batch_uses_unique_filename_per_call(
    repo: UserAddressCsvS3Repository,
) -> None:
    """Two saves of the same batch under the same prefix return different URIs."""
    from domain.addresses.user_address import UserAddress

    batch = [
        UserAddress(
            user_address="1 High Street",
            postcode="SW1A 1AA",
            internal_reference="REF-001",
        ),
    ]
    written = [repo.save_batch(batch, "tasks/uniqueness") for _ in range(2)]
    assert written[0] != written[1]