diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 53e50617..ea588a77 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -436,19 +436,6 @@ def handler(event, context, local=False): # Process the rows logger.info(f"Processing {len(df)} rows for task {task_id}") - # Create user_input column by concatenating Address columns if not already present - if "user_input" not in df.columns: - df["user_input"] = ( - df["Address 1"].fillna("") - + " " - + df["Address 2"].fillna("") - + " " - + df["Address 3"].fillna("") - ).str.strip() - logger.info(f"Created user_input column from Address 1 and Address 2") - else: - logger.info(f"user_input column already present in data") - clean_df = df.dropna(subset=["postcode_clean"]) postcode_to_addresses = { @@ -487,23 +474,29 @@ def handler(event, context, local=False): # Process each address in this postcode with the same EPC data for row in postcode_rows: try: - user_input = row.get("user_input", "") - if not user_input: + # Concatenate Address columns directly + address2uprn_user_input = ( + str(row.get("Address 1", "")).strip() + " " + + str(row.get("Address 2", "")).strip() + " " + + str(row.get("Address 3", "")).strip() + ).strip() + + if not address2uprn_user_input: logger.warning( - f"Skipping row with missing user_input for postcode {postcode}" + f"Skipping row with missing address components for postcode {postcode}" ) continue # Get UPRN using the pre-fetched EPC data with all return options result = get_uprn_with_epc_df( - user_inputed_address=user_input, epc_df=epc_df, verbose=True + user_inputed_address=address2uprn_user_input, epc_df=epc_df, verbose=True ) # Parse result tuple if successful if result: uprn, found_address, score = result logger.info( - f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})" + f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})" ) results_data.append( @@ -516,7 +509,7 @@ def handler(event, context, local=False): ) else: logger.warning( - f"No UPRN found for {user_input} in {postcode}" + f"No UPRN found for {address2uprn_user_input} in {postcode}" ) results_data.append( { @@ -529,7 +522,7 @@ def handler(event, context, local=False): except Exception as e: logger.error( - f"Error processing address {row.get('user_input', 'unknown')}: {e}" + f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}" ) # Still add the row with error markers results_data.append( diff --git a/backend/app/db/models/postcode_search.py b/backend/app/db/models/postcode_search.py new file mode 100644 index 00000000..1e3e03b8 --- /dev/null +++ b/backend/app/db/models/postcode_search.py @@ -0,0 +1,24 @@ +import pytz +import datetime +from sqlalchemy import ( + Column, + BigInteger, + Text, + DateTime, +) +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + + +class PostcodeSearchModel(Base): + __tablename__ = "postcode_search" + + id = Column(BigInteger, primary_key=True, autoincrement=True) + postcode = Column(Text, nullable=False) + result_data = Column(JSONB, nullable=True) + + created_at = Column( + DateTime(timezone=True), nullable=False, default=datetime.datetime.now(pytz.utc) + ) diff --git a/backend/ordnanceSurvey/handler/Dockerfile b/backend/ordnanceSurvey/handler/Dockerfile index e69de29b..6a3cbe26 100644 --- a/backend/ordnanceSurvey/handler/Dockerfile +++ b/backend/ordnanceSurvey/handler/Dockerfile @@ -0,0 +1,25 @@ +FROM public.ecr.aws/lambda/python:3.11 + +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} + +# Set working directory (Lambda task root) +WORKDIR /var/task + +COPY backend/ordnanceSurvey/handler/requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +# Copy necessary files for database and utility imports +COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +# Lambda handler +CMD ["backend/ordnanceSurvey/main.handler"] + diff --git a/backend/ordnanceSurvey/handler/requirements.txt b/backend/ordnanceSurvey/handler/requirements.txt index e69de29b..6ef41b2d 100644 --- a/backend/ordnanceSurvey/handler/requirements.txt +++ b/backend/ordnanceSurvey/handler/requirements.txt @@ -0,0 +1,11 @@ +pandas==2.2.2 +numpy<2.0 +requests +tqdm +openpyxl +epc-api-python==1.0.2 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 \ No newline at end of file diff --git a/backend/ordnanceSurvey/local_handler/docker-compose.yml b/backend/ordnanceSurvey/local_handler/docker-compose.yml new file mode 100644 index 00000000..5f54e7da --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + ordnance-survey-lambda: + build: + context: ../../../ + dockerfile: backend/ordnanceSurvey/handler/Dockerfile + ports: + - "9000:8080" + env_file: + - ../../../.env \ No newline at end of file diff --git a/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py b/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py new file mode 100644 index 00000000..c25f2d20 --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +import json +import requests + +HOST = "localhost" +PORT = "9000" + +LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations" + +payload = { + "Records": [ + { + "body": json.dumps( + { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", + "s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv", + "lexiscore_column": "address2uprn_lexiscore", + } + ) + } + ] +} + +response = requests.post(LAMBDA_URL, json=payload) + +print("Status code:", response.status_code) +print("Response:") +print(response.text) diff --git a/backend/ordnanceSurvey/main.py b/backend/ordnanceSurvey/main.py index 4200bd24..6c4f3080 100644 --- a/backend/ordnanceSurvey/main.py +++ b/backend/ordnanceSurvey/main.py @@ -68,17 +68,19 @@ def get_ordance_survey_record(row, cache=None): def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: # delete this line after test - local = True + # local = True # Example SQS message for testing (copy and paste into SQS): if local is True: body = { "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", "s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv", + "lexiscore_column": "address2uprn_lexiscore", } s3_uri: str = body.get("s3_uri", "") lexiscore_threshold: float = body.get("lexiscore_threshold", 0.5) + lexiscore_column: str = body.get("lexiscore_column", None) if s3_uri == "": raise RuntimeError("Missing s3_uri in message body") @@ -88,13 +90,17 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: # Assumption designing with address2uprn was ran first csv_data = read_csv_from_s3_dict(bucket, key) df = pd.DataFrame(csv_data) - df["address2uprn_lexiscore"] = pd.to_numeric( - df["address2uprn_lexiscore"], errors="coerce" - ) - needs_processing = df[ - df["address2uprn_lexiscore"].isna() - | (df["address2uprn_lexiscore"] < lexiscore_threshold) - ] + df = df.head(5) + + # If lexiscore_column is specified, use it; otherwise process all rows + if lexiscore_column and lexiscore_column in df.columns: + df[lexiscore_column] = pd.to_numeric(df[lexiscore_column], errors="coerce") + needs_processing = df[ + df[lexiscore_column].isna() | (df[lexiscore_column] < lexiscore_threshold) + ] + else: + # Default: process all rows + needs_processing = df grouped = needs_processing.groupby("postcode_clean") @@ -137,13 +143,21 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: continue for idx, row in group.iterrows(): - user_address = str(row.get("user_input", "")).strip() - if not user_address: + # Concatenate Address columns directly + ordnancy_survey_user_input = ( + str(row.get("Address 1", "")).strip() + + " " + + str(row.get("Address 2", "")).strip() + + " " + + str(row.get("Address 3", "")).strip() + ).strip() + + if not ordnancy_survey_user_input: continue # Score against OS Places addresses scores = postcode_cache["ADDRESS"].apply( - lambda addr: addressMatch.score(user_address, addr) + lambda addr: addressMatch.score(ordnancy_survey_user_input, addr) ) best_idx = scores.idxmax() best_score = scores[best_idx] @@ -157,3 +171,5 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: # TODO: Save new results to s3 (ask Khalim if we want to save to db) df.to_csv("ordnance_survey_results.csv", index=False) print(f"Results saved to ordnance_survey_results.csv ({len(df)} rows)") + + # TODO upload to s3 once you get confirmation from Khalim or db