diff --git a/domain/epc/property_overlays/main_fuel_overlay.py b/domain/epc/property_overlays/main_fuel_overlay.py index 4d721865..9118c0c9 100644 --- a/domain/epc/property_overlays/main_fuel_overlay.py +++ b/domain/epc/property_overlays/main_fuel_overlay.py @@ -13,10 +13,18 @@ from __future__ import annotations from typing import Optional -from domain.modelling.simulation import EpcSimulation +from domain.modelling.simulation import EpcSimulation, HeatingOverlay + +# RdSAP-20/21 `main_fuel` `(not community)` codes (epc_codes.csv `main_fuel`). +_FUEL_CODES: dict[str, int] = { + "mains gas": 26, +} def fuel_overlay_for( main_fuel_value: str, building_part: int ) -> Optional[EpcSimulation]: - raise NotImplementedError + code = _FUEL_CODES.get(main_fuel_value) + if code is None: + return None + return EpcSimulation(heating=HeatingOverlay(main_fuel_type=code)) diff --git a/scripts/fill_domna_addresses.py b/scripts/fill_domna_addresses.py new file mode 100644 index 00000000..e4a7e18b --- /dev/null +++ b/scripts/fill_domna_addresses.py @@ -0,0 +1,353 @@ +"""Fill the DOMNA columns in the AddressProfilingResults spreadsheet. + +Input: scripts/manipulation(2).xlsx, sheet "AddressProfilingResults", columns + Organisation Reference | UPRN | DOMNA FOUND UPRN | DOMNA FOUND ADDRESS | Address | Postcode + +Per-row rule ("if there's a UPRN in the UPRN column we're done"): + + * UPRN present AND Address present -> nothing to do (already sorted). + * UPRN present AND Address missing -> reverse-lookup the address from the UPRN + via the EPC API -> DOMNA FOUND ADDRESS. + * UPRN missing AND Address present -> resolve a UPRN from address + postcode + (EPC API, then Ordnance Survey) -> writes + DOMNA FOUND UPRN + DOMNA FOUND ADDRESS. + * not resolvable -> marked "NOT FOUND" and listed in the + unresolved report. + +Relaxed matching (this batch only — production AddressMatch is untouched): the +landlord writes flats as "3 GLADYS COURT" while EPC stores "Flat 3 Gladys +Court", which the production matcher hard-rejects. So per address we try several +query variants — the full string, just the first comma-segment, and a +"Flat ..." form — and keep the best-scoring, unambiguous match. The unit +number must still match exactly (AddressMatch zeroes mismatched numbers), so a +wrong-unit match stays unlikely. Each fill carries its score + source so you can +spot-check (DOMNA SCORE / DOMNA SOURCE). + +Rows that already have a DOMNA FOUND UPRN are skipped (idempotent / resumable). + + python -m scripts.fill_domna_addresses + python -m scripts.fill_domna_addresses --limit 200 # smoke test first N + +Keys come from backend/.env (OPEN_EPC_API_TOKEN, ORDNANCE_SURVEY_API_KEY). Run +from the worktree root (import trap). +""" + +from __future__ import annotations + +import argparse +import os +import re +import sys +from pathlib import Path +from typing import Optional + +import pandas as pd + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from backend.address2UPRN.main import get_epc_data_with_postcode # noqa: E402 +from backend.address2UPRN.scoring import all_uprns_match, rank_address_similarity # noqa: E402 +from backend.ordnanceSurvey.helpers import ( # noqa: E402 + lookup_os_places, + os_places_results_to_dataframe, +) +from backend.utils.addressMatch import AddressMatch # noqa: E402 +from datatypes.epc.search import EpcSearchResult # noqa: E402 +from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402 +from scripts.resolve_uprns_for_finaliser import clean_postcode, load_keys # noqa: E402 + +SHEET = "AddressProfilingResults" +UPRN_COL = "UPRN" +ADDRESS_COL = "Address" +POSTCODE_COL = "Postcode" +REF_COL = "Organisation Reference" +FOUND_UPRN_COL = "DOMNA FOUND UPRN" +FOUND_ADDRESS_COL = "DOMNA FOUND ADDRESS" +SCORE_COL = "DOMNA SCORE" +SOURCE_COL = "DOMNA SOURCE" +NOT_FOUND = "NOT FOUND" + +# EPC matches are tight (short addresses) so we hold the production 0.7 bar; OS +# addresses carry more trailing tokens, so a slightly lower bar is appropriate. +EPC_THRESHOLD = 0.7 +OS_THRESHOLD = 0.6 + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation(2).xlsx" +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx" +_DEFAULT_UNRESOLVED = _REPO_ROOT / "scripts" / "manipulation_unresolved.csv" + +# A resolved hit: (uprn, matched_address, score, source). +Hit = tuple[str, str, float, str] + + +def cell_str(value: object) -> str: + """Coerce a spreadsheet cell to a trimmed string ("" for NaN/None).""" + if value is None: + return "" + text = str(value).strip() + return "" if text.lower() == "nan" else text + + +def parse_uprn_cell(value: object) -> Optional[int]: + """Read a UPRN cell that pandas loaded as float64 back into an int.""" + text = cell_str(value) + if not text: + return None + try: + return int(float(text)) + except ValueError: + return None + + +def address_variants(address: str) -> list[str]: + """Query forms to try for one input address, best-discriminating first. + + Landlord flats read "3 GLADYS COURT, 260 REIGATE ROAD" but EPC stores + "Flat 3 Gladys Court"; the full string scores low (extra tokens) and the + bare "3 ..." trips the flat guard. So we also try the first comma-segment + and a "Flat " form. + """ + address = address.strip() + first = address.split(",")[0].strip() + variants = [address, first] + if re.match(r"^\d", first): # starts with a unit/house number + variants.append("Flat " + first) + variants.append("Flat " + address) + seen: set[str] = set() + out: list[str] = [] + for v in variants: + key = v.lower() + if v and key not in seen: + seen.add(key) + out.append(v) + return out + + +def resolve_epc_relaxed( + address: str, + postcode_clean: str, + epc_cache: dict[str, pd.DataFrame], + threshold: float = EPC_THRESHOLD, +) -> Optional[Hit]: + """Best unambiguous EPC match across the address variants (cached per postcode).""" + epc_df = epc_cache.get(postcode_clean) + if epc_df is None: + epc_df = get_epc_data_with_postcode(postcode=postcode_clean) + epc_cache[postcode_clean] = epc_df + if epc_df.empty: + return None + + best: Optional[Hit] = None + for variant in address_variants(address): + scored = rank_address_similarity(epc_df, user_address=variant) + if scored.empty: + continue + score = float(scored.iloc[0]["lexiscore"]) + if best is not None and score <= best[2]: + continue + top_rank = scored[scored["lexirank"] == 1] + # rank-1 rows must agree on one UPRN, else it's ambiguous — skip. + if not all_uprns_match(top_rank, top_rank.iloc[0]["uprn"]): + continue + uprn = str(top_rank.iloc[0]["uprn"]) + if uprn in ("", "nan"): + continue + best = (uprn, str(scored.iloc[0]["address"]), score, "epc") + + return best if best is not None and best[2] >= threshold else None + + +def resolve_os_relaxed( + address: str, + postcode_clean: str, + os_api_key: str, + os_cache: dict[str, pd.DataFrame], + threshold: float = OS_THRESHOLD, +) -> Optional[Hit]: + """Best OS Places match across the address variants (cached per postcode).""" + places_df = os_cache.get(postcode_clean) + if places_df is None: + response = lookup_os_places(postcode_clean, os_api_key) + if response.get("status") == 200 and "data" in response: + places_df = os_places_results_to_dataframe(response["data"]) + else: + places_df = pd.DataFrame() + os_cache[postcode_clean] = places_df + if places_df.empty or "ADDRESS" not in places_df.columns: + return None + + records: list[dict[str, object]] = places_df.to_dict(orient="records") + best: Optional[Hit] = None + for variant in address_variants(address): + for rec in records: + candidate = str(rec.get("ADDRESS", "")) + score = AddressMatch.score(variant, candidate) + if best is None or score > best[2]: + best = (str(rec.get("UPRN", "")), candidate, score, "ordnance_survey") + return best if best is not None and best[2] >= threshold else None + + +def _address_from_search(result: EpcSearchResult) -> str: + parts = [ + result.address_line_1, + result.address_line_2, + result.address_line_3, + result.address_line_4, + result.post_town, + ] + return ", ".join(p.strip() for p in parts if p and p.strip()) + + +def reverse_address_from_uprn( + uprn: int, + postcode_clean: str, + service: EpcClientService, + search_cache: dict[str, list[EpcSearchResult]], +) -> Optional[str]: + """Find the EPC address for a known UPRN by searching its postcode (cached).""" + results = search_cache.get(postcode_clean) + if results is None: + results = service.search_by_postcode(postcode_clean) + search_cache[postcode_clean] = results + for result in results: + if result.uprn is not None and int(result.uprn) == uprn: + return _address_from_search(result) + return None + + +def fill(df: pd.DataFrame, *, os_api_key: Optional[str]) -> list[dict[str, str]]: + """Fill the DOMNA columns in place. Returns the unresolved rows.""" + for col in (FOUND_UPRN_COL, FOUND_ADDRESS_COL, SCORE_COL, SOURCE_COL): + if col not in df.columns: + df[col] = "" + df[FOUND_UPRN_COL] = df[FOUND_UPRN_COL].astype("object") + df[FOUND_ADDRESS_COL] = df[FOUND_ADDRESS_COL].astype("object") + + token = os.environ.get("OPEN_EPC_API_TOKEN") + service = EpcClientService(auth_token=token) if token else None + epc_cache: dict[str, pd.DataFrame] = {} + os_cache: dict[str, pd.DataFrame] = {} + search_cache: dict[str, list[EpcSearchResult]] = {} + + unresolved: list[dict[str, str]] = [] + resolved_uprn = resolved_addr = skipped = 0 + total = len(df) + + for n, idx in enumerate(df.index, start=1): + ref = cell_str(df.at[idx, REF_COL]) + given_uprn = parse_uprn_cell(df.at[idx, UPRN_COL]) + address = cell_str(df.at[idx, ADDRESS_COL]) + postcode_raw = cell_str(df.at[idx, POSTCODE_COL]) + postcode_clean = clean_postcode(postcode_raw) + + # Already sorted (UPRN + address) or already filled by a prior run. + if given_uprn is not None and address: + skipped += 1 + continue + if cell_str(df.at[idx, FOUND_UPRN_COL]) and cell_str(df.at[idx, FOUND_UPRN_COL]) != NOT_FOUND: + skipped += 1 + continue + + def mark_not_found(reason: str) -> None: + df.at[idx, FOUND_UPRN_COL] = NOT_FOUND if given_uprn is None else "" + df.at[idx, FOUND_ADDRESS_COL] = NOT_FOUND + df.at[idx, SOURCE_COL] = "not_found" + unresolved.append( + { + "Organisation Reference": ref, + "reason": reason, + "Address": address, + "Postcode": postcode_raw, + } + ) + + # Case B — UPRN present, address missing: reverse-lookup the address. + if given_uprn is not None and not address: + found: Optional[str] = None + if service is not None and postcode_clean: + try: + found = reverse_address_from_uprn( + given_uprn, postcode_clean, service, search_cache + ) + except Exception as exc: + print(f" reverse failed {ref} {given_uprn}: {exc}") + if found: + df.at[idx, FOUND_ADDRESS_COL] = found + df.at[idx, SOURCE_COL] = "epc_reverse" + resolved_addr += 1 + else: + mark_not_found("no address for UPRN") + continue + + # Case A — no UPRN, has address: resolve a UPRN. + if given_uprn is None and address: + if not postcode_clean: + mark_not_found("no postcode") + continue + hit: Optional[Hit] = None + if token: + try: + hit = resolve_epc_relaxed(address, postcode_clean, epc_cache) + except Exception as exc: + print(f" EPC failed {ref} {postcode_clean}: {exc}") + if hit is None and os_api_key: + try: + hit = resolve_os_relaxed(address, postcode_clean, os_api_key, os_cache) + except Exception as exc: + print(f" OS failed {ref} {postcode_clean}: {exc}") + if hit is not None: + uprn, matched, score, source = hit + df.at[idx, FOUND_UPRN_COL] = uprn + df.at[idx, FOUND_ADDRESS_COL] = matched + df.at[idx, SCORE_COL] = round(score, 4) + df.at[idx, SOURCE_COL] = source + resolved_uprn += 1 + else: + mark_not_found("no UPRN match") + if n % 100 == 0: + print( + f"[{n}/{total}] resolved={resolved_uprn} not_found={len(unresolved)}" + ) + continue + + # Case C — neither a UPRN nor an address. + mark_not_found("no UPRN and no address") + + print( + f"\nResolved {resolved_uprn} UPRNs, {resolved_addr} addresses; " + f"{skipped} already sorted/done; {len(unresolved)} not found." + ) + return unresolved + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + parser.add_argument("--unresolved", type=Path, default=_DEFAULT_UNRESOLVED) + parser.add_argument("--limit", type=int, default=None, help="process first N rows") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + _epc_token, os_api_key = load_keys() + + df = pd.read_excel(args.inp, sheet_name=SHEET) + if args.limit is not None: + df = df.head(args.limit).copy() + print(f"Loaded {len(df)} rows from {args.inp} [{SHEET}]") + + unresolved = fill(df, os_api_key=os_api_key) + + df.to_excel(args.out, sheet_name=SHEET, index=False) + print(f"Wrote filled sheet -> {args.out}") + if unresolved: + pd.DataFrame(unresolved).to_csv(args.unresolved, index=False) + print(f"Wrote {len(unresolved)} unresolved rows -> {args.unresolved}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/finalise_to_property_table.py b/scripts/finalise_to_property_table.py new file mode 100644 index 00000000..751e8c59 --- /dev/null +++ b/scripts/finalise_to_property_table.py @@ -0,0 +1,331 @@ +"""Insert resolved manipulation_filled rows into the FE-owned ``property`` table. + +Reuses the bulk_upload_finaliser's own row->PropertyIdentityInsert mapping +(``BulkUploadFinaliserOrchestrator._row_to_insert``) and the same +``PropertyPostgresRepository.insert_all`` the Lambda uses, so a row inserted here +is identical to one the real finaliser would write. The status-writer / +property_overrides path is skipped — this only populates ``property`` (no +BulkUpload task needed). + +Insert is ON CONFLICT (portfolio_id, uprn) DO NOTHING, so re-running is safe. + + # one random resolved row into portfolio 796, then read it back + python -m scripts.finalise_to_property_table --portfolio 796 --one + + # a specific Organisation Reference + python -m scripts.finalise_to_property_table --portfolio 796 --ref 56100000101 + + # the whole sheet (resolved rows only by default; --include-unmatched to add + # null-UPRN rows too) + python -m scripts.finalise_to_property_table --portfolio 796 --all + +Postgres target comes from the root .env (POSTGRES_*). Run from the worktree root. +""" + +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path +from typing import Optional + +import pandas as pd +from dotenv import load_dotenv +from sqlmodel import select + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from infrastructure.postgres.config import PostgresConfig # noqa: E402 +from infrastructure.postgres.engine import commit_scope, make_engine, make_session # noqa: E402 +from infrastructure.postgres.property_table import PropertyRow # noqa: E402 +from orchestration.bulk_upload_finaliser_orchestrator import ( # noqa: E402 + BulkUploadFinaliserOrchestrator, +) +from repositories.property.property_postgres_repository import ( # noqa: E402 + PropertyPostgresRepository, +) +from repositories.property.property_repository import PropertyIdentityInsert # noqa: E402 +from scripts.fill_domna_addresses import ( # noqa: E402 + ADDRESS_COL, + FOUND_ADDRESS_COL, + FOUND_UPRN_COL, + POSTCODE_COL, + REF_COL, + SCORE_COL, + SHEET, + UPRN_COL, + NOT_FOUND, + cell_str, + parse_uprn_cell, +) + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx" + + +def _final_uprn(row: pd.Series) -> Optional[int]: + """The authoritative UPRN: the given one, else the DOMNA-found one.""" + given = parse_uprn_cell(row.get(UPRN_COL)) + if given is not None: + return given + found = cell_str(row.get(FOUND_UPRN_COL)) + if found and found != NOT_FOUND: + return parse_uprn_cell(found) + return None + + +def to_combiner_row(row: pd.Series) -> dict[str, str]: + """Map one spreadsheet row to the combiner-output shape the finaliser reads.""" + given_uprn = parse_uprn_cell(row.get(UPRN_COL)) + address = cell_str(row.get(ADDRESS_COL)) + uprn = _final_uprn(row) + + domna_addr = cell_str(row.get(FOUND_ADDRESS_COL)) + if domna_addr == NOT_FOUND: + domna_addr = "" + # Matched address: the resolved one when we found it, else the given address + # (for rows that already had a UPRN + address). + matched = domna_addr or (address if given_uprn is not None else "") + score = cell_str(row.get(SCORE_COL)) + + return { + "Address 1": address, + "Address 2": "", + "Address 3": "", + "postcode": cell_str(row.get(POSTCODE_COL)), + "Internal Reference": cell_str(row.get(REF_COL)), + "address2uprn_uprn": "" if uprn is None else str(uprn), + "address2uprn_address": matched, + "address2uprn_lexiscore": score, + } + + +def load_rows( + path: Path, *, include_unmatched: bool +) -> tuple[pd.DataFrame, list[dict[str, str]]]: + """Load the sheet and the combiner rows. By default drop rows with no UPRN.""" + df = pd.read_excel(path, sheet_name=SHEET) + df = df.reset_index(drop=True) + if not include_unmatched: + keep = df.apply(lambda r: _final_uprn(r) is not None, axis=1) + df = df[keep].reset_index(drop=True) + rows = [to_combiner_row(r) for _, r in df.iterrows()] + return df, rows + + +def dedupe_by_uprn( + rows: list[dict[str, str]], +) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + """Keep the first row per UPRN; return (kept, dropped collisions). + + The DB INSERT collapses duplicate (portfolio, uprn) via ON CONFLICT DO + NOTHING anyway, so this just makes the collision explicit (the dropped rows + are written out for review) rather than letting an arbitrary ref win silently. + """ + seen: set[str] = set() + kept: list[dict[str, str]] = [] + dropped: list[dict[str, str]] = [] + for row in rows: + uprn = row["address2uprn_uprn"] + if uprn in seen: + dropped.append(row) + else: + seen.add(uprn) + kept.append(row) + return kept, dropped + + +# Force-reload teardown order (bottom-up). property_overrides is ON DELETE +# CASCADE so it clears itself when the property goes; everything below is NO +# ACTION and must be deleted first, deepest child first. +# property -> epc_property -> {these children} +_EPC_CHILD_TABLES = ( + "epc_energy_element", + "epc_window", + "epc_main_heating_detail", + "epc_renewable_heat_incentive", + "epc_building_part", + "epc_flat_details", +) +# property -> {these direct dependents}, deleted after the epc children +_PROPERTY_DEPENDENTS = ("epc_property", "plan") +_INSERT_CHUNK = 4000 # 9 cols/row -> well under psycopg2's 65535-param limit + + +def _reset_portfolio(session: object, portfolio_id: int) -> int: + """Delete a portfolio's properties and their NO ACTION dependency tree. + + Returns the number of property rows deleted (property_overrides cascade). + """ + from sqlalchemy import text + + pids = "SELECT id FROM property WHERE portfolio_id = :pid" + epc_ids = f"SELECT id FROM epc_property WHERE property_id IN ({pids})" + for table in _EPC_CHILD_TABLES: + session.execute( # type: ignore[attr-defined] + text(f"DELETE FROM {table} WHERE epc_property_id IN ({epc_ids})"), + {"pid": portfolio_id}, + ) + for table in _PROPERTY_DEPENDENTS: + session.execute( # type: ignore[attr-defined] + text(f"DELETE FROM {table} WHERE property_id IN ({pids})"), + {"pid": portfolio_id}, + ) + result = session.execute( # type: ignore[attr-defined] + text("DELETE FROM property WHERE portfolio_id = :pid"), {"pid": portfolio_id} + ) + return result.rowcount + + +def clean_reload( + rows: list[dict[str, str]], portfolio_id: int, *, reset: bool +) -> tuple[int, int]: + """Optionally wipe the portfolio, then chunk-insert rows. One transaction. + + Returns (properties_deleted, properties_inserted). + """ + inserts: list[PropertyIdentityInsert] = [ + BulkUploadFinaliserOrchestrator._row_to_insert(r, portfolio_id) for r in rows + ] + engine = _engine() + session = make_session(engine) + deleted = 0 + inserted = 0 + try: + repo = PropertyPostgresRepository(session) + with commit_scope(session): + if reset: + deleted = _reset_portfolio(session, portfolio_id) + for start in range(0, len(inserts), _INSERT_CHUNK): + inserted += repo.insert_all(inserts[start : start + _INSERT_CHUNK]) + finally: + session.close() + return deleted, inserted + + +def _engine(): + load_dotenv(_REPO_ROOT / ".env") + return make_engine(PostgresConfig.from_env(os.environ)) + + +def insert_rows(rows: list[dict[str, str]], portfolio_id: int) -> int: + """Insert via the finaliser's mapper + repository. Returns rows inserted.""" + inserts: list[PropertyIdentityInsert] = [ + BulkUploadFinaliserOrchestrator._row_to_insert(r, portfolio_id) for r in rows + ] + engine = _engine() + session = make_session(engine) + try: + repo = PropertyPostgresRepository(session) + with commit_scope(session): + inserted = repo.insert_all(inserts) + finally: + session.close() + return inserted + + +def fetch_by_ref(portfolio_id: int, ref: str) -> list[PropertyRow]: + """Read back inserted rows for one Organisation Reference (for verification).""" + engine = _engine() + session = make_session(engine) + try: + stmt = select(PropertyRow).where( + PropertyRow.portfolio_id == portfolio_id, + PropertyRow.landlord_property_id == ref, + ) + return list(session.exec(stmt).all()) + finally: + session.close() + + +def _show(row: dict[str, str], insert: PropertyIdentityInsert) -> None: + print("\nSource (combiner) row:") + for k, v in row.items(): + print(f" {k}: {v!r}") + print("\nMapped PropertyIdentityInsert:") + for k, v in insert.__dict__.items(): + print(f" {k}: {v!r}") + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--portfolio", type=int, required=True) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--one", action="store_true", help="one random resolved row") + group.add_argument("--ref", help="a specific Organisation Reference") + group.add_argument("--all", action="store_true", help="every row") + parser.add_argument( + "--include-unmatched", + action="store_true", + help="also insert rows with no UPRN (null-UPRN property rows)", + ) + parser.add_argument( + "--reset", + action="store_true", + help="(with --all) DELETE all properties in the portfolio first " + "(cascades property_overrides; clears plan/epc_property)", + ) + parser.add_argument( + "--collisions", + type=Path, + default=_REPO_ROOT / "scripts" / "manipulation_collisions.csv", + help="where to write rows dropped as duplicate-UPRN collisions", + ) + parser.add_argument("--seed", type=int, default=0, help="random seed for --one") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + df, rows = load_rows(args.inp, include_unmatched=args.include_unmatched) + print(f"Loaded {len(rows)} candidate rows from {args.inp}") + + if args.all: + kept, dropped = dedupe_by_uprn(rows) + if dropped: + pd.DataFrame(dropped).to_csv(args.collisions, index=False) + print( + f"{len(dropped)} duplicate-UPRN rows dropped -> {args.collisions} " + f"({len(kept)} unique to insert)" + ) + deleted, inserted = clean_reload(kept, args.portfolio, reset=args.reset) + if args.reset: + print(f"Deleted {deleted} existing properties in portfolio {args.portfolio}.") + print(f"Inserted {inserted} properties into portfolio {args.portfolio}.") + return 0 + + # Single-row paths: pick the row, show the mapping, insert, read back. + if args.ref: + match = [r for r in rows if r["Internal Reference"] == args.ref] + if not match: + print(f"No resolved row with Organisation Reference {args.ref!r}.") + return 1 + row = match[0] + else: # --one: deterministic "random" pick via seed + idx = (args.seed * 7919) % len(rows) + row = rows[idx] + + ref = row["Internal Reference"] + insert = BulkUploadFinaliserOrchestrator._row_to_insert(row, args.portfolio) + _show(row, insert) + + inserted = insert_rows([row], args.portfolio) + print( + f"\ninsert_all -> {inserted} new row(s) " + f"(0 means it already existed; ON CONFLICT DO NOTHING)." + ) + + print(f"\nproperty rows for portfolio {args.portfolio}, ref {ref!r}:") + for pr in fetch_by_ref(args.portfolio, ref): + print( + f" id={pr.id} uprn={pr.uprn} address={pr.address!r} " + f"postcode={pr.postcode!r} status={pr.creation_status} " + f"lexiscore={pr.lexiscore}" + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/hyde_epc_schema_versions.py b/scripts/hyde_epc_schema_versions.py new file mode 100644 index 00000000..7bb882ac --- /dev/null +++ b/scripts/hyde_epc_schema_versions.py @@ -0,0 +1,159 @@ +"""Tally the EPC schema versions across the hyde list (manipulation_filled UPRNs). + +For every resolved UPRN we look up its EPC certificate's ``schemaType`` (e.g. +``RdSAP-Schema-21.0.1``, ``RdSAP-Schema-17.1``, ``SAP-Schema-16.2``). The +gov EPC ``/api/domestic/search`` endpoint returns ``schemaType`` per row, so one +search-per-postcode covers every UPRN in that postcode — far cheaper than a +certificate fetch per UPRN. The latest cert (max registrationDate) wins per UPRN. + +Outputs: a per-schema-version tally with one example UPRN each, plus a CSV +mapping every UPRN -> schema version. + + python -m scripts.hyde_epc_schema_versions + python -m scripts.hyde_epc_schema_versions --workers 8 --out scripts/hyde_schema_versions.csv + +Reads OPEN_EPC_API_TOKEN from backend/.env. Run from the worktree root. +""" + +from __future__ import annotations + +import argparse +import os +import sys +import time +from collections import Counter, defaultdict +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any, Optional + +import httpx +from dotenv import load_dotenv + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from scripts.fill_domna_addresses import clean_postcode # noqa: E402 +from scripts.finalise_to_property_table import load_rows # noqa: E402 + +_BASE = "https://api.get-energy-performance-data.communities.gov.uk" +_SEARCH = f"{_BASE}/api/domestic/search" +NOT_IN_EPC = "NOT_IN_EPC" + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "manipulation_filled.xlsx" +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "hyde_schema_versions.csv" + + +def search_postcode( + client: httpx.Client, postcode: str, headers: dict[str, str] +) -> list[dict[str, Any]]: + """Return the search rows for a postcode, retrying on rate-limit (429).""" + for attempt in range(5): + resp = client.get(_SEARCH, params={"postcode": postcode}, headers=headers, timeout=30) + if resp.status_code == 429: + retry_after = float(resp.headers.get("Retry-After", "2")) + time.sleep(min(retry_after, 10) * (attempt + 1)) + continue + # 400 = malformed postcode (data-entry typo), 404 = no certs — skip both. + if resp.status_code in (400, 404): + return [] + resp.raise_for_status() + return resp.json().get("data", []) + return [] + + +def build_uprn_schema_map( + postcodes: list[str], token: str, workers: int +) -> dict[int, tuple[str, str]]: + """Map UPRN -> (schemaType, registrationDate) for the latest cert per UPRN. + + One search per postcode (concurrent); later we look our UPRNs up in here. + """ + headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} + by_uprn: dict[int, tuple[str, str]] = {} + done = 0 + total = len(postcodes) + + def fetch(pc: str) -> list[dict[str, Any]]: + with httpx.Client() as client: + return search_postcode(client, pc, headers) + + with ThreadPoolExecutor(max_workers=workers) as pool: + for rows in pool.map(fetch, postcodes): + for row in rows: + uprn = row.get("uprn") + schema = row.get("schemaType") + reg = row.get("registrationDate") or "" + if uprn is None or not schema: + continue + prev = by_uprn.get(int(uprn)) + # Keep the latest-registered cert's schema for this UPRN. + if prev is None or reg > prev[1]: + by_uprn[int(uprn)] = (str(schema), str(reg)) + done += 1 + if done % 250 == 0: + print(f" searched {done}/{total} postcodes, {len(by_uprn)} uprns seen") + return by_uprn + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + parser.add_argument("--workers", type=int, default=8) + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + load_dotenv(_REPO_ROOT / "backend" / ".env") + token = os.environ.get("OPEN_EPC_API_TOKEN") + if not token: + print("OPEN_EPC_API_TOKEN not set (backend/.env)") + return 2 + + _, rows = load_rows(args.inp, include_unmatched=False) + pairs: list[tuple[int, str, str]] = [] # (uprn, postcode_clean, address) + for r in rows: + uprn = r["address2uprn_uprn"] + if uprn: + pairs.append((int(uprn), clean_postcode(r["postcode"]), r["address2uprn_address"])) + postcodes = sorted({pc for _, pc, _ in pairs if pc}) + print(f"{len(pairs)} UPRNs across {len(postcodes)} unique postcodes") + + by_uprn = build_uprn_schema_map(postcodes, token, args.workers) + print(f"EPC search returned schema for {len(by_uprn)} distinct UPRNs") + + # Resolve each hyde UPRN to its schema version. + tally: Counter[str] = Counter() + example: dict[str, tuple[int, str]] = {} + out_lines: list[tuple[int, str, str, str]] = [] # uprn, schema, postcode, address + seen: set[int] = set() + for uprn, pc, address in pairs: + if uprn in seen: + continue + seen.add(uprn) + schema = by_uprn.get(uprn, (NOT_IN_EPC, ""))[0] + tally[schema] += 1 + example.setdefault(schema, (uprn, address)) + out_lines.append((uprn, schema, pc, address)) + + # Write the full per-UPRN mapping. + import csv + + with args.out.open("w", newline="", encoding="utf-8") as fh: + w = csv.writer(fh) + w.writerow(["uprn", "schema_version", "postcode", "matched_address"]) + w.writerows(out_lines) + + print(f"\nSchema versions across {len(seen)} distinct UPRNs:\n") + print(f" {'schema version':<26} {'count':>7} example UPRN") + print(f" {'-'*26} {'-'*7} {'-'*12}") + for schema, count in tally.most_common(): + ex_uprn, ex_addr = example[schema] + print(f" {schema:<26} {count:>7} {ex_uprn} ({ex_addr})") + print(f"\nFull mapping -> {args.out}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/compare_to_ara.py b/scripts/lisasrequest/compare_to_ara.py new file mode 100644 index 00000000..c30b5f29 --- /dev/null +++ b/scripts/lisasrequest/compare_to_ara.py @@ -0,0 +1,169 @@ +"""Compare our step-1 UPRN resolution against the old "Ara output" data. + +The Ara data lives in scripts/lisasrequest/Durkan data.xlsx, sheet "Ara output", +and carries UPRNs from our previous dataset. It is NOT treated as ground truth — +this just lines it up against what we found / didn't find so a human can eyeball +the differences. (We read the xlsx, not the CSV export: the CSV mangled half the +UPRNs to Excel scientific notation, e.g. ``1.00023E+11``; the xlsx keeps them +intact, so every comparison below is exact.) + +Join key is (postcode, leading number, first street word), since the UPRN is the +thing under comparison and Ara's address strings differ from the landlord input. + +Each of our rows lands in one comparison bucket: + match both found a UPRN and they are equal. + differ both found a UPRN and they differ. + we_only we resolved a UPRN, Ara had none for this address. + ara_only we did NOT resolve, but Ara had a UPRN <- recovery candidates. + both_missing neither resolved a UPRN. + no_ara_record the Ara sheet had no row matching this address at all. + + python -m scripts.lisasrequest.compare_to_ara +""" + +from __future__ import annotations + +import argparse +import csv +import re +import sys +from collections import Counter, OrderedDict +from pathlib import Path +from typing import Optional + +import pandas as pd + +_REPO_ROOT = Path(__file__).resolve().parents[2] + +ADDRESS_COL = "address" +POSTCODE_COL = "postcode" +OUR_UPRN_COL = "domna_address_uprn" +OUR_SOURCE_COL = "domna_source" + +ARA_UPRN_COL = "EPC_B.uprn" +ARA_ADDRESS_COL = "EPC_B.address" +ARA_POSTCODE_COL = "EPC_B.postcode" +ARA_SHEET = "Ara output" + +_OUR_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv" +_ARA_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "Durkan data.xlsx" +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_vs_ara.csv" + +Key = tuple[str, str, str] + + +def norm_key(address: str, postcode: str) -> Key: + """(postcode-no-space, leading number, first street word) — the join key.""" + pc = postcode.upper().replace(" ", "") + upper = address.upper() + nums = re.findall(r"\d+[A-Z]?", upper) + words = [w for w in re.findall(r"[A-Z]+", upper) if w != "FLAT"] + return (pc, nums[0] if nums else "", words[0] if words else "") + + +def load_ara(path: Path) -> tuple[dict[Key, dict[str, str]], int]: + """Index the Ara-output xlsx sheet by join key (first row wins). + + Returns (index, duplicates). Read as strings so UPRNs keep their full value. + """ + df = pd.read_excel(path, sheet_name=ARA_SHEET, dtype=str) + rows: list[dict[str, str]] = df.fillna("").to_dict(orient="records") + index: dict[Key, dict[str, str]] = OrderedDict() + dupes = 0 + for row in rows: + address = str(row.get(ARA_ADDRESS_COL) or "").strip() + postcode = str(row.get(ARA_POSTCODE_COL) or row.get(POSTCODE_COL) or "").strip() + if not address: + continue + key = norm_key(address, postcode) + if key in index: + dupes += 1 + continue + index[key] = row + return index, dupes + + +def classify( + our_uprn: str, our_found: bool, ara: Optional[dict[str, str]] +) -> tuple[str, str, str]: + """Return (comparison, ara_uprn, ara_address) for one of our rows.""" + if ara is None: + return ("no_ara_record", "", "") + ara_uprn = (ara.get(ARA_UPRN_COL) or "").strip() + ara_address = (ara.get(ARA_ADDRESS_COL) or "").strip() + ara_found = bool(ara_uprn) + + if our_found and ara_found: + comparison = "match" if our_uprn == ara_uprn else "differ" + elif our_found and not ara_found: + comparison = "we_only" + elif not our_found and ara_found: + comparison = "ara_only" + else: + comparison = "both_missing" + return (comparison, ara_uprn, ara_address) + + +def compare( + our_rows: list[dict[str, str]], ara_index: dict[Key, dict[str, str]] +) -> list[dict[str, str]]: + out: list[dict[str, str]] = [] + for row in our_rows: + address = (row.get(ADDRESS_COL) or "").strip() + postcode = (row.get(POSTCODE_COL) or "").strip() + our_uprn = (row.get(OUR_UPRN_COL) or "").strip() + our_source = (row.get(OUR_SOURCE_COL) or "").strip() + our_found = bool(our_uprn) and our_source != "not_found" + + ara = ara_index.get(norm_key(address, postcode)) + comparison, ara_uprn, ara_address = classify(our_uprn, our_found, ara) + out.append( + { + "address": address, + "postcode": postcode, + "our_uprn": our_uprn, + "our_source": our_source, + "ara_uprn": ara_uprn, + "ara_address": ara_address, + "comparison": comparison, + } + ) + return out + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--ours", type=Path, default=_OUR_IN) + parser.add_argument("--ara", type=Path, default=_ARA_IN) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + args = parser.parse_args() + + with args.ours.open(newline="", encoding="utf-8-sig") as fh: + our_rows = [dict(r) for r in csv.DictReader(fh)] + ara_index, dupes = load_ara(args.ara) + print(f"Loaded {len(our_rows)} of our rows; {len(ara_index)} Ara keys " + f"({dupes} duplicate Ara rows ignored).") + + result = compare(our_rows, ara_index) + fieldnames = list(result[0].keys()) + with args.out.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(result) + + counts = Counter(r["comparison"] for r in result) + print(f"\nComparison of {len(result)} rows -> {args.out}") + for name in ( + "match", + "differ", + "we_only", + "ara_only", + "both_missing", + "no_ara_record", + ): + print(f" {name}: {counts.get(name, 0)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/durkan_805_schema_check.py b/scripts/lisasrequest/durkan_805_schema_check.py new file mode 100644 index 00000000..776a7813 --- /dev/null +++ b/scripts/lisasrequest/durkan_805_schema_check.py @@ -0,0 +1,142 @@ +"""EPC SAP-schema check for portfolio 805, and whether each is mapper-supported. + +For every UPRN currently in the ``property`` table for portfolio 805, look up its +latest EPC certificate's ``schemaType`` (one /api/domestic/search per postcode, +reusing scripts.hyde_epc_schema_versions) and check it against the schemas the +EpcPropertyData mapper actually handles +(``EpcPropertyDataMapper.from_api_response``, datatypes/epc/domain/mapper.py). + +Prints a per-schema tally with a supported? flag and an example UPRN, and writes +the full per-UPRN mapping to durkan_805_schema_check.csv. + + python -m scripts.lisasrequest.durkan_805_schema_check + python -m scripts.lisasrequest.durkan_805_schema_check --portfolio 805 --workers 8 + +Reads OPEN_EPC_API_TOKEN from backend/.env and POSTGRES_* from the root .env. +Run from the worktree root. +""" + +from __future__ import annotations + +import argparse +import csv +import os +import sys +from collections import Counter +from pathlib import Path + +from dotenv import load_dotenv +from sqlmodel import select + +_REPO_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from infrastructure.postgres.config import PostgresConfig # noqa: E402 +from infrastructure.postgres.engine import make_engine, make_session # noqa: E402 +from infrastructure.postgres.property_table import PropertyRow # noqa: E402 +from scripts.fill_domna_addresses import clean_postcode # noqa: E402 +from scripts.hyde_epc_schema_versions import ( # noqa: E402 + NOT_IN_EPC, + build_uprn_schema_map, +) + +# Schemas EpcPropertyDataMapper.from_api_response dispatches on (everything else +# raises "Unsupported EPC schema"). Keep in sync with mapper.py:2539-2603. +SUPPORTED_SCHEMAS = frozenset( + { + "RdSAP-Schema-17.0", + "RdSAP-Schema-17.1", + "RdSAP-Schema-18.0", + "RdSAP-Schema-19.0", + "RdSAP-Schema-20.0.0", + "RdSAP-Schema-21.0.0", + "RdSAP-Schema-21.0.1", + "SAP-Schema-16.0", + "SAP-Schema-16.2", + "SAP-Schema-16.3", + "SAP-Schema-17.0", + "SAP-Schema-17.1", + "SAP-Schema-18.0.0", + } +) + +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_805_schema_check.csv" + + +def load_portfolio_uprns(portfolio_id: int) -> list[tuple[int, str]]: + """Return (uprn, postcode) for every property in the portfolio with a UPRN.""" + load_dotenv(_REPO_ROOT / ".env") + engine = make_engine(PostgresConfig.from_env(os.environ)) + session = make_session(engine) + try: + stmt = select(PropertyRow.uprn, PropertyRow.postcode).where( + PropertyRow.portfolio_id == portfolio_id + ) + out: list[tuple[int, str]] = [] + for uprn, postcode in session.exec(stmt).all(): + if uprn is not None: + out.append((int(uprn), str(postcode or ""))) + return out + finally: + session.close() + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--portfolio", type=int, default=805) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + parser.add_argument("--workers", type=int, default=8) + args = parser.parse_args() + + load_dotenv(_REPO_ROOT / "backend" / ".env") + token = os.environ.get("OPEN_EPC_API_TOKEN") + if not token: + print("OPEN_EPC_API_TOKEN not set (backend/.env)") + return 2 + + pairs = load_portfolio_uprns(args.portfolio) + postcodes = sorted({clean_postcode(pc) for _, pc in pairs if pc}) + print( + f"Portfolio {args.portfolio}: {len(pairs)} UPRNs across " + f"{len(postcodes)} unique postcodes" + ) + + by_uprn = build_uprn_schema_map(postcodes, token, args.workers) + print(f"EPC search returned a schema for {len(by_uprn)} distinct UPRNs") + + tally: Counter[str] = Counter() + example: dict[str, int] = {} + rows_out: list[tuple[int, str, str, str]] = [] # uprn, schema, supported, postcode + seen: set[int] = set() + for uprn, pc in pairs: + if uprn in seen: + continue + seen.add(uprn) + schema = by_uprn.get(uprn, (NOT_IN_EPC, ""))[0] + supported = "yes" if schema in SUPPORTED_SCHEMAS else "no" + tally[schema] += 1 + example.setdefault(schema, uprn) + rows_out.append((uprn, schema, supported, clean_postcode(pc))) + + with args.out.open("w", newline="", encoding="utf-8") as fh: + writer = csv.writer(fh) + writer.writerow(["uprn", "schema_version", "mapper_supported", "postcode"]) + writer.writerows(rows_out) + + supported_count = sum(c for s, c in tally.items() if s in SUPPORTED_SCHEMAS) + print(f"\nSchema versions across {len(seen)} distinct UPRNs in portfolio " + f"{args.portfolio}:\n") + print(f" {'schema version':<26} {'count':>5} {'supported?':<10} example UPRN") + print(f" {'-' * 26} {'-' * 5} {'-' * 10} {'-' * 12}") + for schema, count in tally.most_common(): + supported = "yes" if schema in SUPPORTED_SCHEMAS else "NO" + print(f" {schema:<26} {count:>5} {supported:<10} {example[schema]}") + print( + f"\nMapper-supported: {supported_count}/{len(seen)} UPRNs. " + f"Full mapping -> {args.out}" + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/fill_domna_address.py b/scripts/lisasrequest/fill_domna_address.py new file mode 100644 index 00000000..98d70071 --- /dev/null +++ b/scripts/lisasrequest/fill_domna_address.py @@ -0,0 +1,200 @@ +"""Step 1 (Durkan portfolio): resolve a UPRN per CSV row via EPC then OS. + +Input: scripts/lisasrequest/260611_Sample_Seed_Portfolio_Durkan_split_addresses(Split Addresses).csv + columns include ``address`` and ``postcode``. + +Every row carries an address and none carry a UPRN, so there is a single case: + + * resolve a UPRN from ``address`` + ``postcode`` via the EPC API (relaxed + address variants, threshold 0.7), then Ordnance Survey Places as a fallback + (threshold 0.6). + * not resolvable -> domna_source = "not_found"; uprn/address/score left empty. + +Writes a NEW CSV = every original column, in order, plus four DOMNA columns: + + domna_address_found the canonical address EPC/OS returned (matched string) + domna_address_uprn the resolved UPRN ("" when unresolved) + domna_lexiscore the match score in [0, 1] ("" when unresolved) + domna_source epc / ordnance_survey / not_found + +This is the human-review file; step 2 (resolve_uprns_for_finaliser) reshapes it +into the finaliser columns without re-hitting the APIs. + + python -m scripts.lisasrequest.fill_domna_address + python -m scripts.lisasrequest.fill_domna_address --limit 20 # smoke test + +Resolution reuses the relaxed matchers from scripts.fill_domna_addresses. Keys +come from backend/.env (OPEN_EPC_API_TOKEN, ORDNANCE_SURVEY_API_KEY). Run from +the worktree root (import trap). +""" + +from __future__ import annotations + +import argparse +import csv +import sys +from pathlib import Path +from typing import Optional + +import pandas as pd + +_REPO_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from scripts.fill_domna_addresses import ( # noqa: E402 + Hit, + resolve_epc_relaxed, + resolve_os_relaxed, +) +from scripts.resolve_uprns_for_finaliser import clean_postcode, load_keys # noqa: E402 + +ADDRESS_COL = "address" +POSTCODE_COL = "postcode" +FOUND_ADDRESS_COL = "domna_address_found" +FOUND_UPRN_COL = "domna_address_uprn" +LEXISCORE_COL = "domna_lexiscore" +SOURCE_COL = "domna_source" +NOT_FOUND = "not_found" +_RESULT_COLS = (FOUND_ADDRESS_COL, FOUND_UPRN_COL, LEXISCORE_COL, SOURCE_COL) + +_CSV_NAME = "260611_Sample_Seed_Portfolio_Durkan_split_addresses(Split Addresses).csv" +_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / _CSV_NAME +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv" + + +def read_rows(path: Path) -> tuple[list[dict[str, str]], list[str]]: + """Read a CSV into (rows, fieldnames), preserving column order.""" + with path.open(newline="", encoding="utf-8-sig") as fh: + reader = csv.DictReader(fh) + fieldnames = list(reader.fieldnames or []) + rows = [dict(row) for row in reader] + return rows, fieldnames + + +def resolve_one( + address: str, + postcode_raw: str, + *, + epc_token: Optional[str], + os_api_key: Optional[str], + epc_cache: dict[str, pd.DataFrame], + os_cache: dict[str, pd.DataFrame], +) -> Optional[Hit]: + """Resolve one row's UPRN: EPC (relaxed) first, then OS Places fallback.""" + postcode_clean = clean_postcode(postcode_raw) + if not address or not postcode_clean: + return None + + hit: Optional[Hit] = None + if epc_token: + try: + hit = resolve_epc_relaxed(address, postcode_clean, epc_cache) + except Exception as exc: + print(f" EPC failed {address!r} / {postcode_clean}: {exc}") + if hit is None and os_api_key: + try: + hit = resolve_os_relaxed(address, postcode_clean, os_api_key, os_cache) + except Exception as exc: + print(f" OS failed {address!r} / {postcode_clean}: {exc}") + return hit + + +def fill( + rows: list[dict[str, str]], + *, + epc_token: Optional[str], + os_api_key: Optional[str], +) -> tuple[int, int, int]: + """Fill the DOMNA columns on each row in place. + + Returns (epc_hits, os_hits, not_found) counts. + """ + epc_cache: dict[str, pd.DataFrame] = {} + os_cache: dict[str, pd.DataFrame] = {} + epc_hits = os_hits = not_found = 0 + total = len(rows) + + for n, row in enumerate(rows, start=1): + address = str(row.get(ADDRESS_COL, "") or "").strip() + postcode_raw = str(row.get(POSTCODE_COL, "") or "").strip() + hit = resolve_one( + address, + postcode_raw, + epc_token=epc_token, + os_api_key=os_api_key, + epc_cache=epc_cache, + os_cache=os_cache, + ) + if hit is None: + row[FOUND_ADDRESS_COL] = "" + row[FOUND_UPRN_COL] = "" + row[LEXISCORE_COL] = "" + row[SOURCE_COL] = NOT_FOUND + not_found += 1 + else: + uprn, matched, score, source = hit + row[FOUND_ADDRESS_COL] = matched + row[FOUND_UPRN_COL] = uprn + row[LEXISCORE_COL] = str(round(score, 4)) + row[SOURCE_COL] = source + if source == "epc": + epc_hits += 1 + else: + os_hits += 1 + print( + f"[{n}/{total}] {address!r} -> " + f"{row[FOUND_UPRN_COL] or '(no match)'} ({row[SOURCE_COL]})" + ) + + return epc_hits, os_hits, not_found + + +def write_rows(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None: + """Write rows to CSV, preserving input columns and appending DOMNA columns.""" + out_fields = list(fieldnames) + for col in _RESULT_COLS: + if col not in out_fields: + out_fields.append(col) + with path.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=out_fields, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + parser.add_argument("--limit", type=int, default=None, help="process first N rows") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + epc_token, os_api_key = load_keys() + if not epc_token: + print("OPEN_EPC_API_TOKEN not set (backend/.env) — EPC resolution disabled") + if not os_api_key: + print("ORDNANCE_SURVEY_API_KEY not set (backend/.env) — OS fallback disabled") + + rows, fieldnames = read_rows(args.inp) + if args.limit is not None: + rows = rows[: args.limit] + print(f"Loaded {len(rows)} rows from {args.inp}") + + epc_hits, os_hits, not_found = fill( + rows, epc_token=epc_token, os_api_key=os_api_key + ) + + write_rows(rows, args.out, fieldnames) + resolved = epc_hits + os_hits + print( + f"\nResolved {resolved}/{len(rows)} " + f"(epc={epc_hits}, ordnance_survey={os_hits}); {not_found} not found." + ) + print(f"Wrote filled CSV -> {args.out}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/finalise_to_property_table.py b/scripts/lisasrequest/finalise_to_property_table.py new file mode 100644 index 00000000..eee66a0f --- /dev/null +++ b/scripts/lisasrequest/finalise_to_property_table.py @@ -0,0 +1,111 @@ +"""Step 3 (Durkan portfolio): insert the reshaped rows into the ``property`` table. + +Reads durkan_finaliser_input.csv (step 2) and, per row, maps it with the real +finaliser mapper (``BulkUploadFinaliserOrchestrator._row_to_insert``) and inserts +via the same ``PropertyPostgresRepository.insert_all`` the Lambda uses — so a row +written here is identical to one the production finaliser would write. Insert is +ON CONFLICT (portfolio_id, uprn) DO NOTHING, so re-running is safe. + +DRY RUN BY DEFAULT — it dedupes, reports, and writes the collisions file but does +NOT touch the database. Add --commit to actually insert. + + # preview only (no DB writes): dedupe + mapping report + python -m scripts.lisasrequest.finalise_to_property_table --portfolio 805 + + # actually insert + python -m scripts.lisasrequest.finalise_to_property_table --portfolio 805 --commit + +Postgres target comes from the root .env (POSTGRES_*). Run from the worktree root. +""" + +from __future__ import annotations + +import argparse +import csv +import sys +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from scripts.finalise_to_property_table import ( # noqa: E402 + dedupe_by_uprn, + insert_rows, +) + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_input.csv" +_DEFAULT_COLLISIONS = ( + _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_collisions.csv" +) +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +POSTCODE_COL = "postcode" +LEXISCORE_COL = "address2uprn_lexiscore" + + +def read_rows(path: Path) -> list[dict[str, str]]: + with path.open(newline="", encoding="utf-8-sig") as fh: + return [dict(row) for row in csv.DictReader(fh)] + + +def _preview(rows: list[dict[str, str]]) -> None: + """Show the first few rows as they will be inserted (no DB, no mapper call). + + The finalise step applies the standard finaliser mapper + (BulkUploadFinaliserOrchestrator) on insert; the fields below are its inputs. + """ + print("\nSample rows to insert (uprn | matched address | postcode | lexiscore):") + for row in rows[:3]: + print( + f" {row.get(UPRN_COL)} | {row.get(MATCHED_ADDRESS_COL)!r} | " + f"{row.get(POSTCODE_COL)!r} | {row.get(LEXISCORE_COL)}" + ) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--portfolio", type=int, required=True) + parser.add_argument( + "--commit", + action="store_true", + help="actually insert into property (default is a dry-run preview)", + ) + parser.add_argument("--collisions", type=Path, default=_DEFAULT_COLLISIONS) + args = parser.parse_args() + + rows = read_rows(args.inp) + print(f"Loaded {len(rows)} finaliser rows from {args.inp}") + + kept, dropped = dedupe_by_uprn(rows) + if dropped: + with args.collisions.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=list(dropped[0].keys())) + writer.writeheader() + writer.writerows(dropped) + print( + f"{len(dropped)} duplicate-UPRN rows dropped -> {args.collisions} " + f"({len(kept)} unique to insert)" + ) + else: + print(f"No duplicate-UPRN collisions; {len(kept)} unique rows to insert.") + + _preview(kept) + + if not args.commit: + print( + f"\nDRY RUN — nothing written. {len(kept)} rows would be inserted into " + f"portfolio {args.portfolio}. Re-run with --commit to write." + ) + return 0 + + inserted = insert_rows(kept, args.portfolio) + print( + f"\nInserted {inserted} new properties into portfolio {args.portfolio} " + f"({len(kept) - inserted} already existed; ON CONFLICT DO NOTHING)." + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/resolve_uprns_for_finaliser.py b/scripts/lisasrequest/resolve_uprns_for_finaliser.py new file mode 100644 index 00000000..6107b837 --- /dev/null +++ b/scripts/lisasrequest/resolve_uprns_for_finaliser.py @@ -0,0 +1,212 @@ +"""Step 2 (Durkan portfolio): split step-1 matches, reshape the confident ones. + +Reads durkan_domna_filled.csv (step 1) and SPLITS it in two — no re-resolution, +just column work: + + * Rows we cannot confidently insert are held back to a client-clarification CSV + (durkan_client_clarification.csv) for Khalim to take to the client. Reasons: + not_found_no_match no UPRN was resolved. + no_flat_level_uprn a block of flats all collapsed onto one building + UPRN — OS/EPC carry no flat-level records, so we + can't tell the flats apart. + unit_number_mismatch the matched house number differs from the input + (e.g. "9 ..." matched "9A ..."), so the property is + ambiguous. + * Every remaining row is reshaped into the columns the finaliser reads + (bulk_upload_finaliser_orchestrator), written to durkan_finaliser_input.csv + ready for step 3: + Address 1/2/3 | postcode | Internal Reference | address2uprn_uprn + | address2uprn_address | address2uprn_lexiscore + Internal Reference is left blank (landlord_property_id null, by decision). + + python -m scripts.lisasrequest.resolve_uprns_for_finaliser + +This stage hits no APIs. The held rows are not lost — once the client confirms +them they can be appended to the finaliser input by hand. +""" + +from __future__ import annotations + +import argparse +import csv +import sys +from collections import Counter +from pathlib import Path +from typing import Optional + +_REPO_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from scripts.lisasrequest.fill_domna_address import ( # noqa: E402 + ADDRESS_COL, + FOUND_ADDRESS_COL, + FOUND_UPRN_COL, + LEXISCORE_COL, + POSTCODE_COL, + SOURCE_COL, +) +from scripts.lisasrequest.review_flags import address_numbers, input_unit # noqa: E402 + +# Finaliser input columns — must match bulk_upload_finaliser_orchestrator +# (ADDRESS_COLS / POSTCODE_COL / INTERNAL_REF_COL / UPRN_COL / +# MATCHED_ADDRESS_COL / LEXISCORE_COL). Hard-coded to keep this a light, +# stdlib-only reshape; step 3 imports the real orchestrator and will fail loudly +# if these ever drift. +FIN_ADDRESS_1, FIN_ADDRESS_2, FIN_ADDRESS_3 = "Address 1", "Address 2", "Address 3" +FIN_POSTCODE = "postcode" +FIN_INTERNAL_REF = "Internal Reference" +FIN_UPRN = "address2uprn_uprn" +FIN_MATCHED_ADDRESS = "address2uprn_address" +FIN_LEXISCORE = "address2uprn_lexiscore" +_FINALISER_COLS = [ + FIN_ADDRESS_1, + FIN_ADDRESS_2, + FIN_ADDRESS_3, + FIN_POSTCODE, + FIN_INTERNAL_REF, + FIN_UPRN, + FIN_MATCHED_ADDRESS, + FIN_LEXISCORE, +] + +# Client-clarification report columns (kept human-readable for the client). +CONTEXT_COLS = ["address", "postcode", "No.", "Address Block"] +DOMNA_COLS = [FOUND_ADDRESS_COL, FOUND_UPRN_COL, LEXISCORE_COL, SOURCE_COL] +REASON_COL = "clarification_reason" +ACTION_COL = "action_needed" +_CLARIFY_COLS = CONTEXT_COLS + DOMNA_COLS + [REASON_COL, ACTION_COL] + +_REASON_ORDER = { + "not_found_no_match": 0, + "no_flat_level_uprn": 1, + "unit_number_mismatch": 2, +} +_REASON_ACTION = { + "not_found_no_match": "No UPRN found for this address — please confirm the " + "exact address or provide the UPRN.", + "no_flat_level_uprn": "Address registers hold only the building, not the " + "individual flats — please provide a UPRN per flat, or confirm a " + "building-level record is acceptable.", + "unit_number_mismatch": "Closest match has a different unit number (see " + "domna_address_found) — please confirm the correct property / UPRN.", +} + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv" +_DEFAULT_FINALISER = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_finaliser_input.csv" +_DEFAULT_CLARIFY = ( + _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_client_clarification.csv" +) + + +def read_rows(path: Path) -> list[dict[str, str]]: + with path.open(newline="", encoding="utf-8-sig") as fh: + return [dict(row) for row in csv.DictReader(fh)] + + +def clarification_reason( + row: dict[str, str], uprn_counts: Counter[str] +) -> Optional[str]: + """Why this row can't be inserted yet, or None if it's safe to finalise.""" + uprn = row.get(FOUND_UPRN_COL, "") + if row.get(SOURCE_COL) == "not_found" or not uprn: + return "not_found_no_match" + + unit = input_unit(row.get(ADDRESS_COL, "")) + unit_missing = bool(unit) and unit not in address_numbers( + row.get(FOUND_ADDRESS_COL, "") + ) + duplicate = uprn_counts[uprn] > 1 + if unit_missing: + return "no_flat_level_uprn" if duplicate else "unit_number_mismatch" + if duplicate: + # A shared UPRN with the right unit number still collides at finalise. + return "no_flat_level_uprn" + return None + + +def to_finaliser_row(row: dict[str, str]) -> dict[str, str]: + """Rename a confident step-1 row into the finaliser's input columns.""" + return { + FIN_ADDRESS_1: row.get(ADDRESS_COL, ""), + FIN_ADDRESS_2: "", + FIN_ADDRESS_3: "", + FIN_POSTCODE: row.get(POSTCODE_COL, ""), + FIN_INTERNAL_REF: "", # landlord_property_id null, by decision + FIN_UPRN: row.get(FOUND_UPRN_COL, ""), + FIN_MATCHED_ADDRESS: row.get(FOUND_ADDRESS_COL, ""), + FIN_LEXISCORE: row.get(LEXISCORE_COL, ""), + } + + +def to_clarify_row(row: dict[str, str], reason: str) -> dict[str, str]: + out = {col: row.get(col, "") for col in CONTEXT_COLS + DOMNA_COLS} + out[REASON_COL] = reason + out[ACTION_COL] = _REASON_ACTION[reason] + return out + + +def split( + rows: list[dict[str, str]], + *, + accept_unit_mismatch: bool = False, +) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + """Return (finaliser_rows, clarification_rows). + + ``accept_unit_mismatch`` reshapes the ``unit_number_mismatch`` rows (a + near-miss like 9 -> 9A the client has already confirmed) into the finaliser + input instead of holding them back. + """ + uprn_counts: Counter[str] = Counter( + r.get(FOUND_UPRN_COL, "") for r in rows if r.get(FOUND_UPRN_COL) + ) + finaliser: list[dict[str, str]] = [] + clarify: list[dict[str, str]] = [] + for row in rows: + reason = clarification_reason(row, uprn_counts) + if reason is None or ( + accept_unit_mismatch and reason == "unit_number_mismatch" + ): + finaliser.append(to_finaliser_row(row)) + else: + clarify.append(to_clarify_row(row, reason)) + clarify.sort(key=lambda r: _REASON_ORDER.get(r[REASON_COL], 9)) + return finaliser, clarify + + +def write_csv(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None: + with path.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--finaliser-out", type=Path, default=_DEFAULT_FINALISER) + parser.add_argument("--clarify-out", type=Path, default=_DEFAULT_CLARIFY) + parser.add_argument( + "--accept-unit-mismatch", + action="store_true", + help="reshape unit_number_mismatch rows (e.g. 9->9A) into the finaliser " + "input instead of holding them for the client", + ) + args = parser.parse_args() + + rows = read_rows(args.inp) + finaliser, clarify = split(rows, accept_unit_mismatch=args.accept_unit_mismatch) + + write_csv(finaliser, args.finaliser_out, _FINALISER_COLS) + write_csv(clarify, args.clarify_out, _CLARIFY_COLS) + + counts = Counter(r[REASON_COL] for r in clarify) + print(f"Read {len(rows)} step-1 rows.") + print(f" -> {len(finaliser)} confident rows reshaped -> {args.finaliser_out}") + print(f" -> {len(clarify)} held for client -> {args.clarify_out}") + for reason in _REASON_ORDER: + print(f" {reason}: {counts.get(reason, 0)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/lisasrequest/review_flags.py b/scripts/lisasrequest/review_flags.py new file mode 100644 index 00000000..4bf710e2 --- /dev/null +++ b/scripts/lisasrequest/review_flags.py @@ -0,0 +1,135 @@ +"""Flag step-1 matches that need a human eye, for review before finalising. + +Reads durkan_domna_filled.csv (the step-1 output) and writes a review CSV of +only the rows carrying at least one flag, newest-doubt-first: + + not_found no UPRN resolved at all. + unit_not_in_match the input flat/house number does NOT appear in the matched + address — the high-precision "wrong property" signal. Two + shapes: a near-miss ("9 VANBRUGH" matched "9A, VANBRUGH") + or a flat collapsing onto its building ("FLAT 1, 20 WARWICK" + matched "20, WARWICK ROAD"). + dup_uprn the same UPRN was resolved for >1 input row — typically a + block of flats all collapsing onto the building UPRN; all + but one will be dropped at finalise. + low_score lexiscore < 0.70 (a weak match, just over the OS bar). NOTE: + on its own this is noisy — truncated EPC addresses and extra + locality tokens push correct matches below 0.70. Treat it as + informational unless paired with one of the flags above. + + python -m scripts.lisasrequest.review_flags +""" + +from __future__ import annotations + +import argparse +import csv +import re +import sys +from collections import Counter +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parents[2] + +ADDRESS_COL = "address" +POSTCODE_COL = "postcode" +FOUND_ADDRESS_COL = "domna_address_found" +FOUND_UPRN_COL = "domna_address_uprn" +LEXISCORE_COL = "domna_lexiscore" +SOURCE_COL = "domna_source" +LOW_SCORE = 0.70 + +_DEFAULT_IN = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_domna_filled.csv" +_DEFAULT_OUT = _REPO_ROOT / "scripts" / "lisasrequest" / "durkan_review_flags.csv" + +_REVIEW_COLS = [ + ADDRESS_COL, + POSTCODE_COL, + FOUND_ADDRESS_COL, + FOUND_UPRN_COL, + LEXISCORE_COL, + SOURCE_COL, + "flags", +] + + +def input_unit(address: str) -> str: + """The salient unit number of an input address: the FLAT number if present, + else the leading house number ("" if neither). Upper-cased.""" + upper = address.upper() + flat = re.search(r"\bFLAT\s+(\d+[A-Z]?)", upper) + if flat: + return flat.group(1) + lead = re.match(r"\s*(\d+[A-Z]?)\b", upper) + return lead.group(1) if lead else "" + + +def address_numbers(address: str) -> set[str]: + """All standalone number tokens in an address (e.g. {"3", "20"}). Upper-cased.""" + return set(re.findall(r"\b\d+[A-Z]?\b", address.upper())) + + +def _score(value: str) -> float: + try: + return float(value) + except (TypeError, ValueError): + return 0.0 + + +def flag_rows(rows: list[dict[str, str]]) -> list[dict[str, str]]: + """Return the flagged subset, each with a ';'-joined ``flags`` field.""" + uprn_counts = Counter( + r.get(FOUND_UPRN_COL, "") for r in rows if r.get(FOUND_UPRN_COL) + ) + + flagged: list[dict[str, str]] = [] + for row in rows: + uprn = row.get(FOUND_UPRN_COL, "") + source = row.get(SOURCE_COL, "") + flags: list[str] = [] + + if source == "not_found" or not uprn: + flags.append("not_found") + else: + unit = input_unit(row.get(ADDRESS_COL, "")) + if unit and unit not in address_numbers(row.get(FOUND_ADDRESS_COL, "")): + flags.append("unit_not_in_match") + if uprn_counts[uprn] > 1: + flags.append("dup_uprn") + if _score(row.get(LEXISCORE_COL, "")) < LOW_SCORE: + flags.append("low_score") + + if flags: + flagged.append({**{c: row.get(c, "") for c in _REVIEW_COLS[:-1]}, + "flags": ";".join(flags)}) + + # not_found first, then mismatches, then dup/low. + order = {"not_found": 0, "unit_not_in_match": 1, "dup_uprn": 2, "low_score": 3} + flagged.sort(key=lambda r: order.get(r["flags"].split(";")[0], 9)) + return flagged + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--in", dest="inp", type=Path, default=_DEFAULT_IN) + parser.add_argument("--out", type=Path, default=_DEFAULT_OUT) + args = parser.parse_args() + + with args.inp.open(newline="", encoding="utf-8-sig") as fh: + rows = [dict(r) for r in csv.DictReader(fh)] + + flagged = flag_rows(rows) + with args.out.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=_REVIEW_COLS, extrasaction="ignore") + writer.writeheader() + writer.writerows(flagged) + + counts = Counter(f for r in flagged for f in r["flags"].split(";")) + print(f"{len(flagged)}/{len(rows)} rows flagged for review -> {args.out}") + for name in ("not_found", "unit_not_in_match", "dup_uprn", "low_score"): + print(f" {name}: {counts.get(name, 0)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/resolve_uprns_for_finaliser.py b/scripts/resolve_uprns_for_finaliser.py new file mode 100644 index 00000000..c01a55ed --- /dev/null +++ b/scripts/resolve_uprns_for_finaliser.py @@ -0,0 +1,328 @@ +"""Resolve a CSV of addresses to UPRNs, ready to feed the bulk-upload finaliser. + +Takes a CSV with `Address 1/2/3` + `postcode` columns and, per row, resolves a +UPRN by trying — in order — the new EPC API (address2uprn), the historic EPC S3 +dataset, then the Ordnance Survey Places API as a fallback. Whichever source +wins, the result is written into the SAME three columns the finaliser reads +(`bulk_upload_finaliser_orchestrator`): + + address2uprn_uprn UPRN integer (empty when unresolved) + address2uprn_address the matched address + address2uprn_lexiscore the match score in [0, 1] + +A `resolution_source` diagnostic column (epc / epc_historic / ordnance_survey / +none) is appended too — the finaliser ignores unknown columns. All original +columns are preserved in their original order, so the output CSV drops straight +into the finaliser. + + python -m scripts.resolve_uprns_for_finaliser input.csv -o resolved.csv + + # OS-only / EPC-only, custom postcode column, custom OS score threshold + python -m scripts.resolve_uprns_for_finaliser in.csv -o out.csv --no-epc + python -m scripts.resolve_uprns_for_finaliser in.csv -o out.csv --postcode-col Postcode --os-threshold 0.6 + +Keys are read from backend/.env: OPEN_EPC_API_TOKEN (EPC) and +ORDNANCE_SURVEY_API_KEY (OS Places). Run from the worktree root (import trap). + +The module-level functions (`load_keys`, `read_rows`, `resolve_row`, `process`, +`write_rows`) are written to be driven line-by-line from a REPL as well as via +the CLI. +""" + +from __future__ import annotations + +import argparse +import csv +import os +import sys +from pathlib import Path +from typing import Optional + +import pandas as pd +from dotenv import load_dotenv + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from backend.address2UPRN.main import ( # noqa: E402 + get_epc_data_with_postcode, + get_uprn_from_historic_epc, + get_uprn_with_epc_df, +) +from backend.ordnanceSurvey.helpers import ( # noqa: E402 + lookup_os_places, + os_places_results_to_dataframe, +) +from backend.utils.addressMatch import AddressMatch # noqa: E402 + +# Columns the finaliser reads (bulk_upload_finaliser_orchestrator). +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +LEXISCORE_COL = "address2uprn_lexiscore" +SOURCE_COL = "resolution_source" +_RESULT_COLS = (UPRN_COL, MATCHED_ADDRESS_COL, LEXISCORE_COL, SOURCE_COL) + +# A resolved hit: (uprn, matched_address, lexiscore, source). +Resolution = tuple[str, str, float, str] + + +def load_keys() -> tuple[Optional[str], Optional[str]]: + """Load (epc_token, os_api_key) from backend/.env (and the process env).""" + load_dotenv(_REPO_ROOT / "backend" / ".env") + epc_token = os.environ.get("OPEN_EPC_API_TOKEN") + os_api_key = os.environ.get("ORDNANCE_SURVEY_API_KEY") + return epc_token, os_api_key + + +def read_rows(path: Path) -> tuple[list[dict[str, str]], list[str]]: + """Read a CSV into (rows, fieldnames). Preserves column order.""" + with path.open(newline="", encoding="utf-8-sig") as fh: + reader = csv.DictReader(fh) + fieldnames = list(reader.fieldnames or []) + rows = [dict(row) for row in reader] + return rows, fieldnames + + +def clean_postcode(postcode: str) -> str: + """Sanitise to the no-space upper form the EPC/OS lookups expect (e.g. E84SQ).""" + return postcode.upper().replace(" ", "").strip() + + +def build_address(row: dict[str, str]) -> str: + """Concatenate Address 1/2/3 the same way the address2uprn lambda does.""" + return " ".join( + str(row.get(col, "") or "").strip() for col in ("Address 1", "Address 2", "Address 3") + ).strip() + + +def resolve_epc( + address: str, postcode_clean: str, epc_cache: dict[str, pd.DataFrame] +) -> Optional[Resolution]: + """Resolve via the new EPC API (cached per postcode), then historic EPC S3. + + `epc_cache` is mutated to memoise one EPC API call per postcode — pass the + same dict across rows so a postcode is only fetched once. + """ + epc_df = epc_cache.get(postcode_clean) + if epc_df is None: + epc_df = get_epc_data_with_postcode(postcode=postcode_clean) + epc_cache[postcode_clean] = epc_df + + result = get_uprn_with_epc_df( + user_inputed_address=address, epc_df=epc_df, verbose=True + ) + if isinstance(result, tuple): + uprn, matched, score = result + return str(uprn), str(matched), float(score), "epc" + + historic = get_uprn_from_historic_epc( + user_inputed_address=address, postcode=postcode_clean + ) + if historic is not None: + uprn, matched, score = historic + return str(uprn), str(matched), float(score), "epc_historic" + + return None + + +def resolve_os( + address: str, + postcode_clean: str, + os_api_key: str, + os_cache: dict[str, pd.DataFrame], + threshold: float, +) -> Optional[Resolution]: + """Resolve via the OS Places API: best-scoring address above `threshold`. + + `os_cache` memoises one OS Places call per postcode. + """ + places_df = os_cache.get(postcode_clean) + if places_df is None: + response = lookup_os_places(postcode_clean, os_api_key) + if response.get("status") != 200 or "data" not in response: + places_df = pd.DataFrame() + else: + places_df = os_places_results_to_dataframe(response["data"]) + os_cache[postcode_clean] = places_df + + if places_df.empty or "ADDRESS" not in places_df.columns: + return None + + # Iterate plain records — avoids pandas' partially-unknown indexing types. + records: list[dict[str, object]] = places_df.to_dict(orient="records") + best: Optional[Resolution] = None + for rec in records: + candidate = str(rec.get("ADDRESS", "")) + score = AddressMatch.score(address, candidate) + if score >= threshold and (best is None or score > best[2]): + best = (str(rec.get("UPRN", "")), candidate, score, "ordnance_survey") + return best + + +def resolve_row( + row: dict[str, str], + *, + epc_token: Optional[str], + os_api_key: Optional[str], + epc_cache: dict[str, pd.DataFrame], + os_cache: dict[str, pd.DataFrame], + postcode_col: str, + use_epc: bool, + use_os: bool, + os_threshold: float, + validate_postcode: bool, +) -> dict[str, str]: + """Resolve one row in place and return it with the finaliser columns filled. + + Tries EPC (new + historic) first, then OS Places. On no match the three + result columns are written empty and `resolution_source` is "none". + """ + address = build_address(row) + postcode_clean = clean_postcode(str(row.get(postcode_col, "") or "")) + + def write(res: Optional[Resolution]) -> dict[str, str]: + if res is None: + row[UPRN_COL] = "" + row[MATCHED_ADDRESS_COL] = "" + row[LEXISCORE_COL] = "" + row[SOURCE_COL] = "none" + else: + uprn, matched, score, source = res + row[UPRN_COL] = uprn + row[MATCHED_ADDRESS_COL] = matched + row[LEXISCORE_COL] = str(score) + row[SOURCE_COL] = source + return row + + if not address or not postcode_clean: + return write(None) + + if validate_postcode and not AddressMatch.is_valid_postcode(postcode_clean): + return write(None) + + if use_epc and epc_token: + try: + res = resolve_epc(address, postcode_clean, epc_cache) + if res is not None: + return write(res) + except Exception as exc: # keep going on a per-row API/lookup failure + print(f" EPC lookup failed for {address!r} / {postcode_clean}: {exc}") + + if use_os and os_api_key: + try: + res = resolve_os(address, postcode_clean, os_api_key, os_cache, os_threshold) + if res is not None: + return write(res) + except Exception as exc: + print(f" OS lookup failed for {address!r} / {postcode_clean}: {exc}") + + return write(None) + + +def process( + rows: list[dict[str, str]], + *, + epc_token: Optional[str], + os_api_key: Optional[str], + postcode_col: str = "postcode", + use_epc: bool = True, + use_os: bool = True, + os_threshold: float = 0.5, + validate_postcode: bool = True, +) -> list[dict[str, str]]: + """Resolve every row, printing a per-row line so REPL/CLI progress is visible.""" + epc_cache: dict[str, pd.DataFrame] = {} + os_cache: dict[str, pd.DataFrame] = {} + for i, row in enumerate(rows, start=1): + resolve_row( + row, + epc_token=epc_token, + os_api_key=os_api_key, + epc_cache=epc_cache, + os_cache=os_cache, + postcode_col=postcode_col, + use_epc=use_epc, + use_os=use_os, + os_threshold=os_threshold, + validate_postcode=validate_postcode, + ) + print( + f"[{i}/{len(rows)}] {build_address(row)!r} -> " + f"{row[UPRN_COL] or '(no match)'} ({row[SOURCE_COL]})" + ) + return rows + + +def write_rows(rows: list[dict[str, str]], path: Path, fieldnames: list[str]) -> None: + """Write rows to CSV, preserving input columns and appending the result columns.""" + out_fields = list(fieldnames) + for col in _RESULT_COLS: + if col not in out_fields: + out_fields.append(col) + with path.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=out_fields, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("input", type=Path, help="input CSV (Address 1/2/3 + postcode)") + parser.add_argument( + "-o", "--out", type=Path, required=True, help="output CSV for the finaliser" + ) + parser.add_argument("--postcode-col", default="postcode", help="postcode column name") + parser.add_argument("--no-epc", action="store_true", help="skip EPC resolution") + parser.add_argument("--no-os", action="store_true", help="skip Ordnance Survey fallback") + parser.add_argument( + "--os-threshold", type=float, default=0.5, help="min OS match score (default 0.5)" + ) + parser.add_argument( + "--no-validate-postcode", + action="store_true", + help="skip the postcodes.io validity check (one HTTP call per postcode)", + ) + parser.add_argument("--limit", type=int, default=None, help="process only the first N rows") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + epc_token, os_api_key = load_keys() + + use_epc = not args.no_epc + use_os = not args.no_os + if use_epc and not epc_token: + print("OPEN_EPC_API_TOKEN not set (backend/.env) — EPC resolution disabled") + use_epc = False + if use_os and not os_api_key: + print("ORDNANCE_SURVEY_API_KEY not set (backend/.env) — OS fallback disabled") + use_os = False + if not use_epc and not use_os: + print("No resolver enabled (missing keys or both --no-* flags). Nothing to do.") + return 2 + + rows, fieldnames = read_rows(args.input) + if args.limit is not None: + rows = rows[: args.limit] + print(f"Loaded {len(rows)} rows from {args.input}") + + process( + rows, + epc_token=epc_token, + os_api_key=os_api_key, + postcode_col=args.postcode_col, + use_epc=use_epc, + use_os=use_os, + os_threshold=args.os_threshold, + validate_postcode=not args.no_validate_postcode, + ) + + write_rows(rows, args.out, fieldnames) + matched = sum(1 for r in rows if r.get(UPRN_COL)) + print(f"\nResolved {matched}/{len(rows)} rows. Wrote {args.out}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())