diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index f4aa0dc9..f843d28a 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -12,11 +12,16 @@ import requests from uuid import UUID import uuid from backend.app.db.functions.tasks.Tasks import SubTaskInterface -from utils.s3 import save_csv_to_s3 +from utils.s3 import ( + save_csv_to_s3, + read_csv_from_s3 as read_csv_from_s3_dict, + parse_s3_uri, +) from datetime import datetime logger = setup_logger() + EPC_AUTH_TOKEN = os.getenv( "EPC_AUTH_TOKEN", ) @@ -526,48 +531,6 @@ def save_results_to_s3( return False -def test(a, b): - assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}" - - -def run_all_test(): - # Basic usage with different post codes styles - test(get_epc_data_with_postcode("b93 8sy").shape[0], 63) - test(get_epc_data_with_postcode("B938sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) - - test(get_uprn("68", "b93 8sy"), "100070989938") - test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938") - test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633") - test(get_uprn("28 A", "se6 4tf"), "100023278633") - test(get_uprn("28A", "se6 4tf"), "100023278633") - test(get_uprn("6 Aitken Close", "E8 4SQ"), False) - - # unique case - test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False) - test( - get_uprn("1 Semley Gate", "e9 5nh"), "10008238188" - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("48 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("42 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("46 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street") - get_uprn_candidates( - get_epc_data_with_postcode("Cr2 7dl"), - "FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY", - ) - - def handler(event, context, local=False): print("=== Address2UPRN Lambda Handler ===") print(f"Function: {context.function_name}") @@ -581,35 +544,8 @@ def handler(event, context, local=False): "body": json.dumps( { "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", - "rows": [ - { - "landlord_property_id": "00000002POR", - "UPRN": "766019911", - "Address 1": "9 Redland Way", - "Address 2": "Aylesbury Vale", - "postcode": "HP21 9RJ", - "landlord_property_type": "House", - "postcode_clean": "HP219RJ", - }, - { - "landlord_property_id": "00000003MTR", - "UPRN": "100120781544", - "Address 1": "16 Lime Crescent", - "Address 2": "BICESTER", - "postcode": "OX26 3XJ", - "landlord_property_type": "House", - "postcode_clean": "OX263XJ", - }, - { - "landlord_property_id": "00000004HBY", - "UPRN": "14033542", - "Address 1": "14 Dunbar Drive", - "Address 2": "Woodley", - "postcode": "RG5 4HA", - "landlord_property_type": "House", - "postcode_clean": "RG54HA", - }, - ], + "sub_task_id": "a1b2c3d4-e5f6-7a8b-9c0d-e1f2a3b4c5d6", + "s3_uri": "", } ) } @@ -637,14 +573,19 @@ def handler(event, context, local=False): # Validate required fields task_id = body.get("task_id") - rows = body.get("rows", []) + sub_task_id = body.get("sub_task_id") + s3_uri = body.get("s3_uri") if not task_id: errors.append({"error": "Missing required field: task_id"}) continue - if not rows: - errors.append({"error": "Missing or empty rows data"}) + if not sub_task_id: + errors.append({"error": "Missing required field: sub_task_id"}) + continue + + if not s3_uri: + errors.append({"error": "Missing required field: s3_uri"}) continue # Convert task_id to UUID @@ -654,29 +595,56 @@ def handler(event, context, local=False): errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"}) continue - # Create a subtask for this batch - subtask_id = subtask_interface.create_subtask( - task_id=task_id, inputs={"row_count": len(rows)} - ) - logger.info( - f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows" - ) + # Convert sub_task_id to UUID + try: + subtask_id = ( + UUID(sub_task_id) if isinstance(sub_task_id, str) else sub_task_id + ) + except ValueError as e: + errors.append( + {"error": f"Invalid UUID format for sub_task_id: {str(e)}"} + ) + continue + + # Update existing subtask to 'in progress' + subtask_interface.update_subtask_status(subtask_id, "in progress") + logger.info(f"Processing subtask {subtask_id} for task {task_id}") + + # Parse S3 URI and read CSV from S3 + logger.info(f"Reading data from S3: {s3_uri}") + try: + bucket, key = parse_s3_uri(s3_uri) + csv_data = read_csv_from_s3_dict(bucket, key) + df = pd.DataFrame(csv_data) + logger.info(f"Loaded {len(df)} rows from S3") + except Exception as s3_error: + logger.error(f"Failed to read data from S3: {s3_error}") + errors.append( + {"error": "Failed to read data from S3", "details": str(s3_error)} + ) + try: + subtask_interface.update_subtask_status( + subtask_id, "failed", outputs={"error": str(s3_error)} + ) + except Exception as db_error: + logger.error(f"Failed to update subtask status: {db_error}") + continue # Process the rows - logger.info(f"Processing {len(rows)} rows for task {task_id}") + logger.info(f"Processing {len(df)} rows for task {task_id}") - # Convert rows to DataFrame - df = pd.DataFrame(rows) - - # Create user_input column by concatenating Address 1 and Address 2 - df["user_input"] = ( - df["Address 1"].fillna("") - + " " - + df["Address 2"].fillna("") - + " " - + df["Address 3"].fillna("") - ).str.strip() - logger.info(f"Created user_input column from Address 1 and Address 2") + # Create user_input column by concatenating Address columns if not already present + if "user_input" not in df.columns: + df["user_input"] = ( + df["Address 1"].fillna("") + + " " + + df["Address 2"].fillna("") + + " " + + df["Address 3"].fillna("") + ).str.strip() + logger.info(f"Created user_input column from Address 1 and Address 2") + else: + logger.info(f"user_input column already present in data") clean_df = df.dropna(subset=["postcode_clean"]) @@ -791,7 +759,6 @@ def handler(event, context, local=False): results.append( { "subtask_id": str(subtask_id), - "rows_processed": len(rows), "postcodes_processed": postcodes_processed, "addresses_processed": addresses_processed, "uprns_found": uprns_found, @@ -802,7 +769,9 @@ def handler(event, context, local=False): # Mark subtask as completed try: subtask_interface.update_subtask_status( - subtask_id, "completed", outputs={"rows_processed": len(rows)} + subtask_id, + "completed", + outputs={"rows_processed": "todo -> show sensible output"}, ) logger.info(f"Marked subtask {subtask_id} as completed") except Exception as db_error: diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 3d0f0d8d..930fac7f 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -5,8 +5,7 @@ import pandas as pd import requests import boto3 from uuid import UUID, uuid4 -from urllib.parse import unquote -from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict, save_csv_to_s3 +from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict, save_csv_to_s3, parse_s3_uri from utils.logger import setup_logger from tqdm import tqdm from backend.app.db.functions.tasks.Tasks import SubTaskInterface @@ -15,54 +14,6 @@ from datetime import datetime logger = setup_logger() -def parse_s3_uri(s3_uri: str) -> tuple[str, str]: - """ - Parse S3 URI to extract bucket and key. - - Supports two formats: - 1. S3 URI format: s3://bucket/key - """ - logger.info("Parsing S3 URI") - - try: - # Check if it's an S3 URI format - if s3_uri.startswith("s3://"): - parts = s3_uri[5:].split("/", 1) - if len(parts) < 2: - raise ValueError("S3 URI must include both bucket and key") - bucket = parts[0] - key = parts[1] - logger.info(f"Extracted bucket: {bucket}, key: {key}") - return bucket, key - - # Otherwise, treat as AWS console URL - logger.info("Parsing as AWS console URL") - - # Split base URL and query string - if "?" not in s3_uri: - raise ValueError("No query string found") - - base, query = s3_uri.split("?", 1) - - # Extract bucket from base URL - if "/s3/object/" not in base: - raise ValueError("No '/s3/object/' found in URL path") - - path_parts = base.split("/s3/object/") - bucket = path_parts[1] - logger.info(f"Extracted bucket: {bucket}") - - # Extract prefix from query parameters - params = dict(item.split("=") for item in query.split("&") if "=" in item) - key = unquote(params.get("prefix", "")) - logger.info(f"Extracted key: {key}") - - return bucket, key - except Exception as e: - logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}") - raise ValueError(f"Could not parse S3 URI") from e - - def upload_batch_to_s3( batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None ) -> str: diff --git a/utils/s3.py b/utils/s3.py index 0e79c26b..0ba036f7 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -3,11 +3,62 @@ import boto3 import csv import pandas as pd from io import BytesIO, StringIO +from urllib.parse import unquote from utils.logger import setup_logger from botocore.exceptions import NoCredentialsError, PartialCredentialsError logger = setup_logger() + +def parse_s3_uri(s3_uri: str) -> tuple[str, str]: + """ + Parse S3 URI to extract bucket and key. + + Supports two formats: + 1. S3 URI format: s3://bucket/key + 2. AWS console URL format with query parameters + """ + logger.info("Parsing S3 URI") + + try: + # Check if it's an S3 URI format + if s3_uri.startswith("s3://"): + parts = s3_uri[5:].split("/", 1) + if len(parts) < 2: + raise ValueError("S3 URI must include both bucket and key") + bucket = parts[0] + key = parts[1] + logger.info(f"Extracted bucket: {bucket}, key: {key}") + return bucket, key + + # Otherwise, treat as AWS console URL + logger.info("Parsing as AWS console URL") + + # Split base URL and query string + if "?" not in s3_uri: + raise ValueError("No query string found") + + base, query = s3_uri.split("?", 1) + + # Extract bucket from base URL + if "/s3/object/" not in base: + raise ValueError("No '/s3/object/' found in URL path") + + path_parts = base.split("/s3/object/") + bucket = path_parts[1] + logger.info(f"Extracted bucket: {bucket}") + + # Extract prefix from query parameters + params = dict(item.split("=") for item in query.split("&") if "=" in item) + key = unquote(params.get("prefix", "")) + logger.info(f"Extracted key: {key}") + + return bucket, key + except Exception as e: + logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}") + raise ValueError(f"Could not parse S3 URI") from e + + def read_from_s3(bucket_name, s3_file_name): """ Read an object from s3. Decoding of the data is left for outside of this function