diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 29c3c456..e4939836 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -24,9 +24,14 @@ def levenshtein(a: str, b: str) -> float: - Strongly penalise mismatched house/flat numbers - Combine token overlap + character similarity """ + + def extract_number_sequence(s: str) -> list[str]: + return re.findall(r"\d+[a-z]?", s) + def extract_numbers(s: str) -> Set[str]: - """Extract all numeric tokens (house numbers, flat numbers).""" - return set(re.findall(r"\d+[a-z]?", s)) + return set(extract_number_sequence(s)) + + def tokenise(s: str) -> Set[str]: return set(s.split()) @@ -38,10 +43,28 @@ def levenshtein(a: str, b: str) -> float: nums_a = extract_numbers(a_norm) nums_b = extract_numbers(b_norm) - if nums_a and nums_b and nums_a != nums_b: - # Different house/flat numbers → near impossible match + # No shared numbers at all → impossible match + if nums_a and nums_b and nums_a.isdisjoint(nums_b): return 0.0 + # --- order-sensitive flat/building guard --- + seq_a = extract_number_sequence(a_norm) + seq_b = extract_number_sequence(b_norm) + + has_flat_token_user = any(tok in a_norm for tok in ("flat", "apt", "apartment", "unit")) + has_flat_token_epc = "flat" in b_norm + + + if ( + len(seq_a) == 2 + and len(seq_b) >= 2 + and has_flat_token_epc + and not has_flat_token_user + and seq_a != seq_b[:2] + ): + return 0.0 + + # --- token similarity (order-independent) --- toks_a = tokenise(a_norm) toks_b = tokenise(b_norm) @@ -99,10 +122,12 @@ def normalise_address(s: str) -> str: "no": "", "no.": "", } - # 1. lowercase s = s.lower() + # 1.5 split digit-letter suffixes + s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s) + # 2. remove punctuation except / s = re.sub(r"[^\w\s/]", " ", s) @@ -281,7 +306,7 @@ def get_uprn(user_inputed_address: str, postcode: str): def test(a,b): - assert a == b, f"errr a {a} - {type(a)}, does not equal b {b} - {type(b)}" + assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}" def run_all_test(): @@ -294,321 +319,17 @@ def run_all_test(): test(get_uprn("68", "b93 8sy"), "100070989938") test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938") test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633") - test(get_uprn("28 A", "se6 4tf"), "100023278633") + test(get_uprn("28 A", "se6 4tf"), "100023278633") test(get_uprn("28A", "se6 4tf"), "100023278633") test(get_uprn("6 Aitken Close", "E8 4SQ"), False) - from epc_api.client import EpcClient -import os -from urllib.parse import urlencode -import pandas as pd -from difflib import SequenceMatcher -from tqdm import tqdm -import re -EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=") -client = EpcClient(auth_token=EPC_AUTH_TOKEN) - -import re -from difflib import SequenceMatcher -from typing import Set - - -def levenshtein(a: str, b: str) -> float: - """ - Address similarity score in [0, 1]. - - Strategy: - - Normalise - - Strongly penalise mismatched house/flat numbers - - Combine token overlap + character similarity - """ - def extract_numbers(s: str) -> Set[str]: - """Extract all numeric tokens (house numbers, flat numbers).""" - return set(re.findall(r"\d+[a-z]?", s)) - - def tokenise(s: str) -> Set[str]: - return set(s.split()) - - a_norm = normalise_address(a) - b_norm = normalise_address(b) - - # --- hard signal: numbers --- - nums_a = extract_numbers(a_norm) - nums_b = extract_numbers(b_norm) - - if nums_a and nums_b and nums_a != nums_b: - # Different house/flat numbers → near impossible match - return 0.0 - - # --- token similarity (order-independent) --- - toks_a = tokenise(a_norm) - toks_b = tokenise(b_norm) - - if not toks_a or not toks_b: - token_score = 0.0 - else: - token_score = len(toks_a & toks_b) / len(toks_a | toks_b) - - # --- character similarity (soft signal) --- - char_score = SequenceMatcher(None, a_norm, b_norm).ratio() - - # --- weighted blend --- - return round( - 0.65 * token_score + - 0.35 * char_score, - 4, - ) - - -def normalise_address(s: str) -> str: - """ - Canonical UK-focused address normalisation. - - - Lowercases - - Removes punctuation (keeps / for flats) - - Normalises whitespace - - Applies synonym compression at token level - """ - - if not s: - return "" - - ADDRESS_SYNONYMS = { - # street types - "rd": "road", - "rd.": "road", - "st": "street", - "st.": "street", - "ave": "avenue", - "ave.": "avenue", - "ln": "lane", - "ln.": "lane", - "cres": "crescent", - "ct": "court", - "dr": "drive", - - # flats / units - "apt": "flat", - "apartment": "flat", - "unit": "flat", - "ste": "suite", - - # numbering noise - "no": "", - "no.": "", - } - - # 1. lowercase - s = s.lower() - - # 2. remove punctuation except / - s = re.sub(r"[^\w\s/]", " ", s) - - # 3. normalise whitespace - s = re.sub(r"\s+", " ", s).strip() - - # 4. tokenise + synonym normalisation - tokens = [] - for tok in s.split(): - replacement = ADDRESS_SYNONYMS.get(tok, tok) - if replacement: - tokens.append(replacement) - - return " ".join(tokens) - - -def score_addresses( - df: pd.DataFrame, - user_address: str, - column: str = "address", -) -> pd.Series: - if column not in df.columns: - raise ValueError(f"Missing column: {column}") - - return df[column].apply( - lambda x: levenshtein(user_address, x) - ) - -def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): - """ - Recursively fetch EPC data by postcode. - If results hit the size limit, retry with double size up to max_attempts. - """ - - url = os.path.join(client.domestic.host, "search") - - if size: - url += "?" + urlencode({"size": size}) - - search_resp = client.domestic.call( - url=url, - method="get", - params={"postcode": postcode}, - ) - - results_df = pd.DataFrame( - search_resp["rows"], - columns=search_resp["column-names"] - ) - - row_count = len(results_df) - - # If we hit the size limit, there *may* be more results - if row_count == size: - print( - f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. " - f"Attempt {attempt}/{max_attempts}." - ) - - if attempt < max_attempts: - print(f"🔁 Retrying with size={size * 2}") - return get_epc_data_with_postcode( - postcode=postcode, - size=size * 2, - attempt=attempt + 1, - max_attempts=max_attempts, - ) - else: - print( - "🚨 Max attempts reached. Results may be truncated. " - "(Please do a manual review by the tech team.)" - ) - - return results_df - - -def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool: - """ - Returns True if all non-null UPRNs in df match the given uprn. - Returns False otherwise. - """ - - if column not in df.columns: - return False - - # Drop nulls and normalise to string - uprns = ( - df[column] - .dropna() - .astype(str) - .str.strip() - .unique() - ) - - # No valid UPRNs to compare - if len(uprns) == 0: - return False - - # Exactly one unique UPRN and it matches - return len(uprns) == 1 and uprns[0] == str(uprn) - - -def get_uprn_candidates( - df: pd.DataFrame, - user_address: str, - address_column: str = "address", - uprn_column: str = "uprn", -) -> pd.DataFrame: - """ - Annotate EPC results with lexicographical similarity scores and ranks. - - Returns a DataFrame sorted by descending lexiscore. - DOES NOT choose or return a UPRN. - """ - - if address_column not in df.columns: - raise ValueError(f"Missing column: {address_column}") - - if uprn_column not in df.columns: - raise ValueError(f"Missing column: {uprn_column}") - - out = df.copy() - - user_norm = normalise_address(user_address) - - out["lexiscore"] = out[address_column].apply( - lambda x: levenshtein(user_norm, x) - ) - - # Normalise UPRN to string - out[uprn_column] = ( - out[uprn_column] - .astype(str) - .str.replace(r"\.0$", "", regex=True) - ) - - # Rank: 1 = best match - out["lexirank"] = ( - out["lexiscore"] - .rank(method="dense", ascending=False) - .astype(int) - ) - - return out.sort_values( - ["lexirank", "lexiscore"], - ascending=[True, False], - ) - - -def get_uprn(user_inputed_address: str, postcode: str): - df = get_epc_data_with_postcode(postcode=postcode) - - if df.empty: - return False - - scored_df = get_uprn_candidates( - df, - user_address=user_inputed_address, - ) - - # Best score - best_score = scored_df.iloc[0]["lexiscore"] - - if best_score <= 0: - return False - - # All rank-1 rows (possible draw) - top_rank_df = scored_df[scored_df["lexirank"] == 1] - - # If rank-1 rows do not agree on a single UPRN → ambiguous - if not df_has_single_uprn(top_rank_df, uprn=top_rank_df.iloc[0]["uprn"]): - return False - - # Safe to return the agreed UPRN - return top_rank_df.iloc[0]["uprn"] - - -def test(a,b): - assert a == b, f"errr a {a} - {type(a)}, does not equal b {b} - {type(b)}" - - -def run_all_test(): - # Basic usage with different post codes styles - test(get_epc_data_with_postcode("b93 8sy").shape[0], 63) - test(get_epc_data_with_postcode("B938sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) - - test(get_uprn("68", "b93 8sy"), "100070989938") - test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938") - test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633") - test(get_uprn("28 A", "se6 4tf"), "100023278633") - test(get_uprn("28A", "se6 4tf"), "100023278633") - test(get_uprn("6 Aitken Close", "E8 4SQ"), False) - - - get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"5 Semley Gate") - - # TODO - # Lets write some tests with hackney and then peabody data - - - - get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"5 Semley Gate") - - # TODO - # Lets write some tests with hackney and then peabody data + # unique case + test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198") + test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198") + test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198" ) + test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False) + test(get_uprn("1 Semley Gate", "e9 5nh"), "10008238188") # this one return "flat 1, in 1 semley gate" if __name__ == "__main__": @@ -706,7 +427,6 @@ if __name__ == "__main__": ) - # TO do function dispatcher, # get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate) diff --git a/backend/address2UPRN/script.py b/backend/address2UPRN/script.py new file mode 100644 index 00000000..bd8f8017 --- /dev/null +++ b/backend/address2UPRN/script.py @@ -0,0 +1,17 @@ +import pandas as pd + + +# use Address 1 +junte_df = pd.read_excel("hackney_uprn_failures.xlsx") + + +# use domna_address_1 +khalim_df = pd.read_excel("khalim_standard.xlsx") + + +combined_df = junte_df.merge(khalim_df, how="left", left_on="Address 1", right_on='domna_address_1') + +# Find the row in khalim_df that does not app + +result = combined_df[~pd.isnull(combined_df["epc_os_uprn"])] + diff --git a/backend/postcode_splitter/hackney.xlsx b/backend/postcode_splitter/hackney.xlsx new file mode 100644 index 00000000..b6d3786e Binary files /dev/null and b/backend/postcode_splitter/hackney.xlsx differ diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py new file mode 100644 index 00000000..fc60b658 --- /dev/null +++ b/backend/postcode_splitter/main.py @@ -0,0 +1,81 @@ +import pandas as pd +import requests + + + + +def sanitise_postcode(postcode: str) -> str | None: + """ + Normalise postcode for grouping. + + - Uppercase + - Remove all whitespace + """ + if pd.isna(postcode): + return None + + return postcode.upper().replace(" ", "") + + +def is_valid_postcode(postcode_clean: str) -> bool: + """ + Validate postcode using postcodes.io. + + Expects a sanitised postcode (e.g. E84SQ). + Returns True if valid, False otherwise. + """ + POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate" + if not postcode_clean: + return False + + try: + resp = requests.get( + POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean), + timeout=5, + ) + resp.raise_for_status() + return resp.json().get("result", False) + except requests.RequestException: + # Network issues, rate limits, etc. + return False + + +def main(): + df = pd.read_excel("hackney.xlsx") + + # Sanitise postcodes + df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode) + + # --- validate AFTER grouping (save API calls) --- + + # Get unique, non-null postcodes + unique_postcodes = ( + df["postcode_clean"] + .dropna() + .unique() + ) + + # Validate each postcode once + postcode_validity = { + pc: is_valid_postcode(pc) + for pc in unique_postcodes + } + + # Map validity back onto dataframe + df["postcode_valid"] = df["postcode_clean"].map(postcode_validity) + + # Group only valid postcodes + grouped = ( + df[df["postcode_valid"]] + .groupby("postcode_clean") + ) + + # Example: count addresses per postcode + postcode_counts = grouped.size().sort_values(ascending=False) + + for pc in sorted(unique_postcodes): + pc_df = df[df["postcode_clean"] == pc] + pd_df + +if __name__ == "__main__": + main()