From b07db1ef6b3a3a1386b9d981c390eb893df15da3 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 5 Jun 2026 12:18:13 +0000 Subject: [PATCH] property override --- .../bulk_upload_finaliser_trigger_body.py | 18 +- applications/bulk_upload_finaliser/handler.py | 37 +++- backend/app/bulk_uploads/schema.py | 6 + ...ord_override_reader_postgres_repository.py | 65 ++++++ .../postgres/property_override_table.py | 78 +++++++ .../bulk_upload_finaliser_orchestrator.py | 196 +++++++++++++++++- .../landlord_override_reader.py | 25 +++ .../property_override_postgres_repository.py | 65 ++++++ .../property/property_override_repository.py | 38 ++++ .../property/property_postgres_repository.py | 11 + repositories/property/property_repository.py | 11 + tests/orchestration/fakes.py | 4 + ...test_bulk_upload_finaliser_orchestrator.py | 126 ++++++++++- 13 files changed, 667 insertions(+), 13 deletions(-) create mode 100644 infrastructure/landlord_overrides/landlord_override_reader_postgres_repository.py create mode 100644 infrastructure/postgres/property_override_table.py create mode 100644 repositories/landlord_overrides/landlord_override_reader.py create mode 100644 repositories/property/property_override_postgres_repository.py create mode 100644 repositories/property/property_override_repository.py diff --git a/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py index 086ca291..c788d517 100644 --- a/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py +++ b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py @@ -1,15 +1,26 @@ +from typing import Optional from uuid import UUID -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field class BulkUploadFinaliserTriggerBody(BaseModel): - """Trigger body for the bulk_upload_finaliser Lambda (ADR-0013). 
+ """Trigger body for the bulk_upload_finaliser Lambda (ADR-0013, extended in + ADR-0006). Dispatched by the Next.js Finalise action via ``POST /v1/bulk-uploads/trigger-finaliser``. ``s3_uri`` is the combiner output (``combined_output_s3_uri``) — the same address/UPRN CSV the old synchronous ``/finalize`` route read. + + v2 adds the inputs for the ``property_overrides`` write: + - ``classifier_s3_uri``: the ``{uploadId}-classifier.csv`` (raw descriptions, + joined to the combiner output by ``source_row_id``). ``None`` when no + classifier columns were mapped → no overrides written. + - ``multi_entry_ordering``: confirmed permutations keyed by entry-count + (``{count: [file positions]}``). ``{}`` when not multi-entry. + - ``column_mapping``: classifier category → source CSV header, so the + finaliser knows which classifier-CSV column feeds each override_component. """ model_config = ConfigDict(extra="allow") @@ -20,3 +31,6 @@ class BulkUploadFinaliserTriggerBody(BaseModel): # bigint in the FE schema; Python int is unbounded so Pydantic stays simple. 
portfolio_id: int bulk_upload_id: UUID + classifier_s3_uri: Optional[str] = None + multi_entry_ordering: dict[str, list[int]] = Field(default_factory=dict) + column_mapping: dict[str, str] = Field(default_factory=dict) diff --git a/applications/bulk_upload_finaliser/handler.py b/applications/bulk_upload_finaliser/handler.py index 6017cbb5..4ba1b370 100644 --- a/applications/bulk_upload_finaliser/handler.py +++ b/applications/bulk_upload_finaliser/handler.py @@ -26,6 +26,9 @@ from infrastructure.postgres.config import PostgresConfig from infrastructure.postgres.engine import commit_scope, make_engine, make_session from infrastructure.s3.csv_s3_client import CsvS3Client from infrastructure.s3.s3_uri import parse_s3_uri +from infrastructure.landlord_overrides.landlord_override_reader_postgres_repository import ( + LandlordOverrideReaderPostgresRepository, +) from orchestration.bulk_upload_finaliser_orchestrator import ( BulkUploadFinaliserOrchestrator, ) @@ -33,6 +36,9 @@ from orchestration.task_orchestrator import TaskOrchestrator from repositories.bulk_upload.bulk_upload_status_writer_postgres import ( BulkUploadStatusWriterPostgresRepository, ) +from repositories.property.property_override_postgres_repository import ( + PropertyOverridePostgresRepository, +) from repositories.property.property_postgres_repository import ( PropertyPostgresRepository, ) @@ -42,25 +48,44 @@ logger = logging.getLogger(__name__) def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int: - bucket, _key = parse_s3_uri(trigger.s3_uri) - boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] boto_s3: Any = boto3_client("s3") + + bucket, _key = parse_s3_uri(trigger.s3_uri) rows = CsvS3Client(boto_s3, bucket).read_rows(trigger.s3_uri) + # v2 (ADR-0006): the classifier CSV carries the raw descriptions, joined to the + # combiner rows by `source_row_id`. 
Absent when no classifier columns were + # mapped → the orchestrator simply writes no property_overrides. + classifier_rows: list[dict[str, str]] | None = None + if trigger.classifier_s3_uri: + c_bucket, _c_key = parse_s3_uri(trigger.classifier_s3_uri) + classifier_rows = CsvS3Client(boto_s3, c_bucket).read_rows( + trigger.classifier_s3_uri + ) + session = make_session(engine) try: orchestrator = BulkUploadFinaliserOrchestrator( # Write-only path: no EpcRepository needed for inserts. property_repo=PropertyPostgresRepository(session), status_writer=BulkUploadStatusWriterPostgresRepository(session), + property_override_repo=PropertyOverridePostgresRepository(session), + landlord_override_reader=LandlordOverrideReaderPostgresRepository(session), ) - # Atomic finalise: the orchestrator inserts properties and marks `complete` - # via its injected writers; the transaction here makes them land together — - # a failure in either rolls back both, leaving the row for the failure path. + # Atomic finalise: the orchestrator inserts properties, writes the + # property_overrides, and marks `complete` via its injected writers; the + # transaction here makes them land together — a failure in any (including an + # unresolved description, which raises) rolls back all, leaving the row for + # the failure path. 
with commit_scope(session): inserted = orchestrator.finalise( - rows, trigger.portfolio_id, trigger.task_id + rows, + trigger.portfolio_id, + trigger.task_id, + classifier_rows=classifier_rows, + multi_entry_ordering=trigger.multi_entry_ordering, + column_mapping=trigger.column_mapping, ) finally: session.close() diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index 759f76ca..dda8dc15 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -29,6 +29,12 @@ class FinaliserTriggerRequest(BaseModel): s3_uri: str # combiner output (combined_output_s3_uri) portfolio_id: int bulk_upload_id: str + # v2 (ADR-0006): inputs for the property_overrides write. Forwarded verbatim to + # the finaliser Lambda. classifier_s3_uri is null when no classifier columns + # were mapped; multi_entry_ordering / column_mapping default empty. + classifier_s3_uri: Optional[str] = None + multi_entry_ordering: dict[str, List[int]] = {} + column_mapping: dict[str, str] = {} class FlagsSummary(BaseModel): diff --git a/infrastructure/landlord_overrides/landlord_override_reader_postgres_repository.py b/infrastructure/landlord_overrides/landlord_override_reader_postgres_repository.py new file mode 100644 index 00000000..79b851f3 --- /dev/null +++ b/infrastructure/landlord_overrides/landlord_override_reader_postgres_repository.py @@ -0,0 +1,65 @@ +"""Postgres adapter that reads a portfolio's landlord-override vocabulary across +all four ``landlord_*_overrides`` tables (ADR-0006). + +The companion write path is ``LandlordOverridesRepository`` (one generic adapter +per category); reading for the finaliser is a different shape — all four tables +at once, keyed by component — so it gets its own small reader rather than four +parameterised instances at the call site. 
+""" + +from __future__ import annotations + +from enum import Enum +from typing import Any, cast + +from sqlalchemy import Table +from sqlalchemy import select as sa_select +from sqlmodel import Session + +from infrastructure.postgres.landlord_built_form_type_override_table import ( + LandlordBuiltFormTypeOverrideRow, +) +from infrastructure.postgres.landlord_property_type_override_table import ( + LandlordPropertyTypeOverrideRow, +) +from infrastructure.postgres.landlord_roof_type_override_table import ( + LandlordRoofTypeOverrideRow, +) +from infrastructure.postgres.landlord_wall_type_override_table import ( + LandlordWallTypeOverrideRow, +) +from repositories.landlord_overrides.landlord_override_reader import ( + LandlordOverrideReader, +) + +# component key (matches override_component / classifier category) -> row class +_ROW_TYPES: dict[str, type] = { + "property_type": LandlordPropertyTypeOverrideRow, + "built_form_type": LandlordBuiltFormTypeOverrideRow, + "wall_type": LandlordWallTypeOverrideRow, + "roof_type": LandlordRoofTypeOverrideRow, +} + + +def _value_str(value: Any) -> str: + # Core Table selects may yield the Python enum member or its raw string, + # depending on the column's enum mapping; normalise to the stored string. 
+ return value.value if isinstance(value, Enum) else str(value) + + +class LandlordOverrideReaderPostgresRepository(LandlordOverrideReader): + def __init__(self, session: Session) -> None: + self._session = session + + def load_for_portfolio(self, portfolio_id: int) -> dict[str, dict[str, str]]: + result: dict[str, dict[str, str]] = {} + for component, row_type in _ROW_TYPES.items(): + table = cast(Table, getattr(row_type, "__table__")) + stmt = sa_select(table.c.description, table.c.value).where( + table.c.portfolio_id == portfolio_id + ) + rows = self._session.execute(stmt).all() # pyright: ignore[reportDeprecated] + result[component] = { + str(description): _value_str(value) for description, value in rows + } + return result diff --git a/infrastructure/postgres/property_override_table.py b/infrastructure/postgres/property_override_table.py new file mode 100644 index 00000000..3132ddf5 --- /dev/null +++ b/infrastructure/postgres/property_override_table.py @@ -0,0 +1,78 @@ +"""SQLModel mirror of the FE-owned ``property_overrides`` Drizzle table. + +The schema + migration (``0221``) live in the ``assessment-model`` TS repo +(`src/app/db/schema/property_overrides.ts`); this row class only mirrors the +columns so the ``bulk_upload_finaliser`` Lambda can write the per-Property fact +layer at Finalise. Shape decided in ADR-0005; population (the join + resolve) +in ADR-0006. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import ClassVar +from uuid import UUID, uuid4 + +from sqlalchemy import BigInteger, Column, SmallInteger, UniqueConstraint +from sqlalchemy import Enum as SAEnum +from sqlmodel import Field, SQLModel + +# Mirror of the FE-owned ``override_component`` pgEnum (property_overrides.ts). +# The values ARE the classifier category keys used on both sides, so the +# finaliser maps category → component with no translation. 
A single SAEnum +# instance so test ``create_all`` emits one ``CREATE TYPE``; prod owns the type +# via Drizzle. +override_component_sa_enum = SAEnum( + "wall_type", + "roof_type", + "property_type", + "built_form_type", + name="override_component", +) + + +class PropertyOverrideRow(SQLModel, table=True): + """Mirror of the FE-owned ``property_overrides`` table — one row per + ``(property, override_component, building_part)`` carrying a denormalised + snapshot of the resolved enum (``override_value``) plus the raw cell text it + resolved from (``original_spreadsheet_description``).""" + + __tablename__: ClassVar[str] = "property_overrides" # pyright: ignore[reportIncompatibleVariableOverride] + __table_args__: ClassVar[tuple[UniqueConstraint, ...]] = ( # pyright: ignore[reportIncompatibleVariableOverride] + UniqueConstraint( + "property_id", + "override_component", + "building_part", + name="property_overrides_property_component_part_unique", + ), + ) + + id: UUID = Field(default_factory=uuid4, primary_key=True) + + # bigint FKs in the Drizzle schema (→ property.id / portfolio.id, ON DELETE + # CASCADE). The FKs are enforced by the Drizzle migration, not declared here. + property_id: int = Field( + sa_column=Column(BigInteger, nullable=False, index=True), + ) + portfolio_id: int = Field( + sa_column=Column(BigInteger, nullable=False), + ) + + # 0 = main building, 1 = extension 1, … (ADR-0004 ordering). 
+ building_part: int = Field( + sa_column=Column(SmallInteger, nullable=False), + ) + override_component: str = Field( + sa_column=Column(override_component_sa_enum, nullable=False), + ) + override_value: str = Field(nullable=False) + original_spreadsheet_description: str = Field(nullable=False) + + created_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + nullable=False, + ) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + nullable=False, + ) diff --git a/orchestration/bulk_upload_finaliser_orchestrator.py b/orchestration/bulk_upload_finaliser_orchestrator.py index 46ad5b12..4d16d7f4 100644 --- a/orchestration/bulk_upload_finaliser_orchestrator.py +++ b/orchestration/bulk_upload_finaliser_orchestrator.py @@ -8,17 +8,31 @@ owns the transaction boundary (see the Lambda handler). from __future__ import annotations +import logging import re from typing import Any, Optional from uuid import UUID +from domain.epc.built_form_type import BuiltFormType +from domain.epc.property_type import PropertyType +from domain.epc.roof_type import RoofType +from domain.epc.wall_type import WallType from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter +from repositories.landlord_overrides.landlord_override_reader import ( + LandlordOverrideReader, +) +from repositories.property.property_override_repository import ( + PropertyOverrideInsert, + PropertyOverrideRepository, +) from repositories.property.property_repository import ( PropertyIdentityInsert, PropertyRepository, ) +logger = logging.getLogger(__name__) + # Combiner-output columns — identical to the old frontend /finalize route and the # backend combined-results reader (router.py). 
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3") @@ -30,6 +44,52 @@ LEXISCORE_COL = "address2uprn_lexiscore" MISSING_SENTINEL = "invalid postcode" UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE) +# Synthetic per-row join key minted by the frontend start-address-matching route +# (ADR-0006) and emitted into BOTH the combiner output and the classifier CSV. +# Must match SOURCE_ROW_ID_COLUMN in src/lib/bulkUpload/s3Keys.ts. +SOURCE_ROW_ID_COL = "source_row_id" + +# The resolved-value sentinels that mean "the classifier could not decide". v2 +# treats these as never-final (ADR-0006): reaching the finaliser is a defect, so +# we fail loudly. All four category enums spell it "Unknown". +UNKNOWN_VALUES = frozenset( + { + PropertyType.UNKNOWN.value, + BuiltFormType.UNKNOWN.value, + WallType.UNKNOWN.value, + RoofType.UNKNOWN.value, + } +) + + +def _split_entries(cell: Any) -> list[str]: + """Split a multi-valued cell into per-building-part entries — mirrors the + frontend ``splitEntries`` (``split(",") → trim → drop empty``).""" + return [part.strip() for part in str(cell or "").split(",") if part.strip()] + + +def _permutation_for(count: int, multi_entry_ordering: dict[str, list[int]]) -> list[int]: + """``permutation[k]`` = the file position holding building part ``k``. + + A single-entry cell is always the main building (part 0). For ``count >= 2`` + the confirmed per-count permutation is required (the verify/ordering gate + guarantees it); a missing or malformed one is a defect → fail loudly. + """ + if count == 1: + return [0] + permutation = multi_entry_ordering.get(str(count)) + if ( + permutation is None + or len(permutation) != count + or sorted(permutation) != list(range(count)) + ): + raise ValueError( + f"No confirmed building-part ordering for {count}-entry cells " + f"(have {sorted(multi_entry_ordering)}). The awaiting_review ordering " + f"step should have captured it (ADR-0004/0006); failing the finalise." 
+ ) + return permutation + def _normalize(value: Any) -> str: if value is None: @@ -84,21 +144,151 @@ class BulkUploadFinaliserOrchestrator: self, property_repo: PropertyRepository, status_writer: BulkUploadStatusWriter, + property_override_repo: Optional[PropertyOverrideRepository] = None, + landlord_override_reader: Optional[LandlordOverrideReader] = None, ) -> None: self._property_repo = property_repo self._status_writer = status_writer + # v2 (ADR-0006): the property_overrides write. Optional so the v1 + # property-only path (and its tests) construct with two collaborators; the + # Lambda wires all four. Overrides are written only when both are present + # AND the trigger carried classifier inputs. + self._property_override_repo = property_override_repo + self._landlord_override_reader = landlord_override_reader def finalise( - self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID + self, + rows: list[dict[str, str]], + portfolio_id: int, + task_id: UUID, + *, + classifier_rows: Optional[list[dict[str, str]]] = None, + multi_entry_ordering: Optional[dict[str, list[int]]] = None, + column_mapping: Optional[dict[str, str]] = None, ) -> int: - """Resolve the combiner rows, insert the properties, and mark the upload + """Resolve the combiner rows, insert the properties, write the + ``property_overrides`` for UPRN-matched rows (v2), and mark the upload ``complete`` — all via the injected repositories, no DB connection of its - own. Returns the number of properties inserted. Does not commit.""" + own. Returns the number of properties inserted. Does not commit. 
+ + An unresolved (or ``Unknown``) description raises, so ``commit_scope`` + rolls back and nothing is half-written (ADR-0006 fail-loud).""" inserts = self.to_property_rows(rows, portfolio_id) inserted = self._property_repo.insert_all(inserts) + self._write_overrides( + rows, portfolio_id, classifier_rows, multi_entry_ordering, column_mapping + ) self._status_writer.set_status(task_id, "complete") return inserted + def _write_overrides( + self, + combiner_rows: list[dict[str, str]], + portfolio_id: int, + classifier_rows: Optional[list[dict[str, str]]], + multi_entry_ordering: Optional[dict[str, list[int]]], + column_mapping: Optional[dict[str, str]], + ) -> None: + # Nothing to do without the classifier inputs or the override + # collaborators (e.g. an upload with no classifier columns mapped). + if ( + not classifier_rows + or not column_mapping + or self._property_override_repo is None + or self._landlord_override_reader is None + ): + return + override_rows = self._build_overrides( + combiner_rows, + classifier_rows, + multi_entry_ordering or {}, + column_mapping, + portfolio_id, + ) + self._property_override_repo.upsert_all(override_rows) + + def _build_overrides( + self, + combiner_rows: list[dict[str, str]], + classifier_rows: list[dict[str, str]], + multi_entry_ordering: dict[str, list[int]], + column_mapping: dict[str, str], + portfolio_id: int, + ) -> list[PropertyOverrideInsert]: + """Assemble one ``property_overrides`` row per (UPRN property, component, + building part). UPRN rows only (no-UPRN deferred to v3); joined to the + classifier descriptions by ``source_row_id``; cells split + ordered by + ``multi_entry_ordering``; resolved against the portfolio vocabulary. + Raises on any non-empty description that won't resolve (fail-loud).""" + assert self._landlord_override_reader is not None # narrowed by caller + + # UPRN-matched combiner rows: source_row_id -> uprn. 
+ uprn_by_row_id: dict[str, int] = {} + for row in combiner_rows: + row_id = _normalize(row.get(SOURCE_ROW_ID_COL)) + uprn = _parse_uprn(row.get(UPRN_COL)) + if row_id and uprn is not None: + uprn_by_row_id[row_id] = uprn + if not uprn_by_row_id: + return [] + + ids_by_uprn = self._property_repo.ids_by_uprn( + portfolio_id, sorted(set(uprn_by_row_id.values())) + ) + vocab = self._landlord_override_reader.load_for_portfolio(portfolio_id) + classifier_by_row_id = { + row_id: row + for row in classifier_rows + if (row_id := _normalize(row.get(SOURCE_ROW_ID_COL))) + } + + inserts: list[PropertyOverrideInsert] = [] + for row_id, uprn in uprn_by_row_id.items(): + property_id = ids_by_uprn.get(uprn) + if property_id is None: + # A UPRN row we can't resolve to a property.id — the insert above + # should have created it (or it pre-existed). Defensive skip; this + # is an identity gap, not a description-resolution failure. + logger.warning( + "Finalise: no property.id for uprn %s (row %s, portfolio %s); " + "skipping its overrides.", + uprn, + row_id, + portfolio_id, + ) + continue + classifier_row = classifier_by_row_id.get(row_id) + if classifier_row is None: + continue # no descriptions carried for this row + + for component, source_header in column_mapping.items(): + entries = _split_entries(classifier_row.get(source_header)) + if not entries: + continue # empty cell → no override row for this component + permutation = _permutation_for(len(entries), multi_entry_ordering) + component_vocab = vocab.get(component, {}) + for building_part, file_pos in enumerate(permutation): + raw = entries[file_pos] + value = component_vocab.get(raw.lower()) + if value is None or value in UNKNOWN_VALUES: + raise ValueError( + f"Unresolved {component} description {raw!r} " + f"(row {row_id}, portfolio {portfolio_id}): no resolved " + f"value{' (UNKNOWN)' if value else ''}. The verify gate " + f"should have mapped it; failing the finalise (ADR-0006)." 
+ ) + inserts.append( + PropertyOverrideInsert( + property_id=property_id, + portfolio_id=portfolio_id, + building_part=building_part, + override_component=component, + override_value=value, + original_spreadsheet_description=raw, + ) + ) + return inserts + def to_property_rows( self, rows: list[dict[str, str]], portfolio_id: int ) -> list[PropertyIdentityInsert]: diff --git a/repositories/landlord_overrides/landlord_override_reader.py b/repositories/landlord_overrides/landlord_override_reader.py new file mode 100644 index 00000000..148c968b --- /dev/null +++ b/repositories/landlord_overrides/landlord_override_reader.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod + + +class LandlordOverrideReader(ABC): + """Reads a portfolio's resolved landlord-override vocabulary across all four + category tables, for the finaliser's snapshot resolution (ADR-0006). + + The frontend does the equivalent read in ``src/lib/bulkUpload/server.ts`` + (``lookupOverrides``); this mirrors it on the backend. Distinct from the + write port ``LandlordOverrideRepository`` (the classifier's upsert path) — + one is read, one is write. + """ + + @abstractmethod + def load_for_portfolio(self, portfolio_id: int) -> dict[str, dict[str, str]]: + """Return ``{override_component: {normalized description: resolved value}}``. + + ``override_component`` is one of ``wall_type`` / ``roof_type`` / + ``property_type`` / ``built_form_type``; the inner key is the normalized + (``strip → lower``) description as the classifier stored it; the value is + the resolved enum's string value (e.g. ``"Cavity wall, filled cavity"``, + or ``"Unknown"`` for an unresolved one).""" + ... 
diff --git a/repositories/property/property_override_postgres_repository.py b/repositories/property/property_override_postgres_repository.py new file mode 100644 index 00000000..be69292a --- /dev/null +++ b/repositories/property/property_override_postgres_repository.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import cast + +from sqlalchemy import Table +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlmodel import Session + +from infrastructure.postgres.property_override_table import PropertyOverrideRow +from repositories.property.property_override_repository import ( + PropertyOverrideInsert, + PropertyOverrideRepository, +) + + +class PropertyOverridePostgresRepository(PropertyOverrideRepository): + """Postgres adapter for ``property_overrides`` (ADR-0006). + + Write-only: the finaliser materialises the fact layer; no reads here. + """ + + def __init__(self, session: Session) -> None: + self._session = session + # ``__table__`` is injected at runtime on table=True classes but the + # stubs don't expose it; pin to ``Table`` so the dialect insert is typed. + self._table: Table = cast(Table, getattr(PropertyOverrideRow, "__table__")) + + def upsert_all(self, rows: list[PropertyOverrideInsert]) -> int: + if not rows: + return 0 + + now = datetime.now(timezone.utc) + values = [ + { + "property_id": r.property_id, + "portfolio_id": r.portfolio_id, + "building_part": r.building_part, + "override_component": r.override_component, + "override_value": r.override_value, + "original_spreadsheet_description": r.original_spreadsheet_description, + "created_at": now, + "updated_at": now, + } + for r in rows + ] + + stmt = pg_insert(self._table).values(values) + # Re-run = recalculate (ADR-0005/0006): refresh the snapshot on conflict, + # in contrast to ``property``'s on_conflict_do_nothing. 
When a per-property + # user-edit path lands (and a ``source`` column with it), this set_ gains a + # ``WHERE source='classifier'`` guard so hand-edits survive. + stmt = stmt.on_conflict_do_update( + index_elements=["property_id", "override_component", "building_part"], + set_={ + "override_value": stmt.excluded.override_value, + "original_spreadsheet_description": stmt.excluded.original_spreadsheet_description, + "updated_at": now, + }, + ) + + # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked + # deprecated in the stubs but the upsert path is supported. + result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] + return cast(int, result.rowcount) diff --git a/repositories/property/property_override_repository.py b/repositories/property/property_override_repository.py new file mode 100644 index 00000000..0a1a0a2e --- /dev/null +++ b/repositories/property/property_override_repository.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass(frozen=True) +class PropertyOverrideInsert: + """One ``property_overrides`` row written at Finalise (ADR-0006). + + ``override_value`` is the resolved enum snapshot (a denormalised text copy + from ``landlord_*_overrides``); ``original_spreadsheet_description`` is the + raw cell entry it resolved from (un-normalized). + """ + + property_id: int + portfolio_id: int + building_part: int + override_component: str + override_value: str + original_spreadsheet_description: str + + +class PropertyOverrideRepository(ABC): + """Writes the per-Property fact layer (``property_overrides``). + + A distinct aggregate from ``property``: own table, own write semantics — + re-run = recalculate (upsert), in contrast to ``property``'s + insert-do-nothing. One repository per aggregate (ADR-0006). 
+ """ + + @abstractmethod + def upsert_all(self, rows: list[PropertyOverrideInsert]) -> int: + """Upsert each row on ``(property_id, override_component, building_part)``, + refreshing ``override_value`` + ``original_spreadsheet_description`` + + ``updated_at`` on conflict (recalculate-on-rerun). Returns the number of + rows affected; an empty list is a no-op returning 0.""" + ... diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index 407fe1e5..9f8c4dd4 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import Any, Optional, cast from sqlalchemy import Table +from sqlalchemy import select as sa_select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlmodel import Session, col, select @@ -121,3 +122,13 @@ class PropertyPostgresRepository(PropertyRepository): # deprecated in the stubs but the INSERT path is supported. result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] return cast(int, result.rowcount) + + def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]: + if not uprns: + return {} + stmt = sa_select(self._table.c.uprn, self._table.c.id).where( + self._table.c.portfolio_id == portfolio_id, + self._table.c.uprn.in_(uprns), + ) + rows = self._session.execute(stmt).all() # pyright: ignore[reportDeprecated] + return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None} diff --git a/repositories/property/property_repository.py b/repositories/property/property_repository.py index 1773b530..5b22c874 100644 --- a/repositories/property/property_repository.py +++ b/repositories/property/property_repository.py @@ -54,3 +54,14 @@ class PropertyRepository(ABC): Rows with no UPRN are always inserted. Returns the number actually inserted; an empty list is a no-op returning 0 (ADR-0013).""" ... 
+ + @abstractmethod + def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]: + """Map each given UPRN to its ``property.id`` within a portfolio. + + Used by the finaliser to attach ``property_overrides`` to UPRN-matched + rows (ADR-0006) without a query per row — covers both rows just inserted + and pre-existing ones (re-found by ``(portfolio_id, uprn)``). UPRNs with + no matching property are simply absent from the result; an empty input is + a no-op returning ``{}``.""" + ... diff --git a/tests/orchestration/fakes.py b/tests/orchestration/fakes.py index c9dcf891..edcc24ac 100644 --- a/tests/orchestration/fakes.py +++ b/tests/orchestration/fakes.py @@ -37,6 +37,10 @@ class FakePropertyRepo(PropertyRepository): self.inserted: list[PropertyIdentityInsert] = list(rows) return len(rows) + def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]: + # Not exercised by the baseline/EPC orchestrator tests that use this fake. + return {} + class FakeEpcRepo(EpcRepository): def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None: diff --git a/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py index da6de277..ae285fae 100644 --- a/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py +++ b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py @@ -7,6 +7,8 @@ is unit-testable with two in-memory fakes, no engine/session/Postgres. 
from __future__ import annotations

+import pytest
+
 from uuid import UUID, uuid4

 from domain.property.properties import Properties
@@ -15,6 +17,13 @@ from orchestration.bulk_upload_finaliser_orchestrator import (
     BulkUploadFinaliserOrchestrator,
 )
 from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
+from repositories.landlord_overrides.landlord_override_reader import (
+    LandlordOverrideReader,
+)
+from repositories.property.property_override_repository import (
+    PropertyOverrideInsert,
+    PropertyOverrideRepository,
+)
 from repositories.property.property_repository import (
     PropertyIdentityInsert,
     PropertyRepository,
@@ -22,15 +31,19 @@ from repositories.property.property_repository import (


 class FakePropertyRepository(PropertyRepository):
-    """Records inserts; the read half is unused on the Finalise path."""
+    """Records inserts and serves a configurable uprn→id map for overrides."""

-    def __init__(self) -> None:
+    def __init__(self, ids_by_uprn: dict[int, int] | None = None) -> None:
         self.inserted: list[PropertyIdentityInsert] = []
+        self._ids_by_uprn = ids_by_uprn or {}  # None → empty map, so no UPRN resolves

     def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
         self.inserted = list(rows)
         return len(rows)

+    def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
+        return {u: self._ids_by_uprn[u] for u in uprns if u in self._ids_by_uprn}  # unknown UPRNs omitted; portfolio_id unused
+
     def get(self, property_id: int) -> Property:  # pragma: no cover
         raise NotImplementedError

@@ -46,6 +59,23 @@ class FakeStatusWriter(BulkUploadStatusWriter):
         self.calls.append((task_id, status))


+class FakePropertyOverrideRepo(PropertyOverrideRepository):  # test double: captures the rows passed to upsert_all
+    def __init__(self) -> None:
+        self.upserted: list[PropertyOverrideInsert] = []
+
+    def upsert_all(self, rows: list[PropertyOverrideInsert]) -> int:
+        self.upserted = list(rows)  # stores a copy; replaces any earlier batch rather than appending
+        return len(rows)
+
+
+class FakeLandlordOverrideReader(LandlordOverrideReader):  # test double: serves one fixed vocab
+    def __init__(self, vocab: dict[str, dict[str, str]]) -> None:
+        self._vocab = vocab
+
+    def load_for_portfolio(self, portfolio_id: int) -> dict[str, dict[str, str]]:
+        return self._vocab  # same vocab whatever portfolio_id is passed
+
+
 def _orchestrator() -> tuple[
     BulkUploadFinaliserOrchestrator, FakePropertyRepository, FakeStatusWriter
 ]:
@@ -103,3 +133,95 @@ def test_finalise_falls_back_to_user_input_when_unmatched() -> None:
     assert row.address == "2 Other Road"  # falls back to user input
     assert row.postcode == "B1 1BB"  # falls back to user-inputted postcode
     assert row.lexiscore is None
+
+
+# --- v2: property_overrides (ADR-0006) ------------------------------------
+
+
+def _overrides_orchestrator(
+    ids_by_uprn: dict[int, int], vocab: dict[str, dict[str, str]]
+) -> tuple[BulkUploadFinaliserOrchestrator, FakePropertyOverrideRepo]:
+    prop = FakePropertyRepository(ids_by_uprn=ids_by_uprn)
+    overrides = FakePropertyOverrideRepo()
+    orchestrator = BulkUploadFinaliserOrchestrator(
+        prop,
+        FakeStatusWriter(),
+        property_override_repo=overrides,
+        landlord_override_reader=FakeLandlordOverrideReader(vocab),
+    )
+    return orchestrator, overrides  # the override fake is returned so tests can inspect .upserted
+
+
+def test_finalise_writes_overrides_for_uprn_rows_splitting_by_part() -> None:
+    # Combiner row carries identity + the join key; classifier row carries the
+    # raw descriptions under the source headers, joined by source_row_id.
+    combiner = [
+        {
+            "Address 1": "1 Some Street",
+            "address2uprn_uprn": "100023",
+            "source_row_id": "row-a",
+        },
+        # A no-UPRN row: must get NO overrides (deferred to v3).
+        {
+            "Address 1": "2 Other Road",
+            "address2uprn_uprn": "invalid postcode",
+            "source_row_id": "row-b",
+        },
+    ]
+    classifier = [
+        {"Walls": "Cavity, Solid", "Property Type": "Semi", "source_row_id": "row-a"},
+        {"Walls": "Cavity", "Property Type": "Detached", "source_row_id": "row-b"},
+    ]
+    vocab = {
+        "wall_type": {"cavity": "Cavity wall, filled cavity", "solid": "Solid brick, as built, no insulation (assumed)"},
+        "property_type": {"semi": "Semi-detached house", "detached": "Detached house"},
+    }
+    # For 2-entry Walls cells: file position 1 holds the main building (part 0).
+    ordering = {"2": [1, 0]}
+    orchestrator, overrides = _overrides_orchestrator({100023: 555}, vocab)
+
+    orchestrator.finalise(
+        combiner,
+        portfolio_id=7,
+        task_id=uuid4(),
+        classifier_rows=classifier,
+        multi_entry_ordering=ordering,
+        column_mapping={"wall_type": "Walls", "property_type": "Property Type"},
+    )
+
+    # Only the UPRN row (row-a → property 555) produced overrides.
+    assert {o.property_id for o in overrides.upserted} == {555}
+    walls = sorted(
+        (o for o in overrides.upserted if o.override_component == "wall_type"),
+        key=lambda o: o.building_part,
+    )
+    # part 0 (main) = file position 1 = "Solid"; part 1 (ext) = position 0 = "Cavity".
+    assert [(o.building_part, o.original_spreadsheet_description, o.override_value) for o in walls] == [
+        (0, "Solid", "Solid brick, as built, no insulation (assumed)"),
+        (1, "Cavity", "Cavity wall, filled cavity"),
+    ]
+    # Whole-dwelling-style single-entry cell lands at part 0.
+    (prop_type,) = [o for o in overrides.upserted if o.override_component == "property_type"]  # unpacking fails unless exactly one match
+    assert (prop_type.building_part, prop_type.override_value) == (0, "Semi-detached house")
+
+
+def test_finalise_fails_loudly_on_unresolved_description() -> None:
+    combiner = [
+        {"address2uprn_uprn": "100023", "source_row_id": "row-a"},
+    ]
+    classifier = [{"Walls": "Cavity", "source_row_id": "row-a"}]
+    # "cavity" resolves to Unknown — the never-final sentinel (ADR-0006).
+    vocab = {"wall_type": {"cavity": "Unknown"}}
+    orchestrator, overrides = _overrides_orchestrator({100023: 555}, vocab)
+
+    with pytest.raises(ValueError, match="Unresolved wall_type"):  # match is a regex searched within the message
+        orchestrator.finalise(
+            combiner,
+            portfolio_id=7,
+            task_id=uuid4(),
+            classifier_rows=classifier,
+            multi_entry_ordering={},
+            column_mapping={"wall_type": "Walls"},
+        )
+    # Nothing persisted — the raise rolls the whole finalise back upstream.
+    assert overrides.upserted == []