Model/repositories/postgres_unit_of_work.py
Khalim Conn-Kowlessar 234c4ae947 feat(repositories): expose the spatial cache repo on the Unit of Work
Slice 3c.3. Ingestion writes the OS spatial reference cache through the same
unit it persists the EPC/solar enrichments with, so `UnitOfWork` declares a
`spatial` repo, `PostgresUnitOfWork` binds a `SpatialPostgresRepository` to the
session, and `FakeUnitOfWork` gains a `FakeSpatialRepo` (seedable for read
tests, recording writes for ingestion-side assertions).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 17:20:39 +00:00

68 lines
2.5 KiB
Python

from __future__ import annotations
from collections.abc import Callable
from types import TracebackType
from typing import Optional
from sqlmodel import Session
from repositories.property_baseline.property_baseline_postgres_repository import (
PropertyBaselinePostgresRepository,
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.plan.plan_postgres_repository import PlanPostgresRepository
from repositories.product.product_postgres_repository import (
ProductPostgresRepository,
)
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from repositories.scenario.scenario_postgres_repository import (
ScenarioPostgresRepository,
)
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from repositories.spatial.spatial_postgres_repository import SpatialPostgresRepository
from repositories.unit_of_work import UnitOfWork
class PostgresUnitOfWork(UnitOfWork):
"""Postgres-backed Unit of Work: one ``Session``, all repos bound to it.
Built from a session factory (a module-scoped engine + sessionmaker in
production, ADR-0012) so the connection pool is reused across warm Lambda
invocations. The session is opened on ``__enter__`` and closed on
``__exit__``; a fresh instance is one single-use unit.
"""
def __init__(self, session_factory: Callable[[], Session]) -> None:
self._session_factory = session_factory
def __enter__(self) -> "PostgresUnitOfWork":
self._session = self._session_factory()
epc_repo = EpcPostgresRepository(self._session)
self.property = PropertyPostgresRepository(self._session, epc_repo)
self.epc = epc_repo
self.solar = SolarPostgresRepository(self._session)
self.spatial = SpatialPostgresRepository(self._session)
self.property_baseline = PropertyBaselinePostgresRepository(self._session)
self.scenario = ScenarioPostgresRepository(self._session)
self.product = ProductPostgresRepository(self._session)
self.plan = PlanPostgresRepository(self._session)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
try:
self._session.rollback()
finally:
self._session.close()
def commit(self) -> None:
self._session.commit()
def rollback(self) -> None:
self._session.rollback()