Model/repositories/property/property_override_postgres_repository.py
2026-06-05 12:18:13 +00:00

65 lines
2.6 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from infrastructure.postgres.property_override_table import PropertyOverrideRow
from repositories.property.property_override_repository import (
PropertyOverrideInsert,
PropertyOverrideRepository,
)
class PropertyOverridePostgresRepository(PropertyOverrideRepository):
    """Postgres adapter for ``property_overrides`` (ADR-0006).

    Write-only: the finaliser materialises the fact layer; no reads here.
    """

    def __init__(self, session: Session) -> None:
        """Store ``session`` and resolve the SQLAlchemy ``Table`` to insert into."""
        self._session = session
        # ``__table__`` is injected at runtime on table=True classes but the
        # stubs don't expose it; pin to ``Table`` so the dialect insert is typed.
        self._table: Table = cast(Table, getattr(PropertyOverrideRow, "__table__"))

    @staticmethod
    def _collapse_duplicate_keys(
        rows: list[PropertyOverrideInsert],
    ) -> list[PropertyOverrideInsert]:
        """Return ``rows`` keeping only the last row per conflict key.

        The conflict key is ``(property_id, override_component, building_part)``,
        the same columns ``upsert_all`` arbitrates on. Relative order of the
        surviving rows is preserved.
        """
        last_position: dict[tuple[object, object, object], int] = {}
        passthrough: set[int] = set()
        for position, row in enumerate(rows):
            key = (row.property_id, row.override_component, row.building_part)
            if any(part is None for part in key):
                # A NULL key column does not collide on a plain unique index
                # (Postgres treats NULLs as distinct by default), so these rows
                # are left exactly as the caller supplied them.
                passthrough.add(position)
            else:
                last_position[key] = position
        kept = passthrough.union(last_position.values())
        return [row for position, row in enumerate(rows) if position in kept]

    def upsert_all(self, rows: list[PropertyOverrideInsert]) -> int:
        """Insert ``rows`` in one statement, refreshing existing rows on conflict.

        On a ``(property_id, override_component, building_part)`` conflict the
        stored ``override_value``, ``original_spreadsheet_description`` and
        ``updated_at`` are overwritten; other columns of the existing row are
        left as they are. No commit is issued here.

        If ``rows`` contains several entries for the same conflict key, only the
        last one is sent. Postgres rejects an ``ON CONFLICT DO UPDATE`` statement
        that would touch the same row twice, so without this a single duplicate
        would fail the whole batch; last-wins matches what applying the upserts
        one by one would produce.

        Args:
            rows: Override rows to write. May be empty.

        Returns:
            ``0`` when ``rows`` is empty, otherwise the ``rowcount`` reported for
            the executed statement.
        """
        if not rows:
            return 0
        unique_rows = self._collapse_duplicate_keys(rows)
        # One timestamp for the whole batch so inserted and refreshed rows agree.
        now = datetime.now(timezone.utc)
        values = [
            {
                "property_id": r.property_id,
                "portfolio_id": r.portfolio_id,
                "building_part": r.building_part,
                "override_component": r.override_component,
                "override_value": r.override_value,
                "original_spreadsheet_description": r.original_spreadsheet_description,
                "created_at": now,
                "updated_at": now,
            }
            for r in unique_rows
        ]
        stmt = pg_insert(self._table).values(values)
        # Re-run = recalculate (ADR-0005/0006): refresh the snapshot on conflict,
        # in contrast to ``property``'s on_conflict_do_nothing. When a per-property
        # user-edit path lands (and a ``source`` column with it), this set_ gains a
        # ``WHERE source='classifier'`` guard so hand-edits survive.
        stmt = stmt.on_conflict_do_update(
            index_elements=["property_id", "override_component", "building_part"],
            set_={
                "override_value": stmt.excluded.override_value,
                "original_spreadsheet_description": stmt.excluded.original_spreadsheet_description,
                "updated_at": now,
            },
        )
        # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
        # deprecated in the stubs but the upsert path is supported.
        result = self._session.execute(stmt)  # pyright: ignore[reportDeprecated]
        return cast(int, result.rowcount)