Model/backend/app/bulk_uploads/router.py
2026-04-20 13:17:35 +00:00

74 lines
2.1 KiB
Python

import json
from uuid import UUID

import boto3
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.concurrency import run_in_threadpool
from sqlmodel import select

from backend.app.dependencies import validate_token
from backend.app.config import get_settings
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
from backend.app.bulk_uploads.schema import TriggerSplitterRequest
from utils.s3 import parse_s3_uri, read_csv_from_s3
# Router for the bulk-upload endpoints. The router-level dependency means
# validate_token is resolved for every route registered on it.
router = APIRouter(
    prefix="/bulk-uploads",
    tags=["bulk-uploads"],
    dependencies=[Depends(validate_token)],
)
def _send_splitter_message(queue_url, region, payload: dict):
    """Serialize *payload* to JSON and send it to the SQS queue at *queue_url*.

    Deliberately synchronous: boto3 performs blocking network I/O, so the
    async endpoint below runs this function in a worker thread instead of
    calling it on the event loop.

    Returns:
        The response object returned by boto3's ``send_message``.
    """
    sqs = boto3.client("sqs", region_name=region)
    return sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(payload),
    )


@router.post("/trigger-splitter", status_code=202)
async def trigger_splitter(req: TriggerSplitterRequest):
    """Enqueue a splitter job by sending an SQS message for the request.

    The message body is a JSON object carrying ``task_id``, ``sub_task_id``
    and ``s3_uri`` copied from *req*; it is sent to the queue configured as
    ``POSTCODE_SPLITTER_SQS_URL``.

    Returns:
        A dict echoing ``task_id`` and ``sub_task_id`` together with the
        ``MessageId`` reported by SQS (``None`` if the response has none).

    Raises:
        HTTPException: 500 with an ``"SQS error: ..."`` detail when building
            or sending the message fails for any reason.
    """
    settings = get_settings()
    sqs_payload = {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "s3_uri": req.s3_uri,
    }
    # NOTE(review): if the schema declares task_id / sub_task_id as UUID,
    # json.dumps will raise TypeError and surface as the 500 below -- confirm
    # the field types on TriggerSplitterRequest.
    try:
        # boto3 is synchronous; running it in the thread pool keeps the event
        # loop free to serve other requests while the SQS call is in flight.
        response = await run_in_threadpool(
            _send_splitter_message,
            settings.POSTCODE_SPLITTER_SQS_URL,
            settings.AWS_DEFAULT_REGION,
            sqs_payload,
        )
    except Exception as e:
        # Endpoint boundary: map every failure to a 500, but chain the cause
        # so the original traceback is preserved in server logs.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e
    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
def _load_combined_rows(task_id: UUID):
    """Find the upload for *task_id* and read its combined-output CSV from S3.

    Deliberately synchronous: the DB session and the S3 read are blocking
    calls, so the async endpoint below runs this function in a worker thread.

    Returns:
        Whatever ``read_csv_from_s3`` returns for the combined output object.

    Raises:
        HTTPException: 404 when no ``BulkAddressUpload`` matches *task_id*;
            409 when the upload has no ``combined_output_s3_uri`` yet.
    """
    with get_db_session() as session:
        upload = session.exec(
            select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        ).first()
        if not upload:
            raise HTTPException(status_code=404, detail="Upload not found")
        if not upload.combined_output_s3_uri:
            raise HTTPException(status_code=409, detail="Combiner not finished")
        # Resolve the S3 location while the session is still open, so the
        # download below never touches the ORM object after the session ends.
        bucket, key = parse_s3_uri(upload.combined_output_s3_uri)
    return read_csv_from_s3(bucket, key)


@router.get("/{task_id}/combined-results")
async def get_combined_results(
    task_id: UUID,
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=500, ge=1, le=5000),
):
    """Return one page of the combined-results CSV for an upload.

    Args:
        task_id: Identifier matched against ``BulkAddressUpload.task_id``.
        offset: Index of the first row to return (>= 0).
        limit: Maximum number of rows to return (1..5000, default 500).

    Returns:
        A dict with the requested slice under ``rows``, the total row count
        under ``total``, and the ``offset`` / ``limit`` that were applied.

    Raises:
        HTTPException: 404 when the upload does not exist; 409 when its
            combined output has not been produced yet.
    """
    # Blocking DB + S3 work runs in the thread pool so it does not stall the
    # event loop for other requests.
    rows = await run_in_threadpool(_load_combined_rows, task_id)
    # NOTE(review): the whole CSV is read on every call and paged in memory;
    # assumes read_csv_from_s3 returns a sliceable sequence of rows -- confirm.
    return {
        "rows": rows[offset : offset + limit],
        "total": len(rows),
        "offset": offset,
        "limit": limit,
    }