mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
134 lines
5.4 KiB
Python
134 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Optional, cast
|
|
|
|
from sqlalchemy import Table
|
|
from sqlalchemy import select as sa_select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlmodel import Session, col, select
|
|
|
|
from domain.property.properties import Properties
|
|
from domain.property.property import Property, PropertyIdentity
|
|
from infrastructure.postgres.property_table import PropertyRow
|
|
from repositories.epc.epc_repository import EpcRepository
|
|
from repositories.property.property_repository import (
|
|
PropertyIdentityInsert,
|
|
PropertyRepository,
|
|
)
|
|
|
|
|
|
class PropertyPostgresRepository(PropertyRepository):
|
|
"""Postgres adapter for the ``property`` table — reads and writes (ADR-0003).
|
|
|
|
Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
|
|
(via an injected `EpcRepository`), so a hydrated Property is a pure function of
|
|
repository state. ``epc_repo`` is optional: the Finalise write path
|
|
(``insert_all``) creates new identity rows and never hydrates, so callers that
|
|
only insert construct this with a session alone.
|
|
"""
|
|
|
|
def __init__(
|
|
self, session: Session, epc_repo: Optional[EpcRepository] = None
|
|
) -> None:
|
|
self._session = session
|
|
self._epc_repo = epc_repo
|
|
# ``__table__`` is injected at runtime on table=True classes but the stubs
|
|
# don't expose it; pin to ``Table`` so the dialect insert is typed.
|
|
self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))
|
|
|
|
def _epc(self) -> EpcRepository:
|
|
if self._epc_repo is None:
|
|
raise ValueError(
|
|
"PropertyPostgresRepository needs an EpcRepository to read; it was "
|
|
"constructed for the write-only Finalise path."
|
|
)
|
|
return self._epc_repo
|
|
|
|
def get(self, property_id: int) -> Property:
|
|
row = self._session.get(PropertyRow, property_id)
|
|
if row is None:
|
|
raise ValueError(f"property {property_id} not found")
|
|
identity = PropertyIdentity(
|
|
# `postcode`/`address` are nullable in the FE schema (the finaliser may
|
|
# insert a row with neither); coerce the degenerate null to "" so the
|
|
# identity type stays a plain str.
|
|
portfolio_id=row.portfolio_id,
|
|
postcode=row.postcode or "",
|
|
address=row.address or "",
|
|
uprn=row.uprn,
|
|
landlord_property_id=row.landlord_property_id,
|
|
)
|
|
return Property(
|
|
identity=identity,
|
|
epc=self._epc().get_for_property(property_id),
|
|
)
|
|
|
|
def get_many(self, property_ids: list[int]) -> Properties:
|
|
if not property_ids:
|
|
return Properties([])
|
|
rows = self._session.exec(
|
|
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
|
|
).all()
|
|
row_by_id = {row.id: row for row in rows}
|
|
epcs = self._epc().get_for_properties(property_ids)
|
|
items: list[Property] = []
|
|
for property_id in property_ids:
|
|
row = row_by_id.get(property_id)
|
|
if row is None:
|
|
raise ValueError(f"property {property_id} not found")
|
|
items.append(
|
|
Property(
|
|
identity=PropertyIdentity(
|
|
portfolio_id=row.portfolio_id,
|
|
postcode=row.postcode or "",
|
|
address=row.address or "",
|
|
uprn=row.uprn,
|
|
landlord_property_id=row.landlord_property_id,
|
|
),
|
|
epc=epcs.get(property_id),
|
|
)
|
|
)
|
|
return Properties(items)
|
|
|
|
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
|
|
if not rows:
|
|
return 0
|
|
|
|
values = [
|
|
{
|
|
"portfolio_id": r.portfolio_id,
|
|
"creation_status": r.creation_status,
|
|
"uprn": r.uprn,
|
|
"landlord_property_id": r.landlord_property_id,
|
|
"address": r.address,
|
|
"postcode": r.postcode,
|
|
"user_inputted_address": r.user_inputted_address,
|
|
"user_inputted_postcode": r.user_inputted_postcode,
|
|
"lexiscore": r.lexiscore,
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
stmt = pg_insert(self._table).values(values)
|
|
# Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
|
|
# reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
|
|
# properties untouched (contrast property_overrides, which recalculates).
|
|
stmt = stmt.on_conflict_do_nothing(
|
|
index_elements=["portfolio_id", "uprn"],
|
|
index_where=self._table.c.uprn.isnot(None),
|
|
)
|
|
|
|
# SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
|
|
# deprecated in the stubs but the INSERT path is supported.
|
|
result = self._session.execute(stmt) # pyright: ignore[reportDeprecated]
|
|
return cast(int, result.rowcount)
|
|
|
|
def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
|
|
if not uprns:
|
|
return {}
|
|
stmt = sa_select(self._table.c.uprn, self._table.c.id).where(
|
|
self._table.c.portfolio_id == portfolio_id,
|
|
self._table.c.uprn.in_(uprns),
|
|
)
|
|
rows = self._session.execute(stmt).all() # pyright: ignore[reportDeprecated]
|
|
return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None}
|