From 4daba1f7c57520739dde6306c319a9bb44a92185 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Sun, 31 May 2026 09:25:17 +0000
Subject: [PATCH] feat(uow): UnitOfWork port + PostgresUnitOfWork adapter (#1138)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

First slice of the per-stage batch-transaction refactor (ADR-0012). A
UnitOfWork is the single transaction a stage runs its batch in: a context
manager exposing the DB repos bound to one session, committing once on
`commit()` and rolling back on exception or exit-without-commit
(all-or-nothing per batch, fail noisily).

- `UnitOfWork` (port): `property` / `epc` / `solar` / `baseline` repos +
  `commit()` / `rollback()`; `__exit__` rolls back uncommitted work.
- `PostgresUnitOfWork(session_factory)`: opens a Session from an injected
  factory (a module-scoped engine + sessionmaker in prod, so the pool is
  reused across warm invocations), binds the Postgres repos to it, closes
  on exit.

Not yet wired into any orchestrator — that lands in the Baseline /
Ingestion refactor slices. 3 tests against ephemeral PG (commit durable
across units; exception rolls back; no-commit persists nothing). pyright
strict clean; AAA.

Co-Authored-By: Claude Opus 4.8
---
 repositories/postgres_unit_of_work.py   | 65 ++++++++++++++++++++++
 repositories/unit_of_work.py            | 47 ++++++++++++++++
 tests/repositories/test_unit_of_work.py | 73 +++++++++++++++++++++++++
 3 files changed, 185 insertions(+)
 create mode 100644 repositories/postgres_unit_of_work.py
 create mode 100644 repositories/unit_of_work.py
 create mode 100644 tests/repositories/test_unit_of_work.py

diff --git a/repositories/postgres_unit_of_work.py b/repositories/postgres_unit_of_work.py
new file mode 100644
index 00000000..bd5957e9
--- /dev/null
+++ b/repositories/postgres_unit_of_work.py
@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from types import TracebackType
+from typing import Optional
+
+from sqlmodel import Session
+
+from repositories.baseline.baseline_postgres_repository import (
+    BaselinePostgresRepository,
+)
+from repositories.epc.epc_postgres_repository import EpcPostgresRepository
+from repositories.property.property_postgres_repository import (
+    PropertyPostgresRepository,
+)
+from repositories.solar.solar_postgres_repository import SolarPostgresRepository
+from repositories.unit_of_work import UnitOfWork
+
+
+class PostgresUnitOfWork(UnitOfWork):
+    """Postgres-backed Unit of Work: one ``Session``, all repos bound to it.
+
+    Built from a session factory (a module-scoped engine + sessionmaker in
+    production, ADR-0012) so the connection pool is reused across warm Lambda
+    invocations. The session is opened on ``__enter__`` and closed on
+    ``__exit__``; a fresh instance is one single-use unit.
+    """
+
+    def __init__(self, session_factory: Callable[[], Session]) -> None:
+        self._session_factory = session_factory
+
+    def __enter__(self) -> "PostgresUnitOfWork":
+        session = self._session_factory()
+        try:
+            epc_repo = EpcPostgresRepository(session)
+            self.property = PropertyPostgresRepository(session, epc_repo)
+            self.epc = epc_repo
+            self.solar = SolarPostgresRepository(session)
+            self.baseline = BaselinePostgresRepository(session)
+        except BaseException:
+            # ``__exit__`` never runs when ``__enter__`` raises, so close the
+            # session here instead of leaving it open.
+            session.close()
+            raise
+        self._session = session
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> None:
+        # Always roll back: uncommitted work is discarded whether the block
+        # raised or simply ended. Close the session even if rollback fails.
+        try:
+            self._session.rollback()
+        finally:
+            self._session.close()
+
+    def commit(self) -> None:
+        self._session.commit()
+
+    def rollback(self) -> None:
+        self._session.rollback()
diff --git a/repositories/unit_of_work.py b/repositories/unit_of_work.py
new file mode 100644
index 00000000..af5b77f2
--- /dev/null
+++ b/repositories/unit_of_work.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from types import TracebackType
+from typing import Optional
+
+from repositories.baseline.baseline_repository import BaselineRepository
+from repositories.epc.epc_repository import EpcRepository
+from repositories.property.property_repository import PropertyRepository
+from repositories.solar.solar_repository import SolarRepository
+
+
+class UnitOfWork(ABC):
+    """A single batch transaction across the DB-backed repos (ADR-0012).
+
+    A context manager that exposes the repos bound to one session. A stage runs
+    its whole batch inside one unit and calls ``commit()`` once; leaving the
+    block without committing — including via an exception — rolls back, so a
+    failed batch persists nothing and the subtask fails noisily.
+
+    The non-DB dependencies (EPC/Solar fetchers, the geospatial S3 repo, the
+    Rebaseliner) are *not* part of the unit — only transactional DB work is.
+    """
+
+    property: PropertyRepository
+    epc: EpcRepository
+    solar: SolarRepository
+    baseline: BaselineRepository
+
+    @abstractmethod
+    def commit(self) -> None: ...
+
+    @abstractmethod
+    def rollback(self) -> None: ...
+
+    def __enter__(self) -> "UnitOfWork":
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> None:
+        # Roll back whatever was not explicitly committed (a no-op after a
+        # successful commit). All-or-nothing per batch.
+        self.rollback()
diff --git a/tests/repositories/test_unit_of_work.py b/tests/repositories/test_unit_of_work.py
new file mode 100644
index 00000000..2851edaf
--- /dev/null
+++ b/tests/repositories/test_unit_of_work.py
@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+
+import pytest
+from sqlalchemy import Engine
+from sqlmodel import Session
+
+from datatypes.epc.domain.epc import Epc
+from domain.baseline.baseline_performance import BaselinePerformance
+from domain.baseline.performance import Performance
+from repositories.postgres_unit_of_work import PostgresUnitOfWork
+
+
+def _session_factory(db_engine: Engine) -> Callable[[], Session]:
+    return lambda: Session(db_engine)
+
+
+def _baseline() -> BaselinePerformance:
+    perf = Performance(
+        sap_score=72, epc_band=Epc.C, co2_emissions=1.8, primary_energy_intensity=180
+    )
+    return BaselinePerformance(
+        lodged=perf,
+        effective=perf,
+        rebaseline_reason="none",
+        space_heating_kwh=5000.0,
+        water_heating_kwh=2000.0,
+    )
+
+
+def test_committed_work_is_visible_to_a_later_unit(db_engine: Engine) -> None:
+    # Arrange
+    new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
+    baseline = _baseline()
+
+    # Act
+    with new_unit() as uow:
+        uow.baseline.save(baseline, property_id=10)
+        uow.commit()
+
+    # Assert — a fresh unit reads back what the first one committed.
+    with new_unit() as uow:
+        loaded = uow.baseline.get_for_property(10)
+        assert loaded == baseline
+
+
+def test_an_exception_in_the_block_rolls_the_batch_back(db_engine: Engine) -> None:
+    # Arrange
+    new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
+
+    # Act — a property mid-batch raises after a write but before commit.
+    with pytest.raises(RuntimeError, match="boom"):
+        with new_unit() as uow:
+            uow.baseline.save(_baseline(), property_id=10)
+            raise RuntimeError("boom")
+
+    # Assert — nothing from the aborted batch is persisted.
+    with new_unit() as uow:
+        assert uow.baseline.get_for_property(10) is None
+
+
+def test_leaving_the_block_without_commit_persists_nothing(db_engine: Engine) -> None:
+    # Arrange
+    new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))
+
+    # Act — write but never commit.
+    with new_unit() as uow:
+        uow.baseline.save(_baseline(), property_id=10)
+
+    # Assert
+    with new_unit() as uow:
+        assert uow.baseline.get_for_property(10) is None