Model/backend/postcode_splitter/main.py
2026-02-13 11:05:05 +00:00

456 lines
17 KiB
Python

import os
import sys
import json
import pandas as pd
import requests
import boto3
from uuid import UUID, uuid4
from urllib.parse import unquote
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict, save_csv_to_s3
from utils.logger import setup_logger
from tqdm import tqdm
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from datetime import datetime
logger = setup_logger()
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
"""
Parse S3 URI to extract bucket and key.
Supports two formats:
1. S3 URI format: s3://bucket/key
"""
logger.info("Parsing S3 URI")
try:
# Check if it's an S3 URI format
if s3_uri.startswith("s3://"):
parts = s3_uri[5:].split("/", 1)
if len(parts) < 2:
raise ValueError("S3 URI must include both bucket and key")
bucket = parts[0]
key = parts[1]
logger.info(f"Extracted bucket: {bucket}, key: {key}")
return bucket, key
# Otherwise, treat as AWS console URL
logger.info("Parsing as AWS console URL")
# Split base URL and query string
if "?" not in s3_uri:
raise ValueError("No query string found")
base, query = s3_uri.split("?", 1)
# Extract bucket from base URL
if "/s3/object/" not in base:
raise ValueError("No '/s3/object/' found in URL path")
path_parts = base.split("/s3/object/")
bucket = path_parts[1]
logger.info(f"Extracted bucket: {bucket}")
# Extract prefix from query parameters
params = dict(item.split("=") for item in query.split("&") if "=" in item)
key = unquote(params.get("prefix", ""))
logger.info(f"Extracted key: {key}")
return bucket, key
except Exception as e:
logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}")
raise ValueError(f"Could not parse S3 URI") from e
def upload_batch_to_s3(
batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> str:
"""
Upload batch DataFrame to S3 as CSV.
Args:
batch_df: The DataFrame containing batch data
task_id: The parent task ID (used for file path)
sub_task_id: The subtask ID (used for file path)
bucket_name: The S3 bucket name (defaults to env variable)
Returns:
S3 URI (s3://bucket/key) of the uploaded file
"""
if bucket_name is None:
bucket_name = os.getenv("S3_BUCKET_NAME")
if not bucket_name:
logger.error(
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
)
raise ValueError("S3_BUCKET_NAME not configured")
try:
file_name = f"{datetime.now().isoformat()}_{str(uuid4())[:8]}"
file_key = (
f"ara_postcode_splitter_batches/{task_id}/{sub_task_id}/{file_name}.csv"
)
success = save_csv_to_s3(batch_df, bucket_name, file_key)
if success:
s3_uri = f"s3://{bucket_name}/{file_key}"
logger.info(f"Successfully uploaded batch to {s3_uri}")
return s3_uri
else:
logger.error(f"Failed to upload batch to S3")
raise ValueError("Failed to save CSV to S3")
except Exception as e:
logger.error(f"Error uploading batch to S3: {str(e)}")
raise
def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> str:
"""
Send a batch to the address2UPRN SQS queue with S3 reference.
Args:
task_id: The parent task ID
sub_task_id: The new subtask ID for this batch
s3_uri: S3 URI pointing to the batch CSV file
Returns:
Message ID from SQS
"""
sqs_client = boto3.client("sqs")
queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL")
if not queue_url:
raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set")
message_body = {
"task_id": task_id,
"sub_task_id": sub_task_id,
"s3_uri": s3_uri,
}
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message_body),
)
logger.info(
f"Sent message to address2UPRN queue. "
f"Task: {task_id}, SubTask: {sub_task_id}, MessageId: {response['MessageId']}"
)
return response["MessageId"]
def create_batch_and_send_to_address2uprn(
batch_rows: list,
task_id: str,
subtask_interface: SubTaskInterface,
bucket_name: str,
) -> str:
"""
Create a batch DataFrame, upload to S3, create subtask, and send to address2UPRN queue.
Args:
batch_rows: List of row dictionaries for this batch
task_id: The parent task ID
subtask_interface: SubTaskInterface instance
bucket_name: S3 bucket name
Returns:
The created batch subtask ID
"""
# Generate unique batch subtask ID
batch_sub_task_id = str(uuid4())
# Upload batch to S3
batch_df = pd.DataFrame(batch_rows)
s3_uri = upload_batch_to_s3(batch_df, str(task_id), batch_sub_task_id, bucket_name)
# Create a new subtask for this batch with all inputs
created_batch_sub_task_id = subtask_interface.create_subtask(
task_id=task_id,
inputs={
"task_id": str(task_id),
"sub_task_id": batch_sub_task_id,
"batch_size": len(batch_rows),
"s3_uri": s3_uri,
},
)
logger.info(f"Created batch subtask {created_batch_sub_task_id}")
# Send message with S3 reference
send_to_address2uprn_queue(
task_id=str(task_id),
sub_task_id=batch_sub_task_id,
s3_uri=s3_uri,
)
return created_batch_sub_task_id
def handler(event, context):
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
# Example SQS message for testing (copy and paste into SQS):
# {
# "task_id":"e31f2f21-175b-4a91-a3ec-a6baa325e917",
# "s3_uri":"s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv"
# }
# Handle both single event and batch events (SQS, etc.)
records = event.get("Records", [event])
results = []
errors = []
subtask_interface = SubTaskInterface()
bucket_name = os.getenv("S3_BUCKET_NAME")
for record in records:
task_id = None
subtask_id = None
try:
# Parse body (inputs)
if isinstance(record.get("body"), str):
body = json.loads(record["body"])
else:
body = record.get("body", {})
# Validate required fields
task_id = body.get("task_id")
s3_uri = body.get("s3_uri")
if not task_id:
errors.append({"error": "Missing required field: task_id"})
continue
if not s3_uri:
errors.append({"error": "Missing required field: s3_uri"})
continue
# Convert task_id to UUID
try:
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
except ValueError as e:
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
continue
# Create a new subtask for this postcode splitter invocation
subtask_id = subtask_interface.create_subtask(
task_id=task_id, inputs={"s3_uri": s3_uri}
)
logger.info(f"Created subtask {subtask_id} for task {task_id}")
# Mark subtask as in progress
subtask_interface.update_subtask_status(subtask_id, "in progress")
logger.info(f"Marked subtask {subtask_id} as in progress")
# Read CSV from S3
logger.info(f"Processing S3 URI: {s3_uri}")
bucket, key = parse_s3_uri(s3_uri)
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
df = df.head(1983)
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
# Sanitise postcodes
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
postcode: group.to_dict(orient="records")
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
}
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
# Calculate total rows to send
total_rows = sum(len(rows) for rows in postcode_to_addresses.values())
logger.info(f"Total rows to send: {total_rows}")
batch_size = 500
# If all rows fit in one batch, just send them all at once
if total_rows <= batch_size:
all_rows = []
for postcode, rows in postcode_to_addresses.items():
all_rows.extend(rows)
try:
create_batch_and_send_to_address2uprn(
batch_rows=all_rows,
task_id=task_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
logger.info(
f"Sent all {len(all_rows)} rows in single batch to address2UPRN queue"
)
except Exception as e:
logger.error(
f"Failed to send all rows to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
else:
# Multi-batch processing for large datasets
batch_rows = []
total_sent = 0
for postcode, rows in postcode_to_addresses.items():
logger.info(f"Processing postcode {postcode} with {len(rows)} rows")
# If postcode itself is larger than batch_size, send it individually
if len(rows) > batch_size:
# First, send the current batch if it has data
if batch_rows:
try:
create_batch_and_send_to_address2uprn(
batch_rows=batch_rows,
task_id=task_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
logger.info(
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue"
)
batch_rows = []
except Exception as e:
logger.error(
f"Failed to send batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
# Send the large postcode on its own
try:
create_batch_and_send_to_address2uprn(
batch_rows=rows,
task_id=task_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
logger.info(
f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue"
)
except Exception as e:
logger.error(
f"Failed to send large postcode to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
continue
# If adding this postcode's rows would exceed batch_size, send current batch
current_batch_size = len(batch_rows) + len(rows)
if batch_rows and current_batch_size > batch_size:
logger.info(
f"Batch threshold reached: current {len(batch_rows)} + next postcode {len(rows)} = {current_batch_size} > {batch_size}"
)
try:
create_batch_and_send_to_address2uprn(
batch_rows=batch_rows,
task_id=task_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
logger.info(
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})"
)
total_sent += len(batch_rows)
batch_rows = []
except Exception as e:
logger.error(
f"Failed to send batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
# Add current postcode's rows to batch
batch_rows.extend(rows)
# Send remaining batch
if batch_rows:
try:
create_batch_and_send_to_address2uprn(
batch_rows=batch_rows,
task_id=task_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
total_sent += len(batch_rows)
logger.info(
f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})"
)
batch_rows = []
except Exception as e:
logger.error(
f"Failed to send final batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in request body: {e}")
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
except Exception as e:
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
errors.append({"error": "Unexpected error", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
# Return error if all records failed
if errors and not results:
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
return {
"statusCode": 200,
"body": json.dumps(
{"processed": results, "errors": errors if errors else None}
),
}