lambda code works locally

This commit is contained in:
Jun-te Kim 2026-03-06 12:51:08 +00:00
parent eda2fb36c6
commit 815ce01082
7 changed files with 140 additions and 31 deletions

View file

@ -436,19 +436,6 @@ def handler(event, context, local=False):
# Process the rows
logger.info(f"Processing {len(df)} rows for task {task_id}")
# Create user_input column by concatenating Address columns if not already present
if "user_input" not in df.columns:
df["user_input"] = (
df["Address 1"].fillna("")
+ " "
+ df["Address 2"].fillna("")
+ " "
+ df["Address 3"].fillna("")
).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
else:
logger.info(f"user_input column already present in data")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
@ -487,23 +474,29 @@ def handler(event, context, local=False):
# Process each address in this postcode with the same EPC data
for row in postcode_rows:
try:
user_input = row.get("user_input", "")
if not user_input:
# Concatenate Address columns directly
address2uprn_user_input = (
str(row.get("Address 1", "")).strip() + " " +
str(row.get("Address 2", "")).strip() + " " +
str(row.get("Address 3", "")).strip()
).strip()
if not address2uprn_user_input:
logger.warning(
f"Skipping row with missing user_input for postcode {postcode}"
f"Skipping row with missing address components for postcode {postcode}"
)
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
user_inputed_address=user_input, epc_df=epc_df, verbose=True
user_inputed_address=address2uprn_user_input, epc_df=epc_df, verbose=True
)
# Parse result tuple if successful
if result:
uprn, found_address, score = result
logger.info(
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})"
)
results_data.append(
@ -516,7 +509,7 @@ def handler(event, context, local=False):
)
else:
logger.warning(
f"No UPRN found for {user_input} in {postcode}"
f"No UPRN found for {address2uprn_user_input} in {postcode}"
)
results_data.append(
{
@ -529,7 +522,7 @@ def handler(event, context, local=False):
except Exception as e:
logger.error(
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}"
)
# Still add the row with error markers
results_data.append(

View file

@ -0,0 +1,24 @@
import pytz
import datetime
from sqlalchemy import (
Column,
BigInteger,
Text,
DateTime,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class PostcodeSearchModel(Base):
    """ORM mapping for the ``postcode_search`` table.

    Columns:
        id: Auto-incrementing big-integer primary key.
        postcode: The postcode text; required.
        result_data: Optional JSONB payload stored alongside the postcode
            (presumably the search result for it -- confirm against the
            code that writes these rows).
        created_at: Timezone-aware timestamp, required; defaults to the
            current UTC time when the row is inserted.
    """

    __tablename__ = "postcode_search"

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    postcode = Column(Text, nullable=False)
    result_data = Column(JSONB, nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        # Pass a callable, not a value: a bare ``datetime.now(...)`` here is
        # evaluated once at import time, which would stamp every row created
        # by this process with the same (module-load) timestamp.
        default=lambda: datetime.datetime.now(pytz.utc),
    )

View file

@ -0,0 +1,25 @@
# Container image for the Ordnance Survey Lambda function.
# Base image: the AWS Lambda Python 3.11 runtime from the public ECR gallery.
FROM public.ecr.aws/lambda/python:3.11
# Database connection settings are supplied as build arguments...
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
# ...and copied into environment variables so they are visible at runtime.
# NOTE(review): values passed this way are baked into the image layers.
ENV DB_HOST=${DEV_DB_HOST}
ENV DB_PORT=${DEV_DB_PORT}
ENV DB_NAME=${DEV_DB_NAME}
# Set working directory (Lambda task root)
WORKDIR /var/task
# Install dependencies before copying the source trees so this layer can be
# reused from cache when only application code changes.
COPY backend/ordnanceSurvey/handler/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy necessary files for database and utility imports
# (paths are relative to the build context, which must therefore be the
# repository root that contains utils/, backend/ and datatypes/)
COPY utils/ utils/
COPY backend/ backend/
COPY datatypes/ datatypes/
# Lambda handler
# Format is <module path>.<function>: the `handler` function in
# backend/ordnanceSurvey/main.
CMD ["backend/ordnanceSurvey/main.handler"]

View file

@ -0,0 +1,11 @@
pandas==2.2.2
numpy<2.0
requests
tqdm
openpyxl
epc-api-python==1.0.2
boto3==1.35.44
sqlmodel
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
pydantic-settings==2.6.0

View file

@ -0,0 +1,11 @@
version: "3.9"
services:
ordnance-survey-lambda:
build:
context: ../../../
dockerfile: backend/ordnanceSurvey/handler/Dockerfile
ports:
- "9000:8080"
env_file:
- ../../../.env

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python3
import json
import requests
# Where the locally running Lambda container is published on the host.
HOST = "localhost"
PORT = "9000"
# Invocation endpoint exposed by the Lambda container for local testing.
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"

# (connect, read) timeouts in seconds. Fail fast when the container is not
# running, but allow a long-running invocation up to Lambda's 900 s maximum.
# Without an explicit timeout, requests would wait forever.
REQUEST_TIMEOUT = (5, 900)

# SQS-style event: a single record whose "body" is a JSON-encoded string
# holding the task parameters.
payload = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
                    "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
                    "s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv",
                    "lexiscore_column": "address2uprn_lexiscore",
                }
            )
        }
    ]
}


def main() -> None:
    """POST the sample event to the local Lambda endpoint and print the reply."""
    response = requests.post(LAMBDA_URL, json=payload, timeout=REQUEST_TIMEOUT)
    print("Status code:", response.status_code)
    print("Response:")
    print(response.text)


# Only fire the request when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View file

@ -68,17 +68,19 @@ def get_ordance_survey_record(row, cache=None):
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
# delete this line after test
local = True
# local = True
# Example SQS message for testing (copy and paste into SQS):
if local is True:
body = {
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv",
"lexiscore_column": "address2uprn_lexiscore",
}
s3_uri: str = body.get("s3_uri", "")
lexiscore_threshold: float = body.get("lexiscore_threshold", 0.5)
lexiscore_column: str = body.get("lexiscore_column", None)
if s3_uri == "":
raise RuntimeError("Missing s3_uri in message body")
@ -88,13 +90,17 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
# Assumption: this step is designed on the basis that address2uprn was run first
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
df["address2uprn_lexiscore"] = pd.to_numeric(
df["address2uprn_lexiscore"], errors="coerce"
)
needs_processing = df[
df["address2uprn_lexiscore"].isna()
| (df["address2uprn_lexiscore"] < lexiscore_threshold)
]
df = df.head(5)
# If lexiscore_column is specified, use it; otherwise process all rows
if lexiscore_column and lexiscore_column in df.columns:
df[lexiscore_column] = pd.to_numeric(df[lexiscore_column], errors="coerce")
needs_processing = df[
df[lexiscore_column].isna() | (df[lexiscore_column] < lexiscore_threshold)
]
else:
# Default: process all rows
needs_processing = df
grouped = needs_processing.groupby("postcode_clean")
@ -137,13 +143,21 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
continue
for idx, row in group.iterrows():
user_address = str(row.get("user_input", "")).strip()
if not user_address:
# Concatenate Address columns directly
ordnancy_survey_user_input = (
str(row.get("Address 1", "")).strip()
+ " "
+ str(row.get("Address 2", "")).strip()
+ " "
+ str(row.get("Address 3", "")).strip()
).strip()
if not ordnancy_survey_user_input:
continue
# Score against OS Places addresses
scores = postcode_cache["ADDRESS"].apply(
lambda addr: addressMatch.score(user_address, addr)
lambda addr: addressMatch.score(ordnancy_survey_user_input, addr)
)
best_idx = scores.idxmax()
best_score = scores[best_idx]
@ -157,3 +171,5 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
# TODO: Save new results to s3 (ask Khalim if we want to save to db)
df.to_csv("ordnance_survey_results.csv", index=False)
print(f"Results saved to ordnance_survey_results.csv ({len(df)} rows)")
# TODO: upload to S3 (or save to the db) once Khalim confirms which one we want