added address2uprn and postcodesplitter link

This commit is contained in:
Jun-te Kim 2026-02-11 13:11:31 +00:00
parent d4ac6aee71
commit ffb840da81
4 changed files with 180 additions and 142 deletions

View file

@ -107,7 +107,8 @@ jobs:
stage: ${{ needs.determine_stage.outputs.stage }}
ecr_repo: address2uprn-${{ needs.determine_stage.outputs.stage }}
image_digest: ${{ needs.address2uprn_image.outputs.image_digest }}
terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
# terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
terraform_apply: 'true'
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
@ -140,7 +141,7 @@ jobs:
# 3⃣ Deploy Postcode Splitter Lambda
# ============================================================
postcodeSplitter_lambda:
needs: [postcodeSplitter_image, determine_stage]
needs: [postcodeSplitter_image, determine_stage, address2uprn_lambda]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: postcodeSplitter

View file

@ -506,99 +506,13 @@ def run_all_test():
)
if __name__ == "__main__":
INPUT_FILE = "hackney.xlsx"
ADDRESS_COL = "Address 1"
POSTCODE_COL = "Postcode"
UPRN_COL = "UPRN"
df = pd.read_excel(INPUT_FILE)
failures = []
for _, row in tqdm(
df.iterrows(),
total=len(df),
desc="Auditing UPRNs",
):
input_address = str(row[ADDRESS_COL]).strip()
postcode = str(row[POSTCODE_COL]).strip()
expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL]))
try:
epc_df = get_epc_data_with_postcode(postcode)
if epc_df.empty:
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_results",
}
)
continue
scored_df = get_uprn_candidates(
epc_df,
user_address=input_address,
)
best_row = scored_df.iloc[0]
best_match_uprn = str(best_row["uprn"])
best_match_address = best_row["address"]
best_match_lexiscore = round(float(best_row["lexiscore"]), 4)
found_uprn = get_uprn(input_address, postcode)
except Exception as e:
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "exception",
"error": str(e),
}
)
continue
found_uprn_norm = None if not found_uprn else str(found_uprn)
if found_uprn_norm != expected_uprn:
failures.append(
{
**row.to_dict(),
"found_uprn": found_uprn_norm,
"best_match_uprn": best_match_uprn,
"best_match_address": best_match_address,
"best_match_lexiscore": best_match_lexiscore,
"status": ("no_match" if found_uprn_norm is None else "mismatch"),
}
)
failures_df = pd.DataFrame(failures)
print("===================================")
print(f"Total rows : {len(df)}")
print(f"Failures : {len(failures_df)}")
print("===================================")
failures_df.to_excel(
"hackney_uprn_failures.xlsx",
index=False,
)
def handler(event, context):
print("hello world")
print("=== Address2UPRN Lambda Handler ===")
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
print(f"Event: {json.dumps(event, indent=2, default=str)}")
print(f"Context: {context}")
print("===================================")
return {"statusCode": 200, "body": "hello world"}

View file

@ -3,16 +3,13 @@ import sys
import json
import pandas as pd
import requests
import boto3
from uuid import UUID
from urllib.parse import unquote
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict
from utils.logger import setup_logger
from tqdm import tqdm
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.address2UPRN.main import (
resolve_uprns_for_postcode_group,
get_epc_data_with_postcode,
)
logger = setup_logger()
@ -65,17 +62,39 @@ def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
raise ValueError(f"Could not parse S3 URI") from e
def sanitise_postcode(postcode: str) -> str | None:
def send_to_address2uprn_queue(task_id: str, rows: list) -> str:
"""
Normalise postcode for grouping.
Send a postcode group to the address2UPRN SQS queue.
- Uppercase
- Remove all whitespace
Args:
task_id: The parent task ID
rows: List of row dictionaries for this postcode group
Returns:
Message ID from SQS
"""
if pd.isna(postcode):
return None
sqs_client = boto3.client("sqs")
queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL")
return postcode.upper().replace(" ", "")
if not queue_url:
raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set")
message_body = {
"task_id": task_id,
"rows": rows,
}
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message_body),
)
logger.info(
f"Sent message to address2UPRN queue. "
f"Task: {task_id}, MessageId: {response['MessageId']}"
)
return response["MessageId"]
def handler(event, context, local=False):
@ -142,50 +161,121 @@ def handler(event, context, local=False):
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
# just do 5 well we are testing, sqs connection
df = df.head(5)
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
# Sanitise postcodes
df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode)
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
# Group by sanitised postcode (excluding null values)
grouped_data = []
for postcode, group_df in df.dropna(subset=["postcode_clean"]).groupby(
"postcode_clean"
):
group_info = {
"postcode": postcode,
"row_count": len(group_df),
"rows": group_df.to_dict(orient="records"),
}
grouped_data.append(group_info)
logger.info(f"Postcode: {postcode}, Rows: {len(group_df)}")
clean_df = df.dropna(subset=["postcode_clean"])
logger.info(f"Total postcodes: {len(grouped_data)}")
postcode_to_addresses = {
postcode: group.to_dict(orient="records")
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
}
results.append(
{
"message": "Postcode splitter processing completed",
"task_id": str(task_id),
"s3_uri": s3_uri,
"subtask_id": str(subtask_id),
"total_rows": len(df),
"total_postcodes": len(grouped_data),
"grouped_data": grouped_data,
}
)
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
# Mark subtask as complete after successful processing
subtask_interface.update_subtask_status(
subtask_id,
"complete",
outputs={
"status": "processing_complete",
"s3_uri": s3_uri,
"rows_processed": len(df),
"total_postcodes": len(grouped_data),
},
)
logger.info(f"Subtask {subtask_id} marked as complete")
# Batch rows in groups of 500
batch_rows = []
batch_size = 500
for postcode, rows in postcode_to_addresses.items():
# If postcode itself is larger than batch_size, send it individually
if len(rows) > batch_size:
# First, send the current batch if it has data
if batch_rows:
try:
send_to_address2uprn_queue(
task_id=str(task_id),
rows=batch_rows,
)
logger.info(
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue"
)
batch_rows = []
except Exception as e:
logger.error(
f"Failed to send batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
# Send the large postcode on its own
try:
send_to_address2uprn_queue(
task_id=str(task_id),
rows=rows,
)
logger.info(
f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue"
)
except Exception as e:
logger.error(
f"Failed to send large postcode to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
continue
# If adding this postcode's rows would exceed batch_size, send current batch
if batch_rows and len(batch_rows) + len(rows) > batch_size:
try:
send_to_address2uprn_queue(
task_id=str(task_id),
rows=batch_rows,
)
logger.info(
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue"
)
batch_rows = []
except Exception as e:
logger.error(
f"Failed to send batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
# Add current postcode's rows to batch
batch_rows.extend(rows)
# Send remaining batch
if batch_rows:
try:
send_to_address2uprn_queue(
task_id=str(task_id),
rows=batch_rows,
)
logger.info(
f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue"
)
except Exception as e:
logger.error(
f"Failed to send final batch to address2UPRN queue: {e}",
exc_info=True,
)
errors.append(
{
"error": "Failed to send to address2UPRN queue",
"details": str(e),
}
)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in request body: {e}")

View file

@ -15,6 +15,16 @@ locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
# Reference the existing address2UPRN Lambda outputs from shared state
data "terraform_remote_state" "address2uprn" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
module "lambda" {
source = "../modules/lambda_with_sqs"
@ -44,6 +54,7 @@ module "lambda" {
EPC_AUTH_TOKEN = "test"
ENGINE_SQS_URL = "test"
ENERGY_ASSESSMENTS_BUCKET = "test"
ADDRESS2UPRN_QUEUE_URL = data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_url
},
)
}
@ -52,4 +63,26 @@ module "lambda" {
resource "aws_iam_role_policy_attachment" "postcode_splitter_s3_read" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn
}
# Create SQS send policy for address2UPRN queue
module "postcode_splitter_sqs_policy" {
source = "../../modules/general_iam_policy"
policy_name = "postcode-splitter-sqs-send-${var.stage}"
policy_description = "Allow postcode-splitter Lambda to send messages to address2UPRN queue"
actions = [
"sqs:SendMessage"
]
resources = [
data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_arn
]
}
# Attach SQS policy to the Lambda execution role
resource "aws_iam_role_policy_attachment" "postcode_splitter_sqs_send" {
role = module.lambda.role_name
policy_arn = module.postcode_splitter_sqs_policy.policy_arn
}