From c0a1bcac95bd4fac532bd3cea1f5eab2733dbbe0 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Mon, 15 Jun 2026 14:28:39 +0000
Subject: [PATCH] feat(epc-prediction): resolve corpus UPRN coordinates from S3 (#1227 signal check)

One-time utility: resolves every corpus cert's uprn -> WGS84 lon/lat from
the OS Open-UPRN parquet (DATA_BUCKET/spatial/) via boto3, grouping UPRNs by
their covering partition so each ~1.7MB partition is read at most once (the
efficient batch lookup we intend to add to GeospatialRepository). Caches
{uprn:[lon,lat]} locally for the validation harness. Resolved 2609/2683
corpus UPRNs (97%).

Signal pre-check result (does intra-postcode proximity predict components?):
intra-postcode distances are non-trivial (median 44m, p90 138m, max ~1km),
and nearer neighbours match the target markedly better on age band
(0.63 at <20m -> 0.16 at >300m), wall, glazing and floor construction.
Roof shows no decay.
=> geo-proximity is worth building, per-component (strongest for age, the
weakest fabric component).

Co-Authored-By: Claude Opus 4.8
---
 scripts/fetch_corpus_coordinates.py | 117 ++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)
 create mode 100644 scripts/fetch_corpus_coordinates.py

diff --git a/scripts/fetch_corpus_coordinates.py b/scripts/fetch_corpus_coordinates.py
new file mode 100644
index 00000000..bd87acf1
--- /dev/null
+++ b/scripts/fetch_corpus_coordinates.py
@@ -0,0 +1,117 @@
+"""One-time: resolve coordinates for every EPC Prediction corpus UPRN (#1227).
+
+Reads the OS Open-UPRN parquet from S3 (DATA_BUCKET / spatial/) via boto3 and
+resolves each corpus cert's `uprn` to WGS84 lon/lat. UPRNs are grouped by their
+covering partition (the same UPRN-range bucketing `GeospatialS3Repository` uses),
+so each ~1.7 MB partition is read at most once — the efficient batch lookup we
+intend to add to the Geospatial Repo. Caches `{uprn: [lon, lat]}` locally
+(gitignored) so the validation harness can score intra-postcode distances
+without S3.
+
+USAGE
+-----
+    set -a; . backend/.env; set +a
+    PYTHONPATH=. python scripts/fetch_corpus_coordinates.py
+
+Source corpus: $EPC_PREDICTION_CORPUS (default /tmp/epc_prediction_corpus).
+Output: $EPC_PREDICTION_CORPUS/../epc_prediction_corpus_coords.json
+"""
+
+from __future__ import annotations
+
+import io
+import json
+import os
+from collections import defaultdict
+from collections.abc import Callable
+from pathlib import Path
+from typing import Any
+
+import boto3
+import pandas as pd
+
+CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus"))
+OUT = CORPUS.parent / "epc_prediction_corpus_coords.json"
+# Read at import time: a missing DATA_BUCKET fails fast with KeyError.
+_BUCKET = os.environ["DATA_BUCKET"]
+_META_KEY = "spatial/filename_meta.parquet"
+
+
+def _reader() -> Callable[[str], pd.DataFrame]:
+    """Return a `read_parquet(key)` callable bound to a single S3 client.
+
+    The returned function downloads the object at `key` from `_BUCKET` and
+    parses the bytes as parquet into a DataFrame.
+    """
+    # boto3.client is overloaded per-service in the installed stubs; bind to Any.
+    boto3_client: Any = boto3.client  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
+    s3: Any = boto3_client("s3")
+
+    def read_parquet(key: str) -> pd.DataFrame:
+        response: dict[str, Any] = s3.get_object(Bucket=_BUCKET, Key=key)
+        body: bytes = response["Body"].read()
+        return pd.read_parquet(io.BytesIO(body))
+
+    return read_parquet
+
+
+def _corpus_uprns() -> set[int]:
+    """Collect the distinct UPRNs of every cert listed in the corpus index.
+
+    `_index.json` maps postcode -> cert ids; each cert is read from
+    `CORPUS/{postcode}/{cert}.json`. Certs whose file is missing or whose
+    `uprn` is null/absent are skipped.
+    """
+    index: dict[str, list[str]] = json.loads((CORPUS / "_index.json").read_text())
+    uprns: set[int] = set()
+    for postcode, certs in index.items():
+        for cert in certs:
+            path = CORPUS / postcode / f"{cert}.json"
+            if not path.exists():
+                continue
+            raw: dict[str, Any] = json.loads(path.read_text())
+            uprn = raw.get("uprn")
+            if uprn is not None:
+                uprns.add(int(uprn))
+    return uprns
+
+
+def main() -> None:
+    """Resolve every corpus UPRN to `[lon, lat]` and write the JSON cache.
+
+    Prints progress per partition; UPRNs with no covering partition, or absent
+    from their partition, are simply left out of the output.
+    """
+    read_parquet = _reader()
+    uprns = _corpus_uprns()
+    print(f"corpus UPRNs: {len(uprns)}")
+
+    meta = read_parquet(_META_KEY)
+    # Group each UPRN by its covering partition (lower <= uprn <= upper), so each
+    # partition file is read once for all the UPRNs it covers.
+    by_partition: dict[str, list[int]] = defaultdict(list)
+    uncovered = 0
+    for uprn in uprns:
+        covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)]
+        if covering.empty:
+            uncovered += 1
+            continue
+        by_partition[str(covering["filenames"].iloc[0])].append(uprn)
+    print(f"distinct partitions to read: {len(by_partition)}; uncovered: {uncovered}")
+
+    coords: dict[str, list[float]] = {}
+    for i, (filename, part_uprns) in enumerate(sorted(by_partition.items()), 1):
+        partition = read_parquet(f"spatial/{filename}")
+        rows = partition[partition["UPRN"].isin(part_uprns)]
+        # Zip the columns instead of iterrows(): iterrows() builds a Series per
+        # row, which can coerce the integer UPRN to the row's common dtype.
+        for found, lon, lat in zip(rows["UPRN"], rows["LONGITUDE"], rows["LATITUDE"]):
+            coords[str(int(found))] = [float(lon), float(lat)]
+        print(f" [{i}/{len(by_partition)}] {filename}: +{len(rows)}")
+
+    OUT.write_text(json.dumps(coords))
+    print(f"resolved {len(coords)}/{len(uprns)} UPRNs -> {OUT}")
+
+
+if __name__ == "__main__":
+    main()