removed duplicate code

This commit is contained in:
Jun-te Kim 2026-03-04 16:14:27 +00:00
parent 2cd24ae3d0
commit db251c1857
6 changed files with 356 additions and 217 deletions

View file

@ -2,12 +2,8 @@ from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from difflib import SequenceMatcher
from utils.logger import setup_logger
import re
from typing import Set
import json
import requests
from uuid import UUID
import uuid
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
@ -18,6 +14,8 @@ from utils.s3 import (
)
from datetime import datetime
from backend.utils.addressMatch import addressMatch
logger = setup_logger()
@ -29,191 +27,6 @@ if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def is_valid_postcode(postcode_clean: str) -> bool:
    """
    Validate a postcode using the postcodes.io ``/validate`` endpoint.

    Expects a sanitised postcode (e.g. E84SQ).

    Args:
        postcode_clean: Postcode to check. A falsy value (empty string,
            None) returns False without making a request.

    Returns:
        True only when the service explicitly reports the postcode as valid.
        False when it is reported invalid *or* when the lookup itself fails
        (network error, timeout, non-2xx status, malformed body), so False
        does not prove that the postcode is wrong.
    """
    from urllib.parse import quote

    POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"

    if not postcode_clean:
        return False

    # Percent-encode the value so characters such as '/', '?' or '#' in
    # unsanitised input cannot change which endpoint is requested.
    url = POSTCODES_IO_VALIDATE_URL.format(
        postcode=quote(str(postcode_clean), safe="")
    )

    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError):
        # Network issues, rate limits, etc. ValueError covers a body that is
        # not valid JSON on requests versions where that error is not a
        # RequestException subclass.
        return False

    # Keep the declared bool contract even if the body has an unexpected shape.
    return isinstance(payload, dict) and payload.get("result") is True
def levenshtein(a: str, b: str) -> float:
    """
    Address similarity score in [0, 1].

    Despite the name this is not an edit distance. It is a blend of token
    overlap and character similarity, gated by hard rules on house/flat
    numbers. ``a`` is treated as the user-supplied address and ``b`` as the
    reference (EPC) address; the guards are not symmetric.

    Strategy:
    - Normalise both addresses with ``normalise_address``
    - Return 0.0 outright when the numbers cannot belong to the same property
    - Otherwise combine token overlap (65%) and character similarity (35%)

    Args:
        a: User-supplied address.
        b: Candidate address to compare against.

    Returns:
        Score rounded to 4 decimal places; 0.0 means "cannot be a match".
    """
    flat_markers = {"flat", "apt", "apartment", "unit"}
    number_pattern = r"\d+[a-z]?"

    def extract_number_sequence(s: str) -> list[str]:
        # Numbers in order of appearance.
        return re.findall(number_pattern, s)

    def extract_building_number(s: str) -> str | None:
        """
        Extract the main building number (NOT flat/unit).

        Assumes formats like:
        - '42 moreton road'
        - 'flat 3 42 moreton road'
        """
        skip_next = False
        for tok in s.split():
            if tok in flat_markers:
                # The token after a flat marker is the flat number; skip it.
                skip_next = True
                continue
            if skip_next:
                skip_next = False
                continue
            # First remaining number is the building number.
            if re.fullmatch(number_pattern, tok):
                return tok
        return None

    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    seq_a = extract_number_sequence(a_norm)
    seq_b = extract_number_sequence(b_norm)
    nums_a = set(seq_a)
    nums_b = set(seq_b)

    # --- hard signal: numbers ---
    if nums_a and not nums_b:
        return 0.0

    # No shared numbers at all -> impossible match
    if nums_a and nums_b and nums_a.isdisjoint(nums_b):
        return 0.0

    # HARD GUARD: building number must match
    bld_a = extract_building_number(a_norm)
    bld_b = extract_building_number(b_norm)
    if bld_a and bld_b and bld_a != bld_b:
        return 0.0

    toks_a = set(a_norm.split())
    toks_b = set(b_norm.split())

    # --- order-sensitive flat/building guard ---
    # Match flat markers as whole tokens. A substring test would treat street
    # names such as "unity" or "chapter" (contains "apt") as flat markers and
    # silently disable this guard.
    has_flat_token_user = not toks_a.isdisjoint(flat_markers)
    has_flat_token_epc = "flat" in toks_b

    if (
        len(seq_a) == 2
        and len(seq_b) >= 2
        and has_flat_token_epc
        and not has_flat_token_user
        and seq_a != seq_b[:2]
    ):
        return 0.0

    # --- token similarity (order-independent Jaccard index) ---
    if not toks_a or not toks_b:
        token_score = 0.0
    else:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    # --- weighted blend ---
    return round(0.65 * token_score + 0.35 * char_score, 4)
def normalise_address(s: str) -> str:
    """
    Canonical UK-focused address normalisation.

    - Lowercases
    - Splits a letter suffix off a number ("42a" -> "42 a")
    - Removes punctuation (keeps / for flats)
    - Normalises whitespace
    - Applies synonym compression at token level

    Args:
        s: Raw address. Empty, None, or any non-string value (for example a
            pandas NaN for a missing cell) yields "".

    Returns:
        The normalised address as a single space-separated string.
    """
    # Non-string truthy values such as float NaN would otherwise fail on
    # .lower() when this is applied across a DataFrame column.
    if not isinstance(s, str) or not s:
        return ""

    # Punctuation is stripped before tokenising, so dotted forms ("rd.",
    # "st.", "no.") arrive here already reduced to the bare keys below.
    ADDRESS_SYNONYMS = {
        # street types
        "rd": "road",
        "st": "street",
        "ave": "avenue",
        "ln": "lane",
        "cres": "crescent",
        "ct": "court",
        "dr": "drive",
        # flats / units
        "apt": "flat",
        "apartment": "flat",
        "unit": "flat",
        "ste": "suite",
        # numbering noise (mapped to "" so the token is dropped)
        "no": "",
    }

    # 1. lowercase
    s = s.lower()

    # 1.5 split digit-letter suffixes
    s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)

    # 2. remove punctuation except /
    s = re.sub(r"[^\w\s/]", " ", s)

    # 3. normalise whitespace
    s = re.sub(r"\s+", " ", s).strip()

    # 4. tokenise + synonym normalisation; empty replacements are dropped
    mapped = (ADDRESS_SYNONYMS.get(tok, tok) for tok in s.split())
    return " ".join(tok for tok in mapped if tok)
def score_addresses(
df: pd.DataFrame,
user_address: str,
@ -222,7 +35,7 @@ def score_addresses(
if column not in df.columns:
raise ValueError(f"Missing column: {column}")
return df[column].apply(lambda x: levenshtein(user_address, x))
return df[column].apply(lambda x: addressMatch.score(user_address, x))
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
@ -314,9 +127,9 @@ def get_uprn_candidates(
out = df.copy()
user_norm = normalise_address(user_address)
user_norm = addressMatch.normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))
out["lexiscore"] = out[address_column].apply(lambda x: addressMatch.levenshtein(user_norm, x))
# Normalise UPRN to string
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
@ -653,7 +466,7 @@ def handler(event, context, local=False):
)
# Validate postcode before processing
if not is_valid_postcode(postcode):
if not addressMatch.is_valid_postcode(postcode):
logger.warning(f"Postcode {postcode} is invalid, skipping")
continue

View file

@ -63,6 +63,8 @@ class Settings(BaseSettings):
# Other S3 buckets
ENERGY_ASSESSMENTS_BUCKET: str = "changeme"
ORDNANCE_SURVEY_API_KEY: str = "changeme"
# Optional AWS creds (only required in local)
AWS_ACCESS_KEY_ID: Optional[str] = None
AWS_SECRET_KEY_ID: Optional[str] = None

View file

@ -3,32 +3,113 @@ import json
from utils.logger import setup_logger
import logging
from backend.utils.subtasks import subtask_handler
from utils.s3 import (
# save_csv_to_s3,
read_csv_from_s3 as read_csv_from_s3_dict,
parse_s3_uri,
)
from backend.utils.addressMatch import addressMatch
from backend.app.db.connection import get_db_session
from backend.app.db.models.postcode_search import PostcodeSearchModel
from backend.utils.ordnance_survey import (
lookup_os_places,
os_places_results_to_dataframe,
)
from backend.app.config import get_settings
from sqlalchemy import select
import pandas as pd
logger: logging.Logger = setup_logger()
@subtask_handler()
def handler(event: dict[str, Any], context: Any, local: bool = False) -> None:
def check_if_post_code_exists_in_db_cache(postcode):
    """
    Return the OS Places addresses for ``postcode`` as a DataFrame.

    Looks for a ``PostcodeSearchModel`` row for the postcode first. On a
    cache miss it calls the OS Places API, stores the response payload in a
    new row and returns it in the same DataFrame form.

    Args:
        postcode: Postcode used both as the cache key and the API query.

    Returns:
        DataFrame built by ``os_places_results_to_dataframe``.

    Raises:
        RuntimeError: if the OS Places lookup does not return status 200
            with a ``data`` payload. Nothing is cached in that case.
    """
    with get_db_session() as session:
        cached = (
            session.execute(
                select(PostcodeSearchModel).where(
                    PostcodeSearchModel.postcode == postcode
                )
            )
            .scalars()
            .first()
        )

        if cached:
            return os_places_results_to_dataframe(cached.result_data)

        # Cache miss — fetch from OS Places API
        api_key = get_settings().ORDNANCE_SURVEY_API_KEY
        response = lookup_os_places(postcode, api_key)

        if response.get("status") != 200 or "data" not in response:
            logger.error(f"OS Places API failed for {postcode}: {response}")
            # f-prefix is required so the postcode is actually interpolated
            # into the message rather than emitted as the text "{postcode}".
            raise RuntimeError(
                f"A postcode that doesn't exists in ordant survey and check if its real in postcode validator!!! Postcode: {postcode}"
            )

        # Save to cache
        new_record = PostcodeSearchModel(
            postcode=postcode,
            result_data=response["data"],
        )
        session.add(new_record)
        session.commit()

        return os_places_results_to_dataframe(response["data"])
def get_ordance_survey_record(row, cache=None):
    """
    Return the Ordnance Survey lookup data to use for ``row``.

    Args:
        row: Value passed on to ``check_if_post_code_exists_in_db_cache``
            when no cache is supplied. The caller visible in this module
            passes the postcode string here.
            NOTE(review): the parameter name suggests a DataFrame row was
            intended — confirm and extract the postcode from it if so.
        cache: Previously fetched lookup result; when given it is returned
            as-is and no lookup is performed.

    Returns:
        The supplied ``cache``, or the freshly fetched lookup result.
    """
    if cache is None:
        # Use the argument itself: there is no `postcode` name in this scope,
        # so referencing one raised NameError on every cache miss.
        cache = check_if_post_code_exists_in_db_cache(row)

    # TODO: process cache with row
    return cache
@subtask_handler() # This assumes task_id and subtask_id is defined in event.Records.body
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
# delete this line after test
local = True
# Example SQS message for testing (copy and paste into SQS):
if local is True:
event = {
"Records": [
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"s3_uri"
}
)
}
]
body = {
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/09cc7368-0850-4145-8b04-ebd84b3263c4/2026-02-18T14:00:13.228611_d2f675c3.csv",
}
s3_uri: str = body.get("s3_uri", "")
lexiscore_threshold: float = body.get("lexiscore_threshold", 0.5)
if s3_uri == "":
raise RuntimeError("Missing s3_uri in message body")
bucket, key = parse_s3_uri(s3_uri)
# Assumption: this step is designed to run after address2uprn has processed the file
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
df["domna_lexiscore"] = pd.to_numeric(df["domna_lexiscore"], errors="coerce")
needs_processing = df[
df["domna_lexiscore"].isna() | (df["domna_lexiscore"] < lexiscore_threshold)
]
grouped = needs_processing.groupby("postcode_clean")
# Process each postcode group at a time
for postcode, group in grouped:
print(f"Processing postcode: {postcode} ({len(group)} rows)")
valid_group = addressMatch.is_valid_postcode(postcode)
if valid_group:
postcode_cache = None
if postcode_cache is None:
postcode_cache = get_ordance_survey_record(postcode)
for index, row in group.iterrows():
print("do something")
break
# Add business logic to do handling
# TODO: Copy s3_uri importing from address2uprn
# TODO: Copy s3_uri logic to read csv from address2uprn and search for ones without UPRN/score is low
# TODO: Copy and do Ordnance Survey logic
# TODO: Save new results to s3 ( ask Khalim if we want to save to db)

View file

@ -0,0 +1,201 @@
import re
from typing import Any, Optional
from difflib import SequenceMatcher
import requests
class addressMatch:
    """
    Stateless helpers for validating postcodes and scoring address similarity.

    Every public method is a ``@staticmethod``; the class is used purely as a
    namespace (``addressMatch.score(a, b)``) and holds no instance state.
    """

    # Tokens that introduce a flat/unit number rather than a building number.
    _FLAT_MARKERS = frozenset({"flat", "apt", "apartment", "unit"})

    # A house/flat number with an optional single-letter suffix.
    _NUMBER_RE = re.compile(r"\d+[a-z]?")

    # Token-level synonym table. Punctuation is stripped before tokenising,
    # so dotted forms ("rd.", "st.", "no.") reach the lookup as the bare keys.
    _ADDRESS_SYNONYMS = {
        # street types
        "rd": "road",
        "st": "street",
        "ave": "avenue",
        "ln": "lane",
        "cres": "crescent",
        "ct": "court",
        "dr": "drive",
        # flats / units
        "apt": "flat",
        "apartment": "flat",
        "unit": "flat",
        "ste": "suite",
        # numbering noise (mapped to "" so the token is dropped)
        "no": "",
    }

    def __init__(self) -> None:
        # Nothing to initialise; instances carry no state.
        return None

    @staticmethod
    def score(a: str, b: str) -> float:
        """Return the similarity of two addresses; alias for ``levenshtein``."""
        score: float = addressMatch.levenshtein(a, b)
        return score

    @staticmethod
    def is_valid_postcode(postcode_clean: str) -> bool:
        """
        Validate a postcode using the postcodes.io ``/validate`` endpoint.

        Expects a sanitised postcode (e.g. E84SQ).

        Args:
            postcode_clean: Postcode to check. A falsy value returns False
                without making a request.

        Returns:
            True only when the service explicitly reports the postcode as
            valid. False when it is reported invalid *or* when the lookup
            itself fails (network error, timeout, non-2xx status, malformed
            body), so False does not prove that the postcode is wrong.
        """
        from urllib.parse import quote

        POSTCODES_IO_VALIDATE_URL = (
            "https://api.postcodes.io/postcodes/{postcode}/validate"
        )

        if not postcode_clean:
            return False

        # Percent-encode the value so characters such as '/', '?' or '#' in
        # unsanitised input cannot change which endpoint is requested.
        url = POSTCODES_IO_VALIDATE_URL.format(
            postcode=quote(str(postcode_clean), safe="")
        )

        try:
            resp = requests.get(url, timeout=5)
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, ValueError):
            # Network issues, rate limits, etc. ValueError covers a body that
            # is not valid JSON on requests versions where that error is not
            # a RequestException subclass.
            return False

        # Keep the declared bool contract even for an unexpected body shape.
        return isinstance(payload, dict) and payload.get("result") is True

    @staticmethod
    def normalise_address(s: str) -> str:
        """
        Canonical UK-focused address normalisation.

        - Lowercases
        - Splits a letter suffix off a number ("42a" -> "42 a")
        - Removes punctuation (keeps / for flats)
        - Normalises whitespace
        - Applies synonym compression at token level

        Args:
            s: Raw address. Empty, None, or any non-string value (for example
                a pandas NaN for a missing cell) yields "".

        Returns:
            The normalised address as a single space-separated string.
        """
        # Non-string truthy values such as float NaN would otherwise fail on
        # .lower() when this is applied across a DataFrame column.
        if not isinstance(s, str) or not s:
            return ""

        # 1. lowercase
        s = s.lower()

        # 1.5 split digit-letter suffixes
        s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)

        # 2. remove punctuation except /
        s = re.sub(r"[^\w\s/]", " ", s)

        # 3. normalise whitespace
        s = re.sub(r"\s+", " ", s).strip()

        # 4. tokenise + synonym normalisation; empty replacements are dropped
        synonyms = addressMatch._ADDRESS_SYNONYMS
        tokens: list[str] = []
        for tok in s.split():
            replacement = synonyms.get(tok, tok)
            if replacement:
                tokens.append(replacement)

        return " ".join(tokens)

    @staticmethod
    def _extract_building_number(s: str) -> Optional[str]:
        """
        Extract the main building number (NOT flat/unit) from a normalised address.

        Assumes formats like:
        - '42 moreton road'
        - 'flat 3 42 moreton road'
        """
        skip_next = False
        for tok in s.split():
            if tok in addressMatch._FLAT_MARKERS:
                # The token after a flat marker is the flat number; skip it.
                skip_next = True
                continue
            if skip_next:
                skip_next = False
                continue
            # First remaining number is the building number.
            if addressMatch._NUMBER_RE.fullmatch(tok):
                return tok
        return None

    @staticmethod
    def levenshtein(a: str, b: str) -> float:
        """
        Address similarity score in [0, 1].

        Despite the name this is not an edit distance. It is a blend of token
        overlap and character similarity, gated by hard rules on house/flat
        numbers. ``a`` is treated as the user-supplied address and ``b`` as
        the reference (EPC) address; the guards are not symmetric.

        Strategy:
        - Normalise
        - Return 0.0 outright on mismatched house/flat numbers
        - Combine token overlap (65%) + character similarity (35%)

        Args:
            a: User-supplied address.
            b: Candidate address to compare against.

        Returns:
            Score rounded to 4 decimal places; 0.0 means "cannot be a match".
        """
        a_norm = addressMatch.normalise_address(a)
        b_norm = addressMatch.normalise_address(b)

        # Numbers in order of appearance, and as sets for overlap checks.
        seq_a: list[str] = addressMatch._NUMBER_RE.findall(a_norm)
        seq_b: list[str] = addressMatch._NUMBER_RE.findall(b_norm)
        nums_a = set(seq_a)
        nums_b = set(seq_b)

        # --- hard signal: numbers ---
        if nums_a and not nums_b:
            return 0.0

        # No shared numbers at all -> impossible match
        if nums_a and nums_b and nums_a.isdisjoint(nums_b):
            return 0.0

        # HARD GUARD: building number must match
        bld_a = addressMatch._extract_building_number(a_norm)
        bld_b = addressMatch._extract_building_number(b_norm)
        if bld_a and bld_b and bld_a != bld_b:
            return 0.0

        toks_a: set[str] = set(a_norm.split())
        toks_b: set[str] = set(b_norm.split())

        # --- order-sensitive flat/building guard ---
        # Match flat markers as whole tokens. A substring test would treat
        # street names such as "unity" or "chapter" (contains "apt") as flat
        # markers and silently disable this guard.
        has_flat_token_user = not toks_a.isdisjoint(addressMatch._FLAT_MARKERS)
        has_flat_token_epc = "flat" in toks_b

        if (
            len(seq_a) == 2
            and len(seq_b) >= 2
            and has_flat_token_epc
            and not has_flat_token_user
            and seq_a != seq_b[:2]
        ):
            return 0.0

        # --- token similarity (order-independent Jaccard index) ---
        if not toks_a or not toks_b:
            token_score = 0.0
        else:
            token_score = len(toks_a & toks_b) / len(toks_a | toks_b)

        # --- character similarity (soft signal) ---
        char_score: float = SequenceMatcher(None, a_norm, b_norm).ratio()

        # --- weighted blend ---
        return round(
            0.65 * token_score + 0.35 * char_score,
            4,
        )

View file

@ -0,0 +1,44 @@
import urllib.parse
import requests
import pandas as pd
from utils.logger import setup_logger
logger = setup_logger()
def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
    """
    Build a DataFrame from the ``results`` list of an OS Places API response.

    Each result is expected to wrap its fields under a ``DPA`` or an ``LPI``
    key; ``DPA`` wins when both are present, and results carrying neither are
    ignored. A response without ``results`` produces an empty DataFrame.
    """
    records = [
        entry["DPA"] if "DPA" in entry else entry["LPI"]
        for entry in data.get("results", [])
        if "DPA" in entry or "LPI" in entry
    ]
    return pd.DataFrame(records)
def lookup_os_places(postcode: str, api_key: str) -> dict:
    """
    Lookup a postcode using the OS Places API.

    Args:
        postcode: Postcode to search for; it is percent-encoded before use.
        api_key: Ordnance Survey API key.

    Returns:
        ``{"data": <parsed JSON body>, "status": 200}`` on success, otherwise
        an error dict ``{"error": <message>, "status": <code>}``:
        400 when no API key is given, the upstream HTTP status for a non-200
        response, 503 when the request could not be completed (connection
        error, timeout) and 502 when the body is not valid JSON.
    """
    if not api_key:
        return {"error": "Ordnance Survey API key not specified", "status": 400}

    encoded_postcode = urllib.parse.quote(postcode)
    url = (
        f"https://api.os.uk/search/places/v1/postcode?postcode={encoded_postcode}"
        f"&dataset=DPA,LPI&key={api_key}"
    )

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, timeout=10)
    except requests.RequestException as exc:
        # Log only the exception type: its message can embed the request URL,
        # which contains the API key.
        logger.error(
            f"OS Places API request failed for postcode {postcode}: {type(exc).__name__}"
        )
        return {"error": "Failed to fetch address data", "status": 503}

    if response.status_code != 200:
        logger.error(f"OS Places API error for postcode {postcode}: {response.status_code}")
        return {"error": "Failed to fetch address data", "status": response.status_code}

    try:
        data = response.json()
    except ValueError:
        logger.error(f"OS Places API returned invalid JSON for postcode {postcode}")
        return {"error": "Failed to fetch address data", "status": 502}

    return {"data": data, "status": 200}

View file

@ -28,15 +28,15 @@ from sqlalchemy import func
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 568
SCENARIOS = [
1059,
]
PORTFOLIO_ID = 404
SCENARIOS = [819, 829, 872]
scenario_names = {
1059: "EPC C - 10k budget",
819: "EPC C",
829: "EPC C - no solid floor",
872: "EPC C - no solid floor, refresh",
}
project_name = "manchester"
project_name = "lincs_rural"
def get_data(portfolio_id, scenario_ids):
@ -330,8 +330,6 @@ for scenario_id in SCENARIOS:
getting_works = df[df["total_retrofit_cost"] > 0]
getting_works["predicted_post_works_epc"].value_counts()
32565 / getting_works.shape[0]
df[df["predicted_post_works_sap"] == ""]
# Expected columns list