Model/orchestration/bulk_upload_finaliser_orchestrator.py
Jun-te Kim 8b9dcc73f2 fix
2026-06-05 17:24:17 +00:00

375 lines
15 KiB
Python

"""Finalises a BulkUpload into ``property`` rows (ADR-0013).
The domain logic of Finalise: turn the combiner output rows into property identity
rows (the same resolution the old Next.js ``/finalize`` route did) and persist them
through the injected writer. Like every orchestrator it never commits — the caller
owns the transaction boundary (see the Lambda handler).
"""
from __future__ import annotations
import logging
import re
from typing import Any, Optional
from uuid import UUID
from domain.epc.built_form_type import BuiltFormType
from domain.epc.property_type import PropertyType
from domain.epc.roof_type import RoofType
from domain.epc.wall_type import WallType
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.landlord_overrides.landlord_override_reader import (
LandlordOverrideReader,
)
from repositories.property.property_override_repository import (
PropertyOverrideInsert,
PropertyOverrideRepository,
)
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Combiner-output columns — identical to the old frontend /finalize route and the
# backend combined-results reader (router.py).
# The three address columns are joined with ", " (blank parts dropped) to form
# the user-inputted address.
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3")
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"

# Cell value treated as "no value" (compared case-insensitively), alongside the
# empty string.
MISSING_SENTINEL = "invalid postcode"

# Case-insensitive UK postcode pattern, searched for inside the matched address;
# the whitespace between the outward and inward parts is optional.
UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE)

# Synthetic per-row join key minted by the frontend start-address-matching route
# (ADR-0006) and emitted into BOTH the combiner output and the classifier CSV.
# Must match SOURCE_ROW_ID_COLUMN in src/lib/bulkUpload/s3Keys.ts.
SOURCE_ROW_ID_COL = "source_row_id"

# The resolved-value sentinels that mean "the classifier could not decide". v2
# treats these as never-final (ADR-0006): reaching the finaliser is a defect, so
# we fail loudly. All four category enums spell it "Unknown".
UNKNOWN_VALUES = frozenset(
    {
        PropertyType.UNKNOWN.value,
        BuiltFormType.UNKNOWN.value,
        WallType.UNKNOWN.value,
        RoofType.UNKNOWN.value,
    }
)
def _split_entries(cell: Any) -> list[str]:
"""Split a multi-valued cell into per-building-part entries — mirrors the
frontend ``splitEntries`` (``split(",") → trim → drop empty``)."""
return [part.strip() for part in str(cell or "").split(",") if part.strip()]
def _permutation_for(count: int, multi_entry_ordering: dict[str, list[int]]) -> list[int]:
"""``permutation[k]`` = the file position holding building part ``k``.
A single-entry cell is always the main building (part 0). For ``count >= 2``
the confirmed per-count permutation is required (the verify/ordering gate
guarantees it); a missing or malformed one is a defect → fail loudly.
"""
if count == 1:
return [0]
permutation = multi_entry_ordering.get(str(count))
if (
permutation is None
or len(permutation) != count
or sorted(permutation) != list(range(count))
):
raise ValueError(
f"No confirmed building-part ordering for {count}-entry cells "
f"(have {sorted(multi_entry_ordering)}). The awaiting_review ordering "
f"step should have captured it (ADR-0004/0006); failing the finalise."
)
return permutation
def _normalize(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _is_missing(value: str) -> bool:
return value == "" or value.lower() == MISSING_SENTINEL
def _parse_uprn(raw: Any) -> Optional[int]:
    """Parse a combiner UPRN cell into an integer, or ``None`` if unusable.

    Accepts the plain integer spelling and the float spelling of a whole number
    (``"100020933699.0"``). Blank cells, the missing-value sentinel, non-numeric
    text and non-whole floats all give ``None``.
    """
    text = _normalize(raw)
    if _is_missing(text):
        return None
    try:
        return int(text)
    except ValueError:
        # The combiner writes the UPRN column via pandas; when any row in the
        # batch is unmatched (NaN), pandas coerces the whole column to float64,
        # so a UPRN arrives as e.g. "100020933699.0". Recover the integer from
        # that float form (otherwise every matched row in a mixed batch parses
        # to None — the cause of NULL property.uprn and empty
        # property_overrides for matched rows).
        try:
            number = float(text)
        except ValueError:
            return None
    if number.is_integer():
        return int(number)
    return None
def _parse_lexiscore(raw: Any) -> Optional[float]:
    """Parse a lexiscore cell into a float.

    Returns ``None`` when the cell is blank, holds the missing-value sentinel,
    or is not a valid float literal.
    """
    text = _normalize(raw)
    score: Optional[float] = None
    if not _is_missing(text):
        try:
            score = float(text)
        except ValueError:
            score = None
    return score
def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
if matched:
m = UK_POSTCODE_RE.search(matched)
if m:
return m.group(0).upper()
return fallback or None
class BulkUploadFinaliserOrchestrator:
    """Domain flow of Finalise, written purely against repository ports.

    The collaborators are ports (``PropertyRepository``,
    ``BulkUploadStatusWriter`` and, for v2, the override repository and the
    landlord-override reader); the concrete Postgres adapters are wired by the
    Lambda handler (the composition root). A unit test can therefore build this
    with fakes and drive ``finalise`` end-to-end — no engine, session, or DB.
    Nothing here commits: the caller opens the transaction around ``finalise``
    so the insert and the ``complete`` flip land atomically.
    """

    def __init__(
        self,
        property_repo: PropertyRepository,
        status_writer: BulkUploadStatusWriter,
        property_override_repo: Optional[PropertyOverrideRepository] = None,
        landlord_override_reader: Optional[LandlordOverrideReader] = None,
    ) -> None:
        """Store the injected collaborators.

        The two override collaborators are optional so the v1 property-only
        path (and its tests) can construct with two arguments; the Lambda wires
        all four. Overrides (v2, ADR-0006) are written only when both are
        present AND the trigger carried classifier inputs.
        """
        self._property_repo = property_repo
        self._status_writer = status_writer
        self._property_override_repo = property_override_repo
        self._landlord_override_reader = landlord_override_reader

    def finalise(
        self,
        rows: list[dict[str, str]],
        portfolio_id: int,
        task_id: UUID,
        *,
        classifier_rows: Optional[list[dict[str, str]]] = None,
        multi_entry_ordering: Optional[dict[str, list[int]]] = None,
        column_mapping: Optional[dict[str, str]] = None,
    ) -> int:
        """Run the whole Finalise flow through the injected repositories.

        Steps, in order: resolve the combiner rows, insert the properties,
        write the ``property_overrides`` for UPRN-matched rows (v2), then mark
        the upload ``complete``. Does not commit.

        Returns:
            The number of properties inserted, as reported by the repository.

        Raises:
            ValueError: On an unresolved (or ``Unknown``) description, so that
                ``commit_scope`` rolls back and nothing is half-written
                (ADR-0006 fail-loud).
        """
        property_inserts = self.to_property_rows(rows, portfolio_id)
        inserted_count = self._property_repo.insert_all(property_inserts)
        self._write_overrides(
            rows,
            portfolio_id,
            classifier_rows,
            multi_entry_ordering,
            column_mapping,
        )
        self._status_writer.set_status(task_id, "complete")
        return inserted_count

    def _write_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        portfolio_id: int,
        classifier_rows: Optional[list[dict[str, str]]],
        multi_entry_ordering: Optional[dict[str, list[int]]],
        column_mapping: Optional[dict[str, str]],
    ) -> None:
        """Build and upsert the ``property_overrides`` rows, when applicable.

        Silently does nothing without both override collaborators (the v1
        property-only path). With them, a missing classifier CSV or an empty
        column mapping is logged before returning, so an empty
        ``property_overrides`` is never a silent mystery (e.g. an upload whose
        CSVs predate the source_row_id join key, or one with no classifier
        columns mapped).
        """
        override_repo = self._property_override_repo
        if override_repo is None or self._landlord_override_reader is None:
            return

        if not classifier_rows:
            logger.info("Finalise: no classifier CSV rows; no property_overrides.")
            return
        if not column_mapping:
            logger.info("Finalise: empty column_mapping; no property_overrides.")
            return

        ordering = multi_entry_ordering if multi_entry_ordering else {}
        override_rows = self._build_overrides(
            combiner_rows,
            classifier_rows,
            ordering,
            column_mapping,
            portfolio_id,
        )
        affected = override_repo.upsert_all(override_rows)
        logger.info("Finalise: upserted %d property_overrides.", affected)

    def _build_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        classifier_rows: list[dict[str, str]],
        multi_entry_ordering: dict[str, list[int]],
        column_mapping: dict[str, str],
        portfolio_id: int,
    ) -> list[PropertyOverrideInsert]:
        """Assemble one override row per (UPRN property, component, building part).

        Only UPRN-matched combiner rows take part (no-UPRN rows are deferred to
        v3). Each is joined to its classifier descriptions by
        ``source_row_id``; every mapped cell is split into entries, ordered by
        ``multi_entry_ordering`` and resolved against the portfolio vocabulary.

        Raises:
            ValueError: On any non-empty description that does not resolve, or
                resolves to an ``Unknown`` value (fail-loud).
        """
        reader = self._landlord_override_reader
        assert reader is not None  # narrowed by caller

        # Pass 1 — index the UPRN-matched combiner rows (source_row_id -> uprn),
        # counting as we go so the "nothing to do" warning can say why.
        uprn_for_row: dict[str, int] = {}
        rows_with_srid = 0
        rows_with_uprn = 0
        for combiner_row in combiner_rows:
            source_row_id = _normalize(combiner_row.get(SOURCE_ROW_ID_COL))
            parsed_uprn = _parse_uprn(combiner_row.get(UPRN_COL))
            has_row_id = bool(source_row_id)
            has_uprn = parsed_uprn is not None
            rows_with_srid += int(has_row_id)
            rows_with_uprn += int(has_uprn)
            if has_row_id and has_uprn:
                uprn_for_row[source_row_id] = parsed_uprn

        if not uprn_for_row:
            classifier_with_srid = sum(
                bool(_normalize(classifier_row.get(SOURCE_ROW_ID_COL)))
                for classifier_row in classifier_rows
            )
            logger.warning(
                "Finalise: 0 override candidates — %d combiner rows (%d with "
                "source_row_id, %d with a UPRN), %d/%d classifier rows with "
                "source_row_id. If source_row_id counts are 0, the upload's CSVs "
                "predate the join-key change (ADR-0006) — re-run from "
                "start-address-matching so both CSVs carry it.",
                len(combiner_rows),
                rows_with_srid,
                rows_with_uprn,
                classifier_with_srid,
                len(classifier_rows),
            )
            return []

        # Pass 2 — load what the resolution needs: property ids for the
        # distinct UPRNs, the portfolio vocabulary, and the classifier rows
        # keyed by source_row_id (a later duplicate key replaces an earlier one).
        distinct_uprns = sorted(set(uprn_for_row.values()))
        property_id_for_uprn = self._property_repo.ids_by_uprn(
            portfolio_id, distinct_uprns
        )
        vocab = reader.load_for_portfolio(portfolio_id)
        classifier_lookup: dict[str, dict[str, str]] = {}
        for classifier_row in classifier_rows:
            key = _normalize(classifier_row.get(SOURCE_ROW_ID_COL))
            if key:
                classifier_lookup[key] = classifier_row

        # Pass 3 — resolve every mapped cell of every candidate row.
        overrides: list[PropertyOverrideInsert] = []
        for source_row_id, uprn in uprn_for_row.items():
            property_id = property_id_for_uprn.get(uprn)
            if property_id is None:
                # A UPRN row we can't resolve to a property.id — the insert
                # done earlier in finalise should have created it (or it
                # pre-existed). Defensive skip; this is an identity gap, not a
                # description-resolution failure.
                logger.warning(
                    "Finalise: no property.id for uprn %s (row %s, portfolio %s); "
                    "skipping its overrides.",
                    uprn,
                    source_row_id,
                    portfolio_id,
                )
                continue

            descriptions = classifier_lookup.get(source_row_id)
            if descriptions is None:
                # No descriptions were carried for this row.
                continue

            for component, source_header in column_mapping.items():
                cell_entries = _split_entries(descriptions.get(source_header))
                if not cell_entries:
                    # Empty cell → no override row for this component.
                    continue

                permutation = _permutation_for(len(cell_entries), multi_entry_ordering)
                known_values = vocab.get(component, {})
                for building_part, file_pos in enumerate(permutation):
                    description = cell_entries[file_pos]
                    resolved = known_values.get(description.lower())
                    if resolved is not None and resolved not in UNKNOWN_VALUES:
                        overrides.append(
                            PropertyOverrideInsert(
                                property_id=property_id,
                                portfolio_id=portfolio_id,
                                building_part=building_part,
                                override_component=component,
                                override_value=resolved,
                                original_spreadsheet_description=description,
                            )
                        )
                        continue

                    # Either nothing resolved, or it resolved to "Unknown".
                    unknown_note = " (UNKNOWN)" if resolved else ""
                    raise ValueError(
                        f"Unresolved {component} description {description!r} "
                        f"(row {source_row_id}, portfolio {portfolio_id}): no resolved "
                        f"value{unknown_note}. The verify gate "
                        f"should have mapped it; failing the finalise (ADR-0006)."
                    )
        return overrides

    def to_property_rows(
        self, rows: list[dict[str, str]], portfolio_id: int
    ) -> list[PropertyIdentityInsert]:
        """Turn combiner rows into property identity inserts, one per row.

        Pure (no DB / IO) and independently testable. Reproduces the old
        ``/finalize`` route's resolution exactly: the matched address falls
        back to the user-inputted one; the postcode is extracted from the
        matched address or falls back to the user-inputted postcode.
        """
        property_rows: list[PropertyIdentityInsert] = []
        for combiner_row in rows:
            property_rows.append(self._row_to_insert(combiner_row, portfolio_id))
        return property_rows

    @staticmethod
    def _row_to_insert(
        raw: dict[str, str], portfolio_id: int
    ) -> PropertyIdentityInsert:
        """Resolve a single combiner row into a ``PropertyIdentityInsert``.

        Blank values become ``None``. The row is always created with
        ``creation_status="READY"``.
        """
        # User-supplied address: the non-blank address columns joined by ", ".
        address_parts: list[str] = []
        for column in ADDRESS_COLS:
            part = _normalize(raw.get(column))
            if part:
                address_parts.append(part)
        user_inputted_address = ", ".join(address_parts) or None
        user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None

        uprn = _parse_uprn(raw.get(UPRN_COL))

        # Matcher-supplied address, unless it is blank or the missing sentinel.
        matched_text = _normalize(raw.get(MATCHED_ADDRESS_COL))
        matched_address = matched_text if not _is_missing(matched_text) else None

        return PropertyIdentityInsert(
            portfolio_id=portfolio_id,
            uprn=uprn,
            landlord_property_id=_normalize(raw.get(INTERNAL_REF_COL)) or None,
            address=matched_address or user_inputted_address,
            postcode=_extract_postcode(matched_address, user_inputted_postcode or ""),
            user_inputted_address=user_inputted_address,
            user_inputted_postcode=user_inputted_postcode,
            lexiscore=_parse_lexiscore(raw.get(LEXISCORE_COL)),
            creation_status="READY",
        )