go back to origional

This commit is contained in:
Jun-te Kim 2026-03-17 13:14:16 +00:00
parent 547f50550b
commit 6bfeeeb1b1

View file

@ -1,11 +1,13 @@
from typing import Optional
from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from difflib import SequenceMatcher
from utils.logger import setup_logger
import re
from typing import Set
import json
import requests
from uuid import UUID
import uuid
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
@ -16,8 +18,6 @@ from utils.s3 import (
)
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
logger = setup_logger()
@ -29,6 +29,191 @@ if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def is_valid_postcode(postcode_clean: str) -> bool:
"""
Validate postcode using postcodes.io.
Expects a sanitised postcode (e.g. E84SQ).
Returns True if valid, False otherwise.
"""
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
if not postcode_clean:
return False
try:
resp = requests.get(
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
timeout=5,
)
resp.raise_for_status()
return resp.json().get("result", False)
except requests.RequestException:
# Network issues, rate limits, etc.
return False
def levenshtein(a: str, b: str) -> float:
"""
Address similarity score in [0, 1].
Strategy:
- Normalise
- Strongly penalise mismatched house/flat numbers
- Combine token overlap + character similarity
"""
def extract_number_sequence(s: str) -> list[str]:
return re.findall(r"\d+[a-z]?", s)
def extract_numbers(s: str) -> Set[str]:
return set(extract_number_sequence(s))
def tokenise(s: str) -> Set[str]:
return set(s.split())
def extract_building_number(s: str) -> str | None:
"""
Extract the main building number (NOT flat/unit).
Assumes formats like:
- '42 moreton road'
- 'flat 3 42 moreton road'
"""
tokens = s.split()
# remove flat/unit context
cleaned = []
skip_next = False
for t in tokens:
if t in ("flat", "apt", "apartment", "unit"):
skip_next = True
continue
if skip_next:
skip_next = False
continue
cleaned.append(t)
# first remaining number is building number
for t in cleaned:
if re.fullmatch(r"\d+[a-z]?", t):
return t
return None
a_norm = normalise_address(a)
b_norm = normalise_address(b)
# --- hard signal: numbers ---
nums_a = extract_numbers(a_norm)
nums_b = extract_numbers(b_norm)
if nums_a and not nums_b:
return 0.0
# No shared numbers at all → impossible match
if nums_a and nums_b and nums_a.isdisjoint(nums_b):
return 0.0
# 🔒 HARD GUARD: building number must match
bld_a = extract_building_number(a_norm)
bld_b = extract_building_number(b_norm)
if bld_a and bld_b and bld_a != bld_b:
return 0.0
# --- order-sensitive flat/building guard ---
seq_a = extract_number_sequence(a_norm)
seq_b = extract_number_sequence(b_norm)
has_flat_token_user = any(
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
)
has_flat_token_epc = "flat" in b_norm
if (
len(seq_a) == 2
and len(seq_b) >= 2
and has_flat_token_epc
and not has_flat_token_user
and seq_a != seq_b[:2]
):
return 0.0
# --- token similarity (order-independent) ---
toks_a = tokenise(a_norm)
toks_b = tokenise(b_norm)
if not toks_a or not toks_b:
token_score = 0.0
else:
token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
# --- character similarity (soft signal) ---
char_score = SequenceMatcher(None, a_norm, b_norm).ratio()
# --- weighted blend ---
return round(
0.65 * token_score + 0.35 * char_score,
4,
)
def normalise_address(s: str) -> str:
"""
Canonical UK-focused address normalisation.
- Lowercases
- Removes punctuation (keeps / for flats)
- Normalises whitespace
- Applies synonym compression at token level
"""
if not s:
return ""
ADDRESS_SYNONYMS = {
# street types
"rd": "road",
"rd.": "road",
"st": "street",
"st.": "street",
"ave": "avenue",
"ave.": "avenue",
"ln": "lane",
"ln.": "lane",
"cres": "crescent",
"ct": "court",
"dr": "drive",
# flats / units
"apt": "flat",
"apartment": "flat",
"unit": "flat",
"ste": "suite",
# numbering noise
"no": "",
"no.": "",
}
# 1. lowercase
s = s.lower()
# 1.5 split digit-letter suffixes
s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)
# 2. remove punctuation except /
s = re.sub(r"[^\w\s/]", " ", s)
# 3. normalise whitespace
s = re.sub(r"\s+", " ", s).strip()
# 4. tokenise + synonym normalisation
tokens = []
for tok in s.split():
replacement = ADDRESS_SYNONYMS.get(tok, tok)
if replacement:
tokens.append(replacement)
return " ".join(tokens)
def score_addresses(
df: pd.DataFrame,
user_address: str,
@ -37,7 +222,7 @@ def score_addresses(
if column not in df.columns:
raise ValueError(f"Missing column: {column}")
return df[column].apply(lambda x: AddressMatch.score(user_address, x))
return df[column].apply(lambda x: levenshtein(user_address, x))
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
@ -129,11 +314,9 @@ def get_uprn_candidates(
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
user_norm = normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))
# Normalise UPRN to string
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
@ -297,10 +480,7 @@ def resolve_uprns_for_postcode_group(
def save_results_to_s3(
results_df: pd.DataFrame,
task_id: str,
sub_task_id: str,
bucket_name: Optional[str] = None,
results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> bool:
"""
Save results DataFrame to S3 as CSV.
@ -351,9 +531,9 @@ def handler(event, context, local=False):
{
"body": json.dumps(
{
"task_id": "169ea9b0-01b5-48dc-9f90-ae1989491d09",
"sub_task_id": "e5704f9e-29fe-43c8-8913-05be09f2440f",
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching Rerun After Address Fix.csv",
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d",
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv",
}
)
}
@ -441,9 +621,19 @@ def handler(event, context, local=False):
# Process the rows
logger.info(f"Processing {len(df)} rows for task {task_id}")
df["postcode_clean"] = (
df["Postcode"].astype(str).str.upper().str.strip().str.replace(" ", "")
)
# Create user_input column by concatenating Address columns if not already present
if "user_input" not in df.columns:
df["user_input"] = (
df["Address 1"].fillna("")
+ " "
+ df["Address 2"].fillna("")
+ " "
+ df["Address 3"].fillna("")
).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
else:
logger.info(f"user_input column already present in data")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
@ -463,7 +653,7 @@ def handler(event, context, local=False):
)
# Validate postcode before processing
if not AddressMatch.is_valid_postcode(postcode):
if not is_valid_postcode(postcode):
logger.warning(f"Postcode {postcode} is invalid, skipping")
continue
@ -482,67 +672,57 @@ def handler(event, context, local=False):
# Process each address in this postcode with the same EPC data
for row in postcode_rows:
try:
# Concatenate Address columns directly
address2uprn_user_input = (
str(row.get("Address 1", "")).strip()
+ " "
+ str(row.get("Address 2", "")).strip()
+ " "
+ str(row.get("Address 3", "")).strip()
).strip()
if not address2uprn_user_input:
user_input = row.get("user_input", "")
if not user_input:
logger.warning(
f"Skipping row with missing address components for postcode {postcode}"
f"Skipping row with missing user_input for postcode {postcode}"
)
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
user_inputed_address=address2uprn_user_input,
epc_df=epc_df,
verbose=True,
user_inputed_address=user_input, epc_df=epc_df, verbose=True
)
# Parse result tuple if successful
if result:
uprn, found_address, score = result
logger.info(
f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})"
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
)
results_data.append(
{
**row, # Include all original data
"address2uprn_uprn": uprn,
"address2uprn_address": found_address,
"address2uprn_lexiscore": score,
"uprn": uprn,
"domna_found_address": found_address,
"domna_lexiscore": score,
}
)
else:
logger.warning(
f"No UPRN found for {address2uprn_user_input} in {postcode}"
f"No UPRN found for {user_input} in {postcode}"
)
results_data.append(
{
**row, # Include all original data
"address2uprn_uprn": None,
"address2uprn_address": None,
"address2uprn_lexiscore": None,
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
}
)
except Exception as e:
logger.error(
f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}"
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
)
# Still add the row with error markers
results_data.append(
{
**row,
"address2uprn_uprn": None,
"address2uprn_address": None,
"address2uprn_lexiscore": None,
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
"error": str(e),
}
)