more logs for s3

This commit is contained in:
Jun-te Kim 2026-02-10 07:53:54 +00:00
parent 94524379e4
commit 8121e6d5b6

View file

@ -1,50 +1,20 @@
import os
import sys
import json
import pandas as pd
import requests
from uuid import UUID
from urllib.parse import unquote
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict
from utils.logger import setup_logger
from tqdm import tqdm
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.address2UPRN.main import (
resolve_uprns_for_postcode_group,
get_epc_data_with_postcode,
)
print("=" * 60)
print("ENVIRONMENT AT STARTUP:")
print("=" * 60)
for k, v in sorted(os.environ.items()):
print(f"{k}={v}")
print("=" * 60)
try:
import json
print("✓ json imported")
import pandas as pd
print("✓ pandas imported")
import requests
print("✓ requests imported")
from uuid import UUID
print("✓ UUID imported")
from urllib.parse import unquote
print("✓ urllib.parse imported")
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict
print("✓ utils.s3 imported")
from tqdm import tqdm
print("✓ tqdm imported")
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
print("✓ SubTaskInterface imported")
from backend.address2UPRN.main import (
resolve_uprns_for_postcode_group,
get_epc_data_with_postcode,
)
print("✓ backend.address2UPRN imported")
except Exception as e:
print(f"✗ IMPORT ERROR: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
raise
logger = setup_logger()
def parse_s3_console_url(s3_uri: str) -> tuple[str, str]:
@ -53,15 +23,41 @@ def parse_s3_console_url(s3_uri: str) -> tuple[str, str]:
Format: https://account-id-hash.region.console.aws.amazon.com/s3/object/bucket?region=...&prefix=path
"""
if "console.aws.amazon.com" in s3_uri and "?prefix=" in s3_uri:
logger.info(f"Parsing S3 URI: {s3_uri}")
if "console.aws.amazon.com" not in s3_uri:
logger.error("URI does not contain 'console.aws.amazon.com'")
raise ValueError(f"Could not parse S3 URI: {s3_uri}")
if "?prefix=" not in s3_uri:
logger.error("URI does not contain '?prefix='")
raise ValueError(f"Could not parse S3 URI: {s3_uri}")
try:
base, query = s3_uri.split("?", 1)
logger.debug(f"Base: {base}")
logger.debug(f"Query: {query}")
path_parts = base.split("/s3/object/")
logger.debug(f"Path parts: {path_parts}")
if len(path_parts) > 1:
bucket = path_parts[1]
logger.info(f"Extracted bucket: {bucket}")
params = dict(item.split("=") for item in query.split("&") if "=" in item)
logger.debug(f"Query params: {params}")
key = unquote(params.get("prefix", ""))
logger.info(f"Extracted key: {key}")
return bucket, key
raise ValueError(f"Could not parse S3 URI: {s3_uri}")
else:
logger.error(f"Could not find '/s3/object/' in URI")
raise ValueError(f"Could not parse S3 URI: {s3_uri}")
except Exception as e:
logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}")
raise ValueError(f"Could not parse S3 URI: {s3_uri}") from e
def sanitise_postcode(postcode: str) -> str | None:
@ -176,13 +172,8 @@ def main():
def handler(event, context):
print("=" * 60)
print("HANDLER INVOKED")
print("=" * 60)
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
print(f"Event received: {type(event)}")
print(f"Event keys: {event.keys() if isinstance(event, dict) else 'N/A'}")
# Example SQS message for testing (copy and paste into SQS):
# {
@ -191,40 +182,24 @@ def handler(event, context):
# }
# Handle both single event and batch events (SQS, etc.)
print("Extracting records from event...")
records = event.get("Records", [event])
print(f"Found {len(records)} record(s) to process")
results = []
errors = []
print("Initializing SubTaskInterface...")
subtask_interface = SubTaskInterface()
print("✓ SubTaskInterface initialized")
for record in records:
print("Processing record...")
print(f"Record type: {type(record)}")
print(f"Record: {record}")
task_id = None
subtask_id = None
try:
# Parse body
print("Parsing body from record...")
print(f"record.get('body'): {record.get('body')}")
print(f"isinstance(record.get('body'), str): {isinstance(record.get('body'), str)}")
if isinstance(record.get("body"), str):
print("Body is string, parsing JSON...")
body = json.loads(record["body"])
else:
print("Body is not string, using directly...")
body = record.get("body", {})
print(f"Body parsed: {body}")
# Validate required fields
task_id = body.get("task_id")
s3_uri = body.get("s3_uri")
print(f"task_id: {task_id}, s3_uri: {s3_uri}")
if not task_id:
errors.append({"error": "Missing required field: task_id"})
@ -235,46 +210,32 @@ def handler(event, context):
continue
# Convert task_id to UUID
print("Converting task_id to UUID...")
try:
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
print(f"UUID conversion successful: {task_id}")
except ValueError as e:
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
continue
# Create a new subtask for this postcode splitter invocation
print(f"Creating subtask for task {task_id}...")
subtask_id = subtask_interface.create_subtask(
task_id=task_id, inputs={"s3_uri": s3_uri}
)
print(f"Created subtask {subtask_id} for task {task_id}")
# Process normal flow
print(f"Processing task_id: {task_id}")
print(f"Processing s3_uri: {s3_uri}")
logger.info(f"Created subtask {subtask_id} for task {task_id}")
# Read CSV from S3
print("Parsing S3 URI...")
logger.info(f"Processing S3 URI: {s3_uri}")
bucket, key = parse_s3_console_url(s3_uri)
print(f"Bucket: {bucket}, Key: {key}")
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
print("Fetching CSV from S3...")
csv_data = read_csv_from_s3_dict(bucket, key)
print(f"CSV fetched: {len(csv_data)} rows")
print("Creating DataFrame...")
df = pd.DataFrame(csv_data)
print(f"DataFrame created: {len(df)} rows, {len(df.columns)} columns")
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
# Get head for demo
print("Getting DataFrame head...")
df_head = df.head()
print("DataFrame head:")
print(df_head)
df_head_dict = df_head.to_dict("records")
logger.info("DataFrame head:")
logger.info(f"\n{df_head}")
print("Appending result...")
results.append(
{
"message": "Postcode splitter processing started",
@ -283,10 +244,8 @@ def handler(event, context):
"subtask_id": str(subtask_id),
}
)
print("Result appended")
# Mark subtask as complete after successful processing
print("Updating subtask status to complete...")
subtask_interface.update_subtask_status(
subtask_id,
"complete",
@ -296,9 +255,10 @@ def handler(event, context):
"rows_processed": len(df),
},
)
print(f"Subtask {subtask_id} marked as complete")
logger.info(f"Subtask {subtask_id} marked as complete")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in request body: {e}")
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
@ -307,9 +267,9 @@ def handler(event, context):
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
print(f"Failed to update subtask status: {db_error}")
logger.error(f"Failed to update subtask status: {db_error}")
except Exception as e:
print(f"Unexpected error processing record: {e}")
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
errors.append({"error": "Unexpected error", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
@ -318,7 +278,7 @@ def handler(event, context):
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
print(f"Failed to update subtask status: {db_error}")
logger.error(f"Failed to update subtask status: {db_error}")
# Return error if all records failed
if errors and not results: