from __future__ import annotations from sqlmodel import Session, col, select from domain.geospatial.coordinates import Coordinates from domain.geospatial.planning_restrictions import PlanningRestrictions from domain.geospatial.spatial_reference import SpatialReference from infrastructure.postgres.property_details_spatial_table import ( PropertyDetailsSpatialRow, ) from repositories.spatial.spatial_repository import SpatialRepository class SpatialPostgresRepository(SpatialRepository): def __init__(self, session: Session) -> None: self._session = session def save(self, uprn: int, reference: SpatialReference) -> None: existing: PropertyDetailsSpatialRow | None = self._session.exec( select(PropertyDetailsSpatialRow).where( PropertyDetailsSpatialRow.uprn == uprn ) ).first() row = existing if existing is not None else PropertyDetailsSpatialRow(uprn=uprn) coordinates: Coordinates | None = reference.coordinates row.latitude = coordinates.latitude if coordinates is not None else None row.longitude = coordinates.longitude if coordinates is not None else None row.conservation_status = reference.restrictions.in_conservation_area row.is_listed_building = reference.restrictions.is_listed row.is_heritage_building = reference.restrictions.is_heritage self._session.add(row) def get_for_uprns(self, uprns: list[int]) -> dict[int, PlanningRestrictions]: if not uprns: return {} rows = self._session.exec( select(PropertyDetailsSpatialRow).where( col(PropertyDetailsSpatialRow.uprn).in_(uprns) ) ).all() return {row.uprn: _restrictions_from(row) for row in rows} def _restrictions_from(row: PropertyDetailsSpatialRow) -> PlanningRestrictions: """A cached row's planning protections; a null flag reads as unrestricted.""" return PlanningRestrictions( in_conservation_area=bool(row.conservation_status), is_listed=bool(row.is_listed_building), is_heritage=bool(row.is_heritage_building), )