import json
import os
import re
import uuid
from datetime import datetime
from difflib import SequenceMatcher
from typing import Set
from urllib.parse import quote, urlencode
from uuid import UUID

import pandas as pd
import requests
from tqdm import tqdm

from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from epc_api.client import EpcClient
from utils.logger import setup_logger
from utils.s3 import save_csv_to_s3

logger = setup_logger()

EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
if not EPC_AUTH_TOKEN:
    # Fail fast at import: every EPC lookup below needs this token. An empty
    # value is treated the same as an unset one.
    raise RuntimeError("EPC_AUTH_TOKEN not defined in env")

# postcodes.io validation endpoint used by is_valid_postcode().
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"

# Words that introduce a flat/unit number (as opposed to a building number).
_FLAT_WORDS = ("flat", "apt", "apartment", "unit")

# A flat/unit marker as a whole word or glued to a number ("flat3"). A plain
# substring test would also fire on e.g. "unit" inside "community".
_FLAT_MARKER_RE = re.compile(r"\b(?:flat|apt|apartment|unit)(?=\d|\b)")

# A house/flat number with an optional one-letter suffix ("42", "28a").
_NUMBER_RE = re.compile(r"\d+[a-z]?")

# Token-level synonyms applied by normalise_address(); an empty replacement
# drops the token. Lookup happens after punctuation is stripped, so dotted
# variants such as "rd." can never occur and are not listed.
_ADDRESS_SYNONYMS = {
    # street types
    "rd": "road",
    "st": "street",
    "ave": "avenue",
    "ln": "lane",
    "cres": "crescent",
    "ct": "court",
    "dr": "drive",
    # flats / units
    "apt": "flat",
    "apartment": "flat",
    "unit": "flat",
    "ste": "suite",
    # numbering noise
    "no": "",
}

# Input columns concatenated (in this order, space-separated) into the
# handler's user_input column. Missing columns are treated as empty.
_ADDRESS_INPUT_COLUMNS = ("Address 1", "Address 2", "Address 3")


def is_valid_postcode(postcode_clean: str) -> bool:
    """Validate a sanitised postcode (e.g. ``E84SQ``) via postcodes.io.

    Returns:
        The API's ``result`` flag; ``False`` for empty input or when the
        request fails or the body cannot be decoded as JSON.
    """
    if not postcode_clean:
        return False
    try:
        resp = requests.get(
            # quote(..., safe="") keeps characters such as "/" or "?" in
            # untrusted input from altering the request path.
            POSTCODES_IO_VALIDATE_URL.format(
                postcode=quote(str(postcode_clean), safe="")
            ),
            timeout=5,
        )
        resp.raise_for_status()
        return bool(resp.json().get("result", False))
    except (requests.RequestException, ValueError):
        # Network issues, rate limits, non-JSON bodies, etc.
        return False


def _extract_number_sequence(s: str) -> list[str]:
    """Return every number (with optional one-letter suffix) in *s*, in order."""
    return _NUMBER_RE.findall(s)


def _extract_building_number(s: str) -> str | None:
    """Return the main building number (NOT the flat/unit number) in *s*.

    Assumes formats like '42 moreton road' or 'flat 3 42 moreton road': the
    token following a flat/unit word is skipped, and the first remaining
    number-like token is returned. Returns None if there is none.
    """
    cleaned = []
    skip_next = False
    for tok in s.split():
        if tok in _FLAT_WORDS:
            skip_next = True
            continue
        if skip_next:
            skip_next = False
            continue
        cleaned.append(tok)

    for tok in cleaned:
        if _NUMBER_RE.fullmatch(tok):
            return tok
    return None


def _has_flat_marker(s: str) -> bool:
    """True if *s* contains a flat/unit word (whole word or glued to a number)."""
    return _FLAT_MARKER_RE.search(s) is not None


def levenshtein(a: str, b: str) -> float:
    """Address similarity score in [0, 1] (despite the name, not edit distance).

    Strategy:
      - Normalise both addresses with normalise_address().
      - Return 0.0 (impossible match) when *a* has numbers but *b* has none,
        when they share no numbers, when their building numbers differ, or
        when *a* looks like "flat/building" without a flat word while *b* is a
        flat whose first two numbers are in a different order.
      - Otherwise blend token Jaccard overlap (weight 0.65) with
        SequenceMatcher character similarity (weight 0.35), rounded to 4 dp.
    """
    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    # --- hard signal: numbers ---
    seq_a = _extract_number_sequence(a_norm)
    seq_b = _extract_number_sequence(b_norm)
    nums_a = set(seq_a)
    nums_b = set(seq_b)

    if nums_a and not nums_b:
        return 0.0

    # No shared numbers at all -> impossible match
    if nums_a and nums_b and nums_a.isdisjoint(nums_b):
        return 0.0

    # HARD GUARD: building number must match
    bld_a = _extract_building_number(a_norm)
    bld_b = _extract_building_number(b_norm)
    if bld_a and bld_b and bld_a != bld_b:
        return 0.0

    # --- order-sensitive flat/building guard ---
    if (
        len(seq_a) == 2
        and len(seq_b) >= 2
        and _has_flat_marker(b_norm)
        and not _has_flat_marker(a_norm)
        and seq_a != seq_b[:2]
    ):
        return 0.0

    # --- token similarity (order-independent) ---
    toks_a: Set[str] = set(a_norm.split())
    toks_b: Set[str] = set(b_norm.split())
    if toks_a and toks_b:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
    else:
        token_score = 0.0

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    # --- weighted blend ---
    return round(0.65 * token_score + 0.35 * char_score, 4)


def normalise_address(s: str) -> str:
    """Canonical UK-focused address normalisation.

    - Returns "" for None/NaN/empty input; other non-strings are str()-ed
    - Lowercases
    - Splits digit+letter suffixes ("28a" -> "28 a")
    - Removes punctuation (keeps / for flats)
    - Normalises whitespace
    - Applies synonym compression at token level (_ADDRESS_SYNONYMS)
    """
    if pd.api.types.is_scalar(s) and pd.isna(s):
        return ""
    s = str(s)
    if not s:
        return ""

    # 1. lowercase
    s = s.lower()
    # 1.5 split digit-letter suffixes
    s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)
    # 2. remove punctuation except /
    s = re.sub(r"[^\w\s/]", " ", s)
    # 3. normalise whitespace
    s = re.sub(r"\s+", " ", s).strip()
    # 4. tokenise + synonym normalisation (empty replacements are dropped)
    tokens = (_ADDRESS_SYNONYMS.get(tok, tok) for tok in s.split())
    return " ".join(tok for tok in tokens if tok)


def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """Score every value of ``df[column]`` against *user_address* with levenshtein().

    Raises:
        ValueError: if *column* is not in *df*.
    """
    if column not in df.columns:
        raise ValueError(f"Missing column: {column}")
    return df[column].apply(lambda x: levenshtein(user_address, x))


def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
    """Fetch EPC domestic search results for *postcode* as a DataFrame.

    If the number of returned rows reaches ``size`` (results may be
    truncated), the search is repeated recursively with ``size * 2`` until
    ``max_attempts`` is reached, after which a warning is logged and the
    possibly-truncated results are returned.

    Returns:
        DataFrame whose columns are the response's ``column-names``; an empty
        DataFrame when the response is empty or lacks ``rows``/``column-names``.
    """
    client = EpcClient(auth_token=EPC_AUTH_TOKEN)
    # Plain string join: os.path.join is a filesystem helper and uses "\" on Windows.
    url = f"{str(client.domestic.host).rstrip('/')}/search"
    if size:
        url += "?" + urlencode({"size": size})

    search_resp = client.domestic.call(
        url=url,
        method="get",
        params={"postcode": postcode},
    )
    if (
        not search_resp
        or "rows" not in search_resp
        or "column-names" not in search_resp
    ):
        return pd.DataFrame()

    results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"])

    # If we hit the size limit, there *may* be more results
    if size and len(results_df) >= size:
        logger.warning(
            f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
            f"Attempt {attempt}/{max_attempts}."
        )
        if attempt < max_attempts:
            logger.info(f"🔁 Retrying with size={size * 2}")
            return get_epc_data_with_postcode(
                postcode=postcode,
                size=size * 2,
                attempt=attempt + 1,
                max_attempts=max_attempts,
            )
        logger.warning(
            "🚨 Max attempts reached. Results may be truncated. "
            "(Please do a manual review by the tech team.)"
        )

    return results_df


def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """Return True if all non-null UPRNs in *df* equal *uprn*.

    Values are compared as stripped strings. Returns False if *column* is
    missing, has no non-null values, or holds more than one distinct UPRN.
    """
    if column not in df.columns:
        return False

    # Drop nulls and normalise to string
    uprns = df[column].dropna().astype(str).str.strip().unique()

    # Exactly one unique UPRN and it matches (False when there are none)
    return len(uprns) == 1 and uprns[0] == str(uprn)


def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """Annotate EPC results with similarity scores and ranks.

    Adds ``lexiscore`` (levenshtein() against *user_address*) and ``lexirank``
    (dense rank, 1 = best), normalises the UPRN column to strings (nulls
    become "" and a trailing ".0" from float parsing is removed), and returns
    a copy sorted best-first. DOES NOT choose or return a UPRN.

    Raises:
        ValueError: if *address_column* or *uprn_column* is missing.
    """
    if address_column not in df.columns:
        raise ValueError(f"Missing column: {address_column}")
    if uprn_column not in df.columns:
        raise ValueError(f"Missing column: {uprn_column}")

    out = df.copy()
    user_norm = normalise_address(user_address)
    out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))

    # Normalise UPRN to string; map nulls to "" first so they cannot turn
    # into the literal strings "nan"/"None" and be mistaken for real UPRNs.
    uprns = out[uprn_column]
    out[uprn_column] = (
        uprns.where(uprns.notna(), "")
        .astype(str)
        .str.replace(r"\.0$", "", regex=True)
    )

    # Rank: 1 = best match
    out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)

    return out.sort_values(
        ["lexirank", "lexiscore"],
        ascending=[True, False],
    )


def get_uprn_with_epc_df(
    user_inputed_address: str,
    epc_df: pd.DataFrame,
):
    """Match an address against pre-fetched EPC data and return its UPRN.

    Lets callers reuse one EPC fetch for every address in the same postcode.

    Args:
        user_inputed_address: The user's free-text address.
        epc_df: EPC search results for the postcode (needs ``address`` and
            ``uprn`` columns).

    Returns:
        ``(uprn, matched_address, lexiscore)`` for an unambiguous match, or
        ``None`` when *epc_df* is empty, the best score is 0 (every candidate
        was rejected by a hard guard in levenshtein()), the rank-1 rows do not
        agree on a single UPRN, or the matched UPRN is empty.

    Raises:
        ValueError: if *epc_df* is non-empty but lacks a required column.
    """
    if epc_df.empty:
        return None

    scored_df = get_uprn_candidates(epc_df, user_address=user_inputed_address)

    # Rows are sorted best-first; a 0.0 score means "impossible match".
    best_score = float(scored_df.iloc[0]["lexiscore"])
    if best_score <= 0:
        return None

    # All rank-1 rows (possible draw)
    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    best = top_rank_df.iloc[0]

    # If rank-1 rows do not agree on a single UPRN -> ambiguous
    if not df_has_single_uprn(top_rank_df, uprn=best["uprn"]):
        return None

    found_uprn = str(best["uprn"])
    if found_uprn == "":
        return None

    return (found_uprn, best["address"], float(best["lexiscore"]))


def get_uprn(
    user_inputed_address: str,
    postcode: str,
    return_address=False,
    return_EPC=False,
    return_score=True,
):
    """Fetch EPC data for *postcode* and match *user_inputed_address* against it.

    Convenience wrapper for one-off lookups; to match several addresses in
    the same postcode, fetch once and call get_uprn_with_epc_df instead.

    ``return_address``, ``return_EPC`` and ``return_score`` are accepted for
    backward compatibility but currently have no effect.

    Returns:
        Same as get_uprn_with_epc_df: ``(uprn, address, lexiscore)`` or None.
    """
    df = get_epc_data_with_postcode(postcode=postcode)
    return get_uprn_with_epc_df(
        user_inputed_address=user_inputed_address,
        epc_df=df,
    )


def _resolution(
    status,
    found_uprn=None,
    best_match_uprn=None,
    best_match_address=None,
    best_match_lexiscore=None,
) -> dict:
    """Build one diagnostics record for resolve_uprns_for_postcode_group."""
    return {
        "found_uprn": found_uprn,
        "best_match_uprn": best_match_uprn,
        "best_match_address": best_match_address,
        "best_match_lexiscore": best_match_lexiscore,
        "status": status,
    }


def _resolve_single_address(user_address: str, epc_df: pd.DataFrame) -> dict:
    """Match one address against *epc_df* and return a diagnostics record."""
    # Checked before scoring: an empty frame may have no columns at all, which
    # would make get_uprn_candidates raise instead of reporting "no candidates".
    if epc_df.empty:
        return _resolution("no_epc_candidates")

    scored_df = get_uprn_candidates(epc_df, user_address=user_address)
    if scored_df.empty:
        return _resolution("no_epc_candidates")

    best_score = scored_df.iloc[0]["lexiscore"]
    if best_score <= 0:
        return _resolution("zero_score", best_match_lexiscore=best_score)

    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    best = top_rank_df.iloc[0]

    if not df_has_single_uprn(top_rank_df, best["uprn"]):
        return _resolution(
            "ambiguous",
            best_match_uprn=best["uprn"],
            best_match_address=best["address"],
            best_match_lexiscore=best_score,
        )

    uprn = str(best["uprn"])
    if not uprn:
        # Best EPC match has no UPRN recorded.
        return _resolution(
            "no_uprn",
            best_match_address=best["address"],
            best_match_lexiscore=best_score,
        )

    return _resolution(
        "matched",
        found_uprn=uprn,
        best_match_uprn=uprn,
        best_match_address=best["address"],
        best_match_lexiscore=best_score,
    )


def resolve_uprns_for_postcode_group(
    group_df: pd.DataFrame,
    epc_df: pd.DataFrame,
    address_col: str = "Address 1",
) -> pd.DataFrame:
    """Resolve UPRNs for rows sharing one postcode, with diagnostics.

    Given:
      - group_df: rows sharing the same postcode
      - epc_df: EPC search results for that postcode

    Returns:
        group_df (index reset) plus columns found_uprn, best_match_uprn,
        best_match_address, best_match_lexiscore and status (one of
        no_epc_candidates, zero_score, ambiguous, no_uprn, matched).
    """
    results = [
        _resolve_single_address(str(row[address_col]).strip(), epc_df)
        for _, row in group_df.iterrows()
    ]
    return pd.concat(
        [group_df.reset_index(drop=True), pd.DataFrame(results)],
        axis=1,
    )


def save_results_to_s3(
    results_df: pd.DataFrame,
    task_id: str,
    sub_task_id: str,
    bucket_name: str | None = None,
) -> bool:
    """Save a results DataFrame to S3 as CSV.

    The object key is
    ``ara_raw_outputs/{task_id}/{sub_task_id}/{timestamp}_{random8}.csv``.

    :param results_df: The DataFrame containing results
    :param task_id: The task ID (first path component of the key)
    :param sub_task_id: The subtask ID (second path component of the key)
    :param bucket_name: The S3 bucket name (defaults to env S3_BUCKET_NAME)
    :return: True if successful, False otherwise (errors are logged, not raised)
    """
    if bucket_name is None:
        bucket_name = os.getenv("S3_BUCKET_NAME")

    if not bucket_name:
        logger.error(
            "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
        )
        return False

    try:
        # Timestamp + short random suffix keeps repeated saves from colliding.
        file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}"
        file_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{file_name}.csv"

        if save_csv_to_s3(results_df, bucket_name, file_key):
            logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}")
            return True
        logger.error("Failed to save results to S3")
        return False
    except Exception as e:
        logger.error(f"Error saving results to S3: {str(e)}")
        return False


def test(a, b):
    """Assert ``a == b`` with a message showing both values and their types."""
    assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"


def run_all_test():
    """Manual regression checks against the live EPC API.

    Requires network access and a valid EPC_AUTH_TOKEN. The hard-coded
    expected counts/UPRNs may go stale as the EPC register changes.
    """

    def uprn_of(address, postcode):
        # get_uprn returns (uprn, address, score) or None; the expectations
        # below compare the UPRN alone and use False for "no match".
        result = get_uprn(address, postcode)
        return result[0] if result else False

    # Basic usage with different post codes styles
    test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
    test(get_epc_data_with_postcode("B938sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
    test(uprn_of("68", "b93 8sy"), "100070989938")
    test(uprn_of("68 Glendon Way", "b93 8sy"), "100070989938")
    test(uprn_of("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
    test(uprn_of("28 A", "se6 4tf"), "100023278633")
    test(uprn_of("28A", "se6 4tf"), "100023278633")
    test(uprn_of("6 Aitken Close", "E8 4SQ"), False)

    # unique case
    test(uprn_of("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
    test(uprn_of("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
    test(uprn_of("5 Semley Gate", "e9 5nh"), "10008238198")
    test(uprn_of("1, 5 Semley Gate", "e9 5nh"), False)
    # this one return "flat 1, in 1 semley gate"
    test(uprn_of("1 Semley Gate", "e9 5nh"), "10008238188")
    test(uprn_of("48 Oswald Street", "E5 0BT"), False)
    test(uprn_of("42 Oswald Street", "E5 0BT"), False)
    test(uprn_of("46 Oswald Street", "E5 0BT"), False)

    # Ad-hoc inspection calls (results are not asserted).
    get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
    get_uprn_candidates(
        get_epc_data_with_postcode("Cr2 7dl"),
        "FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY",
    )


def _build_user_input(df: pd.DataFrame) -> pd.Series:
    """Join the available address columns into one stripped string per row.

    Columns listed in _ADDRESS_INPUT_COLUMNS that are absent from *df* are
    treated as empty, so payloads without e.g. "Address 3" still work.
    """
    parts = [
        df[col].fillna("").astype(str)
        if col in df.columns
        else pd.Series("", index=df.index, dtype=object)
        for col in _ADDRESS_INPUT_COLUMNS
    ]
    combined = parts[0]
    for part in parts[1:]:
        combined = combined + " " + part
    return combined.str.strip()


def _result_row(row: dict, uprn=None, found_address=None, score=None, error=None) -> dict:
    """Return *row* plus the match columns (and an ``error`` key when given)."""
    out = {
        **row,  # Include all original data
        "uprn": uprn,
        "domna_found_address": found_address,
        "domna_lexiscore": score,
    }
    if error is not None:
        out["error"] = error
    return out


def _mark_subtask_failed(subtask_interface, subtask_id, error) -> None:
    """Best-effort: mark *subtask_id* as failed; DB errors are logged, not raised."""
    if not subtask_id:
        return
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": str(error)}
        )
    except Exception as db_error:
        logger.error(f"Failed to update subtask status: {db_error}")


def handler(event, context, local=False):
    """AWS Lambda entry point: resolve UPRNs for batches of address rows.

    Each record's ``body`` (JSON string or dict) must contain ``task_id`` (a
    UUID) and a non-empty ``rows`` list. Rows are grouped by ``postcode_clean``;
    EPC data is fetched once per valid postcode and every row in the group is
    matched against it. Rows that cannot be matched (missing/invalid postcode,
    EPC fetch failure, empty address, processing error) are kept in the output
    with an ``error`` value. One subtask is created per record; its rows are
    saved to S3 as CSV and the subtask is marked completed, or failed on an
    unexpected error.

    Args:
        event: Lambda event; ``{"Records": [...]}`` or a single record.
        context: Lambda context; only ``function_name``/``aws_request_id`` are
            printed, and it may be None for local runs.
        local: When True, ``event`` is replaced by a built-in sample payload.

    Returns:
        ``statusCode`` 500 with the errors if no record was processed while
        errors occurred, otherwise 200 with per-record summaries and errors.
    """
    print("=== Address2UPRN Lambda Handler ===")
    # getattr keeps local runs with context=None from crashing here.
    print(f"Function: {getattr(context, 'function_name', None)}")
    print(f"Request ID: {getattr(context, 'aws_request_id', None)}")

    # Handle local testing
    if local is True:
        event = {
            "Records": [
                {
                    "body": json.dumps(
                        {
                            "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
                            "rows": [
                                {
                                    "landlord_property_id": "00000002POR",
                                    "UPRN": "766019911",
                                    "Address 1": "9 Redland Way",
                                    "Address 2": "Aylesbury Vale",
                                    "postcode": "HP21 9RJ",
                                    "landlord_property_type": "House",
                                    "postcode_clean": "HP219RJ",
                                },
                                {
                                    "landlord_property_id": "00000003MTR",
                                    "UPRN": "100120781544",
                                    "Address 1": "16 Lime Crescent",
                                    "Address 2": "BICESTER",
                                    "postcode": "OX26 3XJ",
                                    "landlord_property_type": "House",
                                    "postcode_clean": "OX263XJ",
                                },
                                {
                                    "landlord_property_id": "00000004HBY",
                                    "UPRN": "14033542",
                                    "Address 1": "14 Dunbar Drive",
                                    "Address 2": "Woodley",
                                    "postcode": "RG5 4HA",
                                    "landlord_property_type": "House",
                                    "postcode_clean": "RG54HA",
                                },
                            ],
                        }
                    )
                }
            ]
        }

    print(f"Event: {json.dumps(event, indent=2, default=str)}")
    print("===================================")

    # Handle both single event and batch events (SQS, etc.)
    records = event.get("Records", [event])

    results = []
    errors = []
    # Initialised up front so the final log line cannot hit an unbound name
    # when no record reaches the matching stage.
    results_data = []
    subtask_interface = SubTaskInterface()

    for record in records:
        task_id = None
        subtask_id = None
        try:
            # Parse body (inputs)
            raw_body = record.get("body")
            if isinstance(raw_body, str):
                body = json.loads(raw_body)
            else:
                body = raw_body or {}

            # Validate required fields
            task_id = body.get("task_id")
            rows = body.get("rows", [])

            if not task_id:
                errors.append({"error": "Missing required field: task_id"})
                continue

            if not rows:
                errors.append({"error": "Missing or empty rows data"})
                continue

            # Convert task_id to UUID
            try:
                task_id = UUID(task_id) if isinstance(task_id, str) else task_id
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Create a subtask for this batch
            subtask_id = subtask_interface.create_subtask(
                task_id=task_id, inputs={"row_count": len(rows)}
            )
            logger.info(
                f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows"
            )
            logger.info(f"Processing {len(rows)} rows for task {task_id}")

            df = pd.DataFrame(rows)
            df["user_input"] = _build_user_input(df)
            logger.info("Created user_input column from Address 1, Address 2 and Address 3")

            results_data = []

            # Rows without a cleaned postcode cannot be looked up; keep them in
            # the output (flagged) rather than silently dropping them.
            clean_df = df.dropna(subset=["postcode_clean"])
            for row in df[df["postcode_clean"].isna()].to_dict(orient="records"):
                results_data.append(_result_row(row, error="missing postcode_clean"))

            postcode_to_addresses = {
                postcode: group.to_dict(orient="records")
                for postcode, group in clean_df.groupby("postcode_clean", sort=False)
            }
            logger.info(f"Total postcodes: {len(postcode_to_addresses)}")

            postcodes_processed = 0
            addresses_processed = 0
            uprns_found = 0

            for postcode, postcode_rows in postcode_to_addresses.items():
                logger.info(
                    f"Processing postcode: {postcode} with {len(postcode_rows)} rows"
                )

                # Validate postcode before processing
                if not is_valid_postcode(postcode):
                    logger.warning(f"Postcode {postcode} is invalid, skipping")
                    results_data.extend(
                        _result_row(r, error="invalid postcode") for r in postcode_rows
                    )
                    continue

                # Fetch EPC data once per postcode
                try:
                    epc_df = get_epc_data_with_postcode(postcode=postcode)
                    logger.info(
                        f"Fetched {len(epc_df)} EPC records for postcode {postcode}"
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to fetch EPC data for postcode {postcode}: {e}"
                    )
                    reason = f"EPC fetch failed: {e}"
                    results_data.extend(
                        _result_row(r, error=reason) for r in postcode_rows
                    )
                    continue

                # Process each address in this postcode with the same EPC data
                for row in postcode_rows:
                    try:
                        user_input = row.get("user_input", "")
                        if not user_input:
                            logger.warning(
                                f"Skipping row with missing user_input for postcode {postcode}"
                            )
                            results_data.append(
                                _result_row(row, error="missing user_input")
                            )
                            continue

                        result = get_uprn_with_epc_df(
                            user_inputed_address=user_input,
                            epc_df=epc_df,
                        )

                        if result:
                            uprn, found_address, score = result
                            uprns_found += 1
                            logger.info(
                                f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
                            )
                            results_data.append(
                                _result_row(
                                    row,
                                    uprn=uprn,
                                    found_address=found_address,
                                    score=score,
                                )
                            )
                        else:
                            logger.warning(
                                f"No UPRN found for {user_input} in {postcode}"
                            )
                            results_data.append(_result_row(row))

                        addresses_processed += 1

                    except Exception as e:
                        logger.error(
                            f"Error processing address {row.get('user_input', 'unknown')}: {e}"
                        )
                        # Still add the row with error markers
                        results_data.append(_result_row(row, error=str(e)))

                postcodes_processed += 1

            # Save results to S3 (save_results_to_s3 logs its own failures)
            result_df = pd.DataFrame(results_data)
            try:
                save_results_to_s3(result_df, str(task_id), str(subtask_id))
            except Exception as s3_error:
                logger.error(f"Failed to save results to S3: {s3_error}")

            results.append(
                {
                    "subtask_id": str(subtask_id),
                    "rows_processed": len(rows),
                    "postcodes_processed": postcodes_processed,
                    "addresses_processed": addresses_processed,
                    "uprns_found": uprns_found,
                    "status": "processed",
                }
            )

            # Mark subtask as completed
            try:
                subtask_interface.update_subtask_status(
                    subtask_id, "completed", outputs={"rows_processed": len(rows)}
                )
                logger.info(f"Marked subtask {subtask_id} as completed")
            except Exception as db_error:
                logger.error(f"Failed to mark subtask as completed: {db_error}")

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in request body: {e}")
            errors.append({"error": "Invalid JSON in request body", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, e)

        except Exception as e:
            logger.error(f"Unexpected error processing record: {e}", exc_info=True)
            errors.append({"error": "Unexpected error", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, e)

    logger.info(results_data)
    logger.info(results)

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}

    return {
        "statusCode": 200,
        "body": json.dumps(
            {"processed": results, "errors": errors if errors else None}
        ),
    }


# TODO:
# Don't add results to return messages as it's too verbose
# Capture the exception as e into S3; to find the logs, go to S3
# Upload results to S3 as well as CSV