def parse_s3_console_url(s3_uri: str) -> tuple[str, str]:
    """
    Parse an AWS console S3 object URL to extract bucket and key.

    Format: https://account-id-hash.region.console.aws.amazon.com/s3/object/bucket?region=...&prefix=path

    The ``prefix`` query parameter may appear anywhere in the query string
    (the console normally emits it after ``region``). Its value is decoded as
    a form-encoded query value: ``%XX`` escapes are unquoted and ``+`` is read
    as a space.

    Args:
        s3_uri: Console URL pointing at a single S3 object.

    Returns:
        A ``(bucket, key)`` tuple.

    Raises:
        ValueError: If the URL is not a console URL, has no
            ``/s3/object/<bucket>`` path, or carries no non-empty ``prefix``.
    """
    # Function-scope import keeps this block self-contained; the module only
    # imports ``unquote`` from urllib.parse at file level.
    from urllib.parse import parse_qs, urlsplit

    if "console.aws.amazon.com" in s3_uri:
        parts = urlsplit(s3_uri)
        _, marker, bucket = parts.path.partition("/s3/object/")
        # parse_qs splits each pair on the first "=" only, so values that
        # themselves contain "=" survive intact; blank values are dropped.
        prefixes = parse_qs(parts.query).get("prefix")
        if marker and bucket and prefixes:
            return bucket, prefixes[0]
    raise ValueError(f"Could not parse S3 URI: {s3_uri}")
def _mark_subtask_failed(subtask_interface, subtask_id, error) -> None:
    """Best-effort: flag ``subtask_id`` as failed; log (never raise) DB errors."""
    if not subtask_id:
        return
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": str(error)}
        )
    except Exception as db_error:
        print(f"Failed to update subtask status: {db_error}")


def handler(event, context):
    """
    Lambda entry point: load the CSV referenced by each incoming record.

    For every record a subtask is created for the given task, the CSV behind
    ``s3_uri`` is read into a DataFrame, and the subtask is marked
    ``complete`` (or ``failed`` if anything raises after it was created).

    Args:
        event: Either a batch envelope with a ``"Records"`` list, or a single
            event. Each record carries its payload under ``"body"`` (a JSON
            string or an object); a record without a ``"body"`` key is itself
            treated as the payload. The payload needs ``task_id`` and
            ``s3_uri``.
        context: Lambda context; ``function_name`` and ``aws_request_id`` are
            logged.

    Returns:
        ``{"statusCode": 500, "body": <json {"errors": [...]}>}`` when there
        are errors and nothing was processed, otherwise
        ``{"statusCode": 200, "body": <json {"processed": [...], "errors": [...] | null}>}``.

    NOTE(review): failures are reported in the return value rather than
    raised. If this function is wired to an SQS trigger, confirm that failed
    messages are retried as intended.
    """
    print(f"Function: {context.function_name}")
    print(f"Request ID: {context.aws_request_id}")

    # Example SQS message for testing (copy and paste into SQS):
    # {
    #   "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
    #   "s3_uri": "https://337213553626-7ovirzjr.eu-west-2.console.aws.amazon.com/s3/object/retrofit-data-dev?region=eu-west-2&prefix=ara_raw_inputs/peabody/2025_11_11+-+Peabody+-+Data+Extracts+for+Domna_transformed.csv"
    # }

    # Handle both single event and batch events (SQS, etc.)
    records = event.get("Records", [event])
    results = []
    errors = []
    subtask_interface = SubTaskInterface()

    for record in records:
        subtask_id = None
        try:
            # A directly invoked event has no "body" envelope: the record
            # itself is the payload.
            body = record.get("body", record)
            if isinstance(body, str):
                body = json.loads(body)
            if not isinstance(body, dict):
                errors.append({"error": "Request body must be a JSON object"})
                continue

            # Validate required fields
            task_id = body.get("task_id")
            s3_uri = body.get("s3_uri")

            if not task_id:
                errors.append({"error": "Missing required field: task_id"})
                continue

            if not s3_uri:
                errors.append({"error": "Missing required field: s3_uri"})
                continue

            # Convert task_id to UUID
            try:
                task_id = UUID(task_id) if isinstance(task_id, str) else task_id
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Create a new subtask for this postcode splitter invocation
            subtask_id = subtask_interface.create_subtask(
                task_id=task_id, inputs={"s3_uri": s3_uri}
            )
            print(f"Created subtask {subtask_id} for task {task_id}")

            # Process normal flow
            print(f"Processing task_id: {task_id}")
            print(f"Processing s3_uri: {s3_uri}")

            # Read CSV from S3
            print("Reading CSV from S3...")
            bucket, key = parse_s3_console_url(s3_uri)
            print(f"Parsed S3 - Bucket: {bucket}, Key: {key}")
            csv_data = read_csv_from_s3_dict(bucket, key)
            df = pd.DataFrame(csv_data)
            print(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")

            # Get head for demo
            print("DataFrame head:")
            print(df.head())

            # Mark subtask as complete after successful processing
            subtask_interface.update_subtask_status(
                subtask_id,
                "complete",
                outputs={
                    "status": "processing_complete",
                    "s3_uri": s3_uri,
                    "rows_processed": len(df),
                },
            )
            print(f"Subtask {subtask_id} marked as complete")

            # Record success only once the status update has gone through, so
            # a record is never reported as both processed and failed.
            results.append(
                {
                    "message": "Postcode splitter processing started",
                    "task_id": str(task_id),
                    "s3_uri": s3_uri,
                    "subtask_id": str(subtask_id),
                }
            )

        except json.JSONDecodeError as e:
            # Raised while decoding the body, i.e. before any subtask exists,
            # so there is nothing to mark as failed here.
            errors.append({"error": "Invalid JSON in request body", "details": str(e)})
        except Exception as e:
            print(f"Unexpected error processing record: {e}")
            errors.append({"error": "Unexpected error", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, e)

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}

    return {
        "statusCode": 200,
        "body": json.dumps({"processed": results, "errors": errors or None}),
    }