"""Resolve a free-text UK address plus postcode to a UPRN using EPC records.

Repository path: backend/address2UPRN/main.py

Pipeline:
    1. ``get_epc_data_with_postcode`` fetches every EPC row for a postcode.
    2. ``get_uprn_candidates`` scores each EPC address against the user input.
    3. ``get_uprn`` returns the UPRN only when the best-scoring rows agree on
       exactly one UPRN; otherwise it returns ``False``.

Running the module as a script audits a spreadsheet of known
address/postcode/UPRN rows and writes the disagreements to an Excel file.
"""

import os
import re
from difflib import SequenceMatcher
from typing import Set
from urllib.parse import urlencode

import pandas as pd
from epc_api.client import EpcClient
from tqdm import tqdm

# SECURITY: the token must come from the environment only. A real credential
# must never be committed as the fallback value; any token that was previously
# committed here should be treated as leaked and rotated.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "")
client = EpcClient(auth_token=EPC_AUTH_TOKEN)

# A run of digits optionally followed by one letter, e.g. "28" or "28a".
_NUMBER_TOKEN_RE = re.compile(r"\d+[a-z]?")

# Anything that is not a word character, whitespace or "/" (kept for flats).
_PUNCTUATION_RE = re.compile(r"[^\w\s/]")

# Token-level synonym table. Punctuation is stripped before lookup, so keys
# never contain a trailing "."; an empty replacement drops the token.
_ADDRESS_SYNONYMS = {
    # street types
    "rd": "road",
    "st": "street",
    "ave": "avenue",
    "ln": "lane",
    "cres": "crescent",
    "ct": "court",
    "dr": "drive",
    # flats / units
    "apt": "flat",
    "apartment": "flat",
    "unit": "flat",
    "ste": "suite",
    # numbering noise
    "no": "",
}


def levenshtein(a: str, b: str) -> float:
    """Return an address similarity score in [0, 1].

    Despite the name (kept for backward compatibility) this is not an edit
    distance. Both inputs are normalised with ``normalise_address`` and then:

    - if both contain number tokens and the sets of number tokens differ,
      the score is 0.0 (different house/flat numbers);
    - otherwise the score is ``0.65 * Jaccard(token sets)`` plus
      ``0.35 * SequenceMatcher ratio``, rounded to 4 decimal places.
    """
    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    # --- hard signal: house/flat numbers ---
    nums_a: Set[str] = set(_NUMBER_TOKEN_RE.findall(a_norm))
    nums_b: Set[str] = set(_NUMBER_TOKEN_RE.findall(b_norm))
    if nums_a and nums_b and nums_a != nums_b:
        return 0.0

    # --- token similarity (order-independent) ---
    toks_a: Set[str] = set(a_norm.split())
    toks_b: Set[str] = set(b_norm.split())
    if toks_a and toks_b:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
    else:
        token_score = 0.0

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    return round(0.65 * token_score + 0.35 * char_score, 4)


def normalise_address(s: str) -> str:
    """Return a canonical, lower-cased form of a UK address string.

    - Lowercases.
    - Replaces punctuation with spaces (``/`` is kept for flats).
    - Collapses whitespace.
    - Applies the token-level synonym table (e.g. ``rd`` -> ``road``) and
      drops noise tokens such as ``no``.

    ``None``, NaN and empty input yield ``""`` so that rows with a missing
    address score as "no match" instead of raising.
    """
    if not isinstance(s, str):
        if s is None or pd.isna(s):
            return ""
        s = str(s)
    if not s:
        return ""

    cleaned = _PUNCTUATION_RE.sub(" ", s.lower())

    # str.split() with no argument also collapses runs of whitespace.
    tokens = (_ADDRESS_SYNONYMS.get(tok, tok) for tok in cleaned.split())
    return " ".join(tok for tok in tokens if tok)


def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """Score every value of ``df[column]`` against ``user_address``.

    Raises:
        ValueError: if ``column`` is not a column of ``df``.
    """
    if column not in df.columns:
        raise ValueError(f"Missing column: {column}")

    return df[column].apply(lambda x: levenshtein(user_address, x))


def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
    """Fetch EPC search results for a postcode as a DataFrame.

    If the number of rows equals the requested ``size`` the result may be
    truncated, so the request is repeated with double the size, up to
    ``max_attempts`` attempts in total (``attempt`` is the number of the
    first attempt). After the last attempt the possibly truncated result is
    returned and a warning is printed.
    """
    # Build the URL with plain string handling: os.path.join is for
    # filesystem paths and would insert a backslash on Windows.
    base_url = client.domestic.host.rstrip("/") + "/search"

    while True:
        url = base_url
        if size:
            url += "?" + urlencode({"size": size})

        search_resp = client.domestic.call(
            url=url,
            method="get",
            params={"postcode": postcode},
        )

        results_df = pd.DataFrame(
            search_resp["rows"],
            columns=search_resp["column-names"],
        )

        # Not at the size limit, so nothing can have been cut off.
        if not size or len(results_df) != size:
            return results_df

        print(
            f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
            f"Attempt {attempt}/{max_attempts}."
        )

        if attempt >= max_attempts:
            print(
                "🚨 Max attempts reached. Results may be truncated. "
                "(Please do a manual review by the tech team.)"
            )
            return results_df

        print(f"🔁 Retrying with size={size * 2}")
        size *= 2
        attempt += 1


def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """Return True if all non-null UPRNs in ``df`` equal ``uprn``.

    Returns False when ``column`` is missing, when there is no non-null
    UPRN at all, or when the non-null UPRNs disagree.
    """
    if column not in df.columns:
        return False

    # Drop nulls and compare as stripped strings.
    uprns = df[column].dropna().astype(str).str.strip().unique()

    return bool(len(uprns) == 1 and uprns[0] == str(uprn))


def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """Annotate EPC results with similarity scores and ranks.

    Returns a copy of ``df`` with two extra columns, sorted best match first:

    - ``lexiscore``: ``levenshtein`` score against ``user_address``;
    - ``lexirank``: dense rank of the score, 1 = best.

    The UPRN column is normalised to strings (a trailing ``.0`` from float
    parsing is removed); missing or empty UPRNs stay null rather than
    becoming the string ``"nan"``. This function does NOT choose a UPRN.

    Raises:
        ValueError: if ``address_column`` or ``uprn_column`` is missing.
    """
    if address_column not in df.columns:
        raise ValueError(f"Missing column: {address_column}")

    if uprn_column not in df.columns:
        raise ValueError(f"Missing column: {uprn_column}")

    out = df.copy()

    user_norm = normalise_address(user_address)
    out["lexiscore"] = out[address_column].apply(
        lambda x: levenshtein(user_norm, x)
    )

    # Normalise UPRN to string while keeping missing values missing, so a
    # row without a UPRN can never be reported as the UPRN "nan".
    raw_uprns = out[uprn_column]
    cleaned = (
        raw_uprns
        .astype(str)
        .str.strip()
        .str.replace(r"\.0$", "", regex=True)
    )
    out[uprn_column] = cleaned.mask(raw_uprns.isna() | cleaned.eq(""))

    # Rank: 1 = best match
    out["lexirank"] = (
        out["lexiscore"]
        .rank(method="dense", ascending=False)
        .astype(int)
    )

    return out.sort_values(
        ["lexirank", "lexiscore"],
        ascending=[True, False],
    )


def _select_uprn(scored_df: pd.DataFrame, uprn_column: str = "uprn"):
    """Pick the UPRN from a frame produced by ``get_uprn_candidates``.

    Returns the UPRN string when the best score is positive and every
    rank-1 row with a UPRN agrees on it; returns ``False`` otherwise.
    """
    if scored_df.empty:
        return False

    if scored_df.iloc[0]["lexiscore"] <= 0:
        return False

    # All rank-1 rows (possible draw)
    top_rank_df = scored_df[scored_df["lexirank"] == 1]

    known_uprns = top_rank_df[uprn_column].dropna()
    if known_uprns.empty:
        return False

    agreed_uprn = known_uprns.iloc[0]

    # If rank-1 rows do not agree on a single UPRN the match is ambiguous.
    if not df_has_single_uprn(top_rank_df, uprn=agreed_uprn, column=uprn_column):
        return False

    return agreed_uprn


def get_uprn(user_inputed_address: str, postcode: str):
    """Return the UPRN matching an address within a postcode, or ``False``.

    ``False`` is returned when the postcode has no EPC rows, when no EPC
    address scores above zero, or when the best-scoring rows disagree on
    the UPRN.
    """
    df = get_epc_data_with_postcode(postcode=postcode)

    if df.empty:
        return False

    scored_df = get_uprn_candidates(
        df,
        user_address=user_inputed_address,
    )

    return _select_uprn(scored_df)


def test(a, b):
    """Assert that ``a`` equals ``b``, reporting both values and types."""
    assert a == b, f"errr a {a} - {type(a)}, does not equal b {b} - {type(b)}"


def run_all_test():
    """Run smoke tests against the live EPC API (needs a valid token)."""
    # Basic usage with different postcode styles
    for postcode in ("b93 8sy", "B938sy", "b93 8Sy"):
        test(get_epc_data_with_postcode(postcode).shape[0], 63)

    test(get_uprn("68", "b93 8sy"), "100070989938")
    test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
    test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
    test(get_uprn("28 A", "se6 4tf"), "100023278633")
    test(get_uprn("28A", "se6 4tf"), "100023278633")
    test(get_uprn("6 Aitken Close", "E8 4SQ"), False)

    # Smoke call only: the result is not asserted yet.
    get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"), "5 Semley Gate")

    # TODO: write some tests with Hackney and then Peabody data.


def _failure_record(
    row,
    status,
    found_uprn=None,
    best_match_uprn=None,
    best_match_address=None,
    best_match_lexiscore=None,
    error=None,
):
    """Build one row of the audit failure report from an input row."""
    record = {
        **row.to_dict(),
        "found_uprn": found_uprn,
        "best_match_uprn": best_match_uprn,
        "best_match_address": best_match_address,
        "best_match_lexiscore": best_match_lexiscore,
        "status": status,
    }
    if error is not None:
        record["error"] = error
    return record


def main(input_file="hackney.xlsx", output_file="hackney_uprn_failures.xlsx"):
    """Audit known UPRNs in ``input_file`` and write mismatches to ``output_file``.

    The input spreadsheet must have "Address 1", "Postcode" and "UPRN"
    columns. Each row whose resolved UPRN differs from the expected one
    (or that fails to resolve) becomes one row of the output spreadsheet.
    """
    address_col = "Address 1"
    postcode_col = "Postcode"
    uprn_col = "UPRN"

    df = pd.read_excel(input_file)

    failures = []
    # Rows often share a postcode; fetch each postcode from the API once.
    epc_cache = {}

    for _, row in tqdm(
        df.iterrows(),
        total=len(df),
        desc="Auditing UPRNs",
    ):
        input_address = str(row[address_col]).strip()
        postcode = str(row[postcode_col]).strip()

        try:
            # Inside the try so one malformed UPRN cell cannot abort the audit.
            expected_uprn = (
                None
                if pd.isna(row[uprn_col])
                else str(int(row[uprn_col]))
            )

            if postcode not in epc_cache:
                epc_cache[postcode] = get_epc_data_with_postcode(postcode)
            epc_df = epc_cache[postcode]

            if epc_df.empty:
                failures.append(_failure_record(row, "no_epc_results"))
                continue

            scored_df = get_uprn_candidates(
                epc_df,
                user_address=input_address,
            )

            best_row = scored_df.iloc[0]

            best_match_uprn = (
                None if pd.isna(best_row["uprn"]) else str(best_row["uprn"])
            )
            best_match_address = best_row["address"]
            best_match_lexiscore = round(float(best_row["lexiscore"]), 4)

            # Same decision get_uprn() makes, without a second API call.
            found_uprn = _select_uprn(scored_df)

        except Exception as e:
            # Audit boundary: record the error against the row and carry on.
            failures.append(_failure_record(row, "exception", error=str(e)))
            continue

        found_uprn_norm = None if not found_uprn else str(found_uprn)

        if found_uprn_norm != expected_uprn:
            failures.append(
                _failure_record(
                    row,
                    "no_match" if found_uprn_norm is None else "mismatch",
                    found_uprn=found_uprn_norm,
                    best_match_uprn=best_match_uprn,
                    best_match_address=best_match_address,
                    best_match_lexiscore=best_match_lexiscore,
                )
            )

    failures_df = pd.DataFrame(failures)

    print("===================================")
    print(f"Total rows : {len(df)}")
    print(f"Failures : {len(failures_df)}")
    print("===================================")

    failures_df.to_excel(
        output_file,
        index=False,
    )


if __name__ == "__main__":
    main()


# TODO: function dispatcher.
# TODO: fix get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"), ...)
#       for "Flat 1, 5 Semley Gate" and "Flat 5, 1 Semley Gate".
# TODO: look again at flat 1.
# TODO: pandas reader, then the separate postcode_splitter.
# TODO: dump into S3.