diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index c458e40d..af29a095 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -1,11 +1,13 @@ -from typing import Optional - from epc_api.client import EpcClient import os from urllib.parse import urlencode import pandas as pd +from difflib import SequenceMatcher from utils.logger import setup_logger +import re +from typing import Set import json +import requests from uuid import UUID import uuid from backend.app.db.functions.tasks.Tasks import SubTaskInterface @@ -16,8 +18,6 @@ from utils.s3 import ( ) from datetime import datetime -from backend.utils.addressMatch import AddressMatch - logger = setup_logger() @@ -29,6 +29,191 @@ if EPC_AUTH_TOKEN is None: raise RuntimeError("EPC_AUTH_TOKEN not defined in env") +def is_valid_postcode(postcode_clean: str) -> bool: + """ + Validate postcode using postcodes.io. + + Expects a sanitised postcode (e.g. E84SQ). + Returns True if valid, False otherwise. + """ + POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate" + if not postcode_clean: + return False + + try: + resp = requests.get( + POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean), + timeout=5, + ) + resp.raise_for_status() + return resp.json().get("result", False) + except requests.RequestException: + # Network issues, rate limits, etc. + return False + + +def levenshtein(a: str, b: str) -> float: + """ + Address similarity score in [0, 1]. + + Strategy: + - Normalise + - Strongly penalise mismatched house/flat numbers + - Combine token overlap + character similarity + """ + + def extract_number_sequence(s: str) -> list[str]: + return re.findall(r"\d+[a-z]?", s) + + def extract_numbers(s: str) -> Set[str]: + return set(extract_number_sequence(s)) + + def tokenise(s: str) -> Set[str]: + return set(s.split()) + + def extract_building_number(s: str) -> str | None: + """ + Extract the main building number (NOT flat/unit). + Assumes formats like: + - '42 moreton road' + - 'flat 3 42 moreton road' + """ + tokens = s.split() + + # remove flat/unit context + cleaned = [] + skip_next = False + for t in tokens: + if t in ("flat", "apt", "apartment", "unit"): + skip_next = True + continue + if skip_next: + skip_next = False + continue + cleaned.append(t) + + # first remaining number is building number + for t in cleaned: + if re.fullmatch(r"\d+[a-z]?", t): + return t + + return None + + a_norm = normalise_address(a) + b_norm = normalise_address(b) + + # --- hard signal: numbers --- + nums_a = extract_numbers(a_norm) + nums_b = extract_numbers(b_norm) + + if nums_a and not nums_b: + return 0.0 + + # No shared numbers at all → impossible match + if nums_a and nums_b and nums_a.isdisjoint(nums_b): + return 0.0 + + # 🔒 HARD GUARD: building number must match + bld_a = extract_building_number(a_norm) + bld_b = extract_building_number(b_norm) + + if bld_a and bld_b and bld_a != bld_b: + return 0.0 + + # --- order-sensitive flat/building guard --- + seq_a = extract_number_sequence(a_norm) + seq_b = extract_number_sequence(b_norm) + + has_flat_token_user = any( + tok in a_norm for tok in ("flat", "apt", "apartment", "unit") + ) + has_flat_token_epc = "flat" in b_norm + + if ( + len(seq_a) == 2 + and len(seq_b) >= 2 + and has_flat_token_epc + and not has_flat_token_user + and seq_a != seq_b[:2] + ): + return 0.0 + + # --- token similarity (order-independent) --- + toks_a = tokenise(a_norm) + toks_b = tokenise(b_norm) + + if not toks_a or not toks_b: + token_score = 0.0 + else: + token_score = len(toks_a & toks_b) / len(toks_a | toks_b) + + # --- character similarity (soft signal) --- + char_score = SequenceMatcher(None, a_norm, b_norm).ratio() + + # --- weighted blend --- + return round( + 0.65 * token_score + 0.35 * char_score, + 4, + ) + + +def normalise_address(s: str) -> str: + """ + Canonical UK-focused address normalisation. + + - Lowercases + - Removes punctuation (keeps / for flats) + - Normalises whitespace + - Applies synonym compression at token level + """ + + if not s: + return "" + + ADDRESS_SYNONYMS = { + # street types + "rd": "road", + "rd.": "road", + "st": "street", + "st.": "street", + "ave": "avenue", + "ave.": "avenue", + "ln": "lane", + "ln.": "lane", + "cres": "crescent", + "ct": "court", + "dr": "drive", + # flats / units + "apt": "flat", + "apartment": "flat", + "unit": "flat", + "ste": "suite", + # numbering noise + "no": "", + "no.": "", + } + # 1. lowercase + s = s.lower() + + # 1.5 split digit-letter suffixes + s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s) + + # 2. remove punctuation except / + s = re.sub(r"[^\w\s/]", " ", s) + + # 3. normalise whitespace + s = re.sub(r"\s+", " ", s).strip() + + # 4. tokenise + synonym normalisation + tokens = [] + for tok in s.split(): + replacement = ADDRESS_SYNONYMS.get(tok, tok) + if replacement: + tokens.append(replacement) + + return " ".join(tokens) + + def score_addresses( df: pd.DataFrame, user_address: str, @@ -37,7 +222,7 @@ def score_addresses( if column not in df.columns: raise ValueError(f"Missing column: {column}") - return df[column].apply(lambda x: AddressMatch.score(user_address, x)) + return df[column].apply(lambda x: levenshtein(user_address, x)) def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): @@ -129,11 +314,9 @@ def get_uprn_candidates( out = df.copy() - user_norm = AddressMatch.normalise_address(user_address) + user_norm = normalise_address(user_address) - out["lexiscore"] = out[address_column].apply( - lambda x: AddressMatch.levenshtein(user_norm, x) - ) + out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x)) # Normalise UPRN to string out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) @@ -297,10 +480,7 @@ def resolve_uprns_for_postcode_group( def save_results_to_s3( - results_df: pd.DataFrame, - task_id: str, - sub_task_id: str, - bucket_name: Optional[str] = None, + results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None ) -> bool: """ Save results DataFrame to S3 as CSV. @@ -351,9 +531,9 @@ def handler(event, context, local=False): { "body": json.dumps( { - "task_id": "169ea9b0-01b5-48dc-9f90-ae1989491d09", - "sub_task_id": "e5704f9e-29fe-43c8-8913-05be09f2440f", - "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching Rerun After Address Fix.csv", + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d", + "s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv", } ) } @@ -441,9 +621,19 @@ def handler(event, context, local=False): # Process the rows logger.info(f"Processing {len(df)} rows for task {task_id}") - df["postcode_clean"] = ( - df["Postcode"].astype(str).str.upper().str.strip().str.replace(" ", "") - ) + # Create user_input column by concatenating Address columns if not already present + if "user_input" not in df.columns: + df["user_input"] = ( + df["Address 1"].fillna("") + + " " + + df["Address 2"].fillna("") + + " " + + df["Address 3"].fillna("") + ).str.strip() + logger.info(f"Created user_input column from Address 1 and Address 2") + else: + logger.info(f"user_input column already present in data") + clean_df = df.dropna(subset=["postcode_clean"]) postcode_to_addresses = { @@ -463,7 +653,7 @@ def handler(event, context, local=False): ) # Validate postcode before processing - if not AddressMatch.is_valid_postcode(postcode): + if not is_valid_postcode(postcode): logger.warning(f"Postcode {postcode} is invalid, skipping") continue @@ -482,67 +672,57 @@ def handler(event, context, local=False): # Process each address in this postcode with the same EPC data for row in postcode_rows: try: - # Concatenate Address columns directly - address2uprn_user_input = ( - str(row.get("Address 1", "")).strip() - + " " - + str(row.get("Address 2", "")).strip() - + " " - + str(row.get("Address 3", "")).strip() - ).strip() - - if not address2uprn_user_input: + user_input = row.get("user_input", "") + if not user_input: logger.warning( - f"Skipping row with missing address components for postcode {postcode}" + f"Skipping row with missing user_input for postcode {postcode}" ) continue # Get UPRN using the pre-fetched EPC data with all return options result = get_uprn_with_epc_df( - user_inputed_address=address2uprn_user_input, - epc_df=epc_df, - verbose=True, + user_inputed_address=user_input, epc_df=epc_df, verbose=True ) # Parse result tuple if successful if result: uprn, found_address, score = result logger.info( - f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})" + f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})" ) results_data.append( { **row, # Include all original data - "address2uprn_uprn": uprn, - "address2uprn_address": found_address, - "address2uprn_lexiscore": score, + "uprn": uprn, + "domna_found_address": found_address, + "domna_lexiscore": score, } ) else: logger.warning( - f"No UPRN found for {address2uprn_user_input} in {postcode}" + f"No UPRN found for {user_input} in {postcode}" ) results_data.append( { **row, # Include all original data - "address2uprn_uprn": None, - "address2uprn_address": None, - "address2uprn_lexiscore": None, + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, } ) except Exception as e: logger.error( - f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}" + f"Error processing address {row.get('user_input', 'unknown')}: {e}" ) # Still add the row with error markers results_data.append( { **row, - "address2uprn_uprn": None, - "address2uprn_address": None, - "address2uprn_lexiscore": None, + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, "error": str(e), } )