# Diagnostic columns appended to every input row, in output order.  Declared
# once so the returned frame has the same schema even when there are no rows.
_RESULT_COLUMNS = (
    "found_uprn",
    "best_match_uprn",
    "best_match_address",
    "best_match_lexiscore",
    "status",
)


def _uprn_result(
    status,
    found_uprn=None,
    best_match_uprn=None,
    best_match_address=None,
    best_match_lexiscore=None,
):
    """Build one diagnostics record containing every column in _RESULT_COLUMNS."""
    return {
        "found_uprn": found_uprn,
        "best_match_uprn": best_match_uprn,
        "best_match_address": best_match_address,
        "best_match_lexiscore": best_match_lexiscore,
        "status": status,
    }


def resolve_uprns_for_postcode_group(
    group_df: pd.DataFrame,
    epc_df: pd.DataFrame,
    address_col: str = "Address 1",
) -> pd.DataFrame:
    """
    Resolve a UPRN for every row of a single-postcode group.

    Given:
    - group_df: rows sharing the same postcode
    - epc_df: EPC search results for that postcode
    - address_col: name of the column in group_df holding the address text

    Returns:
    group_df (index reset) with these columns appended:
    - found_uprn: the UPRN as a string when the match is unambiguous, else None
    - best_match_uprn / best_match_address: the top candidate, when one exists
    - best_match_lexiscore: the top candidate's score, when one was computed
    - status: one of
        "missing_address"   - the address cell is empty/NaN/blank
        "no_epc_candidates" - get_uprn_candidates returned an empty frame
        "zero_score"        - the best candidate's lexiscore is <= 0
        "ambiguous"         - the rank-1 candidates do not share a single UPRN
        "matched"           - a single UPRN was agreed on

    The diagnostic columns are always present, even when group_df is empty.
    """
    results = []

    # Only the address column is needed, so iterate it directly rather than
    # materialising a Series per row with iterrows().
    for raw_address in group_df[address_col]:
        # str(NaN) is the literal "nan", which would otherwise be fuzzy-matched
        # against real addresses; treat missing/blank cells as unmatched.
        user_address = "" if pd.isna(raw_address) else str(raw_address).strip()
        if not user_address:
            results.append(_uprn_result("missing_address"))
            continue

        scored_df = get_uprn_candidates(
            epc_df,
            user_address=user_address,
        )

        if scored_df.empty:
            results.append(_uprn_result("no_epc_candidates"))
            continue

        # NOTE(review): assumes get_uprn_candidates returns rows ordered
        # best-first, so row 0 carries the top lexiscore - confirm.
        best_score = scored_df.iloc[0]["lexiscore"]

        if best_score <= 0:
            results.append(
                _uprn_result("zero_score", best_match_lexiscore=best_score)
            )
            continue

        top_rank_df = scored_df[scored_df["lexirank"] == 1]
        best_row = top_rank_df.iloc[0]
        # Always report the UPRN as a string so the column has one type
        # regardless of which branch produced the row.
        best_uprn = str(best_row["uprn"])

        if not df_has_single_uprn(top_rank_df, best_row["uprn"]):
            results.append(
                _uprn_result(
                    "ambiguous",
                    best_match_uprn=best_uprn,
                    best_match_address=best_row["address"],
                    best_match_lexiscore=best_score,
                )
            )
            continue

        results.append(
            _uprn_result(
                "matched",
                found_uprn=best_uprn,
                best_match_uprn=best_uprn,
                best_match_address=best_row["address"],
                best_match_lexiscore=best_score,
            )
        )

    # Passing explicit columns keeps the schema stable when `results` is empty.
    diagnostics_df = pd.DataFrame(results, columns=list(_RESULT_COLUMNS))

    return pd.concat(
        [group_df.reset_index(drop=True), diagnostics_df],
        axis=1,
    )
once, TODOadd a progress bar postcode_validity = { pc: is_valid_postcode(pc) - for pc in unique_postcodes + for pc in tqdm(unique_postcodes, total=len(unique_postcodes)) } # Map validity back onto dataframe df["postcode_valid"] = df["postcode_clean"].map(postcode_validity) - # Group only valid postcodes - grouped = ( - df[df["postcode_valid"]] - .groupby("postcode_clean") - ) - # Example: count addresses per postcode - postcode_counts = grouped.size().sort_values(ascending=False) + results = [] - for pc in sorted(unique_postcodes): - pc_df = df[df["postcode_clean"] == pc] - pd_df + for postcode, group_df in tqdm( + df[df["postcode_valid"]].groupby("postcode_clean"), + desc="Resolving UPRNs by postcode", + ): + try: + epc_df = get_epc_data_with_postcode(postcode) + + if epc_df.empty: + tmp = group_df.copy() + tmp["found_uprn"] = None + tmp["status"] = "no_epc_results" + results.append(tmp) + continue + + resolved = resolve_uprns_for_postcode_group( + group_df=group_df, + epc_df=epc_df, + ) + + results.append(resolved) + + except Exception as e: + tmp = group_df.copy() + tmp["found_uprn"] = None + tmp["status"] = "exception" + tmp["error"] = str(e) + results.append(tmp) + + final_df = pd.concat(results, ignore_index=True) + a = final_df[["best_match_lexiscore","Address 1", "best_match_address", "Postcode", "UPRN", "best_match_uprn"]] # add levi score to viewing + b = final_df[final_df["best_match_lexiscore"]>0] # add levi score to viewing if __name__ == "__main__": main()