run all tests

This commit is contained in:
Jun-te Kim 2026-01-28 15:54:38 +00:00
parent d5f4799675
commit ef40363a69
5 changed files with 114 additions and 121 deletions

View file

@ -21,7 +21,8 @@
"jgclark.vscode-todo-highlight",
"corentinartaud.pdfpreview",
"ms-python.vscode-python-envs",
"ms-python.black-formatter"
"ms-python.black-formatter",
"waderyan.gitblame"
],
"settings": {
"files.defaultWorkspace": "/workspaces/model",

View file

@ -2,6 +2,9 @@ name: Run unit tests
on:
pull_request:
branches:
- '*'
jobs:
test:

View file

@ -7,8 +7,10 @@ from tqdm import tqdm
import re
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=")
client = EpcClient(auth_token=EPC_AUTH_TOKEN)
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
"a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=",
)
import re
from difflib import SequenceMatcher
@ -25,14 +27,12 @@ def levenshtein(a: str, b: str) -> float:
- Combine token overlap + character similarity
"""
def extract_number_sequence(s: str) -> list[str]:
def extract_number_sequence(s: str) -> list[str]:
return re.findall(r"\d+[a-z]?", s)
def extract_numbers(s: str) -> Set[str]:
return set(extract_number_sequence(s))
def tokenise(s: str) -> Set[str]:
return set(s.split())
@ -42,7 +42,7 @@ def levenshtein(a: str, b: str) -> float:
# --- hard signal: numbers ---
nums_a = extract_numbers(a_norm)
nums_b = extract_numbers(b_norm)
if nums_a and not nums_b:
return 0.0
@ -54,9 +54,10 @@ def levenshtein(a: str, b: str) -> float:
seq_a = extract_number_sequence(a_norm)
seq_b = extract_number_sequence(b_norm)
has_flat_token_user = any(tok in a_norm for tok in ("flat", "apt", "apartment", "unit"))
has_flat_token_epc = "flat" in b_norm
has_flat_token_user = any(
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
)
has_flat_token_epc = "flat" in b_norm
if (
len(seq_a) == 2
@ -67,7 +68,6 @@ def levenshtein(a: str, b: str) -> float:
):
return 0.0
# --- token similarity (order-independent) ---
toks_a = tokenise(a_norm)
toks_b = tokenise(b_norm)
@ -82,8 +82,7 @@ def levenshtein(a: str, b: str) -> float:
# --- weighted blend ---
return round(
0.65 * token_score +
0.35 * char_score,
0.65 * token_score + 0.35 * char_score,
4,
)
@ -114,13 +113,11 @@ def normalise_address(s: str) -> str:
"cres": "crescent",
"ct": "court",
"dr": "drive",
# flats / units
"apt": "flat",
"apartment": "flat",
"unit": "flat",
"ste": "suite",
# numbering noise
"no": "",
"no.": "",
@ -155,15 +152,15 @@ def score_addresses(
if column not in df.columns:
raise ValueError(f"Missing column: {column}")
return df[column].apply(
lambda x: levenshtein(user_address, x)
)
return df[column].apply(lambda x: levenshtein(user_address, x))
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
"""
Recursively fetch EPC data by postcode.
If results hit the size limit, retry with double size up to max_attempts.
"""
client = EpcClient(auth_token=EPC_AUTH_TOKEN)
url = os.path.join(client.domestic.host, "search")
@ -176,10 +173,7 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
params={"postcode": postcode},
)
results_df = pd.DataFrame(
search_resp["rows"],
columns=search_resp["column-names"]
)
results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"])
row_count = len(results_df)
@ -217,13 +211,7 @@ def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> boo
return False
# Drop nulls and normalise to string
uprns = (
df[column]
.dropna()
.astype(str)
.str.strip()
.unique()
)
uprns = df[column].dropna().astype(str).str.strip().unique()
# No valid UPRNs to compare
if len(uprns) == 0:
@ -256,23 +244,13 @@ def get_uprn_candidates(
user_norm = normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: levenshtein(user_norm, x)
)
out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))
# Normalise UPRN to string
out[uprn_column] = (
out[uprn_column]
.astype(str)
.str.replace(r"\.0$", "", regex=True)
)
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
# Rank: 1 = best match
out["lexirank"] = (
out["lexiscore"]
.rank(method="dense", ascending=False)
.astype(int)
)
out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
return out.sort_values(
["lexirank", "lexiscore"],
@ -307,6 +285,7 @@ def get_uprn(user_inputed_address: str, postcode: str):
# Safe to return the agreed UPRN
return top_rank_df.iloc[0]["uprn"]
def resolve_uprns_for_postcode_group(
group_df: pd.DataFrame,
epc_df: pd.DataFrame,
@ -332,46 +311,54 @@ def resolve_uprns_for_postcode_group(
)
if scored_df.empty:
results.append({
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_candidates",
})
results.append(
{
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_candidates",
}
)
continue
best_score = scored_df.iloc[0]["lexiscore"]
if best_score <= 0:
results.append({
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": best_score,
"status": "zero_score",
})
results.append(
{
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": best_score,
"status": "zero_score",
}
)
continue
top_rank_df = scored_df[scored_df["lexirank"] == 1]
if not df_has_single_uprn(top_rank_df, top_rank_df.iloc[0]["uprn"]):
results.append({
"found_uprn": None,
"best_match_uprn": top_rank_df.iloc[0]["uprn"],
"best_match_address": top_rank_df.iloc[0]["address"],
"best_match_lexiscore": best_score,
"status": "ambiguous",
})
results.append(
{
"found_uprn": None,
"best_match_uprn": top_rank_df.iloc[0]["uprn"],
"best_match_address": top_rank_df.iloc[0]["address"],
"best_match_lexiscore": best_score,
"status": "ambiguous",
}
)
continue
results.append({
"found_uprn": str(top_rank_df.iloc[0]["uprn"]),
"best_match_uprn": str(top_rank_df.iloc[0]["uprn"]),
"best_match_address": top_rank_df.iloc[0]["address"],
"best_match_lexiscore": best_score,
"status": "matched",
})
results.append(
{
"found_uprn": str(top_rank_df.iloc[0]["uprn"]),
"best_match_uprn": str(top_rank_df.iloc[0]["uprn"]),
"best_match_address": top_rank_df.iloc[0]["address"],
"best_match_lexiscore": best_score,
"status": "matched",
}
)
return pd.concat(
[group_df.reset_index(drop=True), pd.DataFrame(results)],
@ -379,8 +366,7 @@ def resolve_uprns_for_postcode_group(
)
def test(a,b):
def test(a, b):
assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"
@ -394,20 +380,27 @@ def run_all_test():
test(get_uprn("68", "b93 8sy"), "100070989938")
test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
test(get_uprn("28 A", "se6 4tf"), "100023278633")
test(get_uprn("28 A", "se6 4tf"), "100023278633")
test(get_uprn("28A", "se6 4tf"), "100023278633")
test(get_uprn("6 Aitken Close", "E8 4SQ"), False)
# unique case
test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198" )
test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False)
test(get_uprn("1 Semley Gate", "e9 5nh"), "10008238188") # this one return "flat 1, in 1 semley gate"
test(get_uprn("48 Oswald Street", "E5 0BT"), False) # this one return "flat 1, in 1 semley gate"
test(get_uprn("42 Oswald Street", "E5 0BT"), False) # this one return "flat 1, in 1 semley gate"
test(get_uprn("46 Oswald Street", "E5 0BT"), False) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("1 Semley Gate", "e9 5nh"), "10008238188"
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("48 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("42 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("46 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
@ -430,24 +423,22 @@ if __name__ == "__main__":
input_address = str(row[ADDRESS_COL]).strip()
postcode = str(row[POSTCODE_COL]).strip()
expected_uprn = (
None
if pd.isna(row[UPRN_COL])
else str(int(row[UPRN_COL]))
)
expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL]))
try:
epc_df = get_epc_data_with_postcode(postcode)
if epc_df.empty:
failures.append({
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_results",
})
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_results",
}
)
continue
scored_df = get_uprn_candidates(
@ -464,34 +455,32 @@ if __name__ == "__main__":
found_uprn = get_uprn(input_address, postcode)
except Exception as e:
failures.append({
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "exception",
"error": str(e),
})
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "exception",
"error": str(e),
}
)
continue
found_uprn_norm = (
None if not found_uprn else str(found_uprn)
)
found_uprn_norm = None if not found_uprn else str(found_uprn)
if found_uprn_norm != expected_uprn:
failures.append({
**row.to_dict(),
"found_uprn": found_uprn_norm,
"best_match_uprn": best_match_uprn,
"best_match_address": best_match_address,
"best_match_lexiscore": best_match_lexiscore,
"status": (
"no_match"
if found_uprn_norm is None
else "mismatch"
),
})
failures.append(
{
**row.to_dict(),
"found_uprn": found_uprn_norm,
"best_match_uprn": best_match_uprn,
"best_match_address": best_match_address,
"best_match_lexiscore": best_match_lexiscore,
"status": ("no_match" if found_uprn_norm is None else "mismatch"),
}
)
failures_df = pd.DataFrame(failures)
@ -510,6 +499,6 @@ if __name__ == "__main__":
# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate)
# fix that
# Look again at flat 1
# Look again at flat 1
# pandas reader the seperate postcode_splitter
# dump into s3
# dump into s3

View file

@ -8,7 +8,7 @@ DEFAULT_ENV = {
"DATA_BUCKET": "test",
"PLAN_TRIGGER_BUCKET": "test",
"ENGINE_SQS_URL": "test",
"EPC_AUTH_TOKEN": "test", # overridden in GitHub Actions
"EPC_AUTH_TOKEN": "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=", # overridden in GitHub Actions
"GOOGLE_SOLAR_API_KEY": "test",
"DB_HOST": "localhost",
"DB_USERNAME": "test",

View file

@ -1,4 +1,4 @@
[pytest]
pythonpath = .
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests