"""Finalises a BulkUpload into ``property`` rows (ADR-0013).

This module holds the domain logic of Finalise. It turns the combiner output
rows into property identity rows, using the same resolution the old Next.js
``/finalize`` route did, and persists them through the injected writer. Like
every orchestrator it never commits: the caller owns the transaction boundary
(see the Lambda handler).
"""
from __future__ import annotations

import logging
import re
from typing import Any, Optional
from uuid import UUID

from domain.epc.built_form_type import BuiltFormType
from domain.epc.property_type import PropertyType
from domain.epc.roof_type import RoofType
from domain.epc.wall_type import WallType
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.landlord_overrides.landlord_override_reader import LandlordOverrideReader
from repositories.property.property_override_repository import (
    PropertyOverrideInsert,
    PropertyOverrideRepository,
)
from repositories.property.property_repository import (
    PropertyIdentityInsert,
    PropertyRepository,
)

logger = logging.getLogger(__name__)

# --- Combiner-output column names -------------------------------------------
# These must stay identical to the old frontend /finalize route and to the
# backend combined-results reader (router.py).
ADDRESS_COLS = (
    "Address 1",
    "Address 2",
    "Address 3",
)
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"

# The value the combiner writes in place of a missing match.
MISSING_SENTINEL = "invalid postcode"

# Loose UK postcode shape (outward code, optional whitespace, inward code),
# matched case-insensitively.
UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.I)

# --- Per-row join key --------------------------------------------------------
# The frontend start-address-matching route (ADR-0006) mints this synthetic key
# and writes it into BOTH the combiner output and the classifier CSV. It must
# equal SOURCE_ROW_ID_COLUMN in src/lib/bulkUpload/s3Keys.ts.
SOURCE_ROW_ID_COL = "source_row_id"

# The resolved-value sentinels that mean "the classifier could not decide".
# v2 treats these as never-final (ADR-0006): reaching the finaliser is a defect,
# so we fail loudly. All four category enums spell it "Unknown".
UNKNOWN_VALUES = frozenset(
    {
        PropertyType.UNKNOWN.value,
        BuiltFormType.UNKNOWN.value,
        WallType.UNKNOWN.value,
        RoofType.UNKNOWN.value,
    }
)


def _split_entries(cell: Any) -> list[str]:
    """Split a multi-valued cell into per-building-part entries.

    Mirrors the frontend ``splitEntries`` (``split(",") → trim → drop empty``).
    A falsy cell (``None``, ``""``) yields ``[]``.
    """
    return [part.strip() for part in str(cell or "").split(",") if part.strip()]


def _permutation_for(
    count: int, multi_entry_ordering: dict[str, list[int]]
) -> list[int]:
    """Return the building-part ordering for a cell with ``count`` entries.

    ``permutation[k]`` is the file position (index into the split cell) that
    holds building part ``k``. A single-entry cell is always the main building
    (part 0). For ``count >= 2`` the confirmed per-count permutation is
    required (the verify/ordering gate guarantees it). A missing or malformed
    one is a defect, so we fail loudly.

    Args:
        count: Number of entries in the cell. The only caller passes ``>= 1``.
        multi_entry_ordering: Confirmed orderings keyed by the entry count as a
            string (e.g. ``"2"``).

    Returns:
        A permutation of ``range(count)``.

    Raises:
        ValueError: No valid permutation of ``range(count)`` is recorded for
            ``count``.
    """
    if count == 1:
        return [0]
    permutation = multi_entry_ordering.get(str(count))
    if (
        permutation is None
        or len(permutation) != count
        # Checked before sorting: mixed or non-int elements would otherwise
        # surface as an opaque TypeError from sorted() instead of the
        # intended fail-loud ValueError below.
        or not all(isinstance(pos, int) for pos in permutation)
        or sorted(permutation) != list(range(count))
    ):
        raise ValueError(
            f"No confirmed building-part ordering for {count}-entry cells "
            f"(have {sorted(multi_entry_ordering)}). The awaiting_review ordering "
            f"step should have captured it (ADR-0004/0006); failing the finalise."
        )
    return permutation


def _normalize(value: Any) -> str:
    """Return ``value`` as a whitespace-stripped string; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def _is_missing(value: str) -> bool:
    """Return True for an empty (already-normalised) cell or for
    ``MISSING_SENTINEL``, compared case-insensitively."""
    return value == "" or value.lower() == MISSING_SENTINEL


def _parse_uprn(raw: Any) -> Optional[int]:
    """Parse a UPRN cell into an int.

    Returns ``None`` when the cell is missing or is not an integral number.
    """
    val = _normalize(raw)
    if _is_missing(val):
        return None
    try:
        return int(val)
    except ValueError:
        pass
    # The combiner writes the UPRN column via pandas. When any row in the batch
    # is unmatched (NaN), pandas coerces the whole column to float64, so a UPRN
    # arrives as e.g. "100020933699.0". Recover the integer from that float
    # form. Otherwise every matched row in a mixed batch parses to None, which
    # caused NULL property.uprn and empty property_overrides for matched rows.
    # NaN/inf parse as floats but are not integral, so they map to None.
    try:
        as_float = float(val)
    except ValueError:
        return None
    return int(as_float) if as_float.is_integer() else None


def _parse_lexiscore(raw: Any) -> Optional[float]:
    """Parse the address-match lexiscore.

    Returns ``None`` when the cell is missing, unparseable, or non-finite.
    Python's ``float`` accepts ``"nan"``/``"inf"`` (e.g. a pandas NaN that was
    stringified), and a NaN or infinite score is not a meaningful value to
    persist. This matches how ``_parse_uprn`` treats NaN as absent.
    """
    import math

    val = _normalize(raw)
    if _is_missing(val):
        return None
    try:
        score = float(val)
    except ValueError:
        return None
    return score if math.isfinite(score) else None


def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
    """Return the first UK-postcode-shaped substring of ``matched``, upper-cased.

    If ``matched`` is empty or contains no postcode, return ``fallback``, or
    ``None`` when ``fallback`` is empty.

    NOTE(review): ``search`` takes the FIRST match, so a sub-premise token such
    as "B2 3RD" earlier in an address would win over the real trailing
    postcode. Left unchanged because ``to_property_rows`` promises to
    reproduce the old ``/finalize`` route exactly; confirm before changing.
    """
    if matched:
        m = UK_POSTCODE_RE.search(matched)
        if m:
            return m.group(0).upper()
    return fallback or None


class BulkUploadFinaliserOrchestrator:
    """Owns the domain flow of Finalise, depending only on repository ports.

    The required collaborators are ports (``PropertyRepository``,
    ``BulkUploadStatusWriter``). v2 adds two optional ports
    (``PropertyOverrideRepository``, ``LandlordOverrideReader``) for the
    ``property_overrides`` write. The Lambda handler (the composition root)
    wires the concrete Postgres adapters. A unit test can therefore construct
    this with fakes and exercise ``finalise`` end-to-end, with no engine,
    session, or DB.

    The orchestrator never commits. The caller opens the transaction around
    ``finalise`` so the inserts and the ``complete`` flip land atomically.
    """

    def __init__(
        self,
        property_repo: PropertyRepository,
        status_writer: BulkUploadStatusWriter,
        property_override_repo: Optional[PropertyOverrideRepository] = None,
        landlord_override_reader: Optional[LandlordOverrideReader] = None,
    ) -> None:
        self._property_repo = property_repo
        self._status_writer = status_writer
        # v2 (ADR-0006): the property_overrides write. These are optional so
        # the v1 property-only path (and its tests) can construct with two
        # collaborators; the Lambda wires all four. Overrides are written only
        # when both are present AND the trigger carried classifier inputs.
        self._property_override_repo = property_override_repo
        self._landlord_override_reader = landlord_override_reader

    def finalise(
        self,
        rows: list[dict[str, str]],
        portfolio_id: int,
        task_id: UUID,
        *,
        classifier_rows: Optional[list[dict[str, str]]] = None,
        multi_entry_ordering: Optional[dict[str, list[int]]] = None,
        column_mapping: Optional[dict[str, str]] = None,
    ) -> int:
        """Resolve the combiner rows, insert the properties, write the
        ``property_overrides`` for UPRN-matched rows (v2), and mark the upload
        ``complete``.

        All writes go through the injected repositories; this method opens no
        DB connection of its own and does not commit.

        Args:
            rows: Combiner output rows.
            portfolio_id: Portfolio the properties belong to.
            task_id: Upload whose status is flipped to ``"complete"``.
            classifier_rows: Classifier CSV rows, joined to ``rows`` by
                ``source_row_id``.
            multi_entry_ordering: Confirmed building-part permutations, keyed
                by entry count (as a string).
            column_mapping: Override component -> classifier column header.

        Returns:
            The number of properties inserted, as reported by
            ``PropertyRepository.insert_all``.

        Raises:
            ValueError: An unresolved (or ``Unknown``) description, or a
                missing/malformed building-part ordering. The raise lets
                ``commit_scope`` roll back so nothing is half-written
                (ADR-0006 fail-loud).
        """
        inserts = self.to_property_rows(rows, portfolio_id)
        inserted = self._property_repo.insert_all(inserts)
        self._write_overrides(
            rows, portfolio_id, classifier_rows, multi_entry_ordering, column_mapping
        )
        self._status_writer.set_status(task_id, "complete")
        return inserted

    def _write_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        portfolio_id: int,
        classifier_rows: Optional[list[dict[str, str]]],
        multi_entry_ordering: Optional[dict[str, list[int]]],
        column_mapping: Optional[dict[str, str]],
    ) -> None:
        """Build and upsert ``property_overrides`` when the v2 inputs allow it.

        Returns without writing (and logs why) when the override collaborators
        are not wired, or when no classifier rows or column mapping were
        supplied.
        """
        # Nothing to do without the override collaborators (the v1 property-only
        # path and its tests construct without them).
        if (
            self._property_override_repo is None
            or self._landlord_override_reader is None
        ):
            return
        # Log WHY we write nothing, so an empty property_overrides is never a
        # silent mystery (e.g. an upload whose CSVs predate the source_row_id
        # join key, or one with no classifier columns mapped).
        if not classifier_rows:
            logger.info("Finalise: no classifier CSV rows; no property_overrides.")
            return
        if not column_mapping:
            logger.info("Finalise: empty column_mapping; no property_overrides.")
            return
        override_rows = self._build_overrides(
            combiner_rows,
            classifier_rows,
            multi_entry_ordering or {},
            column_mapping,
            portfolio_id,
        )
        affected = self._property_override_repo.upsert_all(override_rows)
        logger.info("Finalise: upserted %d property_overrides.", affected)

    def _build_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        classifier_rows: list[dict[str, str]],
        multi_entry_ordering: dict[str, list[int]],
        column_mapping: dict[str, str],
        portfolio_id: int,
    ) -> list[PropertyOverrideInsert]:
        """Assemble one ``property_overrides`` row per (UPRN property,
        component, building part).

        Only UPRN rows are included; no-UPRN rows are deferred to v3. Rows are
        joined to the classifier descriptions by ``source_row_id``. Cells are
        split and ordered by ``multi_entry_ordering``, then resolved against
        the portfolio vocabulary.

        Raises:
            ValueError: A non-empty description does not resolve, or resolves
                to an ``Unknown`` sentinel (fail-loud). Also raised by
                ``_permutation_for`` for a missing/malformed ordering.
        """
        assert self._landlord_override_reader is not None  # narrowed by caller

        # UPRN-matched combiner rows: source_row_id -> uprn. The two counters
        # only feed the diagnostic warning below.
        uprn_by_row_id: dict[str, int] = {}
        combiner_with_srid = 0
        combiner_with_uprn = 0
        for row in combiner_rows:
            row_id = _normalize(row.get(SOURCE_ROW_ID_COL))
            uprn = _parse_uprn(row.get(UPRN_COL))
            if row_id:
                combiner_with_srid += 1
            if uprn is not None:
                combiner_with_uprn += 1
            if row_id and uprn is not None:
                uprn_by_row_id[row_id] = uprn

        if not uprn_by_row_id:
            classifier_with_srid = sum(
                1 for r in classifier_rows if _normalize(r.get(SOURCE_ROW_ID_COL))
            )
            logger.warning(
                "Finalise: 0 override candidates — %d combiner rows (%d with "
                "source_row_id, %d with a UPRN), %d/%d classifier rows with "
                "source_row_id. If source_row_id counts are 0, the upload's CSVs "
                "predate the join-key change (ADR-0006) — re-run from "
                "start-address-matching so both CSVs carry it.",
                len(combiner_rows),
                combiner_with_srid,
                combiner_with_uprn,
                classifier_with_srid,
                len(classifier_rows),
            )
            return []

        ids_by_uprn = self._property_repo.ids_by_uprn(
            portfolio_id, sorted(set(uprn_by_row_id.values()))
        )
        vocab = self._landlord_override_reader.load_for_portfolio(portfolio_id)
        classifier_by_row_id = {
            row_id: row
            for row in classifier_rows
            if (row_id := _normalize(row.get(SOURCE_ROW_ID_COL)))
        }

        # NOTE(review): two row_ids sharing a UPRN map to the same property_id,
        # so this batch can hold more than one row per (property_id, component,
        # building_part). Whether upsert_all tolerates that depends on its
        # implementation (e.g. Postgres ON CONFLICT rejects touching one row
        # twice in a single command). Confirm.
        inserts: list[PropertyOverrideInsert] = []
        for row_id, uprn in uprn_by_row_id.items():
            property_id = ids_by_uprn.get(uprn)
            if property_id is None:
                # A UPRN row we can't resolve to a property.id. The insert above
                # should have created it (or it pre-existed). Defensive skip:
                # this is an identity gap, not a description-resolution failure.
                logger.warning(
                    "Finalise: no property.id for uprn %s (row %s, portfolio %s); "
                    "skipping its overrides.",
                    uprn,
                    row_id,
                    portfolio_id,
                )
                continue
            classifier_row = classifier_by_row_id.get(row_id)
            if classifier_row is None:
                continue  # no descriptions carried for this row
            for component, source_header in column_mapping.items():
                entries = _split_entries(classifier_row.get(source_header))
                if not entries:
                    continue  # empty cell → no override row for this component
                permutation = _permutation_for(len(entries), multi_entry_ordering)
                component_vocab = vocab.get(component, {})
                for building_part, file_pos in enumerate(permutation):
                    raw = entries[file_pos]
                    # Vocabulary lookups are by lower-cased description.
                    value = component_vocab.get(raw.lower())
                    if value is None or value in UNKNOWN_VALUES:
                        raise ValueError(
                            f"Unresolved {component} description {raw!r} "
                            f"(row {row_id}, portfolio {portfolio_id}): no resolved "
                            f"value{' (UNKNOWN)' if value else ''}. The verify gate "
                            f"should have mapped it; failing the finalise (ADR-0006)."
                        )
                    inserts.append(
                        PropertyOverrideInsert(
                            property_id=property_id,
                            portfolio_id=portfolio_id,
                            building_part=building_part,
                            override_component=component,
                            override_value=value,
                            original_spreadsheet_description=raw,
                        )
                    )
        return inserts

    def to_property_rows(
        self, rows: list[dict[str, str]], portfolio_id: int
    ) -> list[PropertyIdentityInsert]:
        """Resolve combiner rows into property identity inserts.

        This is pure (no DB / IO) and independently testable. It reproduces the
        old ``/finalize`` route's resolution exactly:

        - The matched address falls back to the user-inputted one.
        - The postcode is extracted from the matched address, falling back to
          the user-inputted postcode.
        """
        return [self._row_to_insert(raw, portfolio_id) for raw in rows]

    @staticmethod
    def _row_to_insert(
        raw: dict[str, str], portfolio_id: int
    ) -> PropertyIdentityInsert:
        """Map one combiner row to a ``PropertyIdentityInsert`` with status
        ``"READY"``."""
        # Non-empty address parts joined with ", "; None if all are empty.
        user_inputted_address = (
            ", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p)
            or None
        )
        user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None
        uprn = _parse_uprn(raw.get(UPRN_COL))
        matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
        matched_address = (
            None if _is_missing(matched_address_raw) else matched_address_raw
        )
        address = matched_address or user_inputted_address
        postcode = _extract_postcode(matched_address, user_inputted_postcode or "")
        internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None
        lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))
        return PropertyIdentityInsert(
            portfolio_id=portfolio_id,
            uprn=uprn,
            landlord_property_id=internal_ref,
            address=address,
            postcode=postcode,
            user_inputted_address=user_inputted_address,
            user_inputted_postcode=user_inputted_postcode,
            lexiscore=lexiscore,
            creation_status="READY",
        )