mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
197 lines
8.8 KiB
Python
197 lines
8.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Optional, cast
|
|
|
|
from sqlalchemy import Table, func
|
|
from sqlalchemy import select as sa_select
|
|
from sqlalchemy import update as sa_update
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlmodel import Session, col, select
|
|
|
|
from domain.geospatial.planning_restrictions import PlanningRestrictions
|
|
from domain.modelling.simulation import EpcSimulation
|
|
from domain.property.properties import Properties
|
|
from domain.property.property import Property, PropertyIdentity
|
|
from infrastructure.postgres.property_table import PropertyRow
|
|
from repositories.epc.epc_repository import EpcRepository
|
|
from repositories.property.landlord_override_overlays import overlays_from
|
|
from repositories.property.property_overrides_reader import PropertyOverridesReader
|
|
from repositories.property.property_repository import (
|
|
PropertyIdentityInsert,
|
|
PropertyRepository,
|
|
)
|
|
from repositories.spatial.spatial_repository import SpatialRepository
|
|
|
|
|
|
class PropertyPostgresRepository(PropertyRepository):
    """Postgres adapter for the ``property`` table — reads and writes (ADR-0003).

    Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
    (via an injected `EpcRepository`) and the planning protections (via an
    injected `SpatialRepository`, keyed by UPRN — ADR-0020), so a hydrated
    Property is a pure function of repository state. ``epc_repo`` / ``spatial_repo``
    are optional: the Finalise write path (``insert_all``) creates new identity
    rows and never hydrates, so callers that only insert construct this with a
    session alone.

    ``overrides_reader`` is optional as well: without one, hydrated Properties
    carry an empty list of Landlord Overrides.
    """

    def __init__(
        self,
        session: Session,
        epc_repo: Optional[EpcRepository] = None,
        spatial_repo: Optional[SpatialRepository] = None,
        overrides_reader: Optional[PropertyOverridesReader] = None,
    ) -> None:
        """Store the session and the optional read-side collaborators.

        Args:
            session: Session every statement in this repository runs on.
            epc_repo: Source of the EPC slice; required by ``get`` / ``get_many``.
            spatial_repo: Source of planning restrictions keyed by UPRN; when
                omitted every Property hydrates as unrestricted.
            overrides_reader: Source of Landlord Overrides; when omitted the
                overlay list is empty.
        """
        self._session = session
        self._epc_repo = epc_repo
        self._spatial_repo = spatial_repo
        self._overrides_reader = overrides_reader
        # ``__table__`` is injected at runtime on table=True classes but the stubs
        # don't expose it; pin to ``Table`` so the dialect insert is typed.
        self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))

    def _epc(self) -> EpcRepository:
        """Return the injected `EpcRepository`.

        Raises:
            ValueError: if this repository was built without one.
        """
        if self._epc_repo is None:
            raise ValueError(
                "PropertyPostgresRepository needs an EpcRepository to read; it was "
                "constructed for the write-only Finalise path."
            )
        return self._epc_repo

    def _landlord_overrides(self, property_id: int) -> list[EpcSimulation]:
        """The Property's Landlord Overrides as Simulation Overlays — empty when
        no reader is wired (the overlay stays off) or the Property has none."""
        if self._overrides_reader is None:
            return []
        return overlays_from(self._overrides_reader.overrides_for(property_id))

    @staticmethod
    def _identity_of(row: PropertyRow) -> PropertyIdentity:
        """Build the `PropertyIdentity` for one ``property`` row.

        Shared by ``get`` and ``get_many`` so both read paths apply the same
        null handling.
        """
        # `postcode`/`address` are nullable in the FE schema (the finaliser may
        # insert a row with neither); coerce the degenerate null to "" so the
        # identity type stays a plain str.
        return PropertyIdentity(
            portfolio_id=row.portfolio_id,
            postcode=row.postcode or "",
            address=row.address or "",
            uprn=row.uprn,
            landlord_property_id=row.landlord_property_id,
        )

    def get(self, property_id: int) -> Property:
        """Hydrate a single Property by primary key.

        Raises:
            ValueError: if no row has ``property_id``, or if no `EpcRepository`
                was injected.
        """
        row = self._session.get(PropertyRow, property_id)
        if row is None:
            raise ValueError(f"property {property_id} not found")
        identity = self._identity_of(row)
        # A row without a UPRN has nothing to look up; it hydrates unrestricted.
        restrictions: dict[int, PlanningRestrictions] = self._restrictions_for(
            [row.uprn] if row.uprn is not None else []
        )
        return Property(
            identity=identity,
            epc=self._epc().get_for_property(property_id),
            predicted_epc=self._epc().get_predicted_for_property(property_id),
            landlord_overrides=self._landlord_overrides(property_id),
            planning_restrictions=_restrictions_of(row.uprn, restrictions),
        )

    def get_many(self, property_ids: list[int]) -> Properties:
        """Hydrate several Properties, preserving the order of ``property_ids``.

        Rows, EPCs, predicted EPCs and planning restrictions are each fetched
        in one batched call; an empty ``property_ids`` returns an empty
        `Properties` without touching the session.

        Raises:
            ValueError: if any requested id has no row, or if no `EpcRepository`
                was injected.
        """
        if not property_ids:
            return Properties([])
        rows = self._session.exec(
            select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
        ).all()
        row_by_id = {row.id: row for row in rows}
        epcs = self._epc().get_for_properties(property_ids)
        predicted_epcs = self._epc().get_predicted_for_properties(property_ids)
        restrictions: dict[int, PlanningRestrictions] = self._restrictions_for(
            [row.uprn for row in rows if row.uprn is not None]
        )
        items: list[Property] = []
        # Walk the requested ids (not the fetched rows) so the result keeps the
        # caller's order and a missing id is reported rather than dropped.
        for property_id in property_ids:
            row = row_by_id.get(property_id)
            if row is None:
                raise ValueError(f"property {property_id} not found")
            items.append(
                Property(
                    identity=self._identity_of(row),
                    epc=epcs.get(property_id),
                    predicted_epc=predicted_epcs.get(property_id),
                    # NOTE(review): one reader call per property — confirm the
                    # reader is cheap enough, or batch it, for large portfolios.
                    landlord_overrides=self._landlord_overrides(property_id),
                    planning_restrictions=_restrictions_of(row.uprn, restrictions),
                )
            )
        return Properties(items)

    def _restrictions_for(self, uprns: list[int]) -> dict[int, PlanningRestrictions]:
        """Planning restrictions for ``uprns`` from the spatial repository, or
        ``{}`` when there is nothing to look up or no repository is wired."""
        # No spatial repo (the write-only Finalise path) → no cached protections;
        # `_restrictions_of` then defaults every UPRN to unrestricted.
        if not uprns or self._spatial_repo is None:
            return {}
        return self._spatial_repo.get_for_uprns(uprns)

    def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
        """Set ``has_recommendations`` on one row and stamp ``updated_at``."""
        # The old engine set `has_recommendations` per Property; we mirror it and
        # bump `updated_at` (DB clock) so a new-process run is datable against the
        # 2026-06-01 cutoff. Does not commit — the Unit of Work owns the txn.
        stmt = (
            sa_update(self._table)
            .where(self._table.c.id == property_id)
            .values(has_recommendations=has_recommendations, updated_at=func.now())
        )
        self._session.execute(stmt)  # pyright: ignore[reportDeprecated]

    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Insert identity rows in one statement, skipping (portfolio, UPRN) conflicts.

        Returns:
            The ``rowcount`` reported for the INSERT; ``0`` for an empty
            ``rows`` (no statement is issued).
        """
        if not rows:
            return 0

        values = [
            {
                "portfolio_id": r.portfolio_id,
                "creation_status": r.creation_status,
                "uprn": r.uprn,
                "landlord_property_id": r.landlord_property_id,
                "address": r.address,
                "postcode": r.postcode,
                "user_inputted_address": r.user_inputted_address,
                "user_inputted_postcode": r.user_inputted_postcode,
                "lexiscore": r.lexiscore,
            }
            for r in rows
        ]

        stmt = pg_insert(self._table).values(values)
        # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
        # reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
        # properties untouched (contrast property_overrides, which recalculates).
        stmt = stmt.on_conflict_do_nothing(
            index_elements=["portfolio_id", "uprn"],
            index_where=self._table.c.uprn.isnot(None),
        )

        # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
        # deprecated in the stubs, and they resolve the INSERT to a bare
        # ``Result`` (no ``rowcount``) — both are stub limitations, not real.
        result = self._session.execute(stmt)  # pyright: ignore[reportDeprecated]
        return cast(int, result.rowcount)  # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]

    def ids_by_uprn(self, portfolio_id: int, uprns: list[int]) -> dict[int, int]:
        """Map each of ``uprns`` found in ``portfolio_id`` to its property id.

        UPRNs with no row in the portfolio are absent from the result; an
        empty ``uprns`` returns ``{}`` without querying.
        """
        if not uprns:
            return {}
        stmt = sa_select(self._table.c.uprn, self._table.c.id).where(
            self._table.c.portfolio_id == portfolio_id,
            self._table.c.uprn.in_(uprns),
        )
        rows = self._session.execute(stmt).all()  # pyright: ignore[reportDeprecated]
        return {int(uprn): int(pid) for uprn, pid in rows if uprn is not None}
|
|
|
|
|
|
def _restrictions_of(
    uprn: Optional[int], by_uprn: dict[int, PlanningRestrictions]
) -> PlanningRestrictions:
    """Look up the cached protections for ``uprn`` in ``by_uprn``.

    Falls back to a default-constructed (unrestricted) `PlanningRestrictions`
    when the UPRN is ``None`` or has no entry in the mapping (per legacy
    `empty_spatial_df`; ADR-0020).
    """
    unrestricted = PlanningRestrictions()
    return unrestricted if uprn is None else by_uprn.get(uprn, unrestricted)
|