Model/backend/address2UPRN/main.py
2026-06-05 19:03:33 +00:00

570 lines
20 KiB
Python

from typing import Optional
import os
import pandas as pd
from utils.logger import setup_logger
import json
from uuid import UUID
import uuid
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from utils.s3 import (
save_csv_to_s3,
read_csv_from_s3 as read_csv_from_s3_dict,
parse_s3_uri,
)
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
from backend.address2UPRN.scoring import all_uprns_match, rank_address_similarity
from datatypes.epc.domain.historic_epc_matching import (
match_addresses_for_postcode,
)
from infrastructure.epc_client.epc_client_service import EpcClientService
from datatypes.epc.domain.historic_epc_matching import ScoredHistoricEpc
# Module-level logger shared by every function in this file.
logger = setup_logger()
def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame:
    """Fetch EPC search results for a postcode as a DataFrame.

    Args:
        postcode: Postcode passed to the EPC client's postcode search.

    Returns:
        A DataFrame with one row per search result and exactly two columns:
        ``address`` (the result's ``address_line_1``) and ``uprn``. Both
        columns are present even when the search returns nothing, so callers
        can select them without special-casing an empty frame.

    Raises:
        RuntimeError: If ``OPEN_EPC_API_TOKEN`` is not set in the environment.
    """
    token = os.getenv("OPEN_EPC_API_TOKEN")
    if token is None:
        raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env")

    service = EpcClientService(auth_token=token)
    results = service.search_by_postcode(postcode)

    # Passing `columns` keeps the schema stable for an empty result list;
    # without it pandas builds a frame with no columns at all.
    return pd.DataFrame(
        [{"address": r.address_line_1, "uprn": r.uprn} for r in results],
        columns=["address", "uprn"],
    )
def get_uprn_from_historic_epc(
    user_inputed_address: str,
    postcode: str,
) -> Optional[tuple[str, str, float]]:
    """Look up a UPRN for an address in the historic EPC dataset.

    The match is accepted only when the historic data yields one unambiguous
    UPRN. No 0.7 lexiscore cut-off is applied here (unlike the new-EPC
    lookup): historic addresses are written in a more verbose format that
    systematically lowers lexiscores, so the only score gate is the one
    built into ``unambiguous_uprn`` (score > 0).

    Args:
        user_inputed_address: Free-text address supplied by the user.
        postcode: Postcode whose historic records are searched.

    Returns:
        ``(uprn, address, lexiscore)`` for the top-scoring record, or ``None``
        when the postcode has no historic file, the UPRN is missing/ambiguous,
        or there is no top record.
    """
    try:
        match_result = match_addresses_for_postcode(user_inputed_address, postcode)
    except FileNotFoundError:
        # No historic data exists for this postcode.
        return None

    resolved_uprn: Optional[str] = match_result.unambiguous_uprn()
    # Reject both "nothing returned" and the stringified-NaN placeholder.
    has_usable_uprn = bool(resolved_uprn) and resolved_uprn != "nan"
    if not has_usable_uprn:
        return None

    best: Optional[ScoredHistoricEpc] = match_result.top()
    if best is None:
        return None

    return resolved_uprn, best.record.address, best.lexiscore
def get_uprn_with_epc_df(
user_inputed_address: str,
epc_df: pd.DataFrame,
verbose: bool = False,
) -> Optional[str | tuple[str, str, float]]:
"""
Return uprn (str) using a pre-fetched EPC dataframe.
This avoids calling the API multiple times for the same postcode.
"""
if epc_df.empty:
return None
scored_df = rank_address_similarity(
epc_df,
user_address=user_inputed_address,
)
# Best score
best_score = scored_df.iloc[0]["lexiscore"]
# # Return None if score is below threshold
if best_score < 0.7:
return None
# All rank-1 rows (possible draw)
top_rank_df = scored_df[scored_df["lexirank"] == 1]
# If rank-1 rows do not agree on a single UPRN → ambiguous
if not all_uprns_match(top_rank_df, target_uprn=top_rank_df.iloc[0]["uprn"]):
return None
address = top_rank_df["address"].values[0]
score = float(top_rank_df["lexiscore"].values[0])
logger.info(f"Address found to be: {address}, with lexiscore {score}")
# Safe to return the agreed UPRN
found_uprn = top_rank_df.iloc[0]["uprn"]
# Handling numeric missingness in new api
if found_uprn in ["", "nan"]:
return None
if verbose:
return (found_uprn, address, score)
else:
return found_uprn
def get_uprn(
    user_inputed_address: str,
    postcode: str,
    verbose: bool = False,
):
    """Resolve a single address to a UPRN, trying both EPC sources.

    The new EPC API is queried first. If it gives no confident match, the
    historic EPC dataset is tried as a fallback. When resolving many
    addresses that share a postcode, prefer ``get_uprn_with_epc_df`` so the
    EPC data is fetched only once.

    Args:
        user_inputed_address: Free-text address supplied by the user.
        postcode: Postcode of the address.
        verbose: When True, return ``(uprn, address, lexiscore)`` instead of
            just the UPRN.

    Returns:
        The UPRN (or the verbose tuple), or ``None`` when neither source
        produces a match.
    """
    epc_df = get_epc_data_with_postcode(postcode=postcode)

    # Always ask for the verbose tuple internally; it is trimmed on return.
    match: Optional[tuple[str, str, float]] = get_uprn_with_epc_df(
        user_inputed_address=user_inputed_address,
        epc_df=epc_df,
        verbose=True,
    )
    if match:
        return match if verbose else match[0]

    match = get_uprn_from_historic_epc(
        user_inputed_address=user_inputed_address,
        postcode=postcode,
    )
    if not match:
        return None

    logger.info(f"Historic EPC matched {user_inputed_address} in {postcode}")
    if verbose:
        return match
    return match[0]
# Diagnostic columns appended to every row, in output order.
_UPRN_DIAGNOSTIC_COLUMNS = [
    "found_uprn",
    "best_match_uprn",
    "best_match_address",
    "best_match_lexiscore",
    "status",
]


def _diagnose_match(scored_df: pd.DataFrame) -> dict:
    """Turn one address's ranked EPC candidates into a diagnostics record."""
    outcome = dict.fromkeys(_UPRN_DIAGNOSTIC_COLUMNS)  # every field None

    if scored_df.empty:
        outcome["status"] = "no_epc_candidates"
        return outcome

    best_score = scored_df.iloc[0]["lexiscore"]
    outcome["best_match_lexiscore"] = best_score
    if best_score <= 0:
        outcome["status"] = "zero_score"
        return outcome

    # Several rows can tie for rank 1; they must agree on the UPRN.
    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    top_row = top_rank_df.iloc[0]
    outcome["best_match_uprn"] = top_row["uprn"]
    outcome["best_match_address"] = top_row["address"]

    if not all_uprns_match(top_rank_df, top_row["uprn"]):
        outcome["status"] = "ambiguous"
        return outcome

    # NOTE(review): a missing (NaN) UPRN is stringified to "nan" here and still
    # reported as "matched" — confirm whether downstream should treat it so.
    outcome["found_uprn"] = str(top_row["uprn"])
    outcome["best_match_uprn"] = str(top_row["uprn"])
    outcome["status"] = "matched"
    return outcome


def resolve_uprns_for_postcode_group(
    group_df: pd.DataFrame,
    epc_df: pd.DataFrame,
    address_col: str = "Address 1",
) -> pd.DataFrame:
    """Match every address in a postcode group against that postcode's EPC data.

    Args:
        group_df: Rows that share one postcode.
        epc_df: EPC search results for that postcode.
        address_col: Column of ``group_df`` holding the address to match.

    Returns:
        ``group_df`` (index reset) with five extra columns: ``found_uprn``,
        ``best_match_uprn``, ``best_match_address``, ``best_match_lexiscore``
        and ``status``. ``status`` is one of ``"no_epc_candidates"``,
        ``"zero_score"``, ``"ambiguous"`` or ``"matched"``; ``found_uprn`` is
        set only for ``"matched"``. The extra columns are present even when
        ``group_df`` has no rows.
    """
    results = []
    for _, row in group_df.iterrows():
        user_address = str(row[address_col]).strip()
        scored_df = rank_address_similarity(
            epc_df,
            user_address=user_address,
        )
        results.append(_diagnose_match(scored_df))

    # Fixing the columns keeps the output schema stable for an empty group;
    # a frame built from an empty list would otherwise have no columns.
    diagnostics = pd.DataFrame(results, columns=_UPRN_DIAGNOSTIC_COLUMNS)
    return pd.concat(
        [group_df.reset_index(drop=True), diagnostics],
        axis=1,
    )
def save_results_to_s3(
    results_df: pd.DataFrame,
    task_id: str,
    sub_task_id: str,
    bucket_name: Optional[str] = None,
) -> bool:
    """Write a results DataFrame to S3 as a CSV file.

    The object is stored under
    ``ara_raw_outputs/<task_id>/<sub_task_id>/<timestamp>_<random>.csv``,
    where ``<timestamp>`` is the current local time in ISO format and
    ``<random>`` is the first eight characters of a fresh UUID4.

    :param results_df: The DataFrame containing results
    :param task_id: The task ID (first folder level of the object key)
    :param sub_task_id: The sub-task ID (second folder level of the object key)
    :param bucket_name: The S3 bucket name (defaults to the S3_BUCKET_NAME
        environment variable when None)
    :return: True if the upload succeeded, False otherwise (including when no
        bucket is configured or any exception is raised while saving)
    """
    target_bucket = (
        bucket_name if bucket_name is not None else os.getenv("S3_BUCKET_NAME")
    )
    if not target_bucket:
        logger.error(
            "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
        )
        return False

    try:
        # Timestamp plus a short random suffix keeps repeated runs from
        # overwriting each other.
        stamp = datetime.now().isoformat()
        suffix = str(uuid.uuid4())[:8]
        object_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{stamp}_{suffix}.csv"

        if save_csv_to_s3(results_df, target_bucket, object_key):
            logger.info(
                f"Successfully saved results to s3://{target_bucket}/{object_key}"
            )
            return True

        logger.error("Failed to save results to S3")
        return False
    except Exception as exc:
        logger.error(f"Error saving results to S3: {str(exc)}")
        return False
def _parse_record_body(record: dict) -> dict:
    """Return a record's ``body``: JSON-decoded when it is a string, else as-is."""
    raw_body = record.get("body")
    if isinstance(raw_body, str):
        return json.loads(raw_body)
    return record.get("body", {})


def _build_user_address(row: dict) -> str:
    """Join the non-empty "Address 1".."Address 3" fields of a row with spaces."""
    parts = []
    for column in ("Address 1", "Address 2", "Address 3"):
        value = row.get(column)
        # Missing cells (None/NaN) must not leak into the match string as the
        # literal text "None" or "nan".
        if pd.isna(value):
            continue
        text = str(value).strip()
        if text:
            parts.append(text)
    return " ".join(parts)


def _unmatched_row(row: dict, marker=None, error: Optional[str] = None) -> dict:
    """Copy an input row and fill the three address2uprn_* fields with ``marker``."""
    out = {
        **row,
        "address2uprn_uprn": marker,
        "address2uprn_address": marker,
        "address2uprn_lexiscore": marker,
    }
    if error is not None:
        out["error"] = error
    return out


def _resolve_row(row: dict, postcode, epc_df: pd.DataFrame) -> dict:
    """Match one input row: new EPC data first, then the historic EPC fallback."""
    user_address = "unknown"
    try:
        user_address = _build_user_address(row)
        if not user_address:
            # The row is kept (unmatched) so the output has one row per input.
            logger.warning(
                f"Row with missing address components for postcode {postcode}; "
                "recording it as unmatched"
            )
            return _unmatched_row(row)

        result: Optional[tuple[str, str, float]] = get_uprn_with_epc_df(
            user_inputed_address=user_address,
            epc_df=epc_df,
            verbose=True,
        )

        # Fallback to historic EPC if new EPC produced no match
        if not result:
            try:
                result = get_uprn_from_historic_epc(
                    user_inputed_address=user_address,
                    postcode=postcode,
                )
            except Exception as e:
                logger.error(
                    f"Historic EPC lookup failed for {user_address} in {postcode}: {e}"
                )
                result = None
            if result:
                logger.info(f"Historic EPC matched {user_address} in {postcode}")

        if not result:
            logger.warning(f"No UPRN found for {user_address} in {postcode}")
            return _unmatched_row(row)

        uprn, found_address, score = result
        logger.info(
            f"Found UPRN for {user_address} in {postcode}: {uprn} (score: {score})"
        )
        return {
            **row,  # Include all original data
            "address2uprn_uprn": uprn,
            "address2uprn_address": found_address,
            "address2uprn_lexiscore": score,
        }
    except Exception as e:
        logger.error(f"Error processing address {user_address}: {e}")
        # Still emit the row, with error markers
        return _unmatched_row(row, error=str(e))


def _process_postcode(postcode, postcode_rows: list) -> list:
    """Resolve every row of one postcode, fetching its EPC data only once."""
    logger.info(f"Processing postcode: {postcode} with {len(postcode_rows)} rows")

    if not AddressMatch.is_valid_postcode(postcode):
        logger.warning(f"Postcode {postcode} is invalid, skipping")
        return [_unmatched_row(row, marker="invalid postcode") for row in postcode_rows]

    try:
        epc_df = get_epc_data_with_postcode(postcode=postcode)
        logger.info(f"Fetched {len(epc_df)} EPC records for postcode {postcode}")
    except Exception as e:
        logger.error(f"Failed to fetch EPC data for postcode {postcode}: {e}")
        # Keep the rows (flagged with the error) rather than dropping them
        # from the output when the EPC lookup fails.
        return [_unmatched_row(row, error=str(e)) for row in postcode_rows]

    return [_resolve_row(row, postcode, epc_df) for row in postcode_rows]


def _mark_subtask_failed(subtask_interface, subtask_id, message: str) -> None:
    """Best-effort update of a subtask to "failed"; DB errors are only logged."""
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": message}
        )
    except Exception as db_error:
        logger.error(f"Failed to update subtask status: {db_error}")


def handler(event, context, local=False):
    """Lambda entry point: resolve UPRNs for the addresses in one or more CSVs.

    Each record's ``body`` (a JSON string or a dict) must carry ``task_id``,
    ``sub_task_id`` and ``s3_uri``. For every record the CSV at ``s3_uri`` is
    loaded, rows without ``postcode_clean`` are dropped, and the remaining
    rows are grouped by postcode. Each row is matched using its
    "Address 1".."Address 3" columns, first against the new EPC data and then
    against the historic EPC data. The rows, extended with
    ``address2uprn_uprn`` / ``address2uprn_address`` /
    ``address2uprn_lexiscore``, are saved to S3 and the subtask status is
    updated ("in progress", then "complete" or "failed").

    Args:
        event: Either a batch event with a ``Records`` list or a single
            record-shaped dict.
        context: Lambda context; ``function_name`` and ``aws_request_id`` are
            printed when available.
        local: When True, ``event`` is replaced by a hard-coded test event.

    Returns:
        ``{"statusCode": 500, "body": <json errors>}`` when there were errors
        and no record was processed; otherwise ``{"statusCode": 200, "body":
        <json>}`` whose body lists a short summary per processed record under
        ``processed`` and any errors under ``errors``.
    """
    print("=== Address2UPRN Lambda Handler ===")
    # getattr keeps local runs working when a stub/None context is supplied.
    print(f"Function: {getattr(context, 'function_name', None)}")
    print(f"Request ID: {getattr(context, 'aws_request_id', None)}")

    # Handle local testing
    if local is True:
        event = {
            "Records": [
                {
                    "body": json.dumps(
                        {
                            "sub_task_id": "d7363c83-2ef7-4474-b30f-980fd587350c",
                            "task_id": "a042af13-8b57-4709-ad22-ecac1ccca4bd",
                            "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/essex/Copy of EPC register Essex(August 2025)(in) (2).csv",
                        }
                    )
                }
            ]
        }

    print(f"Event: {json.dumps(event, indent=2, default=str)}")
    print("===================================")

    # Handle both single event and batch events (SQS, etc.)
    records = event.get("Records", [event])
    results = []
    errors = []
    subtask_interface = SubTaskInterface()

    for record in records:
        task_id = None
        subtask_id = None
        try:
            # Parse body (inputs)
            body = _parse_record_body(record)

            # Validate required fields (reported in this fixed order)
            task_id = body.get("task_id")
            subtask_id = body.get("sub_task_id")
            s3_uri = body.get("s3_uri")
            required = (
                ("task_id", task_id),
                ("sub_task_id", subtask_id),
                ("s3_uri", s3_uri),
            )
            missing_field = next((name for name, value in required if not value), None)
            if missing_field is not None:
                errors.append({"error": f"Missing required field: {missing_field}"})
                continue

            # Convert task_id to UUID
            try:
                task_id = UUID(task_id) if isinstance(task_id, str) else task_id
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Convert sub_task_id to UUID
            try:
                subtask_id = (
                    UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
                )
            except ValueError as e:
                errors.append(
                    {"error": f"Invalid UUID format for sub_task_id: {str(e)}"}
                )
                continue

            # Update existing subtask to 'in progress'
            subtask_interface.update_subtask_status(subtask_id, "in progress")
            logger.info(f"Processing subtask {subtask_id} for task {task_id}")

            # Parse S3 URI and read CSV from S3
            logger.info(f"Reading data from S3: {s3_uri}")
            try:
                bucket, key = parse_s3_uri(s3_uri)
                csv_data = read_csv_from_s3_dict(bucket, key)
                df = pd.DataFrame(csv_data)
                logger.info(f"Loaded {len(df)} rows from S3")
            except Exception as s3_error:
                logger.error(f"Failed to read data from S3: {s3_error}")
                errors.append(
                    {"error": "Failed to read data from S3", "details": str(s3_error)}
                )
                _mark_subtask_failed(subtask_interface, subtask_id, str(s3_error))
                continue

            # Group the rows by postcode so EPC data is fetched once per postcode
            logger.info(f"Processing {len(df)} rows for task {task_id}")
            clean_df = df.dropna(subset=["postcode_clean"])
            postcode_to_addresses = {
                postcode: group.to_dict(orient="records")
                for postcode, group in clean_df.groupby("postcode_clean", sort=False)
            }
            logger.info(f"Total postcodes: {len(postcode_to_addresses)}")

            results_data = []
            for postcode, postcode_rows in postcode_to_addresses.items():
                results_data.extend(_process_postcode(postcode, postcode_rows))

            # Create results DataFrame
            result_df = pd.DataFrame(results_data)

            # The UPRN is integer-valued, but the no-match rows append None, so the
            # mixed column lands as float64 and would serialise as "100020933699.0".
            # Coerce to a nullable integer so it round-trips as "100020933699"
            # (empty when missing) — the form the finaliser and the combined-results
            # UI expect. `to_numeric(errors="coerce")` also folds the
            # "invalid postcode" sentinel + blanks to NA (read back as missing).
            if "address2uprn_uprn" in result_df.columns:
                result_df["address2uprn_uprn"] = pd.to_numeric(
                    result_df["address2uprn_uprn"], errors="coerce"
                ).astype("Int64")

            # Save results to S3. save_results_to_s3 reports failure through
            # its return value, so a False result must not be marked complete.
            if not save_results_to_s3(result_df, str(task_id), str(subtask_id)):
                save_error = "Failed to save results to S3"
                logger.error(save_error)
                errors.append({"error": save_error})
                _mark_subtask_failed(subtask_interface, subtask_id, save_error)
                continue

            # Mark subtask as complete
            rows_processed = len(result_df)
            try:
                subtask_interface.update_subtask_status(
                    subtask_id,
                    "complete",
                    outputs={"rows_processed": rows_processed},
                )
                logger.info(f"Marked subtask {subtask_id} as complete")
            except Exception as db_error:
                logger.error(f"Failed to mark subtask as completed: {db_error}")

            # Record a compact summary only; the full rows live in S3 and are
            # too verbose for the response or the logs.
            results.append(
                {
                    "task_id": str(task_id),
                    "sub_task_id": str(subtask_id),
                    "rows_processed": rows_processed,
                }
            )

        except Exception as e:
            logger.error(f"Unexpected error processing record: {e}", exc_info=True)
            errors.append({"error": "Unexpected error", "details": str(e)})
            # Mark subtask as failed if we have one
            if subtask_id:
                _mark_subtask_failed(subtask_interface, subtask_id, str(e))

    logger.info(results)

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}

    return {
        "statusCode": 200,
        "body": json.dumps(
            {"processed": results, "errors": errors if errors else None}
        ),
    }
# TODO:
# - Don't add results to the return message, as it's too verbose.
# - Capture the exception (as `e`) into S3, so the logs can be found in S3.
# - Upload results to S3 as well as CSV.