added new api call for new epc api

This commit is contained in:
Khalim Conn-Kowlessar 2026-04-25 22:17:38 +00:00
parent 9ce1928b1e
commit 3ed25030d4
5 changed files with 80 additions and 196 deletions

View file

@ -1,8 +1,6 @@
from typing import Optional
from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from utils.logger import setup_logger
import json
@ -16,7 +14,7 @@ from utils.s3 import (
)
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
from backend.utils.addressMatch import AddressMatch, get_uprn_candidates, df_has_single_uprn, score_addresses
logger = setup_logger()
@ -29,122 +27,14 @@ if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """Score every value of ``df[column]`` against ``user_address``.

    Each value in the column is handed, together with ``user_address``, to
    ``AddressMatch.score``; the per-row results come back as a Series.

    Raises:
        ValueError: if ``column`` is not one of ``df``'s columns.
    """
    if column in df.columns:

        def _score_candidate(candidate):
            return AddressMatch.score(user_address, candidate)

        return df[column].apply(_score_candidate)

    raise ValueError(f"Missing column: {column}")
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
    """
    Recursively fetch EPC data by postcode.

    Calls the domestic ``search`` endpoint with ``postcode`` as a request
    parameter. When the number of rows returned equals ``size`` the result
    may be truncated, so the call is repeated with double the size until
    ``max_attempts`` is reached.

    Args:
        postcode: Postcode sent to the search endpoint.
        size: Value sent as the ``size`` query-string argument; a falsy value
            leaves it off the URL.
        attempt: Number of the current attempt (advanced by the recursion).
        max_attempts: Last attempt number that will be made.

    Returns:
        A DataFrame built from the response's ``rows`` and ``column-names``,
        or an empty DataFrame when the response is falsy or has no ``rows``.
    """
    client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    # Build the URL with "/" explicitly: os.path.join uses the OS path
    # separator, which yields a broken URL on Windows.
    url = f"{client.domestic.host.rstrip('/')}/search"
    if size:
        url += "?" + urlencode({"size": size})

    search_resp = client.domestic.call(
        url=url,
        method="get",
        params={"postcode": postcode},
    )

    if not search_resp or "rows" not in search_resp:
        return pd.DataFrame()

    results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"])
    row_count = len(results_df)

    # If we hit the size limit, there *may* be more results
    if row_count == size:
        print(
            f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
            f"Attempt {attempt}/{max_attempts}."
        )
        if attempt < max_attempts:
            print(f"🔁 Retrying with size={size * 2}")
            return get_epc_data_with_postcode(
                postcode=postcode,
                size=size * 2,
                attempt=attempt + 1,
                max_attempts=max_attempts,
            )
        else:
            print(
                "🚨 Max attempts reached. Results may be truncated. "
                "(Please do a manual review by the tech team.)"
            )

    return results_df
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """
    Returns True if all non-null UPRNs in df match the given uprn.
    Returns False otherwise.

    Values are compared as stripped strings with a trailing ``.0`` removed,
    so a numeric column that pandas stores as float (e.g. because it also
    holds nulls) still matches the plain digit string.

    Args:
        df: Frame to inspect.
        uprn: UPRN every non-null row is expected to carry.
        column: Name of the column holding the UPRNs.

    Returns:
        False when ``column`` is missing, when it holds no non-null values,
        or when the non-null values are not all equal to ``uprn``.
    """
    if column not in df.columns:
        return False

    # Drop nulls and normalise to string; 123.0 -> "123.0" -> "123"
    uprns = (
        df[column]
        .dropna()
        .astype(str)
        .str.strip()
        .str.replace(r"\.0$", "", regex=True)
        .unique()
    )

    # Apply the same normalisation to the value we compare against
    target = str(uprn).strip()
    if target.endswith(".0"):
        target = target[:-2]

    # Exactly one unique UPRN and it matches (also False when there are none)
    return bool(len(uprns) == 1 and uprns[0] == target)
def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """
    Return a copy of the EPC results annotated with a score and a rank.

    Added columns:
        lexiscore: result of ``AddressMatch.levenshtein`` between the
            normalised ``user_address`` and the row's address.
        lexirank: dense rank of ``lexiscore``; the highest score gets rank 1.

    The UPRN column is converted to strings with a trailing ``.0`` removed.
    Rows are ordered best rank first. The input frame is left untouched and
    no UPRN is chosen or returned.

    Raises:
        ValueError: if ``address_column`` or ``uprn_column`` is missing.
    """
    for required in (address_column, uprn_column):
        if required not in df.columns:
            raise ValueError(f"Missing column: {required}")

    annotated = df.copy()
    normalised_query = AddressMatch.normalise_address(user_address)

    def _score_candidate(candidate):
        return AddressMatch.levenshtein(normalised_query, candidate)

    annotated["lexiscore"] = annotated[address_column].apply(_score_candidate)

    # UPRNs become plain strings, without the ".0" a float column would add
    uprn_text = annotated[uprn_column].astype(str)
    annotated[uprn_column] = uprn_text.str.replace(r"\.0$", "", regex=True)

    # Dense ranking, descending: rank 1 is the highest lexiscore
    dense_rank = annotated["lexiscore"].rank(method="dense", ascending=False)
    annotated["lexirank"] = dense_rank.astype(int)

    return annotated.sort_values(
        ["lexirank", "lexiscore"],
        ascending=[True, False],
    )
def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame:
    """Look up EPC records for ``postcode`` and return them as a DataFrame.

    The lookup is delegated to ``EpcClientService.search_by_postcode``. Each
    result contributes one row: ``address`` (the result's ``address_line_1``)
    and ``uprn``.

    Returns:
        A DataFrame with exactly the columns ``address`` and ``uprn``. The
        columns are present even when the search returns nothing, so callers
        that index by column name do not fail on an empty result.
    """
    # Imported here rather than at module level — presumably to avoid an
    # import cycle; confirm before moving it.
    from backend.epc_client.client import EpcClientService

    service = EpcClientService(auth_token=EPC_AUTH_TOKEN)
    results = service.search_by_postcode(postcode)
    rows = [{"address": r.address_line_1, "uprn": r.uprn} for r in results]
    # Pin the columns explicitly: DataFrame([]) would otherwise have none.
    return pd.DataFrame(rows, columns=["address", "uprn"])
def get_uprn_with_epc_df(

View file

@ -2,6 +2,7 @@ import re
from typing import Any, Optional
from difflib import SequenceMatcher
import requests
import pandas as pd
class AddressMatch:
@ -199,3 +200,48 @@ class AddressMatch:
0.65 * token_score + 0.35 * char_score,
4,
)
def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    column: str = "address",
) -> pd.Series:
    """Apply ``AddressMatch.score(user_address, value)`` to ``df[column]``.

    Returns the resulting Series, one entry per row of ``df``.

    Raises:
        ValueError: if ``df`` has no column named ``column``.
    """
    column_missing = column not in df.columns
    if column_missing:
        raise ValueError(f"Missing column: {column}")

    def _score_against_user(candidate):
        return AddressMatch.score(user_address, candidate)

    candidates = df[column]
    return candidates.apply(_score_against_user)
def get_uprn_candidates(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
    uprn_column: str = "uprn",
) -> pd.DataFrame:
    """
    Annotate a copy of the EPC results with ``lexiscore`` and ``lexirank``.

    ``lexiscore`` is ``AddressMatch.levenshtein`` applied to the normalised
    ``user_address`` and each row's address. ``lexirank`` is the dense rank
    of that score, with rank 1 assigned to the highest score. The UPRN column
    is rewritten as strings with any trailing ``.0`` removed. The returned
    frame is sorted by rank ascending, then score descending.

    Raises:
        ValueError: if ``address_column`` or ``uprn_column`` is missing.
    """
    available = df.columns
    if address_column not in available:
        raise ValueError(f"Missing column: {address_column}")
    if uprn_column not in available:
        raise ValueError(f"Missing column: {uprn_column}")

    ranked = df.copy()
    query = AddressMatch.normalise_address(user_address)

    def _score_candidate(candidate):
        return AddressMatch.levenshtein(query, candidate)

    ranked["lexiscore"] = ranked[address_column].apply(_score_candidate)

    uprn_as_text = ranked[uprn_column].astype(str)
    ranked[uprn_column] = uprn_as_text.str.replace(r"\.0$", "", regex=True)

    score_rank = ranked["lexiscore"].rank(method="dense", ascending=False)
    ranked["lexirank"] = score_rank.astype(int)

    sort_keys = ["lexirank", "lexiscore"]
    return ranked.sort_values(sort_keys, ascending=[True, False])
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """Returns True if all non-null UPRNs in df match the given uprn.

    Both sides are compared as stripped strings with a trailing ``.0``
    removed. Without that, a UPRN column stored as float (as happens when a
    numeric column also holds nulls) stringifies to ``"123.0"`` and can never
    equal ``"123"``.

    Args:
        df: Frame to inspect.
        uprn: UPRN every non-null row is expected to carry.
        column: Name of the column holding the UPRNs.

    Returns:
        False when ``column`` is missing, when there are no non-null values,
        or when the non-null values are not all equal to ``uprn``.
    """
    if column not in df.columns:
        return False

    present = df[column].dropna().astype(str).str.strip()
    uprns = present.str.replace(r"\.0$", "", regex=True).unique()

    # Normalise the expected value the same way as the column values
    expected = str(uprn).strip()
    if expected.endswith(".0"):
        expected = expected[:-2]

    # Zero or several distinct UPRNs can never be "a single matching UPRN"
    if len(uprns) != 1:
        return False
    return bool(uprns[0] == expected)

View file

@ -1447,6 +1447,28 @@ class EpcPropertyDataMapper:
) -> List[EnergyElement]:
return [EpcPropertyDataMapper._map_energy_element(e) for e in elements]
@staticmethod
def from_api_response(data: dict) -> "EpcPropertyData":
    """
    Map a raw API payload to ``EpcPropertyData`` according to its schema.

    The payload's ``schema_type`` selects the schema dataclass the dict is
    parsed into (via ``from_dict``) and the mapper method that is applied to
    the parsed object. Supported values are ``RdSAP-Schema-21.0.1`` and
    ``RdSAP-Schema-21.0.0``; add further cases here as needed.

    Raises:
        ValueError: if ``schema_type`` is missing or not one of the
            supported schemas.
    """
    from datatypes.epc.schema.helpers import from_dict

    schema_type = data.get("schema_type", "")

    if schema_type == "RdSAP-Schema-21.0.1":
        from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1

        parsed = from_dict(RdSapSchema21_0_1, data)
        return EpcPropertyDataMapper.from_rdsap_schema_21_0_1(parsed)
    elif schema_type == "RdSAP-Schema-21.0.0":
        from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0

        parsed = from_dict(RdSapSchema21_0_0, data)
        return EpcPropertyDataMapper.from_rdsap_schema_21_0_0(parsed)

    raise ValueError(f"Unsupported EPC schema: {schema_type!r}")
# ---------------------------------------------------------------------------
# Private helpers

View file

@ -1,77 +1,3 @@
import dataclasses
import typing
from datetime import date
from typing import Any, Dict, Type, TypeVar
from datatypes.epc.schema.helpers import from_dict
T = TypeVar("T")
def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """
    Build an instance of the dataclass ``cls`` from a plain dict (such as the
    output of json.loads).

    This is the typed public entry point; the conversion itself is delegated
    to ``_from_dict_impl``, which walks the dataclass fields and uses their
    type hints to convert nested values.

    Args:
        cls: Dataclass type to instantiate.
        data: Mapping of field names to raw values.

    Returns:
        The populated ``cls`` instance.
    """
    instance = _from_dict_impl(cls, data)
    return instance  # type: ignore[return-value]
def _from_dict_impl(cls: Any, data: Any) -> Any:
hints = typing.get_type_hints(cls)
kwargs: Dict[str, Any] = {}
for field in dataclasses.fields(cls): # type: ignore[arg-type]
has_default = (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING # type: ignore[misc]
)
if field.name not in data:
if has_default:
continue
raise ValueError(f"{cls.__name__}: missing required field '{field.name}'")
kwargs[field.name] = _coerce(data[field.name], hints[field.name])
return cls(**kwargs)
def _coerce(value: Any, hint: Any) -> Any:
if value is None:
return None
origin = typing.get_origin(hint)
args = typing.get_args(hint)
# Union (includes Optional[X] which is Union[X, None])
if origin is typing.Union:
if value is None:
return None
non_none_args = [a for a in args if a is not type(None)]
if len(non_none_args) == 1:
# Optional[X] — recurse so List[X] and nested dataclasses are handled
return _coerce(value, non_none_args[0])
# Multi-type Union (e.g. Union[Measurement, int]): try dataclasses first
for arg in non_none_args:
if dataclasses.is_dataclass(arg) and isinstance(value, dict):
return _from_dict_impl(arg, value)
# All remaining args are primitives — return value as-is
return value
# List[X]
if origin is list:
item_hint = args[0]
return [_coerce(item, item_hint) for item in value]
# Plain dataclass
if dataclasses.is_dataclass(hint) and isinstance(value, dict):
return _from_dict_impl(hint, value)
if hint is date and isinstance(value, str):
return date.fromisoformat(value)
return value
__all__ = ["from_dict"]

View file

@ -3,6 +3,6 @@ pythonpath = .
log_cli = true
log_cli_level = INFO
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests backend/epc_client/tests
markers =
integration: mark a test as an integration test