"""bulk_upload_finaliser Lambda (ADR-0013).

Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse
the trigger, read the combiner output CSV from S3, hand the rows to the
``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then
write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership
to the backend). ``complete`` is written in the *same* transaction as the property
insert (atomic finalise); ``failed`` is written on a fresh session on error.

PostgresConfig-only, like the landlord classifier Lambda — no legacy ``backend/``
connection — so a single DB config (POSTGRES_*) drives the whole run.
"""

import logging
import os
from typing import Any
from uuid import UUID

import boto3
from sqlalchemy.engine import Engine

from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import (
    BulkUploadFinaliserTriggerBody,
)
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import commit_scope, make_engine, make_session
from infrastructure.s3.csv_s3_client import CsvS3Client
from infrastructure.s3.s3_uri import parse_s3_uri
from infrastructure.landlord_overrides.landlord_override_reader_postgres_repository import (
    LandlordOverrideReaderPostgresRepository,
)
from orchestration.bulk_upload_finaliser_orchestrator import (
    BulkUploadFinaliserOrchestrator,
)
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.bulk_upload.bulk_upload_status_writer_postgres import (
    BulkUploadStatusWriterPostgresRepository,
)
from repositories.property.property_override_postgres_repository import (
    PropertyOverridePostgresRepository,
)
from repositories.property.property_postgres_repository import (
    PropertyPostgresRepository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler

logger = logging.getLogger(__name__)


def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int:
    """Read the trigger's CSV input(s) from S3 and finalise them in one transaction.

    Args:
        engine: SQLAlchemy engine the finalise session is created from.
        trigger: Validated trigger body; supplies the S3 URIs, portfolio/task ids,
            and the mapping options forwarded to the orchestrator.

    Returns:
        The value returned by ``BulkUploadFinaliserOrchestrator.finalise`` (logged
        below as the number of properties inserted).

    Raises:
        Whatever the S3 read or the orchestrator raises; the session is closed
        before the exception propagates.
    """
    boto3_client: Any = boto3.client  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
    boto_s3: Any = boto3_client("s3")
    bucket, _key = parse_s3_uri(trigger.s3_uri)
    rows = CsvS3Client(boto_s3, bucket).read_rows(trigger.s3_uri)

    # v2 (ADR-0006): the classifier CSV carries the raw descriptions, joined to the
    # combiner rows by `source_row_id`. Absent when no classifier columns were
    # mapped → the orchestrator simply writes no property_overrides.
    classifier_rows: list[dict[str, str]] | None = None
    if trigger.classifier_s3_uri:
        c_bucket, _c_key = parse_s3_uri(trigger.classifier_s3_uri)
        classifier_rows = CsvS3Client(boto_s3, c_bucket).read_rows(
            trigger.classifier_s3_uri
        )

    session = make_session(engine)
    try:
        orchestrator = BulkUploadFinaliserOrchestrator(
            # Write-only path: no EpcRepository needed for inserts.
            property_repo=PropertyPostgresRepository(session),
            status_writer=BulkUploadStatusWriterPostgresRepository(session),
            property_override_repo=PropertyOverridePostgresRepository(session),
            landlord_override_reader=LandlordOverrideReaderPostgresRepository(session),
        )
        # Atomic finalise: the orchestrator inserts properties, writes the
        # property_overrides, and marks `complete` via its injected writers; the
        # transaction here makes them land together — a failure in any (including an
        # unresolved description, which raises) rolls back all, leaving the row for
        # the failure path.
        with commit_scope(session):
            inserted = orchestrator.finalise(
                rows,
                trigger.portfolio_id,
                trigger.task_id,
                classifier_rows=classifier_rows,
                multi_entry_ordering=trigger.multi_entry_ordering,
                column_mapping=trigger.column_mapping,
            )
    finally:
        session.close()

    logger.info(
        "Finalised bulk upload %s: %d rows read, %d properties inserted.",
        trigger.bulk_upload_id,
        len(rows),
        inserted,
    )
    return inserted


def _mark_failed(engine: Engine, task_id: UUID) -> None:
    """Write the ``failed`` status for ``task_id`` on its own session.

    Uses a fresh session and ``commit_scope`` so the write is independent of the
    finalise session used by ``_run``.

    Args:
        engine: SQLAlchemy engine to open the session on.
        task_id: Identifier passed to the status writer's ``set_status``.
    """
    session = make_session(engine)
    try:
        with commit_scope(session):
            BulkUploadStatusWriterPostgresRepository(session).set_status(
                task_id, "failed"
            )
    finally:
        session.close()


@subtask_handler()
def handler(
    body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, int]:
    """Lambda entry point: validate the trigger, finalise, report the insert count.

    Args:
        body: Raw trigger payload, validated into ``BulkUploadFinaliserTriggerBody``.
        context: Unused in this function.
        task_orchestrator: Unused in this function (presumably supplied by the
            ``@subtask_handler`` decorator — confirm against its definition).

    Returns:
        ``{"inserted": <value returned by _run>}``.

    Raises:
        Any exception raised by ``_run`` is re-raised unchanged after a best-effort
        attempt to write the ``failed`` status.
    """
    trigger = BulkUploadFinaliserTriggerBody.model_validate(body)
    engine = make_engine(PostgresConfig.from_env(os.environ))
    try:
        try:
            inserted = _run(engine, trigger)
        except Exception:
            # Hand the BulkUpload to the terminal `failed` state so the UI leaves
            # `finalising`; the @subtask_handler also marks the SubTask FAILED on the
            # re-raise below.
            try:
                _mark_failed(engine, trigger.task_id)
            except Exception:
                # Best effort only: a failure while recording `failed` (e.g. the
                # database itself is unreachable) must not replace the original
                # error, which is the one worth surfacing to the caller.
                logger.exception(
                    "Could not mark task %s as failed after a finalise error.",
                    trigger.task_id,
                )
            # The inner handler has finished, so this bare raise re-raises the
            # exception from `_run`, not the one from `_mark_failed`.
            raise
    finally:
        # Release pooled connections held by this invocation's engine; sessions
        # only return connections to the pool when closed, they do not close them.
        # NOTE(review): assumes make_engine builds a pooled engine — confirm.
        engine.dispose()
    return {"inserted": inserted}