from __future__ import annotations

from typing import Any, Optional, cast

from sqlalchemy import Table
from sqlalchemy import select as sa_select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select

from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import (
    PropertyIdentityInsert,
    PropertyRepository,
)


class PropertyPostgresRepository(PropertyRepository):
    """Postgres adapter for the ``property`` table — reads and writes (ADR-0003).

    Reads hydrate the Property aggregate from the FE-owned row plus the EPC
    slice (via an injected `EpcRepository`), so a hydrated Property is a pure
    function of repository state.

    ``epc_repo`` is optional: the Finalise write path (``insert_all``) creates
    new identity rows and never hydrates, so callers that only insert construct
    this with a session alone.
    """

    def __init__(
        self, session: Session, epc_repo: Optional[EpcRepository] = None
    ) -> None:
        """Bind the adapter to a session and, optionally, an EPC repository.

        Args:
            session: Session used for every query and insert issued here.
            epc_repo: Source of the EPC slice for reads. May be omitted when
                the instance is only used for ``insert_all`` / ``ids_by_uprn``,
                which never touch it.
        """
        self._session = session
        self._epc_repo = epc_repo
        # ``__table__`` is injected at runtime on table=True classes but the stubs
        # don't expose it; pin to ``Table`` so the dialect insert is typed.
        self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))

    def _epc(self) -> EpcRepository:
        """Return the injected EPC repository.

        Raises:
            ValueError: If the adapter was constructed without ``epc_repo``.
        """
        if self._epc_repo is None:
            raise ValueError(
                "PropertyPostgresRepository needs an EpcRepository to read; it was "
                "constructed for the write-only Finalise path."
            )
        return self._epc_repo

    @staticmethod
    def _identity_from_row(row: PropertyRow) -> PropertyIdentity:
        """Map a ``property`` row onto the domain ``PropertyIdentity``.

        Single home for the row -> identity mapping so ``get`` and ``get_many``
        cannot drift apart on the null-coercion rule below.
        """
        return PropertyIdentity(
            portfolio_id=row.portfolio_id,
            # `postcode`/`address` are nullable in the FE schema (the finaliser may
            # insert a row with neither); coerce the degenerate null to "" so the
            # identity type stays a plain str.
            postcode=row.postcode or "",
            address=row.address or "",
            uprn=row.uprn,
            landlord_property_id=row.landlord_property_id,
        )

    def get(self, property_id: int) -> Property:
        """Hydrate one Property from its row plus its EPC slice.

        Args:
            property_id: Primary key of the ``property`` row.

        Returns:
            The Property built from the row's identity and the EPC returned by
            the EPC repository for ``property_id``.

        Raises:
            ValueError: If no row has this id, or if the adapter was built
                without an ``EpcRepository``. The row lookup runs first, so a
                missing row is reported before a missing EPC repository.
        """
        row = self._session.get(PropertyRow, property_id)
        if row is None:
            raise ValueError(f"property {property_id} not found")
        identity = self._identity_from_row(row)
        return Property(
            identity=identity,
            epc=self._epc().get_for_property(property_id),
        )

    def get_many(self, property_ids: list[int]) -> Properties:
        """Hydrate several Properties, preserving the order of ``property_ids``.

        Rows are fetched with one ``IN`` query and EPCs with one bulk call to
        the EPC repository; results are then assembled per requested id.

        Args:
            property_ids: Ids to load. An empty list short-circuits to an empty
                ``Properties`` without touching the session or EPC repository.

        Returns:
            ``Properties`` with one entry per requested id, in request order.

        Raises:
            ValueError: If the adapter was built without an ``EpcRepository``,
                or if any requested id has no row (raised for the first such
                id encountered in ``property_ids``).
        """
        if not property_ids:
            return Properties([])

        rows = self._session.exec(
            select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
        ).all()
        row_by_id = {row.id: row for row in rows}
        epcs = self._epc().get_for_properties(property_ids)

        items: list[Property] = []
        for property_id in property_ids:
            row = row_by_id.get(property_id)
            if row is None:
                raise ValueError(f"property {property_id} not found")
            items.append(
                Property(
                    identity=self._identity_from_row(row),
                    epc=epcs.get(property_id),
                )
            )
        return Properties(items)

    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Bulk-insert identity rows, skipping ``(portfolio_id, uprn)`` conflicts.

        Args:
            rows: Identity rows to insert. An empty list returns 0 without
                issuing a statement.

        Returns:
            The ``rowcount`` reported for the INSERT statement.
            NOTE(review): with ON CONFLICT DO NOTHING this is presumably the
            number of rows actually inserted — confirm against the driver.
        """
        if not rows:
            return 0

        values = [
            {
                "portfolio_id": r.portfolio_id,
                "creation_status": r.creation_status,
                "uprn": r.uprn,
                "landlord_property_id": r.landlord_property_id,
                "address": r.address,
                "postcode": r.postcode,
                "user_inputted_address": r.user_inputted_address,
                "user_inputted_postcode": r.user_inputted_postcode,
                "lexiscore": r.lexiscore,
            }
            for r in rows
        ]
        stmt = pg_insert(self._table).values(values)
        # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
        # reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
        # properties untouched (contrast property_overrides, which recalculates).
        stmt = stmt.on_conflict_do_nothing(
            index_elements=["portfolio_id", "uprn"],
            index_where=self._table.c.uprn.isnot(None),
        )
        # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
        # deprecated in the stubs but the INSERT path is supported.
        result = self._session.execute(stmt)  # pyright: ignore[reportDeprecated]
        return cast(int, result.rowcount)

    def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
        """Look up property ids by UPRN within one portfolio.

        Args:
            portfolio_id: Portfolio whose rows are searched.
            uprns: UPRNs to resolve. An empty list returns ``{}`` without
                issuing a query.

        Returns:
            Mapping of UPRN -> property id for every matching row. UPRNs with
            no row in the portfolio are simply absent from the mapping.
        """
        if not uprns:
            return {}

        stmt = sa_select(self._table.c.uprn, self._table.c.id).where(
            self._table.c.portfolio_id == portfolio_id,
            self._table.c.uprn.in_(uprns),
        )
        rows = self._session.execute(stmt).all()  # pyright: ignore[reportDeprecated]
        # Defensive null filter: the column is nullable, so keep int() safe.
        return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None}