mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
375 lines
15 KiB
Python
375 lines
15 KiB
Python
"""Finalises a BulkUpload into ``property`` rows (ADR-0013).
|
|
|
|
The domain logic of Finalise: turn the combiner output rows into property identity
|
|
rows (the same resolution the old Next.js ``/finalize`` route did) and persist them
|
|
through the injected writer. Like every orchestrator it never commits — the caller
|
|
owns the transaction boundary (see the Lambda handler).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from typing import Any, Optional
|
|
|
|
from uuid import UUID
|
|
|
|
from domain.epc.built_form_type import BuiltFormType
|
|
from domain.epc.property_type import PropertyType
|
|
from domain.epc.roof_type import RoofType
|
|
from domain.epc.wall_type import WallType
|
|
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
|
|
from repositories.landlord_overrides.landlord_override_reader import (
|
|
LandlordOverrideReader,
|
|
)
|
|
from repositories.property.property_override_repository import (
|
|
PropertyOverrideInsert,
|
|
PropertyOverrideRepository,
|
|
)
|
|
from repositories.property.property_repository import (
|
|
PropertyIdentityInsert,
|
|
PropertyRepository,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)

# Headers of the combiner-output columns. They are kept identical to the ones
# used by the old frontend /finalize route and by the backend combined-results
# reader (router.py).
ADDRESS_COLS = (
    "Address 1",
    "Address 2",
    "Address 3",
)
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
# Cell text that is treated the same as an empty cell (compared lower-cased).
MISSING_SENTINEL = "invalid postcode"
# Case-insensitive UK postcode shape; used to pull the postcode out of a
# matched address string.
UK_POSTCODE_RE = re.compile(
    r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}",
    flags=re.IGNORECASE,
)

# Synthetic per-row join key. It is minted by the frontend
# start-address-matching route (ADR-0006) and written into BOTH the combiner
# output and the classifier CSV. Keep it equal to SOURCE_ROW_ID_COLUMN in
# src/lib/bulkUpload/s3Keys.ts.
SOURCE_ROW_ID_COL = "source_row_id"
|
|
|
|
# Resolved values that stand for "the classifier could not decide". Under v2
# (ADR-0006) these are never final: one reaching the finaliser is a defect, so
# the finalise fails loudly. Each of the four category enums spells its
# sentinel "Unknown".
UNKNOWN_VALUES = frozenset(
    category.UNKNOWN.value
    for category in (PropertyType, BuiltFormType, WallType, RoofType)
)
|
|
|
|
|
|
def _split_entries(cell: Any) -> list[str]:
|
|
"""Split a multi-valued cell into per-building-part entries — mirrors the
|
|
frontend ``splitEntries`` (``split(",") → trim → drop empty``)."""
|
|
return [part.strip() for part in str(cell or "").split(",") if part.strip()]
|
|
|
|
|
|
def _permutation_for(count: int, multi_entry_ordering: dict[str, list[int]]) -> list[int]:
|
|
"""``permutation[k]`` = the file position holding building part ``k``.
|
|
|
|
A single-entry cell is always the main building (part 0). For ``count >= 2``
|
|
the confirmed per-count permutation is required (the verify/ordering gate
|
|
guarantees it); a missing or malformed one is a defect → fail loudly.
|
|
"""
|
|
if count == 1:
|
|
return [0]
|
|
permutation = multi_entry_ordering.get(str(count))
|
|
if (
|
|
permutation is None
|
|
or len(permutation) != count
|
|
or sorted(permutation) != list(range(count))
|
|
):
|
|
raise ValueError(
|
|
f"No confirmed building-part ordering for {count}-entry cells "
|
|
f"(have {sorted(multi_entry_ordering)}). The awaiting_review ordering "
|
|
f"step should have captured it (ADR-0004/0006); failing the finalise."
|
|
)
|
|
return permutation
|
|
|
|
|
|
def _normalize(value: Any) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
|
|
def _is_missing(value: str) -> bool:
    """Tell whether an already-normalised cell carries no usable value.

    A cell is missing when it is the empty string or, compared
    case-insensitively, equal to ``MISSING_SENTINEL``.
    """
    if value == "":
        return True
    return value.lower() == MISSING_SENTINEL
|
|
|
|
|
|
def _parse_uprn(raw: Any) -> Optional[int]:
    """Parse a combiner UPRN cell into an ``int``, or ``None`` if unusable.

    Missing cells (empty / sentinel) give ``None``. Plain integer text is
    returned directly. Integer-valued float text such as ``"100020933699.0"``
    is also accepted; any other text gives ``None``.
    """
    text = _normalize(raw)
    if _is_missing(text):
        return None
    try:
        return int(text)
    except ValueError:
        # The combiner writes the UPRN column through pandas. If any row of
        # the batch is unmatched (NaN), pandas turns the whole column into
        # float64, so a matched UPRN shows up as e.g. "100020933699.0".
        # Recover the integer from that float form; without this, every
        # matched row of a mixed batch would parse to None (which caused NULL
        # property.uprn and empty property_overrides for matched rows).
        try:
            number = float(text)
        except ValueError:
            return None
    if not number.is_integer():
        return None
    return int(number)
|
|
|
|
|
|
def _parse_lexiscore(raw: Any) -> Optional[float]:
    """Parse a combiner lexiscore cell into a finite ``float``.

    Args:
        raw: The raw cell value (any type; it is normalised to stripped text).

    Returns:
        The score as a float, or ``None`` when the cell is missing
        (empty / sentinel), is not numeric text, or parses to a non-finite
        value.
    """
    # Local import: ``math`` is not among this module's top-level imports.
    import math

    val = _normalize(raw)
    if _is_missing(val):
        return None
    try:
        score = float(val)
    except ValueError:
        return None
    # float() also accepts "nan", "inf" and "infinity" (any case, optionally
    # signed), so a stringified NaN would not raise ValueError and would be
    # returned as a NaN score. Treat non-finite results as "no score".
    return score if math.isfinite(score) else None
|
|
|
|
|
|
def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
    """Pick the postcode for a row.

    The first ``UK_POSTCODE_RE`` hit inside ``matched`` wins and is returned
    upper-cased. With no matched address, or no hit in it, ``fallback`` is
    returned, or ``None`` when ``fallback`` is empty.
    """
    found = UK_POSTCODE_RE.search(matched) if matched else None
    if found is not None:
        return found.group(0).upper()
    return fallback if fallback else None
|
|
|
|
|
|
class BulkUploadFinaliserOrchestrator:
    """Owns the domain flow of Finalise, depending only on repository ports.

    The collaborators are ports (``PropertyRepository``,
    ``BulkUploadStatusWriter`` and, for the v2 overrides write, the optional
    ``PropertyOverrideRepository`` / ``LandlordOverrideReader``); the concrete
    Postgres adapters are wired by the Lambda handler (the composition root).
    A unit test can therefore construct this with fakes and exercise
    ``finalise`` end-to-end — no engine, session, or DB. The orchestrator never
    commits: the caller opens the transaction around ``finalise`` so the insert
    and the ``complete`` flip land atomically.
    """

    def __init__(
        self,
        property_repo: PropertyRepository,
        status_writer: BulkUploadStatusWriter,
        property_override_repo: Optional[PropertyOverrideRepository] = None,
        landlord_override_reader: Optional[LandlordOverrideReader] = None,
    ) -> None:
        """Store the injected collaborators; performs no I/O.

        Args:
            property_repo: Port used to insert properties and look up their ids.
            status_writer: Port used to flip the upload status.
            property_override_repo: Port for the ``property_overrides`` upsert.
            landlord_override_reader: Port that loads the portfolio vocabulary.
        """
        self._property_repo = property_repo
        self._status_writer = status_writer
        # v2 (ADR-0006): the property_overrides write. Optional so the v1
        # property-only path (and its tests) construct with two collaborators;
        # the Lambda wires all four. Overrides are written only when both are
        # present AND the trigger carried classifier inputs.
        self._property_override_repo = property_override_repo
        self._landlord_override_reader = landlord_override_reader

    def finalise(
        self,
        rows: list[dict[str, str]],
        portfolio_id: int,
        task_id: UUID,
        *,
        classifier_rows: Optional[list[dict[str, str]]] = None,
        multi_entry_ordering: Optional[dict[str, list[int]]] = None,
        column_mapping: Optional[dict[str, str]] = None,
    ) -> int:
        """Resolve the combiner rows, insert the properties, write the
        ``property_overrides`` for UPRN-matched rows (v2), and mark the upload
        ``complete`` — all via the injected repositories, no DB connection of
        its own. Does not commit.

        An unresolved (or ``Unknown``) description raises, so ``commit_scope``
        rolls back and nothing is half-written (ADR-0006 fail-loud).

        Args:
            rows: Combiner output rows.
            portfolio_id: Portfolio the properties belong to.
            task_id: Id of the upload whose status is set to ``complete``.
            classifier_rows: Classifier CSV rows (v2), if the trigger had them.
            multi_entry_ordering: Confirmed permutations keyed by entry count.
            column_mapping: Override component → classifier CSV header.

        Returns:
            Whatever ``PropertyRepository.insert_all`` returns (the number of
            properties inserted).

        Raises:
            ValueError: A description cannot be resolved, or a multi-entry
                cell has no confirmed ordering.
        """
        inserts = self.to_property_rows(rows, portfolio_id)
        inserted = self._property_repo.insert_all(inserts)
        self._write_overrides(
            rows, portfolio_id, classifier_rows, multi_entry_ordering, column_mapping
        )
        self._status_writer.set_status(task_id, "complete")
        return inserted

    def _write_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        portfolio_id: int,
        classifier_rows: Optional[list[dict[str, str]]],
        multi_entry_ordering: Optional[dict[str, list[int]]],
        column_mapping: Optional[dict[str, str]],
    ) -> None:
        """Build and upsert the ``property_overrides`` rows, when applicable.

        Does nothing when either override collaborator is absent, when there
        are no classifier rows, or when the column mapping is empty; the last
        two cases are logged.
        """
        # Nothing to do without the override collaborators (the v1
        # property-only path / its tests construct without them).
        if (
            self._property_override_repo is None
            or self._landlord_override_reader is None
        ):
            return
        # Log WHY when we write nothing, so an empty property_overrides is
        # never a silent mystery (e.g. an upload whose CSVs predate the
        # source_row_id join key, or one with no classifier columns mapped).
        if not classifier_rows:
            logger.info("Finalise: no classifier CSV rows; no property_overrides.")
            return
        if not column_mapping:
            logger.info("Finalise: empty column_mapping; no property_overrides.")
            return
        override_rows = self._build_overrides(
            combiner_rows,
            classifier_rows,
            multi_entry_ordering or {},
            column_mapping,
            portfolio_id,
        )
        affected = self._property_override_repo.upsert_all(override_rows)
        logger.info("Finalise: upserted %d property_overrides.", affected)

    def _build_overrides(
        self,
        combiner_rows: list[dict[str, str]],
        classifier_rows: list[dict[str, str]],
        multi_entry_ordering: dict[str, list[int]],
        column_mapping: dict[str, str],
        portfolio_id: int,
    ) -> list[PropertyOverrideInsert]:
        """Assemble one ``property_overrides`` row per (UPRN property,
        component, building part).

        UPRN rows only (no-UPRN deferred to v3); joined to the classifier
        descriptions by ``source_row_id``; cells split + ordered by
        ``multi_entry_ordering``; resolved against the portfolio vocabulary.

        Returns:
            The override inserts, in combiner-row / mapping / building-part
            order. Empty when no combiner row has both a ``source_row_id`` and
            a UPRN (a warning with the counts is logged).

        Raises:
            ValueError: On any non-empty description that won't resolve, or a
                multi-entry cell with no confirmed ordering (fail-loud).
        """
        assert self._landlord_override_reader is not None  # narrowed by caller

        uprn_by_row_id, combiner_with_srid, combiner_with_uprn = (
            self._index_uprn_rows(combiner_rows)
        )
        if not uprn_by_row_id:
            classifier_with_srid = sum(
                1 for r in classifier_rows if _normalize(r.get(SOURCE_ROW_ID_COL))
            )
            logger.warning(
                "Finalise: 0 override candidates — %d combiner rows (%d with "
                "source_row_id, %d with a UPRN), %d/%d classifier rows with "
                "source_row_id. If source_row_id counts are 0, the upload's CSVs "
                "predate the join-key change (ADR-0006) — re-run from "
                "start-address-matching so both CSVs carry it.",
                len(combiner_rows),
                combiner_with_srid,
                combiner_with_uprn,
                classifier_with_srid,
                len(classifier_rows),
            )
            return []

        ids_by_uprn = self._property_repo.ids_by_uprn(
            portfolio_id, sorted(set(uprn_by_row_id.values()))
        )
        vocab = self._landlord_override_reader.load_for_portfolio(portfolio_id)
        classifier_by_row_id = {
            row_id: row
            for row in classifier_rows
            if (row_id := _normalize(row.get(SOURCE_ROW_ID_COL)))
        }

        inserts: list[PropertyOverrideInsert] = []
        rows_without_descriptions = 0
        for row_id, uprn in uprn_by_row_id.items():
            property_id = ids_by_uprn.get(uprn)
            if property_id is None:
                # A UPRN row we can't resolve to a property.id — the insert
                # done earlier in finalise should have created it (or it
                # pre-existed). Defensive skip; this is an identity gap, not a
                # description-resolution failure.
                logger.warning(
                    "Finalise: no property.id for uprn %s (row %s, portfolio %s); "
                    "skipping its overrides.",
                    uprn,
                    row_id,
                    portfolio_id,
                )
                continue
            classifier_row = classifier_by_row_id.get(row_id)
            if classifier_row is None:
                # No descriptions carried for this row. Counted so the skip
                # shows up in the summary log below instead of being silent.
                rows_without_descriptions += 1
                continue

            for component, source_header in column_mapping.items():
                entries = _split_entries(classifier_row.get(source_header))
                if not entries:
                    continue  # empty cell → no override row for this component
                inserts.extend(
                    self._resolve_cell(
                        entries,
                        _permutation_for(len(entries), multi_entry_ordering),
                        component,
                        vocab.get(component, {}),
                        row_id,
                        portfolio_id,
                        property_id,
                    )
                )
        if rows_without_descriptions:
            logger.info(
                "Finalise: %d UPRN-matched rows had no classifier row for their "
                "source_row_id; no property_overrides written for them.",
                rows_without_descriptions,
            )
        return inserts

    @staticmethod
    def _index_uprn_rows(
        combiner_rows: list[dict[str, str]],
    ) -> tuple[dict[str, int], int, int]:
        """Map ``source_row_id`` → UPRN for the UPRN-matched combiner rows.

        Returns:
            ``(uprn_by_row_id, rows_with_source_row_id, rows_with_uprn)``. The
            two counts are taken independently of each other and only feed the
            diagnostic log. A repeated ``source_row_id`` keeps its last UPRN.
        """
        uprn_by_row_id: dict[str, int] = {}
        with_srid = 0
        with_uprn = 0
        for row in combiner_rows:
            row_id = _normalize(row.get(SOURCE_ROW_ID_COL))
            uprn = _parse_uprn(row.get(UPRN_COL))
            if row_id:
                with_srid += 1
            if uprn is not None:
                with_uprn += 1
            if row_id and uprn is not None:
                uprn_by_row_id[row_id] = uprn
        return uprn_by_row_id, with_srid, with_uprn

    @staticmethod
    def _resolve_cell(
        entries: list[str],
        permutation: list[int],
        component: str,
        component_vocab: Any,
        row_id: str,
        portfolio_id: int,
        property_id: Any,
    ) -> list[PropertyOverrideInsert]:
        """Resolve one classifier cell into its per-building-part inserts.

        ``permutation[k]`` is the position in ``entries`` that holds building
        part ``k``. Each description is looked up lower-cased in
        ``component_vocab`` (a mapping with ``.get``).

        Raises:
            ValueError: A description has no resolved value, or resolves to
                one of ``UNKNOWN_VALUES``.
        """
        resolved: list[PropertyOverrideInsert] = []
        for building_part, file_pos in enumerate(permutation):
            raw = entries[file_pos]
            value = component_vocab.get(raw.lower())
            if value is None or value in UNKNOWN_VALUES:
                raise ValueError(
                    f"Unresolved {component} description {raw!r} "
                    f"(row {row_id}, portfolio {portfolio_id}): no resolved "
                    f"value{' (UNKNOWN)' if value else ''}. The verify gate "
                    f"should have mapped it; failing the finalise (ADR-0006)."
                )
            resolved.append(
                PropertyOverrideInsert(
                    property_id=property_id,
                    portfolio_id=portfolio_id,
                    building_part=building_part,
                    override_component=component,
                    override_value=value,
                    original_spreadsheet_description=raw,
                )
            )
        return resolved

    def to_property_rows(
        self, rows: list[dict[str, str]], portfolio_id: int
    ) -> list[PropertyIdentityInsert]:
        """Resolve combiner rows into property identity inserts.

        Pure (no DB / IO) and independently testable. Reproduces the old
        ``/finalize`` route's resolution exactly: matched address falls back to
        the user-inputted one; postcode is extracted from the matched address
        or falls back to the user-inputted postcode.

        Returns:
            One ``PropertyIdentityInsert`` per input row, in input order.
        """
        return [self._row_to_insert(raw, portfolio_id) for raw in rows]

    @staticmethod
    def _row_to_insert(
        raw: dict[str, str], portfolio_id: int
    ) -> PropertyIdentityInsert:
        """Resolve a single combiner row into a ``PropertyIdentityInsert``.

        Empty values become ``None``. The insert is always created with
        ``creation_status="READY"``.
        """
        # The non-empty user address parts, joined with ", ".
        user_inputted_address = (
            ", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p)
            or None
        )
        user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None

        uprn = _parse_uprn(raw.get(UPRN_COL))

        matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
        matched_address = (
            None if _is_missing(matched_address_raw) else matched_address_raw
        )

        # The matched address wins; the user-inputted values are the fallback.
        address = matched_address or user_inputted_address
        postcode = _extract_postcode(matched_address, user_inputted_postcode or "")
        internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None
        lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))

        return PropertyIdentityInsert(
            portfolio_id=portfolio_id,
            uprn=uprn,
            landlord_property_id=internal_ref,
            address=address,
            postcode=postcode,
            user_inputted_address=user_inputted_address,
            user_inputted_postcode=user_inputted_postcode,
            lexiscore=lexiscore,
            creation_status="READY",
        )
|