rename to SAL and AssetList and RawAddresses

This commit is contained in:
Jun-te Kim 2026-05-22 08:14:46 +00:00
parent acb306f7b9
commit cf14a4e3aa
23 changed files with 169 additions and 310 deletions

View file

@ -1,34 +0,0 @@
FROM public.ecr.aws/lambda/python:3.11
# Postgres host/port/database are baked into the image at build time from
# the deploy workflow's --build-arg values (GitHub Actions DEV_DB_* secrets),
# mirroring backend/postcode_splitter/handler/Dockerfile. They map onto the
# POSTGRES_* names PostgresConfig.from_env reads. Username/password are NOT
# baked in -- Terraform injects those as Lambda env vars from Secrets Manager.
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV POSTGRES_HOST=${DEV_DB_HOST}
ENV POSTGRES_PORT=${DEV_DB_PORT}
ENV POSTGRES_DATABASE=${DEV_DB_NAME}
WORKDIR /var/task
COPY applications/postcode_splitter/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the layered source the handler imports from. The new splitter pulls
# only DDD-shaped packages — no pandas, no legacy backend/.
COPY domain/ domain/
COPY infrastructure/ infrastructure/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY utilities/ utilities/
COPY applications/ applications/
# Place the handler at the Lambda task root so the runtime can resolve
# ``main.handler`` without an extra package prefix.
COPY applications/landlord_description_overrides/handler.py /var/task/main.py
CMD ["main.handler"]

View file

@ -1,48 +0,0 @@
from typing import Any
import boto3
from orchestration.landlord_description_overrides_orchestrator import (
LandlordDescriptionOverridesOrchestrator,
)
from infrastructure.csv_s3_client import CsvS3Client
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
)
from domain.addresses.user_address import AssetList
def handler(
body: dict[str, Any],
context: Any,
) -> dict[str, list[str]]:
s3_uri = "s3://retrofit-data-dev/bulk_onboarding_inputs/hyde2 (1).csv"
bucket = "retrofit-data-dev"
# boto3.client is overloaded per-service in the installed stubs; cast
# to Any so the strict-mode checker treats it as opaque.
boto3_client: Any = (
boto3.client
) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto_s3: Any = boto3_client("s3")
csv_client = CsvS3Client(boto_s3, bucket)
user_address_repo = UserAddressCsvS3Repository(csv_client, bucket)
orchestrator = LandlordDescriptionOverridesOrchestrator(
user_address_repo=user_address_repo,
)
list_of_user_address: list[AssetList] = orchestrator.get_user_address(
input_s3_uri=s3_uri
)
col_to_desc_map = orchestrator.get_col_to_description_mappings(
list_of_user_address=list_of_user_address
)
# Read csv of user input
# get the column and unique variations of each description
# { walls: "wall variation 1", "wall varition 2"}
# Call chatgpt(input from landlord, our way of understanding the mapping) Retrun -> lanlordMapped
return {"hello world": ["hello world"]}

View file

@ -1,5 +0,0 @@
POSTGRES_HOST=
POSTGRES_PORT=5432
POSTGRES_USERNAME=
POSTGRES_PASSWORD=
POSTGRES_DATABASE=

View file

@ -1,9 +0,0 @@
services:
landlord_overrides:
build:
context: ../../../
dockerfile: applications/landlord_description_overrides/Dockerfile
ports:
- "9002:8080"
env_file:
- .env.local

View file

@ -1,16 +0,0 @@
#!/usr/bin/env python3
import json
import requests
HOST = "localhost"
PORT = "9002"
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
payload = {"Records": [{"body": json.dumps({})}]}
response = requests.post(LAMBDA_URL, json=payload)
print("Status code:", response.status_code)
print("Response:")
print(response.text)

View file

@ -1,12 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
cd "$(dirname "$0")"
if [ ! -f .env.local ]; then
cp .env.local.example .env.local
echo "Created .env.local from the template — fill it in, then re-run." >&2
exit 1
fi
docker compose build --no-cache
docker compose up --force-recreate

View file

@ -1,4 +0,0 @@
boto3
pydantic
sqlmodel
psycopg2-binary

View file

@ -12,8 +12,8 @@ from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from infrastructure.csv_s3_client import CsvS3Client
from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
from repositories.raw_address.raw_address_csv_s3_repository import (
RawAddressCsvS3Repository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
@ -29,17 +29,19 @@ def handler(
# boto3.client is overloaded per-service in the installed stubs; cast
# to Any so the strict-mode checker treats it as opaque.
boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto3_client: Any = (
boto3.client
) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto_s3: Any = boto3_client("s3")
boto_sqs: Any = boto3_client("sqs")
csv_client = CsvS3Client(boto_s3, bucket)
user_address_repo = UserAddressCsvS3Repository(csv_client, bucket)
raw_address_repo = RawAddressCsvS3Repository(csv_client, bucket)
queue_client = Address2UprnQueueClient(boto_sqs, queue_url)
splitter = PostcodeSplitterOrchestrator(
task_orchestrator=task_orchestrator,
user_address_repo=user_address_repo,
raw_address_repo=raw_address_repo,
queue_client=queue_client,
)

View file

@ -2,21 +2,21 @@ from __future__ import annotations
from collections.abc import Iterable, Iterator
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import AddressList, RawAddress
from domain.postcode import Postcode
def iter_postcode_grouped_batches(
addresses: Iterable[AssetList],
addresses: Iterable[RawAddress],
*,
max_batch_size: int = 500,
) -> Iterator[list[AssetList]]:
) -> Iterator[AddressList]:
if max_batch_size < 1:
raise ValueError("max_batch_size must be >= 1")
groups = _group_by_postcode_in_order(addresses)
buffer: list[AssetList] = []
buffer: AddressList = AddressList([])
for group in groups.values():
group_len = len(group)
@ -26,14 +26,14 @@ def iter_postcode_grouped_batches(
if group_len >= max_batch_size:
if buffer:
yield buffer
buffer = []
buffer = AddressList([])
yield group
continue
# Adding this group would overflow: flush buffer before appending.
if len(buffer) + group_len > max_batch_size:
yield buffer
buffer = []
buffer = AddressList([])
buffer.extend(group)
@ -43,9 +43,9 @@ def iter_postcode_grouped_batches(
def _group_by_postcode_in_order(
addresses: Iterable[AssetList],
) -> dict[Postcode, list[AssetList]]:
groups: dict[Postcode, list[AssetList]] = {}
addresses: Iterable[RawAddress],
) -> dict[Postcode, AddressList]:
groups: dict[Postcode, AddressList] = {}
for address in addresses:
groups.setdefault(address.postcode, []).append(address)
groups.setdefault(address.postcode, AddressList([])).append(address)
return groups

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, NewType
from domain.postcode import Postcode
@ -11,10 +11,14 @@ def _empty_source_row() -> dict[str, str]:
@dataclass(frozen=True)
class AssetList:
class RawAddress:
address: str
postcode: Postcode
org_reference: Optional[str] = None
additional_info: dict[str, str] = field(
default_factory=_empty_source_row, compare=False
)
# A batch of raw, pre-standardisation addresses as supplied by a landlord.
AddressList = NewType("AddressList", list[RawAddress])

View file

@ -1,23 +0,0 @@
from repositories.user_address.user_address_repository import UserAddressRepository
from domain.addresses.user_address import AssetList
class LandlordDescriptionOverridesOrchestrator:
def __init__(self, user_address_repo: UserAddressRepository) -> None:
self._user_address_repo = user_address_repo
def get_user_address(
self,
input_s3_uri: str,
) -> list[AssetList]:
return self._user_address_repo.load_batch(input_s3_uri)
def get_col_to_description_mappings(
self, list_of_user_address: list[AssetList]
) -> dict[str, set[str]]:
mappings: dict[str, set[str]] = {}
for user_address in list_of_user_address:
for key, value in user_address.additional_info.items():
# Lower-case so case-only typos collapse to one variant.
mappings.setdefault(key, set()).add(value.lower())
return mappings

View file

@ -5,19 +5,19 @@ from uuid import UUID
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from orchestration.task_orchestrator import TaskOrchestrator
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
from repositories.user_address.user_address_repository import UserAddressRepository
from repositories.raw_address.raw_address_repository import RawAddressRepository
class PostcodeSplitterOrchestrator:
def __init__(
self,
task_orchestrator: TaskOrchestrator,
user_address_repo: UserAddressRepository,
raw_address_repo: RawAddressRepository,
queue_client: Address2UprnQueueClient,
max_batch_size: int = 500,
) -> None:
self._task_orchestrator = task_orchestrator
self._user_address_repo = user_address_repo
self._raw_address_repo = raw_address_repo
self._queue_client = queue_client
self._max_batch_size = max_batch_size
@ -28,7 +28,7 @@ class PostcodeSplitterOrchestrator:
parent_subtask_id: UUID,
input_s3_uri: str,
) -> list[UUID]:
addresses = self._user_address_repo.load_batch(input_s3_uri)
addresses = self._raw_address_repo.load_batch(input_s3_uri)
path_prefix = (
f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"
)
@ -37,7 +37,7 @@ class PostcodeSplitterOrchestrator:
for batch in iter_postcode_grouped_batches(
addresses, max_batch_size=self._max_batch_size
):
batch_uri = self._user_address_repo.save_batch(batch, path_prefix)
batch_uri = self._raw_address_repo.save_batch(batch, path_prefix)
child = self._task_orchestrator.create_child_subtask(
parent_task_id,
inputs={

View file

@ -4,10 +4,10 @@ import uuid
from datetime import datetime, timezone
from typing import Optional
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import AddressList, RawAddress
from domain.postcode import Postcode
from infrastructure.csv_s3_client import CsvS3Client
from repositories.user_address.user_address_repository import UserAddressRepository
from repositories.raw_address.raw_address_repository import RawAddressRepository
_ADDRESS_COLUMNS: tuple[str, str, str] = ("Address 1", "Address 2", "Address 3")
_POSTCODE_COLUMN: str = "postcode"
@ -15,32 +15,32 @@ _INTERNAL_REFERENCE_COLUMN: str = "Internal Reference"
_POSTCODE_CLEAN_COLUMN: str = "postcode_clean"
class UserAddressCsvS3Repository(UserAddressRepository):
class RawAddressCsvS3Repository(RawAddressRepository):
def __init__(self, csv_client: CsvS3Client, bucket: str) -> None:
self._csv_client = csv_client
self._bucket = bucket
def load_batch(self, s3_uri: str) -> list[AssetList]:
def load_batch(self, s3_uri: str) -> AddressList:
rows = self._csv_client.read_rows(s3_uri)
if rows and _POSTCODE_COLUMN not in rows[0]:
raise ValueError(
f"Input CSV {s3_uri} has no {_POSTCODE_COLUMN!r} column; "
f"columns present: {sorted(rows[0])}"
)
addresses: list[AssetList] = []
addresses: AddressList = AddressList([])
for row in rows:
parts = [
row[col].strip()
for col in _ADDRESS_COLUMNS
if col in row and row[col].strip()
]
user_address = ", ".join(parts)
raw_address = ", ".join(parts)
postcode = row.get(_POSTCODE_COLUMN, "")
raw_ref = row.get(_INTERNAL_REFERENCE_COLUMN, "").strip()
internal_reference: Optional[str] = raw_ref or None
addresses.append(
AssetList(
address=user_address,
RawAddress(
address=raw_address,
postcode=Postcode(postcode),
org_reference=internal_reference,
additional_info=row,
@ -48,7 +48,7 @@ class UserAddressCsvS3Repository(UserAddressRepository):
)
return addresses
def save_batch(self, addresses: list[AssetList], path_prefix: str) -> str:
def save_batch(self, addresses: AddressList, path_prefix: str) -> str:
rows: list[dict[str, str]] = [
{
**addr.additional_info,

View file

@ -0,0 +1,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.addresses.raw_address import AddressList
class RawAddressRepository(ABC):
@abstractmethod
def load_batch(self, s3_uri: str) -> AddressList: ...
@abstractmethod
def save_batch(self, addresses: AddressList, path_prefix: str) -> str: ...

View file

@ -1,13 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.addresses.user_address import AssetList
class UserAddressRepository(ABC):
@abstractmethod
def load_batch(self, s3_uri: str) -> list[AssetList]: ...
@abstractmethod
def save_batch(self, addresses: list[AssetList], path_prefix: str) -> str: ...

View file

@ -1,15 +1,17 @@
import pytest
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import AddressList, RawAddress
from domain.postcode import Postcode
def _addrs(postcode: str, n: int) -> list[AssetList]:
return [
AssetList(address=f"{i} {postcode} Street", postcode=Postcode(postcode))
for i in range(n)
]
def _addrs(postcode: str, n: int) -> AddressList:
return AddressList(
[
RawAddress(address=f"{i} {postcode} Street", postcode=Postcode(postcode))
for i in range(n)
]
)
def test_empty_input_yields_no_batches() -> None:

View file

@ -2,36 +2,36 @@ import dataclasses
import pytest
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import RawAddress
from domain.postcode import Postcode
def test_user_address_holds_postcode_value_object() -> None:
def test_raw_address_holds_postcode_value_object() -> None:
# act
addr = AssetList(address="1 The Street", postcode=Postcode("sw1a 1aa"))
addr = RawAddress(address="1 The Street", postcode=Postcode("sw1a 1aa"))
# assert
assert addr.postcode == Postcode("SW1A1AA")
def test_user_address_preserves_user_address_verbatim() -> None:
# The free-text user_address string is intentionally NOT normalised --
def test_raw_address_preserves_raw_address_verbatim() -> None:
# The free-text raw_address string is intentionally NOT normalised --
# only the postcode is canonicalised, and that happens inside Postcode.
# act
addr = AssetList(address=" 1 The Street ", postcode=Postcode("SW1A1AA"))
addr = RawAddress(address=" 1 The Street ", postcode=Postcode("SW1A1AA"))
# assert
assert addr.address == " 1 The Street "
def test_user_address_internal_reference_defaults_to_none() -> None:
def test_raw_address_internal_reference_defaults_to_none() -> None:
# act
addr = AssetList(address="1 The Street", postcode=Postcode("SW1A1AA"))
addr = RawAddress(address="1 The Street", postcode=Postcode("SW1A1AA"))
# assert
assert addr.org_reference is None
def test_user_address_internal_reference_accepted() -> None:
def test_raw_address_internal_reference_accepted() -> None:
# act
addr = AssetList(
addr = RawAddress(
address="1 The Street",
postcode=Postcode("SW1A1AA"),
org_reference="cust-42",
@ -40,36 +40,36 @@ def test_user_address_internal_reference_accepted() -> None:
assert addr.org_reference == "cust-42"
def test_user_address_is_frozen() -> None:
def test_raw_address_is_frozen() -> None:
# arrange
addr = AssetList(address="1 The Street", postcode=Postcode("SW1A1AA"))
addr = RawAddress(address="1 The Street", postcode=Postcode("SW1A1AA"))
# act / assert
with pytest.raises(dataclasses.FrozenInstanceError):
addr.postcode = Postcode("OTHER") # type: ignore[misc]
def test_user_address_equality_uses_canonical_postcode() -> None:
def test_raw_address_equality_uses_canonical_postcode() -> None:
# Postcode sanitises eagerly, so addresses built from different surface
# forms of the same postcode compare equal.
# arrange
a = AssetList(address="1 The Street", postcode=Postcode("sw1a 1aa"))
b = AssetList(address="1 The Street", postcode=Postcode("SW1A1AA"))
a = RawAddress(address="1 The Street", postcode=Postcode("sw1a 1aa"))
b = RawAddress(address="1 The Street", postcode=Postcode("SW1A1AA"))
# act / assert
assert a == b
def test_user_address_source_row_defaults_to_empty_dict() -> None:
def test_raw_address_source_row_defaults_to_empty_dict() -> None:
# act
addr = AssetList(address="1 The Street", postcode=Postcode("SW1A1AA"))
addr = RawAddress(address="1 The Street", postcode=Postcode("SW1A1AA"))
# assert
assert addr.additional_info == {}
def test_user_address_carries_source_row() -> None:
def test_raw_address_carries_source_row() -> None:
# arrange
row = {"Address 1": "1 The Street", "postcode": "SW1A 1AA", "SAP Score": "72"}
# act
addr = AssetList(
addr = RawAddress(
address="1 The Street",
postcode=Postcode("SW1A 1AA"),
additional_info=row,
@ -78,16 +78,16 @@ def test_user_address_carries_source_row() -> None:
assert addr.additional_info == row
def test_user_address_equality_ignores_source_row() -> None:
def test_raw_address_equality_ignores_source_row() -> None:
# source_row is excluded from equality (and hashing): identity stays
# defined by the parsed fields.
# arrange
a = AssetList(
a = RawAddress(
address="1 The Street",
postcode=Postcode("SW1A1AA"),
additional_info={"x": "1"},
)
b = AssetList(
b = RawAddress(
address="1 The Street",
postcode=Postcode("SW1A1AA"),
additional_info={"y": "2"},

View file

@ -1,44 +1,44 @@
from __future__ import annotations
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import AddressList, RawAddress
from domain.postcode import Postcode
from orchestration.landlord_description_overrides_orchestrator import (
LandlordDescriptionOverridesOrchestrator,
SALOrchestrator,
)
from repositories.user_address.user_address_repository import UserAddressRepository
from repositories.raw_address.raw_address_repository import RawAddressRepository
class _StubUserAddressRepository(UserAddressRepository):
class _StubRawAddressRepository(RawAddressRepository):
"""``get_col_to_description_mappings`` never touches the repo."""
def load_batch(self, s3_uri: str) -> list[AssetList]:
def load_batch(self, s3_uri: str) -> AddressList:
raise NotImplementedError()
def save_batch(self, addresses: list[AssetList], path_prefix: str) -> str:
def save_batch(self, addresses: AddressList, path_prefix: str) -> str:
raise NotImplementedError()
def _make_user_address(landlord_additional_info: dict[str, str]) -> AssetList:
return AssetList(
def _make_raw_address(landlord_additional_info: dict[str, str]) -> RawAddress:
return RawAddress(
address="1 High St",
postcode=Postcode("AA1 1AA"),
additional_info=landlord_additional_info,
)
def _orchestrator() -> LandlordDescriptionOverridesOrchestrator:
return LandlordDescriptionOverridesOrchestrator(
user_address_repo=_StubUserAddressRepository()
)
def _orchestrator() -> SALOrchestrator:
return SALOrchestrator(raw_address_repo=_StubRawAddressRepository())
def test_collects_every_value_per_shared_key() -> None:
# arrange: every address carries the same keys, all values distinct.
addresses = [
_make_user_address({"description": "cosy", "condition": "new"}),
_make_user_address({"description": "spacious", "condition": "worn"}),
_make_user_address({"description": "bright", "condition": "fair"}),
]
addresses = AddressList(
[
_make_raw_address({"description": "cosy", "condition": "new"}),
_make_raw_address({"description": "spacious", "condition": "worn"}),
_make_raw_address({"description": "bright", "condition": "fair"}),
]
)
# act
mappings = _orchestrator().get_col_to_description_mappings(addresses)
@ -52,11 +52,13 @@ def test_collects_every_value_per_shared_key() -> None:
def test_repeated_values_collapse_to_one_variant() -> None:
# arrange: two addresses share the same wall description.
addresses = [
_make_user_address({"description": "cosy"}),
_make_user_address({"description": "cosy"}),
_make_user_address({"description": "bright"}),
]
addresses = AddressList(
[
_make_raw_address({"description": "cosy"}),
_make_raw_address({"description": "cosy"}),
_make_raw_address({"description": "bright"}),
]
)
# act
mappings = _orchestrator().get_col_to_description_mappings(addresses)
@ -67,11 +69,13 @@ def test_repeated_values_collapse_to_one_variant() -> None:
def test_case_only_variants_collapse_to_one() -> None:
# arrange: the same description typed with inconsistent casing.
addresses = [
_make_user_address({"description": "Cosy"}),
_make_user_address({"description": "cosy"}),
_make_user_address({"description": "COSY"}),
]
addresses = AddressList(
[
_make_raw_address({"description": "Cosy"}),
_make_raw_address({"description": "cosy"}),
_make_raw_address({"description": "COSY"}),
]
)
# act
mappings = _orchestrator().get_col_to_description_mappings(addresses)
@ -82,7 +86,7 @@ def test_case_only_variants_collapse_to_one() -> None:
def test_empty_address_list_yields_empty_mapping() -> None:
# arrange / act
mappings = _orchestrator().get_col_to_description_mappings([])
mappings = _orchestrator().get_col_to_description_mappings(AddressList([]))
# assert
assert mappings == {}
@ -90,7 +94,7 @@ def test_empty_address_list_yields_empty_mapping() -> None:
def test_single_address_yields_single_value_per_key() -> None:
# arrange
addresses = [_make_user_address({"description": "cosy"})]
addresses = AddressList([_make_raw_address({"description": "cosy"})])
# act
mappings = _orchestrator().get_col_to_description_mappings(addresses)

View file

@ -18,8 +18,8 @@ from orchestration.postcode_splitter_orchestrator import PostcodeSplitterOrchest
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
from repositories.raw_address.raw_address_csv_s3_repository import (
RawAddressCsvS3Repository,
)
BUCKET = "splitter-bucket"
@ -27,7 +27,9 @@ REGION = "us-east-1"
def _make_boto_client(service_name: str) -> Any:
factory: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
factory: Any = (
boto3.client
) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
return factory(service_name, region_name=REGION)
@ -62,7 +64,7 @@ class Harness:
csv_client: CsvS3Client
boto_sqs: Any
queue_url: str
repo: UserAddressCsvS3Repository
repo: RawAddressCsvS3Repository
@pytest.fixture
@ -76,7 +78,7 @@ def harness(db_engine: Engine) -> Iterator[Harness]:
queue_url = cast(str, queue["QueueUrl"])
csv_client = CsvS3Client(boto_s3, BUCKET)
repo = UserAddressCsvS3Repository(csv_client, BUCKET)
repo = RawAddressCsvS3Repository(csv_client, BUCKET)
queue_client = Address2UprnQueueClient(boto_sqs, queue_url)
# DB: ephemeral PostgreSQL TaskOrchestrator
@ -89,7 +91,7 @@ def harness(db_engine: Engine) -> Iterator[Harness]:
splitter = PostcodeSplitterOrchestrator(
task_orchestrator=task_orchestrator,
user_address_repo=repo,
raw_address_repo=repo,
queue_client=queue_client,
max_batch_size=3,
)
@ -169,10 +171,8 @@ def test_split_and_dispatch_creates_three_children_for_fixture(
harness: Harness,
) -> None:
# arrange
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
parent_task, parent_subtask = harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
input_uri = _upload_fixture_csv(harness.csv_client)
@ -197,10 +197,8 @@ def test_split_and_dispatch_persists_child_inputs_with_task_id_and_s3_uri(
harness: Harness,
) -> None:
# arrange
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
parent_task, parent_subtask = harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
input_uri = _upload_fixture_csv(harness.csv_client)
@ -230,10 +228,8 @@ def test_split_and_dispatch_publishes_one_message_per_child_with_matching_ids(
harness: Harness,
) -> None:
# arrange
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
parent_task, parent_subtask = harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
input_uri = _upload_fixture_csv(harness.csv_client)
@ -267,10 +263,8 @@ def test_split_and_dispatch_returns_child_ids_in_dispatch_order(
harness: Harness,
) -> None:
# arrange
parent_task, parent_subtask = (
harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
parent_task, parent_subtask = harness.task_orchestrator.create_task_with_subtask(
task_source="manual:postcode-splitter-int"
)
input_uri = _upload_fixture_csv(harness.csv_client)

View file

@ -3,11 +3,11 @@ from collections.abc import Iterator
import pytest
from moto import mock_aws
from domain.addresses.user_address import AssetList
from domain.addresses.raw_address import AddressList, RawAddress
from domain.postcode import Postcode
from infrastructure.csv_s3_client import CsvS3Client
from repositories.user_address.user_address_csv_s3_repository import (
UserAddressCsvS3Repository,
from repositories.raw_address.raw_address_csv_s3_repository import (
RawAddressCsvS3Repository,
)
from tests.infrastructure import make_boto_client
@ -15,22 +15,22 @@ BUCKET = "user-address-bucket"
@pytest.fixture
def repo() -> Iterator[UserAddressCsvS3Repository]:
def repo() -> Iterator[RawAddressCsvS3Repository]:
with mock_aws():
boto_client = make_boto_client("s3")
boto_client.create_bucket(Bucket=BUCKET)
csv_client = CsvS3Client(boto_client, BUCKET)
yield UserAddressCsvS3Repository(csv_client, BUCKET)
yield RawAddressCsvS3Repository(csv_client, BUCKET)
def _upload_csv(
repo: UserAddressCsvS3Repository, rows: list[dict[str, str]], key: str
repo: RawAddressCsvS3Repository, rows: list[dict[str, str]], key: str
) -> str:
return repo._csv_client.save_rows(rows, key) # pyright: ignore[reportPrivateUsage]
def test_load_batch_parses_address_postcode_and_reference(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
rows = [
@ -56,7 +56,7 @@ def test_load_batch_parses_address_postcode_and_reference(
def test_load_batch_uses_only_address_1_when_others_missing(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
rows = [
@ -81,7 +81,7 @@ def test_load_batch_uses_only_address_1_when_others_missing(
def test_load_batch_handles_missing_internal_reference(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
rows = [
@ -106,10 +106,10 @@ def test_load_batch_handles_missing_internal_reference(
def test_load_batch_captures_full_source_row(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# A raw EPC-export-shaped row: the splitter must preserve every column,
# not just the ones it parses into UserAddress fields.
# not just the ones it parses into RawAddress fields.
# arrange
row = {
"Asset Reference": "511",
@ -128,7 +128,7 @@ def test_load_batch_captures_full_source_row(
def test_load_batch_raises_when_postcode_column_absent(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
rows = [{"Address 1": "1 High Street", "Property Type": "Flat"}]
@ -140,7 +140,7 @@ def test_load_batch_raises_when_postcode_column_absent(
def test_save_batch_passes_through_all_columns_and_appends_postcode_clean(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
row = {
@ -169,19 +169,21 @@ def test_save_batch_passes_through_all_columns_and_appends_postcode_clean(
def test_save_batch_returns_uri_under_path_prefix(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
addresses = [
AssetList(
address="1 High Street",
postcode=Postcode("SW1A 1AA"),
additional_info={
"Address 1": "1 High Street",
"postcode": "SW1A 1AA",
},
),
]
addresses = AddressList(
[
RawAddress(
address="1 High Street",
postcode=Postcode("SW1A 1AA"),
additional_info={
"Address 1": "1 High Street",
"postcode": "SW1A 1AA",
},
),
]
)
# act
uri = repo.save_batch(addresses, "tasks/abc/batches")
@ -192,7 +194,7 @@ def test_save_batch_returns_uri_under_path_prefix(
def test_save_then_reload_round_trip_preserves_columns(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
rows = [
@ -225,19 +227,21 @@ def test_save_then_reload_round_trip_preserves_columns(
def test_save_batch_uses_unique_filename_per_call(
repo: UserAddressCsvS3Repository,
repo: RawAddressCsvS3Repository,
) -> None:
# arrange
addresses = [
AssetList(
address="1 High Street",
postcode=Postcode("SW1A 1AA"),
additional_info={
"Address 1": "1 High Street",
"postcode": "SW1A 1AA",
},
),
]
addresses = AddressList(
[
RawAddress(
address="1 High Street",
postcode=Postcode("SW1A 1AA"),
additional_info={
"Address 1": "1 High Street",
"postcode": "SW1A 1AA",
},
),
]
)
# act
uri_1 = repo.save_batch(addresses, "tasks/uniqueness")