diff --git a/repositories/geospatial/geospatial_repository.py b/repositories/geospatial/geospatial_repository.py
index b9dbff17..b3b3098c 100644
--- a/repositories/geospatial/geospatial_repository.py
+++ b/repositories/geospatial/geospatial_repository.py
@@ -18,6 +18,21 @@ class GeospatialRepository(ABC):
     @abstractmethod
     def coordinates_for(self, uprn: int) -> Optional[Coordinates]: ...
 
+    def coordinates_for_uprns(
+        self, uprns: list[int]
+    ) -> dict[int, Coordinates]:
+        """Resolve many UPRNs at once, returning only those covered. The default
+        is a per-UPRN loop; adapters whose storage is partitioned (e.g. the S3
+        Open-UPRN parquet) override this to read each partition once for all the
+        UPRNs it covers — far fewer reads when the UPRNs are co-located, as
+        closely-numbered UPRNs share a partition."""
+        resolved: dict[int, Coordinates] = {}
+        for uprn in uprns:
+            coordinates = self.coordinates_for(uprn)
+            if coordinates is not None:
+                resolved[uprn] = coordinates
+        return resolved
+
     def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
         """The Property's coordinates and planning protections together, in one
         reference lookup (ADR-0020) — Ingestion uses the coordinates to drive
diff --git a/repositories/geospatial/geospatial_s3_repository.py b/repositories/geospatial/geospatial_s3_repository.py
index 39946f2b..64eba14c 100644
--- a/repositories/geospatial/geospatial_s3_repository.py
+++ b/repositories/geospatial/geospatial_s3_repository.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from collections import defaultdict
 from collections.abc import Callable
 from typing import Any, Optional
 
@@ -62,6 +63,38 @@ class GeospatialS3Repository(GeospatialRepository):
         reference: Optional[SpatialReference] = self.spatial_for(uprn)
         return reference.coordinates if reference is not None else None
 
+    def coordinates_for_uprns(
+        self, uprns: list[int]
+    ) -> dict[int, Coordinates]:
+        """Batch lookup that reads the meta once, groups the UPRNs by their
+        covering partition, and reads each partition once for all the UPRNs it
+        covers (co-located UPRNs share a partition, so a cohort is typically one
+        or two reads). Uncovered / absent UPRNs are omitted from the result; an
+        empty batch returns an empty dict without reading anything."""
+        if not uprns:
+            return {}
+        meta = self._read_parquet(_META_KEY)
+        by_partition: dict[str, list[int]] = defaultdict(list)
+        for uprn in uprns:
+            covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)]
+            if not covering.empty:
+                by_partition[str(covering["filenames"].iloc[0])].append(uprn)
+        resolved: dict[int, Coordinates] = {}
+        for filename, partition_uprns in by_partition.items():
+            partition = self._read_parquet(f"spatial/{filename}")
+            rows = partition[partition["UPRN"].isin(partition_uprns)]
+            # Walk the three columns in lockstep rather than iterrows():
+            # iterrows() builds a Series per row and does not preserve the
+            # columns' dtypes, so the UPRN would travel with the float columns.
+            for found, longitude, latitude in zip(
+                rows["UPRN"], rows["LONGITUDE"], rows["LATITUDE"]
+            ):
+                resolved[int(found)] = Coordinates(
+                    longitude=float(longitude),
+                    latitude=float(latitude),
+                )
+        return resolved
+
     def planning_restrictions_for(self, uprn: int) -> Optional[PlanningRestrictions]:
         reference: Optional[SpatialReference] = self.spatial_for(uprn)
         return reference.restrictions if reference is not None else None
diff --git a/tests/repositories/geospatial/test_geospatial_repository.py b/tests/repositories/geospatial/test_geospatial_repository.py
index a85bb468..2f0b9873 100644
--- a/tests/repositories/geospatial/test_geospatial_repository.py
+++ b/tests/repositories/geospatial/test_geospatial_repository.py
@@ -131,3 +131,81 @@ def test_spatial_for_returns_none_when_uprn_absent(tmp_path: Path) -> None:
 
     # Act / Assert
     assert repo.spatial_for(99999) is None
+
+
+def _write_two_partition_open_uprn(base: Path) -> None:
+    """Two UPRN-range partitions, so the batch lookup must span both."""
+    spatial = base / "spatial"
+    spatial.mkdir(parents=True, exist_ok=True)
+    pd.DataFrame(
+        {
+            "lower": [0, 100001],
+            "upper": [100000, 200000],
+            "filenames": ["0_100000.parquet", "100001_200000.parquet"],
+        }
+    ).to_parquet(spatial / "filename_meta.parquet")
+    pd.DataFrame(
+        {"UPRN": [10, 11], "LATITUDE": [51.0, 51.1], "LONGITUDE": [-1.0, -1.1]}
+    ).to_parquet(spatial / "0_100000.parquet")
+    pd.DataFrame(
+        {"UPRN": [150000], "LATITUDE": [52.0], "LONGITUDE": [-2.0]}
+    ).to_parquet(spatial / "100001_200000.parquet")
+
+
+def test_coordinates_for_uprns_resolves_a_batch_across_partitions(
+    tmp_path: Path,
+) -> None:
+    # Arrange — UPRNs spanning two partitions, plus one absent and one off-scale.
+    _write_two_partition_open_uprn(tmp_path)
+    repo = GeospatialS3Repository(_reader(tmp_path))
+
+    # Act
+    resolved = repo.coordinates_for_uprns([10, 11, 150000, 99999, 500000])
+
+    # Assert — present UPRNs resolved; absent (99999) and uncovered (500000) omitted.
+    assert resolved == {
+        10: Coordinates(longitude=-1.0, latitude=51.0),
+        11: Coordinates(longitude=-1.1, latitude=51.1),
+        150000: Coordinates(longitude=-2.0, latitude=52.0),
+    }
+
+
+def test_coordinates_for_uprns_reads_each_partition_once(tmp_path: Path) -> None:
+    # Arrange — count reads so co-located UPRNs don't re-read their partition.
+    _write_two_partition_open_uprn(tmp_path)
+    reads: list[str] = []
+
+    def counting_reader(key: str) -> pd.DataFrame:
+        reads.append(key)
+        return pd.read_parquet(tmp_path / key)
+
+    repo = GeospatialS3Repository(counting_reader)
+
+    # Act — two UPRNs share partition 0; one is in partition 1.
+    repo.coordinates_for_uprns([10, 11, 150000])
+
+    # Assert — the meta once + each of the two partitions once (3 reads, not 4).
+    assert reads.count("spatial/0_100000.parquet") == 1
+    assert reads.count("spatial/100001_200000.parquet") == 1
+    assert reads.count("spatial/filename_meta.parquet") == 1
+
+
+def test_coordinates_for_uprns_reads_nothing_for_an_empty_batch(
+    tmp_path: Path,
+) -> None:
+    # Arrange — count reads so an empty batch can be shown to skip the meta.
+    _write_two_partition_open_uprn(tmp_path)
+    reads: list[str] = []
+
+    def counting_reader(key: str) -> pd.DataFrame:
+        reads.append(key)
+        return pd.read_parquet(tmp_path / key)
+
+    repo = GeospatialS3Repository(counting_reader)
+
+    # Act
+    resolved = repo.coordinates_for_uprns([])
+
+    # Assert — nothing to resolve, so nothing is read.
+    assert resolved == {}
+    assert reads == []