Resolve a landlord mains-gas override to the primary fuel code 🟩

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jun-te Kim 2026-06-19 12:15:54 +00:00
parent 240d7b1025
commit 0e85da1507
11 changed files with 2150 additions and 2 deletions

View file

@ -13,10 +13,18 @@ from __future__ import annotations
from typing import Optional
from domain.modelling.simulation import EpcSimulation
from domain.modelling.simulation import EpcSimulation, HeatingOverlay
# RdSAP-20/21 `main_fuel` `(not community)` codes (epc_codes.csv `main_fuel`).
_FUEL_CODES: dict[str, int] = {
"mains gas": 26,
}
def fuel_overlay_for(
main_fuel_value: str, building_part: int
) -> Optional[EpcSimulation]:
raise NotImplementedError
code = _FUEL_CODES.get(main_fuel_value)
if code is None:
return None
return EpcSimulation(heating=HeatingOverlay(main_fuel_type=code))

View file

@ -0,0 +1,353 @@
"""Fill the DOMNA columns in the AddressProfilingResults spreadsheet.
Input: scripts/manipulation(2).xlsx, sheet "AddressProfilingResults", columns
Organisation Reference | UPRN | DOMNA FOUND UPRN | DOMNA FOUND ADDRESS | Address | Postcode
Per-row rule ("if there's a UPRN in the UPRN column we're done"):
* UPRN present AND Address present -> nothing to do (already sorted).
* UPRN present AND Address missing -> reverse-lookup the address from the UPRN
via the EPC API -> DOMNA FOUND ADDRESS.
* UPRN missing AND Address present -> resolve a UPRN from address + postcode
(EPC API, then Ordnance Survey) -> writes
DOMNA FOUND UPRN + DOMNA FOUND ADDRESS.
* not resolvable -> marked "NOT FOUND" and listed in the
unresolved report.
Relaxed matching (this batch only production AddressMatch is untouched): the
landlord writes flats as "3 GLADYS COURT" while EPC stores "Flat 3 Gladys
Court", which the production matcher hard-rejects. So per address we try several
query variants the full string, just the first comma-segment, and a
"Flat <n> ..." form and keep the best-scoring, unambiguous match. The unit
number must still match exactly (AddressMatch zeroes mismatched numbers), so a
wrong-unit match stays unlikely. Each fill carries its score + source so you can
spot-check (DOMNA SCORE / DOMNA SOURCE).
Rows that already have a DOMNA FOUND UPRN are skipped (idempotent / resumable).
python -m scripts.fill_domna_addresses
python -m scripts.fill_domna_addresses --limit 200 # smoke test first N
Keys come from backend/.env (OPEN_EPC_API_TOKEN, ORDNANCE_SURVEY_API_KEY). Run
from the worktree root (import trap).
"""
from __future__ import annotations
import argparse
import os
import re
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from backend.address2UPRN.main import get_epc_data_with_postcode # noqa: E402
from backend.address2UPRN.scoring import all_uprns_match, rank_address_similarity # noqa: E402
from backend.ordnanceSurvey.helpers import ( # noqa: E402
lookup_os_places,
os_places_results_to_dataframe,
)
from backend.utils.addressMatch import AddressMatch # noqa: E402
from datatypes.epc.search import EpcSearchResult # noqa: E402
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from scripts.resolve_uprns_for_finaliser import clean_postcode, load_keys # noqa: E402
SHEET = "AddressProfilingResults"
UPRN_COL = "UPRN"
ADDRESS_COL = "Address"
POSTCODE_COL = "Postcode"
REF_COL = "Organisation Reference"
FOUND_UPRN_COL = "DOMNA FOUND UPRN"
FOUND_ADDRESS_COL = "DOMNA FOUND ADDRESS"
SCORE_COL = "DOMNA SCORE"
SOURCE_COL = "DOMNA SOURCE"
NOT_FOUND = "NOT FOUND"
# EPC matches are tight (short addresses) so we hold the production 0.7 bar; OS
# addresses carry more trailing tokens, so a slightly lower bar is appropriate.
EPC_THRESHOLD = 0.7
OS_THRESHOLD = 0.6
_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation(2).xlsx"
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx"
_DEFAULT_UNRESOLVED = _REPO_ROOT / "scripts" / "manipulation_unresolved.csv"
# A resolved hit: (uprn, matched_address, score, source).
Hit = tuple[str, str, float, str]
def cell_str(value: object) -> str:
"""Coerce a spreadsheet cell to a trimmed string ("" for NaN/None)."""
if value is None:
return ""
text = str(value).strip()
return "" if text.lower() == "nan" else text
def parse_uprn_cell(value: object) -> Optional[int]:
"""Read a UPRN cell that pandas loaded as float64 back into an int."""
text = cell_str(value)
if not text:
return None
try:
return int(float(text))
except ValueError:
return None
def address_variants(address: str) -> list[str]:
"""Query forms to try for one input address, best-discriminating first.
Landlord flats read "3 GLADYS COURT, 260 REIGATE ROAD" but EPC stores
"Flat 3 Gladys Court"; the full string scores low (extra tokens) and the
bare "3 ..." trips the flat guard. So we also try the first comma-segment
and a "Flat <segment>" form.
"""
address = address.strip()
first = address.split(",")[0].strip()
variants = [address, first]
if re.match(r"^\d", first): # starts with a unit/house number
variants.append("Flat " + first)
variants.append("Flat " + address)
seen: set[str] = set()
out: list[str] = []
for v in variants:
key = v.lower()
if v and key not in seen:
seen.add(key)
out.append(v)
return out
def resolve_epc_relaxed(
address: str,
postcode_clean: str,
epc_cache: dict[str, pd.DataFrame],
threshold: float = EPC_THRESHOLD,
) -> Optional[Hit]:
"""Best unambiguous EPC match across the address variants (cached per postcode)."""
epc_df = epc_cache.get(postcode_clean)
if epc_df is None:
epc_df = get_epc_data_with_postcode(postcode=postcode_clean)
epc_cache[postcode_clean] = epc_df
if epc_df.empty:
return None
best: Optional[Hit] = None
for variant in address_variants(address):
scored = rank_address_similarity(epc_df, user_address=variant)
if scored.empty:
continue
score = float(scored.iloc[0]["lexiscore"])
if best is not None and score <= best[2]:
continue
top_rank = scored[scored["lexirank"] == 1]
# rank-1 rows must agree on one UPRN, else it's ambiguous — skip.
if not all_uprns_match(top_rank, top_rank.iloc[0]["uprn"]):
continue
uprn = str(top_rank.iloc[0]["uprn"])
if uprn in ("", "nan"):
continue
best = (uprn, str(scored.iloc[0]["address"]), score, "epc")
return best if best is not None and best[2] >= threshold else None
def resolve_os_relaxed(
address: str,
postcode_clean: str,
os_api_key: str,
os_cache: dict[str, pd.DataFrame],
threshold: float = OS_THRESHOLD,
) -> Optional[Hit]:
"""Best OS Places match across the address variants (cached per postcode)."""
places_df = os_cache.get(postcode_clean)
if places_df is None:
response = lookup_os_places(postcode_clean, os_api_key)
if response.get("status") == 200 and "data" in response:
places_df = os_places_results_to_dataframe(response["data"])
else:
places_df = pd.DataFrame()
os_cache[postcode_clean] = places_df
if places_df.empty or "ADDRESS" not in places_df.columns:
return None
records: list[dict[str, object]] = places_df.to_dict(orient="records")
best: Optional[Hit] = None
for variant in address_variants(address):
for rec in records:
candidate = str(rec.get("ADDRESS", ""))
score = AddressMatch.score(variant, candidate)
if best is None or score > best[2]:
best = (str(rec.get("UPRN", "")), candidate, score, "ordnance_survey")
return best if best is not None and best[2] >= threshold else None
def _address_from_search(result: EpcSearchResult) -> str:
parts = [
result.address_line_1,
result.address_line_2,
result.address_line_3,
result.address_line_4,
result.post_town,
]
return ", ".join(p.strip() for p in parts if p and p.strip())
def reverse_address_from_uprn(
uprn: int,
postcode_clean: str,
service: EpcClientService,
search_cache: dict[str, list[EpcSearchResult]],
) -> Optional[str]:
"""Find the EPC address for a known UPRN by searching its postcode (cached)."""
results = search_cache.get(postcode_clean)
if results is None:
results = service.search_by_postcode(postcode_clean)
search_cache[postcode_clean] = results
for result in results:
if result.uprn is not None and int(result.uprn) == uprn:
return _address_from_search(result)
return None
def fill(df: pd.DataFrame, *, os_api_key: Optional[str]) -> list[dict[str, str]]:
"""Fill the DOMNA columns in place. Returns the unresolved rows."""
for col in (FOUND_UPRN_COL, FOUND_ADDRESS_COL, SCORE_COL, SOURCE_COL):
if col not in df.columns:
df[col] = ""
df[FOUND_UPRN_COL] = df[FOUND_UPRN_COL].astype("object")
df[FOUND_ADDRESS_COL] = df[FOUND_ADDRESS_COL].astype("object")
token = os.environ.get("OPEN_EPC_API_TOKEN")
service = EpcClientService(auth_token=token) if token else None
epc_cache: dict[str, pd.DataFrame] = {}
os_cache: dict[str, pd.DataFrame] = {}
search_cache: dict[str, list[EpcSearchResult]] = {}
unresolved: list[dict[str, str]] = []
resolved_uprn = resolved_addr = skipped = 0
total = len(df)
for n, idx in enumerate(df.index, start=1):
ref = cell_str(df.at[idx, REF_COL])
given_uprn = parse_uprn_cell(df.at[idx, UPRN_COL])
address = cell_str(df.at[idx, ADDRESS_COL])
postcode_raw = cell_str(df.at[idx, POSTCODE_COL])
postcode_clean = clean_postcode(postcode_raw)
# Already sorted (UPRN + address) or already filled by a prior run.
if given_uprn is not None and address:
skipped += 1
continue
if cell_str(df.at[idx, FOUND_UPRN_COL]) and cell_str(df.at[idx, FOUND_UPRN_COL]) != NOT_FOUND:
skipped += 1
continue
def mark_not_found(reason: str) -> None:
df.at[idx, FOUND_UPRN_COL] = NOT_FOUND if given_uprn is None else ""
df.at[idx, FOUND_ADDRESS_COL] = NOT_FOUND
df.at[idx, SOURCE_COL] = "not_found"
unresolved.append(
{
"Organisation Reference": ref,
"reason": reason,
"Address": address,
"Postcode": postcode_raw,
}
)
# Case B — UPRN present, address missing: reverse-lookup the address.
if given_uprn is not None and not address:
found: Optional[str] = None
if service is not None and postcode_clean:
try:
found = reverse_address_from_uprn(
given_uprn, postcode_clean, service, search_cache
)
except Exception as exc:
print(f" reverse failed {ref} {given_uprn}: {exc}")
if found:
df.at[idx, FOUND_ADDRESS_COL] = found
df.at[idx, SOURCE_COL] = "epc_reverse"
resolved_addr += 1
else:
mark_not_found("no address for UPRN")
continue
# Case A — no UPRN, has address: resolve a UPRN.
if given_uprn is None and address:
if not postcode_clean:
mark_not_found("no postcode")
continue
hit: Optional[Hit] = None
if token:
try:
hit = resolve_epc_relaxed(address, postcode_clean, epc_cache)
except Exception as exc:
print(f" EPC failed {ref} {postcode_clean}: {exc}")
if hit is None and os_api_key:
try:
hit = resolve_os_relaxed(address, postcode_clean, os_api_key, os_cache)
except Exception as exc:
print(f" OS failed {ref} {postcode_clean}: {exc}")
if hit is not None:
uprn, matched, score, source = hit
df.at[idx, FOUND_UPRN_COL] = uprn
df.at[idx, FOUND_ADDRESS_COL] = matched
df.at[idx, SCORE_COL] = round(score, 4)
df.at[idx, SOURCE_COL] = source
resolved_uprn += 1
else:
mark_not_found("no UPRN match")
if n % 100 == 0:
print(
f"[{n}/{total}] resolved={resolved_uprn} not_found={len(unresolved)}"
)
continue
# Case C — neither a UPRN nor an address.
mark_not_found("no UPRN and no address")
print(
f"\nResolved {resolved_uprn} UPRNs, {resolved_addr} addresses; "
f"{skipped} already sorted/done; {len(unresolved)} not found."
)
return unresolved
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
parser.add_argument("--unresolved", type=Path, default=_DEFAULT_UNRESOLVED)
parser.add_argument("--limit", type=int, default=None, help="process first N rows")
return parser.parse_args()
def main() -> int:
args = _parse_args()
_epc_token, os_api_key = load_keys()
df = pd.read_excel(args.inp, sheet_name=SHEET)
if args.limit is not None:
df = df.head(args.limit).copy()
print(f"Loaded {len(df)} rows from {args.inp} [{SHEET}]")
unresolved = fill(df, os_api_key=os_api_key)
df.to_excel(args.out, sheet_name=SHEET, index=False)
print(f"Wrote filled sheet -> {args.out}")
if unresolved:
pd.DataFrame(unresolved).to_csv(args.unresolved, index=False)
print(f"Wrote {len(unresolved)} unresolved rows -> {args.unresolved}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,331 @@
"""Insert resolved manipulation_filled rows into the FE-owned ``property`` table.
Reuses the bulk_upload_finaliser's own row->PropertyIdentityInsert mapping
(``BulkUploadFinaliserOrchestrator._row_to_insert``) and the same
``PropertyPostgresRepository.insert_all`` the Lambda uses, so a row inserted here
is identical to one the real finaliser would write. The status-writer /
property_overrides path is skipped this only populates ``property`` (no
BulkUpload task needed).
Insert is ON CONFLICT (portfolio_id, uprn) DO NOTHING, so re-running is safe.
# one random resolved row into portfolio 796, then read it back
python -m scripts.finalise_to_property_table --portfolio 796 --one
# a specific Organisation Reference
python -m scripts.finalise_to_property_table --portfolio 796 --ref 56100000101
# the whole sheet (resolved rows only by default; --include-unmatched to add
# null-UPRN rows too)
python -m scripts.finalise_to_property_table --portfolio 796 --all
Postgres target comes from the root .env (POSTGRES_*). Run from the worktree root.
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
from dotenv import load_dotenv
from sqlmodel import select
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from infrastructure.postgres.config import PostgresConfig # noqa: E402
from infrastructure.postgres.engine import commit_scope, make_engine, make_session # noqa: E402
from infrastructure.postgres.property_table import PropertyRow # noqa: E402
from orchestration.bulk_upload_finaliser_orchestrator import ( # noqa: E402
BulkUploadFinaliserOrchestrator,
)
from repositories.property.property_postgres_repository import ( # noqa: E402
PropertyPostgresRepository,
)
from repositories.property.property_repository import PropertyIdentityInsert # noqa: E402
from scripts.fill_domna_addresses import ( # noqa: E402
ADDRESS_COL,
FOUND_ADDRESS_COL,
FOUND_UPRN_COL,
POSTCODE_COL,
REF_COL,
SCORE_COL,
SHEET,
UPRN_COL,
NOT_FOUND,
cell_str,
parse_uprn_cell,
)
_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx"
def _final_uprn(row: pd.Series) -> Optional[int]:
"""The authoritative UPRN: the given one, else the DOMNA-found one."""
given = parse_uprn_cell(row.get(UPRN_COL))
if given is not None:
return given
found = cell_str(row.get(FOUND_UPRN_COL))
if found and found != NOT_FOUND:
return parse_uprn_cell(found)
return None
def to_combiner_row(row: pd.Series) -> dict[str, str]:
"""Map one spreadsheet row to the combiner-output shape the finaliser reads."""
given_uprn = parse_uprn_cell(row.get(UPRN_COL))
address = cell_str(row.get(ADDRESS_COL))
uprn = _final_uprn(row)
domna_addr = cell_str(row.get(FOUND_ADDRESS_COL))
if domna_addr == NOT_FOUND:
domna_addr = ""
# Matched address: the resolved one when we found it, else the given address
# (for rows that already had a UPRN + address).
matched = domna_addr or (address if given_uprn is not None else "")
score = cell_str(row.get(SCORE_COL))
return {
"Address 1": address,
"Address 2": "",
"Address 3": "",
"postcode": cell_str(row.get(POSTCODE_COL)),
"Internal Reference": cell_str(row.get(REF_COL)),
"address2uprn_uprn": "" if uprn is None else str(uprn),
"address2uprn_address": matched,
"address2uprn_lexiscore": score,
}
def load_rows(
path: Path, *, include_unmatched: bool
) -> tuple[pd.DataFrame, list[dict[str, str]]]:
"""Load the sheet and the combiner rows. By default drop rows with no UPRN."""
df = pd.read_excel(path, sheet_name=SHEET)
df = df.reset_index(drop=True)
if not include_unmatched:
keep = df.apply(lambda r: _final_uprn(r) is not None, axis=1)
df = df[keep].reset_index(drop=True)
rows = [to_combiner_row(r) for _, r in df.iterrows()]
return df, rows
def dedupe_by_uprn(
rows: list[dict[str, str]],
) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
"""Keep the first row per UPRN; return (kept, dropped collisions).
The DB INSERT collapses duplicate (portfolio, uprn) via ON CONFLICT DO
NOTHING anyway, so this just makes the collision explicit (the dropped rows
are written out for review) rather than letting an arbitrary ref win silently.
"""
seen: set[str] = set()
kept: list[dict[str, str]] = []
dropped: list[dict[str, str]] = []
for row in rows:
uprn = row["address2uprn_uprn"]
if uprn in seen:
dropped.append(row)
else:
seen.add(uprn)
kept.append(row)
return kept, dropped
# Force-reload teardown order (bottom-up). property_overrides is ON DELETE
# CASCADE so it clears itself when the property goes; everything below is NO
# ACTION and must be deleted first, deepest child first.
# property -> epc_property -> {these children}
_EPC_CHILD_TABLES = (
"epc_energy_element",
"epc_window",
"epc_main_heating_detail",
"epc_renewable_heat_incentive",
"epc_building_part",
"epc_flat_details",
)
# property -> {these direct dependents}, deleted after the epc children
_PROPERTY_DEPENDENTS = ("epc_property", "plan")
_INSERT_CHUNK = 4000 # 9 cols/row -> well under psycopg2's 65535-param limit
def _reset_portfolio(session: object, portfolio_id: int) -> int:
"""Delete a portfolio's properties and their NO ACTION dependency tree.
Returns the number of property rows deleted (property_overrides cascade).
"""
from sqlalchemy import text
pids = "SELECT id FROM property WHERE portfolio_id = :pid"
epc_ids = f"SELECT id FROM epc_property WHERE property_id IN ({pids})"
for table in _EPC_CHILD_TABLES:
session.execute( # type: ignore[attr-defined]
text(f"DELETE FROM {table} WHERE epc_property_id IN ({epc_ids})"),
{"pid": portfolio_id},
)
for table in _PROPERTY_DEPENDENTS:
session.execute( # type: ignore[attr-defined]
text(f"DELETE FROM {table} WHERE property_id IN ({pids})"),
{"pid": portfolio_id},
)
result = session.execute( # type: ignore[attr-defined]
text("DELETE FROM property WHERE portfolio_id = :pid"), {"pid": portfolio_id}
)
return result.rowcount
def clean_reload(
rows: list[dict[str, str]], portfolio_id: int, *, reset: bool
) -> tuple[int, int]:
"""Optionally wipe the portfolio, then chunk-insert rows. One transaction.
Returns (properties_deleted, properties_inserted).
"""
inserts: list[PropertyIdentityInsert] = [
BulkUploadFinaliserOrchestrator._row_to_insert(r, portfolio_id) for r in rows
]
engine = _engine()
session = make_session(engine)
deleted = 0
inserted = 0
try:
repo = PropertyPostgresRepository(session)
with commit_scope(session):
if reset:
deleted = _reset_portfolio(session, portfolio_id)
for start in range(0, len(inserts), _INSERT_CHUNK):
inserted += repo.insert_all(inserts[start : start + _INSERT_CHUNK])
finally:
session.close()
return deleted, inserted
def _engine():
load_dotenv(_REPO_ROOT / ".env")
return make_engine(PostgresConfig.from_env(os.environ))
def insert_rows(rows: list[dict[str, str]], portfolio_id: int) -> int:
"""Insert via the finaliser's mapper + repository. Returns rows inserted."""
inserts: list[PropertyIdentityInsert] = [
BulkUploadFinaliserOrchestrator._row_to_insert(r, portfolio_id) for r in rows
]
engine = _engine()
session = make_session(engine)
try:
repo = PropertyPostgresRepository(session)
with commit_scope(session):
inserted = repo.insert_all(inserts)
finally:
session.close()
return inserted
def fetch_by_ref(portfolio_id: int, ref: str) -> list[PropertyRow]:
"""Read back inserted rows for one Organisation Reference (for verification)."""
engine = _engine()
session = make_session(engine)
try:
stmt = select(PropertyRow).where(
PropertyRow.portfolio_id == portfolio_id,
PropertyRow.landlord_property_id == ref,
)
return list(session.exec(stmt).all())
finally:
session.close()
def _show(row: dict[str, str], insert: PropertyIdentityInsert) -> None:
print("\nSource (combiner) row:")
for k, v in row.items():
print(f" {k}: {v!r}")
print("\nMapped PropertyIdentityInsert:")
for k, v in insert.__dict__.items():
print(f" {k}: {v!r}")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--portfolio", type=int, required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--one", action="store_true", help="one random resolved row")
group.add_argument("--ref", help="a specific Organisation Reference")
group.add_argument("--all", action="store_true", help="every row")
parser.add_argument(
"--include-unmatched",
action="store_true",
help="also insert rows with no UPRN (null-UPRN property rows)",
)
parser.add_argument(
"--reset",
action="store_true",
help="(with --all) DELETE all properties in the portfolio first "
"(cascades property_overrides; clears plan/epc_property)",
)
parser.add_argument(
"--collisions",
type=Path,
default=_REPO_ROOT / "scripts" / "manipulation_collisions.csv",
help="where to write rows dropped as duplicate-UPRN collisions",
)
parser.add_argument("--seed", type=int, default=0, help="random seed for --one")
return parser.parse_args()
def main() -> int:
args = _parse_args()
df, rows = load_rows(args.inp, include_unmatched=args.include_unmatched)
print(f"Loaded {len(rows)} candidate rows from {args.inp}")
if args.all:
kept, dropped = dedupe_by_uprn(rows)
if dropped:
pd.DataFrame(dropped).to_csv(args.collisions, index=False)
print(
f"{len(dropped)} duplicate-UPRN rows dropped -> {args.collisions} "
f"({len(kept)} unique to insert)"
)
deleted, inserted = clean_reload(kept, args.portfolio, reset=args.reset)
if args.reset:
print(f"Deleted {deleted} existing properties in portfolio {args.portfolio}.")
print(f"Inserted {inserted} properties into portfolio {args.portfolio}.")
return 0
# Single-row paths: pick the row, show the mapping, insert, read back.
if args.ref:
match = [r for r in rows if r["Internal Reference"] == args.ref]
if not match:
print(f"No resolved row with Organisation Reference {args.ref!r}.")
return 1
row = match[0]
else: # --one: deterministic "random" pick via seed
idx = (args.seed * 7919) % len(rows)
row = rows[idx]
ref = row["Internal Reference"]
insert = BulkUploadFinaliserOrchestrator._row_to_insert(row, args.portfolio)
_show(row, insert)
inserted = insert_rows([row], args.portfolio)
print(
f"\ninsert_all -> {inserted} new row(s) "
f"(0 means it already existed; ON CONFLICT DO NOTHING)."
)
print(f"\nproperty rows for portfolio {args.portfolio}, ref {ref!r}:")
for pr in fetch_by_ref(args.portfolio, ref):
print(
f" id={pr.id} uprn={pr.uprn} address={pr.address!r} "
f"postcode={pr.postcode!r} status={pr.creation_status} "
f"lexiscore={pr.lexiscore}"
)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,159 @@
"""Tally the EPC schema versions across the hyde list (manipulation_filled UPRNs).
For every resolved UPRN we look up its EPC certificate's ``schemaType`` (e.g.
``RdSAP-Schema-21.0.1``, ``RdSAP-Schema-17.1``, ``SAP-Schema-16.2``). The
gov EPC ``/api/domestic/search`` endpoint returns ``schemaType`` per row, so one
search-per-postcode covers every UPRN in that postcode far cheaper than a
certificate fetch per UPRN. The latest cert (max registrationDate) wins per UPRN.
Outputs: a per-schema-version tally with one example UPRN each, plus a CSV
mapping every UPRN -> schema version.
python -m scripts.hyde_epc_schema_versions
python -m scripts.hyde_epc_schema_versions --workers 8 --out scripts/hyde_schema_versions.csv
Reads OPEN_EPC_API_TOKEN from backend/.env. Run from the worktree root.
"""
from __future__ import annotations
import argparse
import os
import sys
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Optional
import httpx
from dotenv import load_dotenv
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from scripts.fill_domna_addresses import clean_postcode # noqa: E402
from scripts.finalise_to_property_table import load_rows # noqa: E402
_BASE = "https://api.get-energy-performance-data.communities.gov.uk"
_SEARCH = f"{_BASE}/api/domestic/search"
NOT_IN_EPC = "NOT_IN_EPC"
_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx"
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "hyde_schema_versions.csv"
def search_postcode(
client: httpx.Client, postcode: str, headers: dict[str, str]
) -> list[dict[str, Any]]:
"""Return the search rows for a postcode, retrying on rate-limit (429)."""
for attempt in range(5):
resp = client.get(_SEARCH, params={"postcode": postcode}, headers=headers, timeout=30)
if resp.status_code == 429:
retry_after = float(resp.headers.get("Retry-After", "2"))
time.sleep(min(retry_after, 10) * (attempt + 1))
continue
# 400 = malformed postcode (data-entry typo), 404 = no certs — skip both.
if resp.status_code in (400, 404):
return []
resp.raise_for_status()
return resp.json().get("data", [])
return []
def build_uprn_schema_map(
postcodes: list[str], token: str, workers: int
) -> dict[int, tuple[str, str]]:
"""Map UPRN -> (schemaType, registrationDate) for the latest cert per UPRN.
One search per postcode (concurrent); later we look our UPRNs up in here.
"""
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
by_uprn: dict[int, tuple[str, str]] = {}
done = 0
total = len(postcodes)
def fetch(pc: str) -> list[dict[str, Any]]:
with httpx.Client() as client:
return search_postcode(client, pc, headers)
with ThreadPoolExecutor(max_workers=workers) as pool:
for rows in pool.map(fetch, postcodes):
for row in rows:
uprn = row.get("uprn")
schema = row.get("schemaType")
reg = row.get("registrationDate") or ""
if uprn is None or not schema:
continue
prev = by_uprn.get(int(uprn))
# Keep the latest-registered cert's schema for this UPRN.
if prev is None or reg > prev[1]:
by_uprn[int(uprn)] = (str(schema), str(reg))
done += 1
if done % 250 == 0:
print(f" searched {done}/{total} postcodes, {len(by_uprn)} uprns seen")
return by_uprn
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
parser.add_argument("--workers", type=int, default=8)
return parser.parse_args()
def main() -> int:
args = _parse_args()
load_dotenv(_REPO_ROOT / "backend" / ".env")
token = os.environ.get("OPEN_EPC_API_TOKEN")
if not token:
print("OPEN_EPC_API_TOKEN not set (backend/.env)")
return 2
_, rows = load_rows(args.inp, include_unmatched=False)
pairs: list[tuple[int, str, str]] = [] # (uprn, postcode_clean, address)
for r in rows:
uprn = r["address2uprn_uprn"]
if uprn:
pairs.append((int(uprn), clean_postcode(r["postcode"]), r["address2uprn_address"]))
postcodes = sorted({pc for _, pc, _ in pairs if pc})
print(f"{len(pairs)} UPRNs across {len(postcodes)} unique postcodes")
by_uprn = build_uprn_schema_map(postcodes, token, args.workers)
print(f"EPC search returned schema for {len(by_uprn)} distinct UPRNs")
# Resolve each hyde UPRN to its schema version.
tally: Counter[str] = Counter()
example: dict[str, tuple[int, str]] = {}
out_lines: list[tuple[int, str, str, str]] = [] # uprn, schema, postcode, address
seen: set[int] = set()
for uprn, pc, address in pairs:
if uprn in seen:
continue
seen.add(uprn)
schema = by_uprn.get(uprn, (NOT_IN_EPC, ""))[0]
tally[schema] += 1
example.setdefault(schema, (uprn, address))
out_lines.append((uprn, schema, pc, address))
# Write the full per-UPRN mapping.
import csv
with args.out.open("w", newline="", encoding="utf-8") as fh:
w = csv.writer(fh)
w.writerow(["uprn", "schema_version", "postcode", "matched_address"])
w.writerows(out_lines)
print(f"\nSchema versions across {len(seen)} distinct UPRNs:\n")
print(f" {'schema version':<26} {'count':>7} example UPRN")
print(f" {'-'*26} {'-'*7} {'-'*12}")
for schema, count in tally.most_common():
ex_uprn, ex_addr = example[schema]
print(f" {schema:<26} {count:>7} {ex_uprn} ({ex_addr})")
print(f"\nFull mapping -> {args.out}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,169 @@
"""Compare our step-1 UPRN resolution against the old "Ara output" data.
The Ara data lives in scripts/lisasrequest/Durkan data.xlsx, sheet "Ara output",
and carries UPRNs from our previous dataset. It is NOT treated as ground truth
this just lines it up against what we found / didn't find so a human can eyeball
the differences. (We read the xlsx, not the CSV export: the CSV mangled half the
UPRNs to Excel scientific notation, e.g. ``1.00023E+11``; the xlsx keeps them
intact, so every comparison below is exact.)
Join key is (postcode, leading number, first street word), since the UPRN is the
thing under comparison and Ara's address strings differ from the landlord input.
Each of our rows lands in one comparison bucket:
match both found a UPRN and they are equal.
differ both found a UPRN and they differ.
we_only we resolved a UPRN, Ara had none for this address.
ara_only we did NOT resolve, but Ara had a UPRN <- recovery candidates.
both_missing neither resolved a UPRN.
no_ara_record the Ara sheet had no row matching this address at all.
python -m scripts.lisasrequest.compare_to_ara
"""
from __future__ import annotations
import argparse
import csv
import re
import sys
from collections import Counter, OrderedDict
from pathlib import Path
from typing import Optional
import pandas as pd
_REPO_ROOT = Path(__file__).resolve().parents[2]
ADDRESS_COL = "address"
POSTCODE_COL = "postcode"
OUR_UPRN_COL = "domna_address_uprn"
OUR_SOURCE_COL = "domna_source"
ARA_UPRN_COL = "EPC_B.uprn"
ARA_ADDRESS_COL = "EPC_B.address"
ARA_POSTCODE_COL = "EPC_B.postcode"
ARA_SHEET = "Ara output"
_OUR_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv"
_ARA_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "Durkan data.xlsx"
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_vs_ara.csv"
Key = tuple[str, str, str]
def norm_key(address: str, postcode: str) -> Key:
"""(postcode-no-space, leading number, first street word) — the join key."""
pc = postcode.upper().replace(" ", "")
upper = address.upper()
nums = re.findall(r"\d+[A-Z]?", upper)
words = [w for w in re.findall(r"[A-Z]+", upper) if w != "FLAT"]
return (pc, nums[0] if nums else "", words[0] if words else "")
def load_ara(path: Path) -> tuple[dict[Key, dict[str, str]], int]:
"""Index the Ara-output xlsx sheet by join key (first row wins).
Returns (index, duplicates). Read as strings so UPRNs keep their full value.
"""
df = pd.read_excel(path, sheet_name=ARA_SHEET, dtype=str)
rows: list[dict[str, str]] = df.fillna("").to_dict(orient="records")
index: dict[Key, dict[str, str]] = OrderedDict()
dupes = 0
for row in rows:
address = str(row.get(ARA_ADDRESS_COL) or "").strip()
postcode = str(row.get(ARA_POSTCODE_COL) or row.get(POSTCODE_COL) or "").strip()
if not address:
continue
key = norm_key(address, postcode)
if key in index:
dupes += 1
continue
index[key] = row
return index, dupes
def classify(
our_uprn: str, our_found: bool, ara: Optional[dict[str, str]]
) -> tuple[str, str, str]:
"""Return (comparison, ara_uprn, ara_address) for one of our rows."""
if ara is None:
return ("no_ara_record", "", "")
ara_uprn = (ara.get(ARA_UPRN_COL) or "").strip()
ara_address = (ara.get(ARA_ADDRESS_COL) or "").strip()
ara_found = bool(ara_uprn)
if our_found and ara_found:
comparison = "match" if our_uprn == ara_uprn else "differ"
elif our_found and not ara_found:
comparison = "we_only"
elif not our_found and ara_found:
comparison = "ara_only"
else:
comparison = "both_missing"
return (comparison, ara_uprn, ara_address)
def compare(
our_rows: list[dict[str, str]], ara_index: dict[Key, dict[str, str]]
) -> list[dict[str, str]]:
out: list[dict[str, str]] = []
for row in our_rows:
address = (row.get(ADDRESS_COL) or "").strip()
postcode = (row.get(POSTCODE_COL) or "").strip()
our_uprn = (row.get(OUR_UPRN_COL) or "").strip()
our_source = (row.get(OUR_SOURCE_COL) or "").strip()
our_found = bool(our_uprn) and our_source != "not_found"
ara = ara_index.get(norm_key(address, postcode))
comparison, ara_uprn, ara_address = classify(our_uprn, our_found, ara)
out.append(
{
"address": address,
"postcode": postcode,
"our_uprn": our_uprn,
"our_source": our_source,
"ara_uprn": ara_uprn,
"ara_address": ara_address,
"comparison": comparison,
}
)
return out
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--ours", type=Path, default=_OUR_IN)
parser.add_argument("--ara", type=Path, default=_ARA_IN)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
args = parser.parse_args()
with args.ours.open(newline="", encoding="utf-8-sig") as fh:
our_rows = [dict(r) for r in csv.DictReader(fh)]
ara_index, dupes = load_ara(args.ara)
print(f"Loaded {len(our_rows)} of our rows; {len(ara_index)} Ara keys "
f"({dupes} duplicate Ara rows ignored).")
result = compare(our_rows, ara_index)
fieldnames = list(result[0].keys())
with args.out.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(result)
counts = Counter(r["comparison"] for r in result)
print(f"\nComparison of {len(result)} rows -> {args.out}")
for name in (
"match",
"differ",
"we_only",
"ara_only",
"both_missing",
"no_ara_record",
):
print(f" {name}: {counts.get(name, 0)}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,142 @@
"""EPC SAP-schema check for portfolio 805, and whether each is mapper-supported.
For every UPRN currently in the ``property`` table for portfolio 805, look up its
latest EPC certificate's ``schemaType`` (one /api/domestic/search per postcode,
reusing scripts.hyde_epc_schema_versions) and check it against the schemas the
EpcPropertyData mapper actually handles
(``EpcPropertyDataMapper.from_api_response``, datatypes/epc/domain/mapper.py).
Prints a per-schema tally with a supported? flag and an example UPRN, and writes
the full per-UPRN mapping to durkan_805_schema_check.csv.
python -m scripts.lisasrequest.durkan_805_schema_check
python -m scripts.lisasrequest.durkan_805_schema_check --portfolio 805 --workers 8
Reads OPEN_EPC_API_TOKEN from backend/.env and POSTGRES_* from the root .env.
Run from the worktree root.
"""
from __future__ import annotations
import argparse
import csv
import os
import sys
from collections import Counter
from pathlib import Path
from dotenv import load_dotenv
from sqlmodel import select
_REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from infrastructure.postgres.config import PostgresConfig # noqa: E402
from infrastructure.postgres.engine import make_engine, make_session # noqa: E402
from infrastructure.postgres.property_table import PropertyRow # noqa: E402
from scripts.fill_domna_addresses import clean_postcode # noqa: E402
from scripts.hyde_epc_schema_versions import ( # noqa: E402
NOT_IN_EPC,
build_uprn_schema_map,
)
# Schemas EpcPropertyDataMapper.from_api_response dispatches on (everything else
# raises "Unsupported EPC schema"). Keep in sync with mapper.py:2539-2603.
SUPPORTED_SCHEMAS = frozenset(
{
"RdSAP-Schema-17.0",
"RdSAP-Schema-17.1",
"RdSAP-Schema-18.0",
"RdSAP-Schema-19.0",
"RdSAP-Schema-20.0.0",
"RdSAP-Schema-21.0.0",
"RdSAP-Schema-21.0.1",
"SAP-Schema-16.0",
"SAP-Schema-16.2",
"SAP-Schema-16.3",
"SAP-Schema-17.0",
"SAP-Schema-17.1",
"SAP-Schema-18.0.0",
}
)
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_805_schema_check.csv"
def load_portfolio_uprns(portfolio_id: int) -> list[tuple[int, str]]:
"""Return (uprn, postcode) for every property in the portfolio with a UPRN."""
load_dotenv(_REPO_ROOT / ".env")
engine = make_engine(PostgresConfig.from_env(os.environ))
session = make_session(engine)
try:
stmt = select(PropertyRow.uprn, PropertyRow.postcode).where(
PropertyRow.portfolio_id == portfolio_id
)
out: list[tuple[int, str]] = []
for uprn, postcode in session.exec(stmt).all():
if uprn is not None:
out.append((int(uprn), str(postcode or "")))
return out
finally:
session.close()
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--portfolio", type=int, default=805)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
parser.add_argument("--workers", type=int, default=8)
args = parser.parse_args()
load_dotenv(_REPO_ROOT / "backend" / ".env")
token = os.environ.get("OPEN_EPC_API_TOKEN")
if not token:
print("OPEN_EPC_API_TOKEN not set (backend/.env)")
return 2
pairs = load_portfolio_uprns(args.portfolio)
postcodes = sorted({clean_postcode(pc) for _, pc in pairs if pc})
print(
f"Portfolio {args.portfolio}: {len(pairs)} UPRNs across "
f"{len(postcodes)} unique postcodes"
)
by_uprn = build_uprn_schema_map(postcodes, token, args.workers)
print(f"EPC search returned a schema for {len(by_uprn)} distinct UPRNs")
tally: Counter[str] = Counter()
example: dict[str, int] = {}
rows_out: list[tuple[int, str, str, str]] = [] # uprn, schema, supported, postcode
seen: set[int] = set()
for uprn, pc in pairs:
if uprn in seen:
continue
seen.add(uprn)
schema = by_uprn.get(uprn, (NOT_IN_EPC, ""))[0]
supported = "yes" if schema in SUPPORTED_SCHEMAS else "no"
tally[schema] += 1
example.setdefault(schema, uprn)
rows_out.append((uprn, schema, supported, clean_postcode(pc)))
with args.out.open("w", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow(["uprn", "schema_version", "mapper_supported", "postcode"])
writer.writerows(rows_out)
supported_count = sum(c for s, c in tally.items() if s in SUPPORTED_SCHEMAS)
print(f"\nSchema versions across {len(seen)} distinct UPRNs in portfolio "
f"{args.portfolio}:\n")
print(f" {'schema version':<26} {'count':>5} {'supported?':<10} example UPRN")
print(f" {'-' * 26} {'-' * 5} {'-' * 10} {'-' * 12}")
for schema, count in tally.most_common():
supported = "yes" if schema in SUPPORTED_SCHEMAS else "NO"
print(f" {schema:<26} {count:>5} {supported:<10} {example[schema]}")
print(
f"\nMapper-supported: {supported_count}/{len(seen)} UPRNs. "
f"Full mapping -> {args.out}"
)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,200 @@
"""Step 1 (Durkan portfolio): resolve a UPRN per CSV row via EPC then OS.
Input: scripts/lisasrequest/260611_Sample_Seed_Portfolio_Durkan_split_addresses(Split Addresses).csv
columns include ``address`` and ``postcode``.
Every row carries an address and none carry a UPRN, so there is a single case:
* resolve a UPRN from ``address`` + ``postcode`` via the EPC API (relaxed
address variants, threshold 0.7), then Ordnance Survey Places as a fallback
(threshold 0.6).
* not resolvable -> domna_source = "not_found"; uprn/address/score left empty.
Writes a NEW CSV = every original column, in order, plus four DOMNA columns:
domna_address_found the canonical address EPC/OS returned (matched string)
domna_address_uprn the resolved UPRN ("" when unresolved)
domna_lexiscore the match score in [0, 1] ("" when unresolved)
domna_source epc / ordnance_survey / not_found
This is the human-review file; step 2 (resolve_uprns_for_finaliser) reshapes it
into the finaliser columns without re-hitting the APIs.
python -m scripts.lisasrequest.fill_domna_address
python -m scripts.lisasrequest.fill_domna_address --limit 20 # smoke test
Resolution reuses the relaxed matchers from scripts.fill_domna_addresses. Keys
come from backend/.env (OPEN_EPC_API_TOKEN, ORDNANCE_SURVEY_API_KEY). Run from
the worktree root (import trap).
"""
from __future__ import annotations
import argparse
import csv
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
_REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from scripts.fill_domna_addresses import ( # noqa: E402
Hit,
resolve_epc_relaxed,
resolve_os_relaxed,
)
from scripts.resolve_uprns_for_finaliser import clean_postcode, load_keys # noqa: E402
ADDRESS_COL = "address"
POSTCODE_COL = "postcode"
FOUND_ADDRESS_COL = "domna_address_found"
FOUND_UPRN_COL = "domna_address_uprn"
LEXISCORE_COL = "domna_lexiscore"
SOURCE_COL = "domna_source"
NOT_FOUND = "not_found"
_RESULT_COLS = (FOUND_ADDRESS_COL, FOUND_UPRN_COL, LEXISCORE_COL, SOURCE_COL)
_CSV_NAME = "260611_Sample_Seed_Portfolio_Durkan_split_addresses(Split Addresses).csv"
_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / _CSV_NAME
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv"
def read_rows(path: Path) -> tuple[list[dict[str, str]], list[str]]:
"""Read a CSV into (rows, fieldnames), preserving column order."""
with path.open(newline="", encoding="utf-8-sig") as fh:
reader = csv.DictReader(fh)
fieldnames = list(reader.fieldnames or [])
rows = [dict(row) for row in reader]
return rows, fieldnames
def resolve_one(
address: str,
postcode_raw: str,
*,
epc_token: Optional[str],
os_api_key: Optional[str],
epc_cache: dict[str, pd.DataFrame],
os_cache: dict[str, pd.DataFrame],
) -> Optional[Hit]:
"""Resolve one row's UPRN: EPC (relaxed) first, then OS Places fallback."""
postcode_clean = clean_postcode(postcode_raw)
if not address or not postcode_clean:
return None
hit: Optional[Hit] = None
if epc_token:
try:
hit = resolve_epc_relaxed(address, postcode_clean, epc_cache)
except Exception as exc:
print(f" EPC failed {address!r} / {postcode_clean}: {exc}")
if hit is None and os_api_key:
try:
hit = resolve_os_relaxed(address, postcode_clean, os_api_key, os_cache)
except Exception as exc:
print(f" OS failed {address!r} / {postcode_clean}: {exc}")
return hit
def fill(
rows: list[dict[str, str]],
*,
epc_token: Optional[str],
os_api_key: Optional[str],
) -> tuple[int, int, int]:
"""Fill the DOMNA columns on each row in place.
Returns (epc_hits, os_hits, not_found) counts.
"""
epc_cache: dict[str, pd.DataFrame] = {}
os_cache: dict[str, pd.DataFrame] = {}
epc_hits = os_hits = not_found = 0
total = len(rows)
for n, row in enumerate(rows, start=1):
address = str(row.get(ADDRESS_COL, "") or "").strip()
postcode_raw = str(row.get(POSTCODE_COL, "") or "").strip()
hit = resolve_one(
address,
postcode_raw,
epc_token=epc_token,
os_api_key=os_api_key,
epc_cache=epc_cache,
os_cache=os_cache,
)
if hit is None:
row[FOUND_ADDRESS_COL] = ""
row[FOUND_UPRN_COL] = ""
row[LEXISCORE_COL] = ""
row[SOURCE_COL] = NOT_FOUND
not_found += 1
else:
uprn, matched, score, source = hit
row[FOUND_ADDRESS_COL] = matched
row[FOUND_UPRN_COL] = uprn
row[LEXISCORE_COL] = str(round(score, 4))
row[SOURCE_COL] = source
if source == "epc":
epc_hits += 1
else:
os_hits += 1
print(
f"[{n}/{total}] {address!r} -> "
f"{row[FOUND_UPRN_COL] or '(no match)'} ({row[SOURCE_COL]})"
)
return epc_hits, os_hits, not_found
def write_rows(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None:
"""Write rows to CSV, preserving input columns and appending DOMNA columns."""
out_fields = list(fieldnames)
for col in _RESULT_COLS:
if col not in out_fields:
out_fields.append(col)
with path.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=out_fields, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
parser.add_argument("--limit", type=int, default=None, help="process first N rows")
return parser.parse_args()
def main() -> int:
args = _parse_args()
epc_token, os_api_key = load_keys()
if not epc_token:
print("OPEN_EPC_API_TOKEN not set (backend/.env) — EPC resolution disabled")
if not os_api_key:
print("ORDNANCE_SURVEY_API_KEY not set (backend/.env) — OS fallback disabled")
rows, fieldnames = read_rows(args.inp)
if args.limit is not None:
rows = rows[: args.limit]
print(f"Loaded {len(rows)} rows from {args.inp}")
epc_hits, os_hits, not_found = fill(
rows, epc_token=epc_token, os_api_key=os_api_key
)
write_rows(rows, args.out, fieldnames)
resolved = epc_hits + os_hits
print(
f"\nResolved {resolved}/{len(rows)} "
f"(epc={epc_hits}, ordnance_survey={os_hits}); {not_found} not found."
)
print(f"Wrote filled CSV -> {args.out}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,111 @@
"""Step 3 (Durkan portfolio): insert the reshaped rows into the ``property`` table.
Reads durkan_finaliser_input.csv (step 2) and, per row, maps it with the real
finaliser mapper (``BulkUploadFinaliserOrchestrator._row_to_insert``) and inserts
via the same ``PropertyPostgresRepository.insert_all`` the Lambda uses so a row
written here is identical to one the production finaliser would write. Insert is
ON CONFLICT (portfolio_id, uprn) DO NOTHING, so re-running is safe.
DRY RUN BY DEFAULT it dedupes, reports, and writes the collisions file but does
NOT touch the database. Add --commit to actually insert.
# preview only (no DB writes): dedupe + mapping report
python -m scripts.lisasrequest.finalise_to_property_table --portfolio 805
# actually insert
python -m scripts.lisasrequest.finalise_to_property_table --portfolio 805 --commit
Postgres target comes from the root .env (POSTGRES_*). Run from the worktree root.
"""
from __future__ import annotations
import argparse
import csv
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from scripts.finalise_to_property_table import ( # noqa: E402
dedupe_by_uprn,
insert_rows,
)
_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_input.csv"
_DEFAULT_COLLISIONS = (
_REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_collisions.csv"
)
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
POSTCODE_COL = "postcode"
LEXISCORE_COL = "address2uprn_lexiscore"
def read_rows(path: Path) -> list[dict[str, str]]:
with path.open(newline="", encoding="utf-8-sig") as fh:
return [dict(row) for row in csv.DictReader(fh)]
def _preview(rows: list[dict[str, str]]) -> None:
"""Show the first few rows as they will be inserted (no DB, no mapper call).
The finalise step applies the standard finaliser mapper
(BulkUploadFinaliserOrchestrator) on insert; the fields below are its inputs.
"""
print("\nSample rows to insert (uprn | matched address | postcode | lexiscore):")
for row in rows[:3]:
print(
f" {row.get(UPRN_COL)} | {row.get(MATCHED_ADDRESS_COL)!r} | "
f"{row.get(POSTCODE_COL)!r} | {row.get(LEXISCORE_COL)}"
)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--portfolio", type=int, required=True)
parser.add_argument(
"--commit",
action="store_true",
help="actually insert into property (default is a dry-run preview)",
)
parser.add_argument("--collisions", type=Path, default=_DEFAULT_COLLISIONS)
args = parser.parse_args()
rows = read_rows(args.inp)
print(f"Loaded {len(rows)} finaliser rows from {args.inp}")
kept, dropped = dedupe_by_uprn(rows)
if dropped:
with args.collisions.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=list(dropped[0].keys()))
writer.writeheader()
writer.writerows(dropped)
print(
f"{len(dropped)} duplicate-UPRN rows dropped -> {args.collisions} "
f"({len(kept)} unique to insert)"
)
else:
print(f"No duplicate-UPRN collisions; {len(kept)} unique rows to insert.")
_preview(kept)
if not args.commit:
print(
f"\nDRY RUN — nothing written. {len(kept)} rows would be inserted into "
f"portfolio {args.portfolio}. Re-run with --commit to write."
)
return 0
inserted = insert_rows(kept, args.portfolio)
print(
f"\nInserted {inserted} new properties into portfolio {args.portfolio} "
f"({len(kept) - inserted} already existed; ON CONFLICT DO NOTHING)."
)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,212 @@
"""Step 2 (Durkan portfolio): split step-1 matches, reshape the confident ones.
Reads durkan_domna_filled.csv (step 1) and SPLITS it in two no re-resolution,
just column work:
* Rows we cannot confidently insert are held back to a client-clarification CSV
(durkan_client_clarification.csv) for Khalim to take to the client. Reasons:
not_found_no_match no UPRN was resolved.
no_flat_level_uprn a block of flats all collapsed onto one building
UPRN OS/EPC carry no flat-level records, so we
can't tell the flats apart.
unit_number_mismatch the matched house number differs from the input
(e.g. "9 ..." matched "9A ..."), so the property is
ambiguous.
* Every remaining row is reshaped into the columns the finaliser reads
(bulk_upload_finaliser_orchestrator), written to durkan_finaliser_input.csv
ready for step 3:
Address 1/2/3 | postcode | Internal Reference | address2uprn_uprn
| address2uprn_address | address2uprn_lexiscore
Internal Reference is left blank (landlord_property_id null, by decision).
python -m scripts.lisasrequest.resolve_uprns_for_finaliser
This stage hits no APIs. The held rows are not lost once the client confirms
them they can be appended to the finaliser input by hand.
"""
from __future__ import annotations
import argparse
import csv
import sys
from collections import Counter
from pathlib import Path
from typing import Optional
_REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from scripts.lisasrequest.fill_domna_address import ( # noqa: E402
ADDRESS_COL,
FOUND_ADDRESS_COL,
FOUND_UPRN_COL,
LEXISCORE_COL,
POSTCODE_COL,
SOURCE_COL,
)
from scripts.lisasrequest.review_flags import address_numbers, input_unit # noqa: E402
# Finaliser input columns — must match bulk_upload_finaliser_orchestrator
# (ADDRESS_COLS / POSTCODE_COL / INTERNAL_REF_COL / UPRN_COL /
# MATCHED_ADDRESS_COL / LEXISCORE_COL). Hard-coded to keep this a light,
# stdlib-only reshape; step 3 imports the real orchestrator and will fail loudly
# if these ever drift.
FIN_ADDRESS_1, FIN_ADDRESS_2, FIN_ADDRESS_3 = "Address 1", "Address 2", "Address 3"
FIN_POSTCODE = "postcode"
FIN_INTERNAL_REF = "Internal Reference"
FIN_UPRN = "address2uprn_uprn"
FIN_MATCHED_ADDRESS = "address2uprn_address"
FIN_LEXISCORE = "address2uprn_lexiscore"
_FINALISER_COLS = [
FIN_ADDRESS_1,
FIN_ADDRESS_2,
FIN_ADDRESS_3,
FIN_POSTCODE,
FIN_INTERNAL_REF,
FIN_UPRN,
FIN_MATCHED_ADDRESS,
FIN_LEXISCORE,
]
# Client-clarification report columns (kept human-readable for the client).
CONTEXT_COLS = ["address", "postcode", "No.", "Address Block"]
DOMNA_COLS = [FOUND_ADDRESS_COL, FOUND_UPRN_COL, LEXISCORE_COL, SOURCE_COL]
REASON_COL = "clarification_reason"
ACTION_COL = "action_needed"
_CLARIFY_COLS = CONTEXT_COLS + DOMNA_COLS + [REASON_COL, ACTION_COL]
_REASON_ORDER = {
"not_found_no_match": 0,
"no_flat_level_uprn": 1,
"unit_number_mismatch": 2,
}
_REASON_ACTION = {
"not_found_no_match": "No UPRN found for this address — please confirm the "
"exact address or provide the UPRN.",
"no_flat_level_uprn": "Address registers hold only the building, not the "
"individual flats — please provide a UPRN per flat, or confirm a "
"building-level record is acceptable.",
"unit_number_mismatch": "Closest match has a different unit number (see "
"domna_address_found) — please confirm the correct property / UPRN.",
}
_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv"
_DEFAULT_FINALISER = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_input.csv"
_DEFAULT_CLARIFY = (
_REPO_ROOT / "scripts" / "lisasrequest" / "durkan_client_clarification.csv"
)
def read_rows(path: Path) -> list[dict[str, str]]:
with path.open(newline="", encoding="utf-8-sig") as fh:
return [dict(row) for row in csv.DictReader(fh)]
def clarification_reason(
row: dict[str, str], uprn_counts: Counter[str]
) -> Optional[str]:
"""Why this row can't be inserted yet, or None if it's safe to finalise."""
uprn = row.get(FOUND_UPRN_COL, "")
if row.get(SOURCE_COL) == "not_found" or not uprn:
return "not_found_no_match"
unit = input_unit(row.get(ADDRESS_COL, ""))
unit_missing = bool(unit) and unit not in address_numbers(
row.get(FOUND_ADDRESS_COL, "")
)
duplicate = uprn_counts[uprn] > 1
if unit_missing:
return "no_flat_level_uprn" if duplicate else "unit_number_mismatch"
if duplicate:
# A shared UPRN with the right unit number still collides at finalise.
return "no_flat_level_uprn"
return None
def to_finaliser_row(row: dict[str, str]) -> dict[str, str]:
"""Rename a confident step-1 row into the finaliser's input columns."""
return {
FIN_ADDRESS_1: row.get(ADDRESS_COL, ""),
FIN_ADDRESS_2: "",
FIN_ADDRESS_3: "",
FIN_POSTCODE: row.get(POSTCODE_COL, ""),
FIN_INTERNAL_REF: "", # landlord_property_id null, by decision
FIN_UPRN: row.get(FOUND_UPRN_COL, ""),
FIN_MATCHED_ADDRESS: row.get(FOUND_ADDRESS_COL, ""),
FIN_LEXISCORE: row.get(LEXISCORE_COL, ""),
}
def to_clarify_row(row: dict[str, str], reason: str) -> dict[str, str]:
out = {col: row.get(col, "") for col in CONTEXT_COLS + DOMNA_COLS}
out[REASON_COL] = reason
out[ACTION_COL] = _REASON_ACTION[reason]
return out
def split(
rows: list[dict[str, str]],
*,
accept_unit_mismatch: bool = False,
) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
"""Return (finaliser_rows, clarification_rows).
``accept_unit_mismatch`` reshapes the ``unit_number_mismatch`` rows (a
near-miss like 9 -> 9A the client has already confirmed) into the finaliser
input instead of holding them back.
"""
uprn_counts: Counter[str] = Counter(
r.get(FOUND_UPRN_COL, "") for r in rows if r.get(FOUND_UPRN_COL)
)
finaliser: list[dict[str, str]] = []
clarify: list[dict[str, str]] = []
for row in rows:
reason = clarification_reason(row, uprn_counts)
if reason is None or (
accept_unit_mismatch and reason == "unit_number_mismatch"
):
finaliser.append(to_finaliser_row(row))
else:
clarify.append(to_clarify_row(row, reason))
clarify.sort(key=lambda r: _REASON_ORDER.get(r[REASON_COL], 9))
return finaliser, clarify
def write_csv(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None:
with path.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--finaliser-out", type=Path, default=_DEFAULT_FINALISER)
parser.add_argument("--clarify-out", type=Path, default=_DEFAULT_CLARIFY)
parser.add_argument(
"--accept-unit-mismatch",
action="store_true",
help="reshape unit_number_mismatch rows (e.g. 9->9A) into the finaliser "
"input instead of holding them for the client",
)
args = parser.parse_args()
rows = read_rows(args.inp)
finaliser, clarify = split(rows, accept_unit_mismatch=args.accept_unit_mismatch)
write_csv(finaliser, args.finaliser_out, _FINALISER_COLS)
write_csv(clarify, args.clarify_out, _CLARIFY_COLS)
counts = Counter(r[REASON_COL] for r in clarify)
print(f"Read {len(rows)} step-1 rows.")
print(f" -> {len(finaliser)} confident rows reshaped -> {args.finaliser_out}")
print(f" -> {len(clarify)} held for client -> {args.clarify_out}")
for reason in _REASON_ORDER:
print(f" {reason}: {counts.get(reason, 0)}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,135 @@
"""Flag step-1 matches that need a human eye, for review before finalising.
Reads durkan_domna_filled.csv (the step-1 output) and writes a review CSV of
only the rows carrying at least one flag, newest-doubt-first:
not_found no UPRN resolved at all.
unit_not_in_match the input flat/house number does NOT appear in the matched
address the high-precision "wrong property" signal. Two
shapes: a near-miss ("9 VANBRUGH" matched "9A, VANBRUGH")
or a flat collapsing onto its building ("FLAT 1, 20 WARWICK"
matched "20, WARWICK ROAD").
dup_uprn the same UPRN was resolved for >1 input row typically a
block of flats all collapsing onto the building UPRN; all
but one will be dropped at finalise.
low_score lexiscore < 0.70 (a weak match, just over the OS bar). NOTE:
on its own this is noisy truncated EPC addresses and extra
locality tokens push correct matches below 0.70. Treat it as
informational unless paired with one of the flags above.
python -m scripts.lisasrequest.review_flags
"""
from __future__ import annotations
import argparse
import csv
import re
import sys
from collections import Counter
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[2]
ADDRESS_COL = "address"
POSTCODE_COL = "postcode"
FOUND_ADDRESS_COL = "domna_address_found"
FOUND_UPRN_COL = "domna_address_uprn"
LEXISCORE_COL = "domna_lexiscore"
SOURCE_COL = "domna_source"
LOW_SCORE = 0.70
_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv"
_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_review_flags.csv"
_REVIEW_COLS = [
ADDRESS_COL,
POSTCODE_COL,
FOUND_ADDRESS_COL,
FOUND_UPRN_COL,
LEXISCORE_COL,
SOURCE_COL,
"flags",
]
def input_unit(address: str) -> str:
"""The salient unit number of an input address: the FLAT number if present,
else the leading house number ("" if neither). Upper-cased."""
upper = address.upper()
flat = re.search(r"\bFLAT\s+(\d+[A-Z]?)", upper)
if flat:
return flat.group(1)
lead = re.match(r"\s*(\d+[A-Z]?)\b", upper)
return lead.group(1) if lead else ""
def address_numbers(address: str) -> set[str]:
"""All standalone number tokens in an address (e.g. {"3", "20"}). Upper-cased."""
return set(re.findall(r"\b\d+[A-Z]?\b", address.upper()))
def _score(value: str) -> float:
try:
return float(value)
except (TypeError, ValueError):
return 0.0
def flag_rows(rows: list[dict[str, str]]) -> list[dict[str, str]]:
"""Return the flagged subset, each with a ';'-joined ``flags`` field."""
uprn_counts = Counter(
r.get(FOUND_UPRN_COL, "") for r in rows if r.get(FOUND_UPRN_COL)
)
flagged: list[dict[str, str]] = []
for row in rows:
uprn = row.get(FOUND_UPRN_COL, "")
source = row.get(SOURCE_COL, "")
flags: list[str] = []
if source == "not_found" or not uprn:
flags.append("not_found")
else:
unit = input_unit(row.get(ADDRESS_COL, ""))
if unit and unit not in address_numbers(row.get(FOUND_ADDRESS_COL, "")):
flags.append("unit_not_in_match")
if uprn_counts[uprn] > 1:
flags.append("dup_uprn")
if _score(row.get(LEXISCORE_COL, "")) < LOW_SCORE:
flags.append("low_score")
if flags:
flagged.append({**{c: row.get(c, "") for c in _REVIEW_COLS[:-1]},
"flags": ";".join(flags)})
# not_found first, then mismatches, then dup/low.
order = {"not_found": 0, "unit_not_in_match": 1, "dup_uprn": 2, "low_score": 3}
flagged.sort(key=lambda r: order.get(r["flags"].split(";")[0], 9))
return flagged
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN)
parser.add_argument("--out", type=Path, default=_DEFAULT_OUT)
args = parser.parse_args()
with args.inp.open(newline="", encoding="utf-8-sig") as fh:
rows = [dict(r) for r in csv.DictReader(fh)]
flagged = flag_rows(rows)
with args.out.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=_REVIEW_COLS, extrasaction="ignore")
writer.writeheader()
writer.writerows(flagged)
counts = Counter(f for r in flagged for f in r["flags"].split(";"))
print(f"{len(flagged)}/{len(rows)} rows flagged for review -> {args.out}")
for name in ("not_found", "unit_not_in_match", "dup_uprn", "low_score"):
print(f" {name}: {counts.get(name, 0)}")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,328 @@
"""Resolve a CSV of addresses to UPRNs, ready to feed the bulk-upload finaliser.
Takes a CSV with `Address 1/2/3` + `postcode` columns and, per row, resolves a
UPRN by trying in order the new EPC API (address2uprn), the historic EPC S3
dataset, then the Ordnance Survey Places API as a fallback. Whichever source
wins, the result is written into the SAME three columns the finaliser reads
(`bulk_upload_finaliser_orchestrator`):
address2uprn_uprn UPRN integer (empty when unresolved)
address2uprn_address the matched address
address2uprn_lexiscore the match score in [0, 1]
A `resolution_source` diagnostic column (epc / epc_historic / ordnance_survey /
none) is appended too the finaliser ignores unknown columns. All original
columns are preserved in their original order, so the output CSV drops straight
into the finaliser.
python -m scripts.resolve_uprns_for_finaliser input.csv -o resolved.csv
# OS-only / EPC-only, custom postcode column, custom OS score threshold
python -m scripts.resolve_uprns_for_finaliser in.csv -o out.csv --no-epc
python -m scripts.resolve_uprns_for_finaliser in.csv -o out.csv --postcode-col Postcode --os-threshold 0.6
Keys are read from backend/.env: OPEN_EPC_API_TOKEN (EPC) and
ORDNANCE_SURVEY_API_KEY (OS Places). Run from the worktree root (import trap).
The module-level functions (`load_keys`, `read_rows`, `resolve_row`, `process`,
`write_rows`) are written to be driven line-by-line from a REPL as well as via
the CLI.
"""
from __future__ import annotations
import argparse
import csv
import os
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
from dotenv import load_dotenv
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from backend.address2UPRN.main import ( # noqa: E402
get_epc_data_with_postcode,
get_uprn_from_historic_epc,
get_uprn_with_epc_df,
)
from backend.ordnanceSurvey.helpers import ( # noqa: E402
lookup_os_places,
os_places_results_to_dataframe,
)
from backend.utils.addressMatch import AddressMatch # noqa: E402
# Columns the finaliser reads (bulk_upload_finaliser_orchestrator).
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
SOURCE_COL = "resolution_source"
_RESULT_COLS = (UPRN_COL, MATCHED_ADDRESS_COL, LEXISCORE_COL, SOURCE_COL)
# A resolved hit: (uprn, matched_address, lexiscore, source).
Resolution = tuple[str, str, float, str]
def load_keys() -> tuple[Optional[str], Optional[str]]:
"""Load (epc_token, os_api_key) from backend/.env (and the process env)."""
load_dotenv(_REPO_ROOT / "backend" / ".env")
epc_token = os.environ.get("OPEN_EPC_API_TOKEN")
os_api_key = os.environ.get("ORDNANCE_SURVEY_API_KEY")
return epc_token, os_api_key
def read_rows(path: Path) -> tuple[list[dict[str, str]], list[str]]:
"""Read a CSV into (rows, fieldnames). Preserves column order."""
with path.open(newline="", encoding="utf-8-sig") as fh:
reader = csv.DictReader(fh)
fieldnames = list(reader.fieldnames or [])
rows = [dict(row) for row in reader]
return rows, fieldnames
def clean_postcode(postcode: str) -> str:
"""Sanitise to the no-space upper form the EPC/OS lookups expect (e.g. E84SQ)."""
return postcode.upper().replace(" ", "").strip()
def build_address(row: dict[str, str]) -> str:
"""Concatenate Address 1/2/3 the same way the address2uprn lambda does."""
return " ".join(
str(row.get(col, "") or "").strip() for col in ("Address 1", "Address 2", "Address 3")
).strip()
def resolve_epc(
address: str, postcode_clean: str, epc_cache: dict[str, pd.DataFrame]
) -> Optional[Resolution]:
"""Resolve via the new EPC API (cached per postcode), then historic EPC S3.
`epc_cache` is mutated to memoise one EPC API call per postcode pass the
same dict across rows so a postcode is only fetched once.
"""
epc_df = epc_cache.get(postcode_clean)
if epc_df is None:
epc_df = get_epc_data_with_postcode(postcode=postcode_clean)
epc_cache[postcode_clean] = epc_df
result = get_uprn_with_epc_df(
user_inputed_address=address, epc_df=epc_df, verbose=True
)
if isinstance(result, tuple):
uprn, matched, score = result
return str(uprn), str(matched), float(score), "epc"
historic = get_uprn_from_historic_epc(
user_inputed_address=address, postcode=postcode_clean
)
if historic is not None:
uprn, matched, score = historic
return str(uprn), str(matched), float(score), "epc_historic"
return None
def resolve_os(
address: str,
postcode_clean: str,
os_api_key: str,
os_cache: dict[str, pd.DataFrame],
threshold: float,
) -> Optional[Resolution]:
"""Resolve via the OS Places API: best-scoring address above `threshold`.
`os_cache` memoises one OS Places call per postcode.
"""
places_df = os_cache.get(postcode_clean)
if places_df is None:
response = lookup_os_places(postcode_clean, os_api_key)
if response.get("status") != 200 or "data" not in response:
places_df = pd.DataFrame()
else:
places_df = os_places_results_to_dataframe(response["data"])
os_cache[postcode_clean] = places_df
if places_df.empty or "ADDRESS" not in places_df.columns:
return None
# Iterate plain records — avoids pandas' partially-unknown indexing types.
records: list[dict[str, object]] = places_df.to_dict(orient="records")
best: Optional[Resolution] = None
for rec in records:
candidate = str(rec.get("ADDRESS", ""))
score = AddressMatch.score(address, candidate)
if score >= threshold and (best is None or score > best[2]):
best = (str(rec.get("UPRN", "")), candidate, score, "ordnance_survey")
return best
def resolve_row(
row: dict[str, str],
*,
epc_token: Optional[str],
os_api_key: Optional[str],
epc_cache: dict[str, pd.DataFrame],
os_cache: dict[str, pd.DataFrame],
postcode_col: str,
use_epc: bool,
use_os: bool,
os_threshold: float,
validate_postcode: bool,
) -> dict[str, str]:
"""Resolve one row in place and return it with the finaliser columns filled.
Tries EPC (new + historic) first, then OS Places. On no match the three
result columns are written empty and `resolution_source` is "none".
"""
address = build_address(row)
postcode_clean = clean_postcode(str(row.get(postcode_col, "") or ""))
def write(res: Optional[Resolution]) -> dict[str, str]:
if res is None:
row[UPRN_COL] = ""
row[MATCHED_ADDRESS_COL] = ""
row[LEXISCORE_COL] = ""
row[SOURCE_COL] = "none"
else:
uprn, matched, score, source = res
row[UPRN_COL] = uprn
row[MATCHED_ADDRESS_COL] = matched
row[LEXISCORE_COL] = str(score)
row[SOURCE_COL] = source
return row
if not address or not postcode_clean:
return write(None)
if validate_postcode and not AddressMatch.is_valid_postcode(postcode_clean):
return write(None)
if use_epc and epc_token:
try:
res = resolve_epc(address, postcode_clean, epc_cache)
if res is not None:
return write(res)
except Exception as exc: # keep going on a per-row API/lookup failure
print(f" EPC lookup failed for {address!r} / {postcode_clean}: {exc}")
if use_os and os_api_key:
try:
res = resolve_os(address, postcode_clean, os_api_key, os_cache, os_threshold)
if res is not None:
return write(res)
except Exception as exc:
print(f" OS lookup failed for {address!r} / {postcode_clean}: {exc}")
return write(None)
def process(
rows: list[dict[str, str]],
*,
epc_token: Optional[str],
os_api_key: Optional[str],
postcode_col: str = "postcode",
use_epc: bool = True,
use_os: bool = True,
os_threshold: float = 0.5,
validate_postcode: bool = True,
) -> list[dict[str, str]]:
"""Resolve every row, printing a per-row line so REPL/CLI progress is visible."""
epc_cache: dict[str, pd.DataFrame] = {}
os_cache: dict[str, pd.DataFrame] = {}
for i, row in enumerate(rows, start=1):
resolve_row(
row,
epc_token=epc_token,
os_api_key=os_api_key,
epc_cache=epc_cache,
os_cache=os_cache,
postcode_col=postcode_col,
use_epc=use_epc,
use_os=use_os,
os_threshold=os_threshold,
validate_postcode=validate_postcode,
)
print(
f"[{i}/{len(rows)}] {build_address(row)!r} -> "
f"{row[UPRN_COL] or '(no match)'} ({row[SOURCE_COL]})"
)
return rows
def write_rows(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None:
"""Write rows to CSV, preserving input columns and appending the result columns."""
out_fields = list(fieldnames)
for col in _RESULT_COLS:
if col not in out_fields:
out_fields.append(col)
with path.open("w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(fh, fieldnames=out_fields, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("input", type=Path, help="input CSV (Address 1/2/3 + postcode)")
parser.add_argument(
"-o", "--out", type=Path, required=True, help="output CSV for the finaliser"
)
parser.add_argument("--postcode-col", default="postcode", help="postcode column name")
parser.add_argument("--no-epc", action="store_true", help="skip EPC resolution")
parser.add_argument("--no-os", action="store_true", help="skip Ordnance Survey fallback")
parser.add_argument(
"--os-threshold", type=float, default=0.5, help="min OS match score (default 0.5)"
)
parser.add_argument(
"--no-validate-postcode",
action="store_true",
help="skip the postcodes.io validity check (one HTTP call per postcode)",
)
parser.add_argument("--limit", type=int, default=None, help="process only the first N rows")
return parser.parse_args()
def main() -> int:
args = _parse_args()
epc_token, os_api_key = load_keys()
use_epc = not args.no_epc
use_os = not args.no_os
if use_epc and not epc_token:
print("OPEN_EPC_API_TOKEN not set (backend/.env) — EPC resolution disabled")
use_epc = False
if use_os and not os_api_key:
print("ORDNANCE_SURVEY_API_KEY not set (backend/.env) — OS fallback disabled")
use_os = False
if not use_epc and not use_os:
print("No resolver enabled (missing keys or both --no-* flags). Nothing to do.")
return 2
rows, fieldnames = read_rows(args.input)
if args.limit is not None:
rows = rows[: args.limit]
print(f"Loaded {len(rows)} rows from {args.input}")
process(
rows,
epc_token=epc_token,
os_api_key=os_api_key,
postcode_col=args.postcode_col,
use_epc=use_epc,
use_os=use_os,
os_threshold=args.os_threshold,
validate_postcode=not args.no_validate_postcode,
)
write_rows(rows, args.out, fieldnames)
matched = sum(1 for r in rows if r.get(UPRN_COL))
print(f"\nResolved {matched}/{len(rows)} rows. Wrote {args.out}")
return 0
if __name__ == "__main__":
sys.exit(main())