mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
test post code splitter with csv file
This commit is contained in:
parent
79eb81fd94
commit
53ec9c261c
1 changed files with 140 additions and 9 deletions
|
|
@ -1,12 +1,34 @@
|
|||
import json
|
||||
import pandas as pd
|
||||
import requests
|
||||
from uuid import UUID
|
||||
from urllib.parse import unquote
|
||||
from backend.address2UPRN.main import (
|
||||
resolve_uprns_for_postcode_group,
|
||||
get_epc_data_with_postcode,
|
||||
)
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
def parse_s3_console_url(s3_uri: str) -> tuple[str, str]:
|
||||
"""
|
||||
Parse AWS console S3 URL to extract bucket and key.
|
||||
|
||||
Format: https://account-id-hash.region.console.aws.amazon.com/s3/object/bucket?region=...&prefix=path
|
||||
"""
|
||||
if "console.aws.amazon.com" in s3_uri and "?prefix=" in s3_uri:
|
||||
base, query = s3_uri.split("?", 1)
|
||||
path_parts = base.split("/s3/object/")
|
||||
if len(path_parts) > 1:
|
||||
bucket = path_parts[1]
|
||||
params = dict(item.split("=") for item in query.split("&") if "=" in item)
|
||||
key = unquote(params.get("prefix", ""))
|
||||
return bucket, key
|
||||
raise ValueError(f"Could not parse S3 URI: {s3_uri}")
|
||||
|
||||
|
||||
def sanitise_postcode(postcode: str) -> str | None:
|
||||
"""
|
||||
Normalise postcode for grouping.
|
||||
|
|
@ -120,17 +142,126 @@ def main():
|
|||
|
||||
def handler(event, context):
|
||||
print(f"Function: {context.function_name}")
|
||||
print(f"Function Version: {context.function_version}")
|
||||
print(f"Log Group: {context.log_group_name}")
|
||||
print(f"Log Stream: {context.log_stream_name}")
|
||||
print(f"Request ID: {context.aws_request_id}")
|
||||
print(f"Memory Limit: {context.memory_limit_in_mb} MB")
|
||||
print(f"Remaining Time: {context.get_remaining_time_in_millis()} ms")
|
||||
print(f"Event: {event}")
|
||||
print(f"Event: {event}")
|
||||
|
||||
print("Postcode splitter handler invoked")
|
||||
return {"statusCode": 200, "body": "postcode splitter executed"}
|
||||
# Example SQS message for testing (copy and paste into SQS):
|
||||
# {
|
||||
# "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
# "s3_uri": "https://337213553626-7ovirzjr.eu-west-2.console.aws.amazon.com/s3/object/retrofit-data-dev?region=eu-west-2&prefix=ara_raw_inputs/peabody/2025_11_11+-+Peabody+-+Data+Extracts+for+Domna_transformed.csv"
|
||||
# }
|
||||
|
||||
# Handle both single event and batch events (SQS, etc.)
|
||||
records = event.get("Records", [event])
|
||||
results = []
|
||||
errors = []
|
||||
subtask_interface = SubTaskInterface()
|
||||
|
||||
for record in records:
|
||||
task_id = None
|
||||
subtask_id = None
|
||||
try:
|
||||
# Parse body
|
||||
if isinstance(record.get("body"), str):
|
||||
body = json.loads(record["body"])
|
||||
else:
|
||||
body = record.get("body", {})
|
||||
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
|
||||
if not task_id:
|
||||
errors.append({"error": "Missing required field: task_id"})
|
||||
continue
|
||||
|
||||
if not s3_uri:
|
||||
errors.append({"error": "Missing required field: s3_uri"})
|
||||
continue
|
||||
|
||||
# Convert task_id to UUID
|
||||
try:
|
||||
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
|
||||
except ValueError as e:
|
||||
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
|
||||
continue
|
||||
|
||||
# Create a new subtask for this postcode splitter invocation
|
||||
subtask_id = subtask_interface.create_subtask(
|
||||
task_id=task_id, inputs={"s3_uri": s3_uri}
|
||||
)
|
||||
print(f"Created subtask {subtask_id} for task {task_id}")
|
||||
|
||||
# Process normal flow
|
||||
print(f"Processing task_id: {task_id}")
|
||||
print(f"Processing s3_uri: {s3_uri}")
|
||||
|
||||
# Read CSV from S3
|
||||
print("Reading CSV from S3...")
|
||||
bucket, key = parse_s3_console_url(s3_uri)
|
||||
print(f"Parsed S3 - Bucket: {bucket}, Key: {key}")
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
print(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
|
||||
|
||||
# Get head for demo
|
||||
df_head = df.head()
|
||||
print("DataFrame head:")
|
||||
print(df_head)
|
||||
df_head_dict = df_head.to_dict("records")
|
||||
|
||||
results.append(
|
||||
{
|
||||
"message": "Postcode splitter processing started",
|
||||
"task_id": str(task_id),
|
||||
"s3_uri": s3_uri,
|
||||
"subtask_id": str(subtask_id),
|
||||
}
|
||||
)
|
||||
|
||||
# Mark subtask as complete after successful processing
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id,
|
||||
"complete",
|
||||
outputs={
|
||||
"status": "processing_complete",
|
||||
"s3_uri": s3_uri,
|
||||
"rows_processed": len(df),
|
||||
},
|
||||
)
|
||||
print(f"Subtask {subtask_id} marked as complete")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
print(f"Failed to update subtask status: {db_error}")
|
||||
except Exception as e:
|
||||
print(f"Unexpected error processing record: {e}")
|
||||
errors.append({"error": "Unexpected error", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
print(f"Failed to update subtask status: {db_error}")
|
||||
|
||||
# Return error if all records failed
|
||||
if errors and not results:
|
||||
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
|
||||
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"body": json.dumps(
|
||||
{"processed": results, "errors": errors if errors else None}
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue