Model/repositories/property/property_postgres_repository.py
2026-06-05 12:18:13 +00:00

134 lines
5.4 KiB
Python

from __future__ import annotations
from typing import Any, Optional, cast
from sqlalchemy import Table
from sqlalchemy import select as sa_select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
class PropertyPostgresRepository(PropertyRepository):
"""Postgres adapter for the ``property`` table — reads and writes (ADR-0003).
Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
(via an injected `EpcRepository`), so a hydrated Property is a pure function of
repository state. ``epc_repo`` is optional: the Finalise write path
(``insert_all``) creates new identity rows and never hydrates, so callers that
only insert construct this with a session alone.
"""
def __init__(
self, session: Session, epc_repo: Optional[EpcRepository] = None
) -> None:
self._session = session
self._epc_repo = epc_repo
# ``__table__`` is injected at runtime on table=True classes but the stubs
# don't expose it; pin to ``Table`` so the dialect insert is typed.
self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))
def _epc(self) -> EpcRepository:
if self._epc_repo is None:
raise ValueError(
"PropertyPostgresRepository needs an EpcRepository to read; it was "
"constructed for the write-only Finalise path."
)
return self._epc_repo
def get(self, property_id: int) -> Property:
row = self._session.get(PropertyRow, property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
identity = PropertyIdentity(
# `postcode`/`address` are nullable in the FE schema (the finaliser may
# insert a row with neither); coerce the degenerate null to "" so the
# identity type stays a plain str.
portfolio_id=row.portfolio_id,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
)
return Property(
identity=identity,
epc=self._epc().get_for_property(property_id),
)
def get_many(self, property_ids: list[int]) -> Properties:
if not property_ids:
return Properties([])
rows = self._session.exec(
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
).all()
row_by_id = {row.id: row for row in rows}
epcs = self._epc().get_for_properties(property_ids)
items: list[Property] = []
for property_id in property_ids:
row = row_by_id.get(property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
items.append(
Property(
identity=PropertyIdentity(
portfolio_id=row.portfolio_id,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
),
epc=epcs.get(property_id),
)
)
return Properties(items)
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
if not rows:
return 0
values = [
{
"portfolio_id": r.portfolio_id,
"creation_status": r.creation_status,
"uprn": r.uprn,
"landlord_property_id": r.landlord_property_id,
"address": r.address,
"postcode": r.postcode,
"user_inputted_address": r.user_inputted_address,
"user_inputted_postcode": r.user_inputted_postcode,
"lexiscore": r.lexiscore,
}
for r in rows
]
stmt = pg_insert(self._table).values(values)
# Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
# reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
# properties untouched (contrast property_overrides, which recalculates).
stmt = stmt.on_conflict_do_nothing(
index_elements=["portfolio_id", "uprn"],
index_where=self._table.c.uprn.isnot(None),
)
# SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
# deprecated in the stubs but the INSERT path is supported.
result = self._session.execute(stmt) # pyright: ignore[reportDeprecated]
return cast(int, result.rowcount)
def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
if not uprns:
return {}
stmt = sa_select(self._table.c.uprn, self._table.c.id).where(
self._table.c.portfolio_id == portfolio_id,
self._table.c.uprn.in_(uprns),
)
rows = self._session.execute(stmt).all() # pyright: ignore[reportDeprecated]
return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None}