mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
277 lines
8.4 KiB
Python
277 lines
8.4 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from difflib import SequenceMatcher
|
|
from typing import TYPE_CHECKING, Any, Optional
|
|
|
|
import requests
|
|
|
|
if TYPE_CHECKING:
|
|
import pandas as pd
|
|
|
|
|
|
class AddressMatch:
    """Stateless helpers for normalising and fuzzy-matching UK addresses.

    Every method is a ``staticmethod``; the class only serves as a namespace.
    """

    # Token-level synonym table applied after punctuation has been stripped.
    # Dotted variants ("rd.", "no.") are intentionally not listed: "." is
    # replaced by a space before tokenising, so such keys could never match.
    # An empty replacement means "drop this token".
    _ADDRESS_SYNONYMS = {
        # street types
        "rd": "road",
        "st": "street",
        "ave": "avenue",
        "ln": "lane",
        "cres": "crescent",
        "ct": "court",
        "dr": "drive",
        # flats / units
        "apt": "flat",
        "apartment": "flat",
        "unit": "flat",
        "ste": "suite",
        # numbering noise
        "no": "",
    }

    # Tokens that introduce a flat/unit number (the token after them is the
    # flat number, not the building number).
    _FLAT_KEYWORDS = ("flat", "apt", "apartment", "unit")

    # Flat keywords must not be embedded in a longer alphabetic word, so that
    # "community" (contains "unit") or "captain" (contains "apt") do not count.
    # Digits and "/" may still touch the keyword ("flat3", "flat/3").
    _USER_FLAT_RE = re.compile(r"(?<![a-z])(?:flat|apt|apartment|unit)(?![a-z])")
    _EPC_FLAT_RE = re.compile(r"(?<![a-z])flat(?![a-z])")

    # Slash format such as "3/137" is an implicit flat reference.
    _IMPLICIT_FLAT_RE = re.compile(r"\d+\s*/\s*\d+")

    # A number with an optional directly attached single-letter suffix.
    _NUMBER_RE = re.compile(r"\d+[a-z]?")

    _POSTCODES_IO_VALIDATE_URL = (
        "https://api.postcodes.io/postcodes/{postcode}/validate"
    )

    def __init__(self) -> None:
        """No instance state is kept; all functionality is static."""

    @staticmethod
    def score(a: str, b: str) -> float:
        """Return the similarity of address ``a`` to address ``b`` in [0, 1].

        Thin alias for :meth:`levenshtein`.
        """
        return AddressMatch.levenshtein(a, b)

    @staticmethod
    def is_valid_postcode(postcode_clean: str) -> bool:
        """
        Validate postcode using postcodes.io.

        Expects a sanitised postcode (e.g. E84SQ).
        Returns True if valid, False otherwise. Any transport error, HTTP
        error status or undecodable response also yields False, so a False
        result does not distinguish "invalid" from "could not check".
        """
        if not postcode_clean:
            return False

        # Local import keeps the module's import block untouched.
        from urllib.parse import quote

        # Escape the value so characters such as "/" or "?" cannot alter the
        # request path; plain alphanumeric postcodes are unaffected.
        url = AddressMatch._POSTCODES_IO_VALIDATE_URL.format(
            postcode=quote(str(postcode_clean), safe="")
        )

        try:
            resp = requests.get(url, timeout=5)
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, ValueError):
            # Network issues, rate limits, non-JSON bodies, etc.
            return False

        if not isinstance(payload, dict):
            return False
        return bool(payload.get("result", False))

    @staticmethod
    def normalise_address(s: str) -> str:
        """
        Canonical UK-focused address normalisation.

        - Lowercases
        - Splits a trailing letter off a number ("82a" -> "82 a")
        - Removes punctuation (keeps / for flats)
        - Normalises whitespace
        - Applies synonym compression at token level

        Empty or non-string input (e.g. ``None`` or a pandas NaN) yields "".
        """
        if not isinstance(s, str) or not s:
            return ""

        # 1. lowercase
        s = s.lower()

        # 1.5 split digit-letter suffixes
        s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)

        # 2. remove punctuation except /
        s = re.sub(r"[^\w\s/]", " ", s)

        # 3. tokenise on whitespace (this also collapses repeated whitespace)
        #    and apply synonyms; an empty replacement drops the token.
        synonyms = AddressMatch._ADDRESS_SYNONYMS
        replaced = (synonyms.get(tok, tok) for tok in s.split())
        return " ".join(tok for tok in replaced if tok)

    @staticmethod
    def _match_building_number(token: str, next_token: Optional[str]) -> Optional[str]:
        """Return ``token`` as a building number, or None if it is not one.

        A bare number followed by a single-letter token is recombined
        ("82", "a" -> "82a") to undo the split done during normalisation.
        """
        if re.fullmatch(r"\d+[a-z]", token):
            return token
        if re.fullmatch(r"\d+", token):
            if next_token is not None and re.fullmatch(r"[a-z]", next_token):
                return token + next_token
            return token
        return None

    @staticmethod
    def _extract_building_number(s: str) -> Optional[str]:
        """
        Extract the main building number (NOT flat/unit) from a normalised
        address.

        Assumes formats like:
        - '42 moreton road'
        - 'flat 3 42 moreton road'
        - '82 a victoria square' (recombined to '82a')
        """
        # Remove flat/unit context: the keyword itself and the token after it.
        cleaned: list[str] = []
        skip_next = False
        for tok in s.split():
            if tok in AddressMatch._FLAT_KEYWORDS:
                skip_next = True
                continue
            if skip_next:
                skip_next = False
                continue
            cleaned.append(tok)

        # First remaining number is the building number.
        for i, tok in enumerate(cleaned):
            nxt = cleaned[i + 1] if i + 1 < len(cleaned) else None
            match = AddressMatch._match_building_number(tok, nxt)
            if match is not None:
                return match
        return None

    @staticmethod
    def levenshtein(a: str, b: str) -> float:
        """
        Address similarity score in [0, 1], rounded to 4 decimal places.

        Despite the name this is not an edit distance: it blends Jaccard
        token overlap (weight 0.65) with ``difflib.SequenceMatcher`` ratio
        (weight 0.35). The function is asymmetric: ``a`` is treated as the
        user-supplied address and ``b`` as the EPC record address.

        Strategy:
        - Normalise
        - Strongly penalise mismatched house/flat numbers
        - Combine token overlap + character similarity

        Hard guards that return 0.0 outright:
        - ``a`` contains numbers but ``b`` contains none
        - both contain numbers but share none
        - both have a building number and they differ
        - ``b`` is a flat but ``a`` gives no flat indication (neither a
          flat keyword nor slash format such as "3/137")
        """
        a_norm = AddressMatch.normalise_address(a)
        b_norm = AddressMatch.normalise_address(b)

        # --- hard signal: numbers ---
        seq_a = AddressMatch._NUMBER_RE.findall(a_norm)
        seq_b = AddressMatch._NUMBER_RE.findall(b_norm)
        nums_a = set(seq_a)
        nums_b = set(seq_b)

        if nums_a and not nums_b:
            return 0.0

        # No shared numbers at all -> impossible match
        if nums_a and nums_b and nums_a.isdisjoint(nums_b):
            return 0.0

        # HARD GUARD: building number must match
        bld_a = AddressMatch._extract_building_number(a_norm)
        bld_b = AddressMatch._extract_building_number(b_norm)
        if bld_a and bld_b and bld_a != bld_b:
            return 0.0

        # --- order-sensitive flat/building guard ---
        # Keyword detection is word-aware rather than a raw substring test,
        # so street names that merely contain "unit"/"apt"/"flat" do not
        # disable the guards below.
        has_flat_token_user = bool(AddressMatch._USER_FLAT_RE.search(a_norm))
        has_flat_token_epc = bool(AddressMatch._EPC_FLAT_RE.search(b_norm))
        # Slash-format like "3/137a" is an implicit flat reference
        # (flat 3 of 137a) even without a "flat" keyword.
        has_implicit_flat_user = bool(AddressMatch._IMPLICIT_FLAT_RE.search(a_norm))

        if has_flat_token_epc and not has_flat_token_user:
            # EPC says it's a flat but user gave no flat indication
            # (neither keyword nor slash-format). Unlikely to be the right unit.
            if not has_implicit_flat_user:
                return 0.0
            # Implicit (slash) flat reference: the user's two numbers must
            # line up, in order, with the EPC record's first two numbers.
            if len(seq_a) == 2 and len(seq_b) >= 2 and seq_a != seq_b[:2]:
                return 0.0

        # --- token similarity (order-independent) ---
        toks_a = set(a_norm.split())
        toks_b = set(b_norm.split())
        if not toks_a or not toks_b:
            token_score = 0.0
        else:
            token_score = len(toks_a & toks_b) / len(toks_a | toks_b)

        # --- character similarity (soft signal) ---
        char_score: float = SequenceMatcher(None, a_norm, b_norm).ratio()

        # --- weighted blend ---
        return round(0.65 * token_score + 0.35 * char_score, 4)
|
|
|
|
|
|
def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """Score every address in ``df[column]`` against ``user_address``.

    Returns a Series, aligned with ``df``'s index, holding the result of
    ``AddressMatch.score(user_address, <cell>)`` for each row.

    Raises:
        ValueError: if ``column`` is not a column of ``df``.
    """
    if column in df.columns:

        def similarity_to_user(candidate):
            return AddressMatch.score(user_address, candidate)

        candidates = df[column]
        return candidates.apply(similarity_to_user)

    raise ValueError(f"Missing column: {column}")
|
|
|
|
|
|
def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """
    Rank EPC rows by how closely their address matches ``user_address``.

    Works on a copy of ``df`` and adds two columns:
    - ``lexiscore``: ``AddressMatch.levenshtein`` of the normalised user
      address against the row's address
    - ``lexirank``: dense integer rank of ``lexiscore`` (1 = highest score)

    The UPRN column is converted to text with any trailing ".0" removed.
    The result is sorted best match first.

    Raises:
        ValueError: if ``address_column`` or ``uprn_column`` is missing.
    """
    # Validate in the same order callers see errors: address first, then UPRN.
    for required in (address_column, uprn_column):
        if required not in df.columns:
            raise ValueError(f"Missing column: {required}")

    normalised_query = AddressMatch.normalise_address(user_address)

    def similarity(candidate):
        return AddressMatch.levenshtein(normalised_query, candidate)

    ranked = df.copy()
    ranked["lexiscore"] = ranked[address_column].apply(similarity)

    # UPRNs read as floats render like "123.0"; drop the artefact.
    uprn_text = ranked[uprn_column].astype(str)
    ranked[uprn_column] = uprn_text.str.replace(r"\.0$", "", regex=True)

    dense_rank = ranked["lexiscore"].rank(method="dense", ascending=False)
    ranked["lexirank"] = dense_rank.astype(int)

    return ranked.sort_values(["lexirank", "lexiscore"], ascending=[True, False])
|
|
|
|
|
|
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """Return True if all non-null UPRNs in ``df[column]`` equal ``uprn``.

    Both sides are compared in a canonical text form: converted to ``str``,
    surrounding whitespace stripped, and a trailing ".0" removed. The ".0"
    handling mirrors ``get_uprn_candidates`` and lets a float-typed column
    (e.g. ``123.0``) match the UPRN ``"123"``.

    Returns False when ``column`` is missing or holds no non-null values.
    """
    if column not in df.columns:
        return False

    present = df[column].dropna()
    if present.empty:
        return False

    distinct = (
        present.astype(str)
        .str.strip()
        .str.replace(r"\.0$", "", regex=True)
        .unique()
    )
    # Canonicalise the argument the same way as the column values.
    target = re.sub(r"\.0$", "", str(uprn).strip())
    return bool(len(distinct) == 1 and distinct[0] == target)
|