feat(uow): UnitOfWork port + PostgresUnitOfWork adapter (#1138)

First slice of the per-stage batch-transaction refactor (ADR-0012). A
UnitOfWork is the single transaction a stage runs its batch in: a context
manager exposing the DB repos bound to one session, committing once on
`commit()` and rolling back on exception or exit-without-commit
(all-or-nothing per batch, fail noisily).

- `UnitOfWork` (port): `property` / `epc` / `solar` / `baseline` repos +
  `commit()` / `rollback()`; `__exit__` rolls back uncommitted work.
- `PostgresUnitOfWork(session_factory)`: opens a Session from an injected
  factory (a module-scoped engine + sessionmaker in prod, so the pool is
  reused across warm invocations), binds the Postgres repos to it, closes
  on exit.

Not yet wired into any orchestrator — that lands in the Baseline /
Ingestion refactor slices. 3 tests against ephemeral PG (commit durable
across units; exception rolls back; no-commit persists nothing). pyright
strict clean; AAA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-31 09:25:17 +00:00
parent b77fe26892
commit 4daba1f7c5
3 changed files with 176 additions and 0 deletions

View file

@ -0,0 +1,56 @@
from __future__ import annotations
from collections.abc import Callable
from types import TracebackType
from typing import Optional
from sqlmodel import Session
from repositories.baseline.baseline_postgres_repository import (
BaselinePostgresRepository,
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from repositories.unit_of_work import UnitOfWork
class PostgresUnitOfWork(UnitOfWork):
"""Postgres-backed Unit of Work: one ``Session``, all repos bound to it.
Built from a session factory (a module-scoped engine + sessionmaker in
production, ADR-0012) so the connection pool is reused across warm Lambda
invocations. The session is opened on ``__enter__`` and closed on
``__exit__``; a fresh instance is one single-use unit.
"""
def __init__(self, session_factory: Callable[[], Session]) -> None:
self._session_factory = session_factory
def __enter__(self) -> "PostgresUnitOfWork":
self._session = self._session_factory()
epc_repo = EpcPostgresRepository(self._session)
self.property = PropertyPostgresRepository(self._session, epc_repo)
self.epc = epc_repo
self.solar = SolarPostgresRepository(self._session)
self.baseline = BaselinePostgresRepository(self._session)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
try:
self._session.rollback()
finally:
self._session.close()
def commit(self) -> None:
self._session.commit()
def rollback(self) -> None:
self._session.rollback()

View file

@ -0,0 +1,47 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Optional
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.solar.solar_repository import SolarRepository
class UnitOfWork(ABC):
"""A single batch transaction across the DB-backed repos (ADR-0012).
A context manager that exposes the repos bound to one session. A stage runs
its whole batch inside one unit and calls ``commit()`` once; leaving the
block without committing including via an exception rolls back, so a
failed batch persists nothing and the subtask fails noisily.
The non-DB dependencies (EPC/Solar fetchers, the geospatial S3 repo, the
Rebaseliner) are *not* part of the unit only transactional DB work is.
"""
property: PropertyRepository
epc: EpcRepository
solar: SolarRepository
baseline: BaselineRepository
@abstractmethod
def commit(self) -> None: ...
@abstractmethod
def rollback(self) -> None: ...
def __enter__(self) -> "UnitOfWork":
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
# Roll back whatever was not explicitly committed (a no-op after a
# successful commit). All-or-nothing per batch.
self.rollback()

View file

@ -0,0 +1,73 @@
from __future__ import annotations
from collections.abc import Callable
import pytest
from sqlalchemy import Engine
from sqlmodel import Session
from datatypes.epc.domain.epc import Epc
from domain.baseline.baseline_performance import BaselinePerformance
from domain.baseline.performance import Performance
from repositories.postgres_unit_of_work import PostgresUnitOfWork
def _session_factory(db_engine: Engine) -> Callable[[], Session]:
return lambda: Session(db_engine)
def _baseline() -> BaselinePerformance:
perf = Performance(
sap_score=72, epc_band=Epc.C, co2_emissions=1.8, primary_energy_intensity=180
)
return BaselinePerformance(
lodged=perf,
effective=perf,
rebaseline_reason="none",
space_heating_kwh=5000.0,
water_heating_kwh=2000.0,
)
def test_committed_work_is_visible_to_a_later_unit(db_engine: Engine) -> None:
# Arrange
new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
baseline = _baseline()
# Act
with new_unit() as uow:
uow.baseline.save(baseline, property_id=10)
uow.commit()
# Assert — a fresh unit reads back what the first one committed.
with new_unit() as uow:
loaded = uow.baseline.get_for_property(10)
assert loaded == baseline
def test_an_exception_in_the_block_rolls_the_batch_back(db_engine: Engine) -> None:
# Arrange
new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
# Act — a property mid-batch raises after a write but before commit.
with pytest.raises(RuntimeError, match="boom"):
with new_unit() as uow:
uow.baseline.save(_baseline(), property_id=10)
raise RuntimeError("boom")
# Assert — nothing from the aborted batch is persisted.
with new_unit() as uow:
assert uow.baseline.get_for_property(10) is None
def test_leaving_the_block_without_commit_persists_nothing(db_engine: Engine) -> None:
# Arrange
new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
# Act — write but never commit.
with new_unit() as uow:
uow.baseline.save(_baseline(), property_id=10)
# Assert
with new_unit() as uow:
assert uow.baseline.get_for_property(10) is None