diff --git a/scripts/fetch_corpus_coordinates.py b/scripts/fetch_corpus_coordinates.py new file mode 100644 index 00000000..bd87acf1 --- /dev/null +++ b/scripts/fetch_corpus_coordinates.py @@ -0,0 +1,100 @@ +"""One-time: resolve coordinates for every EPC Prediction corpus UPRN (#1227). + +Reads the OS Open-UPRN parquet from S3 (DATA_BUCKET / spatial/) via boto3 and +resolves each corpus cert's `uprn` to WGS84 lon/lat. UPRNs are grouped by their +covering partition (the same UPRN-range bucketing `GeospatialS3Repository` uses), +so each ~1.7 MB partition is read at most once — the efficient batch lookup we +intend to add to the Geospatial Repo. Caches `{uprn: [lon, lat]}` locally +(gitignored) so the validation harness can score intra-postcode distances +without S3. + +USAGE +----- + set -a; . backend/.env; set +a + PYTHONPATH=. python scripts/fetch_corpus_coordinates.py + +Source corpus: $EPC_PREDICTION_CORPUS (default /tmp/epc_prediction_corpus). +Output: /../epc_prediction_corpus_coords.json +""" + +from __future__ import annotations + +import io +import json +import os +from collections import defaultdict +from pathlib import Path +from typing import Any + +import boto3 +import pandas as pd + +CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus")) +OUT = CORPUS.parent / "epc_prediction_corpus_coords.json" +_BUCKET = os.environ["DATA_BUCKET"] +_META_KEY = "spatial/filename_meta.parquet" + + +def _reader() -> Any: + # boto3.client is overloaded per-service in the installed stubs; bind to Any. + boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + s3: Any = boto3_client("s3") + + def read_parquet(key: str) -> pd.DataFrame: + response: dict[str, Any] = s3.get_object(Bucket=_BUCKET, Key=key) + body: bytes = response["Body"].read() + return pd.read_parquet(io.BytesIO(body)) + + return read_parquet + + +def _corpus_uprns() -> set[int]: + index: dict[str, list[str]] = json.loads((CORPUS / "_index.json").read_text()) + uprns: set[int] = set() + for postcode, certs in index.items(): + for cert in certs: + path = CORPUS / postcode / f"{cert}.json" + if not path.exists(): + continue + raw: dict[str, Any] = json.loads(path.read_text()) + uprn = raw.get("uprn") + if uprn is not None: + uprns.add(int(uprn)) + return uprns + + +def main() -> None: + read_parquet = _reader() + uprns = _corpus_uprns() + print(f"corpus UPRNs: {len(uprns)}") + + meta = read_parquet(_META_KEY) + # Group each UPRN by its covering partition (lower <= uprn <= upper), so each + # partition file is read once for all the UPRNs it covers. + by_partition: dict[str, list[int]] = defaultdict(list) + uncovered = 0 + for uprn in uprns: + covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)] + if covering.empty: + uncovered += 1 + continue + by_partition[str(covering["filenames"].iloc[0])].append(uprn) + print(f"distinct partitions to read: {len(by_partition)}; uncovered: {uncovered}") + + coords: dict[str, list[float]] = {} + for i, (filename, part_uprns) in enumerate(sorted(by_partition.items()), 1): + partition = read_parquet(f"spatial/{filename}") + rows = partition[partition["UPRN"].isin(part_uprns)] + for _, row in rows.iterrows(): + coords[str(int(row["UPRN"]))] = [ + float(row["LONGITUDE"]), + float(row["LATITUDE"]), + ] + print(f" [{i}/{len(by_partition)}] {filename}: +{len(rows)}") + + OUT.write_text(json.dumps(coords)) + print(f"resolved {len(coords)}/{len(uprns)} UPRNs -> {OUT}") + + +if __name__ == "__main__": + main()