mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
104 lines
3 KiB
Python
104 lines
3 KiB
Python
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import pandas as pd
|
|
from botocore.exceptions import ClientError
|
|
|
|
from backend.address2UPRN.scoring import get_uprn_candidates
|
|
from backend.utils.addressMatch import AddressMatch
|
|
from domain.epc.historic_epc import HistoricEpc
|
|
from utils.pandas_utils import pandas_cell_to_str
|
|
from utils.s3 import parse_s3_uri, read_csv_gz_from_s3
|
|
|
|
# Default value for the ``s3_root`` argument of ``match_addresses_for_postcode``;
# per-postcode files are looked up beneath it as ``<root>/<POSTCODE>/data.csv.gz``.
DEFAULT_S3_ROOT = "s3://retrofit-data-dev/historical_epc"
|
|
|
|
# Lower-cased column names that are skipped when a scored row is turned into
# ``HistoricEpc`` keyword arguments; they are read separately as the match's
# ``lexiscore`` / ``lexirank`` values.
_EXTRA_COLS = {"lexiscore", "lexirank"}
|
|
|
|
|
|
def _map_historic_epc_pandas_row_to_domain(row: pd.Series) -> HistoricEpc:
    """Build a ``HistoricEpc`` from a single DataFrame row.

    Each column label is lower-cased to form a keyword-argument name and its
    cell is passed through ``pandas_cell_to_str``. Columns whose lower-cased
    label appears in ``_EXTRA_COLS`` are left out.

    Args:
        row: One row of the scored historic-EPC frame.

    Returns:
        The ``HistoricEpc`` constructed from the remaining columns.
    """
    fields = {}
    for column, cell in row.items():
        field_name = column.lower()
        if field_name in _EXTRA_COLS:
            # Scoring columns are not forwarded to the HistoricEpc constructor.
            continue
        fields[field_name] = pandas_cell_to_str(cell)
    return HistoricEpc(**fields)
|
|
|
|
|
|
@dataclass(frozen=True)
class ScoredHistoricEpc:
    """An immutable pairing of a historic EPC record with its match score."""

    # The EPC record built from the scored row (scoring columns excluded).
    record: HistoricEpc
    # Value of the row's "lexiscore" column; HistoricEpcMatches treats a
    # top score <= 0 as "no usable match".
    lexiscore: float
    # Value of the row's "lexirank" column; matches sharing the top match's
    # rank are considered tied by HistoricEpcMatches.unambiguous_uprn.
    lexirank: int
|
|
|
|
|
|
@dataclass
class HistoricEpcMatches:
    """Scored historic-EPC candidates for one address/postcode lookup.

    ``matches`` is consumed front-first: ``top`` and ``top_n`` read from the
    start of the list and never re-sort it, so the list is expected to already
    be ordered best-first (NOTE(review): confirm the scorer guarantees this).
    """

    # The address string the caller asked to match.
    user_address: str
    # The postcode the candidates were loaded for.
    postcode: str
    # Candidate records with their scores.
    matches: list[ScoredHistoricEpc]

    def top(self) -> Optional[ScoredHistoricEpc]:
        """Return the first match, or ``None`` when there are no matches."""
        return self.matches[0] if self.matches else None

    def top_n(self, k: int) -> list[ScoredHistoricEpc]:
        """Return at most the first ``k`` matches as a new list.

        A non-positive ``k`` yields an empty list. Without the clamp a
        negative ``k`` would slice from the end (``matches[:-1]`` is
        "everything except the last"), which is never what "top k" means.
        """
        return self.matches[: max(k, 0)]

    def unambiguous_uprn(self) -> Optional[str]:
        """Return the single UPRN shared by all top-ranked matches, if any.

        Returns ``None`` when there are no matches, when the first match has
        a ``lexiscore`` of zero or less, or when the matches tied on the first
        match's ``lexirank`` carry zero or more than one distinct non-empty
        UPRN.
        """
        best = self.top()
        if best is None or best.lexiscore <= 0:
            return None
        # Every match sharing the best rank counts as a tie for first place.
        tied = [m for m in self.matches if m.lexirank == best.lexirank]
        # Falsy UPRNs (None / empty string) are ignored rather than counted.
        uprns = {m.record.uprn for m in tied if m.record.uprn}
        return next(iter(uprns)) if len(uprns) == 1 else None
|
|
|
|
|
|
def _sanitise_postcode(postcode: str) -> str:
    """Normalise a postcode to upper case with all whitespace removed.

    Every whitespace character is stripped, not only the ASCII space, so
    tabs, newlines or non-breaking spaces pasted in by a user cannot leak
    into the value (which is later used as a path segment of an S3 key) and
    a whitespace-only input is reported as empty rather than as invalid.

    Args:
        postcode: Raw postcode text; ``None`` or an empty string is treated
            as missing.

    Returns:
        The upper-cased postcode without any whitespace.

    Raises:
        ValueError: If nothing remains after removing whitespace, or if
            ``AddressMatch.is_valid_postcode`` rejects the cleaned value.
    """
    # str.split() with no argument splits on any run of whitespace, so
    # re-joining the pieces drops all of it in one pass.
    cleaned = "".join((postcode or "").split()).upper()
    if not cleaned:
        raise ValueError("postcode must contain non-whitespace characters")
    if not AddressMatch.is_valid_postcode(cleaned):
        raise ValueError(f"postcode {cleaned!r} is not a valid UK postcode")
    return cleaned
|
|
|
|
|
|
def match_addresses_for_postcode(
    user_address: str,
    postcode: str,
    *,
    s3_root: str = DEFAULT_S3_ROOT,
    address_column: str = "ADDRESS",
    uprn_column: str = "UPRN",
) -> HistoricEpcMatches:
    """Score a postcode's historic EPC records against a user-supplied address.

    The postcode is sanitised, the file ``<s3_root>/<POSTCODE>/data.csv.gz``
    is read from S3, the rows are scored with ``get_uprn_candidates``, and
    every scored row is wrapped as a ``ScoredHistoricEpc``.

    Args:
        user_address: Free-text address to match; must be non-empty.
        postcode: Postcode selecting which per-postcode file to load.
        s3_root: ``s3://bucket[/prefix]`` URI under which the per-postcode
            folders live.
        address_column: Name of the address column, forwarded to the scorer.
        uprn_column: Name of the UPRN column, forwarded to the scorer.

    Returns:
        A ``HistoricEpcMatches`` holding the sanitised postcode and one entry
        per scored row, in the order the scorer returned them.

    Raises:
        ValueError: If ``user_address`` is empty or the postcode is rejected
            by ``_sanitise_postcode``.
        FileNotFoundError: If S3 reports ``NoSuchKey``/``404`` for the file.
        botocore.exceptions.ClientError: For any other S3 client error.
    """
    if not user_address:
        raise ValueError("user_address must be non-empty")

    pc = _sanitise_postcode(postcode)
    bucket, root_prefix = parse_s3_uri(s3_root)

    # S3 keys are literal strings: with a bucket-root ``s3_root`` (no prefix)
    # a naive join would produce "/<PC>/data.csv.gz", and that leading slash
    # names a different object than "<PC>/data.csv.gz". Only prepend the
    # prefix segment when there is one.
    prefix = (root_prefix or "").rstrip("/")
    relative_key = f"{pc}/data.csv.gz"
    key = f"{prefix}/{relative_key}" if prefix else relative_key

    try:
        df = read_csv_gz_from_s3(bucket, key)
    except ClientError as e:
        # Translate "object does not exist" into the builtin exception so
        # callers need not know about botocore; everything else propagates.
        if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"):
            raise FileNotFoundError(
                f"No historic EPC data at s3://{bucket}/{key}"
            ) from e
        raise

    scored = get_uprn_candidates(
        df,
        user_address=user_address,
        address_column=address_column,
        uprn_column=uprn_column,
    )

    # The scored rows carry "lexiscore" and "lexirank" columns, which are
    # lifted onto the wrapper; the remaining columns become the record.
    matches = [
        ScoredHistoricEpc(
            record=_map_historic_epc_pandas_row_to_domain(row),
            lexiscore=float(row["lexiscore"]),
            lexirank=int(row["lexirank"]),
        )
        for _, row in scored.iterrows()
    ]

    return HistoricEpcMatches(user_address=user_address, postcode=pc, matches=matches)
|