Merge pull request #1314 from Hestia-Homes/fix/modelling-e2e-single-connection

Fix modelling_e2e single-connection pool exhaustion (QueuePool timeout)
This commit is contained in:
Daniel Roth 2026-06-24 18:15:27 +01:00 committed by GitHub
commit 932a252454
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 549 additions and 51 deletions

View file

@ -13,25 +13,34 @@ from the postcode cohort. ``_cohort_cache`` is module-level so warm Lambda
containers re-processing the same postcode avoid redundant fetches.
All Measure Types are considered: pricing goes through
``catalogue_with_off_catalogue_overrides`` so the measures the live ``material``
catalogue cannot supply (``secondary_heating_removal``, the glazing and heating
gaps) are priced from the committed off-catalogue overlay instead of crashing.
``catalogue_snapshot_with_off_catalogue_overrides`` so the measures the live
``material`` catalogue cannot supply (``secondary_heating_removal``, the glazing
and heating gaps) are priced from the committed off-catalogue overlay instead of
crashing.
DB engine is module-scoped so the connection pool is reused across warm
invocations (ADR-0012).
The DB engine is module-scoped (ADR-0012). Architecturally each invocation uses
one DB connection at a time: the handler reads everything up front overrides,
Scenario, a catalogue snapshot, and stored Solar through one short-lived read
Session, closes it, then writes each Property in a sequential Unit of Work whose
overrides resolve on its own session, so no two Sessions ever overlap. The engine
uses ``NullPool`` rather than a fixed pool so that target is a graceful ceiling,
not a hard one: a fresh connection is opened per checkout and closed on return,
so there is no pool slot to exhaust any future accidental overlap opens a
transient second connection instead of dead-locking the Lambda.
"""
from __future__ import annotations
import dataclasses
import io
import os
from collections.abc import Callable
from collections.abc import Callable, Generator
from contextlib import contextmanager
from typing import Any, Optional, cast
import boto3
import pandas as pd # pyright: ignore[reportMissingTypeStubs]
from sqlalchemy import Engine, text
from sqlalchemy.pool import NullPool
from sqlmodel import Session
from datatypes.epc.domain.epc_property_data import (
@ -88,7 +97,10 @@ from repositories.fuel_rates.fuel_rates_static_file_repository import (
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.product.composite_product_repository import (
catalogue_with_off_catalogue_overrides,
catalogue_snapshot_with_off_catalogue_overrides,
)
from repositories.property.in_memory_property_overrides_reader import (
InMemoryPropertyOverridesReader,
)
from repositories.property.landlord_override_overlays import overlays_from
from repositories.property.override_backed_prediction_attributes_reader import (
@ -97,10 +109,17 @@ from repositories.property.override_backed_prediction_attributes_reader import (
from repositories.property.property_overrides_postgres_reader import (
PropertyOverridesPostgresReader,
)
from repositories.property.property_overrides_reader import (
ResolvedPropertyOverrides,
)
from repositories.scenario.scenario_postgres_repository import (
ScenarioPostgresRepository,
)
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from repositories.tasks.subtask_postgres_repository import (
SubTaskPostgresRepository,
)
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
from utilities.aws_lambda.task_handler import task_handler
from uuid import UUID
from utilities.logger import setup_logger
@ -119,13 +138,42 @@ def _get_engine() -> Engine:
global _engine
if _engine is None:
config = PostgresConfig.from_env(dict(os.environ))
# Reduced pool for Lambda: 32 concurrent containers × 3 connections = 96 max,
# vs the default 3+5=8 which would reach 256+ and exhaust RDS max_connections.
# pool_size=2 covers the simultaneous read_session + UoW session per invocation.
_engine = make_engine(dataclasses.replace(config, pool_size=2, max_overflow=1))
# Architecturally one connection per invocation: the handler reads
# everything up front through one short-lived read Session, closes it,
# then writes each Property in a sequential Unit of Work — and the Unit of
# Work resolves overrides on its own session — so no two Sessions overlap
# and a single connection suffices. 32 concurrent containers × 1 = 32
# against RDS.
#
# NullPool, not a fixed pool, enforces that as a *graceful* ceiling rather
# than a hard one: each checkout opens a fresh connection and closes it on
# return, so there is no pool slot to exhaust. If a future code path ever
# holds two Sessions at once it opens a second connection for that instant
# instead of dead-locking on a 1-slot pool and failing the whole
# invocation (the "QueuePool limit of size 1 overflow 0 reached" timeout).
# The design target stays one connection; NullPool just keeps the Lambda
# running if we ever regress it.
_engine = make_engine(config, poolclass=NullPool)
return _engine
@contextmanager
def _shared_engine_orchestrator() -> Generator[TaskOrchestrator, None, None]:
"""A ``TaskOrchestrator`` on the same module-scoped engine as the modelling
work, not a separate one.
Its repositories commit on every ``save``/``create``, releasing the
connection between bookkeeping calls, so it holds none while the wrapped
handler body runs. Combined with the read-then-write handler structure, the
whole invocation uses one DB connection at a time."""
engine = _get_engine()
with Session(engine) as session:
yield TaskOrchestrator(
task_repo=TaskPostgresRepository(session=session),
subtask_repo=SubTaskPostgresRepository(session=session),
)
def _s3_parquet_reader() -> ParquetReader:
bucket = os.environ["DATA_BUCKET"]
@ -244,7 +292,12 @@ def _predict_epc(
return predicted
@task_handler(task_source="modelling_e2e", source=Source.PROPERTY, pass_task_orchestrator=True)
@task_handler(
task_source="modelling_e2e",
source=Source.PROPERTY,
orchestrator_cm=_shared_engine_orchestrator,
pass_task_orchestrator=True,
)
def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, task_id: UUID) -> None:
trigger = ModellingE2ETriggerBody.model_validate(body)
property_ids = trigger.property_ids
@ -276,7 +329,14 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
uprns: dict[int, int] = {int(row[0]): int(row[1]) for row in uprn_rows}
postcodes: dict[int, str] = {int(row[0]): (row[1] or "") for row in postcode_rows}
overrides_reader = PropertyOverridesPostgresReader(lambda: Session(engine))
# Pre-fetch every Property's overrides up front (each call opens and closes
# its own short read Session) and serve them from memory through the loop, so
# no override read Session is held open alongside a write Unit of Work.
overrides_postgres_reader = PropertyOverridesPostgresReader(lambda: Session(engine))
overrides_by_pid: dict[int, ResolvedPropertyOverrides] = {
pid: overrides_postgres_reader.overrides_for(pid) for pid in property_ids
}
overrides_reader = InMemoryPropertyOverridesReader(overrides_by_pid)
prediction_attrs_reader = OverrideBackedPredictionAttributesReader(overrides_reader)
comparables_repo = EpcComparablePropertiesRepository(
epc_client, geospatial, nearby_postcodes=PostcodesIoClient()
@ -320,9 +380,23 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
read_session = Session(engine)
try:
# Read everything the write loop needs up front: the Scenario, an in-memory
# snapshot of the catalogue (priced after the Session closes), and each
# UPRN's stored Solar insights. Then close the read Session immediately so
# its pooled connection is free before the loop — each Property's write
# Unit of Work reuses that single connection rather than opening a second
# alongside a held-open read Session. (The ``finally`` is the safety net.)
scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0]
products = catalogue_with_off_catalogue_overrides(read_session)
solar_reader = SolarPostgresRepository(read_session)
products = catalogue_snapshot_with_off_catalogue_overrides(read_session)
stored_solar: dict[int, Optional[dict[str, Any]]] = (
{}
if no_solar
else {
uprn: SolarPostgresRepository(read_session).get(uprn)
for uprn in set(uprns.values())
}
)
read_session.close()
for property_id in property_ids:
child = orchestrator.create_child_subtask(
@ -392,7 +466,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
if no_solar:
solar_insights = None
else:
solar_insights = solar_reader.get(uprn)
solar_insights = stored_solar.get(uprn)
if solar_insights is None:
solar_insights = _solar_insights_for(solar_client, spatial)
solar_was_fetched = solar_insights is not None

View file

@ -15,7 +15,7 @@ from repositories.product.composite_product_repository import (
catalogue_with_off_catalogue_overrides,
)
from repositories.property.property_overrides_postgres_reader import (
PropertyOverridesPostgresReader,
OpenSessionPropertyOverridesReader,
)
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
@ -46,10 +46,13 @@ class PostgresUnitOfWork(UnitOfWork):
spatial_repo = SpatialPostgresRepository(self._session)
# Fold Landlord Overrides onto the Effective EPC on every re-hydration
# (ADR-0032), so what the Baseline orchestrator scores off ``uow.property``
# matches what the Plan was modelled from. The reader is uow-independent —
# ``property_overrides`` is committed reference data — so it opens its own
# short read session per call via the same session factory.
overrides_reader = PropertyOverridesPostgresReader(self._session_factory)
# matches what the Plan was modelled from. ``property_overrides`` is
# committed reference data, but the reader must read on THIS uow's session
# — a second session opened concurrently checks out a second connection
# and deadlocks the single-connection pool while this uow's transaction is
# open. Reading committed data inside the uow's transaction is correct and
# keeps the invocation on one connection.
overrides_reader = OpenSessionPropertyOverridesReader(self._session)
self.property = PropertyPostgresRepository(
self._session, epc_repo, spatial_repo, overrides_reader
)

View file

@ -7,6 +7,7 @@ from sqlmodel import Session
from domain.modelling.product import Product
from repositories.product.product_json_repository import ProductJsonRepository
from repositories.product.product_postgres_repository import (
MaterialSnapshotRepository,
ProductPostgresRepository,
)
from repositories.product.product_repository import ProductRepository
@ -49,3 +50,17 @@ def catalogue_with_off_catalogue_overrides(session: Session) -> ProductRepositor
override=ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH),
fallback=ProductPostgresRepository(session),
)
def catalogue_snapshot_with_off_catalogue_overrides(
session: Session,
) -> ProductRepository:
"""Like ``catalogue_with_off_catalogue_overrides`` but the catalogue side is
an in-memory snapshot read once from ``session`` so the Modelling stage can
price after the read Session is closed. Used by the modelling_e2e Lambda to
hold a single DB connection per invocation; the live-querying variant above
stays for the per-Property Unit of Work and the local e2e runner."""
return CompositeProductRepository(
override=ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH),
fallback=MaterialSnapshotRepository.load(session),
)

View file

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Optional
from sqlmodel import Session, col, select
from domain.modelling.contingencies import contingency_rate
@ -58,3 +60,59 @@ class ProductPostgresRepository(ProductRepository):
contingency_rate=contingency_rate(measure_type),
id=row.id,
)
class MaterialSnapshotRepository(ProductRepository):
"""An in-memory snapshot of the active ``material`` catalogue, read once from
a Session via ``load``.
It lets the Modelling stage price Measure Options *after* the read Session is
closed, so a Lambda invocation can hold a single DB connection at a time the
read Session, then each per-Property write Unit of Work, never overlapping
instead of keeping the read Session's connection checked out across the whole
write loop while the catalogue is queried lazily.
``get`` mirrors ``ProductPostgresRepository.get`` exactly: the same
domain``material.type`` drift mapping, the lowest-id active row per type, and
the same ``ProductNotFound`` / missing-``total_cost`` errors. Because the
snapshot is a plain dict lookup (no live query), the off-catalogue measures
that would poison a live session's transaction simply miss and raise the
benign ``ProductNotFound`` the ``CompositeProductRepository`` override still
keeps them off the catalogue first.
"""
def __init__(
self, rows_by_catalogue_type: dict[str, tuple[Optional[float], int]]
) -> None:
self._rows_by_catalogue_type = rows_by_catalogue_type
@classmethod
def load(cls, session: Session) -> "MaterialSnapshotRepository":
rows = session.exec(
select(MaterialRow)
.where(col(MaterialRow.is_active).is_(True))
.order_by(col(MaterialRow.id))
).all()
rows_by_catalogue_type: dict[str, tuple[Optional[float], int]] = {}
for row in rows:
# Lowest id wins per type — mirrors ``.first()`` after ``order_by(id)``.
if row.type not in rows_by_catalogue_type:
rows_by_catalogue_type[row.type] = (row.total_cost, row.id)
return cls(rows_by_catalogue_type)
def get(self, measure_type: str) -> Product:
catalogue_type = _MATERIAL_TYPE_BY_MEASURE.get(measure_type, measure_type)
entry = self._rows_by_catalogue_type.get(catalogue_type)
if entry is None:
raise ProductNotFound(
f"no active product for measure type {measure_type!r}"
)
total_cost, row_id = entry
if total_cost is None:
raise ValueError(f"product {measure_type!r} has no total_cost")
return Product(
measure_type=measure_type,
unit_cost_per_m2=total_cost,
contingency_rate=contingency_rate(measure_type),
id=row_id,
)

View file

@ -0,0 +1,31 @@
"""In-memory ``PropertyOverridesReader`` over a pre-fetched snapshot.
The modelling_e2e handler reads each Property's overrides inside its write loop.
Reading them live keeps a second DB Session open alongside the per-Property write
Unit of Work; pre-fetching every Property's overrides up front (one read pass,
then the read Session is closed) lets the loop run on a single DB connection. This
serves those pre-fetched snapshots behind the same ``PropertyOverridesReader``
port a Property with no pre-fetched entry resolves to no overrides, exactly as
the Postgres reader returns an empty snapshot for a Property with no rows.
"""
from __future__ import annotations
from collections.abc import Mapping
from repositories.property.property_overrides_reader import (
PropertyOverridesReader,
ResolvedPropertyOverrides,
)
_NO_OVERRIDES = ResolvedPropertyOverrides(rows=())
class InMemoryPropertyOverridesReader(PropertyOverridesReader):
def __init__(
self, by_property_id: Mapping[int, ResolvedPropertyOverrides]
) -> None:
self._by_property_id = dict(by_property_id)
def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides:
return self._by_property_id.get(property_id, _NO_OVERRIDES)

View file

@ -2,9 +2,16 @@
Read-only and uow-independent: ``property_overrides`` is committed reference
data the ``bulk_upload_finaliser`` Lambda writes at Finalise, long before First
Run executes there is no transactional coupling to the ingestion run, so this
opens its own short read session per call via the injected session factory
(mirroring the composition root's ``lambda: Session(engine)``).
Run executes there is no transactional coupling to the ingestion run, so the
standalone reader opens its own short read session per call via the injected
session factory (mirroring the composition root's ``lambda: Session(engine)``).
Inside a Unit of Work the overrides must instead be read on the UoW's *own*
session (``OpenSessionPropertyOverridesReader``): a second session opened
concurrently checks out a second connection, which deadlocks the modelling_e2e
Lambda's single-connection pool while the UoW's read transaction is still open.
Reading committed reference data inside the UoW's transaction is correct — it
sees the same committed rows and keeps the invocation on one connection.
"""
from __future__ import annotations
@ -21,25 +28,43 @@ from repositories.property.property_overrides_reader import (
)
def _resolve_overrides(session: Session, property_id: int) -> ResolvedPropertyOverrides:
rows = session.exec(
select(PropertyOverrideRow).where(
col(PropertyOverrideRow.property_id) == property_id
)
).all()
return ResolvedPropertyOverrides(
rows=tuple(
ResolvedPropertyOverride(
override_component=row.override_component,
building_part=row.building_part,
override_value=row.override_value,
)
for row in rows
)
)
class PropertyOverridesPostgresReader(PropertyOverridesReader):
"""Opens its own short read session per call — for standalone use outside a
Unit of Work, where there is no shared session/connection to reuse."""
def __init__(self, session_factory: Callable[[], Session]) -> None:
self._session_factory = session_factory
def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides:
with self._session_factory() as session:
rows = session.exec(
select(PropertyOverrideRow).where(
col(PropertyOverrideRow.property_id) == property_id
)
).all()
return _resolve_overrides(session, property_id)
return ResolvedPropertyOverrides(
rows=tuple(
ResolvedPropertyOverride(
override_component=row.override_component,
building_part=row.building_part,
override_value=row.override_value,
)
for row in rows
)
)
class OpenSessionPropertyOverridesReader(PropertyOverridesReader):
"""Reads on a caller-owned, already-open session without closing it — for use
inside a Unit of Work so resolving overrides reuses the UoW's single
connection instead of checking out a second one."""
def __init__(self, session: Session) -> None:
self._session = session
def overrides_for(self, property_id: int) -> ResolvedPropertyOverrides:
return _resolve_overrides(self._session, property_id)

View file

@ -174,7 +174,7 @@ def test_handler_creates_one_child_subtask_per_property_id() -> None:
stack.enter_context(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
patch("applications.modelling_e2e.handler.run_modelling", return_value=_plan_mock())
@ -260,7 +260,7 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.Session")
@ -349,7 +349,7 @@ def test_skipped_cohort_certs_do_not_prevent_plan_being_saved() -> None:
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch(
"applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"
"applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides"
)
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
@ -417,7 +417,7 @@ def test_skipped_cohort_certs_are_logged_and_handler_does_not_raise() -> None:
stack.enter_context(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
patch("applications.modelling_e2e.handler.run_modelling", return_value=mock_plan)
@ -534,7 +534,7 @@ def test_prediction_path_saves_predicted_epc_plan_and_baseline(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.Session")
@ -637,7 +637,7 @@ def test_empty_cohort_gates_property_out_without_saving() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
MockUoW = stack.enter_context(
@ -743,7 +743,7 @@ def test_empty_own_postcode_broadens_to_nearby_and_predicts() -> None:
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch(
"applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"
"applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides"
)
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
@ -822,7 +822,7 @@ def test_per_property_failure_fails_child_subtask_and_siblings_continue() -> Non
stack.enter_context(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides"))
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
patch(
@ -929,7 +929,7 @@ def test_cohort_cache_prevents_duplicate_candidates_for_calls() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
@ -1012,7 +1012,7 @@ def test_dry_run_skips_all_db_writes() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
patch("applications.modelling_e2e.handler.catalogue_snapshot_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(

View file

@ -0,0 +1,170 @@
"""Behaviour of the in-memory catalogue snapshot. It must price exactly as the
live ``ProductPostgresRepository`` does same drift mapping, same lowest-id
pick, same errors but after the read Session that built it has closed, so the
modelling_e2e Lambda can hold a single DB connection per invocation."""
import pytest
from sqlalchemy import Engine
from sqlmodel import Session
from domain.modelling.product import Product
from infrastructure.postgres.product_table import MaterialRow
from repositories.product.product_postgres_repository import (
MaterialSnapshotRepository,
ProductPostgresRepository,
)
from repositories.product.product_repository import ProductNotFound
def test_snapshot_prices_after_the_loading_session_is_closed(
db_engine: Engine,
) -> None:
# Arrange
with Session(db_engine) as session:
session.add(
MaterialRow(
id=1,
type="cavity_wall_insulation",
total_cost=18.5,
cost_unit="gbp_per_m2",
is_active=True,
description="Cavity wall insulation",
)
)
session.commit()
# Act — load the snapshot, then close the Session before pricing.
with Session(db_engine) as session:
snapshot = MaterialSnapshotRepository.load(session)
product: Product = snapshot.get("cavity_wall_insulation")
# Assert — same mapping as the live repository, no Session needed.
assert product.measure_type == "cavity_wall_insulation"
assert abs(product.unit_cost_per_m2 - 18.5) <= 1e-9
assert abs(product.contingency_rate - 0.10) <= 1e-9
assert product.id == 1
def test_snapshot_resolves_a_drifted_measure_type_to_the_catalogue_spelling(
db_engine: Engine,
) -> None:
# Arrange — the domain type drifts from the catalogue's ``material.type``.
with Session(db_engine) as session:
session.add(
MaterialRow(
id=1,
type="low_energy_lighting_installation",
total_cost=5.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Low energy lighting",
)
)
session.commit()
# Act
with Session(db_engine) as session:
snapshot = MaterialSnapshotRepository.load(session)
product: Product = snapshot.get("low_energy_lighting")
# Assert — keeps the domain measure type, prices the catalogue row.
assert product.measure_type == "low_energy_lighting"
assert abs(product.unit_cost_per_m2 - 5.0) <= 1e-9
def test_snapshot_picks_the_lowest_id_when_several_active_rows_share_a_type(
db_engine: Engine,
) -> None:
# Arrange — many active rows per type; the pick must be deterministic.
with Session(db_engine) as session:
session.add_all(
[
MaterialRow(
id=7,
type="solar_pv",
total_cost=99.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Solar PV (higher id)",
),
MaterialRow(
id=3,
type="solar_pv",
total_cost=42.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Solar PV (lowest id)",
),
]
)
session.commit()
# Act
with Session(db_engine) as session:
snapshot = MaterialSnapshotRepository.load(session)
product: Product = snapshot.get("solar_pv")
# Assert — lowest-id active row wins, matching the live repository.
assert product.id == 3
assert abs(product.unit_cost_per_m2 - 42.0) <= 1e-9
def test_snapshot_excludes_inactive_rows_and_raises_product_not_found(
db_engine: Engine,
) -> None:
# Arrange — only an inactive row exists.
with Session(db_engine) as session:
session.add(
MaterialRow(
id=1,
type="cavity_wall_insulation",
total_cost=18.5,
cost_unit="gbp_per_m2",
is_active=False,
description="Cavity wall insulation (retired)",
)
)
session.commit()
# Act
with Session(db_engine) as session:
snapshot = MaterialSnapshotRepository.load(session)
# Assert — no active row, so the benign ProductNotFound (a ValueError).
with pytest.raises(ProductNotFound):
snapshot.get("cavity_wall_insulation")
def test_snapshot_matches_the_live_repository_across_a_mixed_catalogue(
db_engine: Engine,
) -> None:
# Arrange — a catalogue with a drifted type, duplicate active rows, and an
# inactive row: the snapshot and the live repo must agree row-for-row.
with Session(db_engine) as session:
session.add_all(
[
MaterialRow(id=1, type="cavity_wall_insulation", total_cost=18.5,
is_active=True),
MaterialRow(id=2, type="solar_pv", total_cost=99.0, is_active=True),
MaterialRow(id=5, type="solar_pv", total_cost=42.0, is_active=True),
MaterialRow(id=6, type="low_energy_lighting_installation",
total_cost=5.0, is_active=True),
MaterialRow(id=9, type="loft_insulation", total_cost=12.0,
is_active=False),
]
)
session.commit()
# Act
with Session(db_engine) as session:
snapshot = MaterialSnapshotRepository.load(session)
live = ProductPostgresRepository(session)
for measure_type in [
"cavity_wall_insulation",
"solar_pv",
"low_energy_lighting",
]:
live_product = live.get(measure_type)
snapshot_product = snapshot.get(measure_type)
# Assert — identical pricing for every priced measure.
assert snapshot_product == live_product

View file

@ -0,0 +1,66 @@
"""Behaviour of the pre-fetched in-memory overrides reader: it serves snapshots
keyed by Property, and resolves a Property with no entry to no overrides exactly
as the Postgres reader returns an empty snapshot for a Property with no rows."""
from repositories.property.in_memory_property_overrides_reader import (
InMemoryPropertyOverridesReader,
)
from repositories.property.property_overrides_reader import (
ResolvedPropertyOverride,
ResolvedPropertyOverrides,
)
def test_serves_the_prefetched_snapshot_for_a_known_property() -> None:
# Arrange
snapshot = ResolvedPropertyOverrides(
rows=(
ResolvedPropertyOverride(
override_component="property_type",
building_part=0,
override_value="House",
),
)
)
reader = InMemoryPropertyOverridesReader({42: snapshot})
# Act
resolved = reader.overrides_for(42)
# Assert
assert resolved is snapshot
assert resolved.value("property_type", 0) == "House"
def test_unknown_property_resolves_to_no_overrides() -> None:
# Arrange — only property 42 was pre-fetched.
reader = InMemoryPropertyOverridesReader({42: ResolvedPropertyOverrides(rows=())})
# Act
resolved = reader.overrides_for(999)
# Assert — empty snapshot, not an error (mirrors the Postgres reader).
assert resolved.rows == ()
assert resolved.value("property_type", 0) is None
def test_does_not_alias_the_caller_mapping() -> None:
# Arrange — a mutable mapping handed to the reader.
backing: dict[int, ResolvedPropertyOverrides] = {
1: ResolvedPropertyOverrides(rows=())
}
reader = InMemoryPropertyOverridesReader(backing)
# Act — mutate the caller's mapping after construction.
backing[2] = ResolvedPropertyOverrides(
rows=(
ResolvedPropertyOverride(
override_component="built_form_type",
building_part=0,
override_value="Detached",
),
)
)
# Assert — the reader took its own copy; the late insert is invisible.
assert reader.overrides_for(2).rows == ()

View file

@ -7,7 +7,7 @@ from typing import Any
import pytest
from sqlalchemy import Engine
from sqlmodel import Session
from sqlmodel import Session, create_engine
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import (
@ -152,6 +152,62 @@ def test_unit_hydrates_a_property_with_its_landlord_overrides_folded(
assert main.wall_insulation_type == 3
def test_hydrating_a_property_with_overrides_stays_on_one_connection(
db_engine: Engine,
) -> None:
"""Resolving Landlord Overrides during re-hydration must read on the unit's
OWN session, not open a second one. The modelling_e2e Lambda runs on a
single-connection pool (``pool_size=1, max_overflow=0``); a second session
opened while the unit's read transaction is still open checks out a second
connection and deadlocks ("QueuePool limit of size 1 overflow 0 reached,
connection timed out"). This pins that the unit holds exactly one connection
while it hydrates a Property that has overrides.
"""
# Arrange — seed a property + EPC + an override (committed reference data).
with PostgresUnitOfWork(_session_factory(db_engine)) as uow:
row = PropertyRow(portfolio_id=1, postcode="A0 0AA", address="1 St", uprn=1)
uow._session.add(row) # pyright: ignore[reportPrivateUsage]
uow._session.flush() # pyright: ignore[reportPrivateUsage]
property_id = row.id
assert property_id is not None
EpcPostgresRepository(uow._session).save( # pyright: ignore[reportPrivateUsage]
_epc(), property_id=property_id
)
uow._session.add( # pyright: ignore[reportPrivateUsage]
PropertyOverrideRow(
property_id=property_id,
portfolio_id=1,
building_part=0,
override_component="wall_type",
override_value="Solid brick, with internal insulation",
original_spreadsheet_description="solid brick, insulated",
)
)
uow.commit()
# A pool that admits exactly one connection and fails fast (not after 30s) if
# a second is requested — the production modelling_e2e shape.
single_connection = create_engine(
db_engine.url, pool_size=1, max_overflow=0, pool_timeout=2
)
try:
# Act — hydrate through the unit; this resolves the override.
with PostgresUnitOfWork(_session_factory(single_connection)) as uow:
prop = uow.property.get(property_id)
finally:
single_connection.dispose()
# Assert — reached here without a QueuePool timeout, and the override folded:
# cavity (4) → solid brick (3) / internal (3).
main = next(
part
for part in prop.effective_epc.sap_building_parts
if part.identifier is BuildingPartIdentifier.MAIN
)
assert main.wall_construction == 3
assert main.wall_insulation_type == 3
def test_leaving_the_block_without_commit_persists_nothing(db_engine: Engine) -> None:
# Arrange
new_unit = lambda: PostgresUnitOfWork(_session_factory(db_engine))