Model/backend/bulk_address2uprn_combiner/main.py
Jun-te Kim d70e8a9e53 utilities/aws_lambda: @subtask_handler injects TaskOrchestrator as third positional arg
The wrapped function now receives the decorator-owned TaskOrchestrator as
a third positional argument so handlers can compose their own use-case
orchestrator that shares the session, instead of opening a second Postgres
connection per invocation.

Both existing callers (backend/ordnanceSurvey/main.py and
backend/bulk_address2uprn_combiner/main.py) have their signatures extended
to accept the new positional argument (typed Optional[TaskOrchestrator] so
the legacy backend.utils.subtasks.subtask_handler — which only passes two
args — keeps working until the migration to the new decorator lands).

@task_handler is intentionally unchanged in this slice; symmetry is
deferred per issue #1103.
2026-05-19 17:31:27 +00:00

89 lines
2.8 KiB
Python

import os
import boto3
import pandas as pd
from io import BytesIO
from typing import Any, Optional
from uuid import UUID
from datetime import datetime, timezone
from utils.logger import setup_logger
from backend.utils.subtasks import subtask_handler
from backend.app.db.functions.bulk_address_uploads_functions import (
set_combined_output_s3_uri,
set_combining_status,
)
from orchestration.task_orchestrator import TaskOrchestrator
logger = setup_logger()
# Bucket that holds both the per-task raw outputs and the combined result.
# Read once at import time; evaluates to None when the variable is unset.
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")
def list_csv_files(s3_client, bucket: str, task_id: str) -> list[str]:
    """Return the keys of every ``.csv`` object under ``ara_raw_outputs/<task_id>/``.

    Uses the ``list_objects_v2`` paginator so that every result page is
    walked, not just the first. Pages without a ``Contents`` entry contribute
    nothing. Keys come back in the order the paginator yields them.
    """
    pages = s3_client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket,
        Prefix=f"ara_raw_outputs/{task_id}/",
    )
    return [
        entry["Key"]
        for page in pages
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".csv")
    ]
def download_csv(s3_client, bucket: str, key: str) -> pd.DataFrame:
    """Download the CSV object ``s3://<bucket>/<key>`` into a DataFrame of text.

    Every cell is read as a string (``dtype=str``), and no strings are turned
    into NaN (``keep_default_na=False``). The combiner only concatenates these
    frames and writes them back out as CSV. Letting pandas infer types would
    therefore silently rewrite values:

    - an integer column containing any blank cell (e.g. a UPRN column with
      some blanks) is inferred as float64 and re-serialised as
      ``"100023336956.0"``;
    - leading zeros in numeric-looking identifiers are dropped;
    - literal ``"NA"`` strings become empty.

    Reading as text keeps each value as it appeared in the source file.
    Blank cells come back as ``""``.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    payload = response["Body"].read()
    return pd.read_csv(BytesIO(payload), dtype=str, keep_default_na=False)
@subtask_handler()
def handler(
    body: dict[str, Any],
    context: Any,
    orchestrator: Optional[TaskOrchestrator] = None,
) -> str:
    """Concatenate every raw ARA output CSV for a task into one CSV on S3.

    Reads all ``.csv`` objects under ``ara_raw_outputs/<task_id>/`` and stacks
    them row-wise. Writes the result to
    ``bulk_final_outputs/<task_id>/combined_<UTC timestamp>.csv`` and records
    its ``s3://`` URI via ``set_combined_output_s3_uri``.

    Args:
        body: Message body. Must contain a ``task_id`` UUID string.
        context: Invocation context passed by the decorator (unused here).
        orchestrator: Injected by the new ``utilities.aws_lambda``
            ``subtask_handler`` decorator. The legacy
            ``backend.utils.subtasks.subtask_handler`` imported by this module
            passes only two arguments, so this defaults to ``None``. Unused;
            accepted so the contract is uniform across callers (issue #1103).

    Returns:
        The ``s3://`` URI of the combined CSV.

    Raises:
        RuntimeError: If ``task_id`` is missing, ``S3_BUCKET_NAME`` is unset,
            or no CSV files exist under the task's raw-output prefix.
        ValueError: If ``task_id`` is not a valid UUID string.
    """
    del orchestrator

    task_id_str: str = body.get("task_id", "")
    if not task_id_str:
        raise RuntimeError("Missing task_id in message body")
    # Parse once: validates the id up front and is reused for both DB writes.
    task_id = UUID(task_id_str)

    # Check configuration before any DB write, so a misconfigured deployment
    # fails without first calling set_combining_status for this task.
    bucket = S3_BUCKET_NAME
    if not bucket:
        raise RuntimeError("S3_BUCKET_NAME env var not set")

    set_combining_status(task_id)

    s3 = boto3.client("s3")
    logger.info(f"Combining ara_raw_outputs for task {task_id_str}")
    csv_keys = list_csv_files(s3, bucket, task_id_str)
    if not csv_keys:
        raise RuntimeError(f"No CSV files found under ara_raw_outputs/{task_id_str}/")
    logger.info(f"Found {len(csv_keys)} CSV files")

    dfs = [download_csv(s3, bucket, key) for key in csv_keys]
    combined = pd.concat(dfs, ignore_index=True)
    logger.info(f"Combined {len(combined)} rows from {len(dfs)} files")

    # A fresh timestamp per run means reruns write a new object rather than
    # overwriting an earlier combined file.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
    output_key = f"bulk_final_outputs/{task_id_str}/combined_{timestamp}.csv"
    payload = combined.to_csv(index=False).encode("utf-8")
    s3.put_object(Bucket=bucket, Key=output_key, Body=payload)
    s3_uri = f"s3://{bucket}/{output_key}"
    logger.info(f"Saved combined CSV to {s3_uri}")
    # Plain stdout marker kept alongside the logger line; presumably scraped
    # by something reading the invocation output. Confirm before removing.
    print(f"OUTPUT_S3_URI: {s3_uri}")

    set_combined_output_s3_uri(task_id, s3_uri)
    logger.info(f"Persisted combined_output_s3_uri + awaiting_review status for task {task_id_str}")
    return s3_uri