mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
129 lines
5 KiB
Python
129 lines
5 KiB
Python
"""bulk_upload_finaliser Lambda (ADR-0013).

Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse
the trigger, read the combiner output CSV from S3, hand the rows to the
``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then
write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership to
the backend). ``complete`` is written in the *same* transaction as the property
insert (atomic finalise); ``failed`` is written on a fresh session on error.

PostgresConfig-only, like the landlord classifier Lambda — no legacy ``backend/``
connection — so a single DB config (POSTGRES_*) drives the whole run.
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
import boto3
|
|
from sqlalchemy.engine import Engine
|
|
|
|
from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import (
|
|
BulkUploadFinaliserTriggerBody,
|
|
)
|
|
from infrastructure.postgres.config import PostgresConfig
|
|
from infrastructure.postgres.engine import commit_scope, make_engine, make_session
|
|
from infrastructure.s3.csv_s3_client import CsvS3Client
|
|
from infrastructure.s3.s3_uri import parse_s3_uri
|
|
from infrastructure.landlord_overrides.landlord_override_reader_postgres_repository import (
|
|
LandlordOverrideReaderPostgresRepository,
|
|
)
|
|
from orchestration.bulk_upload_finaliser_orchestrator import (
|
|
BulkUploadFinaliserOrchestrator,
|
|
)
|
|
from orchestration.task_orchestrator import TaskOrchestrator
|
|
from repositories.bulk_upload.bulk_upload_status_writer_postgres import (
|
|
BulkUploadStatusWriterPostgresRepository,
|
|
)
|
|
from repositories.property.property_override_postgres_repository import (
|
|
PropertyOverridePostgresRepository,
|
|
)
|
|
from repositories.property.property_postgres_repository import (
|
|
PropertyPostgresRepository,
|
|
)
|
|
from utilities.aws_lambda.subtask_handler import subtask_handler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int:
    """Read the trigger's CSV(s) from S3 and finalise the upload in one transaction.

    Args:
        engine: Engine used to open the single session shared by every repository.
        trigger: Validated trigger body; supplies the S3 URIs, portfolio/task ids,
            multi-entry ordering and column mapping passed to the orchestrator.

    Returns:
        The value returned by ``BulkUploadFinaliserOrchestrator.finalise`` (logged
        below as the number of properties inserted).

    Raises:
        Whatever the S3 reads or the orchestrator raise; the session is closed
        either way.
    """
    make_client: Any = boto3.client  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
    s3: Any = make_client("s3")

    def load_csv(uri: str) -> list[dict[str, str]]:
        # The bucket is taken from the URI; the reader is still handed the full URI.
        bucket_name, _object_key = parse_s3_uri(uri)
        return CsvS3Client(s3, bucket_name).read_rows(uri)

    combiner_rows = load_csv(trigger.s3_uri)

    # v2 (ADR-0006): the classifier CSV carries the raw descriptions, joined to the
    # combiner rows by `source_row_id`. When no classifier columns were mapped the
    # URI is absent and the orchestrator simply writes no property_overrides.
    classifier_rows: list[dict[str, str]] | None = (
        load_csv(trigger.classifier_s3_uri) if trigger.classifier_s3_uri else None
    )

    db_session = make_session(engine)
    try:
        # Every repository shares the one session so their writes share a
        # transaction. Write-only path: no EpcRepository needed for inserts.
        properties = PropertyPostgresRepository(db_session)
        statuses = BulkUploadStatusWriterPostgresRepository(db_session)
        property_overrides = PropertyOverridePostgresRepository(db_session)
        landlord_overrides = LandlordOverrideReaderPostgresRepository(db_session)
        finaliser = BulkUploadFinaliserOrchestrator(
            property_repo=properties,
            status_writer=statuses,
            property_override_repo=property_overrides,
            landlord_override_reader=landlord_overrides,
        )

        # Atomic finalise: the property inserts, the property_overrides and the
        # `complete` status are all written through the injected repositories
        # inside this one scope, so they land together. Any failure (including an
        # unresolved description, which raises) rolls everything back and leaves
        # the row for the failure path.
        with commit_scope(db_session):
            inserted_count = finaliser.finalise(
                combiner_rows,
                trigger.portfolio_id,
                trigger.task_id,
                classifier_rows=classifier_rows,
                multi_entry_ordering=trigger.multi_entry_ordering,
                column_mapping=trigger.column_mapping,
            )
    finally:
        db_session.close()

    logger.info(
        "Finalised bulk upload %s: %d rows read, %d properties inserted.",
        trigger.bulk_upload_id,
        len(combiner_rows),
        inserted_count,
    )
    return inserted_count
|
|
|
|
|
|
def _mark_failed(engine: Engine, task_id: UUID) -> None:
    """Write the ``failed`` status for *task_id* using a fresh session.

    A new session is opened (and always closed) here so the write does not depend
    on the session used by the finalise attempt.

    Args:
        engine: Engine from which the fresh session is created.
        task_id: Identifier passed to the status writer's ``set_status``.
    """
    failure_session = make_session(engine)
    try:
        with commit_scope(failure_session):
            status_writer = BulkUploadStatusWriterPostgresRepository(failure_session)
            status_writer.set_status(task_id, "failed")
    finally:
        failure_session.close()
|
|
|
|
|
|
@subtask_handler()
def handler(
    body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, int]:
    """Lambda entry point: finalise one bulk upload and report the insert count.

    Args:
        body: Raw trigger payload; validated into a
            ``BulkUploadFinaliserTriggerBody``.
        context: Lambda context object (not used by this function).
        task_orchestrator: Passed in by ``@subtask_handler`` (not used by this
            function).

    Returns:
        ``{"inserted": <value returned by _run>}``.

    Raises:
        Any exception raised while validating the body, building the engine or
        finalising. On a finalise failure the BulkUpload is first moved to
        ``failed`` (best effort) and the original exception is re-raised.
    """
    trigger = BulkUploadFinaliserTriggerBody.model_validate(body)
    engine = make_engine(PostgresConfig.from_env(os.environ))

    try:
        inserted = _run(engine, trigger)
    except Exception:
        # Hand the BulkUpload to the terminal `failed` state so the UI leaves
        # `finalising`; the @subtask_handler also marks the SubTask FAILED on the
        # re-raise below.
        try:
            _mark_failed(engine, trigger.task_id)
        except Exception:
            # The status write can fail too (e.g. the database is the root cause).
            # Log it, but never let it replace the original finalise error.
            logger.exception(
                "Could not mark bulk upload %s as failed (task %s).",
                trigger.bulk_upload_id,
                trigger.task_id,
            )
        # Bare raise re-raises the finalise exception: the inner handler has
        # ended, so the outer exception is the one being handled again.
        raise
    finally:
        # The engine is created per invocation; release its pooled connections so
        # warm Lambda containers do not accumulate open database connections.
        engine.dispose()

    return {"inserted": inserted}
|