mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Adds GeospatialRepository.coordinates_for_uprns(uprns) -> dict — a batch coordinate lookup returning only covered UPRNs. The S3 adapter overrides it to read the meta once, group UPRNs by their covering partition, and read each partition once for all the UPRNs it covers; co-located (closely-numbered) UPRNs share a partition, so an EPC Prediction cohort is typically one or two reads instead of one per neighbour. Default port impl is a per-UPRN loop. Feeds the EPC Prediction geo-proximity work: a cohort's UPRNs resolve to coordinates in a couple of reads (validated at corpus scale: 170 partition reads for 2683 UPRNs). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
92 lines
3.9 KiB
Python
92 lines
3.9 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from collections.abc import Callable
|
|
from typing import Any, Optional
|
|
|
|
import pandas as pd
|
|
|
|
from domain.geospatial.coordinates import Coordinates
|
|
from domain.geospatial.planning_restrictions import PlanningRestrictions
|
|
from domain.geospatial.spatial_reference import SpatialReference
|
|
from repositories.geospatial.geospatial_repository import GeospatialRepository
|
|
|
|
# Injected loader: called with a dataset key (e.g. "spatial/<partition file>")
# and returns that parquet file's contents as a DataFrame.
ParquetReader = Callable[[str], pd.DataFrame]
|
|
|
|
# Key of the meta parquet whose rows map a UPRN range (`lower`/`upper`) to the
# partition file (`filenames`) holding the UPRNs in that range.
_META_KEY = "spatial/filename_meta.parquet"
|
|
|
|
|
|
class GeospatialS3Repository(GeospatialRepository):
    """Reads the partitioned Ordnance Survey Open-UPRN parquet dataset.

    `spatial/filename_meta.parquet` maps a UPRN range (lower/upper) to a
    partition file; that partition carries `UPRN`/`LATITUDE`/`LONGITUDE`. The
    parquet reader is injected so the dataset can be sourced from S3 in
    production or a fixture directory in tests — the Repo holds no S3/HTTP code.
    """

    def __init__(self, read_parquet: ParquetReader) -> None:
        """Keep the injected reader; nothing is read at construction time."""
        self._read_parquet = read_parquet

    @staticmethod
    def _partition_for(meta: pd.DataFrame, uprn: int) -> Optional[str]:
        """Partition filename covering ``uprn`` according to ``meta``.

        A meta row covers ``uprn`` when ``lower <= uprn <= upper`` (both ends
        inclusive). When several rows cover it the first one is used. Returns
        None when no row covers it.
        """
        covering = meta[(meta["lower"] <= uprn) & (meta["upper"] >= uprn)]
        if covering.empty:
            return None
        return str(covering["filenames"].iloc[0])

    def _row_for(self, uprn: int) -> Optional["pd.Series[Any]"]:
        """The Open-UPRN partition row for ``uprn`` (coordinates + co-located
        planning flags), or None when no partition covers it / it is absent.

        Each call invokes the reader twice at most: once for the meta file and
        once for the covering partition. If the partition holds more than one
        row for ``uprn`` the first row is returned.
        """
        meta = self._read_parquet(_META_KEY)
        filename = self._partition_for(meta, uprn)
        if filename is None:
            return None

        partition = self._read_parquet(f"spatial/{filename}")
        rows = partition[partition["UPRN"] == uprn]
        if rows.empty:
            return None
        return rows.iloc[0]

    def spatial_for(self, uprn: int) -> Optional[SpatialReference]:
        """Coordinates and planning restrictions for ``uprn``, or None when the
        dataset has no row for it.

        Reads `LONGITUDE`/`LATITUDE` plus the `conservation_status`,
        `is_listed_building` and `is_heritage_building` columns of the row.
        """
        row = self._row_for(uprn)
        if row is None:
            return None
        return SpatialReference(
            coordinates=Coordinates(
                longitude=float(row["LONGITUDE"]),
                latitude=float(row["LATITUDE"]),
            ),
            restrictions=PlanningRestrictions(
                in_conservation_area=bool(row["conservation_status"]),
                is_listed=bool(row["is_listed_building"]),
                is_heritage=bool(row["is_heritage_building"]),
            ),
        )

    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
        """Coordinates of ``uprn``, or None when the dataset has no row for it."""
        reference: Optional[SpatialReference] = self.spatial_for(uprn)
        return reference.coordinates if reference is not None else None

    def coordinates_for_uprns(
        self, uprns: list[int]
    ) -> dict[int, Coordinates]:
        """Batch lookup that reads the meta once, groups the UPRNs by their
        covering partition, and reads each partition once for all the UPRNs it
        covers (co-located UPRNs share a partition, so a cohort is typically one
        or two reads). Uncovered / absent UPRNs are omitted from the result.

        An empty request returns ``{}`` without reading anything. If a
        partition holds more than one row for a UPRN the first row is used,
        which is the same row the single-UPRN lookup picks.
        """
        # De-duplicate while keeping request order so a repeated UPRN is only
        # matched against the meta once.
        wanted = list(dict.fromkeys(uprns))
        if not wanted:
            return {}

        meta = self._read_parquet(_META_KEY)
        by_partition: dict[str, list[int]] = defaultdict(list)
        for uprn in wanted:
            filename = self._partition_for(meta, uprn)
            if filename is not None:
                by_partition[filename].append(uprn)

        resolved: dict[int, Coordinates] = {}
        for filename, partition_uprns in by_partition.items():
            partition = self._read_parquet(f"spatial/{filename}")
            rows = partition[partition["UPRN"].isin(partition_uprns)]
            # Iterate the three needed columns directly: `iterrows()` would
            # build a Series per row and upcast mixed column dtypes.
            for uprn, longitude, latitude in zip(
                rows["UPRN"], rows["LONGITUDE"], rows["LATITUDE"]
            ):
                key = int(uprn)
                if key in resolved:
                    # Duplicate partition row: keep the first occurrence.
                    continue
                resolved[key] = Coordinates(
                    longitude=float(longitude),
                    latitude=float(latitude),
                )
        return resolved

    def planning_restrictions_for(self, uprn: int) -> Optional[PlanningRestrictions]:
        """Planning restrictions of ``uprn``, or None when the dataset has no
        row for it."""
        reference: Optional[SpatialReference] = self.spatial_for(uprn)
        return reference.restrictions if reference is not None else None
|