mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
65 lines
2.6 KiB
Python
65 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import cast
|
|
|
|
from sqlalchemy import Table
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlmodel import Session
|
|
|
|
from infrastructure.postgres.property_override_table import PropertyOverrideRow
|
|
from repositories.property.property_override_repository import (
|
|
PropertyOverrideInsert,
|
|
PropertyOverrideRepository,
|
|
)
|
|
|
|
|
|
class PropertyOverridePostgresRepository(PropertyOverrideRepository):
|
|
"""Postgres adapter for ``property_overrides`` (ADR-0006).
|
|
|
|
Write-only: the finaliser materialises the fact layer; no reads here.
|
|
"""
|
|
|
|
def __init__(self, session: Session) -> None:
|
|
self._session = session
|
|
# ``__table__`` is injected at runtime on table=True classes but the
|
|
# stubs don't expose it; pin to ``Table`` so the dialect insert is typed.
|
|
self._table: Table = cast(Table, getattr(PropertyOverrideRow, "__table__"))
|
|
|
|
def upsert_all(self, rows: list[PropertyOverrideInsert]) -> int:
|
|
if not rows:
|
|
return 0
|
|
|
|
now = datetime.now(timezone.utc)
|
|
values = [
|
|
{
|
|
"property_id": r.property_id,
|
|
"portfolio_id": r.portfolio_id,
|
|
"building_part": r.building_part,
|
|
"override_component": r.override_component,
|
|
"override_value": r.override_value,
|
|
"original_spreadsheet_description": r.original_spreadsheet_description,
|
|
"created_at": now,
|
|
"updated_at": now,
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
stmt = pg_insert(self._table).values(values)
|
|
# Re-run = recalculate (ADR-0005/0006): refresh the snapshot on conflict,
|
|
# in contrast to ``property``'s on_conflict_do_nothing. When a per-property
|
|
# user-edit path lands (and a ``source`` column with it), this set_ gains a
|
|
# ``WHERE source='classifier'`` guard so hand-edits survive.
|
|
stmt = stmt.on_conflict_do_update(
|
|
index_elements=["property_id", "override_component", "building_part"],
|
|
set_={
|
|
"override_value": stmt.excluded.override_value,
|
|
"original_spreadsheet_description": stmt.excluded.original_spreadsheet_description,
|
|
"updated_at": now,
|
|
},
|
|
)
|
|
|
|
# SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
|
|
# deprecated in the stubs but the upsert path is supported.
|
|
result = self._session.execute(stmt) # pyright: ignore[reportDeprecated]
|
|
return cast(int, result.rowcount)
|