feat(spatial): per-UPRN cache repo for the OS spatial reference

Slice 3c.2. The OS Open-UPRN reference set is too large to host in Postgres, so
it lives in S3 and is cached per-UPRN in the existing `property_details_spatial`
table (ADR-0020). `PropertyDetailsSpatialRow` mirrors that table (uprn unique);
`SpatialRepository` / `SpatialPostgresRepository` upsert one shared row per UPRN
and read the planning protections back by UPRN (a null flag reads as
unrestricted; absent UPRNs are omitted so the caller defaults them).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-04 17:15:51 +00:00
parent 9be95a0d3b
commit a1c60d2fba
6 changed files with 177 additions and 0 deletions

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from typing import ClassVar, Optional
from sqlmodel import Field, SQLModel
class PropertyDetailsSpatialRow(SQLModel, table=True):
    """Per-UPRN cache of the Ordnance Survey spatial reference data.

    The OS Open-UPRN set is tens of millions of rows — too large for Postgres —
    so Ingestion resolves it from S3 and writes the row it used here, keyed by
    UPRN (one shared row per UPRN, not per Property). The front-end reads the
    planning flags off this table to show why a Property did or did not get a
    given measure; Modelling hydrates them onto the Property (ADR-0020). Coords
    are retained for parity with the legacy ``property_details_spatial`` shape.
    """

    # Maps onto the pre-existing legacy table rather than a model-derived name.
    __tablename__: ClassVar[str] = "property_details_spatial"  # pyright: ignore[reportIncompatibleVariableOverride]

    # Surrogate primary key; None until persisted (presumably assigned by the
    # database on insert — confirm against the real table definition).
    id: Optional[int] = Field(default=None, primary_key=True)

    # Natural key of the cache: indexed and unique, so at most one row per UPRN.
    # NOTE(review): UPRNs can run to 12 digits. If this model is ever used to
    # create the table, a plain ``int`` column presumably becomes a 32-bit
    # INTEGER and would overflow — confirm the real column is BIGINT.
    uprn: int = Field(index=True, unique=True)

    # Legacy coordinate columns, kept for parity with the old table shape.
    # Presumably projected easting/northing — units/CRS not shown here; confirm.
    x_coordinate: Optional[float] = Field(default=None)
    y_coordinate: Optional[float] = Field(default=None)

    # Geographic position of the UPRN; both None when no coordinates are known.
    latitude: Optional[float] = Field(default=None)
    longitude: Optional[float] = Field(default=None)

    # Planning-protection flags. All nullable: None means "not recorded".
    conservation_status: Optional[bool] = Field(default=None)
    is_listed_building: Optional[bool] = Field(default=None)
    is_heritage_building: Optional[bool] = Field(default=None)

View file

View file

@ -0,0 +1,50 @@
from __future__ import annotations
from sqlmodel import Session, col, select
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from infrastructure.postgres.property_details_spatial_table import (
PropertyDetailsSpatialRow,
)
from repositories.spatial.spatial_repository import SpatialRepository
class SpatialPostgresRepository(SpatialRepository):
    """Postgres-backed ``SpatialRepository`` over ``PropertyDetailsSpatialRow``.

    All work goes through the ``Session`` handed to the constructor. Nothing
    here commits; the caller owns the transaction boundary.
    """

    def __init__(self, session: Session) -> None:
        """Bind the repository to ``session`` for all reads and writes."""
        self._session = session

    def save(self, uprn: int, reference: SpatialReference) -> None:
        """Insert or overwrite the single cached row for ``uprn``.

        Latitude, longitude and the three planning flags are always rewritten
        from ``reference``; missing coordinates are stored as nulls. The row is
        added to the session but not flushed or committed here.

        NOTE(review): the lookup-then-add sequence is not atomic. Two
        concurrent writers for the same UPRN could presumably both miss the
        row and then collide on the unique ``uprn`` constraint — confirm
        ingestion is single-writer per UPRN.
        """
        by_uprn = select(PropertyDetailsSpatialRow).where(
            PropertyDetailsSpatialRow.uprn == uprn
        )
        row: PropertyDetailsSpatialRow | None = self._session.exec(by_uprn).first()
        if row is None:
            # First sighting of this UPRN: start a fresh row to populate.
            row = PropertyDetailsSpatialRow(uprn=uprn)

        point: Coordinates | None = reference.coordinates
        if point is None:
            row.latitude = None
            row.longitude = None
        else:
            row.latitude = point.latitude
            row.longitude = point.longitude

        row.conservation_status = reference.restrictions.in_conservation_area
        row.is_listed_building = reference.restrictions.is_listed
        row.is_heritage_building = reference.restrictions.is_heritage

        self._session.add(row)

    def get_for_uprns(self, uprns: list[int]) -> dict[int, PlanningRestrictions]:
        """Return the cached planning protections for ``uprns``, keyed by UPRN.

        UPRNs without a cached row are left out of the result. An empty
        ``uprns`` returns an empty dict without touching the database.
        """
        if not uprns:
            return {}

        matching = select(PropertyDetailsSpatialRow).where(
            col(PropertyDetailsSpatialRow.uprn).in_(uprns)
        )
        protections: dict[int, PlanningRestrictions] = {}
        for cached in self._session.exec(matching).all():
            protections[cached.uprn] = _restrictions_from(cached)
        return protections
def _restrictions_from(row: PropertyDetailsSpatialRow) -> PlanningRestrictions:
    """Build the planning protections held on a cached row.

    Each flag is coerced with ``bool``, so a null (``None``) column reads as
    ``False`` — i.e. unrestricted.
    """
    in_conservation_area = bool(row.conservation_status)
    is_listed = bool(row.is_listed_building)
    is_heritage = bool(row.is_heritage_building)
    return PlanningRestrictions(
        in_conservation_area=in_conservation_area,
        is_listed=is_listed,
        is_heritage=is_heritage,
    )

View file

@ -0,0 +1,27 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
class SpatialRepository(ABC):
    """Per-UPRN write-through cache of the OS spatial reference data.

    Holds the coordinates and planning flags resolved from the S3 reference
    lookup (ADR-0020). Ingestion writes to it; Modelling reads from it and
    hydrates the planning protections onto the Property. There is one shared
    row per UPRN, so ``save`` upserts.
    """

    @abstractmethod
    def save(self, uprn: int, reference: SpatialReference) -> None:
        """Upsert the cached row for ``uprn`` from ``reference``."""

    @abstractmethod
    def get_for_uprns(self, uprns: list[int]) -> dict[int, PlanningRestrictions]:
        """Return the planning protections for each covered UPRN, keyed by UPRN.

        UPRNs with no cached row are omitted; the caller defaults those to
        unrestricted.
        """

View file

View file

@ -0,0 +1,71 @@
"""SpatialRepo caches the OS spatial reference (coords + planning flags) by UPRN.
The OS Open-UPRN reference set is too large to host in Postgres, so Ingestion
resolves it from S3 and writes a per-UPRN cache row here; Modelling reads the
planning protections back off it (ADR-0020). A real ephemeral Postgres exercises
the upsert-by-UPRN semantics (one shared row per UPRN).
"""
from __future__ import annotations
from sqlalchemy import Engine
from sqlmodel import Session
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from repositories.spatial.spatial_postgres_repository import SpatialPostgresRepository
def test_planning_restrictions_round_trip_by_uprn(db_engine: Engine) -> None:
    """Protections saved for a UPRN read back equal to what was written."""
    # Arrange
    uprn = 12345
    stored = SpatialReference(
        coordinates=Coordinates(longitude=-0.1278, latitude=51.5074),
        restrictions=PlanningRestrictions(
            in_conservation_area=True, is_listed=False, is_heritage=False
        ),
    )

    # Act — write and commit in one session, then read through a second one.
    with Session(db_engine) as write_session:
        writer = SpatialPostgresRepository(write_session)
        writer.save(uprn=uprn, reference=stored)
        write_session.commit()
    with Session(db_engine) as read_session:
        reader = SpatialPostgresRepository(read_session)
        fetched = reader.get_for_uprns([uprn])

    # Assert
    assert fetched == {uprn: stored.restrictions}
def test_save_upserts_the_shared_uprn_row(db_engine: Engine) -> None:
    """Saving the same UPRN twice leaves one row holding the later flags."""
    # Arrange — one UPRN ingested twice, the second time with corrected flags.
    uprn = 999
    first_write = SpatialReference(
        coordinates=Coordinates(longitude=-0.1, latitude=51.5),
        restrictions=PlanningRestrictions(),
    )
    second_write = SpatialReference(
        coordinates=Coordinates(longitude=-0.1, latitude=51.5),
        restrictions=PlanningRestrictions(is_listed=True),
    )

    # Act — both saves share a session and a single commit.
    with Session(db_engine) as write_session:
        writer = SpatialPostgresRepository(write_session)
        for reference in (first_write, second_write):
            writer.save(uprn=uprn, reference=reference)
        write_session.commit()
    with Session(db_engine) as read_session:
        fetched = SpatialPostgresRepository(read_session).get_for_uprns([uprn])

    # Assert — one row per UPRN; the latest write wins.
    assert fetched == {uprn: PlanningRestrictions(is_listed=True)}
def test_get_for_uprns_omits_uncovered_uprns(db_engine: Engine) -> None:
    """A UPRN with no cached row is left out of the result map."""
    # Arrange / Act — look up a UPRN that was never saved.
    uncached_uprn = 404
    with Session(db_engine) as read_session:
        reader = SpatialPostgresRepository(read_session)
        fetched = reader.get_for_uprns([uncached_uprn])

    # Assert — the map has no entry for it (the caller defaults absent UPRNs
    # to unrestricted).
    assert fetched == {}