Merge pull request #1064 from Hestia-Homes/feature/integrate_new_epc_with_historical_epc

Feature/integrate new epc with historical epc
This commit is contained in:
Jun-te Kim 2026-05-13 15:17:21 +01:00 committed by GitHub
commit fae61cb2c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 1253 additions and 265 deletions

View file

@ -10,7 +10,7 @@ ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
sudo jq vim curl git ca-certificates wget \
build-essential pkg-config automake autoconf libtool \
ripgrep fd-find make unzip \
ripgrep fd-find make unzip bash-completion \
&& rm -rf /var/lib/apt/lists/*
# Neovim latest (LazyVim needs >=0.9)
@ -53,8 +53,8 @@ RUN echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] \
https://apt.releases.hashicorp.com $(lsb_release -cs) main" | \
tee /etc/apt/sources.list.d/hashicorp.list
RUN apt update
RUN apt-get install terraform
RUN terraform -install-autocomplete
RUN apt-get install -y terraform
RUN terraform -install-autocomplete || true
# Install postgres
RUN apt install -y wget gnupg2 lsb-release

View file

@ -4,13 +4,7 @@
"service": "model-backend",
"remoteUser": "vscode",
"workspaceFolder": "/workspaces/model",
// Host preflight: ensure GitHub auth exists before we try to build.
// Either ~/.config/gh (from `gh auth login`) or a GITHUB_TOKEN env var.
"initializeCommand": "test -d \"$HOME/.config/gh\" || test -n \"$GITHUB_TOKEN\" || { echo >&2 'error: no GitHub auth found. Run `gh auth login && gh auth setup-git` on the host, or export GITHUB_TOKEN, then retry.'; exit 1; }",
// Install Domna's curated skill set (pinned to 0.0.5) into this workspace.
// `gh repo clone` handles private-repo auth using the mounted host ~/.config/gh.
"initializeCommand": "docker network create shared-dev 2>/dev/null || true; test -d \"$HOME/.config/gh\" || test -n \"$GITHUB_TOKEN\" || { echo >&2 'error: no GitHub auth found. Run `gh auth login && gh auth setup-git` on the host, or export GITHUB_TOKEN, then retry.'; exit 1; }",
"postCreateCommand": "gh repo clone Hestia-Homes/agentic-toolkit /tmp/agentic-toolkit -- --branch 0.0.5 --depth 1 && bash /tmp/agentic-toolkit/setup.sh",
"postStartCommand": "bash .devcontainer/backend/post-install.sh",
"mounts": [
@ -24,7 +18,6 @@
"ms-toolsai.jupyter",
"mechatroner.rainbow-csv",
"ms-toolsai.datawrangler",
"lindacong.vscode-book-reader",
"4ops.terraform",
"fabiospampinato.vscode-todo-plus",
"jgclark.vscode-todo-highlight",
@ -33,9 +26,6 @@
"ms-python.black-formatter",
"waderyan.gitblame",
"GrapeCity.gc-excelviewer",
"jakobhoeg.vscode-pokemon",
"github.vscode-github-actions",
"me-dutour-mathieu.vscode-github-actions",
"anthropic.claude-code",
"eamodio.gitlens"
],

View file

@ -51,6 +51,10 @@ jobs:
id: set_auth_token
run: echo "::set-output name=auth_token::${{ secrets[format('{0}_EPC_AUTH_TOKEN', github.ref_name)] }}"
- name: Set Open EPC API token
id: set_open_epc_token
run: echo "::set-output name=open_epc_token::${{ secrets[format('{0}_OPEN_EPC_API_TOKEN', github.ref_name)] }}"
# Store port, name and host in github secrets
- name: Set DB credentials
id: set_db_credentials
@ -127,6 +131,7 @@ jobs:
GOOGLE_SOLAR_API_KEY: ${{ steps.set_api_secrets.outputs.google_solar_api_key }}
DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }}
EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }}
OPEN_EPC_API_TOKEN: ${{ steps.set_open_epc_token.outputs.open_epc_token }}
DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }}
DB_PORT: ${{ steps.set_db_credentials.outputs.db_port }}
DB_NAME: ${{ steps.set_db_credentials.outputs.db_name }}

View file

@ -49,7 +49,11 @@ jobs:
docker run --rm \
--network host \
-e EPC_AUTH_TOKEN=${{ secrets.DEV_EPC_AUTH_TOKEN }} \
-e OPEN_EPC_API_TOKEN=${{ secrets.DEV_OPEN_EPC_API_TOKEN }} \
-e HUBSPOT_API_KEY=${{ secrets.HUBSPOT_API_KEY }} \
-e AWS_ACCESS_KEY_ID=${{ secrets.DEV_AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} \
-e AWS_DEFAULT_REGION=${{ secrets.DEV_AWS_REGION }} \
-e DB_HOST=localhost \
-e DB_NAME=test \
-e DB_USERNAME=test \

3
.gitignore vendored
View file

@ -292,3 +292,6 @@ pyrightconfig.json
# playwright output
*/pashub_fetcher/videos/*
backlog/*
# Local Claude config files
.claude/*

View file

@ -28,3 +28,41 @@ You MUST read the overview resource to understand the complete workflow. The inf
<!-- BACKLOG.MD MCP GUIDELINES END -->
## Available Skills
Five Claude Code skills are installed in this repo's dev container. Each maps to a phase of the feature lifecycle.
| Skill | Invoke | When to use |
|-------|--------|-------------|
| **grill-me** | `/grill-me` | Before implementing — stress-tests a design through sequential questioning |
| **to-prd** | `/to-prd` | After a planning conversation — formalises context into a GitHub issue PRD |
| **ubiquitous-language** | `/ubiquitous-language` | When domain terms are drifting or ambiguous — builds/updates `UBIQUITOUS_LANGUAGE.md` |
| **tdd** | `/tdd` | During implementation — enforces vertical-slice TDD (one test → one impl → repeat) |
| **improve-codebase-architecture** | `/improve-codebase-architecture` | During refactoring — surfaces shallow modules and proposes deepening opportunities |
### Typical session chains
**Feature planning:**
`/grill-me` → `/to-prd` → `/ubiquitous-language`
**Implementation:**
`/tdd` (+ `/grill-me` if a design fork appears mid-session)
**Refactoring:**
`/improve-codebase-architecture` → `/grill-me` → `/tdd` → `/ubiquitous-language`
### First time setting up?
New containers install all skills automatically via the Dockerfile. If you're in an existing container, run:
```bash
bash .devcontainer/backend/install-claude-skills.sh
```
## Type Safety
All new code must pass `pyright` with zero errors under `typeCheckingMode = strict`.
Use `Optional[X]` rather than `X | None` for optional types.
Annotate all function return types. Use `dict[str, Any]` for untyped external API
payloads — never bare `dict`. Add `pandas-stubs` when introducing pandas to a module.

View file

@ -2,7 +2,7 @@
PYTHON = python
.PHONY: setup test lint typecheck check clean
.PHONY: setup test lint typecheck check clean network-setup dev-setup
# Install dev dependencies + tox
setup:
@ -28,3 +28,11 @@ check: lint typecheck test
# Clean up tox environments
clean:
rm -rf .tox
# Create shared Docker network required by dev container (idempotent)
network-setup:
docker network create shared-dev 2>/dev/null || true
# First-time dev environment setup
dev-setup: network-setup
@echo "Dev environment ready. Open the repo in VS Code and select 'Reopen in Container'."

View file

@ -8,6 +8,27 @@ The different folders in this repository relate to services
that can be used independently, or can be imported and used as
part of a larger application
# Getting Started
## Prerequisites
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
- [VS Code](https://code.visualstudio.com/) with the [Dev Containers extension](https://marketplace.visualstudio.com/items?itemName=ms-vscode-remote.remote-containers)
## Dev Container Setup
This repo uses a Docker Compose-based dev container. The `model-backend` service joins a `shared-dev` Docker network so it can communicate with other local services (e.g. a frontend container) running on your machine.
**VS Code users:** The `initializeCommand` in `devcontainer.json` creates the `shared-dev` network automatically before the container starts. No manual step required — just open the repo and select **Reopen in Container**.
**Non-VS Code / CI workflows:** Run the following once before starting the container:
```commandline
make dev-setup
```
This is idempotent and safe to re-run if the network already exists.
# Folders
### backend/

78
UBIQUITOUS_LANGUAGE.md Normal file
View file

@ -0,0 +1,78 @@
# Ubiquitous Language
Domain terminology glossary for this project. Generated and maintained by the `/ubiquitous-language` Claude Code skill.
Invoke `/ubiquitous-language` in any session to extract new terms from the conversation, flag ambiguities, and update this file with canonical definitions.
---
## Energy Performance Certificates
| Term | Definition | Aliases to avoid |
|------|------------|------------------|
| **EPC** | An Energy Performance Certificate — a government-issued document rating a dwelling's energy efficiency from A (best) to G (worst). | "energy certificate", "energy report" |
| **Certificate Number** | The unique identifier assigned to an EPC by the government registry. | "cert number", "EPC ID" |
| **Registration Date** | The date an EPC was lodged with the government register; used to identify the most recent certificate for a property. | "assessment date", "submission date" |
| **EPC Band** | A single letter A–G representing a property's current or potential energy efficiency rating. | "energy rating", "EPC grade", "EPC score" |
| **Schema Type** | The versioned RdSAP or SAP schema that describes the structure of a certificate's raw data (e.g. `RdSAP-Schema-21.0.1`). | "schema version", "EPC format" |
| **Domestic Certificate** | An EPC issued for a residential dwelling, as opposed to a commercial one. | "residential EPC", "home EPC" |
## Properties and Addresses
| Term | Definition | Aliases to avoid |
|------|------------|------------------|
| **UPRN** | Unique Property Reference Number — the government-issued permanent identifier for a physical address in the UK. | "property ID", "address ID", "code" |
| **Postcode** | A UK postal code used to group nearby addresses; the primary search key for finding EPC records. | "zip code", "postal code" |
| **User Address** | A free-text address string provided by a user or imported from a customer dataset, before any normalisation or matching. | "user input", "raw address", "user_inputed_address" |
| **Dwelling** | A single residential unit that can hold an EPC — a house, flat, or maisonette. | "property", "unit", "home" |
## Address Matching
| Term | Definition | Aliases to avoid |
|------|------------|------------------|
| **Lexiscore** | A similarity score in [0, 1] between a user address and a candidate EPC address; combines token overlap and character-level similarity. | "score", "match score", "similarity" |
| **Lexirank** | Dense rank of candidates sorted by lexiscore descending; rank 1 = best match. | "rank", "position" |
| **UPRN Candidate** | An EPC search result that is a plausible match for a given user address, before scoring decides the winner. | "match candidate", "result" |
| **Score Threshold** | The minimum lexiscore (currently 0.6) below which no match is returned even if a candidate exists. | "minimum score", "cutoff" |
| **Ambiguous Match** | A matching outcome where two or more candidates share lexirank 1, making it impossible to select a unique winner. | "tie", "draw", "duplicate" |
| **Best Match** | The single UPRN candidate with lexirank 1 that meets or exceeds the score threshold. | "winner", "top result" |
## API and Integration
| Term | Definition | Aliases to avoid |
|------|------------|------------------|
| **EPC Search Result** | A lightweight record returned by the government domestic search endpoint — contains address lines, postcode, UPRN, band, and certificate number but not the full certificate data. | "search row", "EPC row", "result" |
| **EPC Property Data** | The fully mapped domain object produced after fetching and parsing a complete EPC certificate. | "EPC data", "certificate data", "parsed EPC" |
| **Old EPC API** | The retired government API (`epc.opendatacommunities.org`) using HTTP Basic auth; decommissioned May 2026. | "legacy API" |
| **New EPC API** | The replacement government API (`api.get-energy-performance-data.communities.gov.uk`) using Bearer token auth. | "new API", "current API" |
| **Bearer Token** | The auth credential required by the new EPC API; stored in the `OPEN_EPC_API_TOKEN` environment variable. | "API key", "auth token", "secret" |
## Relationships
- An **EPC** belongs to exactly one **Dwelling** and has one **Certificate Number**.
- A **Dwelling** may have multiple **EPCs** across time; the one with the most recent **Registration Date** is the current one.
- A **UPRN** identifies a **Dwelling** permanently; it does not change when the property changes owner.
- An **EPC Search Result** is a summary; it points to a full **EPC** via its **Certificate Number**.
- **Address Matching** uses a **User Address** and **Postcode** to find a **UPRN** by scoring **UPRN Candidates** from an EPC search.
- A **Lexirank** of 1 with no **Ambiguous Match** and a **Lexiscore** ≥ the **Score Threshold** produces a **Best Match**.
## Example dialogue
> **Dev:** "We have a user address and postcode. How do we find the UPRN?"
> **Domain expert:** "Search the **New EPC API** by **Postcode** — you get back a list of **EPC Search Results** for that area. Each one has an address and a **UPRN**. Score each against the **User Address** using the **Lexiscore**. If the top **UPRN Candidate** scores above the **Score Threshold** and there's no **Ambiguous Match**, that's your **Best Match**."
> **Dev:** "What if two results share the same address line 1?"
> **Domain expert:** "That's an **Ambiguous Match** — two candidates at **Lexirank** 1. Fall back to scoring on the full address using all address lines joined together. If that still ties, return nothing."
> **Dev:** "Once we have the best match, do we use the UPRN or fetch the full EPC?"
> **Domain expert:** "Depends on what you need. The **EPC Search Result** gives you the **EPC Band** and **Certificate Number**. If you need energy efficiency detail, use the **Certificate Number** to fetch the full **EPC Property Data**."
## Flagged ambiguities
- **"address"** appears as both the raw **User Address** (free-text from customer data) and a structured field on an **EPC Search Result** (normalised address lines). Always qualify: "user address" vs "EPC address" or "address line 1".
- **"score"** is used for the `AddressMatch.score()` function output, the `lexiscore` DataFrame column, and informally in conversation. Prefer **Lexiscore** in domain discussions; reserve "score" for method-level code comments.
- **"user_inputed_address"** in `backend/address2UPRN/main.py` is a misspelling and a synonym for **User Address** — the canonical term. New code should use `user_address`.
- **"EPC"** is overloaded as both the document (an Energy Performance Certificate) and the rating band letter. Use **EPC** for the document and **EPC Band** for the letter.

View file

@ -1,4 +1,5 @@
API_KEY = example-api-key
ENVIRONMENT = local
SECRET_KEY = YOUR_SECRET_KEY
ALGORITHM = HS256
ALGORITHM = HS256
OPEN_EPC_API_TOKEN = your_token_here

View file

@ -1,8 +1,6 @@
from typing import Optional
from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from utils.logger import setup_logger
import json
@ -17,81 +15,63 @@ from utils.s3 import (
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
from backend.address2UPRN.scoring import ( # noqa: F401 (re-exported)
df_has_single_uprn,
get_uprn_candidates,
from backend.address2UPRN.scoring import all_uprns_match, rank_address_similarity
from datatypes.epc.domain.historic_epc_matching import (
match_addresses_for_postcode,
)
from backend.epc_client.epc_client_service import EpcClientService
from datatypes.epc.domain.historic_epc_matching import ScoredHistoricEpc
logger = setup_logger()
def score_addresses(
df: pd.DataFrame,
user_address: str,
column: str = "address",
) -> pd.Series:
if column not in df.columns:
raise ValueError(f"Missing column: {column}")
def get_epc_data_with_postcode(postcode: str) -> pd.DataFrame:
return df[column].apply(lambda x: AddressMatch.score(user_address, x))
token = os.getenv("OPEN_EPC_API_TOKEN")
if token is None:
raise RuntimeError("OPEN_EPC_API_TOKEN not defined in env")
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
"""
Recursively fetch EPC data by postcode.
If results hit the size limit, retry with double size up to max_attempts.
"""
auth_token = os.getenv("EPC_AUTH_TOKEN")
if auth_token is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
client = EpcClient(auth_token=auth_token)
url = os.path.join(client.domestic.host, "search")
if size:
url += "?" + urlencode({"size": size})
search_resp = client.domestic.call(
url=url,
method="get",
params={"postcode": postcode},
service = EpcClientService(auth_token=token)
results = service.search_by_postcode(postcode)
return pd.DataFrame(
[{"address": r.address_line_1, "uprn": r.uprn} for r in results]
)
if not search_resp or "rows" not in search_resp:
return pd.DataFrame()
results_df = pd.DataFrame(search_resp["rows"], columns=search_resp["column-names"])
row_count = len(results_df)
def get_uprn_from_historic_epc(
user_inputed_address: str,
postcode: str,
) -> Optional[tuple[str, str, float]]:
"""Resolve a UPRN via historic EPC S3 data.
# If we hit the size limit, there *may* be more results
if row_count == size:
print(
f"⚠️ Warning: hit size limit ({size}) for postcode '{postcode}'. "
f"Attempt {attempt}/{max_attempts}."
)
Returns (uprn, address, lexiscore) when the historic dataset agrees on a
single rank-1 UPRN, None otherwise (missing postcode file, zero score,
or ambiguous top rank). The score gate is `unambiguous_uprn`'s own
(score > 0); the 0.7 heuristic used for the new-EPC source isn't applied
here because historic addresses use a more verbose format that
systematically depresses lexiscores.
"""
if attempt < max_attempts:
print(f"🔁 Retrying with size={size * 2}")
return get_epc_data_with_postcode(
postcode=postcode,
size=size * 2,
attempt=attempt + 1,
max_attempts=max_attempts,
)
else:
print(
"🚨 Max attempts reached. Results may be truncated. "
"(Please do a manual review by the tech team.)"
)
try:
result = match_addresses_for_postcode(user_inputed_address, postcode)
except FileNotFoundError:
return None
return results_df
uprn: Optional[str] = result.unambiguous_uprn()
if not uprn or uprn == "nan":
return None
top: Optional[ScoredHistoricEpc] = result.top()
if top is None:
return None
return uprn, top.record.address, top.lexiscore
def get_uprn_with_epc_df(
user_inputed_address: str,
epc_df: pd.DataFrame,
verbose: bool = False,
):
) -> Optional[str | tuple[str, str, float]]:
"""
Return uprn (str) using a pre-fetched EPC dataframe.
This avoids calling the API multiple times for the same postcode.
@ -99,7 +79,7 @@ def get_uprn_with_epc_df(
if epc_df.empty:
return None
scored_df = get_uprn_candidates(
scored_df = rank_address_similarity(
epc_df,
user_address=user_inputed_address,
)
@ -108,14 +88,14 @@ def get_uprn_with_epc_df(
best_score = scored_df.iloc[0]["lexiscore"]
# # Return None if score is below threshold
# if best_score < 0.7:
# return None
if best_score < 0.7:
return None
# All rank-1 rows (possible draw)
top_rank_df = scored_df[scored_df["lexirank"] == 1]
# If rank-1 rows do not agree on a single UPRN → ambiguous
if not df_has_single_uprn(top_rank_df, uprn=top_rank_df.iloc[0]["uprn"]):
if not all_uprns_match(top_rank_df, target_uprn=top_rank_df.iloc[0]["uprn"]):
return None
address = top_rank_df["address"].values[0]
@ -125,7 +105,8 @@ def get_uprn_with_epc_df(
# Safe to return the agreed UPRN
found_uprn = top_rank_df.iloc[0]["uprn"]
if found_uprn == "":
# Handling numeric missingness in new api
if found_uprn in ["", "nan"]:
return None
if verbose:
@ -141,20 +122,35 @@ def get_uprn(
):
"""
Return uprn (str)
Return False if failed to find a sensible matching epc
Return None when epc found but no UPRN
Return None when no sensible match is found in either EPC source.
This function fetches EPC data via API for a single postcode.
For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead.
Tries the new EPC API first; if that yields no confident match, falls
back to the historic EPC dataset on S3.
For processing multiple addresses in the same postcode, use
get_uprn_with_epc_df instead.
"""
df = get_epc_data_with_postcode(postcode=postcode)
return get_uprn_with_epc_df(
result: Optional[tuple[str, str, float]] = get_uprn_with_epc_df(
user_inputed_address=user_inputed_address,
epc_df=df,
verbose=verbose,
verbose=True,
)
if not result:
result = get_uprn_from_historic_epc(
user_inputed_address=user_inputed_address,
postcode=postcode,
)
if result:
logger.info(f"Historic EPC matched {user_inputed_address} in {postcode}")
if not result:
return None
return result if verbose else result[0]
def resolve_uprns_for_postcode_group(
group_df: pd.DataFrame,
@ -175,7 +171,7 @@ def resolve_uprns_for_postcode_group(
for _, row in group_df.iterrows():
user_address = str(row[address_col]).strip()
scored_df = get_uprn_candidates(
scored_df = rank_address_similarity(
epc_df,
user_address=user_address,
)
@ -208,7 +204,7 @@ def resolve_uprns_for_postcode_group(
top_rank_df = scored_df[scored_df["lexirank"] == 1]
if not df_has_single_uprn(top_rank_df, top_rank_df.iloc[0]["uprn"]):
if not all_uprns_match(top_rank_df, top_rank_df.iloc[0]["uprn"]):
results.append(
{
"found_uprn": None,
@ -444,12 +440,29 @@ def handler(event, context, local=False):
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
result: Optional[tuple[str, str, float]] = get_uprn_with_epc_df(
user_inputed_address=address2uprn_user_input,
epc_df=epc_df,
verbose=True,
)
# Fallback to historic EPC if new EPC produced no match
if not result:
try:
result = get_uprn_from_historic_epc(
user_inputed_address=address2uprn_user_input,
postcode=postcode,
)
except Exception as e:
logger.error(
f"Historic EPC lookup failed for {address2uprn_user_input} in {postcode}: {e}"
)
result = None
if result:
logger.info(
f"Historic EPC matched {address2uprn_user_input} in {postcode}"
)
# Parse result tuple if successful
if result:
uprn, found_address, score = result

View file

@ -3,12 +3,11 @@ import pandas as pd
from backend.utils.addressMatch import AddressMatch
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
"""
Returns True if all non-null UPRNs in df match the given uprn.
Returns False otherwise.
"""
def all_uprns_match(
df: pd.DataFrame,
target_uprn: str,
column: str = "uprn",
) -> bool:
if column not in df.columns:
return False
@ -17,11 +16,11 @@ def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> boo
if len(uprns) == 0:
return False
return len(uprns) == 1 and uprns[0] == str(uprn)
return len(uprns) == 1 and uprns[0] == str(target_uprn)
def get_uprn_candidates(
df: pd.DataFrame,
def rank_address_similarity(
address_list_df: pd.DataFrame,
user_address: str,
address_column: str = "address",
uprn_column: str = "uprn",
@ -33,13 +32,13 @@ def get_uprn_candidates(
DOES NOT choose or return a UPRN.
"""
if address_column not in df.columns:
if address_column not in address_list_df.columns:
raise ValueError(f"Missing column: {address_column}")
if uprn_column not in df.columns:
if uprn_column not in address_list_df.columns:
raise ValueError(f"Missing column: {uprn_column}")
out = df.copy()
out = address_list_df.copy()
user_norm = AddressMatch.normalise_address(user_address)

View file

@ -0,0 +1,81 @@
import csv
import json
import os
from pathlib import Path
from urllib.parse import urlencode
import pandas as pd
from epc_api.client import EpcClient
FIXTURE_PATH = Path(__file__).parent / "test_data.csv"
SIDECAR_PATH = Path(__file__).parent / "test_lodgement_dates.json"
def fetch_postcode_records(client: EpcClient, postcode: str) -> pd.DataFrame:
    """Fetch domestic EPC search rows for ``postcode`` (page size 500).

    Args:
        client: EPC client whose ``domestic`` endpoint exposes ``host`` and
            ``call``.
        postcode: Postcode sent to the search endpoint as a query parameter.

    Returns:
        A DataFrame built from the response's ``rows`` using ``column-names``
        as headers, or an empty DataFrame when the response is falsy or has
        no ``rows`` key.
    """
    # Join URL segments as text: os.path.join uses the OS path separator and
    # would insert a backslash on Windows.
    base = client.domestic.host.rstrip("/")
    url = f"{base}/search?{urlencode({'size': 500})}"
    resp = client.domestic.call(url=url, method="get", params={"postcode": postcode})
    if not resp or "rows" not in resp:
        return pd.DataFrame()
    return pd.DataFrame(resp["rows"], columns=resp["column-names"])
def main():
    """Record the old-API lodgement date for each fixture row with a UPRN.

    Reads the CSV fixture, groups rows by postcode (skipping rows whose
    "Manual UPRN Code" is the literal string "None"), fetches each postcode
    once, and stores one sidecar entry per "<User Input>|<Postcode>" key.
    Keys already present in the sidecar file are left untouched. The sidecar
    JSON is written once, after all postcodes have been processed.

    Raises:
        RuntimeError: If EPC_AUTH_TOKEN is unset or empty.
    """
    token = os.getenv("EPC_AUTH_TOKEN")
    if not token:
        raise RuntimeError("EPC_AUTH_TOKEN not set")
    client = EpcClient(auth_token=token)

    # Start from any existing sidecar so earlier entries are preserved.
    sidecar = json.loads(SIDECAR_PATH.read_text()) if SIDECAR_PATH.exists() else {}

    with open(FIXTURE_PATH, newline="", encoding="utf-8") as fixture_file:
        fixture_rows = list(csv.DictReader(fixture_file))

    by_postcode: dict[str, list[dict]] = {}
    for fixture_row in fixture_rows:
        if fixture_row["Manual UPRN Code"] != "None":
            by_postcode.setdefault(fixture_row["Postcode"], []).append(fixture_row)

    for postcode, postcode_rows in by_postcode.items():
        print(f"Fetching {postcode} ({len(postcode_rows)} rows)...")
        try:
            epc_df = fetch_postcode_records(client, postcode)
        except Exception as exc:
            # Best-effort script: report the failure and move on.
            print(f" ERROR: {exc}")
            continue
        if epc_df.empty:
            print(f" No results from old API for {postcode}")
            continue

        # Normalise UPRNs to text and drop a trailing ".0" left by float parsing.
        uprn_text = epc_df["uprn"].astype(str)
        epc_df["uprn"] = uprn_text.str.replace(r"\.0$", "", regex=True)

        for row in postcode_rows:
            key = f"{row['User Input']}|{row['Postcode']}"
            if key in sidecar:
                continue
            expected_uprn = str(row["Manual UPRN Code"]).strip()
            hits = epc_df[epc_df["uprn"] == expected_uprn]
            if not hits.empty:
                lodgement_date = hits.iloc[0].get("lodgement-date")
                sidecar[key] = {
                    "lodgement_date": str(lodgement_date) if lodgement_date else None,
                    "found_in_old_api": True,
                }
                print(f" {row['User Input']}: {lodgement_date}")
                continue
            print(f" WARN: UPRN {expected_uprn} not found in old API for {postcode}")
            sidecar[key] = {"lodgement_date": None, "found_in_old_api": False}

    SIDECAR_PATH.write_text(json.dumps(sidecar, indent=2))
    print(f"\nWritten to {SIDECAR_PATH}")


if __name__ == "__main__":
    main()

View file

@ -1,12 +1,24 @@
# tests/test_address_to_uprn_csv.py
import csv
import time
import pytest
from pathlib import Path
from backend.address2UPRN.main import get_uprn
FIXTURE_PATH = Path(__file__).parent / "test_data.csv"
# Delay between live EPC API calls to stay under the (undocumented) rate limit.
# Each parametrized case fires at least one EPC request; without throttling,
# GitHub-hosted runners burst fast enough to hit 429s.
EPC_THROTTLE_SECONDS = 1.0
@pytest.fixture(autouse=True)
def _throttle_epc_requests():
yield
time.sleep(EPC_THROTTLE_SECONDS)
def load_test_cases():
with open(FIXTURE_PATH, newline="", encoding="utf-8") as f:

View file

@ -168,8 +168,8 @@ FLAT 8 599 HARROW ROAD,W10 4RA,None
"Apartment 18 Block D, 32, Hornsey Road",N7 7AT,10012792383
24b Honley Road,SE6 2HZ,None
FLAT B 158 LEAHURST ROAD,SE13 5NL,100021976974
2 COLLEGE HOUSE,CM7 1JS,100091449870
3 COLLEGE HOUSE,CM7 1JS,100091449871
2 COLLEGE HOUSE,CM7 1JS,None
3 COLLEGE HOUSE,CM7 1JS,None
1 Anita Street,M4 5DU,None
2 Anita Street,M4 5DU,77123061
5 Anita Street,M4 5DU,77123081
@ -279,6 +279,7 @@ FLAT B 158 LEAHURST ROAD,SE13 5NL,100021976974
80a Victoria Square,M4 5DZ,77211231
81a Victoria Square,M4 5DZ,77211232
82 Victoria Square,M4 5DZ,None
82a Victoria Square,M4 5DZ,77211233
83a Victoria Square,M4 5DZ,77211234
84a Victoria Square,M4 5DZ,None
85a Victoria Square,M4 5DZ,77211236

1 User Input Postcode Manual UPRN Code
168 Apartment 18 Block D, 32, Hornsey Road N7 7AT 10012792383
169 24b Honley Road SE6 2HZ None
170 FLAT B 158 LEAHURST ROAD SE13 5NL 100021976974
171 2 COLLEGE HOUSE CM7 1JS 100091449870 None
172 3 COLLEGE HOUSE CM7 1JS 100091449871 None
173 1 Anita Street M4 5DU None
174 2 Anita Street M4 5DU 77123061
175 5 Anita Street M4 5DU 77123081
279 80a Victoria Square M4 5DZ 77211231
280 81a Victoria Square M4 5DZ 77211232
281 82 Victoria Square M4 5DZ None
282 82a Victoria Square M4 5DZ 77211233
283 83a Victoria Square M4 5DZ 77211234
284 84a Victoria Square M4 5DZ None
285 85a Victoria Square M4 5DZ 77211236

View file

@ -45,6 +45,7 @@ class Settings(BaseSettings):
# Third parties
EPC_AUTH_TOKEN: str = "changeme"
OPEN_EPC_API_TOKEN: str = "changeme"
GOOGLE_SOLAR_API_KEY: str = "changeme"
MAGICPLAN_CUSTOMER_ID: str = "changeme"
MAGICPLAN_API_KEY: str = "changeme"

View file

@ -13,4 +13,9 @@ boto3==1.35.44
openpyxl==3.1.5
# Basic
pytz
sqlmodel
sqlmodel
# HTTP client
httpx==0.28.1
# Data
pandas
pandas-stubs

View file

@ -23,4 +23,6 @@ pyarrow==17.0.0
fastparquet==2024.5.0
aiohttp==3.10.10
# find my epc
beautifulsoup4
beautifulsoup4
# HTTP client (epc_client module)
httpx==0.28.1

View file

@ -0,0 +1,3 @@
from backend.epc_client.epc_client_service import EpcClientService
__all__ = ["EpcClientService"]

View file

@ -0,0 +1,28 @@
import time
from typing import Callable, TypeVar
from backend.epc_client.exceptions import EpcRateLimitError
T = TypeVar("T")
def call_with_retry(
fn: Callable[[], T],
max_retries: int = 5,
backoff_base: float = 1.0,
backoff_multiplier: float = 2.0,
max_backoff: float = 60.0,
) -> T:
last_exc: EpcRateLimitError | None = None
for attempt in range(max_retries + 1):
try:
return fn()
except EpcRateLimitError as exc:
last_exc = exc
if attempt < max_retries:
if exc.retry_after is not None:
delay = exc.retry_after
else:
delay = backoff_base * (backoff_multiplier ** attempt)
time.sleep(min(delay, max_backoff))
raise last_exc # type: ignore[misc]

View file

@ -0,0 +1,118 @@
# Spec: https://raw.githubusercontent.com/communitiesuk/epb-data-warehouse/main/api/api.yml
from __future__ import annotations
from typing import Any, Optional
import httpx
from backend.epc_client.exceptions import (
EpcApiError,
EpcNotFoundError,
EpcRateLimitError,
)
from backend.epc_client._retry import call_with_retry
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.search import EpcSearchResult
class EpcClientService:
BASE_URL = "https://api.get-energy-performance-data.communities.gov.uk"
REQUEST_TIMEOUT = 10.0
def __init__(self, auth_token: str) -> None:
self._headers = {
"Authorization": f"Bearer {auth_token}",
"Accept": "application/json",
}
@staticmethod
def _parse_retry_after(resp: httpx.Response) -> Optional[float]:
header = resp.headers.get("Retry-After")
if header is None:
return None
try:
return float(header)
except (TypeError, ValueError):
return None
def get_by_certificate_number(self, cert_num: str) -> EpcPropertyData:
raw = call_with_retry(lambda: self._fetch_certificate(cert_num))
return EpcPropertyDataMapper.from_api_response(raw)
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
results = call_with_retry(lambda: self._search(uprn=uprn))
if not results:
return None
latest = max(results, key=lambda r: r.registration_date)
return self.get_by_certificate_number(latest.certificate_number)
def search_by_postcode(self, postcode: str) -> list[EpcSearchResult]:
return call_with_retry(lambda: self._search(postcode=postcode))
# ------------------------------------------------------------------
# Private helperEpcRateLimpolarss
# ------------------------------------------------------------------
def _fetch_certificate(self, cert_num: str) -> dict[str, Any]:
resp = httpx.get(
f"{self.BASE_URL}/api/certificate",
params={"certificate_number": cert_num},
headers=self._headers,
timeout=self.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
raise EpcNotFoundError(cert_num)
if resp.status_code == 429:
raise EpcRateLimitError(
"Rate limited by EPC API",
retry_after=self._parse_retry_after(resp),
)
if not resp.is_success:
raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
return resp.json()["data"]
def _search(
self,
postcode: Optional[str] = None,
uprn: Optional[int] = None,
) -> list[EpcSearchResult]:
params: dict[str, str | int] = {}
if postcode:
params["postcode"] = postcode
if uprn is not None:
params["uprn"] = uprn
resp = httpx.get(
f"{self.BASE_URL}/api/domestic/search",
params=params,
headers=self._headers,
timeout=self.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
return []
if resp.status_code == 429:
raise EpcRateLimitError(
"Rate limited by EPC API",
retry_after=self._parse_retry_after(resp),
)
if not resp.is_success:
raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
rows = resp.json().get("data", [])
return [self._parse_search_result(r) for r in rows]
@staticmethod
def _parse_search_result(row: dict[str, Any]) -> EpcSearchResult:
return EpcSearchResult(
certificate_number=row["certificateNumber"],
address_line_1=row["addressLine1"],
address_line_2=row.get("addressLine2"),
address_line_3=row.get("addressLine3"),
address_line_4=row.get("addressLine4"),
postcode=row["postcode"],
post_town=row["postTown"],
uprn=row.get("uprn"),
current_energy_efficiency_band=row["currentEnergyEfficiencyBand"],
registration_date=row["registrationDate"],
)

View file

@ -0,0 +1,17 @@
from typing import Optional
class EpcApiError(Exception):
    """Root of the EPC client exception hierarchy."""


class EpcNotFoundError(EpcApiError):
    """The API answered 404 for the requested resource."""


class EpcRateLimitError(EpcApiError):
    """The API answered 429; reaches callers once retries are exhausted.

    Attributes are documented here rather than inline:
    ``retry_after`` is the wait hint supplied with the error, or ``None``
    when no hint was available.
    """

    def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
        self.retry_after = retry_after
        super().__init__(message)

View file

View file

@ -0,0 +1,48 @@
import json
import pathlib
import pytest
from backend.epc_client.epc_client_service import EpcClientService
SAMPLES_DIR = pathlib.Path("backend/epc_api/json_samples")
@pytest.fixture
def rdsap_21_0_0_cert():
    """Parsed JSON sample certificate for RdSAP-Schema-21.0.0."""
    sample_path = SAMPLES_DIR / "RdSAP-Schema-21.0.0/epc.json"
    raw_text = sample_path.read_text()
    return json.loads(raw_text)
@pytest.fixture
def rdsap_21_0_1_cert():
    """Parsed JSON sample certificate for RdSAP-Schema-21.0.1."""
    sample_path = SAMPLES_DIR / "RdSAP-Schema-21.0.1/epc.json"
    raw_text = sample_path.read_text()
    return json.loads(raw_text)
@pytest.fixture
def epc_service():
    """Client under test, built with a placeholder auth token."""
    service = EpcClientService(auth_token="test-token")
    return service
def make_search_row(
    cert_num="CERT-001",
    address_line_1="1 Test Street",
    postcode="SW1A 1AA",
    post_town="London",
    uprn=100023336956,
    band="D",
    registration_date="2024-01-01",
    address_line_2=None,
    address_line_3=None,
    address_line_4=None,
):
    """Build a raw search row using the camelCase keys the client parses.

    Every argument has a default, so a test overrides only the fields it
    cares about.
    """
    return dict(
        certificateNumber=cert_num,
        addressLine1=address_line_1,
        addressLine2=address_line_2,
        addressLine3=address_line_3,
        addressLine4=address_line_4,
        postcode=postcode,
        postTown=post_town,
        uprn=uprn,
        currentEnergyEfficiencyBand=band,
        registrationDate=registration_date,
    )

View file

@ -0,0 +1,217 @@
from unittest.mock import MagicMock, patch, call
import pytest
from backend.epc_client.epc_client_service import EpcClientService
from datatypes.epc.search import EpcSearchResult
from backend.epc_client.exceptions import EpcNotFoundError, EpcRateLimitError
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from backend.epc_client.tests.conftest import make_search_row
def _mock_response(status_code=200, json_data=None, headers=None):
    """Build a ``MagicMock`` exposing the response attributes the client reads.

    ``is_success`` is derived from ``status_code`` (2xx), ``json()`` returns
    ``json_data`` or ``{}``, and ``text`` is ``str(json_data)``.
    """
    fake = MagicMock()
    fake.headers = headers or {}
    fake.text = str(json_data)
    fake.json.return_value = json_data or {}
    fake.is_success = 200 <= status_code < 300
    fake.status_code = status_code
    return fake
# ---------------------------------------------------------------------------
# Test 1: get_by_certificate_number happy path
# ---------------------------------------------------------------------------
def test_get_by_certificate_number_returns_epc_property_data(
    epc_service, rdsap_21_0_1_cert
):
    """A 200 response carrying a certificate payload maps to ``EpcPropertyData``."""
    ok_response = _mock_response(200, {"data": rdsap_21_0_1_cert})
    with patch("httpx.get", return_value=ok_response):
        mapped = epc_service.get_by_certificate_number("CERT-001")
    assert isinstance(mapped, EpcPropertyData)
# ---------------------------------------------------------------------------
# Test 2: get_by_certificate_number 404 → EpcNotFoundError
# ---------------------------------------------------------------------------
def test_get_by_certificate_number_404_raises_not_found(epc_service):
    """A 404 from the API surfaces as ``EpcNotFoundError``."""
    not_found = _mock_response(404)
    with patch("httpx.get", return_value=not_found), pytest.raises(EpcNotFoundError):
        epc_service.get_by_certificate_number("BAD-CERT")
# ---------------------------------------------------------------------------
# Test 3: 429 retried, succeeds on 3rd attempt
# ---------------------------------------------------------------------------
def test_get_by_certificate_number_retries_on_429_and_succeeds(
    epc_service, rdsap_21_0_1_cert
):
    """Two 429s followed by a 200 still produce a mapped certificate."""
    throttled_first = _mock_response(429)
    throttled_second = _mock_response(429)
    succeeded = _mock_response(200, {"data": rdsap_21_0_1_cert})
    sequence = [throttled_first, throttled_second, succeeded]
    # Sleep is patched so the retry delays do not slow the test down.
    with patch("httpx.get", side_effect=sequence):
        with patch("time.sleep"):
            mapped = epc_service.get_by_certificate_number("CERT-001")
    assert isinstance(mapped, EpcPropertyData)
# ---------------------------------------------------------------------------
# Test 3b: 429 with Retry-After header → sleeps for that value
# ---------------------------------------------------------------------------
def test_429_retry_after_header_drives_sleep_duration(
    epc_service, rdsap_21_0_1_cert
):
    """A numeric Retry-After on the 429 is used as the sleep duration."""
    throttled = _mock_response(429, headers={"Retry-After": "7"})
    succeeded = _mock_response(200, {"data": rdsap_21_0_1_cert})
    with patch("httpx.get", side_effect=[throttled, succeeded]):
        with patch("backend.epc_client._retry.time.sleep") as mock_sleep:
            epc_service.get_by_certificate_number("CERT-001")
    mock_sleep.assert_called_once_with(7.0)
# ---------------------------------------------------------------------------
# Test 3c: 429 without Retry-After → falls back to exponential backoff
# ---------------------------------------------------------------------------
def test_429_without_retry_after_uses_exponential_backoff(
    epc_service, rdsap_21_0_1_cert
):
    """Without a Retry-After header the sleeps are 1.0s then 2.0s."""
    sequence = [
        _mock_response(429),
        _mock_response(429),
        _mock_response(200, {"data": rdsap_21_0_1_cert}),
    ]
    with patch("httpx.get", side_effect=sequence):
        with patch("backend.epc_client._retry.time.sleep") as mock_sleep:
            epc_service.get_by_certificate_number("CERT-001")
    expected_sleeps = [call(1.0), call(2.0)]
    assert mock_sleep.call_args_list == expected_sleeps
# ---------------------------------------------------------------------------
# Test 3d: non-numeric (HTTP-date) Retry-After header → falls back to exponential backoff
# ---------------------------------------------------------------------------
def test_429_malformed_retry_after_falls_back_to_backoff(
    epc_service, rdsap_21_0_1_cert
):
    """A Retry-After value that is not a plain number falls back to the 1.0s backoff."""
    unparsed_header = {"Retry-After": "Wed, 21 Oct 2026 07:28:00 GMT"}
    throttled = _mock_response(429, headers=unparsed_header)
    succeeded = _mock_response(200, {"data": rdsap_21_0_1_cert})
    with patch("httpx.get", side_effect=[throttled, succeeded]):
        with patch("backend.epc_client._retry.time.sleep") as mock_sleep:
            epc_service.get_by_certificate_number("CERT-001")
    mock_sleep.assert_called_once_with(1.0)
# ---------------------------------------------------------------------------
# Test 3e: Retry-After capped by max_backoff to avoid hostile/buggy values
# ---------------------------------------------------------------------------
def test_429_retry_after_capped_by_max_backoff(epc_service, rdsap_21_0_1_cert):
    """An oversized Retry-After (9999) results in a 60.0s sleep, not 9999s."""
    throttled = _mock_response(429, headers={"Retry-After": "9999"})
    succeeded = _mock_response(200, {"data": rdsap_21_0_1_cert})
    with patch("httpx.get", side_effect=[throttled, succeeded]):
        with patch("backend.epc_client._retry.time.sleep") as mock_sleep:
            epc_service.get_by_certificate_number("CERT-001")
    mock_sleep.assert_called_once_with(60.0)
# ---------------------------------------------------------------------------
# Test 4: get_by_uprn empty search → None
# ---------------------------------------------------------------------------
def test_get_by_uprn_returns_none_when_no_results(epc_service):
    """An empty ``data`` list from the search yields ``None``."""
    empty_search = _mock_response(200, {"data": []})
    with patch("httpx.get", return_value=empty_search):
        found = epc_service.get_by_uprn(100023336956)
    assert found is None
# ---------------------------------------------------------------------------
# Test 5: get_by_uprn multiple results → fetches latest by registration_date
# ---------------------------------------------------------------------------
def test_get_by_uprn_picks_most_recent_certificate(epc_service, rdsap_21_0_1_cert):
    """With several search hits, the certificate fetched is the latest-registered one."""
    registered_on = {
        "CERT-OLD": "2022-01-01",
        "CERT-NEW": "2024-06-01",
        "CERT-MID": "2023-03-15",
    }
    search_rows = [
        make_search_row(cert_num=cert, registration_date=when)
        for cert, when in registered_on.items()
    ]
    cert_response = {"data": rdsap_21_0_1_cert}

    def route(url, params=None, **kwargs):
        # Search requests get the rows; anything else gets the certificate.
        payload = {"data": search_rows} if "search" in url else cert_response
        return _mock_response(200, payload)

    with patch("httpx.get", side_effect=route) as mock_get:
        found = epc_service.get_by_uprn(100023336956)

    assert isinstance(found, EpcPropertyData)
    # Second call must be for the most recent cert
    second_call = mock_get.call_args_list[1]
    assert second_call.kwargs["params"]["certificate_number"] == "CERT-NEW"
# ---------------------------------------------------------------------------
# Test 6: search_by_postcode returns list[EpcSearchResult]
# ---------------------------------------------------------------------------
def test_search_by_postcode_returns_results(epc_service):
    """Each search row becomes an ``EpcSearchResult``, in response order."""
    first_row = make_search_row(cert_num="CERT-A", address_line_1="1 High Street")
    second_row = make_search_row(cert_num="CERT-B", address_line_1="2 High Street")
    search_response = _mock_response(200, {"data": [first_row, second_row]})
    with patch("httpx.get", return_value=search_response):
        found = epc_service.search_by_postcode("SW1A 1AA")
    assert len(found) == 2
    for item in found:
        assert isinstance(item, EpcSearchResult)
    first, second = found
    assert first.certificate_number == "CERT-A"
    assert second.address_line_1 == "2 High Street"
# ---------------------------------------------------------------------------
# Test 7: search_by_postcode 404 → empty list
# ---------------------------------------------------------------------------
def test_search_by_postcode_404_returns_empty_list(epc_service):
    """A 404 on search is reported as an empty result list, not an error."""
    not_found = _mock_response(404)
    with patch("httpx.get", return_value=not_found):
        found = epc_service.search_by_postcode("ZZ9 9ZZ")
    assert found == []

View file

@ -0,0 +1,31 @@
import pytest
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.domain.epc_property_data import EpcPropertyData
# ---------------------------------------------------------------------------
# Test 1: from_api_response with RdSAP-Schema-21.0.0 fixture → EpcPropertyData
# ---------------------------------------------------------------------------
def test_from_api_response_rdsap_21_0_0(rdsap_21_0_0_cert):
    """The 21.0.0 sample payload maps to an ``EpcPropertyData`` instance."""
    mapped = EpcPropertyDataMapper.from_api_response(rdsap_21_0_0_cert)
    assert isinstance(mapped, EpcPropertyData)
# ---------------------------------------------------------------------------
# Test 2: from_api_response with RdSAP-Schema-21.0.1 fixture → EpcPropertyData
# ---------------------------------------------------------------------------
def test_from_api_response_rdsap_21_0_1(rdsap_21_0_1_cert):
    """The 21.0.1 sample payload maps to an ``EpcPropertyData`` instance."""
    mapped = EpcPropertyDataMapper.from_api_response(rdsap_21_0_1_cert)
    assert isinstance(mapped, EpcPropertyData)
# ---------------------------------------------------------------------------
# Test 3: unknown schema_type → ValueError
# ---------------------------------------------------------------------------
def test_from_api_response_unknown_schema_raises():
    """An unrecognised ``schema_type`` is rejected with ``ValueError``."""
    unknown_payload = {"schema_type": "RdSAP-Schema-99.0.0"}
    with pytest.raises(ValueError, match="Unsupported EPC schema"):
        EpcPropertyDataMapper.from_api_response(unknown_payload)

View file

@ -83,7 +83,7 @@ def process_export(
else:
scenario_recs = recommendations_df[
recommendations_df["scenario_id"] == group_key
]
]
if scenario_recs.empty:
logger.info(
@ -140,8 +140,8 @@ def handler(
body_dict = {
"task_id": "test",
"subtask_id": "test",
"portfolio_id": 682,
"scenario_ids": [1210],
"portfolio_id": 632,
"scenario_ids": [1144],
"default_plans_only": False,
}
:param event: Lambda event containing export request details

View file

@ -0,0 +1,60 @@
from backend.utils.addressMatch import AddressMatch
class TestNormaliseAddress:
    """Behavioural checks for ``AddressMatch.normalise_address``."""

    def test_lowercases_input(self):
        normalised = AddressMatch.normalise_address("1 HIGH STREET")
        assert normalised == "1 high street"

    def test_expands_road_abbreviation(self):
        normalised = AddressMatch.normalise_address("1 Moreton Rd")
        assert normalised == "1 moreton road"

    def test_expands_avenue_abbreviation(self):
        normalised = AddressMatch.normalise_address("2 Park Ave")
        assert normalised == "2 park avenue"

    def test_removes_punctuation_keeps_slash(self):
        normalised = AddressMatch.normalise_address("Flat 1/A, Some Road")
        assert "," not in normalised
        assert "/" in normalised

    def test_splits_digit_letter_suffix(self):
        normalised = AddressMatch.normalise_address("42a Some Road")
        assert "42 a" in normalised

    def test_empty_string_returns_empty(self):
        normalised = AddressMatch.normalise_address("")
        assert normalised == ""

    def test_removes_no_prefix(self):
        normalised = AddressMatch.normalise_address("No 5 High Street")
        tokens = normalised.split()
        assert "no" not in tokens
        assert "5" in normalised
class TestScore:
    """Behavioural checks for ``AddressMatch.score``."""

    def test_identical_address_scores_one(self):
        similarity = AddressMatch.score("1 High Street", "1 High Street")
        assert similarity == 1.0

    def test_case_insensitive(self):
        similarity = AddressMatch.score("1 HIGH STREET", "1 high street")
        assert similarity == 1.0

    def test_street_type_synonym_scores_one(self):
        # "Rd" expands to "road" during normalisation — should be identical
        similarity = AddressMatch.score("1 High Rd", "1 High Road")
        assert similarity == 1.0

    def test_different_building_numbers_score_zero(self):
        similarity = AddressMatch.score("1 High Street", "2 High Street")
        assert similarity == 0.0

    def test_disjoint_number_sets_score_zero(self):
        similarity = AddressMatch.score("1 High Street", "99 Nowhere Lane")
        assert similarity == 0.0

    def test_user_address_has_number_but_epc_does_not_scores_zero(self):
        similarity = AddressMatch.score("1 High Street", "High Street")
        assert similarity == 0.0

    def test_partial_address_scores_above_threshold(self):
        # Extra token in user address ("London") — same building number, high overlap
        similarity = AddressMatch.score("1 High Street London", "1 High Street")
        assert 0.6 <= similarity < 1.0

    def test_flat_number_mismatch_scores_zero(self):
        # User has two numbers but no "flat" token; EPC has different flat number
        # Triggers the order-sensitive flat guard
        similarity = AddressMatch.score("3 42 High Street", "Flat 7 42 High Street")
        assert similarity == 0.0

View file

@ -1,8 +1,14 @@
from __future__ import annotations
import re
from typing import Any, Optional
from difflib import SequenceMatcher
from typing import TYPE_CHECKING, Any, Optional
import requests
if TYPE_CHECKING:
import pandas as pd
class AddressMatch:
def __init__(self):
@ -95,6 +101,16 @@ class AddressMatch:
tokens.append(replacement)
return " ".join(tokens)
@staticmethod
def _match_building_number(token: str, next_token: Optional[str]) -> Optional[str]:
if re.fullmatch(r"\d+[a-z]", token):
return token
if re.fullmatch(r"\d+", token):
if next_token is not None and re.fullmatch(r"[a-z]", next_token):
return token + next_token
return token
return None
@staticmethod
def levenshtein(a: str, b: str) -> float:
"""
@ -121,6 +137,7 @@ class AddressMatch:
Assumes formats like:
- '42 moreton road'
- 'flat 3 42 moreton road'
- '82 a victoria square' (recombined to '82a')
"""
tokens = s.split()
@ -136,10 +153,12 @@ class AddressMatch:
continue
cleaned.append(t)
# first remaining number is building number
for t in cleaned:
if re.fullmatch(r"\d+[a-z]?", t):
return t
# first remaining number is building number; recombine with a
# single-letter suffix when normalisation has split "82a" → "82 a"
for i, t in enumerate(cleaned):
nxt = cleaned[i + 1] if i + 1 < len(cleaned) else None
if (match := AddressMatch._match_building_number(t, nxt)) is not None:
return match
return None
@ -172,6 +191,18 @@ class AddressMatch:
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
)
has_flat_token_epc = "flat" in b_norm
# Slash-format like "3/137a" is an implicit flat reference
# (flat 3 of 137a) even without a "flat" keyword.
has_implicit_flat_user = bool(re.search(r"\d+\s*/\s*\d+", a_norm))
# EPC says it's a flat but user gave no flat indication
# (neither keyword nor slash-format). Unlikely to be the right unit.
if (
has_flat_token_epc
and not has_flat_token_user
and not has_implicit_flat_user
):
return 0.0
if (
len(seq_a) == 2
@ -199,3 +230,23 @@ class AddressMatch:
0.65 * token_score + 0.35 * char_score,
4,
)
def score_addresses(
    df: pd.DataFrame,
    user_address: str,
    address_column: str = "address",
) -> pd.Series:
    """Score every address in ``df[address_column]`` against ``user_address``.

    Returns a Series holding ``AddressMatch.score(user_address, value)`` for
    each value of the column.

    Raises:
        ValueError: if ``address_column`` is not a column of ``df``.
    """
    if address_column in df.columns:
        candidates = df[address_column]
        return candidates.apply(
            lambda candidate: AddressMatch.score(user_address, candidate)
        )
    raise ValueError(f"Missing column: {address_column}")
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
    """Return True if every non-null UPRN in ``df[column]`` equals ``uprn``.

    Returns False when the column is missing, when it holds no non-null
    values, or when any non-null value differs from ``uprn``.

    Values are compared as stripped strings. Integral floats are rendered
    without the fractional part, because a numeric column containing nulls
    is stored by pandas as float64 and would otherwise stringify as
    "100.0" and never equal "100".
    """

    def canonical(value) -> str:
        # 100.0 -> "100"; everything else -> its stripped string form.
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value).strip()

    if column not in df.columns:
        return False

    distinct = {canonical(value) for value in df[column].dropna()}
    # An empty set (no non-null UPRNs) can never equal the one-element target.
    return distinct == {canonical(uprn)}

View file

@ -1,11 +1,9 @@
import os
from pathlib import Path
from backend.app.config import get_settings
import os
from dotenv import load_dotenv
import os
# Load .env in conftest.py directory for local development
load_dotenv()
load_dotenv(Path(__file__).resolve().parent / "backend" / ".env")
DEFAULT_ENV = {
"API_KEY": "test",
@ -18,6 +16,10 @@ DEFAULT_ENV = {
"EPC_AUTH_TOKEN",
"test",
), # overridden in GitHub Actions
"OPEN_EPC_API_TOKEN": os.getenv(
"OPEN_EPC_API_TOKEN",
"test",
), # overridden in GitHub Actions
"GOOGLE_SOLAR_API_KEY": "test",
"DB_HOST": "localhost",
"DB_USERNAME": "test",

View file

@ -4,7 +4,7 @@ from typing import Optional
import pandas as pd
from botocore.exceptions import ClientError
from backend.address2UPRN.scoring import get_uprn_candidates
from backend.address2UPRN.scoring import rank_address_similarity
from backend.utils.addressMatch import AddressMatch
from datatypes.epc.domain.historic_epc import HistoricEpc
from utils.pandas_utils import pandas_cell_to_str
@ -85,7 +85,7 @@ def match_addresses_for_postcode(
) from e
raise
scored = get_uprn_candidates(
scored = rank_address_similarity(
df,
user_address=user_address,
address_column=address_column,

View file

@ -1,5 +1,6 @@
from datetime import date
from typing import List, Optional, Sequence, Union
from typing import List, Optional, Sequence, Union, Dict, Any
from datatypes.epc.schema.helpers import from_dict
from datatypes.epc.domain.epc_property_data import (
EnergyElement,
@ -1525,6 +1526,29 @@ class EpcPropertyDataMapper:
) -> List[EnergyElement]:
return [EpcPropertyDataMapper._map_energy_element(e) for e in elements]
@staticmethod
def from_api_response(data: Dict[str, Any]) -> "EpcPropertyData":
"""
Dispatch to the correct schema mapper based on schema_type.
Supports RdSAP-Schema-21.0.0 and RdSAP-Schema-21.0.1 only.
Raises ValueError for unsupported schemas add cases here as needed.
"""
schema = data.get("schema_type", "")
if schema == "RdSAP-Schema-21.0.1":
from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1
return EpcPropertyDataMapper.from_rdsap_schema_21_0_1(
from_dict(RdSapSchema21_0_1, data)
)
if schema == "RdSAP-Schema-21.0.0":
from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0
return EpcPropertyDataMapper.from_rdsap_schema_21_0_0(
from_dict(RdSapSchema21_0_0, data)
)
raise ValueError(f"Unsupported EPC schema: {schema!r}")
# ---------------------------------------------------------------------------
# Private helpers

View file

@ -1,3 +1,4 @@
from typing import Optional
from unittest.mock import patch
import numpy as np
@ -13,40 +14,103 @@ from datatypes.epc.domain.historic_epc_matching import (
match_addresses_for_postcode,
)
# Columns required by the HistoricEpc dataclass (lower-cased CSV columns).
# The matcher only reads ADDRESS + UPRN to score; everything else is filled
# with "" but must be present for HistoricEpc(**kwargs) to construct.
_FULL_COLUMN_FIELDS = [
"LMK_KEY", "ADDRESS1", "ADDRESS2", "ADDRESS3", "POSTCODE",
"BUILDING_REFERENCE_NUMBER", "CURRENT_ENERGY_RATING", "POTENTIAL_ENERGY_RATING",
"CURRENT_ENERGY_EFFICIENCY", "POTENTIAL_ENERGY_EFFICIENCY", "PROPERTY_TYPE",
"BUILT_FORM", "INSPECTION_DATE", "LOCAL_AUTHORITY", "CONSTITUENCY", "COUNTY",
"LODGEMENT_DATE", "TRANSACTION_TYPE", "ENVIRONMENT_IMPACT_CURRENT",
"ENVIRONMENT_IMPACT_POTENTIAL", "ENERGY_CONSUMPTION_CURRENT",
"ENERGY_CONSUMPTION_POTENTIAL", "CO2_EMISSIONS_CURRENT",
"CO2_EMISS_CURR_PER_FLOOR_AREA", "CO2_EMISSIONS_POTENTIAL",
"LIGHTING_COST_CURRENT", "LIGHTING_COST_POTENTIAL", "HEATING_COST_CURRENT",
"HEATING_COST_POTENTIAL", "HOT_WATER_COST_CURRENT", "HOT_WATER_COST_POTENTIAL",
"TOTAL_FLOOR_AREA", "ENERGY_TARIFF", "MAINS_GAS_FLAG", "FLOOR_LEVEL",
"FLAT_TOP_STOREY", "FLAT_STOREY_COUNT", "MAIN_HEATING_CONTROLS",
"MULTI_GLAZE_PROPORTION", "GLAZED_TYPE", "GLAZED_AREA", "EXTENSION_COUNT",
"NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES", "HOTWATER_DESCRIPTION", "HOT_WATER_ENERGY_EFF",
"HOT_WATER_ENV_EFF", "FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF",
"WINDOWS_DESCRIPTION", "WINDOWS_ENERGY_EFF", "WINDOWS_ENV_EFF",
"WALLS_DESCRIPTION", "WALLS_ENERGY_EFF", "WALLS_ENV_EFF",
"SECONDHEAT_DESCRIPTION", "SHEATING_ENERGY_EFF", "SHEATING_ENV_EFF",
"ROOF_DESCRIPTION", "ROOF_ENERGY_EFF", "ROOF_ENV_EFF", "MAINHEAT_DESCRIPTION",
"MAINHEAT_ENERGY_EFF", "MAINHEAT_ENV_EFF", "MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF", "MAINHEATC_ENV_EFF", "LIGHTING_DESCRIPTION",
"LIGHTING_ENERGY_EFF", "LIGHTING_ENV_EFF", "MAIN_FUEL", "WIND_TURBINE_COUNT",
"HEAT_LOSS_CORRIDOR", "UNHEATED_CORRIDOR_LENGTH", "FLOOR_HEIGHT",
"PHOTO_SUPPLY", "SOLAR_WATER_HEATING_FLAG", "MECHANICAL_VENTILATION",
"ADDRESS", "LOCAL_AUTHORITY_LABEL", "CONSTITUENCY_LABEL", "POSTTOWN",
"CONSTRUCTION_AGE_BAND", "LODGEMENT_DATETIME", "TENURE",
"FIXED_LIGHTING_OUTLETS_COUNT", "LOW_ENERGY_FIXED_LIGHT_COUNT", "UPRN",
"UPRN_SOURCE", "REPORT_TYPE",
"LMK_KEY",
"ADDRESS1",
"ADDRESS2",
"ADDRESS3",
"POSTCODE",
"BUILDING_REFERENCE_NUMBER",
"CURRENT_ENERGY_RATING",
"POTENTIAL_ENERGY_RATING",
"CURRENT_ENERGY_EFFICIENCY",
"POTENTIAL_ENERGY_EFFICIENCY",
"PROPERTY_TYPE",
"BUILT_FORM",
"INSPECTION_DATE",
"LOCAL_AUTHORITY",
"CONSTITUENCY",
"COUNTY",
"LODGEMENT_DATE",
"TRANSACTION_TYPE",
"ENVIRONMENT_IMPACT_CURRENT",
"ENVIRONMENT_IMPACT_POTENTIAL",
"ENERGY_CONSUMPTION_CURRENT",
"ENERGY_CONSUMPTION_POTENTIAL",
"CO2_EMISSIONS_CURRENT",
"CO2_EMISS_CURR_PER_FLOOR_AREA",
"CO2_EMISSIONS_POTENTIAL",
"LIGHTING_COST_CURRENT",
"LIGHTING_COST_POTENTIAL",
"HEATING_COST_CURRENT",
"HEATING_COST_POTENTIAL",
"HOT_WATER_COST_CURRENT",
"HOT_WATER_COST_POTENTIAL",
"TOTAL_FLOOR_AREA",
"ENERGY_TARIFF",
"MAINS_GAS_FLAG",
"FLOOR_LEVEL",
"FLAT_TOP_STOREY",
"FLAT_STOREY_COUNT",
"MAIN_HEATING_CONTROLS",
"MULTI_GLAZE_PROPORTION",
"GLAZED_TYPE",
"GLAZED_AREA",
"EXTENSION_COUNT",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES",
"HOTWATER_DESCRIPTION",
"HOT_WATER_ENERGY_EFF",
"HOT_WATER_ENV_EFF",
"FLOOR_DESCRIPTION",
"FLOOR_ENERGY_EFF",
"FLOOR_ENV_EFF",
"WINDOWS_DESCRIPTION",
"WINDOWS_ENERGY_EFF",
"WINDOWS_ENV_EFF",
"WALLS_DESCRIPTION",
"WALLS_ENERGY_EFF",
"WALLS_ENV_EFF",
"SECONDHEAT_DESCRIPTION",
"SHEATING_ENERGY_EFF",
"SHEATING_ENV_EFF",
"ROOF_DESCRIPTION",
"ROOF_ENERGY_EFF",
"ROOF_ENV_EFF",
"MAINHEAT_DESCRIPTION",
"MAINHEAT_ENERGY_EFF",
"MAINHEAT_ENV_EFF",
"MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF",
"MAINHEATC_ENV_EFF",
"LIGHTING_DESCRIPTION",
"LIGHTING_ENERGY_EFF",
"LIGHTING_ENV_EFF",
"MAIN_FUEL",
"WIND_TURBINE_COUNT",
"HEAT_LOSS_CORRIDOR",
"UNHEATED_CORRIDOR_LENGTH",
"FLOOR_HEIGHT",
"PHOTO_SUPPLY",
"SOLAR_WATER_HEATING_FLAG",
"MECHANICAL_VENTILATION",
"ADDRESS",
"LOCAL_AUTHORITY_LABEL",
"CONSTITUENCY_LABEL",
"POSTTOWN",
"CONSTRUCTION_AGE_BAND",
"LODGEMENT_DATETIME",
"TENURE",
"FIXED_LIGHTING_OUTLETS_COUNT",
"LOW_ENERGY_FIXED_LIGHT_COUNT",
"UPRN",
"UPRN_SOURCE",
"REPORT_TYPE",
]
@ -63,7 +127,9 @@ def _build_df(rows: list[dict]) -> pd.DataFrame:
@pytest.fixture
def patch_postcode_valid():
with patch.object(matcher_mod.AddressMatch, "is_valid_postcode", return_value=True) as m:
with patch.object(
matcher_mod.AddressMatch, "is_valid_postcode", return_value=True
) as m:
yield m
@ -106,10 +172,12 @@ class TestMatchAddressesForPostcode:
self, patch_read, patch_postcode_valid
):
# Disjoint number sets => hard zero. Still kept in matches.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("999 SOMEWHERE ELSE", "200"),
])
patch_read.return_value = _build_df(
[
_row("47 GORDON ROAD", "100"),
_row("999 SOMEWHERE ELSE", "200"),
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert isinstance(result, HistoricEpcMatches)
assert len(result.matches) == 2
@ -117,10 +185,12 @@ class TestMatchAddressesForPostcode:
def test_top_has_lexirank_one_and_lexiscore_monotone(
self, patch_read, patch_postcode_valid
):
patch_read.return_value = _build_df([
_row("48 GORDON ROAD", "200"), # near miss
_row("47 GORDON ROAD", "100"), # exact (after normalisation)
])
patch_read.return_value = _build_df(
[
_row("48 GORDON ROAD", "200"), # near miss
_row("47 GORDON ROAD", "100"), # exact (after normalisation)
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.top().lexirank == 1
scores = [m.lexiscore for m in result.matches]
@ -173,19 +243,23 @@ class TestMatchAddressesForPostcode:
class TestUnambiguousUprn:
def test_exact_match_returns_uprn(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
])
patch_read.return_value = _build_df(
[
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() == "100"
def test_ambiguous_tie_returns_none(self, patch_read, patch_postcode_valid):
# Two duplicate addresses with different UPRNs share rank-1.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("47 GORDON ROAD", "200"),
])
patch_read.return_value = _build_df(
[
_row("47 GORDON ROAD", "100"),
_row("47 GORDON ROAD", "200"),
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() is None
@ -193,10 +267,12 @@ class TestUnambiguousUprn:
self, patch_read, patch_postcode_valid
):
# User address has building number 47; no row has 47 -> all hard-zero.
patch_read.return_value = _build_df([
_row("999 ELSEWHERE", "100"),
_row("888 ELSEWHERE", "200"),
])
patch_read.return_value = _build_df(
[
_row("999 ELSEWHERE", "100"),
_row("888 ELSEWHERE", "200"),
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert all(m.lexiscore == 0.0 for m in result.matches)
assert result.unambiguous_uprn() is None
@ -205,15 +281,22 @@ class TestUnambiguousUprn:
self, patch_read, patch_postcode_valid
):
# Use a real NaN in the UPRN cell.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", np.nan),
_row("48 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top = result.top()
patch_read.return_value = _build_df(
[
_row("47 GORDON ROAD", np.nan),
_row("48 GORDON ROAD", "200"),
]
)
result: HistoricEpcMatches = match_addresses_for_postcode(
"47 Gordon Road", "AB33 8AL"
)
top: Optional[ScoredHistoricEpc] = result.top()
# pandas_cell_to_str must turn NaN/"nan" into "" (not the literal string "nan"),
# so unambiguous_uprn's truthiness check correctly drops the row.
assert top.record.uprn == ""
if top:
assert top.record.uprn == ""
else:
pytest.fail("should have an epc score, no results found :(")
# ---------- top / top_n ----------
@ -222,11 +305,13 @@ class TestUnambiguousUprn:
class TestTopHelpers:
def test_top_n_returns_first_k(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
_row("49 GORDON ROAD", "300"),
])
patch_read.return_value = _build_df(
[
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
_row("49 GORDON ROAD", "300"),
]
)
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top2 = result.top_n(2)
assert len(top2) == 2

View file

@ -0,0 +1,77 @@
import dataclasses
import typing
from datetime import date
from typing import Any, Dict, Type, TypeVar
T = TypeVar("T")
def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
"""
Recursively convert a plain dict (e.g. from json.loads) into the given
dataclass type, using the field type hints to convert nested structures.
Handles:
- Nested dataclasses
- List[SomeDataclass]
- Optional[X] / Union[X, None]
- Union[DataclassType, primitive] (e.g. Union[Measurement, int])
- Primitive pass-through for Union[str, int] etc.
"""
return _from_dict_impl(cls, data) # type: ignore[return-value]
def _from_dict_impl(cls: Any, data: Any) -> Any:
hints = typing.get_type_hints(cls)
kwargs: Dict[str, Any] = {}
for field in dataclasses.fields(cls): # type: ignore[arg-type]
has_default = (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING # type: ignore[misc]
)
if field.name not in data:
if has_default:
continue
raise ValueError(f"{cls.__name__}: missing required field '{field.name}'")
kwargs[field.name] = _coerce(data[field.name], hints[field.name])
return cls(**kwargs)
def _coerce(value: Any, hint: Any) -> Any:
if value is None:
return None
origin = typing.get_origin(hint)
args = typing.get_args(hint)
# Union (includes Optional[X] which is Union[X, None])
if origin is typing.Union:
if value is None:
return None
non_none_args = [a for a in args if a is not type(None)]
if len(non_none_args) == 1:
# Optional[X] — recurse so List[X] and nested dataclasses are handled
return _coerce(value, non_none_args[0])
# Multi-type Union (e.g. Union[Measurement, int]): try dataclasses first
for arg in non_none_args:
if dataclasses.is_dataclass(arg) and isinstance(value, dict):
return _from_dict_impl(arg, value)
# All remaining args are primitives — return value as-is
return value
# List[X]
if origin is list:
item_hint = args[0]
return [_coerce(item, item_hint) for item in value]
# Plain dataclass
if dataclasses.is_dataclass(hint) and isinstance(value, dict):
return _from_dict_impl(hint, value)
if hint is date and isinstance(value, str):
return date.fromisoformat(value)
return value

View file

@ -1,77 +1,3 @@
import dataclasses
import typing
from datetime import date
from typing import Any, Dict, Type, TypeVar
from datatypes.epc.schema.helpers import from_dict
T = TypeVar("T")
def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
"""
Recursively convert a plain dict (e.g. from json.loads) into the given
dataclass type, using the field type hints to convert nested structures.
Handles:
- Nested dataclasses
- List[SomeDataclass]
- Optional[X] / Union[X, None]
- Union[DataclassType, primitive] (e.g. Union[Measurement, int])
- Primitive pass-through for Union[str, int] etc.
"""
return _from_dict_impl(cls, data) # type: ignore[return-value]
def _from_dict_impl(cls: Any, data: Any) -> Any:
hints = typing.get_type_hints(cls)
kwargs: Dict[str, Any] = {}
for field in dataclasses.fields(cls): # type: ignore[arg-type]
has_default = (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING # type: ignore[misc]
)
if field.name not in data:
if has_default:
continue
raise ValueError(f"{cls.__name__}: missing required field '{field.name}'")
kwargs[field.name] = _coerce(data[field.name], hints[field.name])
return cls(**kwargs)
def _coerce(value: Any, hint: Any) -> Any:
if value is None:
return None
origin = typing.get_origin(hint)
args = typing.get_args(hint)
# Union (includes Optional[X] which is Union[X, None])
if origin is typing.Union:
if value is None:
return None
non_none_args = [a for a in args if a is not type(None)]
if len(non_none_args) == 1:
# Optional[X] — recurse so List[X] and nested dataclasses are handled
return _coerce(value, non_none_args[0])
# Multi-type Union (e.g. Union[Measurement, int]): try dataclasses first
for arg in non_none_args:
if dataclasses.is_dataclass(arg) and isinstance(value, dict):
return _from_dict_impl(arg, value)
# All remaining args are primitives — return value as-is
return value
# List[X]
if origin is list:
item_hint = args[0]
return [_coerce(item, item_hint) for item in value]
# Plain dataclass
if dataclasses.is_dataclass(hint) and isinstance(value, dict):
return _from_dict_impl(hint, value)
if hint is date and isinstance(value, str):
return date.fromisoformat(value)
return value
__all__ = ["from_dict"]

View file

@ -0,0 +1,3 @@
from datatypes.epc.search.epc_search_result import EpcSearchResult
__all__ = ["EpcSearchResult"]

View file

@ -0,0 +1,28 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
@dataclass
class EpcSearchResult:
    """A single EPC search result: certificate, address, band and registration date.

    Address lines 2-4 and the UPRN are optional and may be ``None``.
    """

    certificate_number: str
    address_line_1: str
    address_line_2: Optional[str]
    address_line_3: Optional[str]
    address_line_4: Optional[str]
    postcode: str
    post_town: str
    uprn: Optional[int]
    current_energy_efficiency_band: str
    registration_date: str

    @property
    def full_address(self) -> str:
        """Return address lines 1-4 joined by a comma and a space.

        Lines that are ``None`` or empty are left out; postcode and post town
        are not included.
        """
        lines = (
            self.address_line_1,
            self.address_line_2,
            self.address_line_3,
            self.address_line_4,
        )
        # filter(None, ...) drops every falsy entry (None and "").
        return ", ".join(filter(None, lines))

View file

@ -1,6 +1,7 @@
import os
import time
from enum import Enum
from http import HTTPStatus
from typing import Optional, cast, Callable, Any
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
@ -86,19 +87,27 @@ class HubspotClient:
def _call_with_retry(self, fn: Callable[[], Any], max_retries: int = 2) -> Any:
"""
Call fn(), retrying up to max_retries times on 429 rate-limit errors.
Call fn(), retrying up to max_retries times on 429 rate-limit errors
or transient 5xx server errors.
Waits the minimal amount: the remaining interval window reported by HubSpot headers.
Falls back to the full interval (10s) if headers are absent.
Note: each HubSpot sub-module (deals, companies, etc.) ships its own ApiException
class with no shared base beyond Exception, so we detect 429s via duck-typing.
class with no shared base beyond Exception, so we detect retryable statuses via duck-typing.
"""
retryable_statuses = {
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.GATEWAY_TIMEOUT,
}
for attempt in range(max_retries + 1):
try:
return fn()
except Exception as e:
status = getattr(e, "status", None)
if status != 429 or attempt == max_retries:
if status not in retryable_statuses or attempt == max_retries:
raise
headers = getattr(e, "headers", None) or {}
interval_ms = int(
@ -106,7 +115,7 @@ class HubspotClient:
)
wait_s = interval_ms / 1000.0
self.logger.warning(
f"HubSpot 429 (attempt {attempt + 1}/{max_retries}), "
f"HubSpot {status} (attempt {attempt + 1}/{max_retries}), "
f"waiting {wait_s:.1f}s before retry."
)
time.sleep(wait_s)

View file

@ -1,3 +1 @@
[tool.pyright]
reportUnknownMemberType = false
reportUnknownVariableType = false

View file

@ -12,6 +12,7 @@ Usage:
import sys
from datatypes.epc.domain.historic_epc_matching import match_addresses_for_postcode
from typing import Optional
def main(user_address: str, postcode: str) -> None:
@ -29,7 +30,7 @@ def main(user_address: str, postcode: str) -> None:
)
print()
uprn = result.unambiguous_uprn()
uprn: Optional[str] = result.unambiguous_uprn()
if uprn:
print(f"Unambiguous UPRN: {uprn}")
else: