From 8121e6d5b67d87b8e60b5f28a6a03edae2d7e465 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 10 Feb 2026 07:53:54 +0000 Subject: [PATCH] more logs for s3 --- backend/postcode_splitter/main.py | 146 +++++++++++------------------- 1 file changed, 53 insertions(+), 93 deletions(-) diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 8210bf78..1d0e56a0 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -1,50 +1,20 @@ import os import sys +import json +import pandas as pd +import requests +from uuid import UUID +from urllib.parse import unquote +from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict +from utils.logger import setup_logger +from tqdm import tqdm +from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from backend.address2UPRN.main import ( + resolve_uprns_for_postcode_group, + get_epc_data_with_postcode, +) -print("=" * 60) -print("ENVIRONMENT AT STARTUP:") -print("=" * 60) -for k, v in sorted(os.environ.items()): - print(f"{k}={v}") -print("=" * 60) - -try: - import json - - print("✓ json imported") - import pandas as pd - - print("✓ pandas imported") - import requests - - print("✓ requests imported") - from uuid import UUID - - print("✓ UUID imported") - from urllib.parse import unquote - - print("✓ urllib.parse imported") - from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict - - print("✓ utils.s3 imported") - from tqdm import tqdm - - print("✓ tqdm imported") - from backend.app.db.functions.tasks.Tasks import SubTaskInterface - - print("✓ SubTaskInterface imported") - from backend.address2UPRN.main import ( - resolve_uprns_for_postcode_group, - get_epc_data_with_postcode, - ) - - print("✓ backend.address2UPRN imported") -except Exception as e: - print(f"✗ IMPORT ERROR: {type(e).__name__}: {e}") - import traceback - - traceback.print_exc() - raise +logger = setup_logger() def parse_s3_console_url(s3_uri: str) -> tuple[str, str]: @@ -53,15 +23,41 @@ def parse_s3_console_url(s3_uri: str) -> tuple[str, str]: Format: https://account-id-hash.region.console.aws.amazon.com/s3/object/bucket?region=...&prefix=path """ - if "console.aws.amazon.com" in s3_uri and "?prefix=" in s3_uri: + logger.info(f"Parsing S3 URI: {s3_uri}") + + if "console.aws.amazon.com" not in s3_uri: + logger.error("URI does not contain 'console.aws.amazon.com'") + raise ValueError(f"Could not parse S3 URI: {s3_uri}") + + if "?prefix=" not in s3_uri: + logger.error("URI does not contain '?prefix='") + raise ValueError(f"Could not parse S3 URI: {s3_uri}") + + try: base, query = s3_uri.split("?", 1) + logger.debug(f"Base: {base}") + logger.debug(f"Query: {query}") + path_parts = base.split("/s3/object/") + logger.debug(f"Path parts: {path_parts}") + if len(path_parts) > 1: bucket = path_parts[1] + logger.info(f"Extracted bucket: {bucket}") + params = dict(item.split("=") for item in query.split("&") if "=" in item) + logger.debug(f"Query params: {params}") + key = unquote(params.get("prefix", "")) + logger.info(f"Extracted key: {key}") + return bucket, key - raise ValueError(f"Could not parse S3 URI: {s3_uri}") + else: + logger.error(f"Could not find '/s3/object/' in URI") + raise ValueError(f"Could not parse S3 URI: {s3_uri}") + except Exception as e: + logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}") + raise ValueError(f"Could not parse S3 URI: {s3_uri}") from e def sanitise_postcode(postcode: str) -> str | None: @@ -176,13 +172,8 @@ def main(): def handler(event, context): - print("=" * 60) - print("HANDLER INVOKED") - print("=" * 60) print(f"Function: {context.function_name}") print(f"Request ID: {context.aws_request_id}") - print(f"Event received: {type(event)}") - print(f"Event keys: {event.keys() if isinstance(event, dict) else 'N/A'}") # Example SQS message for testing (copy and paste into SQS): # { @@ -191,40 +182,24 @@ def handler(event, context): # } # Handle both single event and batch events (SQS, etc.) - print("Extracting records from event...") records = event.get("Records", [event]) - print(f"Found {len(records)} record(s) to process") results = [] errors = [] - - print("Initializing SubTaskInterface...") subtask_interface = SubTaskInterface() - print("✓ SubTaskInterface initialized") for record in records: - print("Processing record...") - print(f"Record type: {type(record)}") - print(f"Record: {record}") task_id = None subtask_id = None try: # Parse body - print("Parsing body from record...") - print(f"record.get('body'): {record.get('body')}") - print(f"isinstance(record.get('body'), str): {isinstance(record.get('body'), str)}") - if isinstance(record.get("body"), str): - print("Body is string, parsing JSON...") body = json.loads(record["body"]) else: - print("Body is not string, using directly...") body = record.get("body", {}) - print(f"Body parsed: {body}") # Validate required fields task_id = body.get("task_id") s3_uri = body.get("s3_uri") - print(f"task_id: {task_id}, s3_uri: {s3_uri}") if not task_id: errors.append({"error": "Missing required field: task_id"}) @@ -235,46 +210,32 @@ def handler(event, context): continue # Convert task_id to UUID - print("Converting task_id to UUID...") try: task_id = UUID(task_id) if isinstance(task_id, str) else task_id - print(f"UUID conversion successful: {task_id}") except ValueError as e: errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"}) continue # Create a new subtask for this postcode splitter invocation - print(f"Creating subtask for task {task_id}...") subtask_id = subtask_interface.create_subtask( task_id=task_id, inputs={"s3_uri": s3_uri} ) - print(f"Created subtask {subtask_id} for task {task_id}") - - # Process normal flow - print(f"Processing task_id: {task_id}") - print(f"Processing s3_uri: {s3_uri}") + logger.info(f"Created subtask {subtask_id} for task {task_id}") # Read CSV from S3 - print("Parsing S3 URI...") + logger.info(f"Processing S3 URI: {s3_uri}") bucket, key = parse_s3_console_url(s3_uri) - print(f"Bucket: {bucket}, Key: {key}") + logger.info(f"S3 Bucket: {bucket}, Key: {key}") - print("Fetching CSV from S3...") csv_data = read_csv_from_s3_dict(bucket, key) - print(f"CSV fetched: {len(csv_data)} rows") - - print("Creating DataFrame...") df = pd.DataFrame(csv_data) - print(f"DataFrame created: {len(df)} rows, {len(df.columns)} columns") + logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") # Get head for demo - print("Getting DataFrame head...") df_head = df.head() - print("DataFrame head:") - print(df_head) - df_head_dict = df_head.to_dict("records") + logger.info("DataFrame head:") + logger.info(f"\n{df_head}") - print("Appending result...") results.append( { "message": "Postcode splitter processing started", @@ -283,10 +244,8 @@ def handler(event, context): "subtask_id": str(subtask_id), } ) - print("Result appended") # Mark subtask as complete after successful processing - print("Updating subtask status to complete...") subtask_interface.update_subtask_status( subtask_id, "complete", @@ -296,9 +255,10 @@ def handler(event, context): "rows_processed": len(df), }, ) - print(f"Subtask {subtask_id} marked as complete") + logger.info(f"Subtask {subtask_id} marked as complete") except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in request body: {e}") errors.append({"error": "Invalid JSON in request body", "details": str(e)}) # Mark subtask as failed if we have one if subtask_id: @@ -307,9 +267,9 @@ def handler(event, context): subtask_id, "failed", outputs={"error": str(e)} ) except Exception as db_error: - print(f"Failed to update subtask status: {db_error}") + logger.error(f"Failed to update subtask status: {db_error}") except Exception as e: - print(f"Unexpected error processing record: {e}") + logger.error(f"Unexpected error processing record: {e}", exc_info=True) errors.append({"error": "Unexpected error", "details": str(e)}) # Mark subtask as failed if we have one if subtask_id: @@ -318,7 +278,7 @@ def handler(event, context): subtask_id, "failed", outputs={"error": str(e)} ) except Exception as db_error: - print(f"Failed to update subtask status: {db_error}") + logger.error(f"Failed to update subtask status: {db_error}") # Return error if all records failed if errors and not results: