From 17b9ae08ebed20431362bed15552f11c90f46e5b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 24 Jun 2026 16:30:44 +0000 Subject: [PATCH 1/3] Hold one DB connection per modelling_e2e invocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The modelling_e2e Lambda held up to ~4 concurrent Postgres connections per invocation: the read Session stayed open across the write loop (the catalogue was queried live and overrides were read per-Property), each per-Property Unit of Work opened a second, and the TaskOrchestrator ran on its own NullPool engine — so the pool needed pool_size=2 + max_overflow=1 just for the modelling work. Under 32 concurrent containers that approached RDS max_connections. Restructure the handler to read everything up front — overrides, Scenario, an in-memory catalogue snapshot, and stored Solar — through one short-lived read Session, close it, then write each Property in a sequential Unit of Work. The read and write Sessions no longer overlap, so the engine drops to pool_size=1, max_overflow=0. Fold the orchestrator onto the same pooled engine: its repos commit on every save, releasing the connection between bookkeeping calls, so it holds none during the work. One invocation now uses one connection at a time. The catalogue becomes a per-invocation snapshot (MaterialSnapshotRepository), mirroring ProductPostgresRepository.get exactly — same drift mapping, lowest-id pick, and errors — but priced after the Session closes. Transaction isolation is preserved: per-Property writes and orchestrator bookkeeping keep their own independent transactions, just drawn sequentially from a single connection. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 94 ++++++++-- .../product/composite_product_repository.py | 15 ++ .../product/product_postgres_repository.py | 58 ++++++ .../in_memory_property_overrides_reader.py | 31 ++++ .../modelling_e2e/test_handler.py | 20 +-- .../test_material_snapshot_repository.py | 170 ++++++++++++++++++ ...est_in_memory_property_overrides_reader.py | 66 +++++++ 7 files changed, 429 insertions(+), 25 deletions(-) create mode 100644 repositories/property/in_memory_property_overrides_reader.py create mode 100644 tests/repositories/product/test_material_snapshot_repository.py create mode 100644 tests/repositories/property/test_in_memory_property_overrides_reader.py diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index c3477bc1..251b7a89 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -13,12 +13,18 @@ from the postcode cohort. ``_cohort_cache`` is module-level so warm Lambda containers re-processing the same postcode avoid redundant fetches. All Measure Types are considered: pricing goes through -``catalogue_with_off_catalogue_overrides`` so the measures the live ``material`` -catalogue cannot supply (``secondary_heating_removal``, the glazing and heating -gaps) are priced from the committed off-catalogue overlay instead of crashing. +``catalogue_snapshot_with_off_catalogue_overrides`` so the measures the live +``material`` catalogue cannot supply (``secondary_heating_removal``, the glazing +and heating gaps) are priced from the committed off-catalogue overlay instead of +crashing. DB engine is module-scoped so the connection pool is reused across warm -invocations (ADR-0012). +invocations (ADR-0012). 
The pool holds a single connection (``pool_size=1``): the +handler reads everything up front — overrides, Scenario, a catalogue snapshot, and +stored Solar — through one short-lived read Session, closes it, then writes each +Property in a sequential Unit of Work, so the read and write Sessions never +overlap. The orchestrator shares the same engine and releases its connection +between bookkeeping commits, so one invocation uses one DB connection at a time. """ from __future__ import annotations @@ -26,7 +32,8 @@ from __future__ import annotations import dataclasses import io import os -from collections.abc import Callable +from collections.abc import Callable, Generator +from contextlib import contextmanager from typing import Any, Optional, cast import boto3 @@ -88,7 +95,10 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import ( ) from repositories.postgres_unit_of_work import PostgresUnitOfWork from repositories.product.composite_product_repository import ( - catalogue_with_off_catalogue_overrides, + catalogue_snapshot_with_off_catalogue_overrides, +) +from repositories.property.in_memory_property_overrides_reader import ( + InMemoryPropertyOverridesReader, ) from repositories.property.landlord_override_overlays import overlays_from from repositories.property.override_backed_prediction_attributes_reader import ( @@ -97,10 +107,17 @@ from repositories.property.override_backed_prediction_attributes_reader import ( from repositories.property.property_overrides_postgres_reader import ( PropertyOverridesPostgresReader, ) +from repositories.property.property_overrides_reader import ( + ResolvedPropertyOverrides, +) from repositories.scenario.scenario_postgres_repository import ( ScenarioPostgresRepository, ) from repositories.solar.solar_postgres_repository import SolarPostgresRepository +from repositories.tasks.subtask_postgres_repository import ( + SubTaskPostgresRepository, +) +from repositories.tasks.task_postgres_repository import 
TaskPostgresRepository from utilities.aws_lambda.task_handler import task_handler from uuid import UUID from utilities.logger import setup_logger @@ -119,13 +136,34 @@ def _get_engine() -> Engine: global _engine if _engine is None: config = PostgresConfig.from_env(dict(os.environ)) - # Reduced pool for Lambda: 32 concurrent containers × 3 connections = 96 max, - # vs the default 3+5=8 which would reach 256+ and exhaust RDS max_connections. - # pool_size=2 covers the simultaneous read_session + UoW session per invocation. - _engine = make_engine(dataclasses.replace(config, pool_size=2, max_overflow=1)) + # One connection per invocation: the handler reads everything up front + # through one short-lived read Session, closes it, then writes each + # Property in a sequential Unit of Work — so the read and write Sessions + # never overlap and a single pooled connection suffices. The orchestrator + # shares this engine (see ``_shared_engine_orchestrator``) and releases + # its connection between bookkeeping commits, so it holds none during the + # work. 32 concurrent containers × 1 connection = 32 against RDS. + _engine = make_engine(dataclasses.replace(config, pool_size=1, max_overflow=0)) return _engine +@contextmanager +def _shared_engine_orchestrator() -> Generator[TaskOrchestrator, None, None]: + """A ``TaskOrchestrator`` on the same module-scoped pooled engine as the + modelling work — not a separate per-invocation NullPool engine. + + Its repositories commit on every ``save``/``create``, releasing the pooled + connection between bookkeeping calls, so it holds none while the wrapped + handler body runs. 
Combined with the read-then-write handler structure and + ``pool_size=1``, the whole invocation uses one DB connection at a time.""" + engine = _get_engine() + with Session(engine) as session: + yield TaskOrchestrator( + task_repo=TaskPostgresRepository(session=session), + subtask_repo=SubTaskPostgresRepository(session=session), + ) + + def _s3_parquet_reader() -> ParquetReader: bucket = os.environ["DATA_BUCKET"] @@ -244,7 +282,12 @@ def _predict_epc( return predicted -@task_handler(task_source="modelling_e2e", source=Source.PROPERTY, pass_task_orchestrator=True) +@task_handler( + task_source="modelling_e2e", + source=Source.PROPERTY, + orchestrator_cm=_shared_engine_orchestrator, + pass_task_orchestrator=True, +) def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, task_id: UUID) -> None: trigger = ModellingE2ETriggerBody.model_validate(body) property_ids = trigger.property_ids @@ -276,7 +319,14 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, uprns: dict[int, int] = {int(row[0]): int(row[1]) for row in uprn_rows} postcodes: dict[int, str] = {int(row[0]): (row[1] or "") for row in postcode_rows} - overrides_reader = PropertyOverridesPostgresReader(lambda: Session(engine)) + # Pre-fetch every Property's overrides up front (each call opens and closes + # its own short read Session) and serve them from memory through the loop, so + # no override read Session is held open alongside a write Unit of Work. 
+ overrides_postgres_reader = PropertyOverridesPostgresReader(lambda: Session(engine)) + overrides_by_pid: dict[int, ResolvedPropertyOverrides] = { + pid: overrides_postgres_reader.overrides_for(pid) for pid in property_ids + } + overrides_reader = InMemoryPropertyOverridesReader(overrides_by_pid) prediction_attrs_reader = OverrideBackedPredictionAttributesReader(overrides_reader) comparables_repo = EpcComparablePropertiesRepository( epc_client, geospatial, nearby_postcodes=PostcodesIoClient() @@ -320,9 +370,23 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, read_session = Session(engine) try: + # Read everything the write loop needs up front: the Scenario, an in-memory + # snapshot of the catalogue (priced after the Session closes), and each + # UPRN's stored Solar insights. Then close the read Session immediately so + # its pooled connection is free before the loop — each Property's write + # Unit of Work reuses that single connection rather than opening a second + # alongside a held-open read Session. (The ``finally`` is the safety net.) 
scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] - products = catalogue_with_off_catalogue_overrides(read_session) - solar_reader = SolarPostgresRepository(read_session) + products = catalogue_snapshot_with_off_catalogue_overrides(read_session) + stored_solar: dict[int, Optional[dict[str, Any]]] = ( + {} + if no_solar + else { + uprn: SolarPostgresRepository(read_session).get(uprn) + for uprn in set(uprns.values()) + } + ) + read_session.close() for property_id in property_ids: child = orchestrator.create_child_subtask( @@ -392,7 +456,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, if no_solar: solar_insights = None else: - solar_insights = solar_reader.get(uprn) + solar_insights = stored_solar.get(uprn) if solar_insights is None: solar_insights = _solar_insights_for(solar_client, spatial) solar_was_fetched = solar_insights is not None diff --git a/repositories/product/composite_product_repository.py b/repositories/product/composite_product_repository.py index 8c03c68e..9d8ab81a 100644 --- a/repositories/product/composite_product_repository.py +++ b/repositories/product/composite_product_repository.py @@ -7,6 +7,7 @@ from sqlmodel import Session from domain.modelling.product import Product from repositories.product.product_json_repository import ProductJsonRepository from repositories.product.product_postgres_repository import ( + MaterialSnapshotRepository, ProductPostgresRepository, ) from repositories.product.product_repository import ProductRepository @@ -49,3 +50,17 @@ def catalogue_with_off_catalogue_overrides(session: Session) -> ProductRepositor override=ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH), fallback=ProductPostgresRepository(session), ) + + +def catalogue_snapshot_with_off_catalogue_overrides( + session: Session, +) -> ProductRepository: + """Like ``catalogue_with_off_catalogue_overrides`` but the catalogue side is + an in-memory snapshot read once from ``session`` — so the 
Modelling stage can + price after the read Session is closed. Used by the modelling_e2e Lambda to + hold a single DB connection per invocation; the live-querying variant above + stays for the per-Property Unit of Work and the local e2e runner.""" + return CompositeProductRepository( + override=ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH), + fallback=MaterialSnapshotRepository.load(session), + ) diff --git a/repositories/product/product_postgres_repository.py b/repositories/product/product_postgres_repository.py index 5f27a68e..fb7b948d 100644 --- a/repositories/product/product_postgres_repository.py +++ b/repositories/product/product_postgres_repository.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import Optional + from sqlmodel import Session, col, select from domain.modelling.contingencies import contingency_rate @@ -58,3 +60,59 @@ class ProductPostgresRepository(ProductRepository): contingency_rate=contingency_rate(measure_type), id=row.id, ) + + +class MaterialSnapshotRepository(ProductRepository): + """An in-memory snapshot of the active ``material`` catalogue, read once from + a Session via ``load``. + + It lets the Modelling stage price Measure Options *after* the read Session is + closed, so a Lambda invocation can hold a single DB connection at a time — the + read Session, then each per-Property write Unit of Work, never overlapping — + instead of keeping the read Session's connection checked out across the whole + write loop while the catalogue is queried lazily. + + ``get`` mirrors ``ProductPostgresRepository.get`` exactly: the same + domain→``material.type`` drift mapping, the lowest-id active row per type, and + the same ``ProductNotFound`` / missing-``total_cost`` errors. 
Because the + snapshot is a plain dict lookup (no live query), the off-catalogue measures + that would poison a live session's transaction simply miss and raise the + benign ``ProductNotFound`` — the ``CompositeProductRepository`` override still + keeps them off the catalogue first. + """ + + def __init__( + self, rows_by_catalogue_type: dict[str, tuple[Optional[float], int]] + ) -> None: + self._rows_by_catalogue_type = rows_by_catalogue_type + + @classmethod + def load(cls, session: Session) -> "MaterialSnapshotRepository": + rows = session.exec( + select(MaterialRow) + .where(col(MaterialRow.is_active).is_(True)) + .order_by(col(MaterialRow.id)) + ).all() + rows_by_catalogue_type: dict[str, tuple[Optional[float], int]] = {} + for row in rows: + # Lowest id wins per type — mirrors ``.first()`` after ``order_by(id)``. + if row.type not in rows_by_catalogue_type: + rows_by_catalogue_type[row.type] = (row.total_cost, row.id) + return cls(rows_by_catalogue_type) + + def get(self, measure_type: str) -> Product: + catalogue_type = _MATERIAL_TYPE_BY_MEASURE.get(measure_type, measure_type) + entry = self._rows_by_catalogue_type.get(catalogue_type) + if entry is None: + raise ProductNotFound( + f"no active product for measure type {measure_type!r}" + ) + total_cost, row_id = entry + if total_cost is None: + raise ValueError(f"product {measure_type!r} has no total_cost") + return Product( + measure_type=measure_type, + unit_cost_per_m2=total_cost, + contingency_rate=contingency_rate(measure_type), + id=row_id, + ) diff --git a/repositories/property/in_memory_property_overrides_reader.py b/repositories/property/in_memory_property_overrides_reader.py new file mode 100644 index 00000000..d2aa2b60 --- /dev/null +++ b/repositories/property/in_memory_property_overrides_reader.py @@ -0,0 +1,31 @@ +"""In-memory ``PropertyOverridesReader`` over a pre-fetched snapshot. + +The modelling_e2e handler reads each Property's overrides inside its write loop. 
+Reading them live keeps a second DB Session open alongside the per-Property write +Unit of Work; pre-fetching every Property's overrides up front (one read pass, +each on its own short-lived Session) lets the loop run on a single DB connection. This +serves those pre-fetched snapshots behind the same ``PropertyOverridesReader`` +port — a Property with no pre-fetched entry resolves to no overrides, exactly as +the Postgres reader returns an empty snapshot for a Property with no rows. +""" + +from __future__ import annotations + +from collections.abc import Mapping + +from repositories.property.property_overrides_reader import ( + PropertyOverridesReader, + ResolvedPropertyOverrides, +) + +_NO_OVERRIDES = ResolvedPropertyOverrides(rows=()) + + +class InMemoryPropertyOverridesReader(PropertyOverridesReader): + def __init__( + self, by_property_id: Mapping[int, ResolvedPropertyOverrides] + ) -> None: + self._by_property_id = dict(by_property_id) + + def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides: + return self._by_property_id.get(property_id, _NO_OVERRIDES) diff --git a/tests/applications/modelling_e2e/test_handler.py b/tests/applications/modelling_e2e/test_handler.py index e2664a4b..8cd41c67 100644 --- a/tests/applications/modelling_e2e/test_handler.py +++ b/tests/applications/modelling_e2e/test_handler.py @@ -174,7 +174,7 @@ def test_handler_creates_one_child_subtask_per_property_id() -> None: stack.enter_context( patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] - stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")) + stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) stack.enter_context( patch("applications.modelling_e2e.handler.run_modelling", return_value=_plan_mock()) @@ -260,7 +260,7
@@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled( patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( - patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides") + patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides") ) stack.enter_context( patch("applications.modelling_e2e.handler.Session") @@ -349,7 +349,7 @@ def test_skipped_cohort_certs_do_not_prevent_plan_being_saved() -> None: ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( patch( - "applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides" + "applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides" ) ) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) @@ -417,7 +417,7 @@ def test_skipped_cohort_certs_are_logged_and_handler_does_not_raise() -> None: stack.enter_context( patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] - stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")) + stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) stack.enter_context( patch("applications.modelling_e2e.handler.run_modelling", return_value=mock_plan) @@ -534,7 +534,7 @@ def test_prediction_path_saves_predicted_epc_plan_and_baseline( patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( - patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides") + patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides") ) stack.enter_context( 
patch("applications.modelling_e2e.handler.Session") @@ -637,7 +637,7 @@ def test_empty_cohort_gates_property_out_without_saving() -> None: patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( - patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides") + patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides") ) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) MockUoW = stack.enter_context( @@ -743,7 +743,7 @@ def test_empty_own_postcode_broadens_to_nearby_and_predicts() -> None: ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( patch( - "applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides" + "applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides" ) ) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) @@ -822,7 +822,7 @@ def test_per_property_failure_fails_child_subtask_and_siblings_continue() -> Non stack.enter_context( patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] - stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")) + stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) stack.enter_context( patch( @@ -929,7 +929,7 @@ def test_cohort_cache_prevents_duplicate_candidates_for_calls() -> None: patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( - patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides") + patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides") ) 
stack.enter_context(patch("applications.modelling_e2e.handler.Session")) stack.enter_context( @@ -1012,7 +1012,7 @@ def test_dry_run_skips_all_db_writes() -> None: patch("applications.modelling_e2e.handler.ScenarioPostgresRepository") ).return_value.get_many.return_value = [MagicMock()] stack.enter_context( - patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides") + patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides") ) stack.enter_context(patch("applications.modelling_e2e.handler.Session")) stack.enter_context( diff --git a/tests/repositories/product/test_material_snapshot_repository.py b/tests/repositories/product/test_material_snapshot_repository.py new file mode 100644 index 00000000..acdf3886 --- /dev/null +++ b/tests/repositories/product/test_material_snapshot_repository.py @@ -0,0 +1,170 @@ +"""Behaviour of the in-memory catalogue snapshot. It must price exactly as the +live ``ProductPostgresRepository`` does — same drift mapping, same lowest-id +pick, same errors — but after the read Session that built it has closed, so the +modelling_e2e Lambda can hold a single DB connection per invocation.""" + +import pytest +from sqlalchemy import Engine +from sqlmodel import Session + +from domain.modelling.product import Product +from infrastructure.postgres.product_table import MaterialRow +from repositories.product.product_postgres_repository import ( + MaterialSnapshotRepository, + ProductPostgresRepository, +) +from repositories.product.product_repository import ProductNotFound + + +def test_snapshot_prices_after_the_loading_session_is_closed( + db_engine: Engine, +) -> None: + # Arrange + with Session(db_engine) as session: + session.add( + MaterialRow( + id=1, + type="cavity_wall_insulation", + total_cost=18.5, + cost_unit="gbp_per_m2", + is_active=True, + description="Cavity wall insulation", + ) + ) + session.commit() + + # Act — load the snapshot, then close the Session before pricing. 
+ with Session(db_engine) as session: + snapshot = MaterialSnapshotRepository.load(session) + product: Product = snapshot.get("cavity_wall_insulation") + + # Assert — same mapping as the live repository, no Session needed. + assert product.measure_type == "cavity_wall_insulation" + assert abs(product.unit_cost_per_m2 - 18.5) <= 1e-9 + assert abs(product.contingency_rate - 0.10) <= 1e-9 + assert product.id == 1 + + +def test_snapshot_resolves_a_drifted_measure_type_to_the_catalogue_spelling( + db_engine: Engine, +) -> None: + # Arrange — the domain type drifts from the catalogue's ``material.type``. + with Session(db_engine) as session: + session.add( + MaterialRow( + id=1, + type="low_energy_lighting_installation", + total_cost=5.0, + cost_unit="gbp_per_unit", + is_active=True, + description="Low energy lighting", + ) + ) + session.commit() + + # Act + with Session(db_engine) as session: + snapshot = MaterialSnapshotRepository.load(session) + product: Product = snapshot.get("low_energy_lighting") + + # Assert — keeps the domain measure type, prices the catalogue row. + assert product.measure_type == "low_energy_lighting" + assert abs(product.unit_cost_per_m2 - 5.0) <= 1e-9 + + +def test_snapshot_picks_the_lowest_id_when_several_active_rows_share_a_type( + db_engine: Engine, +) -> None: + # Arrange — many active rows per type; the pick must be deterministic. + with Session(db_engine) as session: + session.add_all( + [ + MaterialRow( + id=7, + type="solar_pv", + total_cost=99.0, + cost_unit="gbp_per_unit", + is_active=True, + description="Solar PV (higher id)", + ), + MaterialRow( + id=3, + type="solar_pv", + total_cost=42.0, + cost_unit="gbp_per_unit", + is_active=True, + description="Solar PV (lowest id)", + ), + ] + ) + session.commit() + + # Act + with Session(db_engine) as session: + snapshot = MaterialSnapshotRepository.load(session) + product: Product = snapshot.get("solar_pv") + + # Assert — lowest-id active row wins, matching the live repository. 
+ assert product.id == 3 + assert abs(product.unit_cost_per_m2 - 42.0) <= 1e-9 + + +def test_snapshot_excludes_inactive_rows_and_raises_product_not_found( + db_engine: Engine, +) -> None: + # Arrange — only an inactive row exists. + with Session(db_engine) as session: + session.add( + MaterialRow( + id=1, + type="cavity_wall_insulation", + total_cost=18.5, + cost_unit="gbp_per_m2", + is_active=False, + description="Cavity wall insulation (retired)", + ) + ) + session.commit() + + # Act + with Session(db_engine) as session: + snapshot = MaterialSnapshotRepository.load(session) + + # Assert — no active row, so the benign ProductNotFound (a ValueError). + with pytest.raises(ProductNotFound): + snapshot.get("cavity_wall_insulation") + + +def test_snapshot_matches_the_live_repository_across_a_mixed_catalogue( + db_engine: Engine, +) -> None: + # Arrange — a catalogue with a drifted type, duplicate active rows, and an + # inactive row: the snapshot and the live repo must agree row-for-row. + with Session(db_engine) as session: + session.add_all( + [ + MaterialRow(id=1, type="cavity_wall_insulation", total_cost=18.5, + is_active=True), + MaterialRow(id=2, type="solar_pv", total_cost=99.0, is_active=True), + MaterialRow(id=5, type="solar_pv", total_cost=42.0, is_active=True), + MaterialRow(id=6, type="low_energy_lighting_installation", + total_cost=5.0, is_active=True), + MaterialRow(id=9, type="loft_insulation", total_cost=12.0, + is_active=False), + ] + ) + session.commit() + + # Act + with Session(db_engine) as session: + snapshot = MaterialSnapshotRepository.load(session) + live = ProductPostgresRepository(session) + for measure_type in [ + "cavity_wall_insulation", + "solar_pv", + "low_energy_lighting", + ]: + live_product = live.get(measure_type) + snapshot_product = snapshot.get(measure_type) + # Assert — identical pricing for every priced measure. 
+ assert snapshot_product == live_product diff --git a/tests/repositories/property/test_in_memory_property_overrides_reader.py b/tests/repositories/property/test_in_memory_property_overrides_reader.py new file mode 100644 index 00000000..5cf64f0b --- /dev/null +++ b/tests/repositories/property/test_in_memory_property_overrides_reader.py @@ -0,0 +1,66 @@ +"""Behaviour of the pre-fetched in-memory overrides reader: it serves snapshots +keyed by Property, and resolves a Property with no entry to no overrides — exactly +as the Postgres reader returns an empty snapshot for a Property with no rows.""" + +from repositories.property.in_memory_property_overrides_reader import ( + InMemoryPropertyOverridesReader, +) +from repositories.property.property_overrides_reader import ( + ResolvedPropertyOverride, + ResolvedPropertyOverrides, +) + + +def test_serves_the_prefetched_snapshot_for_a_known_property() -> None: + # Arrange + snapshot = ResolvedPropertyOverrides( + rows=( + ResolvedPropertyOverride( + override_component="property_type", + building_part=0, + override_value="House", + ), + ) + ) + reader = InMemoryPropertyOverridesReader({42: snapshot}) + + # Act + resolved = reader.overrides_for(42) + + # Assert + assert resolved is snapshot + assert resolved.value("property_type", 0) == "House" + + +def test_unknown_property_resolves_to_no_overrides() -> None: + # Arrange — only property 42 was pre-fetched. + reader = InMemoryPropertyOverridesReader({42: ResolvedPropertyOverrides(rows=())}) + + # Act + resolved = reader.overrides_for(999) + + # Assert — empty snapshot, not an error (mirrors the Postgres reader). + assert resolved.rows == () + assert resolved.value("property_type", 0) is None + + +def test_does_not_alias_the_caller_mapping() -> None: + # Arrange — a mutable mapping handed to the reader. 
+ backing: dict[int, ResolvedPropertyOverrides] = { + 1: ResolvedPropertyOverrides(rows=()) + } + reader = InMemoryPropertyOverridesReader(backing) + + # Act — mutate the caller's mapping after construction. + backing[2] = ResolvedPropertyOverrides( + rows=( + ResolvedPropertyOverride( + override_component="built_form_type", + building_part=0, + override_value="Detached", + ), + ) + ) + + # Assert — the reader took its own copy; the late insert is invisible. + assert reader.overrides_for(2).rows == () From de71f9abb609e55a115ded38defd104c763ed0f0 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 24 Jun 2026 17:01:41 +0000 Subject: [PATCH 2/3] Resolve overrides on the unit's own session, not a second connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The modelling_e2e Lambda runs on a single-connection pool (pool_size=1, max_overflow=0) so one invocation uses one Postgres connection. But re-hydrating a Property through PostgresUnitOfWork resolved its Landlord Overrides through a PropertyOverridesPostgresReader built from the unit's session *factory* — which opens a brand-new Session per call. While the unit's own read transaction was still open (PropertyPostgresRepository.get_many had checked out the connection), that second Session asked the pool for a second connection, found none, and timed out after 30s: QueuePool limit of size 1 overflow 0 reached, connection timed out, timeout 30.00 The baseline stage (PropertyBaselineOrchestrator.run -> uow.property.get_many -> landlord overrides) hit this on every invocation. Read the overrides on the unit's OWN session instead. property_overrides is committed reference data, so reading it inside the unit's transaction sees the same rows and keeps the invocation on one connection. 
Extract the query/mapping into a shared helper and add OpenSessionPropertyOverridesReader (reads on a caller-owned, already-open session without closing it) for the unit; the standalone PropertyOverridesPostgresReader still opens its own short session for use outside a unit. Regression test pins the invariant with a real pool_size=1/max_overflow=0 engine: without the fix it reproduces the exact QueuePool timeout. Co-Authored-By: Claude Opus 4.8 (1M context) --- repositories/postgres_unit_of_work.py | 13 ++-- .../property_overrides_postgres_reader.py | 61 +++++++++++++------ tests/repositories/test_unit_of_work.py | 58 +++++++++++++++++- 3 files changed, 108 insertions(+), 24 deletions(-) diff --git a/repositories/postgres_unit_of_work.py b/repositories/postgres_unit_of_work.py index 8a66146c..25764397 100644 --- a/repositories/postgres_unit_of_work.py +++ b/repositories/postgres_unit_of_work.py @@ -15,7 +15,7 @@ from repositories.product.composite_product_repository import ( catalogue_with_off_catalogue_overrides, ) from repositories.property.property_overrides_postgres_reader import ( - PropertyOverridesPostgresReader, + OpenSessionPropertyOverridesReader, ) from repositories.property.property_postgres_repository import ( PropertyPostgresRepository, @@ -46,10 +46,13 @@ class PostgresUnitOfWork(UnitOfWork): spatial_repo = SpatialPostgresRepository(self._session) # Fold Landlord Overrides onto the Effective EPC on every re-hydration # (ADR-0032), so what the Baseline orchestrator scores off ``uow.property`` - # matches what the Plan was modelled from. The reader is uow-independent — - # ``property_overrides`` is committed reference data — so it opens its own - # short read session per call via the same session factory. - overrides_reader = PropertyOverridesPostgresReader(self._session_factory) + # matches what the Plan was modelled from. 
``property_overrides`` is + # committed reference data, but the reader must read on THIS uow's session + # — a second session opened concurrently checks out a second connection + # and deadlocks the single-connection pool while this uow's transaction is + # open. Reading committed data inside the uow's transaction is correct and + # keeps the invocation on one connection. + overrides_reader = OpenSessionPropertyOverridesReader(self._session) self.property = PropertyPostgresRepository( self._session, epc_repo, spatial_repo, overrides_reader ) diff --git a/repositories/property/property_overrides_postgres_reader.py b/repositories/property/property_overrides_postgres_reader.py index 1ccb13d5..c30993b6 100644 --- a/repositories/property/property_overrides_postgres_reader.py +++ b/repositories/property/property_overrides_postgres_reader.py @@ -2,9 +2,16 @@ Read-only and uow-independent: ``property_overrides`` is committed reference data the ``bulk_upload_finaliser`` Lambda writes at Finalise, long before First -Run executes — there is no transactional coupling to the ingestion run, so this -opens its own short read session per call via the injected session factory -(mirroring the composition root's ``lambda: Session(engine)``). +Run executes — there is no transactional coupling to the ingestion run, so the +standalone reader opens its own short read session per call via the injected +session factory (mirroring the composition root's ``lambda: Session(engine)``). + +Inside a Unit of Work the overrides must instead be read on the UoW's *own* +session (``OpenSessionPropertyOverridesReader``): a second session opened +concurrently checks out a second connection, which deadlocks the modelling_e2e +Lambda's single-connection pool while the UoW's read transaction is still open. +Reading committed reference data inside the UoW's transaction is correct — it +sees the same committed rows — and keeps the invocation on one connection. 
""" from __future__ import annotations @@ -21,25 +28,43 @@ from repositories.property.property_overrides_reader import ( ) +def _resolve_overrides(session: Session, property_id: int) -> ResolvedPropertyOverrides: + rows = session.exec( + select(PropertyOverrideRow).where( + col(PropertyOverrideRow.property_id) == property_id + ) + ).all() + return ResolvedPropertyOverrides( + rows=tuple( + ResolvedPropertyOverride( + override_component=row.override_component, + building_part=row.building_part, + override_value=row.override_value, + ) + for row in rows + ) + ) + + class PropertyOverridesPostgresReader(PropertyOverridesReader): + """Opens its own short read session per call — for standalone use outside a + Unit of Work, where there is no shared session/connection to reuse.""" + def __init__(self, session_factory: Callable[[], Session]) -> None: self._session_factory = session_factory def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides: with self._session_factory() as session: - rows = session.exec( - select(PropertyOverrideRow).where( - col(PropertyOverrideRow.property_id) == property_id - ) - ).all() + return _resolve_overrides(session, property_id) - return ResolvedPropertyOverrides( - rows=tuple( - ResolvedPropertyOverride( - override_component=row.override_component, - building_part=row.building_part, - override_value=row.override_value, - ) - for row in rows - ) - ) + +class OpenSessionPropertyOverridesReader(PropertyOverridesReader): + """Reads on a caller-owned, already-open session without closing it — for use + inside a Unit of Work so resolving overrides reuses the UoW's single + connection instead of checking out a second one.""" + + def __init__(self, session: Session) -> None: + self._session = session + + def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides: + return _resolve_overrides(self._session, property_id) diff --git a/tests/repositories/test_unit_of_work.py b/tests/repositories/test_unit_of_work.py index 
3cc13d6c..43f6f14e 100644 --- a/tests/repositories/test_unit_of_work.py +++ b/tests/repositories/test_unit_of_work.py @@ -7,7 +7,7 @@ from typing import Any import pytest from sqlalchemy import Engine -from sqlmodel import Session +from sqlmodel import Session, create_engine from datatypes.epc.domain.epc import Epc from datatypes.epc.domain.epc_property_data import ( @@ -152,6 +152,62 @@ def test_unit_hydrates_a_property_with_its_landlord_overrides_folded( assert main.wall_insulation_type == 3 +def test_hydrating_a_property_with_overrides_stays_on_one_connection( + db_engine: Engine, +) -> None: + """Resolving Landlord Overrides during re-hydration must read on the unit's + OWN session, not open a second one. The modelling_e2e Lambda runs on a + single-connection pool (``pool_size=1, max_overflow=0``); a second session + opened while the unit's read transaction is still open checks out a second + connection and deadlocks ("QueuePool limit of size 1 overflow 0 reached, + connection timed out"). This pins that the unit holds exactly one connection + while it hydrates a Property that has overrides. + """ + # Arrange — seed a property + EPC + an override (committed reference data). 
+ with PostgresUnitOfWork(_session_factory(db_engine)) as uow: + row = PropertyRow(portfolio_id=1, postcode="A0 0AA", address="1 St", uprn=1) + uow._session.add(row) # pyright: ignore[reportPrivateUsage] + uow._session.flush() # pyright: ignore[reportPrivateUsage] + property_id = row.id + assert property_id is not None + EpcPostgresRepository(uow._session).save( # pyright: ignore[reportPrivateUsage] + _epc(), property_id=property_id + ) + uow._session.add( # pyright: ignore[reportPrivateUsage] + PropertyOverrideRow( + property_id=property_id, + portfolio_id=1, + building_part=0, + override_component="wall_type", + override_value="Solid brick, with internal insulation", + original_spreadsheet_description="solid brick, insulated", + ) + ) + uow.commit() + + # A pool that admits exactly one connection and fails fast (not after 30s) if + # a second is requested — the production modelling_e2e shape. + single_connection = create_engine( + db_engine.url, pool_size=1, max_overflow=0, pool_timeout=2 + ) + try: + # Act — hydrate through the unit; this resolves the override. + with PostgresUnitOfWork(_session_factory(single_connection)) as uow: + prop = uow.property.get(property_id) + finally: + single_connection.dispose() + + # Assert — reached here without a QueuePool timeout, and the override folded: + # cavity (4) → solid brick (3) / internal (3). 
+ main = next( + part + for part in prop.effective_epc.sap_building_parts + if part.identifier is BuildingPartIdentifier.MAIN + ) + assert main.wall_construction == 3 + assert main.wall_insulation_type == 3 + + def test_leaving_the_block_without_commit_persists_nothing(db_engine: Engine) -> None: # Arrange new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine)) From fb308cfaea56ea35b080243d1068deb0bc2ddf5c Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 24 Jun 2026 17:10:23 +0000 Subject: [PATCH 3/3] Use NullPool as a graceful ceiling for the one-connection-per-lambda design The invocation is architecturally one DB connection at a time (read up front, sequential write Units of Work, overrides resolved on the unit's own session). Keep that as the design intent, but back it with NullPool instead of a fixed pool_size=1 pool: each checkout opens a fresh connection and closes it on return, so there is no pool slot to exhaust. The difference is the failure mode if a path ever regresses and holds two Sessions at once. A pool_size=1/max_overflow=0 pool turns that into a hard 30s deadlock that fails the whole invocation ("QueuePool limit of size 1 overflow 0 reached, connection timed out"). NullPool instead opens a transient second connection for that instant and the Lambda keeps running. The design target stays one connection; NullPool just keeps it alive if we slip. The single-connection invariant itself is still enforced in the Unit of Work (overrides read on the unit's own session) and pinned by the regression test, which uses its own strict pool_size=1 engine so it asserts the architecture regardless of the production NullPool choice.
Co-Authored-By: Claude Opus 4.8 (1M context) --- applications/modelling_e2e/handler.py | 52 ++++++++++++++++----------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 251b7a89..7d011fef 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -18,18 +18,19 @@ All Measure Types are considered: pricing goes through and heating gaps) are priced from the committed off-catalogue overlay instead of crashing. -DB engine is module-scoped so the connection pool is reused across warm -invocations (ADR-0012). The pool holds a single connection (``pool_size=1``): the -handler reads everything up front — overrides, Scenario, a catalogue snapshot, and -stored Solar — through one short-lived read Session, closes it, then writes each -Property in a sequential Unit of Work, so the read and write Sessions never -overlap. The orchestrator shares the same engine and releases its connection -between bookkeeping commits, so one invocation uses one DB connection at a time. +The DB engine is module-scoped (ADR-0012). Architecturally each invocation uses +one DB connection at a time: the handler reads everything up front — overrides, +Scenario, a catalogue snapshot, and stored Solar — through one short-lived read +Session, closes it, then writes each Property in a sequential Unit of Work whose +overrides resolve on its own session, so no two Sessions ever overlap. The engine +uses ``NullPool`` rather than a fixed pool so that target is a graceful ceiling, +not a hard one: a fresh connection is opened per checkout and closed on return, +so there is no pool slot to exhaust — any future accidental overlap opens a +transient second connection instead of deadlocking the Lambda.
""" from __future__ import annotations -import dataclasses import io import os from collections.abc import Callable, Generator @@ -39,6 +40,7 @@ from typing import Any, Optional, cast import boto3 import pandas as pd # pyright: ignore[reportMissingTypeStubs] from sqlalchemy import Engine, text +from sqlalchemy.pool import NullPool from sqlmodel import Session from datatypes.epc.domain.epc_property_data import ( @@ -136,26 +138,34 @@ def _get_engine() -> Engine: global _engine if _engine is None: config = PostgresConfig.from_env(dict(os.environ)) - # One connection per invocation: the handler reads everything up front - # through one short-lived read Session, closes it, then writes each - # Property in a sequential Unit of Work — so the read and write Sessions - # never overlap and a single pooled connection suffices. The orchestrator - # shares this engine (see ``_shared_engine_orchestrator``) and releases - # its connection between bookkeeping commits, so it holds none during the - # work. 32 concurrent containers × 1 connection = 32 against RDS. - _engine = make_engine(dataclasses.replace(config, pool_size=1, max_overflow=0)) + # Architecturally one connection per invocation: the handler reads + # everything up front through one short-lived read Session, closes it, + # then writes each Property in a sequential Unit of Work — and the Unit of + # Work resolves overrides on its own session — so no two Sessions overlap + # and a single connection suffices. 32 concurrent containers × 1 = 32 + # against RDS. + # + # NullPool, not a fixed pool, enforces that as a *graceful* ceiling rather + # than a hard one: each checkout opens a fresh connection and closes it on + # return, so there is no pool slot to exhaust. If a future code path ever + # holds two Sessions at once it opens a second connection for that instant + # instead of dead-locking on a 1-slot pool and failing the whole + # invocation (the "QueuePool limit of size 1 overflow 0 reached" timeout). 
+ # The design target stays one connection; NullPool just keeps the Lambda + # running if we ever regress it. + _engine = make_engine(config, poolclass=NullPool) return _engine @contextmanager def _shared_engine_orchestrator() -> Generator[TaskOrchestrator, None, None]: - """A ``TaskOrchestrator`` on the same module-scoped pooled engine as the - modelling work — not a separate per-invocation NullPool engine. + """A ``TaskOrchestrator`` on the same module-scoped engine as the modelling + work, not a separate one. - Its repositories commit on every ``save``/``create``, releasing the pooled + Its repositories commit on every ``save``/``create``, releasing the connection between bookkeeping calls, so it holds none while the wrapped - handler body runs. Combined with the read-then-write handler structure and - ``pool_size=1``, the whole invocation uses one DB connection at a time.""" + handler body runs. Combined with the read-then-write handler structure, the + whole invocation uses one DB connection at a time.""" engine = _get_engine() with Session(engine) as session: yield TaskOrchestrator(