feat(geospatial): batch coordinates_for_uprns lookup (#1227)

Adds GeospatialRepository.coordinates_for_uprns(uprns) -> dict — a batch
coordinate lookup returning only covered UPRNs. The S3 adapter overrides it
to read the meta once, group UPRNs by their covering partition, and read each
partition once for all the UPRNs it covers; co-located (closely-numbered)
UPRNs share a partition, so an EPC Prediction cohort is typically one or two
reads instead of one per neighbour. Default port impl is a per-UPRN loop.

Feeds the EPC Prediction geo-proximity work: a cohort's UPRNs resolve to
coordinates in a couple of reads (validated at corpus scale: 170 partition
reads for 2683 UPRNs).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-15 14:35:32 +00:00
parent c0a1bcac95
commit 95719dd587
3 changed files with 97 additions and 0 deletions

View file

@ -18,6 +18,21 @@ class GeospatialRepository(ABC):
# Single-UPRN coordinate lookup that every adapter must implement.
# coordinates_for_uprns treats a None result as "not covered" and leaves
# that UPRN out of its result.
@abstractmethod
def coordinates_for(self, uprn: int) -> Optional[Coordinates]: ...
def coordinates_for_uprns(
    self, uprns: list[int]
) -> dict[int, Coordinates]:
    """Batch form of ``coordinates_for``: map each covered UPRN to its coordinates.

    This default asks ``coordinates_for`` about every UPRN in turn and leaves
    out any that come back ``None``. Adapters whose storage is partitioned
    (e.g. the S3 Open-UPRN parquet) override it to read each partition once
    for all the UPRNs it covers — far fewer reads when the UPRNs are
    co-located, as closely-numbered UPRNs share a partition.
    """
    return {
        uprn: located
        for uprn in uprns
        if (located := self.coordinates_for(uprn)) is not None
    }
def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
"""The Property's coordinates and planning protections together, in one
reference lookup (ADR-0020) Ingestion uses the coordinates to drive

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable
from typing import Any, Optional
@ -62,6 +63,30 @@ class GeospatialS3Repository(GeospatialRepository):
reference: Optional[SpatialReference] = self.spatial_for(uprn)
return reference.coordinates if reference is not None else None
def coordinates_for_uprns(
    self, uprns: list[int]
) -> dict[int, Coordinates]:
    """Batch lookup that reads the meta once and each covering partition once.

    Each UPRN is assigned to the first meta row whose ``lower``/``upper``
    range contains it; every partition that ends up with at least one UPRN is
    then read a single time and filtered down to the requested UPRNs.
    Uncovered / absent UPRNs are omitted from the result. An empty batch
    returns ``{}`` without reading anything.
    """
    requested = list(uprns)
    if not requested:
        # Nothing to resolve, so don't pay for the meta read.
        return {}

    meta = self._read_parquet(_META_KEY)
    # Pull the meta columns out once instead of re-indexing per UPRN.
    lower, upper, filenames = meta["lower"], meta["upper"], meta["filenames"]

    by_partition: dict[str, list[int]] = defaultdict(list)
    for uprn in requested:
        matches = filenames[(lower <= uprn) & (upper >= uprn)]
        if not matches.empty:
            # If several meta rows cover the UPRN, the first one wins.
            by_partition[str(matches.iloc[0])].append(uprn)

    resolved: dict[int, Coordinates] = {}
    for filename, partition_uprns in by_partition.items():
        partition = self._read_parquet(f"spatial/{filename}")
        rows = partition[partition["UPRN"].isin(partition_uprns)]
        # Walk the three columns in lockstep rather than iterrows(), which
        # builds a Series per row and does not preserve column dtypes.
        for uprn, longitude, latitude in zip(
            rows["UPRN"], rows["LONGITUDE"], rows["LATITUDE"]
        ):
            resolved[int(uprn)] = Coordinates(
                longitude=float(longitude),
                latitude=float(latitude),
            )
    return resolved
def planning_restrictions_for(self, uprn: int) -> Optional[PlanningRestrictions]:
    """Return the ``restrictions`` of the UPRN's spatial reference, or ``None``
    when ``spatial_for`` has no reference for it."""
    reference = self.spatial_for(uprn)
    if reference is None:
        return None
    return reference.restrictions

View file

@ -131,3 +131,60 @@ def test_spatial_for_returns_none_when_uprn_absent(tmp_path: Path) -> None:
# Act / Assert
assert repo.spatial_for(99999) is None
def _write_two_partition_open_uprn(base: Path) -> None:
    """Write a two-partition Open-UPRN fixture under ``base/spatial``.

    The meta declares two UPRN ranges, each backed by its own partition file,
    so a batch lookup over the fixture has to span both.
    """
    spatial_dir = base / "spatial"
    spatial_dir.mkdir(parents=True, exist_ok=True)

    meta = pd.DataFrame(
        {
            "lower": [0, 100001],
            "upper": [100000, 200000],
            "filenames": ["0_100000.parquet", "100001_200000.parquet"],
        }
    )
    low_partition = pd.DataFrame(
        {"UPRN": [10, 11], "LATITUDE": [51.0, 51.1], "LONGITUDE": [-1.0, -1.1]}
    )
    high_partition = pd.DataFrame(
        {"UPRN": [150000], "LATITUDE": [52.0], "LONGITUDE": [-2.0]}
    )

    fixture_files = (
        ("filename_meta.parquet", meta),
        ("0_100000.parquet", low_partition),
        ("100001_200000.parquet", high_partition),
    )
    for name, frame in fixture_files:
        frame.to_parquet(spatial_dir / name)
def test_coordinates_for_uprns_resolves_a_batch_across_partitions(
    tmp_path: Path,
) -> None:
    # Arrange — UPRNs in both partitions, plus 99999 (inside a range but not
    # in the partition file) and 500000 (outside every range).
    _write_two_partition_open_uprn(tmp_path)
    repo = GeospatialS3Repository(_reader(tmp_path))
    requested = [10, 11, 150000, 99999, 500000]
    expected = {
        10: Coordinates(longitude=-1.0, latitude=51.0),
        11: Coordinates(longitude=-1.1, latitude=51.1),
        150000: Coordinates(longitude=-2.0, latitude=52.0),
    }

    # Act
    resolved = repo.coordinates_for_uprns(requested)

    # Assert — only the present UPRNs come back; 99999 and 500000 are omitted.
    assert resolved == expected
def test_coordinates_for_uprns_reads_each_partition_once(tmp_path: Path) -> None:
    # Arrange — record every key the repository reads, so a re-read of a
    # partition shared by co-located UPRNs shows up as a duplicate entry.
    _write_two_partition_open_uprn(tmp_path)
    reads: list[str] = []

    def counting_reader(key: str) -> pd.DataFrame:
        reads.append(key)
        return pd.read_parquet(tmp_path / key)

    repo = GeospatialS3Repository(counting_reader)

    # Act — two UPRNs share partition 0; one is in partition 1.
    repo.coordinates_for_uprns([10, 11, 150000])

    # Assert — exactly three reads: the meta once and each partition once.
    # Comparing the whole (sorted) read log also fails on any extra read,
    # which per-key counts alone would let through.
    assert sorted(reads) == [
        "spatial/0_100000.parquet",
        "spatial/100001_200000.parquet",
        "spatial/filename_meta.parquet",
    ]