mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
address 2 uprn
This commit is contained in:
parent
589f4a7961
commit
e1188ebc18
4 changed files with 136 additions and 318 deletions
|
|
@ -24,9 +24,14 @@ def levenshtein(a: str, b: str) -> float:
|
|||
- Strongly penalise mismatched house/flat numbers
|
||||
- Combine token overlap + character similarity
|
||||
"""
|
||||
|
||||
def extract_number_sequence(s: str) -> list[str]:
|
||||
return re.findall(r"\d+[a-z]?", s)
|
||||
|
||||
def extract_numbers(s: str) -> Set[str]:
|
||||
"""Extract all numeric tokens (house numbers, flat numbers)."""
|
||||
return set(re.findall(r"\d+[a-z]?", s))
|
||||
return set(extract_number_sequence(s))
|
||||
|
||||
|
||||
|
||||
def tokenise(s: str) -> Set[str]:
|
||||
return set(s.split())
|
||||
|
|
@ -38,10 +43,28 @@ def levenshtein(a: str, b: str) -> float:
|
|||
nums_a = extract_numbers(a_norm)
|
||||
nums_b = extract_numbers(b_norm)
|
||||
|
||||
if nums_a and nums_b and nums_a != nums_b:
|
||||
# Different house/flat numbers → near impossible match
|
||||
# No shared numbers at all → impossible match
|
||||
if nums_a and nums_b and nums_a.isdisjoint(nums_b):
|
||||
return 0.0
|
||||
|
||||
# --- order-sensitive flat/building guard ---
|
||||
seq_a = extract_number_sequence(a_norm)
|
||||
seq_b = extract_number_sequence(b_norm)
|
||||
|
||||
has_flat_token_user = any(tok in a_norm for tok in ("flat", "apt", "apartment", "unit"))
|
||||
has_flat_token_epc = "flat" in b_norm
|
||||
|
||||
|
||||
if (
|
||||
len(seq_a) == 2
|
||||
and len(seq_b) >= 2
|
||||
and has_flat_token_epc
|
||||
and not has_flat_token_user
|
||||
and seq_a != seq_b[:2]
|
||||
):
|
||||
return 0.0
|
||||
|
||||
|
||||
# --- token similarity (order-independent) ---
|
||||
toks_a = tokenise(a_norm)
|
||||
toks_b = tokenise(b_norm)
|
||||
|
|
@ -99,10 +122,12 @@ def normalise_address(s: str) -> str:
|
|||
"no": "",
|
||||
"no.": "",
|
||||
}
|
||||
|
||||
# 1. lowercase
|
||||
s = s.lower()
|
||||
|
||||
# 1.5 split digit-letter suffixes
|
||||
s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)
|
||||
|
||||
# 2. remove punctuation except /
|
||||
s = re.sub(r"[^\w\s/]", " ", s)
|
||||
|
||||
|
|
@ -281,7 +306,7 @@ def get_uprn(user_inputed_address: str, postcode: str):
|
|||
|
||||
|
||||
def test(a,b):
|
||||
assert a == b, f"errr a {a} - {type(a)}, does not equal b {b} - {type(b)}"
|
||||
assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"
|
||||
|
||||
|
||||
def run_all_test():
|
||||
|
|
@ -294,321 +319,17 @@ def run_all_test():
|
|||
test(get_uprn("68", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28 A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28 A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("6 Aitken Close", "E8 4SQ"), False)
|
||||
from epc_api.client import EpcClient
|
||||
import os
|
||||
from urllib.parse import urlencode
|
||||
import pandas as pd
|
||||
from difflib import SequenceMatcher
|
||||
from tqdm import tqdm
|
||||
|
||||
import re
|
||||
|
||||
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=")
|
||||
client = EpcClient(auth_token=EPC_AUTH_TOKEN)
|
||||
|
||||
import re
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Set
|
||||
|
||||
|
||||
def levenshtein(a: str, b: str) -> float:
|
||||
"""
|
||||
Address similarity score in [0, 1].
|
||||
|
||||
Strategy:
|
||||
- Normalise
|
||||
- Strongly penalise mismatched house/flat numbers
|
||||
- Combine token overlap + character similarity
|
||||
"""
|
||||
def extract_numbers(s: str) -> Set[str]:
|
||||
"""Extract all numeric tokens (house numbers, flat numbers)."""
|
||||
return set(re.findall(r"\d+[a-z]?", s))
|
||||
|
||||
def tokenise(s: str) -> Set[str]:
|
||||
return set(s.split())
|
||||
|
||||
a_norm = normalise_address(a)
|
||||
b_norm = normalise_address(b)
|
||||
|
||||
# --- hard signal: numbers ---
|
||||
nums_a = extract_numbers(a_norm)
|
||||
nums_b = extract_numbers(b_norm)
|
||||
|
||||
if nums_a and nums_b and nums_a != nums_b:
|
||||
# Different house/flat numbers → near impossible match
|
||||
return 0.0
|
||||
|
||||
# --- token similarity (order-independent) ---
|
||||
toks_a = tokenise(a_norm)
|
||||
toks_b = tokenise(b_norm)
|
||||
|
||||
if not toks_a or not toks_b:
|
||||
token_score = 0.0
|
||||
else:
|
||||
token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
|
||||
|
||||
# --- character similarity (soft signal) ---
|
||||
char_score = SequenceMatcher(None, a_norm, b_norm).ratio()
|
||||
|
||||
# --- weighted blend ---
|
||||
return round(
|
||||
0.65 * token_score +
|
||||
0.35 * char_score,
|
||||
4,
|
||||
)
|
||||
|
||||
|
||||
def normalise_address(s: str) -> str:
|
||||
"""
|
||||
Canonical UK-focused address normalisation.
|
||||
|
||||
- Lowercases
|
||||
- Removes punctuation (keeps / for flats)
|
||||
- Normalises whitespace
|
||||
- Applies synonym compression at token level
|
||||
"""
|
||||
|
||||
if not s:
|
||||
return ""
|
||||
|
||||
ADDRESS_SYNONYMS = {
|
||||
# street types
|
||||
"rd": "road",
|
||||
"rd.": "road",
|
||||
"st": "street",
|
||||
"st.": "street",
|
||||
"ave": "avenue",
|
||||
"ave.": "avenue",
|
||||
"ln": "lane",
|
||||
"ln.": "lane",
|
||||
"cres": "crescent",
|
||||
"ct": "court",
|
||||
"dr": "drive",
|
||||
|
||||
# flats / units
|
||||
"apt": "flat",
|
||||
"apartment": "flat",
|
||||
"unit": "flat",
|
||||
"ste": "suite",
|
||||
|
||||
# numbering noise
|
||||
"no": "",
|
||||
"no.": "",
|
||||
}
|
||||
|
||||
# 1. lowercase
|
||||
s = s.lower()
|
||||
|
||||
# 2. remove punctuation except /
|
||||
s = re.sub(r"[^\w\s/]", " ", s)
|
||||
|
||||
# 3. normalise whitespace
|
||||
s = re.sub(r"\s+", " ", s).strip()
|
||||
|
||||
# 4. tokenise + synonym normalisation
|
||||
tokens = []
|
||||
for tok in s.split():
|
||||
replacement = ADDRESS_SYNONYMS.get(tok, tok)
|
||||
if replacement:
|
||||
tokens.append(replacement)
|
||||
|
||||
return " ".join(tokens)
|
||||
|
||||
|
||||
def score_addresses(
|
||||
df: pd.DataFrame,
|
||||
user_address: str,
|
||||
column: str = "address",
|
||||
) -> pd.Series:
|
||||
if column not in df.columns:
|
||||
raise ValueError(f"Missing column: {column}")
|
||||
|
||||
return df[column].apply(
|
||||
lambda x: levenshtein(user_address, x)
|
||||
)
|
||||
|
||||
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
|
||||
"""
|
||||
Recursively fetch EPC data by postcode.
|
||||
If results hit the size limit, retry with double size up to max_attempts.
|
||||
"""
|
||||
|
||||
url = os.path.join(client.domestic.host, "search")
|
||||
|
||||
if size:
|
||||
url += "?" + urlencode({"size": size})
|
||||
|
||||
search_resp = client.domestic.call(
|
||||
url=url,
|
||||
method="get",
|
||||
params={"postcode": postcode},
|
||||
)
|
||||
|
||||
results_df = pd.DataFrame(
|
||||
search_resp["rows"],
|
||||
columns=search_resp["column-names"]
|
||||
)
|
||||
|
||||
row_count = len(results_df)
|
||||
|
||||
# If we hit the size limit, there *may* be more results
|
||||
if row_count == size:
|
||||
print(
|
||||
f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
|
||||
f"Attempt {attempt}/{max_attempts}."
|
||||
)
|
||||
|
||||
if attempt < max_attempts:
|
||||
print(f"🔁 Retrying with size={size * 2}")
|
||||
return get_epc_data_with_postcode(
|
||||
postcode=postcode,
|
||||
size=size * 2,
|
||||
attempt=attempt + 1,
|
||||
max_attempts=max_attempts,
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"🚨 Max attempts reached. Results may be truncated. "
|
||||
"(Please do a manual review by the tech team.)"
|
||||
)
|
||||
|
||||
return results_df
|
||||
|
||||
|
||||
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
|
||||
"""
|
||||
Returns True if all non-null UPRNs in df match the given uprn.
|
||||
Returns False otherwise.
|
||||
"""
|
||||
|
||||
if column not in df.columns:
|
||||
return False
|
||||
|
||||
# Drop nulls and normalise to string
|
||||
uprns = (
|
||||
df[column]
|
||||
.dropna()
|
||||
.astype(str)
|
||||
.str.strip()
|
||||
.unique()
|
||||
)
|
||||
|
||||
# No valid UPRNs to compare
|
||||
if len(uprns) == 0:
|
||||
return False
|
||||
|
||||
# Exactly one unique UPRN and it matches
|
||||
return len(uprns) == 1 and uprns[0] == str(uprn)
|
||||
|
||||
|
||||
def get_uprn_candidates(
|
||||
df: pd.DataFrame,
|
||||
user_address: str,
|
||||
address_column: str = "address",
|
||||
uprn_column: str = "uprn",
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
Annotate EPC results with lexicographical similarity scores and ranks.
|
||||
|
||||
Returns a DataFrame sorted by descending lexiscore.
|
||||
DOES NOT choose or return a UPRN.
|
||||
"""
|
||||
|
||||
if address_column not in df.columns:
|
||||
raise ValueError(f"Missing column: {address_column}")
|
||||
|
||||
if uprn_column not in df.columns:
|
||||
raise ValueError(f"Missing column: {uprn_column}")
|
||||
|
||||
out = df.copy()
|
||||
|
||||
user_norm = normalise_address(user_address)
|
||||
|
||||
out["lexiscore"] = out[address_column].apply(
|
||||
lambda x: levenshtein(user_norm, x)
|
||||
)
|
||||
|
||||
# Normalise UPRN to string
|
||||
out[uprn_column] = (
|
||||
out[uprn_column]
|
||||
.astype(str)
|
||||
.str.replace(r"\.0$", "", regex=True)
|
||||
)
|
||||
|
||||
# Rank: 1 = best match
|
||||
out["lexirank"] = (
|
||||
out["lexiscore"]
|
||||
.rank(method="dense", ascending=False)
|
||||
.astype(int)
|
||||
)
|
||||
|
||||
return out.sort_values(
|
||||
["lexirank", "lexiscore"],
|
||||
ascending=[True, False],
|
||||
)
|
||||
|
||||
|
||||
def get_uprn(user_inputed_address: str, postcode: str):
|
||||
df = get_epc_data_with_postcode(postcode=postcode)
|
||||
|
||||
if df.empty:
|
||||
return False
|
||||
|
||||
scored_df = get_uprn_candidates(
|
||||
df,
|
||||
user_address=user_inputed_address,
|
||||
)
|
||||
|
||||
# Best score
|
||||
best_score = scored_df.iloc[0]["lexiscore"]
|
||||
|
||||
if best_score <= 0:
|
||||
return False
|
||||
|
||||
# All rank-1 rows (possible draw)
|
||||
top_rank_df = scored_df[scored_df["lexirank"] == 1]
|
||||
|
||||
# If rank-1 rows do not agree on a single UPRN → ambiguous
|
||||
if not df_has_single_uprn(top_rank_df, uprn=top_rank_df.iloc[0]["uprn"]):
|
||||
return False
|
||||
|
||||
# Safe to return the agreed UPRN
|
||||
return top_rank_df.iloc[0]["uprn"]
|
||||
|
||||
|
||||
def test(a,b):
|
||||
assert a == b, f"errr a {a} - {type(a)}, does not equal b {b} - {type(b)}"
|
||||
|
||||
|
||||
def run_all_test():
|
||||
# Basic usage with different post codes styles
|
||||
test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("B938sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
|
||||
|
||||
test(get_uprn("68", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28 A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("6 Aitken Close", "E8 4SQ"), False)
|
||||
|
||||
|
||||
get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"5 Semley Gate")
|
||||
|
||||
# TODO
|
||||
# Lets write some tests with hackney and then peabody data
|
||||
|
||||
|
||||
|
||||
get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"5 Semley Gate")
|
||||
|
||||
# TODO
|
||||
# Lets write some tests with hackney and then peabody data
|
||||
# unique case
|
||||
test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
|
||||
test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
|
||||
test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198" )
|
||||
test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False)
|
||||
test(get_uprn("1 Semley Gate", "e9 5nh"), "10008238188") # this one return "flat 1, in 1 semley gate"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
@ -706,7 +427,6 @@ if __name__ == "__main__":
|
|||
)
|
||||
|
||||
|
||||
|
||||
# TO do function dispatcher,
|
||||
|
||||
# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate)
|
||||
|
|
|
|||
17
backend/address2UPRN/script.py
Normal file
17
backend/address2UPRN/script.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import pandas as pd
|
||||
|
||||
|
||||
# use Address 1
|
||||
junte_df = pd.read_excel("hackney_uprn_failures.xlsx")
|
||||
|
||||
|
||||
# use domna_address_1
|
||||
khalim_df = pd.read_excel("khalim_standard.xlsx")
|
||||
|
||||
|
||||
combined_df = junte_df.merge(khalim_df, how="left", left_on="Address 1", right_on='domna_address_1')
|
||||
|
||||
# Find the row in khalim_df that does not app
|
||||
|
||||
result = combined_df[~pd.isnull(combined_df["epc_os_uprn"])]
|
||||
|
||||
BIN
backend/postcode_splitter/hackney.xlsx
Normal file
BIN
backend/postcode_splitter/hackney.xlsx
Normal file
Binary file not shown.
81
backend/postcode_splitter/main.py
Normal file
81
backend/postcode_splitter/main.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
import pandas as pd
|
||||
import requests
|
||||
|
||||
|
||||
|
||||
|
||||
def sanitise_postcode(postcode: str) -> str | None:
|
||||
"""
|
||||
Normalise postcode for grouping.
|
||||
|
||||
- Uppercase
|
||||
- Remove all whitespace
|
||||
"""
|
||||
if pd.isna(postcode):
|
||||
return None
|
||||
|
||||
return postcode.upper().replace(" ", "")
|
||||
|
||||
|
||||
def is_valid_postcode(postcode_clean: str) -> bool:
|
||||
"""
|
||||
Validate postcode using postcodes.io.
|
||||
|
||||
Expects a sanitised postcode (e.g. E84SQ).
|
||||
Returns True if valid, False otherwise.
|
||||
"""
|
||||
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
|
||||
if not postcode_clean:
|
||||
return False
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
|
||||
timeout=5,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("result", False)
|
||||
except requests.RequestException:
|
||||
# Network issues, rate limits, etc.
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
df = pd.read_excel("hackney.xlsx")
|
||||
|
||||
# Sanitise postcodes
|
||||
df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode)
|
||||
|
||||
# --- validate AFTER grouping (save API calls) ---
|
||||
|
||||
# Get unique, non-null postcodes
|
||||
unique_postcodes = (
|
||||
df["postcode_clean"]
|
||||
.dropna()
|
||||
.unique()
|
||||
)
|
||||
|
||||
# Validate each postcode once
|
||||
postcode_validity = {
|
||||
pc: is_valid_postcode(pc)
|
||||
for pc in unique_postcodes
|
||||
}
|
||||
|
||||
# Map validity back onto dataframe
|
||||
df["postcode_valid"] = df["postcode_clean"].map(postcode_validity)
|
||||
|
||||
# Group only valid postcodes
|
||||
grouped = (
|
||||
df[df["postcode_valid"]]
|
||||
.groupby("postcode_clean")
|
||||
)
|
||||
|
||||
# Example: count addresses per postcode
|
||||
postcode_counts = grouped.size().sort_values(ascending=False)
|
||||
|
||||
for pc in sorted(unique_postcodes):
|
||||
pc_df = df[df["postcode_clean"] == pc]
|
||||
pd_df
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Reference in a new issue