Model/repositories/property/property_postgres_repository.py

197 lines
8.8 KiB
Python

from __future__ import annotations
from typing import Optional, cast
from sqlalchemy import Table, func
from sqlalchemy import select as sa_select
from sqlalchemy import update as sa_update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.modelling.simulation import EpcSimulation
from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
from repositories.property.landlord_override_overlays import overlays_from
from repositories.property.property_overrides_reader import PropertyOverridesReader
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
from repositories.spatial.spatial_repository import SpatialRepository
class PropertyPostgresRepository(PropertyRepository):
"""Postgres adapter for the ``property`` table — reads and writes (ADR-0003).
Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
(via an injected `EpcRepository`) and the planning protections (via an
injected `SpatialRepository`, keyed by UPRN — ADR-0020), so a hydrated
Property is a pure function of repository state. ``epc_repo`` / ``spatial_repo``
are optional: the Finalise write path (``insert_all``) creates new identity
rows and never hydrates, so callers that only insert construct this with a
session alone.
"""
def __init__(
self,
session: Session,
epc_repo: Optional[EpcRepository] = None,
spatial_repo: Optional[SpatialRepository] = None,
overrides_reader: Optional[PropertyOverridesReader] = None,
) -> None:
self._session = session
self._epc_repo = epc_repo
self._spatial_repo = spatial_repo
self._overrides_reader = overrides_reader
# ``__table__`` is injected at runtime on table=True classes but the stubs
# don't expose it; pin to ``Table`` so the dialect insert is typed.
self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))
def _epc(self) -> EpcRepository:
if self._epc_repo is None:
raise ValueError(
"PropertyPostgresRepository needs an EpcRepository to read; it was "
"constructed for the write-only Finalise path."
)
return self._epc_repo
def _landlord_overrides(self, property_id: int) -> list[EpcSimulation]:
"""The Property's Landlord Overrides as Simulation Overlays — empty when
no reader is wired (the overlay stays off) or the Property has none."""
if self._overrides_reader is None:
return []
return overlays_from(self._overrides_reader.overrides_for(property_id))
def get(self, property_id: int) -> Property:
row = self._session.get(PropertyRow, property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
identity = PropertyIdentity(
# `postcode`/`address` are nullable in the FE schema (the finaliser may
# insert a row with neither); coerce the degenerate null to "" so the
# identity type stays a plain str.
portfolio_id=row.portfolio_id,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
)
restrictions: dict[int, PlanningRestrictions] = self._restrictions_for(
[row.uprn] if row.uprn is not None else []
)
return Property(
identity=identity,
epc=self._epc().get_for_property(property_id),
predicted_epc=self._epc().get_predicted_for_property(property_id),
landlord_overrides=self._landlord_overrides(property_id),
planning_restrictions=_restrictions_of(row.uprn, restrictions),
)
def get_many(self, property_ids: list[int]) -> Properties:
if not property_ids:
return Properties([])
rows = self._session.exec(
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
).all()
row_by_id = {row.id: row for row in rows}
epcs = self._epc().get_for_properties(property_ids)
predicted_epcs = self._epc().get_predicted_for_properties(property_ids)
restrictions: dict[int, PlanningRestrictions] = self._restrictions_for(
[row.uprn for row in rows if row.uprn is not None]
)
items: list[Property] = []
for property_id in property_ids:
row = row_by_id.get(property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
items.append(
Property(
identity=PropertyIdentity(
portfolio_id=row.portfolio_id,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
),
epc=epcs.get(property_id),
predicted_epc=predicted_epcs.get(property_id),
landlord_overrides=self._landlord_overrides(property_id),
planning_restrictions=_restrictions_of(row.uprn, restrictions),
)
)
return Properties(items)
def _restrictions_for(self, uprns: list[int]) -> dict[int, PlanningRestrictions]:
# No spatial repo (the write-only Finalise path) → no cached protections;
# `_restrictions_of` then defaults every UPRN to unrestricted.
if not uprns or self._spatial_repo is None:
return {}
return self._spatial_repo.get_for_uprns(uprns)
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
# The old engine set `has_recommendations` per Property; we mirror it and
# bump `updated_at` (DB clock) so a new-process run is datable against the
# 2026-06-01 cutoff. Does not commit — the Unit of Work owns the txn.
stmt = (
sa_update(self._table)
.where(self._table.c.id == property_id)
.values(has_recommendations=has_recommendations, updated_at=func.now())
)
self._session.execute(stmt) # pyright: ignore[reportDeprecated]
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
if not rows:
return 0
values = [
{
"portfolio_id": r.portfolio_id,
"creation_status": r.creation_status,
"uprn": r.uprn,
"landlord_property_id": r.landlord_property_id,
"address": r.address,
"postcode": r.postcode,
"user_inputted_address": r.user_inputted_address,
"user_inputted_postcode": r.user_inputted_postcode,
"lexiscore": r.lexiscore,
}
for r in rows
]
stmt = pg_insert(self._table).values(values)
# Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
# reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
# properties untouched (contrast property_overrides, which recalculates).
stmt = stmt.on_conflict_do_nothing(
index_elements=["portfolio_id", "uprn"],
index_where=self._table.c.uprn.isnot(None),
)
# SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
# deprecated in the stubs, and they resolve the INSERT to a bare
# ``Result`` (no ``rowcount``) — both are stub limitations, not real.
result = self._session.execute(stmt) # pyright: ignore[reportDeprecated]
return cast(int, result.rowcount) # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]
def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
if not uprns:
return {}
stmt = sa_select(self._table.c.uprn, self._table.c.id).where(
self._table.c.portfolio_id == portfolio_id,
self._table.c.uprn.in_(uprns),
)
rows = self._session.execute(stmt).all() # pyright: ignore[reportDeprecated]
return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None}
def _restrictions_of(
uprn: Optional[int], by_uprn: dict[int, PlanningRestrictions]
) -> PlanningRestrictions:
"""The cached protections for a UPRN, defaulting to unrestricted when the
UPRN is absent or uncached (per legacy `empty_spatial_df`; ADR-0020)."""
if uprn is None:
return PlanningRestrictions()
return by_uprn.get(uprn, PlanningRestrictions())