Model/backend/app/bulk_uploads/router.py
2026-06-04 11:47:42 +00:00

220 lines
6.8 KiB
Python

import asyncio
import math
from collections import Counter
from uuid import UUID

import boto3
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select

from backend.app.bulk_uploads.schema import (
    CombinedResultRow,
    CombinedResultsResponse,
    CombinerTriggerRequest,
    FinaliserTriggerRequest,
    FlagsSummary,
    LandlordOverridesTriggerRequest,
    PostcodeSplitterTriggerRequest,
)
from backend.app.bulk_uploads.scoring import score_bucket
from backend.app.config import get_settings
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
from backend.app.dependencies import validate_token
# CSV columns that are joined, in this order and skipping empty values, to
# build each row's input address string.
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3", "postcode")
# CSV column read for a row's optional internal reference.
INTERNAL_REF_COL = "Internal Reference"
# CSV columns read for the matched UPRN, the matched address and the score.
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
# Placeholder cell value (compared case-insensitively) that is treated the
# same as an empty UPRN / matched address / lexiscore.
MISSING_SENTINEL = "invalid postcode"
def _normalize(value) -> str:
    """Return ``value`` as a whitespace-trimmed string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()
def _is_missing_uprn(uprn: str) -> bool:
    """Tell whether an already-normalised UPRN cell counts as absent.

    A cell is absent when it is empty or equals ``MISSING_SENTINEL``
    regardless of letter case.
    """
    if uprn == "":
        return True
    return uprn.lower() == MISSING_SENTINEL
def _parse_lexiscore(raw) -> float | None:
    """Convert a raw lexiscore cell into a finite float, or ``None``.

    Args:
        raw: Cell value as read from the CSV row; may be ``None``.

    Returns:
        The parsed score, or ``None`` when the cell is empty, holds the
        missing-value sentinel, is not numeric, or parses to NaN/infinity.
    """
    text = _normalize(raw)
    if not text or text.lower() == MISSING_SENTINEL:
        return None
    try:
        score = float(text)
    except ValueError:
        return None
    # float() also accepts "nan", "inf" and "infinity". None of these is a
    # usable score, and none can be represented in standard JSON.
    return score if math.isfinite(score) else None
# Router for the bulk-upload endpoints: every route is mounted under
# /bulk-uploads and declares validate_token as a dependency.
router = APIRouter(
    prefix="/bulk-uploads",
    tags=["bulk-uploads"],
    dependencies=[Depends(validate_token)],
)
async def _enqueue_trigger(req, queue_url: str, region: str) -> dict:
    """Send ``req`` as JSON to an SQS queue and describe the queued message.

    Shared by all ``trigger_*`` endpoints, which differ only in the queue
    they target.

    Args:
        req: Trigger request model exposing ``task_id``, ``sub_task_id`` and
            ``model_dump_json()``.
        queue_url: URL of the destination SQS queue.
        region: AWS region used to create the SQS client.

    Returns:
        A dict with ``task_id``, ``sub_task_id`` and ``sqs_message_id``.

    Raises:
        HTTPException: 500 when creating the client or sending the message
            fails; the original exception is chained as the cause.
    """
    try:
        sqs = boto3.client("sqs", region_name=region)
        # send_message is a blocking network call; run it in a worker thread
        # so the event loop keeps serving other requests meanwhile.
        response = await asyncio.to_thread(
            sqs.send_message,
            QueueUrl=queue_url,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e
    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }


@router.post("/trigger-postcode-splitter", status_code=202)
async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest):
    """Enqueue the request on the postcode-splitter SQS queue."""
    settings = get_settings()
    return await _enqueue_trigger(
        req, settings.POSTCODE_SPLITTER_SQS_URL, settings.AWS_DEFAULT_REGION
    )


@router.post("/trigger-combiner", status_code=202)
async def trigger_combiner(req: CombinerTriggerRequest):
    """Enqueue the request on the combiner SQS queue."""
    settings = get_settings()
    return await _enqueue_trigger(
        req, settings.COMBINER_SQS_URL, settings.AWS_DEFAULT_REGION
    )


@router.post("/trigger-landlord-overrides", status_code=202)
async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
    """Enqueue the request on the landlord-overrides SQS queue."""
    settings = get_settings()
    return await _enqueue_trigger(
        req, settings.LANDLORD_OVERRIDES_SQS_URL, settings.AWS_DEFAULT_REGION
    )


@router.post("/trigger-finaliser", status_code=202)
async def trigger_finaliser(req: FinaliserTriggerRequest):
    """Enqueue the request on the finaliser SQS queue."""
    settings = get_settings()
    return await _enqueue_trigger(
        req, settings.FINALISER_SQS_URL, settings.AWS_DEFAULT_REGION
    )
def _load_combined_rows(task_id: UUID):
    """Look up the upload for ``task_id`` and read its combined CSV (blocking).

    Returns:
        Whatever ``read_csv_from_s3`` returns; the caller uses it as a
        sliceable sequence of row mappings.

    Raises:
        HTTPException: 404 when no upload exists for the task, 409 when the
            upload has no combined output URI yet, 502 when reading the CSV
            fails (original exception chained as the cause).
    """
    # NOTE(review): parse_s3_uri and read_csv_from_s3 are not imported in the
    # visible part of this module -- confirm they are in scope.
    with get_db_session() as session:
        upload = session.exec(
            select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        ).first()
        # Read the attribute while the session is still open so the value
        # does not depend on the ORM instance remaining usable afterwards.
        combined_uri = None if upload is None else upload.combined_output_s3_uri
    if upload is None:
        raise HTTPException(status_code=404, detail="Upload not found for task_id")
    if not combined_uri:
        raise HTTPException(status_code=409, detail="Combiner not finished")
    bucket, key = parse_s3_uri(combined_uri)
    try:
        return read_csv_from_s3(bucket, key)
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f"Failed to read combined CSV: {e}"
        ) from e


def _build_result_row(
    raw, row_index: int, duplicate_uprns: set[str]
) -> CombinedResultRow:
    """Turn one raw CSV row into a ``CombinedResultRow`` with its flags."""
    # Normalise each address column once and keep only the non-empty parts.
    address_parts = [
        part for col in ADDRESS_COLS if (part := _normalize(raw.get(col)))
    ]
    uprn_raw = _normalize(raw.get(UPRN_COL))
    uprn = None if _is_missing_uprn(uprn_raw) else uprn_raw

    matched_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
    matched_absent = not matched_raw or matched_raw.lower() == MISSING_SENTINEL
    lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))

    # A row is flagged "missing" or "duplicate", never both: a missing UPRN
    # is excluded from duplicate detection.
    if uprn is None:
        flags = ["missing"]
    elif uprn in duplicate_uprns:
        flags = ["duplicate"]
    else:
        flags = []

    return CombinedResultRow(
        row_index=row_index,
        input_address=", ".join(address_parts),
        internal_reference=_normalize(raw.get(INTERNAL_REF_COL)) or None,
        uprn=uprn,
        matched_address=None if matched_absent else matched_raw,
        lexiscore=lexiscore,
        score_bucket=score_bucket(lexiscore),
        flags=flags,
    )


@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
    task_id: UUID,
    offset: int = Query(0, ge=0),
    limit: int = Query(500, ge=1, le=5000),
):
    """Return one page of combined results plus whole-file flag counts.

    Args:
        task_id: Task whose combined output should be read.
        offset: Zero-based index of the first row to return.
        limit: Maximum number of rows to return (1-5000).

    Returns:
        A ``CombinedResultsResponse``; ``total`` and ``flags_summary`` cover
        every row in the CSV, while ``rows`` holds only the requested page.

    Raises:
        HTTPException: 404 when no upload exists for the task, 409 when the
            combined output is not available yet, 502 when the CSV cannot be
            read.
    """
    # The DB query and the S3 read are synchronous; run them in a worker
    # thread so this coroutine does not block the event loop meanwhile.
    raw_rows = await asyncio.to_thread(_load_combined_rows, task_id)

    # Flag counts are computed over the whole file, not just the page.
    uprn_values = [_normalize(r.get(UPRN_COL)) for r in raw_rows]
    uprn_counts = Counter(u for u in uprn_values if not _is_missing_uprn(u))
    duplicate_uprns = {u for u, count in uprn_counts.items() if count >= 2}
    missing_count = sum(1 for u in uprn_values if _is_missing_uprn(u))
    duplicate_count = sum(1 for u in uprn_values if u in duplicate_uprns)
    matched_count = len(raw_rows) - missing_count

    page = raw_rows[offset : offset + limit]
    rows: list[CombinedResultRow] = [
        _build_result_row(raw, row_index, duplicate_uprns)
        for row_index, raw in enumerate(page, start=offset)
    ]

    return CombinedResultsResponse(
        task_id=str(task_id),
        total=len(raw_rows),
        offset=offset,
        limit=limit,
        flags_summary=FlagsSummary(
            duplicates=duplicate_count,
            missing=missing_count,
            matched=matched_count,
        ),
        rows=rows,
    )