"""Resolve UPRNs for user-entered addresses by matching them against EPC search results.

The module fetches domestic EPC rows for a postcode, scores each EPC address
against the user-supplied address, and returns the UPRN of the best match when
that match is unambiguous.
"""

import os
import posixpath
import re
from difflib import SequenceMatcher
from typing import Set
from urllib.parse import urlencode

import pandas as pd
from tqdm import tqdm

from epc_api.client import EpcClient
from utils.logger import setup_logger

logger = setup_logger()

EPC_AUTH_TOKEN = os.getenv(
    "EPC_AUTH_TOKEN",
)
if EPC_AUTH_TOKEN is None:
    raise RuntimeError("EPC_AUTH_TOKEN not defined in env")


# A number optionally followed by a single letter, e.g. "42" or "42a".
_NUMBER_RE = re.compile(r"\d+[a-z]?")

# Tokens that introduce a flat/unit number rather than a building number.
_FLAT_TOKENS = ("flat", "apt", "apartment", "unit")

# Token-level replacements applied by normalise_address. An empty replacement
# means "drop the token".
_ADDRESS_SYNONYMS = {
    # street types
    "rd": "road",
    "rd.": "road",
    "st": "street",
    "st.": "street",
    "ave": "avenue",
    "ave.": "avenue",
    "ln": "lane",
    "ln.": "lane",
    "cres": "crescent",
    "ct": "court",
    "dr": "drive",
    # flats / units
    "apt": "flat",
    "apartment": "flat",
    "unit": "flat",
    "ste": "suite",
    # numbering noise
    "no": "",
    "no.": "",
}


def _extract_number_sequence(s: str) -> list[str]:
    """Return every number (with optional letter suffix) in *s*, in order."""
    return _NUMBER_RE.findall(s)


def _extract_numbers(s: str) -> Set[str]:
    """Return the distinct numbers found in *s*."""
    return set(_extract_number_sequence(s))


def _extract_building_number(s: str) -> str | None:
    """
    Extract the main building number (NOT flat/unit) from a normalised address.

    Assumes formats like:
    - '42 moreton road'
    - 'flat 3 42 moreton road'

    The token immediately following a flat/unit keyword is skipped; the first
    remaining token that is entirely a number is returned, or None if there is
    no such token.
    """
    skip_next = False
    for tok in s.split():
        if tok in _FLAT_TOKENS:
            skip_next = True
            continue
        if skip_next:
            skip_next = False
            continue
        if _NUMBER_RE.fullmatch(tok):
            return tok
    return None


def _has_flat_token(norm: str) -> bool:
    """
    Return True if a normalised address contains a flat/unit keyword as a token.

    Matching is done per token (splitting on whitespace and '/') rather than by
    substring, so that words such as 'community' (contains 'unit') or
    'chapter' (contains 'apt') are not mistaken for flat markers.
    """
    return any(tok in _FLAT_TOKENS for tok in re.split(r"[\s/]+", norm))


def levenshtein(a: str, b: str) -> float:
    """
    Address similarity score in [0, 1].

    Despite the name this is not an edit distance: it blends token overlap
    with a character-level SequenceMatcher ratio, after a series of hard
    guards on the numbers in the two addresses.

    Strategy:
    - Normalise both addresses
    - Return 0.0 when house/flat numbers cannot match
    - Combine token overlap + character similarity

    Args:
        a: The user-supplied address.
        b: The candidate (EPC) address.

    Returns:
        0.0 when a hard guard rejects the pair, otherwise
        0.65 * token Jaccard score + 0.35 * character ratio, rounded to 4 dp.
    """
    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    # --- hard signal: numbers ---
    seq_a = _extract_number_sequence(a_norm)
    seq_b = _extract_number_sequence(b_norm)
    nums_a = set(seq_a)
    nums_b = set(seq_b)

    if nums_a and not nums_b:
        return 0.0

    # No shared numbers at all → impossible match
    if nums_a and nums_b and nums_a.isdisjoint(nums_b):
        return 0.0

    # HARD GUARD: building number must match
    bld_a = _extract_building_number(a_norm)
    bld_b = _extract_building_number(b_norm)
    if bld_a and bld_b and bld_a != bld_b:
        return 0.0

    # --- order-sensitive flat/building guard ---
    # When the user gave exactly two numbers without a flat keyword and the
    # candidate is a flat, the two numbers must appear in the same order.
    has_flat_token_user = _has_flat_token(a_norm)
    has_flat_token_epc = _has_flat_token(b_norm)
    if (
        len(seq_a) == 2
        and len(seq_b) >= 2
        and has_flat_token_epc
        and not has_flat_token_user
        and seq_a != seq_b[:2]
    ):
        return 0.0

    # --- token similarity (order-independent) ---
    toks_a = set(a_norm.split())
    toks_b = set(b_norm.split())
    if not toks_a or not toks_b:
        token_score = 0.0
    else:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    # --- weighted blend ---
    return round(
        0.65 * token_score + 0.35 * char_score,
        4,
    )


def normalise_address(s: str) -> str:
    """
    Canonical UK-focused address normalisation.

    - Lowercases
    - Splits a single trailing letter from a number ('28a' -> '28 a')
    - Removes punctuation (keeps / for flats)
    - Normalises whitespace
    - Applies synonym compression at token level

    Missing values (None, NaN, pd.NA) and other falsy inputs yield "".
    Other non-string values are converted with str() before normalising, so
    that values read from a DataFrame column do not raise.
    """
    if s is None or (not isinstance(s, str) and pd.isna(s) is True):
        return ""
    if not s:
        return ""

    # 1. lowercase
    text = str(s).lower()

    # 1.5 split digit-letter suffixes
    text = re.sub(r"(\d+)([a-z])\b", r"\1 \2", text)

    # 2. remove punctuation except /
    text = re.sub(r"[^\w\s/]", " ", text)

    # 3. normalise whitespace
    text = re.sub(r"\s+", " ", text).strip()

    # 4. tokenise + synonym normalisation (empty replacement drops the token)
    tokens = [
        replacement
        for tok in text.split()
        if (replacement := _ADDRESS_SYNONYMS.get(tok, tok))
    ]

    return " ".join(tokens)


def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """
    Score every address in df[column] against user_address.

    Raises:
        ValueError: if *column* is not a column of *df*.
    """
    if column not in df.columns:
        raise ValueError(f"Missing column: {column}")

    return df[column].apply(lambda x: levenshtein(user_address, x))


def get_epc_data_with_postcode(
    postcode: str,
    size: int | None = 500,
    attempt: int = 1,
    max_attempts: int = 3,
) -> pd.DataFrame:
    """
    Recursively fetch EPC data by postcode.

    If results hit the size limit, retry with double size up to max_attempts.

    Returns:
        A DataFrame built from the response's "rows" and "column-names", or an
        empty DataFrame (no columns) when the response is falsy or has no
        "rows" key. After max_attempts the possibly truncated result is
        returned and a warning is printed.
    """
    client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    # posixpath.join always uses "/" — os.path.join would emit "\" on Windows
    # and corrupt the URL.
    url = posixpath.join(client.domestic.host, "search")
    if size:
        url += "?" + urlencode({"size": size})

    search_resp = client.domestic.call(
        url=url,
        method="get",
        params={"postcode": postcode},
    )

    if not search_resp or "rows" not in search_resp:
        return pd.DataFrame()

    results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"])
    row_count = len(results_df)

    # If we hit the size limit, there *may* be more results
    if row_count == size:
        print(
            f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
            f"Attempt {attempt}/{max_attempts}."
        )

        if attempt < max_attempts:
            print(f"🔁 Retrying with size={size * 2}")
            return get_epc_data_with_postcode(
                postcode=postcode,
                size=size * 2,
                attempt=attempt + 1,
                max_attempts=max_attempts,
            )
        else:
            print(
                "🚨 Max attempts reached. Results may be truncated. "
                "(Please do a manual review by the tech team.)"
            )

    return results_df


def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """
    Returns True if all non-null UPRNs in df match the given uprn.
    Returns False otherwise (including when the column is missing or holds
    no non-null values).
    """
    if column not in df.columns:
        return False

    # Drop nulls and normalise to string
    uprns = df[column].dropna().astype(str).str.strip().unique()

    # No valid UPRNs to compare
    if len(uprns) == 0:
        return False

    # Exactly one unique UPRN and it matches
    return len(uprns) == 1 and uprns[0] == str(uprn)


def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """
    Annotate EPC results with lexicographical similarity scores and ranks.

    Adds two columns to a copy of *df*:
    - "lexiscore": similarity of each address to *user_address*
    - "lexirank": dense rank of the score, 1 = best match

    The UPRN column is normalised to stripped strings with a trailing ".0"
    removed; null UPRNs are kept null rather than becoming the text "nan".

    Returns a DataFrame sorted by descending lexiscore.
    DOES NOT choose or return a UPRN.

    Raises:
        ValueError: if the address or UPRN column is missing from *df*.
    """
    if address_column not in df.columns:
        raise ValueError(f"Missing column: {address_column}")
    if uprn_column not in df.columns:
        raise ValueError(f"Missing column: {uprn_column}")

    out = df.copy()

    user_norm = normalise_address(user_address)
    out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))

    # Normalise UPRN to string, preserving nulls so that later dropna() calls
    # still recognise them as missing.
    raw_uprn = out[uprn_column]
    uprn_text = (
        raw_uprn.astype(str).str.replace(r"\.0$", "", regex=True).str.strip()
    )
    out[uprn_column] = uprn_text.where(raw_uprn.notna())

    # Rank: 1 = best match
    out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)

    return out.sort_values(
        ["lexirank", "lexiscore"],
        ascending=[True, False],
    )


def _select_uprn(
    epc_df: pd.DataFrame,
    user_address: str,
    return_address: bool = False,
):
    """Pick the unambiguous best-match UPRN from already-fetched EPC rows, or None."""
    if epc_df.empty:
        return None

    scored_df = get_uprn_candidates(
        epc_df,
        user_address=user_address,
    )

    # Best score
    best_score = scored_df.iloc[0]["lexiscore"]
    if best_score <= 0:
        return None

    # All rank-1 rows (possible draw)
    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    best_row = top_rank_df.iloc[0]

    # If rank-1 rows do not agree on a single UPRN → ambiguous
    if not df_has_single_uprn(top_rank_df, uprn=best_row["uprn"]):
        return None

    address = best_row["address"]
    lexiscore = float(best_row["lexiscore"])
    logger.info(f"Address found to be: {address}, with lexiscore {lexiscore}")

    # Safe to return the agreed UPRN
    found_uprn = best_row["uprn"]
    if found_uprn == "":
        return None

    if return_address:
        return found_uprn, address
    return found_uprn


def get_uprn(user_inputed_address: str, postcode: str, return_address=False):
    """
    Look up the UPRN for an address within a postcode.

    Returns:
        - the UPRN (str) of the best-scoring EPC address, or
          (uprn, matched_address) when return_address is True
        - None when no UPRN can be returned: no EPC rows for the postcode,
          best score is 0, the top-ranked rows disagree on the UPRN, or the
          matched row has an empty UPRN

    Note that None (never False) signals failure, also when return_address
    is True, so callers must check before unpacking.
    """
    df = get_epc_data_with_postcode(postcode=postcode)
    return _select_uprn(df, user_inputed_address, return_address=return_address)


def resolve_uprns_for_postcode_group(
    group_df: pd.DataFrame,
    epc_df: pd.DataFrame,
    address_col: str = "Address 1",
) -> pd.DataFrame:
    """
    Given:
      - group_df: rows sharing the same postcode
      - epc_df: EPC search results for that postcode

    Returns:
      group_df + found_uprn + diagnostics

    The added columns are found_uprn, best_match_uprn, best_match_address,
    best_match_lexiscore and status, where status is one of
    "no_epc_candidates", "zero_score", "ambiguous", "no_uprn" or "matched".
    """
    results = []

    for _, row in group_df.iterrows():
        user_address = str(row[address_col]).strip()

        # Check before scoring: an EPC frame with no rows may also have no
        # columns, in which case get_uprn_candidates would raise ValueError.
        if epc_df.empty:
            results.append(
                {
                    "found_uprn": None,
                    "best_match_uprn": None,
                    "best_match_address": None,
                    "best_match_lexiscore": None,
                    "status": "no_epc_candidates",
                }
            )
            continue

        scored_df = get_uprn_candidates(
            epc_df,
            user_address=user_address,
        )

        best_score = scored_df.iloc[0]["lexiscore"]

        if best_score <= 0:
            results.append(
                {
                    "found_uprn": None,
                    "best_match_uprn": None,
                    "best_match_address": None,
                    "best_match_lexiscore": best_score,
                    "status": "zero_score",
                }
            )
            continue

        top_rank_df = scored_df[scored_df["lexirank"] == 1]
        best_row = top_rank_df.iloc[0]

        if not df_has_single_uprn(top_rank_df, best_row["uprn"]):
            results.append(
                {
                    "found_uprn": None,
                    "best_match_uprn": best_row["uprn"],
                    "best_match_address": best_row["address"],
                    "best_match_lexiscore": best_score,
                    "status": "ambiguous",
                }
            )
            continue

        # An address matched but the EPC row carries no UPRN: report it as
        # not found, consistent with get_uprn.
        if best_row["uprn"] == "":
            results.append(
                {
                    "found_uprn": None,
                    "best_match_uprn": None,
                    "best_match_address": best_row["address"],
                    "best_match_lexiscore": best_score,
                    "status": "no_uprn",
                }
            )
            continue

        results.append(
            {
                "found_uprn": str(best_row["uprn"]),
                "best_match_uprn": str(best_row["uprn"]),
                "best_match_address": best_row["address"],
                "best_match_lexiscore": best_score,
                "status": "matched",
            }
        )

    return pd.concat(
        [group_df.reset_index(drop=True), pd.DataFrame(results)],
        axis=1,
    )


def test(a, b):
    """Assert that a equals b, reporting both values and their types on failure."""
    assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"


def run_all_test():
    """
    Run ad-hoc integration checks against the live EPC API.

    Expected values are snapshots of EPC data and may drift over time.
    A failed lookup is expected to yield None, which is what get_uprn returns.
    """
    # Basic usage with different post codes styles
    test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
    test(get_epc_data_with_postcode("B938sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
    test(get_uprn("68", "b93 8sy"), "100070989938")
    test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
    test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
    test(get_uprn("28 A", "se6 4tf"), "100023278633")
    test(get_uprn("28A", "se6 4tf"), "100023278633")
    test(get_uprn("6 Aitken Close", "E8 4SQ"), None)

    # unique case
    test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("1, 5 Semley Gate", "e9 5nh"), None)
    test(
        get_uprn("1 Semley Gate", "e9 5nh"), "10008238188"
    )  # this one return "flat 1, in 1 semley gate"

    test(get_uprn("48 Oswald Street", "E5 0BT"), None)
    test(get_uprn("42 Oswald Street", "E5 0BT"), None)
    test(get_uprn("46 Oswald Street", "E5 0BT"), None)

    # Smoke calls: only check that scoring runs without raising.
    get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
    get_uprn_candidates(
        get_epc_data_with_postcode("Cr2 7dl"),
        "FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY",
    )


def main():
    """
    Audit the UPRNs in hackney.xlsx against EPC lookups.

    Reads the spreadsheet, looks up each row's address, and writes every row
    whose looked-up UPRN differs from the recorded one (or whose lookup
    failed) to hackney_uprn_failures.xlsx, then prints a summary.
    """
    INPUT_FILE = "hackney.xlsx"
    ADDRESS_COL = "Address 1"
    POSTCODE_COL = "Postcode"
    UPRN_COL = "UPRN"

    df = pd.read_excel(INPUT_FILE)

    failures = []

    # EPC results per postcode string, so rows sharing a postcode trigger a
    # single API call instead of two per row.
    epc_cache: dict[str, pd.DataFrame] = {}

    for _, row in tqdm(
        df.iterrows(),
        total=len(df),
        desc="Auditing UPRNs",
    ):
        input_address = str(row[ADDRESS_COL]).strip()
        postcode = str(row[POSTCODE_COL]).strip()

        expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL]))

        try:
            if postcode not in epc_cache:
                epc_cache[postcode] = get_epc_data_with_postcode(postcode)
            epc_df = epc_cache[postcode]

            if epc_df.empty:
                failures.append(
                    {
                        **row.to_dict(),
                        "found_uprn": None,
                        "best_match_uprn": None,
                        "best_match_address": None,
                        "best_match_lexiscore": None,
                        "status": "no_epc_results",
                    }
                )
                continue

            scored_df = get_uprn_candidates(
                epc_df,
                user_address=input_address,
            )

            best_row = scored_df.iloc[0]

            best_match_uprn = str(best_row["uprn"])
            best_match_address = best_row["address"]
            best_match_lexiscore = round(float(best_row["lexiscore"]), 4)

            # Reuse the rows fetched above instead of calling get_uprn, which
            # would hit the API again for the same postcode.
            found_uprn = _select_uprn(epc_df, input_address)

        except Exception as e:
            # Top-level boundary: record the failure and keep auditing.
            failures.append(
                {
                    **row.to_dict(),
                    "found_uprn": None,
                    "best_match_uprn": None,
                    "best_match_address": None,
                    "best_match_lexiscore": None,
                    "status": "exception",
                    "error": str(e),
                }
            )
            continue

        found_uprn_norm = None if not found_uprn else str(found_uprn)

        if found_uprn_norm != expected_uprn:
            failures.append(
                {
                    **row.to_dict(),
                    "found_uprn": found_uprn_norm,
                    "best_match_uprn": best_match_uprn,
                    "best_match_address": best_match_address,
                    "best_match_lexiscore": best_match_lexiscore,
                    "status": ("no_match" if found_uprn_norm is None else "mismatch"),
                }
            )

    failures_df = pd.DataFrame(failures)

    print("===================================")
    print(f"Total rows      : {len(df)}")
    print(f"Failures        : {len(failures_df)}")
    print("===================================")

    failures_df.to_excel(
        "hackney_uprn_failures.xlsx",
        index=False,
    )


def handler(event, context):
    """Placeholder handler: prints and returns a fixed 200 'hello world' response."""
    print("hello world")
    return {"statusCode": 200, "body": "hello world"}


# TODO: function dispatcher.
# TODO: get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"), ...) for
#       "Flat 1, 5 Semley Gate" and "Flat 5, 1 Semley Gate" — fix that.
# TODO: look again at flat 1.
# TODO: pandas reader, then the separate postcode_splitter.
# TODO: dump into S3.


if __name__ == "__main__":
    main()