From 5a0e0c0a698f858abdfcb39554370dabd2e35c25 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 11 Feb 2026 13:45:06 +0000 Subject: [PATCH] add more logic to batch and also missing libraries --- backend/address2UPRN/main.py | 1 + backend/postcode_splitter/main.py | 153 +++++++++++++++++++----------- 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 30066bcb..777dde0e 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -8,6 +8,7 @@ from utils.logger import setup_logger import re from typing import Set import json +import requests logger = setup_logger() diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index d515a21f..eb7cf044 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -177,23 +177,103 @@ def handler(event, context, local=False): logger.info(f"Total postcodes: {len(postcode_to_addresses)}") - # Batch rows in groups of 500 - batch_rows = [] + # Calculate total rows to send + total_rows = sum(len(rows) for rows in postcode_to_addresses.values()) + logger.info(f"Total rows to send: {total_rows}") + batch_size = 500 - for postcode, rows in postcode_to_addresses.items(): - # If postcode itself is larger than batch_size, send it individually - if len(rows) > batch_size: - # First, send the current batch if it has data - if batch_rows: + # If all rows fit in one batch, just send them all at once + if total_rows <= batch_size: + all_rows = [] + for postcode, rows in postcode_to_addresses.items(): + all_rows.extend(rows) + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=all_rows, + ) + logger.info(f"Sent all {len(all_rows)} rows in single batch to address2UPRN queue") + except Exception as e: + logger.error( + f"Failed to send all rows to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + else: + # Multi-batch processing for large datasets + batch_rows = [] + total_sent = 0 + + for postcode, rows in postcode_to_addresses.items(): + logger.info(f"Processing postcode {postcode} with {len(rows)} rows") + # If postcode itself is larger than batch_size, send it individually + if len(rows) > batch_size: + # First, send the current batch if it has data + if batch_rows: + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=batch_rows, + ) + logger.info( + f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" + ) + batch_rows = [] + except Exception as e: + logger.error( + f"Failed to send batch to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + + # Send the large postcode on its own + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=rows, + ) + logger.info( + f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue" + ) + except Exception as e: + logger.error( + f"Failed to send large postcode to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + continue + + # If adding this postcode's rows would exceed batch_size, send current batch + current_batch_size = len(batch_rows) + len(rows) + if batch_rows and current_batch_size > batch_size: + logger.info( + f"Batch threshold reached: current {len(batch_rows)} + next postcode {len(rows)} = {current_batch_size} > {batch_size}" + ) try: send_to_address2uprn_queue( task_id=str(task_id), rows=batch_rows, ) logger.info( - f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" + f"Sent batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})" ) + total_sent += len(batch_rows) batch_rows = [] except Exception as e: logger.error( @@ -207,42 +287,24 @@ def handler(event, context, local=False): } ) - # Send the large postcode on its own - try: - send_to_address2uprn_queue( - task_id=str(task_id), - rows=rows, - ) - logger.info( - f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue" - ) - except Exception as e: - logger.error( - f"Failed to send large postcode to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - continue + # Add current postcode's rows to batch + batch_rows.extend(rows) - # If adding this postcode's rows would exceed batch_size, send current batch - if batch_rows and len(batch_rows) + len(rows) > batch_size: + # Send remaining batch + if batch_rows: try: send_to_address2uprn_queue( task_id=str(task_id), rows=batch_rows, ) + total_sent += len(batch_rows) logger.info( - f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" + f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})" ) batch_rows = [] except Exception as e: logger.error( - f"Failed to send batch to address2UPRN queue: {e}", + f"Failed to send final batch to address2UPRN queue: {e}", exc_info=True, ) errors.append( @@ -252,31 +314,6 @@ def handler(event, context, local=False): } ) - # Add current postcode's rows to batch - batch_rows.extend(rows) - - # Send remaining batch - if batch_rows: - try: - send_to_address2uprn_queue( - task_id=str(task_id), - rows=batch_rows, - ) - logger.info( - f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue" - ) - except Exception as e: - logger.error( - f"Failed to send final batch to address2UPRN queue: {e}", - exc_info=True, - ) - errors.append( - { - "error": "Failed to send to address2UPRN queue", - "details": str(e), - } - ) - except json.JSONDecodeError as e: logger.error(f"Invalid JSON in request body: {e}") errors.append({"error": "Invalid JSON in request body", "details": str(e)})