From 8e574c24014ee15534de3847762e3800690f521f Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 13 Feb 2026 18:30:47 +0000 Subject: [PATCH] post code splitter works --- .github/workflows/deploy_terraform.yml | 2 +- backend/address2UPRN/main.py | 31 +-- backend/postcode_splitter/main.py | 361 +++++++++---------------- 3 files changed, 130 insertions(+), 264 deletions(-) diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 4dcbf129..2fd12fe6 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -77,7 +77,7 @@ jobs: run: terraform plan -var-file=${STAGE}.tfvars -out=tfplan - name: Terraform Apply - # if: env.STAGE == 'prod' + if: env.STAGE == 'prod' working-directory: infrastructure/terraform/shared run: terraform apply -auto-approve tfplan diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index f843d28a..7fc11570 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -544,8 +544,8 @@ def handler(event, context, local=False): "body": json.dumps( { "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", - "sub_task_id": "a1b2c3d4-e5f6-7a8b-9c0d-e1f2a3b4c5d6", - "s3_uri": "", + "sub_task_id": "1c09df07-fd29-4de7-b146-fafb591856a9", + "s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-13T15:54:58.568594_67557923.csv", } ) } @@ -573,14 +573,14 @@ def handler(event, context, local=False): # Validate required fields task_id = body.get("task_id") - sub_task_id = body.get("sub_task_id") + subtask_id = body.get("sub_task_id") s3_uri = body.get("s3_uri") if not task_id: errors.append({"error": "Missing required field: task_id"}) continue - if not sub_task_id: + if not subtask_id: errors.append({"error": "Missing required field: sub_task_id"}) continue @@ -598,7 +598,7 @@ def handler(event, context, local=False): # Convert sub_task_id to UUID try: subtask_id = ( - UUID(sub_task_id) if isinstance(sub_task_id, str) else sub_task_id + UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id ) except ValueError as e: errors.append( @@ -756,16 +756,6 @@ def handler(event, context, local=False): except Exception as s3_error: logger.error(f"Failed to save results to S3: {s3_error}") - results.append( - { - "subtask_id": str(subtask_id), - "postcodes_processed": postcodes_processed, - "addresses_processed": addresses_processed, - "uprns_found": uprns_found, - "status": "processed", - } - ) - # Mark subtask as completed try: subtask_interface.update_subtask_status( @@ -777,17 +767,6 @@ def handler(event, context, local=False): except Exception as db_error: logger.error(f"Failed to mark subtask as completed: {db_error}") - except json.JSONDecodeError as e: - logger.error(f"Invalid JSON in request body: {e}") - errors.append({"error": "Invalid JSON in request body", "details": str(e)}) - # Mark subtask as failed if we have one - if subtask_id: - try: - subtask_interface.update_subtask_status( - subtask_id, "failed", outputs={"error": str(e)} - ) - except Exception as db_error: - logger.error(f"Failed to update subtask status: {db_error}") except Exception as e: logger.error(f"Unexpected error processing record: {e}", exc_info=True) errors.append({"error": "Unexpected error", "details": str(e)}) diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 1049295b..6d8d1095 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -101,8 +101,9 @@ def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> s def create_batch_and_send_to_address2uprn( - batch_rows: list, + batch_df: pd.DataFrame, task_id: str, + sub_task_id: str, subtask_interface: SubTaskInterface, bucket_name: str, ) -> str: @@ -118,291 +119,177 @@ def create_batch_and_send_to_address2uprn( Returns: The created batch subtask ID """ - # Generate unique batch subtask ID - batch_sub_task_id = str(uuid4()) - # Upload batch to S3 - batch_df = pd.DataFrame(batch_rows) - s3_uri = upload_batch_to_s3(batch_df, str(task_id), batch_sub_task_id, bucket_name) + + s3_uri = upload_batch_to_s3(batch_df, str(task_id), str(sub_task_id), bucket_name) # Create a new subtask for this batch with all inputs created_batch_sub_task_id = subtask_interface.create_subtask( task_id=task_id, inputs={ "task_id": str(task_id), - "sub_task_id": batch_sub_task_id, - "batch_size": len(batch_rows), "s3_uri": s3_uri, }, ) + logger.info(f"Created batch subtask {created_batch_sub_task_id}") - # Send message with S3 reference - send_to_address2uprn_queue( - task_id=str(task_id), - sub_task_id=str(created_batch_sub_task_id), - s3_uri=s3_uri, - ) + # # Send message with S3 reference + # send_to_address2uprn_queue( + # task_id=str(task_id), + # sub_task_id=str(created_batch_sub_task_id), + # s3_uri=s3_uri, + # ) return created_batch_sub_task_id -def handler(event, context): +def handler(event, context, local=False): print(f"Function: {context.function_name}") print(f"Request ID: {context.aws_request_id}") # Example SQS message for testing (copy and paste into SQS): - # { - # "task_id":"e31f2f21-175b-4a91-a3ec-a6baa325e917", - # "s3_uri":"s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv" - # } - + if local is True: + event = { + "Records": [ + { + "body": json.dumps( + { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", + "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv", + } + ) + } + ] + } # Handle both single event and batch events (SQS, etc.) records = event.get("Records", [event]) results = [] errors = [] subtask_interface = SubTaskInterface() bucket_name = os.getenv("S3_BUCKET_NAME") + if local: + bucket_name = "retrofit-data-dev" for record in records: + if local: + record = records[0] task_id = None subtask_id = None - try: - # Parse body (inputs) - if isinstance(record.get("body"), str): - body = json.loads(record["body"]) - else: - body = record.get("body", {}) + # Parse body (inputs) - # Validate required fields - task_id = body.get("task_id") - s3_uri = body.get("s3_uri") + if isinstance(record.get("body"), str): + body = json.loads(record["body"]) + else: + body = record.get("body", {}) - if not task_id: - errors.append({"error": "Missing required field: task_id"}) - continue + # Validate required fields + task_id = body.get("task_id") + subtask_id = body.get("sub_task_id") + s3_uri = body.get("s3_uri") - if not s3_uri: - errors.append({"error": "Missing required field: s3_uri"}) - continue + # Convert task_id to UUID + task_id = UUID(task_id) if isinstance(task_id, str) else task_id + subtask_id = UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id - # Convert task_id to UUID - try: - task_id = UUID(task_id) if isinstance(task_id, str) else task_id - except ValueError as e: - errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"}) - continue + # Mark subtask as in progress + subtask_interface.update_subtask_status(subtask_id, "in progress") + logger.info(f"Marked subtask {subtask_id} as in progress") - # Create a new subtask for this postcode splitter invocation - subtask_id = subtask_interface.create_subtask( - task_id=task_id, inputs={"s3_uri": s3_uri} + # Read CSV from S3 + bucket, key = parse_s3_uri(s3_uri) + logger.info(f"S3 Bucket: {bucket}, Key: {key}") + + csv_data = read_csv_from_s3_dict(bucket, key) + df = pd.DataFrame(csv_data) + + # TODO: Change the input to the file you want + # df = df.head(1983) + df = df.head(502) + + logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") + + # Sanitise postcodes + df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "") + + df = df.dropna(subset=["postcode_clean"]) + + batch_size = 500 + if df.shape[0] < batch_size: + create_batch_and_send_to_address2uprn( + batch_df=df, + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, ) - logger.info(f"Created subtask {subtask_id} for task {task_id}") - - # Mark subtask as in progress - subtask_interface.update_subtask_status(subtask_id, "in progress") - logger.info(f"Marked subtask {subtask_id} as in progress") - - # Read CSV from S3 - logger.info(f"Processing S3 URI: {s3_uri}") - bucket, key = parse_s3_uri(s3_uri) - logger.info(f"S3 Bucket: {bucket}, Key: {key}") - - csv_data = read_csv_from_s3_dict(bucket, key) - df = pd.DataFrame(csv_data) - - # df = df.head(1983) - df = df.head(5) - - logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") - - # Sanitise postcodes - df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "") - - clean_df = df.dropna(subset=["postcode_clean"]) - + else: postcode_to_addresses = { - postcode: group.to_dict(orient="records") - for postcode, group in clean_df.groupby("postcode_clean", sort=False) + postcode: group + for postcode, group in df.groupby("postcode_clean", sort=False) } - logger.info(f"Total postcodes: {len(postcode_to_addresses)}") + count = 0 + buffer = [] - # Calculate total rows to send - total_rows = sum(len(rows) for rows in postcode_to_addresses.values()) - logger.info(f"Total rows to send: {total_rows}") + for postcode, group_df in postcode_to_addresses.items(): + group_len = len(group_df) - batch_size = 500 - - # If all rows fit in one batch, just send them all at once - if total_rows <= batch_size: - all_rows = [] - for postcode, rows in postcode_to_addresses.items(): - all_rows.extend(rows) - try: - create_batch_and_send_to_address2uprn( - batch_rows=all_rows, - task_id=task_id, - subtask_interface=subtask_interface, - bucket_name=bucket_name, - ) - logger.info( - f"Sent all {len(all_rows)} rows in single batch to address2UPRN queue" - ) - except Exception as e: - logger.error( - f"Failed to send all rows to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - else: - # Multi-batch processing for large datasets - batch_rows = [] - total_sent = 0 - - for postcode, rows in postcode_to_addresses.items(): - logger.info(f"Processing postcode {postcode} with {len(rows)} rows") - # If postcode itself is larger than batch_size, send it individually - if len(rows) > batch_size: - # First, send the current batch if it has data - if batch_rows: - try: - create_batch_and_send_to_address2uprn( - batch_rows=batch_rows, - task_id=task_id, - subtask_interface=subtask_interface, - bucket_name=bucket_name, - ) - logger.info( - f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" - ) - batch_rows = [] - except Exception as e: - logger.error( - f"Failed to send batch to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - - # Send the large postcode on its own - try: - create_batch_and_send_to_address2uprn( - batch_rows=rows, - task_id=task_id, - subtask_interface=subtask_interface, - bucket_name=bucket_name, - ) - logger.info( - f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue" - ) - except Exception as e: - logger.error( - f"Failed to send large postcode to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - continue - - # If adding this postcode's rows would exceed batch_size, send current batch - current_batch_size = len(batch_rows) + len(rows) - if batch_rows and current_batch_size > batch_size: - logger.info( - f"Batch threshold reached: current {len(batch_rows)} + next postcode {len(rows)} = {current_batch_size} > {batch_size}" - ) - try: - create_batch_and_send_to_address2uprn( - batch_rows=batch_rows, - task_id=task_id, - subtask_interface=subtask_interface, - bucket_name=bucket_name, - ) - logger.info( - f"Sent batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})" - ) - total_sent += len(batch_rows) - batch_rows = [] - except Exception as e: - logger.error( - f"Failed to send batch to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - - # Add current postcode's rows to batch - batch_rows.extend(rows) - - # Send remaining batch - if batch_rows: - try: + # If single postcode is bigger than batch_size → send directly + if group_len >= batch_size: + if buffer: create_batch_and_send_to_address2uprn( - batch_rows=batch_rows, + batch_df=pd.concat(buffer, ignore_index=True), task_id=task_id, + sub_task_id=subtask_id, subtask_interface=subtask_interface, bucket_name=bucket_name, ) - total_sent += len(batch_rows) - logger.info( - f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})" - ) - batch_rows = [] - except Exception as e: - logger.error( - f"Failed to send final batch to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) + buffer = [] + count = 0 - except json.JSONDecodeError as e: - logger.error(f"Invalid JSON in request body: {e}") - errors.append({"error": "Invalid JSON in request body", "details": str(e)}) - # Mark subtask as failed if we have one - if subtask_id: - try: - subtask_interface.update_subtask_status( - subtask_id, "failed", outputs={"error": str(e)} + create_batch_and_send_to_address2uprn( + batch_df=group_df, + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, ) - except Exception as db_error: - logger.error(f"Failed to update subtask status: {db_error}") - except Exception as e: - logger.error(f"Unexpected error processing record: {e}", exc_info=True) - errors.append({"error": "Unexpected error", "details": str(e)}) - # Mark subtask as failed if we have one - if subtask_id: - try: - subtask_interface.update_subtask_status( - subtask_id, "failed", outputs={"error": str(e)} - ) - except Exception as db_error: - logger.error(f"Failed to update subtask status: {db_error}") + continue - # Return error if all records failed - if errors and not results: - return {"statusCode": 500, "body": json.dumps({"errors": errors})} + # If adding would exceed batch → flush first + if count + group_len > batch_size: + create_batch_and_send_to_address2uprn( + batch_df=pd.concat(buffer, ignore_index=True), + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + buffer = [] + count = 0 + + # Add group + buffer.append(group_df) + count += group_len + + # Final flush + if buffer: + create_batch_and_send_to_address2uprn( + batch_df=pd.concat(buffer, ignore_index=True), + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + + # Mark subtask as completed + subtask_interface.update_subtask_status( + subtask_id, + "completed", + outputs={"rows_processed": "todo -> show sensible output"}, + ) return { "statusCode": 200,