Model/backend/address2UPRN/main.py

527 lines
14 KiB
Python

from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from difflib import SequenceMatcher
from tqdm import tqdm
from utils.logger import setup_logger
import re
from typing import Set
import json
import requests
logger = setup_logger()

# The EPC API token is a credential: it is read from the environment only and
# deliberately has no in-source fallback.
# NOTE(review): a real token used to be committed here as the default value.
# It should be treated as leaked and rotated with the EPC service.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
if not EPC_AUTH_TOKEN:
    # Fail fast at import time (unset or empty) rather than on the first call.
    raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def is_valid_postcode(postcode_clean: str) -> bool:
    """
    Validate a postcode using the postcodes.io ``validate`` endpoint.

    Args:
        postcode_clean: A sanitised postcode (e.g. ``E84SQ``). Surrounding
            whitespace is ignored.

    Returns:
        True only when the service explicitly reports the postcode as valid.
        False for empty / non-string input, for any network or HTTP error
        (timeouts, rate limits, ...) and for an unparseable response body.
    """
    # Function-scope import: the module header only imports ``urlencode``.
    from urllib.parse import quote

    POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"

    postcode = postcode_clean.strip() if isinstance(postcode_clean, str) else ""
    if not postcode:
        # Nothing to validate; avoid a pointless network round trip.
        return False

    # The postcode becomes a URL path segment, so escape everything,
    # including "/" and spaces.
    url = POSTCODES_IO_VALIDATE_URL.format(postcode=quote(postcode, safe=""))
    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError):
        # Network issues, rate limits, non-2xx status, or a body that is not
        # JSON (older ``requests`` versions raise a plain ValueError there).
        return False

    if not isinstance(payload, dict):
        return False
    return payload.get("result") is True
def levenshtein(a: str, b: str) -> float:
    """
    Address similarity score in [0, 1].

    Despite its name this is not an edit distance. Both inputs are passed
    through ``normalise_address`` and then scored as follows:

    - Hard guards (return 0.0 immediately):
        * ``a`` contains numbers but shares none with ``b``;
        * both have a building number and they differ;
        * ``a`` has exactly two numbers and no flat marker, ``b`` is a flat
          with at least two numbers, and the first two numbers of ``b`` are
          not the same pair in the same order.
    - Otherwise a weighted blend of token Jaccard overlap (0.65) and
      ``SequenceMatcher`` character similarity (0.35), rounded to 4 places.

    Args:
        a: The user-supplied address.
        b: The candidate (EPC) address.

    Returns:
        A float score; 0.0 means "cannot be the same property".
    """
    flat_markers = frozenset({"flat", "apt", "apartment", "unit"})
    number_pattern = re.compile(r"\d+[a-z]?")

    def extract_number_sequence(s: str) -> list[str]:
        """Return every number-like token of ``s`` in order of appearance."""
        return number_pattern.findall(s)

    def extract_building_number(s: str) -> str | None:
        """
        Extract the main building number (NOT flat/unit).
        Assumes formats like:
        - '42 moreton road'
        - 'flat 3 42 moreton road'
        """
        skip_next = False
        for tok in s.split():
            if tok in flat_markers:
                # The token right after a flat marker is the flat number.
                skip_next = True
                continue
            if skip_next:
                skip_next = False
                continue
            # First remaining number is the building number.
            if number_pattern.fullmatch(tok):
                return tok
        return None

    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    seq_a = extract_number_sequence(a_norm)
    seq_b = extract_number_sequence(b_norm)
    nums_a = set(seq_a)
    nums_b = set(seq_b)

    # --- hard signal: numbers ---
    # Covers both "b has no numbers at all" and "no number in common".
    if nums_a and nums_a.isdisjoint(nums_b):
        return 0.0

    # --- hard guard: building number must match ---
    bld_a = extract_building_number(a_norm)
    bld_b = extract_building_number(b_norm)
    if bld_a and bld_b and bld_a != bld_b:
        return 0.0

    toks_a = set(a_norm.split())
    toks_b = set(b_norm.split())

    # --- order-sensitive flat/building guard ---
    # Whole tokens are compared, not substrings: a substring test would treat
    # "community" (contains "unit") or "baptist" (contains "apt") as a flat
    # marker and silently disable this guard.
    user_has_flat_token = not flat_markers.isdisjoint(toks_a)
    epc_has_flat_token = "flat" in toks_b
    if (
        len(seq_a) == 2
        and len(seq_b) >= 2
        and epc_has_flat_token
        and not user_has_flat_token
        and seq_a != seq_b[:2]
    ):
        return 0.0

    # --- token similarity (order-independent Jaccard index) ---
    if toks_a and toks_b:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
    else:
        token_score = 0.0

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    # --- weighted blend ---
    return round(0.65 * token_score + 0.35 * char_score, 4)
# Token-level synonym table used by ``normalise_address``. An empty
# replacement means "drop the token". Dotted spellings ("rd.", "no.") are not
# listed because punctuation is stripped before tokens are looked up.
_ADDRESS_SYNONYMS = {
    # street types
    "rd": "road",
    "st": "street",
    "ave": "avenue",
    "ln": "lane",
    "cres": "crescent",
    "ct": "court",
    "dr": "drive",
    # flats / units
    "apt": "flat",
    "apartment": "flat",
    "unit": "flat",
    "ste": "suite",
    # numbering noise
    "no": "",
}
_DIGIT_LETTER_SUFFIX = re.compile(r"(\d+)([a-z])\b")
_PUNCTUATION_EXCEPT_SLASH = re.compile(r"[^\w\s/]")


def normalise_address(s: str) -> str:
    """
    Canonical UK-focused address normalisation.

    - Lowercases
    - Splits a single-letter suffix off a number ("28a" -> "28 a")
    - Replaces punctuation with spaces (keeps / for flats)
    - Collapses whitespace
    - Applies synonym compression at token level ("rd" -> "road",
      "apt" -> "flat", "no" -> dropped)

    Args:
        s: Raw address. ``None``, NaN and other falsy values yield ``""``;
            any other non-string value is converted with ``str()``.

    Returns:
        The normalised address as single-space separated tokens.
    """
    # pandas represents missing cells as None / float NaN; NaN is truthy, so
    # it needs an explicit check (NaN is the only float unequal to itself).
    if s is None or (isinstance(s, float) and s != s):
        return ""
    if not s:
        return ""

    text = str(s).lower()
    text = _DIGIT_LETTER_SUFFIX.sub(r"\1 \2", text)
    text = _PUNCTUATION_EXCEPT_SLASH.sub(" ", text)

    # str.split() already collapses and trims any run of whitespace.
    replaced = (_ADDRESS_SYNONYMS.get(tok, tok) for tok in text.split())
    return " ".join(tok for tok in replaced if tok)
def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """
    Score every address in ``df[column]`` against ``user_address``.

    Returns a Series, aligned with ``df``, holding the ``levenshtein`` score
    of each candidate. Raises ValueError when ``column`` is absent.
    """
    if column in df.columns:

        def score_candidate(candidate):
            return levenshtein(user_address, candidate)

        return df[column].apply(score_candidate)

    raise ValueError(f"Missing column: {column}")
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
    """
    Fetch domestic EPC search results for a postcode.

    If a response contains exactly ``size`` rows it may have been truncated,
    so the search is repeated with double the size, up to ``max_attempts``
    attempts in total.

    Args:
        postcode: Postcode passed to the search endpoint as-is.
        size: Page size requested from the API; a falsy value sends no
            ``size`` query parameter.
        attempt: Number of the first attempt (used for the retry budget and
            in the printed warnings).
        max_attempts: Attempt number after which no further retry is made.

    Returns:
        A DataFrame with one row per EPC record, or an empty, column-less
        DataFrame when the response is empty or has no "rows" key.
    """
    client = EpcClient(auth_token=EPC_AUTH_TOKEN)
    # Build the URL with plain string joining: os.path.join is for file-system
    # paths and would insert a backslash on Windows.
    search_url = f"{client.domestic.host.rstrip('/')}/search"

    while True:
        url = search_url
        if size:
            url += "?" + urlencode({"size": size})

        search_resp = client.domestic.call(
            url=url,
            method="get",
            params={"postcode": postcode},
        )
        if not search_resp or "rows" not in search_resp:
            return pd.DataFrame()

        results_df = pd.DataFrame(
            search_resp["rows"], columns=search_resp["column-names"]
        )

        # Fewer (or more) rows than requested: nothing was cut off.
        if len(results_df) != size:
            return results_df

        # If we hit the size limit, there *may* be more results.
        print(
            f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
            f"Attempt {attempt}/{max_attempts}."
        )
        if attempt >= max_attempts:
            print(
                "🚨 Max attempts reached. Results may be truncated. "
                "(Please do a manual review by the tech team.)"
            )
            return results_df

        print(f"🔁 Retrying with size={size * 2}")
        size *= 2
        attempt += 1
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """
    Tell whether ``df[column]`` holds exactly one distinct UPRN equal to ``uprn``.

    Null entries are ignored; the remaining values are compared as stripped
    strings. A missing column, or a column with no non-null value, gives False.
    """
    if column not in df.columns:
        return False

    # Drop nulls, then compare on a common string representation.
    cleaned = df[column].dropna().astype(str).str.strip()
    distinct = cleaned.unique()

    # Zero distinct values (nothing to compare) or several (disagreement).
    if len(distinct) != 1:
        return False
    return distinct[0] == str(uprn)
def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """
    Score and rank EPC rows against a user-supplied address.

    Works on a copy of ``df`` and adds two columns:
    - ``lexiscore``: ``levenshtein`` score of each row's address against the
      normalised user address;
    - ``lexirank``: dense rank of that score, 1 being the best match.
    The UPRN column is converted to strings with any trailing ".0" removed.

    The result is sorted best match first. No UPRN is chosen here.
    Raises ValueError when either required column is missing.
    """
    for required in (address_column, uprn_column):
        if required not in df.columns:
            raise ValueError(f"Missing column: {required}")

    query = normalise_address(user_address)
    scored = df.copy()

    scored["lexiscore"] = scored[address_column].apply(
        lambda candidate: levenshtein(query, candidate)
    )

    # UPRNs as text, without a float-style ".0" tail.
    uprn_text = scored[uprn_column].astype(str)
    scored[uprn_column] = uprn_text.str.replace(r"\.0$", "", regex=True)

    # Dense ranking: tied scores share a rank and no rank number is skipped.
    dense_rank = scored["lexiscore"].rank(method="dense", ascending=False)
    scored["lexirank"] = dense_rank.astype(int)

    return scored.sort_values(
        ["lexirank", "lexiscore"],
        ascending=[True, False],
    )
def get_uprn(
    user_inputed_address: str,
    postcode: str,
    return_address=False,
    return_EPC=False,
    return_score=True,
):
    """
    Resolve a user-supplied address to a UPRN using EPC records for its postcode.

    Args:
        user_inputed_address: Free-text address (without the postcode).
        postcode: Postcode used to fetch the EPC candidates.
        return_address: Also return the matched EPC address.
        return_EPC: With ``return_address``, also return the matched row's
            "current-energy-efficiency" value.
        return_score: With ``return_address`` and ``return_EPC``, also return
            the match score.

    Returns:
        ``None`` whenever no trustworthy UPRN is found: no EPC rows for the
        postcode, best score not above zero, the best-scoring rows disagree
        on the UPRN, or the matched row carries no UPRN.
        Otherwise, depending on the flags:
        - ``uprn``                               (return_address false)
        - ``(uprn, address)``                    (return_EPC false)
        - ``(uprn, address, epc)``               (return_score false)
        - ``(uprn, address, epc, score)``        (all three true)
    """
    df = get_epc_data_with_postcode(postcode=postcode)
    if df.empty:
        return None

    scored_df = get_uprn_candidates(
        df,
        user_address=user_inputed_address,
    )

    # Rows are sorted best first, so row 0 carries the best score.
    if scored_df.iloc[0]["lexiscore"] <= 0:
        return None

    # All rank-1 rows (possible draw)
    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    found_uprn = top_rank_df.iloc[0]["uprn"]

    # If rank-1 rows do not agree on a single UPRN -> ambiguous
    if not df_has_single_uprn(top_rank_df, uprn=found_uprn):
        return None

    # get_uprn_candidates stringifies the column with astype(str), so a
    # missing value may show up as "nan" / "None" / "<NA>" rather than "".
    if found_uprn in ("", "nan", "None", "<NA>"):
        return None

    if not return_address:
        return found_uprn

    address = top_rank_df["address"].values[0]
    if not return_EPC:
        return found_uprn, address

    # Only read the EPC column when it was actually requested.
    epc = top_rank_df["current-energy-efficiency"].values[0]
    if not return_score:
        return found_uprn, address, epc

    score = float(top_rank_df["lexiscore"].values[0])
    return found_uprn, address, epc, score
# Diagnostic columns appended by ``resolve_uprns_for_postcode_group``.
_UPRN_RESULT_COLUMNS = (
    "found_uprn",
    "best_match_uprn",
    "best_match_address",
    "best_match_lexiscore",
    "status",
)


def _uprn_match_result(
    status,
    found_uprn=None,
    best_match_uprn=None,
    best_match_address=None,
    best_match_lexiscore=None,
):
    """Build one diagnostics record with every result column present."""
    return {
        "found_uprn": found_uprn,
        "best_match_uprn": best_match_uprn,
        "best_match_address": best_match_address,
        "best_match_lexiscore": best_match_lexiscore,
        "status": status,
    }


def _resolve_single_address(user_address, epc_df):
    """Match one address against the EPC rows and return its diagnostics record."""
    # A failed or empty EPC lookup yields a DataFrame without columns; check
    # this before scoring, which would raise on the missing columns.
    if epc_df.empty:
        return _uprn_match_result("no_epc_candidates")

    scored_df = get_uprn_candidates(
        epc_df,
        user_address=user_address,
    )

    best_score = scored_df.iloc[0]["lexiscore"]
    if best_score <= 0:
        return _uprn_match_result("zero_score", best_match_lexiscore=best_score)

    top_rank_df = scored_df[scored_df["lexirank"] == 1]
    best_row = top_rank_df.iloc[0]

    if not df_has_single_uprn(top_rank_df, best_row["uprn"]):
        # Several equally good rows with different UPRNs: report, don't pick.
        return _uprn_match_result(
            "ambiguous",
            best_match_uprn=best_row["uprn"],
            best_match_address=best_row["address"],
            best_match_lexiscore=best_score,
        )

    return _uprn_match_result(
        "matched",
        found_uprn=str(best_row["uprn"]),
        best_match_uprn=str(best_row["uprn"]),
        best_match_address=best_row["address"],
        best_match_lexiscore=best_score,
    )


def resolve_uprns_for_postcode_group(
    group_df: pd.DataFrame,
    epc_df: pd.DataFrame,
    address_col: str = "Address 1",
) -> pd.DataFrame:
    """
    Resolve a UPRN for every row of a same-postcode group.

    Given:
    - group_df: rows sharing the same postcode
    - epc_df: EPC search results for that postcode (may be empty)
    - address_col: column of ``group_df`` holding the address text

    Returns:
        ``group_df`` (index reset) with these columns appended:
        found_uprn, best_match_uprn, best_match_address,
        best_match_lexiscore and status, where status is one of
        "matched", "ambiguous", "zero_score" or "no_epc_candidates".
        ``found_uprn`` is only set for "matched" rows.
    """
    results = [
        _resolve_single_address(str(row[address_col]).strip(), epc_df)
        for _, row in group_df.iterrows()
    ]

    # Explicit columns keep the output schema stable even for an empty group.
    diagnostics = pd.DataFrame(results, columns=list(_UPRN_RESULT_COLUMNS))
    return pd.concat(
        [group_df.reset_index(drop=True), diagnostics],
        axis=1,
    )
def test(a, b):
    """
    Check that ``a`` equals ``b``.

    Raises:
        AssertionError: when the values differ; the message shows both values
            and their types.
    """
    # Raised explicitly instead of via ``assert`` so the check still runs when
    # Python is started with -O (which strips assert statements).
    if not (a == b):
        raise AssertionError(
            f"error: {a!r} ({type(a).__name__}) != {b!r} ({type(b).__name__})"
        )


# The name matches pytest's default ``test*`` pattern; mark the helper so it
# is not collected as a test case needing fixtures named ``a`` and ``b``.
test.__test__ = False
def run_all_test():
    """
    Manual integration checks against the live EPC API.

    Needs network access and a valid EPC token. The expected row counts and
    UPRNs are snapshots of the live data and may drift over time.
    ``get_uprn`` signals every kind of failure with ``None``, so that is the
    expected value for the "no match" cases below.
    """
    # Basic usage with different postcode styles
    test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
    test(get_epc_data_with_postcode("B938sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
    test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)

    test(get_uprn("68", "b93 8sy"), "100070989938")
    test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
    test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
    test(get_uprn("28 A", "se6 4tf"), "100023278633")
    test(get_uprn("28A", "se6 4tf"), "100023278633")
    test(get_uprn("6 Aitken Close", "E8 4SQ"), None)

    # Unique case: flat number and building number are easy to swap
    test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198")
    test(get_uprn("1, 5 Semley Gate", "e9 5nh"), None)
    # This one returns "flat 1, in 1 semley gate"
    test(get_uprn("1 Semley Gate", "e9 5nh"), "10008238188")

    # Expected to yield no UPRN
    test(get_uprn("48 Oswald Street", "E5 0BT"), None)
    test(get_uprn("42 Oswald Street", "E5 0BT"), None)
    test(get_uprn("46 Oswald Street", "E5 0BT"), None)

    # Smoke checks: these only need to run without raising; results are unused.
    get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
    get_uprn_candidates(
        get_epc_data_with_postcode("Cr2 7dl"),
        "FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY",
    )
def handler(event, context):
    """
    Lambda entry point (placeholder).

    Prints a banner, the function name and request id read from ``context``,
    the event as indented JSON and the context itself, then returns a fixed
    200 "hello world" response.
    """

    def banner_lines():
        # Produced lazily so each value is read just before it is printed.
        yield "=== Address2UPRN Lambda Handler ==="
        yield f"Function: {context.function_name}"
        yield f"Request ID: {context.aws_request_id}"
        yield f"Event: {json.dumps(event, indent=2, default=str)}"
        yield f"Context: {context}"
        yield "==================================="

    for line in banner_lines():
        print(line)

    response = {"statusCode": 200, "body": "hello world"}
    return response
# TODO: add a function dispatcher.
# TODO: fix get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"), ...) for
#       "Flat 1, 5 Semley Gate" and "Flat 5, 1 Semley Gate".
# TODO: look again at the "flat 1" case.
# TODO: pandas reader, then the separate postcode_splitter.
# TODO: dump the results into S3.