Model/repositories/geospatial/geospatial_s3_repository.py
Khalim Conn-Kowlessar 95719dd587 feat(geospatial): batch coordinates_for_uprns lookup (#1227)
Adds GeospatialRepository.coordinates_for_uprns(uprns) -> dict — a batch
coordinate lookup returning only covered UPRNs. The S3 adapter overrides it
to read the meta once, group UPRNs by their covering partition, and read each
partition once for all the UPRNs it covers; co-located (closely-numbered)
UPRNs share a partition, so an EPC Prediction cohort is typically one or two
reads instead of one per neighbour. Default port impl is a per-UPRN loop.

Feeds the EPC Prediction geo-proximity work: a cohort's UPRNs resolve to
coordinates in a couple of reads (validated at corpus scale: 170 partition
reads for 2683 UPRNs).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 14:35:32 +00:00

92 lines
3.9 KiB
Python

from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable
from typing import Any, Optional
import pandas as pd
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from repositories.geospatial.geospatial_repository import GeospatialRepository
# Callable that takes a dataset key (e.g. "spatial/<filename>") and returns the
# parquet object stored under that key as a DataFrame.
ParquetReader = Callable[[str], pd.DataFrame]
# Key of the meta table; its rows carry "lower"/"upper" UPRN bounds and the
# "filenames" of the partition covering that range.
_META_KEY = "spatial/filename_meta.parquet"
class GeospatialS3Repository(GeospatialRepository):
    """Reads the partitioned Ordnance Survey Open-UPRN parquet dataset.

    `spatial/filename_meta.parquet` maps a UPRN range (lower/upper) to a
    partition file; that partition carries `UPRN`/`LATITUDE`/`LONGITUDE`. The
    parquet reader is injected so the dataset can be sourced from S3 in
    production or a fixture directory in tests — the Repo holds no S3/HTTP code.
    """

    def __init__(self, read_parquet: ParquetReader) -> None:
        """Store the injected reader; no I/O happens at construction time."""
        self._read_parquet = read_parquet

    @staticmethod
    def _partition_for(meta: pd.DataFrame, uprn: int) -> Optional[str]:
        """Return the partition filename whose [lower, upper] range covers
        ``uprn``, or None when no meta row covers it.

        When several meta rows cover the UPRN the first one is used. Shared by
        the single and batch lookups so both resolve partitions identically.
        """
        covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)]
        if covering.empty:
            return None
        return str(covering["filenames"].iloc[0])

    def _row_for(self, uprn: int) -> Optional["pd.Series[Any]"]:
        """The Open-UPRN partition row for ``uprn`` (coordinates + co-located
        planning flags), or None when no partition covers it / it is absent.

        Reads the meta table and then the covering partition on every call.
        If the partition holds several rows for the UPRN the first is returned.
        """
        meta = self._read_parquet(_META_KEY)
        filename = self._partition_for(meta, uprn)
        if filename is None:
            return None
        partition = self._read_parquet(f"spatial/{filename}")
        rows = partition[partition["UPRN"] == uprn]
        if rows.empty:
            return None
        return rows.iloc[0]

    def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
        """Coordinates and planning restrictions for ``uprn``, or None when the
        dataset has no row for it."""
        row = self._row_for(uprn)
        if row is None:
            return None
        # NOTE(review): bool() of a missing value (NaN) is True — confirm the
        # three flag columns below are never null in the partition files.
        return SpatialReference(
            coordinates=Coordinates(
                longitude=float(row["LONGITUDE"]),
                latitude=float(row["LATITUDE"]),
            ),
            restrictions=PlanningRestrictions(
                in_conservation_area=bool(row["conservation_status"]),
                is_listed=bool(row["is_listed_building"]),
                is_heritage=bool(row["is_heritage_building"]),
            ),
        )

    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
        """Coordinates for ``uprn``, or None when the dataset has no row for it."""
        reference: Optional[SpatialReference] = self.spatial_for(uprn)
        return reference.coordinates if reference is not None else None

    def coordinates_for_uprns(
        self, uprns: list[int]
    ) -> dict[int, Coordinates]:
        """Batch lookup that reads the meta once, groups the UPRNs by their
        covering partition, and reads each partition once for all the UPRNs it
        covers (co-located UPRNs share a partition, so a cohort is typically one
        or two reads). Uncovered / absent UPRNs are omitted from the result.

        An empty ``uprns`` returns ``{}`` without reading anything. Repeated
        UPRNs in the input are looked up once. If a partition holds several
        rows for one UPRN the first row wins, matching the single-UPRN lookup.
        """
        # De-duplicate while keeping first-seen order; also accepts any iterable.
        wanted = list(dict.fromkeys(uprns))
        if not wanted:
            return {}

        meta = self._read_parquet(_META_KEY)
        by_partition: dict[str, list[int]] = defaultdict(list)
        for uprn in wanted:
            filename = self._partition_for(meta, uprn)
            if filename is not None:
                by_partition[filename].append(uprn)

        resolved: dict[int, Coordinates] = {}
        for filename, partition_uprns in by_partition.items():
            partition = self._read_parquet(f"spatial/{filename}")
            rows = partition[partition["UPRN"].isin(partition_uprns)]
            # Walk the three columns directly rather than iterrows(): no
            # per-row Series is built and column dtypes are left untouched.
            for uprn, longitude, latitude in zip(
                rows["UPRN"], rows["LONGITUDE"], rows["LATITUDE"]
            ):
                key = int(uprn)
                if key in resolved:
                    # Duplicate row for this UPRN: keep the first, as
                    # _row_for does with iloc[0].
                    continue
                resolved[key] = Coordinates(
                    longitude=float(longitude),
                    latitude=float(latitude),
                )
        return resolved

    def planning_restrictions_for(self, uprn: int) -> Optional[PlanningRestrictions]:
        """Planning restrictions for ``uprn``, or None when the dataset has no
        row for it."""
        reference: Optional[SpatialReference] = self.spatial_for(uprn)
        return reference.restrictions if reference is not None else None