From c9c43f178c51ae061dce767f1062981a3fa8acf3 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 8 May 2026 14:48:15 +0000 Subject: [PATCH] demo generated for use in address2uprn --- backend/address2UPRN/main.py | 76 +----- backend/address2UPRN/scoring.py | 57 +++++ datatypes/epc/domain/historic_epc.py | 88 +++++++ datatypes/epc/domain/historic_epc_matching.py | 114 +++++++++ datatypes/epc/domain/plan.md | 161 ------------ .../tests/test_historic_epc_matching.py | 239 ++++++++++++++++++ datatypes/epc/loaders/historic_epc.py | 2 +- datatypes/epc/schema/historic_epc.py | 98 ------- .../schema/tests/test_historic_epc_loading.py | 2 +- scripts/historic_epc_demo.py | 47 ++++ utils/s3.py | 15 ++ 11 files changed, 570 insertions(+), 329 deletions(-) create mode 100644 backend/address2UPRN/scoring.py create mode 100644 datatypes/epc/domain/historic_epc_matching.py delete mode 100644 datatypes/epc/domain/plan.md create mode 100644 datatypes/epc/domain/tests/test_historic_epc_matching.py delete mode 100644 datatypes/epc/schema/historic_epc.py create mode 100644 scripts/historic_epc_demo.py diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 28ad344f..b83c7a58 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -17,16 +17,12 @@ from utils.s3 import ( from datetime import datetime from backend.utils.addressMatch import AddressMatch - -logger = setup_logger() - - -EPC_AUTH_TOKEN = os.getenv( - "EPC_AUTH_TOKEN", +from backend.address2UPRN.scoring import ( # noqa: F401 (re-exported) + df_has_single_uprn, + get_uprn_candidates, ) -if EPC_AUTH_TOKEN is None: - raise RuntimeError("EPC_AUTH_TOKEN not defined in env") +logger = setup_logger() def score_addresses( @@ -45,7 +41,10 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): Recursively fetch EPC data by postcode. If results hit the size limit, retry with double size up to max_attempts. """ - client = EpcClient(auth_token=EPC_AUTH_TOKEN) + auth_token = os.getenv("EPC_AUTH_TOKEN") + if auth_token is None: + raise RuntimeError("EPC_AUTH_TOKEN not defined in env") + client = EpcClient(auth_token=auth_token) url = os.path.join(client.domestic.host, "search") @@ -88,65 +87,6 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): return results_df -def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool: - """ - Returns True if all non-null UPRNs in df match the given uprn. - Returns False otherwise. - """ - - if column not in df.columns: - return False - - # Drop nulls and normalise to string - uprns = df[column].dropna().astype(str).str.strip().unique() - - # No valid UPRNs to compare - if len(uprns) == 0: - return False - - # Exactly one unique UPRN and it matches - return len(uprns) == 1 and uprns[0] == str(uprn) - - -def get_uprn_candidates( - df: pd.DataFrame, - user_address: str, - address_column: str = "address", - uprn_column: str = "uprn", -) -> pd.DataFrame: - """ - Annotate EPC results with lexicographical similarity scores and ranks. - - Returns a DataFrame sorted by descending lexiscore. - DOES NOT choose or return a UPRN. - """ - - if address_column not in df.columns: - raise ValueError(f"Missing column: {address_column}") - - if uprn_column not in df.columns: - raise ValueError(f"Missing column: {uprn_column}") - - out = df.copy() - - user_norm = AddressMatch.normalise_address(user_address) - - out["lexiscore"] = out[address_column].apply( - lambda x: AddressMatch.levenshtein(user_norm, x) - ) - - # Normalise UPRN to string - out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) - - # Rank: 1 = best match - out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int) - - return out.sort_values( - ["lexirank", "lexiscore"], - ascending=[True, False], - ) - - def get_uprn_with_epc_df( user_inputed_address: str, epc_df: pd.DataFrame, diff --git a/backend/address2UPRN/scoring.py b/backend/address2UPRN/scoring.py new file mode 100644 index 00000000..d31b9aea --- /dev/null +++ b/backend/address2UPRN/scoring.py @@ -0,0 +1,57 @@ +import pandas as pd + +from backend.utils.addressMatch import AddressMatch + + +def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool: + """ + Returns True if all non-null UPRNs in df match the given uprn. + Returns False otherwise. + """ + + if column not in df.columns: + return False + + uprns = df[column].dropna().astype(str).str.strip().unique() + + if len(uprns) == 0: + return False + + return len(uprns) == 1 and uprns[0] == str(uprn) + + +def get_uprn_candidates( + df: pd.DataFrame, + user_address: str, + address_column: str = "address", + uprn_column: str = "uprn", +) -> pd.DataFrame: + """ + Annotate EPC results with lexicographical similarity scores and ranks. + + Returns a DataFrame sorted by descending lexiscore. + DOES NOT choose or return a UPRN. + """ + + if address_column not in df.columns: + raise ValueError(f"Missing column: {address_column}") + + if uprn_column not in df.columns: + raise ValueError(f"Missing column: {uprn_column}") + + out = df.copy() + + user_norm = AddressMatch.normalise_address(user_address) + + out["lexiscore"] = out[address_column].apply( + lambda x: AddressMatch.levenshtein(user_norm, x) + ) + + out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) + + out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int) + + return out.sort_values( + ["lexirank", "lexiscore"], + ascending=[True, False], + ) diff --git a/datatypes/epc/domain/historic_epc.py b/datatypes/epc/domain/historic_epc.py index 230c6327..f64ab8c4 100644 --- a/datatypes/epc/domain/historic_epc.py +++ b/datatypes/epc/domain/historic_epc.py @@ -3,8 +3,96 @@ from dataclasses import dataclass @dataclass class HistoricEpc: + lmk_key: str address1: str address2: str address3: str postcode: str + building_reference_number: str + current_energy_rating: str + potential_energy_rating: str + current_energy_efficiency: str + potential_energy_efficiency: str + property_type: str + built_form: str + inspection_date: str + local_authority: str + constituency: str + county: str + lodgement_date: str + transaction_type: str + environment_impact_current: str + environment_impact_potential: str + energy_consumption_current: str + energy_consumption_potential: str + co2_emissions_current: str + co2_emiss_curr_per_floor_area: str + co2_emissions_potential: str + lighting_cost_current: str + lighting_cost_potential: str + heating_cost_current: str + heating_cost_potential: str + hot_water_cost_current: str + hot_water_cost_potential: str + total_floor_area: str + energy_tariff: str + mains_gas_flag: str + floor_level: str + flat_top_storey: str + flat_storey_count: str + main_heating_controls: str + multi_glaze_proportion: str + glazed_type: str + glazed_area: str + extension_count: str + number_habitable_rooms: str + number_heated_rooms: str + low_energy_lighting: str + number_open_fireplaces: str + hotwater_description: str + hot_water_energy_eff: str + hot_water_env_eff: str + floor_description: str + floor_energy_eff: str + floor_env_eff: str + windows_description: str + windows_energy_eff: str + windows_env_eff: str + walls_description: str + walls_energy_eff: str + walls_env_eff: str + secondheat_description: str + sheating_energy_eff: str + sheating_env_eff: str + roof_description: str + roof_energy_eff: str + roof_env_eff: str + mainheat_description: str + mainheat_energy_eff: str + mainheat_env_eff: str + mainheatcont_description: str + mainheatc_energy_eff: str + mainheatc_env_eff: str + lighting_description: str + lighting_energy_eff: str + lighting_env_eff: str + main_fuel: str + wind_turbine_count: str + heat_loss_corridor: str + unheated_corridor_length: str + floor_height: str + photo_supply: str + solar_water_heating_flag: str + mechanical_ventilation: str + address: str + local_authority_label: str + constituency_label: str + posttown: str + construction_age_band: str + lodgement_datetime: str + tenure: str + fixed_lighting_outlets_count: str + low_energy_fixed_light_count: str uprn: str + uprn_source: str + report_type: str diff --git a/datatypes/epc/domain/historic_epc_matching.py b/datatypes/epc/domain/historic_epc_matching.py new file mode 100644 index 00000000..53f602ae --- /dev/null +++ b/datatypes/epc/domain/historic_epc_matching.py @@ -0,0 +1,114 @@ +from dataclasses import dataclass +from typing import Any, Optional + +import pandas as pd +from botocore.exceptions import ClientError + +from backend.address2UPRN.scoring import get_uprn_candidates +from backend.utils.addressMatch import AddressMatch +from datatypes.epc.domain.historic_epc import HistoricEpc +from utils.s3 import parse_s3_uri, read_csv_gz_from_s3 + +DEFAULT_S3_ROOT = "s3://retrofit-data-dev/historical_epc" + +_EXTRA_COLS = {"lexiscore", "lexirank"} + + +def _cell_to_str(v: Any) -> str: + if v is None or (isinstance(v, float) and pd.isna(v)): + return "" + s = str(v).replace("\xa0", " ") + # get_uprn_candidates runs .astype(str) on UPRN, turning NaN into "nan". + # Treat that as missing so unambiguous_uprn truthiness checks work. + if s.lower() == "nan": + return "" + return s + + +def _row_to_historic_epc(row: pd.Series) -> HistoricEpc: + kwargs = { + col.lower(): _cell_to_str(val) + for col, val in row.items() + if col.lower() not in _EXTRA_COLS + } + return HistoricEpc(**kwargs) + + +@dataclass(frozen=True) +class ScoredHistoricEpc: + record: HistoricEpc + lexiscore: float + lexirank: int + + +@dataclass +class HistoricEpcMatches: + user_address: str + postcode: str + matches: list[ScoredHistoricEpc] + + def top(self) -> Optional[ScoredHistoricEpc]: + return self.matches[0] if self.matches else None + + def top_n(self, k: int) -> list[ScoredHistoricEpc]: + return self.matches[:k] + + def unambiguous_uprn(self) -> Optional[str]: + top = self.top() + if top is None or top.lexiscore <= 0: + return None + rank1 = [m for m in self.matches if m.lexirank == top.lexirank] + uprns = {m.record.uprn for m in rank1 if m.record.uprn} + return next(iter(uprns)) if len(uprns) == 1 else None + + +def _sanitise_postcode(postcode: str) -> str: + cleaned = (postcode or "").upper().replace(" ", "") + if not cleaned: + raise ValueError("postcode must contain non-whitespace characters") + if not AddressMatch.is_valid_postcode(cleaned): + raise ValueError(f"postcode {cleaned!r} is not a valid UK postcode") + return cleaned + + +def match_addresses_for_postcode( + user_address: str, + postcode: str, + *, + s3_root: str = DEFAULT_S3_ROOT, + address_column: str = "ADDRESS", + uprn_column: str = "UPRN", +) -> HistoricEpcMatches: + if not user_address: + raise ValueError("user_address must be non-empty") + + pc = _sanitise_postcode(postcode) + bucket, root_prefix = parse_s3_uri(s3_root) + key = f"{root_prefix.rstrip('/')}/{pc}/data.csv.gz" + + try: + df = read_csv_gz_from_s3(bucket, key) + except ClientError as e: + if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"): + raise FileNotFoundError( + f"No historic EPC data at s3://{bucket}/{key}" + ) from e + raise + + scored = get_uprn_candidates( + df, + user_address=user_address, + address_column=address_column, + uprn_column=uprn_column, + ) + + matches = [ + ScoredHistoricEpc( + record=_row_to_historic_epc(row), + lexiscore=float(row["lexiscore"]), + lexirank=int(row["lexirank"]), + ) + for _, row in scored.iterrows() + ] + + return HistoricEpcMatches(user_address=user_address, postcode=pc, matches=matches) diff --git a/datatypes/epc/domain/plan.md b/datatypes/epc/domain/plan.md deleted file mode 100644 index 45cc495b..00000000 --- a/datatypes/epc/domain/plan.md +++ /dev/null @@ -1,161 +0,0 @@ -# Historic EPC address-match service - -## Context - -ETL `backend/etl/etl_opendatacommunities/main.py` shards `certificates.csv` by sanitised postcode and uploads gzipped CSVs to `s3://retrofit-data-dev/historical_epc//data.csv.gz`. Need a pure-python lib that, given `(user_address, postcode)`, fetches the corresponding shard and scores every row against the user address using the same lexiscore as `address2UPRN` — but returning the full scored df (not a single UPRN), so callers can apply their own thresholding. - -Mirrors pattern in [backend/address2UPRN/main.py:111-147](backend/address2UPRN/main.py#L111-L147) (`get_uprn_candidates`) but reads from S3 historic CSV instead of the EPC live API. No Lambda, no script — lib only for now. - -## Approach - -Add a wrapper class `HistoricEpcMatches` and a function `match_addresses_for_postcode` to the existing domain file. Add a small gzip-CSV S3 helper to `utils/s3.py`. - -### 1. Add gzip-CSV S3 reader - -In [utils/s3.py](utils/s3.py) (after `read_dataframe_from_s3_parquet` ~line 167): - -```python -def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame: - if not file_key.endswith(".csv.gz"): - raise ValueError("file_key must end with .csv.gz") - buf = read_io_from_s3(bucket_name, file_key) - return pd.read_csv(buf, compression="gzip", low_memory=False) -``` - -Reuses existing `read_io_from_s3` (line 105). Caller catches `botocore.exceptions.ClientError` for missing-key handling. - -### 2. Append matcher to domain module - -In [datatypes/epc/domain/historic_epc.py](datatypes/epc/domain/historic_epc.py) — keep existing `HistoricEpc` dataclass intact, append: - -```python -from typing import Optional -import pandas as pd -from botocore.exceptions import ClientError - -from backend.utils.addressMatch import AddressMatch -from utils.s3 import read_csv_gz_from_s3 - - -@dataclass -class HistoricEpcMatches: - """Scored historic EPC rows for a single postcode.""" - user_address: str - postcode: str # sanitised - df: pd.DataFrame # has lexiscore + lexirank, sorted best-first - - def top(self) -> Optional[pd.Series]: - return None if self.df.empty else self.df.iloc[0] - - def top_n(self, k: int) -> pd.DataFrame: - return self.df.head(k) - - def unambiguous_uprn(self, uprn_column: str = "UPRN") -> Optional[str]: - if self.df.empty: - return None - top_rank = self.df["lexirank"].min() - uprns = ( - self.df.loc[self.df["lexirank"] == top_rank, uprn_column] - .dropna().astype(str).str.replace(r"\.0$", "", regex=True) - .unique() - ) - return uprns[0] if len(uprns) == 1 else None - - -def _sanitise_postcode(postcode: str) -> str: - if not postcode: - raise ValueError("postcode must be non-empty") - return postcode.upper().replace(" ", "") - - -def match_addresses_for_postcode( - user_address: str, - postcode: str, - *, - bucket: str = "retrofit-data-dev", - prefix: str = "historical_epc", - address_column: str = "ADDRESS", -) -> HistoricEpcMatches: - if not user_address: - raise ValueError("user_address must be non-empty") - - pc = _sanitise_postcode(postcode) - key = f"{prefix}/{pc}/data.csv.gz" - - try: - df = read_csv_gz_from_s3(bucket, key) - except ClientError as e: - if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"): - raise FileNotFoundError( - f"No historic EPC data at s3://{bucket}/{key}" - ) from e - raise - - if address_column not in df.columns: - raise ValueError( - f"Missing address column {address_column!r} in {key}" - ) - - user_norm = AddressMatch.normalise_address(user_address) - df = df.copy() - df["lexiscore"] = df[address_column].fillna("").apply( - lambda x: AddressMatch.levenshtein(user_norm, x) - ) - df["lexirank"] = ( - df["lexiscore"].rank(method="dense", ascending=False).astype(int) - ) - df = df.sort_values(["lexirank", "lexiscore"], ascending=[True, False]).reset_index(drop=True) - - return HistoricEpcMatches(user_address=user_address, postcode=pc, df=df) -``` - -### Reuse notes -- `AddressMatch.normalise_address` + `AddressMatch.levenshtein` from [backend/utils/addressMatch.py](backend/utils/addressMatch.py) — same scoring as address2UPRN. -- Score column copy uses `.fillna("")` to defend against NaN in `ADDRESS`. -- Defaults match ETL output: bucket `retrofit-data-dev`, prefix `historical_epc`, column `ADDRESS` (uppercase). - -### 3. Tests - -New: [datatypes/epc/domain/tests/__init__.py](datatypes/epc/domain/tests/__init__.py) (empty) and [datatypes/epc/domain/tests/test_historic_epc_match.py](datatypes/epc/domain/tests/test_historic_epc_match.py). - -Reuse existing fixture `datatypes/epc/schema/tests/fixtures/historic_epc.csv` — read it in-memory in tests; do NOT commit a `.csv.gz` fixture. Patch target: `datatypes.epc.domain.historic_epc.read_csv_gz_from_s3` (local binding, not `utils.s3.read_csv_gz_from_s3`). - -Cases: -1. `_sanitise_postcode("ab33 8al") == "AB338AL"`; empty raises. -2. Returned df has `lexiscore` + `lexirank` columns, row count preserved. -3. df sorted: `iloc[0]["lexirank"] == 1`, `lexiscore` monotone non-increasing. -4. S3 key built correctly: `"AB33 8AL"` → key `"historical_epc/AB338AL/data.csv.gz"` (spy on patched helper). -5. `ClientError` with code `NoSuchKey` → `FileNotFoundError`. -6. Exact-match address → `unambiguous_uprn()` returns that UPRN; ambiguous tie → `None`. -7. `top()` / `top_n(k)` shape checks. - -## Critical files -- [datatypes/epc/domain/historic_epc.py](datatypes/epc/domain/historic_epc.py) — append matcher -- [utils/s3.py](utils/s3.py) — add `read_csv_gz_from_s3` -- [datatypes/epc/domain/tests/test_historic_epc_match.py](datatypes/epc/domain/tests/test_historic_epc_match.py) — new - -## Out of scope -- Lambda handler / SQS wiring (deferred — lib only) -- Threshold logic (caller decides via wrapper helpers) -- Postcode validation via `postcodes.io` (`AddressMatch.is_valid_postcode` exists if needed later) -- Refactoring `sanitise(pd.Series)` in `etl_opendatacommunities/main.py` — separate concern - -## Verification -``` -cd /workspaces/model && pytest datatypes/epc/domain/tests/test_historic_epc_match.py -v -``` - -Sample real-S3 call (needs AWS creds): -```python -from datatypes.epc.domain.historic_epc import match_addresses_for_postcode -m = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") -print(m.df[["ADDRESS", "UPRN", "lexiscore", "lexirank"]].head()) -print(m.unambiguous_uprn()) -``` - -## Sequencing -1. Add `read_csv_gz_from_s3` to `utils/s3.py`. -2. Append matcher + wrapper to `datatypes/epc/domain/historic_epc.py`. -3. Add tests. - -Steps 2 & 3 depend on 1. No `__init__.py` re-exports needed. diff --git a/datatypes/epc/domain/tests/test_historic_epc_matching.py b/datatypes/epc/domain/tests/test_historic_epc_matching.py new file mode 100644 index 00000000..c23846e1 --- /dev/null +++ b/datatypes/epc/domain/tests/test_historic_epc_matching.py @@ -0,0 +1,239 @@ +from unittest.mock import patch + +import numpy as np +import pandas as pd +import pytest +from botocore.exceptions import ClientError + +from datatypes.epc.domain import historic_epc_matching as matcher_mod +from datatypes.epc.domain.historic_epc_matching import ( + HistoricEpcMatches, + ScoredHistoricEpc, + _sanitise_postcode, + match_addresses_for_postcode, +) + + +# Columns required by the HistoricEpc dataclass (lower-cased CSV columns). +# The matcher only reads ADDRESS + UPRN to score; everything else is filled +# with "" but must be present for HistoricEpc(**kwargs) to construct. +_FULL_COLUMN_FIELDS = [ + "LMK_KEY", "ADDRESS1", "ADDRESS2", "ADDRESS3", "POSTCODE", + "BUILDING_REFERENCE_NUMBER", "CURRENT_ENERGY_RATING", "POTENTIAL_ENERGY_RATING", + "CURRENT_ENERGY_EFFICIENCY", "POTENTIAL_ENERGY_EFFICIENCY", "PROPERTY_TYPE", + "BUILT_FORM", "INSPECTION_DATE", "LOCAL_AUTHORITY", "CONSTITUENCY", "COUNTY", + "LODGEMENT_DATE", "TRANSACTION_TYPE", "ENVIRONMENT_IMPACT_CURRENT", + "ENVIRONMENT_IMPACT_POTENTIAL", "ENERGY_CONSUMPTION_CURRENT", + "ENERGY_CONSUMPTION_POTENTIAL", "CO2_EMISSIONS_CURRENT", + "CO2_EMISS_CURR_PER_FLOOR_AREA", "CO2_EMISSIONS_POTENTIAL", + "LIGHTING_COST_CURRENT", "LIGHTING_COST_POTENTIAL", "HEATING_COST_CURRENT", + "HEATING_COST_POTENTIAL", "HOT_WATER_COST_CURRENT", "HOT_WATER_COST_POTENTIAL", + "TOTAL_FLOOR_AREA", "ENERGY_TARIFF", "MAINS_GAS_FLAG", "FLOOR_LEVEL", + "FLAT_TOP_STOREY", "FLAT_STOREY_COUNT", "MAIN_HEATING_CONTROLS", + "MULTI_GLAZE_PROPORTION", "GLAZED_TYPE", "GLAZED_AREA", "EXTENSION_COUNT", + "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "LOW_ENERGY_LIGHTING", + "NUMBER_OPEN_FIREPLACES", "HOTWATER_DESCRIPTION", "HOT_WATER_ENERGY_EFF", + "HOT_WATER_ENV_EFF", "FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF", + "WINDOWS_DESCRIPTION", "WINDOWS_ENERGY_EFF", "WINDOWS_ENV_EFF", + "WALLS_DESCRIPTION", "WALLS_ENERGY_EFF", "WALLS_ENV_EFF", + "SECONDHEAT_DESCRIPTION", "SHEATING_ENERGY_EFF", "SHEATING_ENV_EFF", + "ROOF_DESCRIPTION", "ROOF_ENERGY_EFF", "ROOF_ENV_EFF", "MAINHEAT_DESCRIPTION", + "MAINHEAT_ENERGY_EFF", "MAINHEAT_ENV_EFF", "MAINHEATCONT_DESCRIPTION", + "MAINHEATC_ENERGY_EFF", "MAINHEATC_ENV_EFF", "LIGHTING_DESCRIPTION", + "LIGHTING_ENERGY_EFF", "LIGHTING_ENV_EFF", "MAIN_FUEL", "WIND_TURBINE_COUNT", + "HEAT_LOSS_CORRIDOR", "UNHEATED_CORRIDOR_LENGTH", "FLOOR_HEIGHT", + "PHOTO_SUPPLY", "SOLAR_WATER_HEATING_FLAG", "MECHANICAL_VENTILATION", + "ADDRESS", "LOCAL_AUTHORITY_LABEL", "CONSTITUENCY_LABEL", "POSTTOWN", + "CONSTRUCTION_AGE_BAND", "LODGEMENT_DATETIME", "TENURE", + "FIXED_LIGHTING_OUTLETS_COUNT", "LOW_ENERGY_FIXED_LIGHT_COUNT", "UPRN", + "UPRN_SOURCE", "REPORT_TYPE", +] + + +def _row(address: str, uprn) -> dict: + row = {col: "" for col in _FULL_COLUMN_FIELDS} + row["ADDRESS"] = address + row["UPRN"] = uprn + return row + + +def _build_df(rows: list[dict]) -> pd.DataFrame: + return pd.DataFrame(rows, columns=_FULL_COLUMN_FIELDS) + + +@pytest.fixture +def patch_postcode_valid(): + with patch.object(matcher_mod.AddressMatch, "is_valid_postcode", return_value=True) as m: + yield m + + +@pytest.fixture +def patch_read(): + with patch.object(matcher_mod, "read_csv_gz_from_s3") as m: + yield m + + +# ---------- _sanitise_postcode ---------- + + +class TestSanitisePostcode: + + def test_uppercases_and_strips_spaces(self, patch_postcode_valid): + assert _sanitise_postcode("ab33 8al") == "AB338AL" + + def test_empty_raises(self, patch_postcode_valid): + with pytest.raises(ValueError, match="non-whitespace"): + _sanitise_postcode("") + + def test_whitespace_only_raises(self, patch_postcode_valid): + with pytest.raises(ValueError, match="non-whitespace"): + _sanitise_postcode(" ") + + def test_invalid_postcode_raises(self): + with patch.object( + matcher_mod.AddressMatch, "is_valid_postcode", return_value=False + ): + with pytest.raises(ValueError, match="not a valid UK postcode"): + _sanitise_postcode("NONSENSE") + + +# ---------- match_addresses_for_postcode ---------- + + +class TestMatchAddressesForPostcode: + + def test_preserves_row_count_including_zero_score_rows( + self, patch_read, patch_postcode_valid + ): + # Disjoint number sets => hard zero. Still kept in matches. + patch_read.return_value = _build_df([ + _row("47 GORDON ROAD", "100"), + _row("999 SOMEWHERE ELSE", "200"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + assert isinstance(result, HistoricEpcMatches) + assert len(result.matches) == 2 + + def test_top_has_lexirank_one_and_lexiscore_monotone( + self, patch_read, patch_postcode_valid + ): + patch_read.return_value = _build_df([ + _row("48 GORDON ROAD", "200"), # near miss + _row("47 GORDON ROAD", "100"), # exact (after normalisation) + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + assert result.top().lexirank == 1 + scores = [m.lexiscore for m in result.matches] + assert scores == sorted(scores, reverse=True) + + def test_s3_key_built_from_default_root(self, patch_read, patch_postcode_valid): + patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")]) + match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + patch_read.assert_called_once_with( + "retrofit-data-dev", "historical_epc/AB338AL/data.csv.gz" + ) + + def test_s3_key_respects_custom_root_with_trailing_slash( + self, patch_read, patch_postcode_valid + ): + patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")]) + match_addresses_for_postcode( + "47 Gordon Road", + "AB33 8AL", + s3_root="s3://my-bucket/some/prefix/", + ) + patch_read.assert_called_once_with( + "my-bucket", "some/prefix/AB338AL/data.csv.gz" + ) + + def test_no_such_key_translates_to_filenotfound( + self, patch_read, patch_postcode_valid + ): + patch_read.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "missing"}}, "GetObject" + ) + with pytest.raises(FileNotFoundError): + match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + + def test_other_client_error_propagates(self, patch_read, patch_postcode_valid): + patch_read.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "nope"}}, "GetObject" + ) + with pytest.raises(ClientError): + match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + + def test_empty_user_address_raises(self, patch_postcode_valid): + with pytest.raises(ValueError, match="user_address"): + match_addresses_for_postcode("", "AB33 8AL") + + +# ---------- unambiguous_uprn ---------- + + +class TestUnambiguousUprn: + + def test_exact_match_returns_uprn(self, patch_read, patch_postcode_valid): + patch_read.return_value = _build_df([ + _row("47 GORDON ROAD", "100"), + _row("48 GORDON ROAD", "200"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + assert result.unambiguous_uprn() == "100" + + def test_ambiguous_tie_returns_none(self, patch_read, patch_postcode_valid): + # Two duplicate addresses with different UPRNs share rank-1. + patch_read.return_value = _build_df([ + _row("47 GORDON ROAD", "100"), + _row("47 GORDON ROAD", "200"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + assert result.unambiguous_uprn() is None + + def test_all_zero_score_returns_none_even_when_uprn_unique( + self, patch_read, patch_postcode_valid + ): + # User address has building number 47; no row has 47 -> all hard-zero. + patch_read.return_value = _build_df([ + _row("999 ELSEWHERE", "100"), + _row("888 ELSEWHERE", "200"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + assert all(m.lexiscore == 0.0 for m in result.matches) + assert result.unambiguous_uprn() is None + + def test_nan_uprn_becomes_empty_string_not_nan( + self, patch_read, patch_postcode_valid + ): + # Use a real NaN in the UPRN cell. + patch_read.return_value = _build_df([ + _row("47 GORDON ROAD", np.nan), + _row("48 GORDON ROAD", "200"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + top = result.top() + # _cell_to_str must turn NaN/"nan" into "" (not the literal string "nan"), + # so unambiguous_uprn's truthiness check correctly drops the row. + assert top.record.uprn == "" + + +# ---------- top / top_n ---------- + + +class TestTopHelpers: + + def test_top_n_returns_first_k(self, patch_read, patch_postcode_valid): + patch_read.return_value = _build_df([ + _row("47 GORDON ROAD", "100"), + _row("48 GORDON ROAD", "200"), + _row("49 GORDON ROAD", "300"), + ]) + result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL") + top2 = result.top_n(2) + assert len(top2) == 2 + assert all(isinstance(m, ScoredHistoricEpc) for m in top2) + + def test_top_on_empty_matches_returns_none(self): + empty = HistoricEpcMatches(user_address="x", postcode="AB338AL", matches=[]) + assert empty.top() is None + assert empty.top_n(5) == [] + assert empty.unambiguous_uprn() is None diff --git a/datatypes/epc/loaders/historic_epc.py b/datatypes/epc/loaders/historic_epc.py index 7b563315..a4757d23 100644 --- a/datatypes/epc/loaders/historic_epc.py +++ b/datatypes/epc/loaders/historic_epc.py @@ -1,6 +1,6 @@ import csv -from datatypes.epc.schema.historic_epc import HistoricEpc +from datatypes.epc.domain.historic_epc import HistoricEpc def _normalise(value: str | None) -> str: diff --git a/datatypes/epc/schema/historic_epc.py b/datatypes/epc/schema/historic_epc.py deleted file mode 100644 index f64ab8c4..00000000 --- a/datatypes/epc/schema/historic_epc.py +++ /dev/null @@ -1,98 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class HistoricEpc: - lmk_key: str - address1: str - address2: str - address3: str - postcode: str - building_reference_number: str - current_energy_rating: str - potential_energy_rating: str - current_energy_efficiency: str - potential_energy_efficiency: str - property_type: str - built_form: str - inspection_date: str - local_authority: str - constituency: str - county: str - lodgement_date: str - transaction_type: str - environment_impact_current: str - environment_impact_potential: str - energy_consumption_current: str - energy_consumption_potential: str - co2_emissions_current: str - co2_emiss_curr_per_floor_area: str - co2_emissions_potential: str - lighting_cost_current: str - lighting_cost_potential: str - heating_cost_current: str - heating_cost_potential: str - hot_water_cost_current: str - hot_water_cost_potential: str - total_floor_area: str - energy_tariff: str - mains_gas_flag: str - floor_level: str - flat_top_storey: str - flat_storey_count: str - main_heating_controls: str - multi_glaze_proportion: str - glazed_type: str - glazed_area: str - extension_count: str - number_habitable_rooms: str - number_heated_rooms: str - low_energy_lighting: str - number_open_fireplaces: str - hotwater_description: str - hot_water_energy_eff: str - hot_water_env_eff: str - floor_description: str - floor_energy_eff: str - floor_env_eff: str - windows_description: str - windows_energy_eff: str - windows_env_eff: str - walls_description: str - walls_energy_eff: str - walls_env_eff: str - secondheat_description: str - sheating_energy_eff: str - sheating_env_eff: str - roof_description: str - roof_energy_eff: str - roof_env_eff: str - mainheat_description: str - mainheat_energy_eff: str - mainheat_env_eff: str - mainheatcont_description: str - mainheatc_energy_eff: str - mainheatc_env_eff: str - lighting_description: str - lighting_energy_eff: str - lighting_env_eff: str - main_fuel: str - wind_turbine_count: str - heat_loss_corridor: str - unheated_corridor_length: str - floor_height: str - photo_supply: str - solar_water_heating_flag: str - mechanical_ventilation: str - address: str - local_authority_label: str - constituency_label: str - posttown: str - construction_age_band: str - lodgement_datetime: str - tenure: str - fixed_lighting_outlets_count: str - low_energy_fixed_light_count: str - uprn: str - uprn_source: str - report_type: str diff --git a/datatypes/epc/schema/tests/test_historic_epc_loading.py b/datatypes/epc/schema/tests/test_historic_epc_loading.py index 2170a8a6..a42f383e 100644 --- a/datatypes/epc/schema/tests/test_historic_epc_loading.py +++ b/datatypes/epc/schema/tests/test_historic_epc_loading.py @@ -3,7 +3,7 @@ import os import pytest from datatypes.epc.loaders.historic_epc import read_historic_epc_csv -from datatypes.epc.schema.historic_epc import HistoricEpc +from datatypes.epc.domain.historic_epc import HistoricEpc FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures") diff --git a/scripts/historic_epc_demo.py b/scripts/historic_epc_demo.py new file mode 100644 index 00000000..b47c3a3c --- /dev/null +++ b/scripts/historic_epc_demo.py @@ -0,0 +1,47 @@ +"""Demo: look up historic EPC records for an address + postcode. + +Reads the gzipped CSV at + s3://retrofit-data-dev/historical_epc//data.csv.gz +scores rows against the user-provided address, and prints the top matches. + +Usage: + python -m scripts.historic_epc_demo "47 Gordon Road" "AB33 8AL" + python -m scripts.historic_epc_demo # uses defaults below +""" + +import sys + +from datatypes.epc.domain.historic_epc_matching import match_addresses_for_postcode + + +def main(user_address: str, postcode: str) -> None: + print(f"Looking up: {user_address!r} @ {postcode!r}\n") + + result = match_addresses_for_postcode(user_address, postcode) + + print(f"Found {len(result.matches)} candidate row(s).\n") + + print("Top 3 matches:") + for m in result.top_n(3): + print( + f" rank={m.lexirank} score={m.lexiscore:.3f} " + f"uprn={m.record.uprn or '(none)':<14} {m.record.address}" + ) + + print() + uprn = result.unambiguous_uprn() + if uprn: + print(f"Unambiguous UPRN: {uprn}") + else: + print("No unambiguous UPRN (zero-score, tie, or empty result).") + + +if __name__ == "__main__": + args = sys.argv[1:] + if len(args) == 2: + main(args[0], args[1]) + elif len(args) == 0: + main("47 Gordon Road", "AB33 8AL") + else: + print(__doc__) + sys.exit(2) diff --git a/utils/s3.py b/utils/s3.py index 930e2e15..a28f074e 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -167,6 +167,21 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key): return df +def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame: + """ + Read a gzipped CSV from S3 into a pandas DataFrame. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (must end in .csv.gz). + :return: A pandas DataFrame. + """ + if not file_key.endswith(".csv.gz"): + raise ValueError("file_key must end with .csv.gz") + + buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key) + return pd.read_csv(buffer, compression="gzip", low_memory=False) + + def save_csv_to_s3(dataframe, bucket_name, file_name): """ Save a Pandas DataFrame to a CSV file in an S3 bucket.