From 9aae5bf482a522e5b2fc4cb41174b7ef05ff1b07 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 11 May 2026 15:20:17 +0000 Subject: [PATCH] added logic to deal with flats --- backend/address2UPRN/main.py | 92 ++++++++++++++++++++++++++++++----- backend/utils/addressMatch.py | 23 +++++++++ 2 files changed, 102 insertions(+), 13 deletions(-) diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index fad5c64e..0938a53b 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -24,22 +24,53 @@ from backend.utils.addressMatch import ( logger = setup_logger() -OPEN_EPC_API_TOKEN = os.getenv("OPEN_EPC_API_TOKEN") - -if OPEN_EPC_API_TOKEN is None: - raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env") - - def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame: from backend.epc_client.client import EpcClientService - service = EpcClientService(auth_token=OPEN_EPC_API_TOKEN) + token = os.getenv("OPEN_EPC_API_TOKEN") + if token is None: + raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env") + + service = EpcClientService(auth_token=token) results = service.search_by_postcode(postcode) return pd.DataFrame( [{"address": r.address_line_1, "uprn": r.uprn} for r in results] ) +def get_uprn_from_historic_epc( + user_inputed_address: str, + postcode: str, +) -> Optional[tuple[str, str, float]]: + """Resolve a UPRN via historic EPC S3 data. + + Returns (uprn, address, lexiscore) when the historic dataset agrees on a + single rank-1 UPRN, None otherwise (missing postcode file, zero score, + or ambiguous top rank). The score gate is `unambiguous_uprn`'s own + (score > 0); the 0.7 heuristic used for the new-EPC source isn't applied + here because historic addresses use a more verbose format that + systematically depresses lexiscores. + """ + from datatypes.epc.domain.historic_epc_matching import ( + match_addresses_for_postcode, + ) + + try: + result = match_addresses_for_postcode(user_inputed_address, postcode) + except FileNotFoundError: + return None + + uprn = result.unambiguous_uprn() + if not uprn or uprn in ("", "nan"): + return None + + top = result.top() + if top is None: + return None + + return (uprn, top.record.address, top.lexiscore) + + def get_uprn_with_epc_df( user_inputed_address: str, epc_df: pd.DataFrame, @@ -95,20 +126,37 @@ def get_uprn( ): """ Return uprn (str) - Return False if failed to find a sensible matching epc - Return None when epc found but no UPRN + Return None when no sensible match is found in either EPC source. - This function fetches EPC data via API for a single postcode. - For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead. + Tries the new EPC API first; if that yields no confident match, falls + back to the historic EPC dataset on S3. + + For processing multiple addresses in the same postcode, use + get_uprn_with_epc_df instead. """ df = get_epc_data_with_postcode(postcode=postcode) - return get_uprn_with_epc_df( + result = get_uprn_with_epc_df( user_inputed_address=user_inputed_address, epc_df=df, - verbose=verbose, + verbose=True, ) + if not result: + result = get_uprn_from_historic_epc( + user_inputed_address=user_inputed_address, + postcode=postcode, + ) + if result: + logger.info( + f"Historic EPC matched {user_inputed_address} in {postcode}" + ) + + if not result: + return None + + return result if verbose else result[0] + def resolve_uprns_for_postcode_group( group_df: pd.DataFrame, @@ -379,6 +427,7 @@ def handler(event, context, local=False): ) continue + # Process each address in this postcode with the same EPC data for row in postcode_rows: try: @@ -404,6 +453,23 @@ def handler(event, context, local=False): verbose=True, ) + # Fallback to historic EPC if new EPC produced no match + if not result: + try: + result = get_uprn_from_historic_epc( + user_inputed_address=address2uprn_user_input, + postcode=postcode, + ) + except Exception as e: + logger.error( + f"Historic EPC lookup failed for {address2uprn_user_input} in {postcode}: {e}" + ) + result = None + if result: + logger.info( + f"Historic EPC matched {address2uprn_user_input} in {postcode}" + ) + # Parse result tuple if successful if result: uprn, found_address, score = result diff --git a/backend/utils/addressMatch.py b/backend/utils/addressMatch.py index a0c6ebdf..1435a629 100644 --- a/backend/utils/addressMatch.py +++ b/backend/utils/addressMatch.py @@ -178,6 +178,29 @@ class AddressMatch: tok in a_norm for tok in ("flat", "apt", "apartment", "unit") ) has_flat_token_epc = "flat" in b_norm + # Slash-format like "3/137a" is an implicit flat reference + # (flat 3 of 137a) even without a "flat" keyword. + has_implicit_flat_user = bool(re.search(r"\d+\s*/\s*\d+", a_norm)) + # If the user named a street, their leading number is a house number, + # not a flat number — so an EPC "Flat N, …" candidate is a wrong unit. + # Without a street token (e.g. "2 College House"), the user may be + # implicitly naming a flat in a named building; don't apply the guard. + STREET_TYPE_TOKENS = { + "road", "street", "lane", "avenue", "close", "way", + "crescent", "court", "drive", "place", "terrace", "mews", + "gardens", "square", "grove", "park", "walk", "row", + "green", "hill", "rise", "parade", "broadway", + } + user_tokens = set(a_norm.split()) + has_street_type_user = bool(user_tokens & STREET_TYPE_TOKENS) + + if ( + has_flat_token_epc + and not has_flat_token_user + and not has_implicit_flat_user + and has_street_type_user + ): + return 0.0 if ( len(seq_a) == 2