From 3ed25030d44edf2f01e37637bd4f02110285c55a Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sat, 25 Apr 2026 22:17:38 +0000 Subject: [PATCH] added new api call for new epc api --- backend/address2UPRN/main.py | 128 ++------------------------ backend/utils/addressMatch.py | 46 +++++++++ datatypes/epc/domain/mapper.py | 22 +++++ datatypes/epc/schema/tests/helpers.py | 78 +--------------- pytest.ini | 2 +- 5 files changed, 80 insertions(+), 196 deletions(-) diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 28ad344f..bd562bc7 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -1,8 +1,6 @@ from typing import Optional -from epc_api.client import EpcClient import os -from urllib.parse import urlencode import pandas as pd from utils.logger import setup_logger import json @@ -16,7 +14,7 @@ from utils.s3 import ( ) from datetime import datetime -from backend.utils.addressMatch import AddressMatch +from backend.utils.addressMatch import AddressMatch, get_uprn_candidates, df_has_single_uprn, score_addresses logger = setup_logger() @@ -29,122 +27,14 @@ if EPC_AUTH_TOKEN is None: raise RuntimeError("EPC_AUTH_TOKEN not defined in env") -def score_addresses( - df: pd.DataFrame, - user_address: str, - column: str = "address", -) -> pd.Series: - if column not in df.columns: - raise ValueError(f"Missing column: {column}") - - return df[column].apply(lambda x: AddressMatch.score(user_address, x)) - - -def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): - """ - Recursively fetch EPC data by postcode. - If results hit the size limit, retry with double size up to max_attempts. - """ - client = EpcClient(auth_token=EPC_AUTH_TOKEN) - - url = os.path.join(client.domestic.host, "search") - - if size: - url += "?" + urlencode({"size": size}) - - search_resp = client.domestic.call( - url=url, - method="get", - params={"postcode": postcode}, - ) - if not search_resp or "rows" not in search_resp: - return pd.DataFrame() - - results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"]) - - row_count = len(results_df) - - # If we hit the size limit, there *may* be more results - if row_count == size: - print( - f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. " - f"Attempt {attempt}/{max_attempts}." - ) - - if attempt < max_attempts: - print(f"🔁 Retrying with size={size * 2}") - return get_epc_data_with_postcode( - postcode=postcode, - size=size * 2, - attempt=attempt + 1, - max_attempts=max_attempts, - ) - else: - print( - "🚨 Max attempts reached. Results may be truncated. " - "(Please do a manual review by the tech team.)" - ) - - return results_df - - -def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool: - """ - Returns True if all non-null UPRNs in df match the given uprn. - Returns False otherwise. - """ - - if column not in df.columns: - return False - - # Drop nulls and normalise to string - uprns = df[column].dropna().astype(str).str.strip().unique() - - # No valid UPRNs to compare - if len(uprns) == 0: - return False - - # Exactly one unique UPRN and it matches - return len(uprns) == 1 and uprns[0] == str(uprn) - - -def get_uprn_candidates( - df: pd.DataFrame, - user_address: str, - address_column: str = "address", - uprn_column: str = "uprn", -) -> pd.DataFrame: - """ - Annotate EPC results with lexicographical similarity scores and ranks. - - Returns a DataFrame sorted by descending lexiscore. - DOES NOT choose or return a UPRN. - """ - - if address_column not in df.columns: - raise ValueError(f"Missing column: {address_column}") - - if uprn_column not in df.columns: - raise ValueError(f"Missing column: {uprn_column}") - - out = df.copy() - - user_norm = AddressMatch.normalise_address(user_address) - - out["lexiscore"] = out[address_column].apply( - lambda x: AddressMatch.levenshtein(user_norm, x) - ) - - # Normalise UPRN to string - out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) - - # Rank: 1 = best match - out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int) - - return out.sort_values( - ["lexirank", "lexiscore"], - ascending=[True, False], - ) +def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame: + from backend.epc_client.client import EpcClientService + service = EpcClientService(auth_token=EPC_AUTH_TOKEN) + results = service.search_by_postcode(postcode) + return pd.DataFrame([ + {"address": r.address_line_1, "uprn": r.uprn} + for r in results + ]) def get_uprn_with_epc_df( diff --git a/backend/utils/addressMatch.py b/backend/utils/addressMatch.py index 411bb07c..12c1ac53 100644 --- a/backend/utils/addressMatch.py +++ b/backend/utils/addressMatch.py @@ -2,6 +2,7 @@ import re from typing import Any, Optional from difflib import SequenceMatcher import requests +import pandas as pd class AddressMatch: @@ -199,3 +200,48 @@ class AddressMatch: 0.65 * token_score + 0.35 * char_score, 4, ) + + +def score_addresses( + df: pd.DataFrame, + user_address: str, + column: str = "address", +) -> pd.Series: + if column not in df.columns: + raise ValueError(f"Missing column: {column}") + return df[column].apply(lambda x: AddressMatch.score(user_address, x)) + + +def get_uprn_candidates( + df: pd.DataFrame, + user_address: str, + address_column: str = "address", + uprn_column: str = "uprn", +) -> pd.DataFrame: + """ + Annotate EPC results with lexicographical similarity scores and ranks. + Returns a DataFrame sorted by descending lexiscore. + """ + if address_column not in df.columns: + raise ValueError(f"Missing column: {address_column}") + if uprn_column not in df.columns: + raise ValueError(f"Missing column: {uprn_column}") + + out = df.copy() + user_norm = AddressMatch.normalise_address(user_address) + out["lexiscore"] = out[address_column].apply( + lambda x: AddressMatch.levenshtein(user_norm, x) + ) + out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) + out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int) + return out.sort_values(["lexirank", "lexiscore"], ascending=[True, False]) + + +def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool: + """Returns True if all non-null UPRNs in df match the given uprn.""" + if column not in df.columns: + return False + uprns = df[column].dropna().astype(str).str.strip().unique() + if len(uprns) == 0: + return False + return len(uprns) == 1 and uprns[0] == str(uprn) diff --git a/datatypes/epc/domain/mapper.py b/datatypes/epc/domain/mapper.py index 1afade5c..7ef74340 100644 --- a/datatypes/epc/domain/mapper.py +++ b/datatypes/epc/domain/mapper.py @@ -1447,6 +1447,28 @@ class EpcPropertyDataMapper: ) -> List[EnergyElement]: return [EpcPropertyDataMapper._map_energy_element(e) for e in elements] + @staticmethod + def from_api_response(data: dict) -> "EpcPropertyData": + """ + Dispatch to the correct schema mapper based on schema_type. + Supports RdSAP-Schema-21.0.0 and RdSAP-Schema-21.0.1 only. + Raises ValueError for unsupported schemas — add cases here as needed. + """ + from datatypes.epc.schema.helpers import from_dict + + schema = data.get("schema_type", "") + if schema == "RdSAP-Schema-21.0.1": + from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1 + return EpcPropertyDataMapper.from_rdsap_schema_21_0_1( + from_dict(RdSapSchema21_0_1, data) + ) + if schema == "RdSAP-Schema-21.0.0": + from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0 + return EpcPropertyDataMapper.from_rdsap_schema_21_0_0( + from_dict(RdSapSchema21_0_0, data) + ) + raise ValueError(f"Unsupported EPC schema: {schema!r}") + # --------------------------------------------------------------------------- # Private helpers diff --git a/datatypes/epc/schema/tests/helpers.py b/datatypes/epc/schema/tests/helpers.py index 22f132d2..06338c0a 100644 --- a/datatypes/epc/schema/tests/helpers.py +++ b/datatypes/epc/schema/tests/helpers.py @@ -1,77 +1,3 @@ -import dataclasses -import typing -from datetime import date -from typing import Any, Dict, Type, TypeVar +from datatypes.epc.schema.helpers import from_dict -T = TypeVar("T") - - -def from_dict(cls: Type[T], data: Dict[str, Any]) -> T: - """ - Recursively convert a plain dict (e.g. from json.loads) into the given - dataclass type, using the field type hints to convert nested structures. - - Handles: - - Nested dataclasses - - List[SomeDataclass] - - Optional[X] / Union[X, None] - - Union[DataclassType, primitive] (e.g. Union[Measurement, int]) - - Primitive pass-through for Union[str, int] etc. - """ - return _from_dict_impl(cls, data) # type: ignore[return-value] - - -def _from_dict_impl(cls: Any, data: Any) -> Any: - hints = typing.get_type_hints(cls) - kwargs: Dict[str, Any] = {} - - for field in dataclasses.fields(cls): # type: ignore[arg-type] - has_default = ( - field.default is not dataclasses.MISSING - or field.default_factory is not dataclasses.MISSING # type: ignore[misc] - ) - if field.name not in data: - if has_default: - continue - raise ValueError(f"{cls.__name__}: missing required field '{field.name}'") - - kwargs[field.name] = _coerce(data[field.name], hints[field.name]) - - return cls(**kwargs) - - -def _coerce(value: Any, hint: Any) -> Any: - if value is None: - return None - - origin = typing.get_origin(hint) - args = typing.get_args(hint) - - # Union (includes Optional[X] which is Union[X, None]) - if origin is typing.Union: - if value is None: - return None - non_none_args = [a for a in args if a is not type(None)] - if len(non_none_args) == 1: - # Optional[X] — recurse so List[X] and nested dataclasses are handled - return _coerce(value, non_none_args[0]) - # Multi-type Union (e.g. Union[Measurement, int]): try dataclasses first - for arg in non_none_args: - if dataclasses.is_dataclass(arg) and isinstance(value, dict): - return _from_dict_impl(arg, value) - # All remaining args are primitives — return value as-is - return value - - # List[X] - if origin is list: - item_hint = args[0] - return [_coerce(item, item_hint) for item in value] - - # Plain dataclass - if dataclasses.is_dataclass(hint) and isinstance(value, dict): - return _from_dict_impl(hint, value) - - if hint is date and isinstance(value, str): - return date.fromisoformat(value) - - return value +__all__ = ["from_dict"] diff --git a/pytest.ini b/pytest.ini index 33231c61..1ddc8747 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,6 +3,6 @@ pythonpath = . log_cli = true log_cli_level = INFO addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial -testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests +testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests backend/epc_client/tests markers = integration: mark a test as an integration test