feat(epc-prediction): resolve corpus UPRN coordinates from S3 (#1227 signal check)

One-time utility: resolves every corpus cert's uprn -> WGS84 lon/lat from the
OS Open-UPRN parquet (DATA_BUCKET/spatial/) via boto3, grouping UPRNs by their
covering partition so each ~1.7MB partition is read at most once (the efficient
batch lookup we intend to add to GeospatialRepository). Caches {uprn:[lon,lat]}
locally for the validation harness. Resolved 2609/2683 corpus UPRNs (97%).

Signal pre-check result (does intra-postcode proximity predict components?):
intra-postcode distances are non-trivial (median 44m, p90 138m, max ~1km),
and nearer neighbours match the target markedly better on age band (0.63 at
<20m -> 0.16 at >300m), wall, glazing and floor construction. Roof shows no
decay. => geo-proximity is worth building, per-component (strongest for age,
the weakest fabric component).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-15 14:28:39 +00:00
parent 4afab2c3d8
commit c0a1bcac95

View file

@ -0,0 +1,100 @@
"""One-time: resolve coordinates for every EPC Prediction corpus UPRN (#1227).
Reads the OS Open-UPRN parquet from S3 (DATA_BUCKET / spatial/) via boto3 and
resolves each corpus cert's `uprn` to WGS84 lon/lat. UPRNs are grouped by their
covering partition (the same UPRN-range bucketing `GeospatialS3Repository` uses),
so each ~1.7 MB partition is read at most once the efficient batch lookup we
intend to add to the Geospatial Repo. Caches `{uprn: [lon, lat]}` locally
(gitignored) so the validation harness can score intra-postcode distances
without S3.
USAGE
-----
set -a; . backend/.env; set +a
PYTHONPATH=. python scripts/fetch_corpus_coordinates.py
Source corpus: $EPC_PREDICTION_CORPUS (default /tmp/epc_prediction_corpus).
Output: <corpus>/../epc_prediction_corpus_coords.json
"""
from __future__ import annotations
import io
import json
import os
from collections import defaultdict
from pathlib import Path
from typing import Any
import boto3
import pandas as pd
CORPUS = Path(os.environ.get("EPC_PREDICTION_CORPUS", "/tmp/epc_prediction_corpus"))
OUT = CORPUS.parent / "epc_prediction_corpus_coords.json"
_BUCKET = os.environ["DATA_BUCKET"]
_META_KEY = "spatial/filename_meta.parquet"
def _reader() -> Any:
# boto3.client is overloaded per-service in the installed stubs; bind to Any.
boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
s3: Any = boto3_client("s3")
def read_parquet(key: str) -> pd.DataFrame:
response: dict[str, Any] = s3.get_object(Bucket=_BUCKET, Key=key)
body: bytes = response["Body"].read()
return pd.read_parquet(io.BytesIO(body))
return read_parquet
def _corpus_uprns() -> set[int]:
index: dict[str, list[str]] = json.loads((CORPUS / "_index.json").read_text())
uprns: set[int] = set()
for postcode, certs in index.items():
for cert in certs:
path = CORPUS / postcode / f"{cert}.json"
if not path.exists():
continue
raw: dict[str, Any] = json.loads(path.read_text())
uprn = raw.get("uprn")
if uprn is not None:
uprns.add(int(uprn))
return uprns
def main() -> None:
read_parquet = _reader()
uprns = _corpus_uprns()
print(f"corpus UPRNs: {len(uprns)}")
meta = read_parquet(_META_KEY)
# Group each UPRN by its covering partition (lower <= uprn <= upper), so each
# partition file is read once for all the UPRNs it covers.
by_partition: dict[str, list[int]] = defaultdict(list)
uncovered = 0
for uprn in uprns:
covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)]
if covering.empty:
uncovered += 1
continue
by_partition[str(covering["filenames"].iloc[0])].append(uprn)
print(f"distinct partitions to read: {len(by_partition)}; uncovered: {uncovered}")
coords: dict[str, list[float]] = {}
for i, (filename, part_uprns) in enumerate(sorted(by_partition.items()), 1):
partition = read_parquet(f"spatial/{filename}")
rows = partition[partition["UPRN"].isin(part_uprns)]
for _, row in rows.iterrows():
coords[str(int(row["UPRN"]))] = [
float(row["LONGITUDE"]),
float(row["LATITUDE"]),
]
print(f" [{i}/{len(by_partition)}] {filename}: +{len(rows)}")
OUT.write_text(json.dumps(coords))
print(f"resolved {len(coords)}/{len(uprns)} UPRNs -> {OUT}")
if __name__ == "__main__":
main()