reformatted to be DDD structure

This commit is contained in:
Jun-te Kim 2026-06-04 14:50:04 +00:00
parent dfd05ba28b
commit 261fae2e79
9 changed files with 278 additions and 158 deletions

View file

@ -33,8 +33,8 @@ from orchestration.task_orchestrator import TaskOrchestrator
from repositories.bulk_upload.bulk_upload_status_writer_postgres import (
BulkUploadStatusWriterPostgresRepository,
)
from repositories.property.property_identity_writer_postgres import (
PropertyIdentityWriterPostgresRepository,
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
@ -51,16 +51,17 @@ def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int:
session = make_session(engine)
try:
orchestrator = BulkUploadFinaliserOrchestrator(
property_writer=PropertyIdentityWriterPostgresRepository(session)
# Write-only path: no EpcRepository needed for inserts.
property_repo=PropertyPostgresRepository(session),
status_writer=BulkUploadStatusWriterPostgresRepository(session),
)
status_writer = BulkUploadStatusWriterPostgresRepository(session)
# Resolution is pure, so run it before opening the transaction.
inserts = orchestrator.to_property_rows(rows, trigger.portfolio_id)
# Atomic finalise: insert properties and mark `complete` together — a
# failure in either rolls back both, leaving the row for the failure path.
# Atomic finalise: the orchestrator inserts properties and marks `complete`
# via its injected writers; the transaction here makes them land together —
# a failure in either rolls back both, leaving the row for the failure path.
with commit_scope(session):
inserted = orchestrator.persist(inserts)
status_writer.set_status(trigger.task_id, "complete")
inserted = orchestrator.finalise(
rows, trigger.portfolio_id, trigger.task_id
)
finally:
session.close()

View file

@ -78,20 +78,25 @@ inserts properties. Finalise changes that.
writes `complete`; it owns only the `awaiting_review → finalising`
compare-and-swap at dispatch (frontend ADR-0005).
5. **DDD layering (ADR-0003 + the landlord precedent).** Domain logic — resolving
combiner rows into property identity rows — lives in
`orchestration/bulk_upload_finaliser_orchestrator.py`; the Lambda
`applications/bulk_upload_finaliser/handler.py` stays thin (parse trigger, wire
S3 + engine + repos, delegate, report status). New seams:
- Property insert: port `repositories/property/property_identity_writer.py`
(+ `PropertyIdentityInsert` DTO), adapter
`repositories/property/property_identity_writer_postgres.py`, writing the
amended `infrastructure/postgres/property_table.py` mirror. A dedicated *writer*
distinct from the aggregate-hydrating `PropertyRepository`, which needs an EPC
slice the finaliser doesn't.
5. **DDD layering (ADR-0003 + the landlord precedent).** The
`orchestration/bulk_upload_finaliser_orchestrator.py` owns the whole Finalise
domain flow via a single `finalise(rows, portfolio_id, task_id)` that depends
**only on the two repository ports** — it resolves the combiner rows, inserts
the properties, and marks the upload `complete`, with no engine/session/DB of
its own. So it is unit-testable with two in-memory fakes
(`tests/orchestration/test_bulk_upload_finaliser_orchestrator.py`). The Lambda
`applications/bulk_upload_finaliser/handler.py` is the composition root: parse
trigger, read S3, build the engine/session and the concrete adapters, open the
transaction around `finalise`, and handle the failure path. New seams:
- Property insert: `insert_all` (+ the `PropertyIdentityInsert` DTO) is added to
the existing `PropertyRepository` port and its `PropertyPostgresRepository`
adapter — **one repository per aggregate**, reads and writes together, writing
the amended `infrastructure/postgres/property_table.py` mirror. `epc_repo` is
optional: the write path creates identity rows and never hydrates, so the
finaliser constructs the repo with a session alone.
- Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter
`…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py`
mirror.
mirror (a separate table → a separate repository).
- Transaction boundary stays in the `infrastructure/postgres/engine` helper
(`commit_scope`); the handler opens the context (once, around insert+complete),
never calls `.commit()`; orchestrator and repositories never commit.

View file

@ -11,9 +11,12 @@ from __future__ import annotations
import re
from typing import Any, Optional
from repositories.property.property_identity_writer import (
from uuid import UUID
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyIdentityWriter,
PropertyRepository,
)
# Combiner-output columns — identical to the old frontend /finalize route and the
@ -67,29 +70,47 @@ def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
class BulkUploadFinaliserOrchestrator:
def __init__(self, property_writer: PropertyIdentityWriter) -> None:
self._property_writer = property_writer
"""Owns the domain flow of Finalise, depending only on repository ports.
Both collaborators are ports (``PropertyRepository``,
``BulkUploadStatusWriter``); the concrete Postgres adapters are wired by the
Lambda handler (the composition root). So a unit test constructs this with two
fakes and exercises ``finalise`` end-to-end — no engine, session, or DB. The
orchestrator never commits: the caller opens the transaction around
``finalise`` so the insert and the ``complete`` flip land atomically.
"""
def __init__(
self,
property_repo: PropertyRepository,
status_writer: BulkUploadStatusWriter,
) -> None:
self._property_repo = property_repo
self._status_writer = status_writer
def finalise(
    self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID
) -> int:
    """Run the Finalise flow through the injected repositories.

    Resolves ``rows`` into property inserts with ``to_property_rows``, passes
    them to the property repository's ``insert_all``, and then asks the status
    writer to set ``task_id`` to ``"complete"`` — in that order.

    Returns the value reported by ``insert_all`` (the number of properties
    inserted). Opens no connection of its own and does not commit.
    """
    inserts = self.to_property_rows(rows, portfolio_id)
    inserted = self._property_repo.insert_all(inserts)
    # The status flip happens only after the insert call has returned, so an
    # insert failure propagates before "complete" is ever written.
    self._status_writer.set_status(task_id, "complete")
    return inserted
def to_property_rows(
self, rows: list[dict[str, str]], portfolio_id: int
) -> list[PropertyIdentityInsert]:
"""Resolve combiner rows into property identity inserts.
Pure (no DB / IO), so the caller can run it before opening a transaction.
Reproduces the old ``/finalize`` route's resolution exactly: matched
address falls back to the user-inputted one; postcode is extracted from
the matched address or falls back to the user-inputted postcode.
Pure (no DB / IO) and independently testable. Reproduces the old
``/finalize`` route's resolution exactly: matched address falls back to the
user-inputted one; postcode is extracted from the matched address or falls
back to the user-inputted postcode.
"""
return [self._row_to_insert(raw, portfolio_id) for raw in rows]
def persist(self, inserts: list[PropertyIdentityInsert]) -> int:
    """Hand the already-resolved rows to the property writer.

    Delegates to the writer's ``insert_all`` and returns its result, i.e. the
    number of properties actually inserted. No commit is issued here; the
    caller wraps this call in its own transaction.
    """
    inserted_count = self._property_writer.insert_all(inserts)
    return inserted_count
@staticmethod
def _row_to_insert(
raw: dict[str, str], portfolio_id: int

View file

@ -1,43 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class PropertyIdentityInsert:
    """One row the finaliser inserts into the FE-owned ``property`` table.

    Mirrors the column set today's Next.js ``/finalize`` writes (ADR-0013):
    eight identity fields plus ``creation_status='READY'`` — nine columns in
    total. ``address``/``postcode`` are the resolved (matched ?? user-inputted)
    values and may be ``None``.

    Instances are immutable (``frozen=True``).
    """

    portfolio_id: int
    uprn: Optional[int]
    landlord_property_id: Optional[str]
    address: Optional[str]
    postcode: Optional[str]
    user_inputted_address: Optional[str]
    user_inputted_postcode: Optional[str]
    lexiscore: Optional[float]
    # The only defaulted field, so it must stay last in the declaration order.
    creation_status: str = "READY"
class PropertyIdentityWriter(ABC):
    """Port: bulk-inserts ``property`` identity rows at Finalise (ADR-0013).

    Distinct from ``PropertyRepository`` (which hydrates the Property
    *aggregate* for reads and needs an EPC slice). This writer only persists
    identity rows and has no read/EPC dependency. Idempotent on re-run:
    existing properties are not churned — see the adapter's conflict policy.
    """

    @abstractmethod
    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Insert all rows and return how many were actually inserted.

        Rows whose ``(portfolio_id, uprn)`` already exists (the FE partial
        unique index, ``WHERE uprn IS NOT NULL``) are skipped; rows with no
        UPRN are always inserted. An empty list is a no-op returning 0.
        """
        ...

View file

@ -1,63 +0,0 @@
"""Postgres adapter for ``PropertyIdentityWriter`` (ADR-0013).
Bulk-inserts ``property`` identity rows through the ``PropertyRow`` SQLModel
mirror. The conflict policy lives in SQL and reproduces today's Next.js
``onConflictDoNothing`` exactly: skip a row whose ``(portfolio_id, uprn)`` already
exists under the FE partial unique index ``uq_property_portfolio_uprn``
(``WHERE uprn IS NOT NULL``); rows with no UPRN are always inserted. This makes a
re-run leave existing properties untouched — contrast ``property_overrides``,
which recalculates (ADR-0005).
"""
from __future__ import annotations
from typing import Any, cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from infrastructure.postgres.property_table import PropertyRow
from repositories.property.property_identity_writer import (
PropertyIdentityInsert,
PropertyIdentityWriter,
)
class PropertyIdentityWriterPostgresRepository(PropertyIdentityWriter):
    """Postgres-backed ``PropertyIdentityWriter`` that writes through a session."""

    def __init__(self, session: Session) -> None:
        self._session = session
        # ``__table__`` only exists at runtime on table=True SQLModel classes
        # (the stubs hide it), so fetch it dynamically and pin the type to
        # ``Table`` for the dialect-specific insert below.
        self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))

    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Issue one multi-row INSERT ... ON CONFLICT DO NOTHING.

        Returns 0 without touching the session when ``rows`` is empty;
        otherwise returns the rowcount reported for the executed statement.
        """
        if not rows:
            return 0

        payload: list[dict[str, Any]] = []
        for item in rows:
            payload.append(
                {
                    "portfolio_id": item.portfolio_id,
                    "creation_status": item.creation_status,
                    "uprn": item.uprn,
                    "landlord_property_id": item.landlord_property_id,
                    "address": item.address,
                    "postcode": item.postcode,
                    "user_inputted_address": item.user_inputted_address,
                    "user_inputted_postcode": item.user_inputted_postcode,
                    "lexiscore": item.lexiscore,
                }
            )

        # Conflict target matches `uq_property_portfolio_uprn`
        # (partial index: WHERE uprn IS NOT NULL).
        statement = (
            pg_insert(self._table)
            .values(payload)
            .on_conflict_do_nothing(
                index_elements=["portfolio_id", "uprn"],
                index_where=self._table.c.uprn.isnot(None),
            )
        )
        # One ``Session.execute`` overload is flagged deprecated in the stubs
        # SQLModel re-exports; the INSERT path used here is supported.
        outcome = self._session.execute(statement)  # pyright: ignore[reportDeprecated]
        return cast(int, outcome.rowcount)

View file

@ -1,24 +1,47 @@
from __future__ import annotations
from typing import Any, Optional, cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
class PropertyPostgresRepository(PropertyRepository):
"""Hydrates the Property aggregate from the FE-owned ``property`` row plus the
EPC slice (via an injected `EpcRepository`). Reads only from repos — no
external IO — so a hydrated Property is a pure function of repository state
(ADR-0003).
"""Postgres adapter for the ``property`` table — reads and writes (ADR-0003).
Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
(via an injected `EpcRepository`), so a hydrated Property is a pure function of
repository state. ``epc_repo`` is optional: the Finalise write path
(``insert_all``) creates new identity rows and never hydrates, so callers that
only insert construct this with a session alone.
"""
def __init__(self, session: Session, epc_repo: EpcRepository) -> None:
def __init__(
self, session: Session, epc_repo: Optional[EpcRepository] = None
) -> None:
self._session = session
self._epc_repo = epc_repo
# ``__table__`` is injected at runtime on table=True classes but the stubs
# don't expose it; pin to ``Table`` so the dialect insert is typed.
self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))
def _epc(self) -> EpcRepository:
if self._epc_repo is None:
raise ValueError(
"PropertyPostgresRepository needs an EpcRepository to read; it was "
"constructed for the write-only Finalise path."
)
return self._epc_repo
def get(self, property_id: int) -> Property:
row = self._session.get(PropertyRow, property_id)
@ -36,7 +59,7 @@ class PropertyPostgresRepository(PropertyRepository):
)
return Property(
identity=identity,
epc=self._epc_repo.get_for_property(property_id),
epc=self._epc().get_for_property(property_id),
)
def get_many(self, property_ids: list[int]) -> Properties:
@ -46,7 +69,7 @@ class PropertyPostgresRepository(PropertyRepository):
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
).all()
row_by_id = {row.id: row for row in rows}
epcs = self._epc_repo.get_for_properties(property_ids)
epcs = self._epc().get_for_properties(property_ids)
items: list[Property] = []
for property_id in property_ids:
row = row_by_id.get(property_id)
@ -65,3 +88,36 @@ class PropertyPostgresRepository(PropertyRepository):
)
)
return Properties(items)
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
    """Bulk-insert identity rows with ON CONFLICT DO NOTHING.

    An empty ``rows`` returns 0 without executing anything. Otherwise a single
    multi-row INSERT is executed on the session and its rowcount is returned.
    """
    if not rows:
        return 0

    def as_values(item: PropertyIdentityInsert) -> dict[str, Any]:
        # Column name -> value for one row of the INSERT.
        return {
            "portfolio_id": item.portfolio_id,
            "creation_status": item.creation_status,
            "uprn": item.uprn,
            "landlord_property_id": item.landlord_property_id,
            "address": item.address,
            "postcode": item.postcode,
            "user_inputted_address": item.user_inputted_address,
            "user_inputted_postcode": item.user_inputted_postcode,
            "lexiscore": item.lexiscore,
        }

    base_insert = pg_insert(self._table).values([as_values(item) for item in rows])
    # Conflict target matches `uq_property_portfolio_uprn` (partial: WHERE uprn
    # IS NOT NULL), reproducing today's Next.js onConflictDoNothing — a re-run
    # leaves existing properties untouched (contrast property_overrides, which
    # recalculates).
    guarded_insert = base_insert.on_conflict_do_nothing(
        index_elements=["portfolio_id", "uprn"],
        index_where=self._table.c.uprn.isnot(None),
    )
    # One ``Session.execute`` overload is flagged deprecated in the stubs
    # SQLModel re-exports; the INSERT path used here is supported.
    outcome = self._session.execute(guarded_insert)  # pyright: ignore[reportDeprecated]
    return cast(int, outcome.rowcount)

View file

@ -1,17 +1,40 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from domain.property.properties import Properties
from domain.property.property import Property
class PropertyRepository(ABC):
"""Loads and saves the Property aggregate.
@dataclass(frozen=True)
class PropertyIdentityInsert:
"""One row inserted into the FE-owned ``property`` table at Finalise (ADR-0013).
Composes the aggregate whole from the FE-owned ``property`` identity row plus
its source-data slices (EPC today; Site Notes / enrichments as later slices
land). Aggregates load whole — never half a Property (ADR-0002).
Mirrors the exact column set today's Next.js ``/finalize`` writes: eight identity
fields plus ``creation_status='READY'`` (nine columns). ``address``/``postcode`` are the resolved
(matched ?? user-inputted) values and may be ``None``.
"""
portfolio_id: int
uprn: Optional[int]
landlord_property_id: Optional[str]
address: Optional[str]
postcode: Optional[str]
user_inputted_address: Optional[str]
user_inputted_postcode: Optional[str]
lexiscore: Optional[float]
creation_status: str = "READY"
class PropertyRepository(ABC):
"""Reads and writes the FE-owned ``property`` table.
Reads hydrate the Property aggregate whole — never half a Property — from the
identity row plus its source-data slices (EPC today; Site Notes / enrichments
as later slices land) (ADR-0002, ADR-0012). Writes bulk-insert identity rows at
Finalise (ADR-0013). One repository per aggregate.
"""
@abstractmethod
@ -23,3 +46,11 @@ class PropertyRepository(ABC):
rather than one round-trip per property (ADR-0012). Order follows the
input ids."""
...
@abstractmethod
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
    """Bulk-insert identity rows and report how many were inserted (ADR-0013).

    A row is skipped when its ``(portfolio_id, uprn)`` already exists under the
    FE partial unique index (``WHERE uprn IS NOT NULL``); a row without a UPRN
    is always inserted. Passing an empty list is a no-op that returns 0.
    """
    ...

View file

@ -15,7 +15,10 @@ from domain.property.properties import Properties
from domain.property.property import Property
from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
from repositories.solar.solar_repository import SolarRepository
from repositories.unit_of_work import UnitOfWork
@ -30,6 +33,10 @@ class FakePropertyRepo(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties:
return Properties([self._by_id[property_id] for property_id in property_ids])
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
    """Remember a copy of ``rows`` on ``self.inserted`` and return their count.

    Each call replaces whatever an earlier call recorded.
    """
    snapshot: list[PropertyIdentityInsert] = [*rows]
    self.inserted = snapshot
    return len(rows)
class FakeEpcRepo(EpcRepository):
def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None:

View file

@ -0,0 +1,105 @@
"""The finaliser orchestrator runs end-to-end against fake writers — no DB.
This is the payoff of injecting the repository ports (ADR-0013): the whole
Finalise domain flow (resolve combiner rows -> insert properties -> mark complete)
is unit-testable with two in-memory fakes, no engine/session/Postgres.
"""
from __future__ import annotations
from uuid import UUID, uuid4
from domain.property.properties import Properties
from domain.property.property import Property
from orchestration.bulk_upload_finaliser_orchestrator import (
BulkUploadFinaliserOrchestrator,
)
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
class FakePropertyRepository(PropertyRepository):
    """In-memory stand-in that remembers the rows given to ``insert_all``.

    The read methods are intentionally unimplemented: the Finalise path under
    test never calls them.
    """

    def __init__(self) -> None:
        self.inserted: list[PropertyIdentityInsert] = []

    def get(self, property_id: int) -> Property:  # pragma: no cover
        raise NotImplementedError

    def get_many(self, property_ids: list[int]) -> Properties:  # pragma: no cover
        raise NotImplementedError

    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        # Keep a copy so later mutation of the caller's list cannot change
        # what the test observes; each call replaces the previous record.
        captured = [*rows]
        self.inserted = captured
        return len(rows)
class FakeStatusWriter(BulkUploadStatusWriter):
    """In-memory status writer that logs every ``set_status`` call in order."""

    def __init__(self) -> None:
        self.calls: list[tuple[UUID, str]] = []

    def set_status(self, task_id: UUID, status: str) -> None:
        entry = (task_id, status)
        self.calls.append(entry)
def _orchestrator() -> tuple[
    BulkUploadFinaliserOrchestrator, FakePropertyRepository, FakeStatusWriter
]:
    """Build an orchestrator wired to two fresh fakes.

    Returns the orchestrator together with both fakes so a test can inspect
    what was recorded.
    """
    fake_repo = FakePropertyRepository()
    fake_status = FakeStatusWriter()
    orchestrator = BulkUploadFinaliserOrchestrator(fake_repo, fake_status)
    return orchestrator, fake_repo, fake_status
def test_finalise_inserts_resolved_rows_and_marks_complete() -> None:
    """A matched row is resolved, inserted once, and the task is marked complete."""
    orchestrator, prop, status = _orchestrator()
    task_id = uuid4()
    matched_row = {
        "Address 1": "1 Some Street",
        "postcode": "A0 0AA",
        "Internal Reference": "REF-1",
        "address2uprn_uprn": "100023",
        "address2uprn_address": "1 SOME STREET, TOWN, SW1A 1AA",
        "address2uprn_lexiscore": "0.95",
    }

    inserted = orchestrator.finalise([matched_row], portfolio_id=7, task_id=task_id)

    assert inserted == 1
    [row] = prop.inserted
    assert row.portfolio_id == 7
    assert row.uprn == 100023
    assert row.landlord_property_id == "REF-1"
    # The matched address wins over the user-inputted one ...
    assert row.address == "1 SOME STREET, TOWN, SW1A 1AA"
    # ... and the postcode is extracted from that matched address.
    assert row.postcode == "SW1A 1AA"
    assert row.user_inputted_address == "1 Some Street"
    assert row.lexiscore == 0.95
    assert row.creation_status == "READY"
    # The upload is marked complete via the injected status writer.
    assert status.calls == [(task_id, "complete")]
def test_finalise_falls_back_to_user_input_when_unmatched() -> None:
    """An unmatched row falls back to the user-inputted address and postcode."""
    orchestrator, prop, _status = _orchestrator()
    unmatched_row = {
        "Address 1": "2 Other Road",
        "postcode": "B1 1BB",
        "address2uprn_uprn": "invalid postcode",
        "address2uprn_address": "invalid postcode",
        "address2uprn_lexiscore": "",
    }

    orchestrator.finalise([unmatched_row], portfolio_id=9, task_id=uuid4())

    [row] = prop.inserted
    # The "invalid postcode" sentinel is treated as missing.
    assert row.uprn is None
    # Address and postcode both fall back to what the user typed.
    assert row.address == "2 Other Road"
    assert row.postcode == "B1 1BB"
    assert row.lexiscore is None