mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
220 lines
6.8 KiB
Python
220 lines
6.8 KiB
Python
import math
from collections import Counter
from uuid import UUID

import boto3
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select

from backend.app.dependencies import validate_token
from backend.app.config import get_settings
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
from backend.app.bulk_uploads.schema import (
    CombinedResultRow,
    CombinedResultsResponse,
    CombinerTriggerRequest,
    FinaliserTriggerRequest,
    FlagsSummary,
    LandlordOverridesTriggerRequest,
    PostcodeSplitterTriggerRequest,
)
from backend.app.bulk_uploads.scoring import score_bucket
|
|
|
|
# Column names looked up in each row of the combined CSV.

# Input address columns; non-empty values are joined with ", " in this order.
ADDRESS_COLS = (
    "Address 1",
    "Address 2",
    "Address 3",
    "postcode",
)
# Column holding the caller's own reference for a row.
INTERNAL_REF_COL = "Internal Reference"
# Columns read for the matched UPRN, matched address and match score.
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
# Cell value (compared case-insensitively) that is treated as "no value".
MISSING_SENTINEL = "invalid postcode"
|
|
|
|
|
|
def _normalize(value) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
|
|
def _is_missing_uprn(uprn: str) -> bool:
    """Tell whether ``uprn`` is empty or equals the missing-value sentinel.

    The sentinel comparison ignores case. No trimming is done here, so the
    caller is expected to pass an already-normalised string.
    """
    return uprn.lower() in ("", MISSING_SENTINEL)
|
|
|
|
|
|
def _parse_lexiscore(raw) -> float | None:
    """Parse a raw lexiscore cell into a finite float.

    Args:
        raw: The cell value as read from the CSV row (may be ``None``).

    Returns:
        The score as a ``float``, or ``None`` when the cell is empty, equals
        the missing-value sentinel (case-insensitive), is not numeric, or
        parses to a non-finite value.
    """
    val = _normalize(raw)
    if not val or val.lower() == MISSING_SENTINEL:
        return None
    try:
        score = float(val)
    except ValueError:
        return None
    # float() also accepts "nan", "inf" and "infinity"; such values are not
    # usable scores and cannot be represented in strict JSON, so treat them
    # like any other unparseable cell.
    return score if math.isfinite(score) else None
|
|
|
|
|
|
# Router for the bulk-upload endpoints; validate_token is attached as a
# dependency of every route registered on it.
router = APIRouter(
    prefix="/bulk-uploads",
    tags=["bulk-uploads"],
    dependencies=[Depends(validate_token)],
)
|
|
|
|
|
|
@router.post("/trigger-postcode-splitter", status_code=202)
async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest):
    """Queue a postcode-splitter job by sending the request to SQS.

    The request is serialised with ``model_dump_json()`` and sent to the
    queue at ``settings.POSTCODE_SPLITTER_SQS_URL``.

    Returns:
        A dict with the request's ``task_id`` and ``sub_task_id`` plus the
        ``MessageId`` from the SQS response (``None`` if it is absent).

    Raises:
        HTTPException: 500 when creating the client or sending the message
            fails for any reason.
    """
    settings = get_settings()

    try:
        # NOTE(review): boto3 calls are synchronous and this handler is async;
        # confirm that blocking the event loop here is acceptable.
        sqs = boto3.client("sqs", region_name=settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.POSTCODE_SPLITTER_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause so the underlying error stays attached to the 500.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e

    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
|
|
|
|
|
|
@router.post("/trigger-combiner", status_code=202)
async def trigger_combiner(req: CombinerTriggerRequest):
    """Queue a combiner job by sending the request to SQS.

    The request is serialised with ``model_dump_json()`` and sent to the
    queue at ``settings.COMBINER_SQS_URL``.

    Returns:
        A dict with the request's ``task_id`` and ``sub_task_id`` plus the
        ``MessageId`` from the SQS response (``None`` if it is absent).

    Raises:
        HTTPException: 500 when creating the client or sending the message
            fails for any reason.
    """
    settings = get_settings()

    try:
        # NOTE(review): boto3 calls are synchronous and this handler is async;
        # confirm that blocking the event loop here is acceptable.
        sqs = boto3.client("sqs", region_name=settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.COMBINER_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause so the underlying error stays attached to the 500.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e

    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
|
|
|
|
|
|
@router.post("/trigger-landlord-overrides", status_code=202)
async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
    """Queue a landlord-overrides job by sending the request to SQS.

    The request is serialised with ``model_dump_json()`` and sent to the
    queue at ``settings.LANDLORD_OVERRIDES_SQS_URL``.

    Returns:
        A dict with the request's ``task_id`` and ``sub_task_id`` plus the
        ``MessageId`` from the SQS response (``None`` if it is absent).

    Raises:
        HTTPException: 500 when creating the client or sending the message
            fails for any reason.
    """
    settings = get_settings()

    try:
        # NOTE(review): boto3 calls are synchronous and this handler is async;
        # confirm that blocking the event loop here is acceptable.
        sqs = boto3.client("sqs", region_name=settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.LANDLORD_OVERRIDES_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause so the underlying error stays attached to the 500.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e

    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
|
|
|
|
|
|
@router.post("/trigger-finaliser", status_code=202)
async def trigger_finaliser(req: FinaliserTriggerRequest):
    """Queue a finaliser job by sending the request to SQS.

    The request is serialised with ``model_dump_json()`` and sent to the
    queue at ``settings.FINALISER_SQS_URL``.

    Returns:
        A dict with the request's ``task_id`` and ``sub_task_id`` plus the
        ``MessageId`` from the SQS response (``None`` if it is absent).

    Raises:
        HTTPException: 500 when creating the client or sending the message
            fails for any reason.
    """
    settings = get_settings()

    try:
        # NOTE(review): boto3 calls are synchronous and this handler is async;
        # confirm that blocking the event loop here is acceptable.
        sqs = boto3.client("sqs", region_name=settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.FINALISER_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause so the underlying error stays attached to the 500.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e

    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
|
|
|
|
|
|
@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
    task_id: UUID,
    offset: int = Query(0, ge=0),
    limit: int = Query(500, ge=1, le=5000),
):
    """Return one page of the combined CSV for ``task_id`` plus flag totals.

    The flag summary (duplicates / missing / matched) is computed over every
    row of the CSV; ``rows`` contains only the slice ``[offset, offset+limit)``.
    A row is flagged "missing" when its UPRN is empty or the missing-value
    sentinel, otherwise "duplicate" when the same UPRN occurs on two or more
    rows.

    Args:
        task_id: Identifier used to look up the ``BulkAddressUpload`` record.
        offset: Zero-based index of the first row to return.
        limit: Maximum number of rows to return (1-5000).

    Raises:
        HTTPException: 404 when no upload exists for ``task_id``; 409 when the
            upload has no ``combined_output_s3_uri`` yet; 502 when reading the
            CSV fails.
    """
    with get_db_session() as session:
        upload = session.exec(
            select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        ).first()
        # Read the attribute while the session is open so the value does not
        # depend on how the ORM treats instances after the session closes.
        combined_uri = None if upload is None else upload.combined_output_s3_uri

    if upload is None:
        raise HTTPException(status_code=404, detail="Upload not found for task_id")
    if not combined_uri:
        raise HTTPException(status_code=409, detail="Combiner not finished")

    # NOTE(review): parse_s3_uri and read_csv_from_s3 are not imported in the
    # part of this module visible during review — confirm they are in scope.
    bucket, key = parse_s3_uri(combined_uri)
    try:
        raw_rows = read_csv_from_s3(bucket, key)
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f"Failed to read combined CSV: {e}"
        ) from e

    # Normalise every UPRN once; the summary and the page loop both reuse it.
    uprn_values = [_normalize(r.get(UPRN_COL)) for r in raw_rows]
    uprn_counts = Counter(u for u in uprn_values if not _is_missing_uprn(u))
    duplicate_uprns = {u for u, count in uprn_counts.items() if count >= 2}

    # uprn_counts only holds non-missing UPRNs, so its total is the matched
    # row count and the remainder of the file is missing.
    matched_count = sum(uprn_counts.values())
    missing_count = len(raw_rows) - matched_count
    duplicate_count = sum(count for count in uprn_counts.values() if count >= 2)

    page = raw_rows[offset : offset + limit]
    page_uprns = uprn_values[offset : offset + limit]
    rows: list[CombinedResultRow] = []
    for absolute_index, (raw, uprn_raw) in enumerate(
        zip(page, page_uprns), start=offset
    ):
        # Keep only the address columns that have a value, in column order.
        address_parts = [
            part for col in ADDRESS_COLS if (part := _normalize(raw.get(col)))
        ]
        input_address = ", ".join(address_parts)
        internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None

        uprn = None if _is_missing_uprn(uprn_raw) else uprn_raw

        matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
        matched_address = (
            None
            if not matched_address_raw
            or matched_address_raw.lower() == MISSING_SENTINEL
            else matched_address_raw
        )

        lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))

        # "missing" and "duplicate" are mutually exclusive for a single row.
        flags: list[str] = []
        if uprn is None:
            flags.append("missing")
        elif uprn in duplicate_uprns:
            flags.append("duplicate")

        rows.append(
            CombinedResultRow(
                row_index=absolute_index,
                input_address=input_address,
                internal_reference=internal_ref,
                uprn=uprn,
                matched_address=matched_address,
                lexiscore=lexiscore,
                score_bucket=score_bucket(lexiscore),
                flags=flags,
            )
        )

    return CombinedResultsResponse(
        task_id=str(task_id),
        total=len(raw_rows),
        offset=offset,
        limit=limit,
        flags_summary=FlagsSummary(
            duplicates=duplicate_count,
            missing=missing_count,
            matched=matched_count,
        ),
        rows=rows,
    )
|