Model/backend/postcode_splitter/main.py

407 lines
15 KiB
Python

import os
import sys
import json
import pandas as pd
import requests
import boto3
from uuid import UUID, uuid4
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict, save_csv_to_s3, parse_s3_uri
from utils.logger import setup_logger
from tqdm import tqdm
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from datetime import datetime
logger = setup_logger()
def upload_batch_to_s3(
    batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> str:
    """
    Upload batch DataFrame to S3 as CSV.

    The object key has the form
    ``ara_postcode_splitter_batches/<task_id>/<sub_task_id>/<timestamp>_<8 hex chars>.csv``.

    Args:
        batch_df: The DataFrame containing batch data
        task_id: The parent task ID (used for file path)
        sub_task_id: The subtask ID (used for file path)
        bucket_name: The S3 bucket name (defaults to the S3_BUCKET_NAME env variable)

    Returns:
        S3 URI (s3://bucket/key) of the uploaded file

    Raises:
        ValueError: If no bucket name is available, or if save_csv_to_s3
            reports failure by returning a falsy value.
        Exception: Anything raised by save_csv_to_s3 is logged and re-raised.
    """
    if bucket_name is None:
        bucket_name = os.getenv("S3_BUCKET_NAME")
    # Also rejects an explicitly passed empty bucket name.
    if not bucket_name:
        logger.error(
            "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
        )
        raise ValueError("S3_BUCKET_NAME not configured")

    # Timestamp plus a short random suffix keeps keys unique within a subtask.
    file_name = f"{datetime.now().isoformat()}_{str(uuid4())[:8]}"
    file_key = (
        f"ara_postcode_splitter_batches/{task_id}/{sub_task_id}/{file_name}.csv"
    )

    # Only the storage call sits inside the try block, so the "reported
    # failure" error raised below is logged once rather than being caught
    # and logged a second time.
    try:
        success = save_csv_to_s3(batch_df, bucket_name, file_key)
    except Exception as e:
        logger.error(f"Error uploading batch to S3: {str(e)}")
        raise

    if not success:
        logger.error("Failed to upload batch to S3")
        raise ValueError("Failed to save CSV to S3")

    s3_uri = f"s3://{bucket_name}/{file_key}"
    logger.info(f"Successfully uploaded batch to {s3_uri}")
    return s3_uri
def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> str:
    """
    Send a batch to the address2UPRN SQS queue with S3 reference.

    The message body is a JSON object with the keys ``task_id``,
    ``sub_task_id`` and ``s3_uri``.

    Args:
        task_id: The parent task ID
        sub_task_id: The new subtask ID for this batch
        s3_uri: S3 URI pointing to the batch CSV file

    Returns:
        Message ID from SQS

    Raises:
        ValueError: If the ADDRESS2UPRN_QUEUE_URL environment variable is
            unset or empty.
    """
    # Validate configuration before building the client so a missing queue
    # URL fails fast with a clear error, without any AWS client setup.
    queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL")
    if not queue_url:
        raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set")

    message_body = {
        "task_id": task_id,
        "sub_task_id": sub_task_id,
        "s3_uri": s3_uri,
    }

    sqs_client = boto3.client("sqs")
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message_body),
    )
    message_id = response["MessageId"]

    logger.info(
        f"Sent message to address2UPRN queue. "
        f"Task: {task_id}, SubTask: {sub_task_id}, MessageId: {message_id}"
    )
    return message_id
def create_batch_and_send_to_address2uprn(
    batch_rows: list,
    task_id: str,
    subtask_interface: SubTaskInterface,
    bucket_name: str,
) -> str:
    """
    Persist one batch of rows and hand it over to the address2UPRN queue.

    Steps, in order: generate a fresh UUID for the batch, write the rows to
    S3 as CSV, register a subtask describing the batch, then publish a queue
    message that points at the uploaded file.

    Args:
        batch_rows: List of row dictionaries for this batch
        task_id: The parent task ID
        subtask_interface: SubTaskInterface instance
        bucket_name: S3 bucket name

    Returns:
        The ID returned by ``subtask_interface.create_subtask`` for this batch
    """
    parent_task_id = str(task_id)
    # Locally generated identifier: used in the S3 key, stored in the subtask
    # inputs and sent in the queue message.
    queue_sub_task_id = str(uuid4())

    # Store the rows in S3 first; the subtask and the message both reference the URI.
    s3_uri = upload_batch_to_s3(
        batch_df=pd.DataFrame(batch_rows),
        task_id=parent_task_id,
        sub_task_id=queue_sub_task_id,
        bucket_name=bucket_name,
    )

    subtask_inputs = {
        "task_id": parent_task_id,
        "sub_task_id": queue_sub_task_id,
        "batch_size": len(batch_rows),
        "s3_uri": s3_uri,
    }
    created_batch_sub_task_id = subtask_interface.create_subtask(
        task_id=task_id,
        inputs=subtask_inputs,
    )
    logger.info(f"Created batch subtask {created_batch_sub_task_id}")

    # NOTE(review): the message carries the locally generated UUID, while the
    # value returned below comes from create_subtask. Confirm that consumers
    # can resolve the message's sub_task_id to the created subtask.
    send_to_address2uprn_queue(
        task_id=parent_task_id,
        sub_task_id=queue_sub_task_id,
        s3_uri=s3_uri,
    )
    return created_batch_sub_task_id
def _parse_record_body(record) -> dict:
    """Return the record's body as a dict, decoding it from JSON when it is a string."""
    raw_body = record.get("body")
    if isinstance(raw_body, str):
        return json.loads(raw_body)
    # A missing or null body becomes an empty dict so that field validation
    # reports the missing fields instead of failing on attribute access.
    return raw_body or {}


def _plan_postcode_batches(postcode_to_addresses: dict, batch_size: int) -> list:
    """Group rows into batches of at most batch_size rows without splitting a postcode.

    A postcode with more than batch_size rows becomes a batch of its own;
    rows accumulated before it are emitted first so ordering is preserved.
    """
    batches = []
    pending_rows = []
    for rows in postcode_to_addresses.values():
        if len(rows) > batch_size:
            if pending_rows:
                batches.append(pending_rows)
                pending_rows = []
            batches.append(list(rows))
            continue
        # Close the pending batch if this postcode would push it over the limit.
        if pending_rows and len(pending_rows) + len(rows) > batch_size:
            batches.append(pending_rows)
            pending_rows = []
        pending_rows.extend(rows)
    if pending_rows:
        batches.append(pending_rows)
    return batches


def _mark_subtask_failed(subtask_interface, subtask_id, error_message: str) -> None:
    """Best-effort update of a subtask to "failed"; database errors are logged, not raised."""
    if not subtask_id:
        return
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": error_message}
        )
    except Exception as db_error:
        logger.error(f"Failed to update subtask status: {db_error}")


def handler(event, context):
    """
    Split an input CSV into postcode-aligned batches and queue them for address2UPRN.

    For each record: validate ``task_id`` and ``s3_uri``, create a subtask and
    mark it "in progress", load the CSV from S3, group rows by normalised
    postcode (upper-cased, spaces removed), pack the groups into batches of
    at most 500 rows (a larger postcode is sent as one batch of its own) and
    dispatch each batch via create_batch_and_send_to_address2uprn.

    Args:
        event: Either a dict with a "Records" list or a single record. Each
            record's "body" is a dict or JSON string with "task_id" and "s3_uri".
        context: Invocation context; ``function_name`` and ``aws_request_id``
            are read from it.

    Returns:
        A dict with "statusCode" and a JSON-encoded "body". The status is 500
        when errors occurred and no record was fully dispatched, else 200.
    """
    print(f"Function: {context.function_name}")
    print(f"Request ID: {context.aws_request_id}")
    # Example SQS message for testing (copy and paste into SQS):
    # {
    # "task_id":"e31f2f21-175b-4a91-a3ec-a6baa325e917",
    # "s3_uri":"s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv"
    # }
    # Handle both single event and batch events (SQS, etc.)
    records = event.get("Records", [event])
    results = []
    errors = []
    subtask_interface = SubTaskInterface()
    bucket_name = os.getenv("S3_BUCKET_NAME")
    batch_size = 500

    for record in records:
        subtask_id = None
        try:
            # Parse body (inputs)
            body = _parse_record_body(record)

            # Validate required fields
            task_id = body.get("task_id")
            s3_uri = body.get("s3_uri")
            if not task_id:
                errors.append({"error": "Missing required field: task_id"})
                continue
            if not s3_uri:
                errors.append({"error": "Missing required field: s3_uri"})
                continue

            # Convert task_id to UUID
            try:
                task_id = UUID(task_id) if isinstance(task_id, str) else task_id
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Create a new subtask for this postcode splitter invocation
            subtask_id = subtask_interface.create_subtask(
                task_id=task_id, inputs={"s3_uri": s3_uri}
            )
            logger.info(f"Created subtask {subtask_id} for task {task_id}")
            subtask_interface.update_subtask_status(subtask_id, "in progress")
            logger.info(f"Marked subtask {subtask_id} as in progress")

            # Read CSV from S3
            logger.info(f"Processing S3 URI: {s3_uri}")
            bucket, key = parse_s3_uri(s3_uri)
            logger.info(f"S3 Bucket: {bucket}, Key: {key}")
            csv_data = read_csv_from_s3_dict(bucket, key)
            # Every input row is processed; no row cap is applied, because
            # truncating here would silently drop addresses from the run.
            df = pd.DataFrame(csv_data)
            logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")

            # Sanitise postcodes
            df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
            clean_df = df.dropna(subset=["postcode_clean"])
            postcode_to_addresses = {
                postcode: group.to_dict(orient="records")
                for postcode, group in clean_df.groupby("postcode_clean", sort=False)
            }
            logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
            total_rows = sum(len(rows) for rows in postcode_to_addresses.values())
            logger.info(f"Total rows to send: {total_rows}")

            batches = _plan_postcode_batches(postcode_to_addresses, batch_size)
            if not batches:
                logger.warning(
                    "No rows with a postcode found; nothing sent to address2UPRN queue"
                )

            # Each batch is attempted exactly once. A failure is recorded and
            # the remaining batches are still dispatched.
            total_sent = 0
            failed_batches = 0
            for batch_rows in batches:
                try:
                    create_batch_and_send_to_address2uprn(
                        batch_rows=batch_rows,
                        task_id=task_id,
                        subtask_interface=subtask_interface,
                        bucket_name=bucket_name,
                    )
                except Exception as e:
                    failed_batches += 1
                    logger.error(
                        f"Failed to send batch to address2UPRN queue: {e}",
                        exc_info=True,
                    )
                    errors.append(
                        {
                            "error": "Failed to send to address2UPRN queue",
                            "details": str(e),
                        }
                    )
                    continue
                total_sent += len(batch_rows)
                logger.info(
                    f"Sent batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})"
                )

            if failed_batches:
                # Surface the partial dispatch on the subtask instead of
                # leaving it "in progress" indefinitely.
                _mark_subtask_failed(
                    subtask_interface,
                    subtask_id,
                    f"{failed_batches} of {len(batches)} batches failed to send to address2UPRN queue",
                )
            else:
                # NOTE(review): on success the subtask is left "in progress";
                # presumably it is completed elsewhere - TODO confirm.
                results.append(
                    {
                        "task_id": str(task_id),
                        "subtask_id": str(subtask_id),
                        "batches_sent": len(batches),
                        "rows_sent": total_sent,
                    }
                )
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in request body: {e}")
            errors.append({"error": "Invalid JSON in request body", "details": str(e)})
            # Mark subtask as failed if we have one
            _mark_subtask_failed(subtask_interface, subtask_id, str(e))
        except Exception as e:
            logger.error(f"Unexpected error processing record: {e}", exc_info=True)
            errors.append({"error": "Unexpected error", "details": str(e)})
            # Mark subtask as failed if we have one
            _mark_subtask_failed(subtask_interface, subtask_id, str(e))

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}
    return {
        "statusCode": 200,
        "body": json.dumps(
            {"processed": results, "errors": errors if errors else None}
        ),
    }