demo generated for use in address2uprn

This commit is contained in:
Jun-te Kim 2026-05-08 14:48:15 +00:00
parent 8b6a572223
commit c9c43f178c
11 changed files with 570 additions and 329 deletions

View file

@ -17,16 +17,12 @@ from utils.s3 import (
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
logger = setup_logger()
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
from backend.address2UPRN.scoring import ( # noqa: F401 (re-exported)
df_has_single_uprn,
get_uprn_candidates,
)
if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
logger = setup_logger()
def score_addresses(
@ -45,7 +41,10 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
Recursively fetch EPC data by postcode.
If results hit the size limit, retry with double size up to max_attempts.
"""
client = EpcClient(auth_token=EPC_AUTH_TOKEN)
auth_token = os.getenv("EPC_AUTH_TOKEN")
if auth_token is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
client = EpcClient(auth_token=auth_token)
url = os.path.join(client.domestic.host, "search")
@ -88,65 +87,6 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
return results_df
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
"""
Returns True if all non-null UPRNs in df match the given uprn.
Returns False otherwise.
"""
if column not in df.columns:
return False
# Drop nulls and normalise to string
uprns = df[column].dropna().astype(str).str.strip().unique()
# No valid UPRNs to compare
if len(uprns) == 0:
return False
# Exactly one unique UPRN and it matches
return len(uprns) == 1 and uprns[0] == str(uprn)
def get_uprn_candidates(
df: pd.DataFrame,
user_address: str,
address_column: str = "address",
uprn_column: str = "uprn",
) -> pd.DataFrame:
"""
Annotate EPC results with lexicographical similarity scores and ranks.
Returns a DataFrame sorted by descending lexiscore.
DOES NOT choose or return a UPRN.
"""
if address_column not in df.columns:
raise ValueError(f"Missing column: {address_column}")
if uprn_column not in df.columns:
raise ValueError(f"Missing column: {uprn_column}")
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
# Normalise UPRN to string
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
# Rank: 1 = best match
out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
return out.sort_values(
["lexirank", "lexiscore"],
ascending=[True, False],
)
def get_uprn_with_epc_df(
user_inputed_address: str,
epc_df: pd.DataFrame,

View file

@ -0,0 +1,57 @@
import pandas as pd
from backend.utils.addressMatch import AddressMatch
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
"""
Returns True if all non-null UPRNs in df match the given uprn.
Returns False otherwise.
"""
if column not in df.columns:
return False
uprns = df[column].dropna().astype(str).str.strip().unique()
if len(uprns) == 0:
return False
return len(uprns) == 1 and uprns[0] == str(uprn)
def get_uprn_candidates(
df: pd.DataFrame,
user_address: str,
address_column: str = "address",
uprn_column: str = "uprn",
) -> pd.DataFrame:
"""
Annotate EPC results with lexicographical similarity scores and ranks.
Returns a DataFrame sorted by descending lexiscore.
DOES NOT choose or return a UPRN.
"""
if address_column not in df.columns:
raise ValueError(f"Missing column: {address_column}")
if uprn_column not in df.columns:
raise ValueError(f"Missing column: {uprn_column}")
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
return out.sort_values(
["lexirank", "lexiscore"],
ascending=[True, False],
)

View file

@ -3,8 +3,96 @@ from dataclasses import dataclass
@dataclass
class HistoricEpc:
lmk_key: str
address1: str
address2: str
address3: str
postcode: str
building_reference_number: str
current_energy_rating: str
potential_energy_rating: str
current_energy_efficiency: str
potential_energy_efficiency: str
property_type: str
built_form: str
inspection_date: str
local_authority: str
constituency: str
county: str
lodgement_date: str
transaction_type: str
environment_impact_current: str
environment_impact_potential: str
energy_consumption_current: str
energy_consumption_potential: str
co2_emissions_current: str
co2_emiss_curr_per_floor_area: str
co2_emissions_potential: str
lighting_cost_current: str
lighting_cost_potential: str
heating_cost_current: str
heating_cost_potential: str
hot_water_cost_current: str
hot_water_cost_potential: str
total_floor_area: str
energy_tariff: str
mains_gas_flag: str
floor_level: str
flat_top_storey: str
flat_storey_count: str
main_heating_controls: str
multi_glaze_proportion: str
glazed_type: str
glazed_area: str
extension_count: str
number_habitable_rooms: str
number_heated_rooms: str
low_energy_lighting: str
number_open_fireplaces: str
hotwater_description: str
hot_water_energy_eff: str
hot_water_env_eff: str
floor_description: str
floor_energy_eff: str
floor_env_eff: str
windows_description: str
windows_energy_eff: str
windows_env_eff: str
walls_description: str
walls_energy_eff: str
walls_env_eff: str
secondheat_description: str
sheating_energy_eff: str
sheating_env_eff: str
roof_description: str
roof_energy_eff: str
roof_env_eff: str
mainheat_description: str
mainheat_energy_eff: str
mainheat_env_eff: str
mainheatcont_description: str
mainheatc_energy_eff: str
mainheatc_env_eff: str
lighting_description: str
lighting_energy_eff: str
lighting_env_eff: str
main_fuel: str
wind_turbine_count: str
heat_loss_corridor: str
unheated_corridor_length: str
floor_height: str
photo_supply: str
solar_water_heating_flag: str
mechanical_ventilation: str
address: str
local_authority_label: str
constituency_label: str
posttown: str
construction_age_band: str
lodgement_datetime: str
tenure: str
fixed_lighting_outlets_count: str
low_energy_fixed_light_count: str
uprn: str
uprn_source: str
report_type: str

View file

@ -0,0 +1,114 @@
from dataclasses import dataclass
from typing import Any, Optional
import pandas as pd
from botocore.exceptions import ClientError
from backend.address2UPRN.scoring import get_uprn_candidates
from backend.utils.addressMatch import AddressMatch
from datatypes.epc.domain.historic_epc import HistoricEpc
from utils.s3 import parse_s3_uri, read_csv_gz_from_s3
DEFAULT_S3_ROOT = "s3://retrofit-data-dev/historical_epc"
_EXTRA_COLS = {"lexiscore", "lexirank"}
def _cell_to_str(v: Any) -> str:
if v is None or (isinstance(v, float) and pd.isna(v)):
return ""
s = str(v).replace("\xa0", " ")
# get_uprn_candidates runs .astype(str) on UPRN, turning NaN into "nan".
# Treat that as missing so unambiguous_uprn truthiness checks work.
if s.lower() == "nan":
return ""
return s
def _row_to_historic_epc(row: pd.Series) -> HistoricEpc:
kwargs = {
col.lower(): _cell_to_str(val)
for col, val in row.items()
if col.lower() not in _EXTRA_COLS
}
return HistoricEpc(**kwargs)
@dataclass(frozen=True)
class ScoredHistoricEpc:
record: HistoricEpc
lexiscore: float
lexirank: int
@dataclass
class HistoricEpcMatches:
user_address: str
postcode: str
matches: list[ScoredHistoricEpc]
def top(self) -> Optional[ScoredHistoricEpc]:
return self.matches[0] if self.matches else None
def top_n(self, k: int) -> list[ScoredHistoricEpc]:
return self.matches[:k]
def unambiguous_uprn(self) -> Optional[str]:
top = self.top()
if top is None or top.lexiscore <= 0:
return None
rank1 = [m for m in self.matches if m.lexirank == top.lexirank]
uprns = {m.record.uprn for m in rank1 if m.record.uprn}
return next(iter(uprns)) if len(uprns) == 1 else None
def _sanitise_postcode(postcode: str) -> str:
cleaned = (postcode or "").upper().replace(" ", "")
if not cleaned:
raise ValueError("postcode must contain non-whitespace characters")
if not AddressMatch.is_valid_postcode(cleaned):
raise ValueError(f"postcode {cleaned!r} is not a valid UK postcode")
return cleaned
def match_addresses_for_postcode(
user_address: str,
postcode: str,
*,
s3_root: str = DEFAULT_S3_ROOT,
address_column: str = "ADDRESS",
uprn_column: str = "UPRN",
) -> HistoricEpcMatches:
if not user_address:
raise ValueError("user_address must be non-empty")
pc = _sanitise_postcode(postcode)
bucket, root_prefix = parse_s3_uri(s3_root)
key = f"{root_prefix.rstrip('/')}/{pc}/data.csv.gz"
try:
df = read_csv_gz_from_s3(bucket, key)
except ClientError as e:
if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"):
raise FileNotFoundError(
f"No historic EPC data at s3://{bucket}/{key}"
) from e
raise
scored = get_uprn_candidates(
df,
user_address=user_address,
address_column=address_column,
uprn_column=uprn_column,
)
matches = [
ScoredHistoricEpc(
record=_row_to_historic_epc(row),
lexiscore=float(row["lexiscore"]),
lexirank=int(row["lexirank"]),
)
for _, row in scored.iterrows()
]
return HistoricEpcMatches(user_address=user_address, postcode=pc, matches=matches)

View file

@ -1,161 +0,0 @@
# Historic EPC address-match service
## Context
ETL `backend/etl/etl_opendatacommunities/main.py` shards `certificates.csv` by sanitised postcode and uploads gzipped CSVs to `s3://retrofit-data-dev/historical_epc/<POSTCODE_NO_SPACE_UPPER>/data.csv.gz`. Need a pure-python lib that, given `(user_address, postcode)`, fetches the corresponding shard and scores every row against the user address using the same lexiscore as `address2UPRN` — but returning the full scored df (not a single UPRN), so callers can apply their own thresholding.
Mirrors pattern in [backend/address2UPRN/main.py:111-147](backend/address2UPRN/main.py#L111-L147) (`get_uprn_candidates`) but reads from S3 historic CSV instead of the EPC live API. No Lambda, no script — lib only for now.
## Approach
Add a wrapper class `HistoricEpcMatches` and a function `match_addresses_for_postcode` to the existing domain file. Add a small gzip-CSV S3 helper to `utils/s3.py`.
### 1. Add gzip-CSV S3 reader
In [utils/s3.py](utils/s3.py) (after `read_dataframe_from_s3_parquet` ~line 167):
```python
def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame:
if not file_key.endswith(".csv.gz"):
raise ValueError("file_key must end with .csv.gz")
buf = read_io_from_s3(bucket_name, file_key)
return pd.read_csv(buf, compression="gzip", low_memory=False)
```
Reuses existing `read_io_from_s3` (line 105). Caller catches `botocore.exceptions.ClientError` for missing-key handling.
### 2. Append matcher to domain module
In [datatypes/epc/domain/historic_epc.py](datatypes/epc/domain/historic_epc.py) — keep existing `HistoricEpc` dataclass intact, append:
```python
from typing import Optional
import pandas as pd
from botocore.exceptions import ClientError
from backend.utils.addressMatch import AddressMatch
from utils.s3 import read_csv_gz_from_s3
@dataclass
class HistoricEpcMatches:
"""Scored historic EPC rows for a single postcode."""
user_address: str
postcode: str # sanitised
df: pd.DataFrame # has lexiscore + lexirank, sorted best-first
def top(self) -> Optional[pd.Series]:
return None if self.df.empty else self.df.iloc[0]
def top_n(self, k: int) -> pd.DataFrame:
return self.df.head(k)
def unambiguous_uprn(self, uprn_column: str = "UPRN") -> Optional[str]:
if self.df.empty:
return None
top_rank = self.df["lexirank"].min()
uprns = (
self.df.loc[self.df["lexirank"] == top_rank, uprn_column]
.dropna().astype(str).str.replace(r"\.0$", "", regex=True)
.unique()
)
return uprns[0] if len(uprns) == 1 else None
def _sanitise_postcode(postcode: str) -> str:
if not postcode:
raise ValueError("postcode must be non-empty")
return postcode.upper().replace(" ", "")
def match_addresses_for_postcode(
user_address: str,
postcode: str,
*,
bucket: str = "retrofit-data-dev",
prefix: str = "historical_epc",
address_column: str = "ADDRESS",
) -> HistoricEpcMatches:
if not user_address:
raise ValueError("user_address must be non-empty")
pc = _sanitise_postcode(postcode)
key = f"{prefix}/{pc}/data.csv.gz"
try:
df = read_csv_gz_from_s3(bucket, key)
except ClientError as e:
if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"):
raise FileNotFoundError(
f"No historic EPC data at s3://{bucket}/{key}"
) from e
raise
if address_column not in df.columns:
raise ValueError(
f"Missing address column {address_column!r} in {key}"
)
user_norm = AddressMatch.normalise_address(user_address)
df = df.copy()
df["lexiscore"] = df[address_column].fillna("").apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
df["lexirank"] = (
df["lexiscore"].rank(method="dense", ascending=False).astype(int)
)
df = df.sort_values(["lexirank", "lexiscore"], ascending=[True, False]).reset_index(drop=True)
return HistoricEpcMatches(user_address=user_address, postcode=pc, df=df)
```
### Reuse notes
- `AddressMatch.normalise_address` + `AddressMatch.levenshtein` from [backend/utils/addressMatch.py](backend/utils/addressMatch.py) — same scoring as address2UPRN.
- Score column copy uses `.fillna("")` to defend against NaN in `ADDRESS`.
- Defaults match ETL output: bucket `retrofit-data-dev`, prefix `historical_epc`, column `ADDRESS` (uppercase).
### 3. Tests
New: [datatypes/epc/domain/tests/__init__.py](datatypes/epc/domain/tests/__init__.py) (empty) and [datatypes/epc/domain/tests/test_historic_epc_match.py](datatypes/epc/domain/tests/test_historic_epc_match.py).
Reuse existing fixture `datatypes/epc/schema/tests/fixtures/historic_epc.csv` — read it in-memory in tests; do NOT commit a `.csv.gz` fixture. Patch target: `datatypes.epc.domain.historic_epc.read_csv_gz_from_s3` (local binding, not `utils.s3.read_csv_gz_from_s3`).
Cases:
1. `_sanitise_postcode("ab33 8al") == "AB338AL"`; empty raises.
2. Returned df has `lexiscore` + `lexirank` columns, row count preserved.
3. df sorted: `iloc[0]["lexirank"] == 1`, `lexiscore` monotone non-increasing.
4. S3 key built correctly: `"AB33 8AL"` → key `"historical_epc/AB338AL/data.csv.gz"` (spy on patched helper).
5. `ClientError` with code `NoSuchKey``FileNotFoundError`.
6. Exact-match address → `unambiguous_uprn()` returns that UPRN; ambiguous tie → `None`.
7. `top()` / `top_n(k)` shape checks.
## Critical files
- [datatypes/epc/domain/historic_epc.py](datatypes/epc/domain/historic_epc.py) — append matcher
- [utils/s3.py](utils/s3.py) — add `read_csv_gz_from_s3`
- [datatypes/epc/domain/tests/test_historic_epc_match.py](datatypes/epc/domain/tests/test_historic_epc_match.py) — new
## Out of scope
- Lambda handler / SQS wiring (deferred — lib only)
- Threshold logic (caller decides via wrapper helpers)
- Postcode validation via `postcodes.io` (`AddressMatch.is_valid_postcode` exists if needed later)
- Refactoring `sanitise(pd.Series)` in `etl_opendatacommunities/main.py` — separate concern
## Verification
```
cd /workspaces/model && pytest datatypes/epc/domain/tests/test_historic_epc_match.py -v
```
Sample real-S3 call (needs AWS creds):
```python
from datatypes.epc.domain.historic_epc import match_addresses_for_postcode
m = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
print(m.df[["ADDRESS", "UPRN", "lexiscore", "lexirank"]].head())
print(m.unambiguous_uprn())
```
## Sequencing
1. Add `read_csv_gz_from_s3` to `utils/s3.py`.
2. Append matcher + wrapper to `datatypes/epc/domain/historic_epc.py`.
3. Add tests.
Steps 2 & 3 depend on 1. No `__init__.py` re-exports needed.

View file

@ -0,0 +1,239 @@
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
from botocore.exceptions import ClientError
from datatypes.epc.domain import historic_epc_matching as matcher_mod
from datatypes.epc.domain.historic_epc_matching import (
HistoricEpcMatches,
ScoredHistoricEpc,
_sanitise_postcode,
match_addresses_for_postcode,
)
# Columns required by the HistoricEpc dataclass (lower-cased CSV columns).
# The matcher only reads ADDRESS + UPRN to score; everything else is filled
# with "" but must be present for HistoricEpc(**kwargs) to construct.
_FULL_COLUMN_FIELDS = [
"LMK_KEY", "ADDRESS1", "ADDRESS2", "ADDRESS3", "POSTCODE",
"BUILDING_REFERENCE_NUMBER", "CURRENT_ENERGY_RATING", "POTENTIAL_ENERGY_RATING",
"CURRENT_ENERGY_EFFICIENCY", "POTENTIAL_ENERGY_EFFICIENCY", "PROPERTY_TYPE",
"BUILT_FORM", "INSPECTION_DATE", "LOCAL_AUTHORITY", "CONSTITUENCY", "COUNTY",
"LODGEMENT_DATE", "TRANSACTION_TYPE", "ENVIRONMENT_IMPACT_CURRENT",
"ENVIRONMENT_IMPACT_POTENTIAL", "ENERGY_CONSUMPTION_CURRENT",
"ENERGY_CONSUMPTION_POTENTIAL", "CO2_EMISSIONS_CURRENT",
"CO2_EMISS_CURR_PER_FLOOR_AREA", "CO2_EMISSIONS_POTENTIAL",
"LIGHTING_COST_CURRENT", "LIGHTING_COST_POTENTIAL", "HEATING_COST_CURRENT",
"HEATING_COST_POTENTIAL", "HOT_WATER_COST_CURRENT", "HOT_WATER_COST_POTENTIAL",
"TOTAL_FLOOR_AREA", "ENERGY_TARIFF", "MAINS_GAS_FLAG", "FLOOR_LEVEL",
"FLAT_TOP_STOREY", "FLAT_STOREY_COUNT", "MAIN_HEATING_CONTROLS",
"MULTI_GLAZE_PROPORTION", "GLAZED_TYPE", "GLAZED_AREA", "EXTENSION_COUNT",
"NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES", "HOTWATER_DESCRIPTION", "HOT_WATER_ENERGY_EFF",
"HOT_WATER_ENV_EFF", "FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF",
"WINDOWS_DESCRIPTION", "WINDOWS_ENERGY_EFF", "WINDOWS_ENV_EFF",
"WALLS_DESCRIPTION", "WALLS_ENERGY_EFF", "WALLS_ENV_EFF",
"SECONDHEAT_DESCRIPTION", "SHEATING_ENERGY_EFF", "SHEATING_ENV_EFF",
"ROOF_DESCRIPTION", "ROOF_ENERGY_EFF", "ROOF_ENV_EFF", "MAINHEAT_DESCRIPTION",
"MAINHEAT_ENERGY_EFF", "MAINHEAT_ENV_EFF", "MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF", "MAINHEATC_ENV_EFF", "LIGHTING_DESCRIPTION",
"LIGHTING_ENERGY_EFF", "LIGHTING_ENV_EFF", "MAIN_FUEL", "WIND_TURBINE_COUNT",
"HEAT_LOSS_CORRIDOR", "UNHEATED_CORRIDOR_LENGTH", "FLOOR_HEIGHT",
"PHOTO_SUPPLY", "SOLAR_WATER_HEATING_FLAG", "MECHANICAL_VENTILATION",
"ADDRESS", "LOCAL_AUTHORITY_LABEL", "CONSTITUENCY_LABEL", "POSTTOWN",
"CONSTRUCTION_AGE_BAND", "LODGEMENT_DATETIME", "TENURE",
"FIXED_LIGHTING_OUTLETS_COUNT", "LOW_ENERGY_FIXED_LIGHT_COUNT", "UPRN",
"UPRN_SOURCE", "REPORT_TYPE",
]
def _row(address: str, uprn) -> dict:
row = {col: "" for col in _FULL_COLUMN_FIELDS}
row["ADDRESS"] = address
row["UPRN"] = uprn
return row
def _build_df(rows: list[dict]) -> pd.DataFrame:
return pd.DataFrame(rows, columns=_FULL_COLUMN_FIELDS)
@pytest.fixture
def patch_postcode_valid():
with patch.object(matcher_mod.AddressMatch, "is_valid_postcode", return_value=True) as m:
yield m
@pytest.fixture
def patch_read():
with patch.object(matcher_mod, "read_csv_gz_from_s3") as m:
yield m
# ---------- _sanitise_postcode ----------
class TestSanitisePostcode:
def test_uppercases_and_strips_spaces(self, patch_postcode_valid):
assert _sanitise_postcode("ab33 8al") == "AB338AL"
def test_empty_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="non-whitespace"):
_sanitise_postcode("")
def test_whitespace_only_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="non-whitespace"):
_sanitise_postcode(" ")
def test_invalid_postcode_raises(self):
with patch.object(
matcher_mod.AddressMatch, "is_valid_postcode", return_value=False
):
with pytest.raises(ValueError, match="not a valid UK postcode"):
_sanitise_postcode("NONSENSE")
# ---------- match_addresses_for_postcode ----------
class TestMatchAddressesForPostcode:
def test_preserves_row_count_including_zero_score_rows(
self, patch_read, patch_postcode_valid
):
# Disjoint number sets => hard zero. Still kept in matches.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("999 SOMEWHERE ELSE", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert isinstance(result, HistoricEpcMatches)
assert len(result.matches) == 2
def test_top_has_lexirank_one_and_lexiscore_monotone(
self, patch_read, patch_postcode_valid
):
patch_read.return_value = _build_df([
_row("48 GORDON ROAD", "200"), # near miss
_row("47 GORDON ROAD", "100"), # exact (after normalisation)
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.top().lexirank == 1
scores = [m.lexiscore for m in result.matches]
assert scores == sorted(scores, reverse=True)
def test_s3_key_built_from_default_root(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
patch_read.assert_called_once_with(
"retrofit-data-dev", "historical_epc/AB338AL/data.csv.gz"
)
def test_s3_key_respects_custom_root_with_trailing_slash(
self, patch_read, patch_postcode_valid
):
patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
match_addresses_for_postcode(
"47 Gordon Road",
"AB33 8AL",
s3_root="s3://my-bucket/some/prefix/",
)
patch_read.assert_called_once_with(
"my-bucket", "some/prefix/AB338AL/data.csv.gz"
)
def test_no_such_key_translates_to_filenotfound(
self, patch_read, patch_postcode_valid
):
patch_read.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "missing"}}, "GetObject"
)
with pytest.raises(FileNotFoundError):
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
def test_other_client_error_propagates(self, patch_read, patch_postcode_valid):
patch_read.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "nope"}}, "GetObject"
)
with pytest.raises(ClientError):
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
def test_empty_user_address_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="user_address"):
match_addresses_for_postcode("", "AB33 8AL")
# ---------- unambiguous_uprn ----------
class TestUnambiguousUprn:
def test_exact_match_returns_uprn(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() == "100"
def test_ambiguous_tie_returns_none(self, patch_read, patch_postcode_valid):
# Two duplicate addresses with different UPRNs share rank-1.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("47 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() is None
def test_all_zero_score_returns_none_even_when_uprn_unique(
self, patch_read, patch_postcode_valid
):
# User address has building number 47; no row has 47 -> all hard-zero.
patch_read.return_value = _build_df([
_row("999 ELSEWHERE", "100"),
_row("888 ELSEWHERE", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert all(m.lexiscore == 0.0 for m in result.matches)
assert result.unambiguous_uprn() is None
def test_nan_uprn_becomes_empty_string_not_nan(
self, patch_read, patch_postcode_valid
):
# Use a real NaN in the UPRN cell.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", np.nan),
_row("48 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top = result.top()
# _cell_to_str must turn NaN/"nan" into "" (not the literal string "nan"),
# so unambiguous_uprn's truthiness check correctly drops the row.
assert top.record.uprn == ""
# ---------- top / top_n ----------
class TestTopHelpers:
def test_top_n_returns_first_k(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
_row("49 GORDON ROAD", "300"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top2 = result.top_n(2)
assert len(top2) == 2
assert all(isinstance(m, ScoredHistoricEpc) for m in top2)
def test_top_on_empty_matches_returns_none(self):
empty = HistoricEpcMatches(user_address="x", postcode="AB338AL", matches=[])
assert empty.top() is None
assert empty.top_n(5) == []
assert empty.unambiguous_uprn() is None

View file

@ -1,6 +1,6 @@
import csv
from datatypes.epc.schema.historic_epc import HistoricEpc
from datatypes.epc.domain.historic_epc import HistoricEpc
def _normalise(value: str | None) -> str:

View file

@ -1,98 +0,0 @@
from dataclasses import dataclass
@dataclass
class HistoricEpc:
lmk_key: str
address1: str
address2: str
address3: str
postcode: str
building_reference_number: str
current_energy_rating: str
potential_energy_rating: str
current_energy_efficiency: str
potential_energy_efficiency: str
property_type: str
built_form: str
inspection_date: str
local_authority: str
constituency: str
county: str
lodgement_date: str
transaction_type: str
environment_impact_current: str
environment_impact_potential: str
energy_consumption_current: str
energy_consumption_potential: str
co2_emissions_current: str
co2_emiss_curr_per_floor_area: str
co2_emissions_potential: str
lighting_cost_current: str
lighting_cost_potential: str
heating_cost_current: str
heating_cost_potential: str
hot_water_cost_current: str
hot_water_cost_potential: str
total_floor_area: str
energy_tariff: str
mains_gas_flag: str
floor_level: str
flat_top_storey: str
flat_storey_count: str
main_heating_controls: str
multi_glaze_proportion: str
glazed_type: str
glazed_area: str
extension_count: str
number_habitable_rooms: str
number_heated_rooms: str
low_energy_lighting: str
number_open_fireplaces: str
hotwater_description: str
hot_water_energy_eff: str
hot_water_env_eff: str
floor_description: str
floor_energy_eff: str
floor_env_eff: str
windows_description: str
windows_energy_eff: str
windows_env_eff: str
walls_description: str
walls_energy_eff: str
walls_env_eff: str
secondheat_description: str
sheating_energy_eff: str
sheating_env_eff: str
roof_description: str
roof_energy_eff: str
roof_env_eff: str
mainheat_description: str
mainheat_energy_eff: str
mainheat_env_eff: str
mainheatcont_description: str
mainheatc_energy_eff: str
mainheatc_env_eff: str
lighting_description: str
lighting_energy_eff: str
lighting_env_eff: str
main_fuel: str
wind_turbine_count: str
heat_loss_corridor: str
unheated_corridor_length: str
floor_height: str
photo_supply: str
solar_water_heating_flag: str
mechanical_ventilation: str
address: str
local_authority_label: str
constituency_label: str
posttown: str
construction_age_band: str
lodgement_datetime: str
tenure: str
fixed_lighting_outlets_count: str
low_energy_fixed_light_count: str
uprn: str
uprn_source: str
report_type: str

View file

@ -3,7 +3,7 @@ import os
import pytest
from datatypes.epc.loaders.historic_epc import read_historic_epc_csv
from datatypes.epc.schema.historic_epc import HistoricEpc
from datatypes.epc.domain.historic_epc import HistoricEpc
FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures")

View file

@ -0,0 +1,47 @@
"""Demo: look up historic EPC records for an address + postcode.
Reads the gzipped CSV at
s3://retrofit-data-dev/historical_epc/<POSTCODE>/data.csv.gz
scores rows against the user-provided address, and prints the top matches.
Usage:
python -m scripts.historic_epc_demo "47 Gordon Road" "AB33 8AL"
python -m scripts.historic_epc_demo # uses defaults below
"""
import sys
from datatypes.epc.domain.historic_epc_matching import match_addresses_for_postcode
def main(user_address: str, postcode: str) -> None:
print(f"Looking up: {user_address!r} @ {postcode!r}\n")
result = match_addresses_for_postcode(user_address, postcode)
print(f"Found {len(result.matches)} candidate row(s).\n")
print("Top 3 matches:")
for m in result.top_n(3):
print(
f" rank={m.lexirank} score={m.lexiscore:.3f} "
f"uprn={m.record.uprn or '(none)':<14} {m.record.address}"
)
print()
uprn = result.unambiguous_uprn()
if uprn:
print(f"Unambiguous UPRN: {uprn}")
else:
print("No unambiguous UPRN (zero-score, tie, or empty result).")
if __name__ == "__main__":
args = sys.argv[1:]
if len(args) == 2:
main(args[0], args[1])
elif len(args) == 0:
main("47 Gordon Road", "AB33 8AL")
else:
print(__doc__)
sys.exit(2)

View file

@ -167,6 +167,21 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key):
return df
def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame:
"""
Read a gzipped CSV from S3 into a pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (must end in .csv.gz).
:return: A pandas DataFrame.
"""
if not file_key.endswith(".csv.gz"):
raise ValueError("file_key must end with .csv.gz")
buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
return pd.read_csv(buffer, compression="gzip", low_memory=False)
def save_csv_to_s3(dataframe, bucket_name, file_name):
"""
Save a Pandas DataFrame to a CSV file in an S3 bucket.