Model/orchestration/bulk_upload_finaliser_orchestrator.py
2026-06-04 14:50:04 +00:00

146 lines
5 KiB
Python

"""Finalises a BulkUpload into ``property`` rows (ADR-0013).
The domain logic of Finalise: turn the combiner output rows into property identity
rows (the same resolution the old Next.js ``/finalize`` route did) and persist them
through the injected writer. Like every orchestrator it never commits — the caller
owns the transaction boundary (see the Lambda handler).
"""
from __future__ import annotations
import re
from typing import Any, Optional
from uuid import UUID
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
# Combiner-output columns — identical to the old frontend /finalize route and the
# backend combined-results reader (router.py).
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3")
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
MISSING_SENTINEL = "invalid postcode"
UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE)
def _normalize(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _is_missing(value: str) -> bool:
return value == "" or value.lower() == MISSING_SENTINEL
def _parse_uprn(raw: Any) -> Optional[int]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return int(val)
except ValueError:
return None
def _parse_lexiscore(raw: Any) -> Optional[float]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return float(val)
except ValueError:
return None
def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
if matched:
m = UK_POSTCODE_RE.search(matched)
if m:
return m.group(0).upper()
return fallback or None
class BulkUploadFinaliserOrchestrator:
"""Owns the domain flow of Finalise, depending only on repository ports.
Both collaborators are ports (``PropertyRepository``,
``BulkUploadStatusWriter``); the concrete Postgres adapters are wired by the
Lambda handler (the composition root). So a unit test constructs this with two
fakes and exercises ``finalise`` end-to-end — no engine, session, or DB. The
orchestrator never commits: the caller opens the transaction around
``finalise`` so the insert and the ``complete`` flip land atomically.
"""
def __init__(
self,
property_repo: PropertyRepository,
status_writer: BulkUploadStatusWriter,
) -> None:
self._property_repo = property_repo
self._status_writer = status_writer
def finalise(
self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID
) -> int:
"""Resolve the combiner rows, insert the properties, and mark the upload
``complete`` — all via the injected repositories, no DB connection of its
own. Returns the number of properties inserted. Does not commit."""
inserts = self.to_property_rows(rows, portfolio_id)
inserted = self._property_repo.insert_all(inserts)
self._status_writer.set_status(task_id, "complete")
return inserted
def to_property_rows(
self, rows: list[dict[str, str]], portfolio_id: int
) -> list[PropertyIdentityInsert]:
"""Resolve combiner rows into property identity inserts.
Pure (no DB / IO) and independently testable. Reproduces the old
``/finalize`` route's resolution exactly: matched address falls back to the
user-inputted one; postcode is extracted from the matched address or falls
back to the user-inputted postcode.
"""
return [self._row_to_insert(raw, portfolio_id) for raw in rows]
@staticmethod
def _row_to_insert(
raw: dict[str, str], portfolio_id: int
) -> PropertyIdentityInsert:
user_inputted_address = (
", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p)
or None
)
user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None
uprn = _parse_uprn(raw.get(UPRN_COL))
matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
matched_address = (
None if _is_missing(matched_address_raw) else matched_address_raw
)
address = matched_address or user_inputted_address
postcode = _extract_postcode(matched_address, user_inputted_postcode or "")
internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None
lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))
return PropertyIdentityInsert(
portfolio_id=portfolio_id,
uprn=uprn,
landlord_property_id=internal_ref,
address=address,
postcode=postcode,
user_inputted_address=user_inputted_address,
user_inputted_postcode=user_inputted_postcode,
lexiscore=lexiscore,
creation_status="READY",
)