diff --git a/applications/bulk_upload_finaliser/handler.py b/applications/bulk_upload_finaliser/handler.py index 72894fa0..6017cbb5 100644 --- a/applications/bulk_upload_finaliser/handler.py +++ b/applications/bulk_upload_finaliser/handler.py @@ -33,8 +33,8 @@ from orchestration.task_orchestrator import TaskOrchestrator from repositories.bulk_upload.bulk_upload_status_writer_postgres import ( BulkUploadStatusWriterPostgresRepository, ) -from repositories.property.property_identity_writer_postgres import ( - PropertyIdentityWriterPostgresRepository, +from repositories.property.property_postgres_repository import ( + PropertyPostgresRepository, ) from utilities.aws_lambda.subtask_handler import subtask_handler @@ -51,16 +51,17 @@ def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int: session = make_session(engine) try: orchestrator = BulkUploadFinaliserOrchestrator( - property_writer=PropertyIdentityWriterPostgresRepository(session) + # Write-only path: no EpcRepository needed for inserts. + property_repo=PropertyPostgresRepository(session), + status_writer=BulkUploadStatusWriterPostgresRepository(session), ) - status_writer = BulkUploadStatusWriterPostgresRepository(session) - # Resolution is pure, so run it before opening the transaction. - inserts = orchestrator.to_property_rows(rows, trigger.portfolio_id) - # Atomic finalise: insert properties and mark `complete` together — a - # failure in either rolls back both, leaving the row for the failure path. + # Atomic finalise: the orchestrator inserts properties and marks `complete` + # via its injected writers; the transaction here makes them land together — + # a failure in either rolls back both, leaving the row for the failure path. with commit_scope(session): - inserted = orchestrator.persist(inserts) - status_writer.set_status(trigger.task_id, "complete") + inserted = orchestrator.finalise( + rows, trigger.portfolio_id, trigger.task_id + ) finally: session.close() diff --git a/docs/adr/0013-bulk-upload-finaliser-writes-properties.md b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md index 378351cf..f9a61d20 100644 --- a/docs/adr/0013-bulk-upload-finaliser-writes-properties.md +++ b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md @@ -78,20 +78,25 @@ inserts properties. Finalise changes that. writes `complete`; it owns only the `awaiting_review → finalising` compare-and-swap at dispatch (frontend ADR-0005). -5. **DDD layering (ADR-0003 + the landlord precedent).** Domain logic — resolving - combiner rows into property identity rows — lives in - `orchestration/bulk_upload_finaliser_orchestrator.py`; the Lambda - `applications/bulk_upload_finaliser/handler.py` stays thin (parse trigger, wire - S3 + engine + repos, delegate, report status). New seams: - - Property insert: port `repositories/property/property_identity_writer.py` - (+ `PropertyIdentityInsert` DTO), adapter - `repositories/property/property_identity_writer_postgres.py`, writing the - amended `infrastructure/postgres/property_table.py` mirror. A dedicated *writer* - distinct from the aggregate-hydrating `PropertyRepository`, which needs an EPC - slice the finaliser doesn't. +5. **DDD layering (ADR-0003 + the landlord precedent).** The + `orchestration/bulk_upload_finaliser_orchestrator.py` owns the whole Finalise + domain flow via a single `finalise(rows, portfolio_id, task_id)` that depends + **only on the two repository ports** — it resolves the combiner rows, inserts + the properties, and marks the upload `complete`, with no engine/session/DB of + its own. So it is unit-testable with two in-memory fakes + (`tests/orchestration/test_bulk_upload_finaliser_orchestrator.py`). The Lambda + `applications/bulk_upload_finaliser/handler.py` is the composition root: parse + trigger, read S3, build the engine/session and the concrete adapters, open the + transaction around `finalise`, and handle the failure path. New seams: + - Property insert: `insert_all` (+ the `PropertyIdentityInsert` DTO) is added to + the existing `PropertyRepository` port and its `PropertyPostgresRepository` + adapter — **one repository per aggregate**, reads and writes together, writing + the amended `infrastructure/postgres/property_table.py` mirror. `epc_repo` is + optional: the write path creates identity rows and never hydrates, so the + finaliser constructs the repo with a session alone. - Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter `…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py` - mirror. + mirror (a separate table → a separate repository). - Transaction boundary stays in the `infrastructure/postgres/engine` helper (`commit_scope`); the handler opens the context (once, around insert+complete), never calls `.commit()`; orchestrator and repositories never commit. diff --git a/orchestration/bulk_upload_finaliser_orchestrator.py b/orchestration/bulk_upload_finaliser_orchestrator.py index 44d65178..46ad5b12 100644 --- a/orchestration/bulk_upload_finaliser_orchestrator.py +++ b/orchestration/bulk_upload_finaliser_orchestrator.py @@ -11,9 +11,12 @@ from __future__ import annotations import re from typing import Any, Optional -from repositories.property.property_identity_writer import ( +from uuid import UUID + +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter +from repositories.property.property_repository import ( PropertyIdentityInsert, - PropertyIdentityWriter, + PropertyRepository, ) # Combiner-output columns — identical to the old frontend /finalize route and the @@ -67,29 +70,47 @@ def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]: class BulkUploadFinaliserOrchestrator: - def __init__(self, property_writer: PropertyIdentityWriter) -> None: - self._property_writer = property_writer + """Owns the domain flow of Finalise, depending only on repository ports. + + Both collaborators are ports (``PropertyRepository``, + ``BulkUploadStatusWriter``); the concrete Postgres adapters are wired by the + Lambda handler (the composition root). So a unit test constructs this with two + fakes and exercises ``finalise`` end-to-end — no engine, session, or DB. The + orchestrator never commits: the caller opens the transaction around + ``finalise`` so the insert and the ``complete`` flip land atomically. + """ + + def __init__( + self, + property_repo: PropertyRepository, + status_writer: BulkUploadStatusWriter, + ) -> None: + self._property_repo = property_repo + self._status_writer = status_writer + + def finalise( + self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID + ) -> int: + """Resolve the combiner rows, insert the properties, and mark the upload + ``complete`` — all via the injected repositories, no DB connection of its + own. Returns the number of properties inserted. Does not commit.""" + inserts = self.to_property_rows(rows, portfolio_id) + inserted = self._property_repo.insert_all(inserts) + self._status_writer.set_status(task_id, "complete") + return inserted def to_property_rows( self, rows: list[dict[str, str]], portfolio_id: int ) -> list[PropertyIdentityInsert]: """Resolve combiner rows into property identity inserts. - Pure (no DB / IO), so the caller can run it before opening a transaction. - Reproduces the old ``/finalize`` route's resolution exactly: matched - address falls back to the user-inputted one; postcode is extracted from - the matched address or falls back to the user-inputted postcode. + Pure (no DB / IO) and independently testable. Reproduces the old + ``/finalize`` route's resolution exactly: matched address falls back to the + user-inputted one; postcode is extracted from the matched address or falls + back to the user-inputted postcode. """ return [self._row_to_insert(raw, portfolio_id) for raw in rows] - def persist(self, inserts: list[PropertyIdentityInsert]) -> int: - """Insert the resolved rows via the writer (idempotent — see the adapter). - - Does not commit; the caller opens the transaction around this call. - Returns the number of properties actually inserted. - """ - return self._property_writer.insert_all(inserts) - @staticmethod def _row_to_insert( raw: dict[str, str], portfolio_id: int diff --git a/repositories/property/property_identity_writer.py b/repositories/property/property_identity_writer.py deleted file mode 100644 index a5db0129..00000000 --- a/repositories/property/property_identity_writer.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Optional - - -@dataclass(frozen=True) -class PropertyIdentityInsert: - """One row the finaliser inserts into the FE-owned ``property`` table. - - Mirrors the exact column set today's Next.js ``/finalize`` writes (ADR-0013): - nine fields plus ``creation_status='READY'``. ``address``/``postcode`` are the - resolved (matched ?? user-inputted) values and may be ``None``. - """ - - portfolio_id: int - uprn: Optional[int] - landlord_property_id: Optional[str] - address: Optional[str] - postcode: Optional[str] - user_inputted_address: Optional[str] - user_inputted_postcode: Optional[str] - lexiscore: Optional[float] - creation_status: str = "READY" - - -class PropertyIdentityWriter(ABC): - """Port: bulk-inserts ``property`` identity rows at Finalise (ADR-0013). - - Distinct from ``PropertyRepository`` (which hydrates the Property *aggregate* - for reads and needs an EPC slice). This writer only persists identity rows and - has no read/EPC dependency. Idempotent on re-run: existing properties are not - churned — see the adapter's conflict policy. - """ - - @abstractmethod - def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: - """Insert all rows, skipping any whose ``(portfolio_id, uprn)`` already - exists (the FE partial unique index, ``WHERE uprn IS NOT NULL``). Rows with - no UPRN are always inserted. Returns the number of rows actually inserted. - An empty list is a no-op returning 0.""" - ... diff --git a/repositories/property/property_identity_writer_postgres.py b/repositories/property/property_identity_writer_postgres.py deleted file mode 100644 index 2be9aba5..00000000 --- a/repositories/property/property_identity_writer_postgres.py +++ /dev/null @@ -1,63 +0,0 @@ -"""Postgres adapter for ``PropertyIdentityWriter`` (ADR-0013). - -Bulk-inserts ``property`` identity rows through the ``PropertyRow`` SQLModel -mirror. The conflict policy lives in SQL and reproduces today's Next.js -``onConflictDoNothing`` exactly: skip a row whose ``(portfolio_id, uprn)`` already -exists under the FE partial unique index ``uq_property_portfolio_uprn`` -(``WHERE uprn IS NOT NULL``); rows with no UPRN are always inserted. This makes a -re-run leave existing properties untouched — contrast ``property_overrides``, -which recalculates (ADR-0005). -""" - -from __future__ import annotations - -from typing import Any, cast - -from sqlalchemy import Table -from sqlalchemy.dialects.postgresql import insert as pg_insert -from sqlmodel import Session - -from infrastructure.postgres.property_table import PropertyRow -from repositories.property.property_identity_writer import ( - PropertyIdentityInsert, - PropertyIdentityWriter, -) - - -class PropertyIdentityWriterPostgresRepository(PropertyIdentityWriter): - def __init__(self, session: Session) -> None: - self._session = session - # SQLModel injects ``__table__`` at runtime on table=True classes but the - # stubs don't expose it; pin to ``Table`` so the dialect insert is typed. - self._table: Table = cast(Table, getattr(PropertyRow, "__table__")) - - def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: - if not rows: - return 0 - - values = [ - { - "portfolio_id": r.portfolio_id, - "creation_status": r.creation_status, - "uprn": r.uprn, - "landlord_property_id": r.landlord_property_id, - "address": r.address, - "postcode": r.postcode, - "user_inputted_address": r.user_inputted_address, - "user_inputted_postcode": r.user_inputted_postcode, - "lexiscore": r.lexiscore, - } - for r in rows - ] - - stmt = pg_insert(self._table).values(values) - # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL). - stmt = stmt.on_conflict_do_nothing( - index_elements=["portfolio_id", "uprn"], - index_where=self._table.c.uprn.isnot(None), - ) - - # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked - # deprecated in the stubs but the INSERT path is supported. - result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] - return cast(int, result.rowcount) diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index 6226e1e9..407fe1e5 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -1,24 +1,47 @@ from __future__ import annotations +from typing import Any, Optional, cast + +from sqlalchemy import Table +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlmodel import Session, col, select from domain.property.properties import Properties from domain.property.property import Property, PropertyIdentity from infrastructure.postgres.property_table import PropertyRow from repositories.epc.epc_repository import EpcRepository -from repositories.property.property_repository import PropertyRepository +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) class PropertyPostgresRepository(PropertyRepository): - """Hydrates the Property aggregate from the FE-owned ``property`` row plus the - EPC slice (via an injected `EpcRepository`). Reads only from repos — no - external IO — so a hydrated Property is a pure function of repository state - (ADR-0003). + """Postgres adapter for the ``property`` table — reads and writes (ADR-0003). + + Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice + (via an injected `EpcRepository`), so a hydrated Property is a pure function of + repository state. ``epc_repo`` is optional: the Finalise write path + (``insert_all``) creates new identity rows and never hydrates, so callers that + only insert construct this with a session alone. """ - def __init__(self, session: Session, epc_repo: EpcRepository) -> None: + def __init__( + self, session: Session, epc_repo: Optional[EpcRepository] = None + ) -> None: self._session = session self._epc_repo = epc_repo + # ``__table__`` is injected at runtime on table=True classes but the stubs + # don't expose it; pin to ``Table`` so the dialect insert is typed. + self._table: Table = cast(Table, getattr(PropertyRow, "__table__")) + + def _epc(self) -> EpcRepository: + if self._epc_repo is None: + raise ValueError( + "PropertyPostgresRepository needs an EpcRepository to read; it was " + "constructed for the write-only Finalise path." + ) + return self._epc_repo def get(self, property_id: int) -> Property: row = self._session.get(PropertyRow, property_id) @@ -36,7 +59,7 @@ class PropertyPostgresRepository(PropertyRepository): ) return Property( identity=identity, - epc=self._epc_repo.get_for_property(property_id), + epc=self._epc().get_for_property(property_id), ) def get_many(self, property_ids: list[int]) -> Properties: @@ -46,7 +69,7 @@ class PropertyPostgresRepository(PropertyRepository): select(PropertyRow).where(col(PropertyRow.id).in_(property_ids)) ).all() row_by_id = {row.id: row for row in rows} - epcs = self._epc_repo.get_for_properties(property_ids) + epcs = self._epc().get_for_properties(property_ids) items: list[Property] = [] for property_id in property_ids: row = row_by_id.get(property_id) @@ -65,3 +88,36 @@ class PropertyPostgresRepository(PropertyRepository): ) ) return Properties(items) + + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + if not rows: + return 0 + + values = [ + { + "portfolio_id": r.portfolio_id, + "creation_status": r.creation_status, + "uprn": r.uprn, + "landlord_property_id": r.landlord_property_id, + "address": r.address, + "postcode": r.postcode, + "user_inputted_address": r.user_inputted_address, + "user_inputted_postcode": r.user_inputted_postcode, + "lexiscore": r.lexiscore, + } + for r in rows + ] + + stmt = pg_insert(self._table).values(values) + # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL), + # reproducing today's Next.js onConflictDoNothing — a re-run leaves existing + # properties untouched (contrast property_overrides, which recalculates). + stmt = stmt.on_conflict_do_nothing( + index_elements=["portfolio_id", "uprn"], + index_where=self._table.c.uprn.isnot(None), + ) + + # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked + # deprecated in the stubs but the INSERT path is supported. + result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] + return cast(int, result.rowcount) diff --git a/repositories/property/property_repository.py b/repositories/property/property_repository.py index 1f3df1da..1773b530 100644 --- a/repositories/property/property_repository.py +++ b/repositories/property/property_repository.py @@ -1,17 +1,40 @@ from __future__ import annotations from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional from domain.property.properties import Properties from domain.property.property import Property -class PropertyRepository(ABC): - """Loads and saves the Property aggregate. +@dataclass(frozen=True) +class PropertyIdentityInsert: + """One row inserted into the FE-owned ``property`` table at Finalise (ADR-0013). - Composes the aggregate whole from the FE-owned ``property`` identity row plus - its source-data slices (EPC today; Site Notes / enrichments as later slices - land). Aggregates load whole — never half a Property (ADR-0002). + Mirrors the exact column set today's Next.js ``/finalize`` writes: nine fields + plus ``creation_status='READY'``. ``address``/``postcode`` are the resolved + (matched ?? user-inputted) values and may be ``None``. + """ + + portfolio_id: int + uprn: Optional[int] + landlord_property_id: Optional[str] + address: Optional[str] + postcode: Optional[str] + user_inputted_address: Optional[str] + user_inputted_postcode: Optional[str] + lexiscore: Optional[float] + creation_status: str = "READY" + + +class PropertyRepository(ABC): + """Reads and writes the FE-owned ``property`` table. + + Reads hydrate the Property aggregate whole — never half a Property — from the + identity row plus its source-data slices (EPC today; Site Notes / enrichments + as later slices land) (ADR-0002, ADR-0012). Writes bulk-insert identity rows at + Finalise (ADR-0013). One repository per aggregate. """ @abstractmethod @@ -23,3 +46,11 @@ class PropertyRepository(ABC): rather than one round-trip per property (ADR-0012). Order follows the input ids.""" ... + + @abstractmethod + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + """Bulk-insert identity rows, skipping any whose ``(portfolio_id, uprn)`` + already exists (the FE partial unique index, ``WHERE uprn IS NOT NULL``). + Rows with no UPRN are always inserted. Returns the number actually + inserted; an empty list is a no-op returning 0 (ADR-0013).""" + ... diff --git a/tests/orchestration/fakes.py b/tests/orchestration/fakes.py index 3e2feef0..c9dcf891 100644 --- a/tests/orchestration/fakes.py +++ b/tests/orchestration/fakes.py @@ -15,7 +15,10 @@ from domain.property.properties import Properties from domain.property.property import Property from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository from repositories.epc.epc_repository import EpcRepository -from repositories.property.property_repository import PropertyRepository +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) from repositories.solar.solar_repository import SolarRepository from repositories.unit_of_work import UnitOfWork @@ -30,6 +33,10 @@ class FakePropertyRepo(PropertyRepository): def get_many(self, property_ids: list[int]) -> Properties: return Properties([self._by_id[property_id] for property_id in property_ids]) + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + self.inserted: list[PropertyIdentityInsert] = list(rows) + return len(rows) + class FakeEpcRepo(EpcRepository): def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None: diff --git a/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py new file mode 100644 index 00000000..da6de277 --- /dev/null +++ b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py @@ -0,0 +1,105 @@ +"""The finaliser orchestrator runs end-to-end against fake writers — no DB. + +This is the payoff of injecting the repository ports (ADR-0013): the whole +Finalise domain flow (resolve combiner rows -> insert properties -> mark complete) +is unit-testable with two in-memory fakes, no engine/session/Postgres. +""" + +from __future__ import annotations + +from uuid import UUID, uuid4 + +from domain.property.properties import Properties +from domain.property.property import Property +from orchestration.bulk_upload_finaliser_orchestrator import ( + BulkUploadFinaliserOrchestrator, +) +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) + + +class FakePropertyRepository(PropertyRepository): + """Records inserts; the read half is unused on the Finalise path.""" + + def __init__(self) -> None: + self.inserted: list[PropertyIdentityInsert] = [] + + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + self.inserted = list(rows) + return len(rows) + + def get(self, property_id: int) -> Property: # pragma: no cover + raise NotImplementedError + + def get_many(self, property_ids: list[int]) -> Properties: # pragma: no cover + raise NotImplementedError + + +class FakeStatusWriter(BulkUploadStatusWriter): + def __init__(self) -> None: + self.calls: list[tuple[UUID, str]] = [] + + def set_status(self, task_id: UUID, status: str) -> None: + self.calls.append((task_id, status)) + + +def _orchestrator() -> tuple[ + BulkUploadFinaliserOrchestrator, FakePropertyRepository, FakeStatusWriter +]: + prop = FakePropertyRepository() + status = FakeStatusWriter() + return BulkUploadFinaliserOrchestrator(prop, status), prop, status + + +def test_finalise_inserts_resolved_rows_and_marks_complete() -> None: + orchestrator, prop, status = _orchestrator() + task_id = uuid4() + rows = [ + { + "Address 1": "1 Some Street", + "postcode": "A0 0AA", + "Internal Reference": "REF-1", + "address2uprn_uprn": "100023", + "address2uprn_address": "1 SOME STREET, TOWN, SW1A 1AA", + "address2uprn_lexiscore": "0.95", + }, + ] + + inserted = orchestrator.finalise(rows, portfolio_id=7, task_id=task_id) + + assert inserted == 1 + (row,) = prop.inserted + assert row.portfolio_id == 7 + assert row.uprn == 100023 + assert row.landlord_property_id == "REF-1" + assert row.address == "1 SOME STREET, TOWN, SW1A 1AA" # matched wins + assert row.postcode == "SW1A 1AA" # extracted from the matched address + assert row.user_inputted_address == "1 Some Street" + assert row.lexiscore == 0.95 + assert row.creation_status == "READY" + # The upload is marked complete via the injected status writer. + assert status.calls == [(task_id, "complete")] + + +def test_finalise_falls_back_to_user_input_when_unmatched() -> None: + orchestrator, prop, _status = _orchestrator() + rows = [ + { + "Address 1": "2 Other Road", + "postcode": "B1 1BB", + "address2uprn_uprn": "invalid postcode", + "address2uprn_address": "invalid postcode", + "address2uprn_lexiscore": "", + }, + ] + + orchestrator.finalise(rows, portfolio_id=9, task_id=uuid4()) + + (row,) = prop.inserted + assert row.uprn is None # missing sentinel -> None + assert row.address == "2 Other Road" # falls back to user input + assert row.postcode == "B1 1BB" # falls back to user-inputted postcode + assert row.lexiscore is None