"""Bulk address upload routes.

Exposes endpoints that enqueue pipeline stages (postcode splitter, combiner,
landlord overrides, finaliser) on SQS, plus a paginated view over the
combined-results CSV of an upload.
"""

import math
from collections import Counter
from uuid import UUID

import boto3
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select

from backend.app.dependencies import validate_token
from backend.app.config import get_settings
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
from backend.app.bulk_uploads.schema import (
    CombinedResultRow,
    CombinedResultsResponse,
    CombinerTriggerRequest,
    FinaliserTriggerRequest,
    FlagsSummary,
    LandlordOverridesTriggerRequest,
    PostcodeSplitterTriggerRequest,
)
from backend.app.bulk_uploads.scoring import score_bucket

# NOTE(review): `parse_s3_uri` and `read_csv_from_s3` are called by
# `get_combined_results` below but are not imported anywhere in the visible
# part of this module. Confirm where they live and import them here,
# otherwise the endpoint fails with NameError at request time.

# Input columns that are joined (in this order) to form the displayed address.
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3", "postcode")
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
# Placeholder found in the address2uprn_* columns; treated here as "no value".
MISSING_SENTINEL = "invalid postcode"


def _normalize(value) -> str:
    """Return *value* as a stripped string; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def _is_missing_uprn(uprn: str) -> bool:
    """Return True if a normalized cell is blank or the missing sentinel.

    The sentinel comparison is case-insensitive.
    """
    return uprn == "" or uprn.lower() == MISSING_SENTINEL


def _clean_optional(value) -> str | None:
    """Normalize a raw cell, mapping blank/sentinel values to ``None``."""
    text = _normalize(value)
    return None if _is_missing_uprn(text) else text


def _parse_lexiscore(raw) -> float | None:
    """Parse a lexiscore cell into a finite float.

    Returns ``None`` when the cell is blank, holds the missing sentinel, is
    not numeric, or parses to NaN/infinity. ``float()`` accepts "nan" and
    "inf", but such values are not valid JSON numbers and carry no meaning as
    a score, so they are treated as missing.
    """
    text = _clean_optional(raw)
    if text is None:
        return None
    try:
        score = float(text)
    except ValueError:
        return None
    return score if math.isfinite(score) else None


router = APIRouter(
    prefix="/bulk-uploads",
    tags=["bulk-uploads"],
    dependencies=[Depends(validate_token)],
)


def _send_trigger(settings, queue_url, req) -> dict:
    """Publish *req* as JSON to the SQS queue at *queue_url*.

    Args:
        settings: Application settings; ``AWS_DEFAULT_REGION`` is read from it.
        queue_url: URL of the destination SQS queue.
        req: Trigger request; must provide ``model_dump_json()``, ``task_id``
            and ``sub_task_id``.

    Returns:
        A dict with ``task_id``, ``sub_task_id`` and ``sqs_message_id``.

    Raises:
        HTTPException: 500 if creating the client or sending the message fails.
    """
    try:
        # Second positional argument of boto3.client is the region name.
        sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e
    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }


# NOTE(review): the handlers below are `async def` but perform blocking calls
# (boto3, DB session, S3 read), which run on the event loop. Left as-is here;
# consider offloading if request latency under load becomes a concern.


@router.post("/trigger-postcode-splitter", status_code=202)
async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest):
    """Enqueue a postcode-splitter job for the given task/sub-task."""
    settings = get_settings()
    return _send_trigger(settings, settings.POSTCODE_SPLITTER_SQS_URL, req)


@router.post("/trigger-combiner", status_code=202)
async def trigger_combiner(req: CombinerTriggerRequest):
    """Enqueue a combiner job for the given task/sub-task."""
    settings = get_settings()
    return _send_trigger(settings, settings.COMBINER_SQS_URL, req)


@router.post("/trigger-landlord-overrides", status_code=202)
async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
    """Enqueue a landlord-overrides job for the given task/sub-task."""
    settings = get_settings()
    return _send_trigger(settings, settings.LANDLORD_OVERRIDES_SQS_URL, req)


@router.post("/trigger-finaliser", status_code=202)
async def trigger_finaliser(req: FinaliserTriggerRequest):
    """Enqueue a finaliser job for the given task/sub-task."""
    settings = get_settings()
    return _send_trigger(settings, settings.FINALISER_SQS_URL, req)


def _build_row(raw, row_index: int, uprn: str | None, duplicate_uprns: set[str]):
    """Convert one raw CSV row into a ``CombinedResultRow``.

    Args:
        raw: Mapping of column name to cell value for the row.
        row_index: Position of the row within the full (unpaged) result set.
        uprn: The row's cleaned UPRN, or ``None`` when missing.
        duplicate_uprns: UPRNs that occur on two or more rows of the file.
    """
    # Normalize each address column once and drop the empty ones.
    address_parts = [
        part for part in (_normalize(raw.get(col)) for col in ADDRESS_COLS) if part
    ]
    lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))

    # A row is flagged either "missing" or "duplicate", never both.
    flags: list[str] = []
    if uprn is None:
        flags.append("missing")
    elif uprn in duplicate_uprns:
        flags.append("duplicate")

    return CombinedResultRow(
        row_index=row_index,
        input_address=", ".join(address_parts),
        internal_reference=_normalize(raw.get(INTERNAL_REF_COL)) or None,
        uprn=uprn,
        matched_address=_clean_optional(raw.get(MATCHED_ADDRESS_COL)),
        lexiscore=lexiscore,
        score_bucket=score_bucket(lexiscore),
        flags=flags,
    )


@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
    task_id: UUID,
    offset: int = Query(0, ge=0),
    limit: int = Query(500, ge=1, le=5000),
):
    """Return one page of combined results plus whole-file flag counts.

    The flag summary (duplicates / missing / matched) is computed over every
    row of the CSV, not just the requested page.

    Raises:
        HTTPException: 404 if no upload exists for *task_id*; 409 if the
            upload has no combined output URI yet; 502 if reading the CSV
            fails.
    """
    with get_db_session() as session:
        upload = session.exec(
            select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        ).first()
        if upload is None:
            raise HTTPException(status_code=404, detail="Upload not found for task_id")
        if not upload.combined_output_s3_uri:
            raise HTTPException(status_code=409, detail="Combiner not finished")
        # Copy the value out while the session is open so the S3 download
        # below does not need the session or the ORM instance.
        s3_uri = upload.combined_output_s3_uri

    bucket, key = parse_s3_uri(s3_uri)
    try:
        raw_rows = read_csv_from_s3(bucket, key)
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f"Failed to read combined CSV: {e}"
        ) from e

    # Cleaned UPRN per row (None when missing); reused for stats and the page.
    uprns = [_clean_optional(r.get(UPRN_COL)) for r in raw_rows]
    uprn_counts = Counter(u for u in uprns if u is not None)
    duplicate_uprns = {u for u, count in uprn_counts.items() if count >= 2}

    missing_count = sum(1 for u in uprns if u is None)
    # Counts every row that shares its UPRN with another row.
    duplicate_count = sum(1 for u in uprns if u in duplicate_uprns)
    matched_count = len(raw_rows) - missing_count

    page = raw_rows[offset : offset + limit]
    rows: list[CombinedResultRow] = [
        _build_row(raw, index, uprns[index], duplicate_uprns)
        for index, raw in enumerate(page, start=offset)
    ]

    return CombinedResultsResponse(
        task_id=str(task_id),
        total=len(raw_rows),
        offset=offset,
        limit=limit,
        flags_summary=FlagsSummary(
            duplicates=duplicate_count,
            missing=missing_count,
            matched=matched_count,
        ),
        rows=rows,
    )