mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
added logic to deal with flats
This commit is contained in:
parent
6504785e7c
commit
9aae5bf482
2 changed files with 102 additions and 13 deletions
|
|
@ -24,22 +24,53 @@ from backend.utils.addressMatch import (
|
|||
logger = setup_logger()
|
||||
|
||||
|
||||
OPEN_EPC_API_TOKEN = os.getenv("OPEN_EPC_API_TOKEN")
|
||||
|
||||
if OPEN_EPC_API_TOKEN is None:
|
||||
raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env")
|
||||
|
||||
|
||||
def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame:
    """Fetch the EPC search results for a postcode as a DataFrame.

    The API token is read from the ``OPEN_EPC_API_TOKEN`` environment
    variable each time the function is called, and is validated before the
    client is constructed.

    Args:
        postcode: Postcode passed to ``EpcClientService.search_by_postcode``.

    Returns:
        A DataFrame with one row per search result and two columns:
        ``address`` (the result's ``address_line_1``) and ``uprn``. Both
        columns are present even when the search returns no results, so
        callers can index them without special-casing an empty frame.

    Raises:
        RuntimeError: If ``OPEN_EPC_API_TOKEN`` is not set in the environment.
    """
    from backend.epc_client.client import EpcClientService

    token = os.getenv("OPEN_EPC_API_TOKEN")
    if token is None:
        raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env")

    # Build the client exactly once, and only after the token is known good.
    service = EpcClientService(auth_token=token)
    results = service.search_by_postcode(postcode)

    return pd.DataFrame(
        [{"address": r.address_line_1, "uprn": r.uprn} for r in results],
        # Pin the schema so an empty result set still has both columns.
        columns=["address", "uprn"],
    )
|
||||
|
||||
|
||||
def get_uprn_from_historic_epc(
    user_inputed_address: str,
    postcode: str,
) -> Optional[tuple[str, str, float]]:
    """Look up a UPRN for an address in the historic EPC S3 data.

    A match is reported only when the historic dataset settles on a single
    rank-1 UPRN. In that case the return value is the triple
    ``(uprn, address, lexiscore)`` taken from the top-ranked record.

    ``None`` is returned when the postcode has no historic file, when the
    score is zero, or when the top rank is ambiguous. The only score gate is
    the one inside ``unambiguous_uprn`` (score > 0). The 0.7 heuristic used
    for the new-EPC source is deliberately not applied, because historic
    addresses are written in a more verbose format that systematically
    depresses lexiscores.
    """
    from datatypes.epc.domain.historic_epc_matching import (
        match_addresses_for_postcode,
    )

    try:
        matches = match_addresses_for_postcode(user_inputed_address, postcode)
    except FileNotFoundError:
        # No historic file for this postcode.
        return None

    resolved_uprn = matches.unambiguous_uprn()
    # An empty value or the literal string "nan" means no usable UPRN.
    is_unusable = not resolved_uprn or resolved_uprn in ("", "nan")

    # Only consult the top-ranked record once the UPRN itself is usable.
    best = None if is_unusable else matches.top()
    if best is None:
        return None

    return (resolved_uprn, best.record.address, best.lexiscore)
|
||||
|
||||
|
||||
def get_uprn_with_epc_df(
|
||||
user_inputed_address: str,
|
||||
epc_df: pd.DataFrame,
|
||||
|
|
@ -95,20 +126,37 @@ def get_uprn(
|
|||
):
|
||||
"""
|
||||
Return uprn (str)
|
||||
Return False if failed to find a sensible matching epc
|
||||
Return None when epc found but no UPRN
|
||||
Return None when no sensible match is found in either EPC source.
|
||||
|
||||
This function fetches EPC data via API for a single postcode.
|
||||
For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead.
|
||||
Tries the new EPC API first; if that yields no confident match, falls
|
||||
back to the historic EPC dataset on S3.
|
||||
|
||||
For processing multiple addresses in the same postcode, use
|
||||
get_uprn_with_epc_df instead.
|
||||
"""
|
||||
df = get_epc_data_with_postcode(postcode=postcode)
|
||||
|
||||
return get_uprn_with_epc_df(
|
||||
result = get_uprn_with_epc_df(
|
||||
user_inputed_address=user_inputed_address,
|
||||
epc_df=df,
|
||||
verbose=verbose,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
if not result:
|
||||
result = get_uprn_from_historic_epc(
|
||||
user_inputed_address=user_inputed_address,
|
||||
postcode=postcode,
|
||||
)
|
||||
if result:
|
||||
logger.info(
|
||||
f"Historic EPC matched {user_inputed_address} in {postcode}"
|
||||
)
|
||||
|
||||
if not result:
|
||||
return None
|
||||
|
||||
return result if verbose else result[0]
|
||||
|
||||
|
||||
def resolve_uprns_for_postcode_group(
|
||||
group_df: pd.DataFrame,
|
||||
|
|
@ -379,6 +427,7 @@ def handler(event, context, local=False):
|
|||
)
|
||||
continue
|
||||
|
||||
|
||||
# Process each address in this postcode with the same EPC data
|
||||
for row in postcode_rows:
|
||||
try:
|
||||
|
|
@ -404,6 +453,23 @@ def handler(event, context, local=False):
|
|||
verbose=True,
|
||||
)
|
||||
|
||||
# Fallback to historic EPC if new EPC produced no match
|
||||
if not result:
|
||||
try:
|
||||
result = get_uprn_from_historic_epc(
|
||||
user_inputed_address=address2uprn_user_input,
|
||||
postcode=postcode,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Historic EPC lookup failed for {address2uprn_user_input} in {postcode}: {e}"
|
||||
)
|
||||
result = None
|
||||
if result:
|
||||
logger.info(
|
||||
f"Historic EPC matched {address2uprn_user_input} in {postcode}"
|
||||
)
|
||||
|
||||
# Parse result tuple if successful
|
||||
if result:
|
||||
uprn, found_address, score = result
|
||||
|
|
|
|||
|
|
@ -178,6 +178,29 @@ class AddressMatch:
|
|||
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
|
||||
)
|
||||
has_flat_token_epc = "flat" in b_norm
|
||||
# Slash-format like "3/137a" is an implicit flat reference
|
||||
# (flat 3 of 137a) even without a "flat" keyword.
|
||||
has_implicit_flat_user = bool(re.search(r"\d+\s*/\s*\d+", a_norm))
|
||||
# If the user named a street, their leading number is a house number,
|
||||
# not a flat number — so an EPC "Flat N, …" candidate is a wrong unit.
|
||||
# Without a street token (e.g. "2 College House"), the user may be
|
||||
# implicitly naming a flat in a named building; don't apply the guard.
|
||||
STREET_TYPE_TOKENS = {
|
||||
"road", "street", "lane", "avenue", "close", "way",
|
||||
"crescent", "court", "drive", "place", "terrace", "mews",
|
||||
"gardens", "square", "grove", "park", "walk", "row",
|
||||
"green", "hill", "rise", "parade", "broadway",
|
||||
}
|
||||
user_tokens = set(a_norm.split())
|
||||
has_street_type_user = bool(user_tokens & STREET_TYPE_TOKENS)
|
||||
|
||||
if (
|
||||
has_flat_token_epc
|
||||
and not has_flat_token_user
|
||||
and not has_implicit_flat_user
|
||||
and has_street_type_user
|
||||
):
|
||||
return 0.0
|
||||
|
||||
if (
|
||||
len(seq_a) == 2
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue