diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 39132944..514fc7af 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -107,7 +107,8 @@ jobs: stage: ${{ needs.determine_stage.outputs.stage }} ecr_repo: address2uprn-${{ needs.determine_stage.outputs.stage }} image_digest: ${{ needs.address2uprn_image.outputs.image_digest }} - terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }} + # terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }} + terraform_apply: 'true' secrets: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} @@ -140,7 +141,7 @@ jobs: # 3️⃣ Deploy Postcode Splitter Lambda # ============================================================ postcodeSplitter_lambda: - needs: [postcodeSplitter_image, determine_stage] + needs: [postcodeSplitter_image, determine_stage, address2uprn_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: postcodeSplitter diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index fb812d67..33c37760 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -506,99 +506,13 @@ def run_all_test(): ) -if __name__ == "__main__": - INPUT_FILE = "hackney.xlsx" - - ADDRESS_COL = "Address 1" - POSTCODE_COL = "Postcode" - UPRN_COL = "UPRN" - - df = pd.read_excel(INPUT_FILE) - - failures = [] - - for _, row in tqdm( - df.iterrows(), - total=len(df), - desc="Auditing UPRNs", - ): - input_address = str(row[ADDRESS_COL]).strip() - postcode = str(row[POSTCODE_COL]).strip() - - expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL])) - - try: - epc_df = get_epc_data_with_postcode(postcode) - - if epc_df.empty: - failures.append( - { - **row.to_dict(), - "found_uprn": None, - "best_match_uprn": None, - "best_match_address": None, - "best_match_lexiscore": None, - "status": "no_epc_results", - } - ) - continue - - scored_df = get_uprn_candidates( - epc_df, - user_address=input_address, - ) - - best_row = scored_df.iloc[0] - - best_match_uprn = str(best_row["uprn"]) - best_match_address = best_row["address"] - best_match_lexiscore = round(float(best_row["lexiscore"]), 4) - - found_uprn = get_uprn(input_address, postcode) - - except Exception as e: - failures.append( - { - **row.to_dict(), - "found_uprn": None, - "best_match_uprn": None, - "best_match_address": None, - "best_match_lexiscore": None, - "status": "exception", - "error": str(e), - } - ) - continue - - found_uprn_norm = None if not found_uprn else str(found_uprn) - - if found_uprn_norm != expected_uprn: - failures.append( - { - **row.to_dict(), - "found_uprn": found_uprn_norm, - "best_match_uprn": best_match_uprn, - "best_match_address": best_match_address, - "best_match_lexiscore": best_match_lexiscore, - "status": ("no_match" if found_uprn_norm is None else "mismatch"), - } - ) - - failures_df = pd.DataFrame(failures) - - print("===================================") - print(f"Total rows : {len(df)}") - print(f"Failures : {len(failures_df)}") - print("===================================") - - failures_df.to_excel( - "hackney_uprn_failures.xlsx", - index=False, - ) - - def handler(event, context): - print("hello world") + print("=== Address2UPRN Lambda Handler ===") + print(f"Function: {context.function_name}") + print(f"Request ID: {context.aws_request_id}") + print(f"Event: {json.dumps(event, indent=2, default=str)}") + print(f"Context: {context}") + print("===================================") return {"statusCode": 200, "body": "hello world"} diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 0f21a67f..d515a21f 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -3,16 +3,13 @@ import sys import json import pandas as pd import requests +import boto3 from uuid import UUID from urllib.parse import unquote from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict from utils.logger import setup_logger from tqdm import tqdm from backend.app.db.functions.tasks.Tasks import SubTaskInterface -from backend.address2UPRN.main import ( - resolve_uprns_for_postcode_group, - get_epc_data_with_postcode, -) logger = setup_logger() @@ -65,17 +62,39 @@ def parse_s3_uri(s3_uri: str) -> tuple[str, str]: raise ValueError(f"Could not parse S3 URI") from e -def sanitise_postcode(postcode: str) -> str | None: +def send_to_address2uprn_queue(task_id: str, rows: list) -> str: """ - Normalise postcode for grouping. + Send a postcode group to the address2UPRN SQS queue. - - Uppercase - - Remove all whitespace + Args: + task_id: The parent task ID + rows: List of row dictionaries for this postcode group + + Returns: + Message ID from SQS """ - if pd.isna(postcode): - return None + sqs_client = boto3.client("sqs") + queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL") - return postcode.upper().replace(" ", "") + if not queue_url: + raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set") + + message_body = { + "task_id": task_id, + "rows": rows, + } + + response = sqs_client.send_message( + QueueUrl=queue_url, + MessageBody=json.dumps(message_body), + ) + + logger.info( + f"Sent message to address2UPRN queue. " + f"Task: {task_id}, MessageId: {response['MessageId']}" + ) + + return response["MessageId"] def handler(event, context, local=False): @@ -142,50 +161,121 @@ def handler(event, context, local=False): csv_data = read_csv_from_s3_dict(bucket, key) df = pd.DataFrame(csv_data) + # just do 5 well we are testing, sqs connection + df = df.head(5) logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") # Sanitise postcodes - df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode) + df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "") - # Group by sanitised postcode (excluding null values) - grouped_data = [] - for postcode, group_df in df.dropna(subset=["postcode_clean"]).groupby( - "postcode_clean" - ): - group_info = { - "postcode": postcode, - "row_count": len(group_df), - "rows": group_df.to_dict(orient="records"), - } - grouped_data.append(group_info) - logger.info(f"Postcode: {postcode}, Rows: {len(group_df)}") + clean_df = df.dropna(subset=["postcode_clean"]) - logger.info(f"Total postcodes: {len(grouped_data)}") + postcode_to_addresses = { + postcode: group.to_dict(orient="records") + for postcode, group in clean_df.groupby("postcode_clean", sort=False) + } - results.append( - { - "message": "Postcode splitter processing completed", - "task_id": str(task_id), - "s3_uri": s3_uri, - "subtask_id": str(subtask_id), - "total_rows": len(df), - "total_postcodes": len(grouped_data), - "grouped_data": grouped_data, - } - ) + logger.info(f"Total postcodes: {len(postcode_to_addresses)}") - # Mark subtask as complete after successful processing - subtask_interface.update_subtask_status( - subtask_id, - "complete", - outputs={ - "status": "processing_complete", - "s3_uri": s3_uri, - "rows_processed": len(df), - "total_postcodes": len(grouped_data), - }, - ) - logger.info(f"Subtask {subtask_id} marked as complete") + # Batch rows in groups of 500 + batch_rows = [] + batch_size = 500 + + for postcode, rows in postcode_to_addresses.items(): + # If postcode itself is larger than batch_size, send it individually + if len(rows) > batch_size: + # First, send the current batch if it has data + if batch_rows: + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=batch_rows, + ) + logger.info( + f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" + ) + batch_rows = [] + except Exception as e: + logger.error( + f"Failed to send batch to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + + # Send the large postcode on its own + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=rows, + ) + logger.info( + f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue" + ) + except Exception as e: + logger.error( + f"Failed to send large postcode to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + continue + + # If adding this postcode's rows would exceed batch_size, send current batch + if batch_rows and len(batch_rows) + len(rows) > batch_size: + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=batch_rows, + ) + logger.info( + f"Sent batch of {len(batch_rows)} rows to address2UPRN queue" + ) + batch_rows = [] + except Exception as e: + logger.error( + f"Failed to send batch to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) + + # Add current postcode's rows to batch + batch_rows.extend(rows) + + # Send remaining batch + if batch_rows: + try: + send_to_address2uprn_queue( + task_id=str(task_id), + rows=batch_rows, + ) + logger.info( + f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue" + ) + except Exception as e: + logger.error( + f"Failed to send final batch to address2UPRN queue: {e}", + exc_info=True, + ) + errors.append( + { + "error": "Failed to send to address2UPRN queue", + "details": str(e), + } + ) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in request body: {e}") diff --git a/infrastructure/terraform/lambda/postcodeSplitter/main.tf b/infrastructure/terraform/lambda/postcodeSplitter/main.tf index 2e2e91da..69b80011 100644 --- a/infrastructure/terraform/lambda/postcodeSplitter/main.tf +++ b/infrastructure/terraform/lambda/postcodeSplitter/main.tf @@ -15,6 +15,16 @@ locals { db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) } +# Reference the existing address2UPRN Lambda outputs from shared state +data "terraform_remote_state" "address2uprn" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + module "lambda" { source = "../modules/lambda_with_sqs" @@ -44,6 +54,7 @@ module "lambda" { EPC_AUTH_TOKEN = "test" ENGINE_SQS_URL = "test" ENERGY_ASSESSMENTS_BUCKET = "test" + ADDRESS2UPRN_QUEUE_URL = data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_url }, ) } @@ -52,4 +63,26 @@ module "lambda" { resource "aws_iam_role_policy_attachment" "postcode_splitter_s3_read" { role = module.lambda.role_name policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn +} + +# Create SQS send policy for address2UPRN queue +module "postcode_splitter_sqs_policy" { + source = "../../modules/general_iam_policy" + + policy_name = "postcode-splitter-sqs-send-${var.stage}" + policy_description = "Allow postcode-splitter Lambda to send messages to address2UPRN queue" + + actions = [ + "sqs:SendMessage" + ] + + resources = [ + data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_arn + ] +} + +# Attach SQS policy to the Lambda execution role +resource "aws_iam_role_policy_attachment" "postcode_splitter_sqs_send" { + role = module.lambda.role_name + policy_arn = module.postcode_splitter_sqs_policy.policy_arn } \ No newline at end of file