Batch plan saves reduce RDS CPU during bulk modelling runs 🟥

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Roth 2026-06-29 15:04:54 +00:00
parent 9f0dd067f8
commit 9c6b477025
3 changed files with 211 additions and 1 deletions

View file

@ -4,7 +4,7 @@ from sqlmodel import Session, col, update
from domain.modelling.plan import Plan
from infrastructure.postgres.modelling import PlanModel, RecommendationModel
from repositories.plan.plan_repository import PlanRepository
from repositories.plan.plan_repository import PlanRepository, PlanSaveRequest
class PlanPostgresRepository(PlanRepository):
@ -63,3 +63,6 @@ class PlanPostgresRepository(PlanRepository):
)
)
return plan_row.id
def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]:
raise NotImplementedError

View file

@ -1,10 +1,25 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from domain.modelling.plan import Plan
@dataclass(frozen=True)
class PlanSaveRequest:
"""Bundles the five fields the plan repository needs to persist one Plan.
Mirrors ``EpcSaveRequest`` in shape used by ``PlanRepository.save_batch()``
to accumulate write intent before the batch is flushed in one Unit of Work."""
plan: Plan
property_id: int
scenario_id: int
portfolio_id: int
is_default: bool
class PlanRepository(ABC):
"""Persists a Plan (and its Plan Measures) for a Property + Scenario.
@ -30,3 +45,12 @@ class PlanRepository(ABC):
``(property_id, scenario_id)`` as history; when ``is_default`` is True,
demotes those prior Plans to ``is_default=False``."""
...
@abstractmethod
def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]:
"""Persist a batch of Plans in three statements regardless of batch size.
Returns one plan id per request in input order. Fires a single demote
UPDATE only when at least one request has ``is_default=True``. Keeps
prior Plans as history (ADR-0017)."""
...

View file

@ -0,0 +1,183 @@
"""Batch plan write path — save_batch() correctness and safety tests.
Guards the four user stories from #1355:
1. save()/save_batch() parity: a single-element save_batch() produces
identical DB state (plan row + recommendation rows) as the equivalent
save() call.
2. Recommendation FK isolation: two properties in the same save_batch() each
get their own recommendation rows; no FK cross-wiring between properties.
3. Demote correctness: a second save_batch() for the same properties demotes
the prior default Plans and inserts fresh ones (history preserved).
4. Non-default batch: a save_batch() where all writes have is_default=False
leaves any pre-existing default Plan untouched.
"""
from __future__ import annotations
from sqlalchemy import Engine
from sqlmodel import Session, col, select
from domain.modelling.measure_type import MeasureType
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import Cost
from domain.modelling.scoring.package_scorer import Score
from domain.modelling.scoring.scoring import MeasureImpact
from infrastructure.postgres.modelling import PlanModel, RecommendationModel
from repositories.plan.plan_postgres_repository import PlanPostgresRepository
from repositories.plan.plan_repository import PlanSaveRequest
def _plan(*, sap: float = 70.0, measures: int = 1) -> Plan:
ms: tuple[PlanMeasure, ...] = tuple(
PlanMeasure(
measure_type=MeasureType.CAVITY_WALL_INSULATION,
description="Cavity wall insulation",
cost=Cost(total=1000.0, contingency_rate=0.10),
impact=MeasureImpact(
sap_points=8.0,
co2_savings_kg_per_yr=500.0,
energy_savings_kwh_per_yr=2000.0,
),
kwh_savings=1500.0,
energy_cost_savings=300.0,
)
for _ in range(measures)
)
return Plan(
measures=ms,
baseline=Score(sap_continuous=40.0, co2_kg_per_yr=4000.0, primary_energy_kwh_per_yr=20000.0),
post_retrofit=Score(sap_continuous=sap, co2_kg_per_yr=3500.0, primary_energy_kwh_per_yr=18000.0),
)
# ---------------------------------------------------------------------------
# Tracer bullet: single-element save_batch() is loss-free vs save()
# ---------------------------------------------------------------------------
def test_single_request_save_batch_matches_save(db_engine: Engine) -> None:
# Arrange
plan = _plan()
scenario_id = 7
with Session(db_engine) as session:
repo = PlanPostgresRepository(session)
save_id = repo.save(plan, property_id=5001, scenario_id=scenario_id, portfolio_id=1, is_default=True)
batch_id = repo.save_batch([PlanSaveRequest(plan, property_id=5002, scenario_id=scenario_id, portfolio_id=1, is_default=True)])[0]
session.commit()
# Act
with Session(db_engine) as session:
via_save = session.get(PlanModel, save_id)
via_batch = session.get(PlanModel, batch_id)
recs_save = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == save_id)).all()
recs_batch = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == batch_id)).all()
# Assert — both paths produce one plan row + one recommendation row with the
# same field values (modulo property_id which differs by design).
assert via_save is not None
assert via_batch is not None
assert via_save.is_default is True
assert via_batch.is_default is True
assert via_save.post_sap_points == via_batch.post_sap_points
assert via_save.post_co2_emissions == via_batch.post_co2_emissions
assert via_save.co2_savings == via_batch.co2_savings
assert len(recs_save) == 1
assert len(recs_batch) == 1
assert recs_save[0].type == recs_batch[0].type
assert recs_save[0].estimated_cost == recs_batch[0].estimated_cost
assert recs_save[0].sap_points == recs_batch[0].sap_points
assert recs_save[0].plan_id == batch_id
# ---------------------------------------------------------------------------
# FK isolation: recommendation rows must not be crossed between properties
# ---------------------------------------------------------------------------
def test_multi_property_recommendation_fks_are_not_crossed(db_engine: Engine) -> None:
# Arrange — property A gets 2 measures, property B gets 1 measure.
plan_a = _plan(measures=2)
plan_b = _plan(measures=1)
with Session(db_engine) as session:
[id_a, id_b] = PlanPostgresRepository(session).save_batch([
PlanSaveRequest(plan_a, property_id=6001, scenario_id=7, portfolio_id=1, is_default=True),
PlanSaveRequest(plan_b, property_id=6002, scenario_id=7, portfolio_id=1, is_default=True),
])
session.commit()
# Act
with Session(db_engine) as session:
recs_a = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == id_a)).all()
recs_b = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == id_b)).all()
# Assert — A has 2 rows, B has 1; none cross-wired.
assert len(recs_a) == 2
assert len(recs_b) == 1
assert all(r.plan_id == id_a and r.property_id == 6001 for r in recs_a)
assert all(r.plan_id == id_b and r.property_id == 6002 for r in recs_b)
# ---------------------------------------------------------------------------
# Demote correctness: second save_batch() demotes prior defaults
# ---------------------------------------------------------------------------
def test_second_save_batch_demotes_prior_default_plans(db_engine: Engine) -> None:
# Arrange — first batch creates default Plans for two properties.
plan = _plan()
requests = [
PlanSaveRequest(plan, property_id=7001, scenario_id=7, portfolio_id=1, is_default=True),
PlanSaveRequest(plan, property_id=7002, scenario_id=7, portfolio_id=1, is_default=True),
]
with Session(db_engine) as session:
first_ids = PlanPostgresRepository(session).save_batch(requests)
session.commit()
# Act — re-run the same batch; new Plans should become default, old ones demoted.
with Session(db_engine) as session:
second_ids = PlanPostgresRepository(session).save_batch(requests)
session.commit()
# Assert — history is preserved (4 plan rows total); exactly one default per property.
with Session(db_engine) as session:
rows_7001 = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 7001)).all()
rows_7002 = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 7002)).all()
by_id_7001 = {p.id: p for p in rows_7001}
by_id_7002 = {p.id: p for p in rows_7002}
assert len(rows_7001) == 2
assert len(rows_7002) == 2
assert by_id_7001[first_ids[0]].is_default is False
assert by_id_7001[second_ids[0]].is_default is True
assert by_id_7002[first_ids[1]].is_default is False
assert by_id_7002[second_ids[1]].is_default is True
# ---------------------------------------------------------------------------
# Non-default batch: existing default Plan is untouched
# ---------------------------------------------------------------------------
def test_non_default_save_batch_does_not_demote_existing_default(db_engine: Engine) -> None:
# Arrange — a default Plan already exists for the property.
plan = _plan()
with Session(db_engine) as session:
default_id = PlanPostgresRepository(session).save(
plan, property_id=8001, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Act — save a non-default Plan via save_batch(); no demote UPDATE should fire.
with Session(db_engine) as session:
PlanPostgresRepository(session).save_batch([
PlanSaveRequest(plan, property_id=8001, scenario_id=7, portfolio_id=1, is_default=False),
])
session.commit()
# Assert — the original default Plan is still the default.
with Session(db_engine) as session:
rows = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 8001)).all()
by_id = {p.id: p for p in rows}
assert len(rows) == 2
assert by_id[default_id].is_default is True
assert sum(1 for p in rows if p.is_default) == 1