From 9c6b47702515b1b329e16d1107f79dd365777d36 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Mon, 29 Jun 2026 15:04:54 +0000 Subject: [PATCH] =?UTF-8?q?Batch=20plan=20saves=20reduce=20RDS=20CPU=20dur?= =?UTF-8?q?ing=20bulk=20modelling=20runs=20=F0=9F=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- repositories/plan/plan_postgres_repository.py | 5 +- repositories/plan/plan_repository.py | 24 +++ .../repositories/plan/test_plan_batch_save.py | 183 ++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 tests/repositories/plan/test_plan_batch_save.py diff --git a/repositories/plan/plan_postgres_repository.py b/repositories/plan/plan_postgres_repository.py index 7be21bac..36253764 100644 --- a/repositories/plan/plan_postgres_repository.py +++ b/repositories/plan/plan_postgres_repository.py @@ -4,7 +4,7 @@ from sqlmodel import Session, col, update from domain.modelling.plan import Plan from infrastructure.postgres.modelling import PlanModel, RecommendationModel -from repositories.plan.plan_repository import PlanRepository +from repositories.plan.plan_repository import PlanRepository, PlanSaveRequest class PlanPostgresRepository(PlanRepository): @@ -63,3 +63,6 @@ class PlanPostgresRepository(PlanRepository): ) ) return plan_row.id + + def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]: + raise NotImplementedError diff --git a/repositories/plan/plan_repository.py b/repositories/plan/plan_repository.py index b534e8ea..6fdc16d6 100644 --- a/repositories/plan/plan_repository.py +++ b/repositories/plan/plan_repository.py @@ -1,10 +1,25 @@ from __future__ import annotations from abc import ABC, abstractmethod +from dataclasses import dataclass from domain.modelling.plan import Plan +@dataclass(frozen=True) +class PlanSaveRequest: + """Bundles the five fields the plan repository needs to persist one Plan. + + Mirrors ``EpcSaveRequest`` in shape — used by ``PlanRepository.save_batch()`` + to accumulate write intent before the batch is flushed in one Unit of Work.""" + + plan: Plan + property_id: int + scenario_id: int + portfolio_id: int + is_default: bool + + class PlanRepository(ABC): """Persists a Plan (and its Plan Measures) for a Property + Scenario. @@ -30,3 +45,12 @@ class PlanRepository(ABC): ``(property_id, scenario_id)`` as history; when ``is_default`` is True, demotes those prior Plans to ``is_default=False``.""" ... + + @abstractmethod + def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]: + """Persist a batch of Plans in three statements regardless of batch size. + + Returns one plan id per request in input order. Fires a single demote + UPDATE only when at least one request has ``is_default=True``. Keeps + prior Plans as history (ADR-0017).""" + ... diff --git a/tests/repositories/plan/test_plan_batch_save.py b/tests/repositories/plan/test_plan_batch_save.py new file mode 100644 index 00000000..e82d0f1e --- /dev/null +++ b/tests/repositories/plan/test_plan_batch_save.py @@ -0,0 +1,183 @@ +"""Batch plan write path — save_batch() correctness and safety tests. + +Guards the four user stories from #1355: + 1. save()/save_batch() parity: a single-element save_batch() produces + identical DB state (plan row + recommendation rows) as the equivalent + save() call. + 2. Recommendation FK isolation: two properties in the same save_batch() each + get their own recommendation rows; no FK cross-wiring between properties. + 3. Demote correctness: a second save_batch() for the same properties demotes + the prior default Plans and inserts fresh ones (history preserved). + 4. Non-default batch: a save_batch() where all writes have is_default=False + leaves any pre-existing default Plan untouched. +""" + +from __future__ import annotations + +from sqlalchemy import Engine +from sqlmodel import Session, col, select + +from domain.modelling.measure_type import MeasureType +from domain.modelling.plan import Plan, PlanMeasure +from domain.modelling.recommendation import Cost +from domain.modelling.scoring.package_scorer import Score +from domain.modelling.scoring.scoring import MeasureImpact +from infrastructure.postgres.modelling import PlanModel, RecommendationModel +from repositories.plan.plan_postgres_repository import PlanPostgresRepository +from repositories.plan.plan_repository import PlanSaveRequest + + +def _plan(*, sap: float = 70.0, measures: int = 1) -> Plan: + ms: tuple[PlanMeasure, ...] = tuple( + PlanMeasure( + measure_type=MeasureType.CAVITY_WALL_INSULATION, + description="Cavity wall insulation", + cost=Cost(total=1000.0, contingency_rate=0.10), + impact=MeasureImpact( + sap_points=8.0, + co2_savings_kg_per_yr=500.0, + energy_savings_kwh_per_yr=2000.0, + ), + kwh_savings=1500.0, + energy_cost_savings=300.0, + ) + for _ in range(measures) + ) + return Plan( + measures=ms, + baseline=Score(sap_continuous=40.0, co2_kg_per_yr=4000.0, primary_energy_kwh_per_yr=20000.0), + post_retrofit=Score(sap_continuous=sap, co2_kg_per_yr=3500.0, primary_energy_kwh_per_yr=18000.0), + ) + + +# --------------------------------------------------------------------------- +# Tracer bullet: single-element save_batch() is loss-free vs save() +# --------------------------------------------------------------------------- + +def test_single_request_save_batch_matches_save(db_engine: Engine) -> None: + # Arrange + plan = _plan() + scenario_id = 7 + + with Session(db_engine) as session: + repo = PlanPostgresRepository(session) + save_id = repo.save(plan, property_id=5001, scenario_id=scenario_id, portfolio_id=1, is_default=True) + batch_id = repo.save_batch([PlanSaveRequest(plan, property_id=5002, scenario_id=scenario_id, portfolio_id=1, is_default=True)])[0] + session.commit() + + # Act + with Session(db_engine) as session: + via_save = session.get(PlanModel, save_id) + via_batch = session.get(PlanModel, batch_id) + recs_save = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == save_id)).all() + recs_batch = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == batch_id)).all() + + # Assert — both paths produce one plan row + one recommendation row with the + # same field values (modulo property_id which differs by design). + assert via_save is not None + assert via_batch is not None + assert via_save.is_default is True + assert via_batch.is_default is True + assert via_save.post_sap_points == via_batch.post_sap_points + assert via_save.post_co2_emissions == via_batch.post_co2_emissions + assert via_save.co2_savings == via_batch.co2_savings + assert len(recs_save) == 1 + assert len(recs_batch) == 1 + assert recs_save[0].type == recs_batch[0].type + assert recs_save[0].estimated_cost == recs_batch[0].estimated_cost + assert recs_save[0].sap_points == recs_batch[0].sap_points + assert recs_save[0].plan_id == batch_id + + +# --------------------------------------------------------------------------- +# FK isolation: recommendation rows must not be crossed between properties +# --------------------------------------------------------------------------- + +def test_multi_property_recommendation_fks_are_not_crossed(db_engine: Engine) -> None: + # Arrange — property A gets 2 measures, property B gets 1 measure. + plan_a = _plan(measures=2) + plan_b = _plan(measures=1) + + with Session(db_engine) as session: + [id_a, id_b] = PlanPostgresRepository(session).save_batch([ + PlanSaveRequest(plan_a, property_id=6001, scenario_id=7, portfolio_id=1, is_default=True), + PlanSaveRequest(plan_b, property_id=6002, scenario_id=7, portfolio_id=1, is_default=True), + ]) + session.commit() + + # Act + with Session(db_engine) as session: + recs_a = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == id_a)).all() + recs_b = session.exec(select(RecommendationModel).where(col(RecommendationModel.plan_id) == id_b)).all() + + # Assert — A has 2 rows, B has 1; none cross-wired. + assert len(recs_a) == 2 + assert len(recs_b) == 1 + assert all(r.plan_id == id_a and r.property_id == 6001 for r in recs_a) + assert all(r.plan_id == id_b and r.property_id == 6002 for r in recs_b) + + +# --------------------------------------------------------------------------- +# Demote correctness: second save_batch() demotes prior defaults +# --------------------------------------------------------------------------- + +def test_second_save_batch_demotes_prior_default_plans(db_engine: Engine) -> None: + # Arrange — first batch creates default Plans for two properties. + plan = _plan() + requests = [ + PlanSaveRequest(plan, property_id=7001, scenario_id=7, portfolio_id=1, is_default=True), + PlanSaveRequest(plan, property_id=7002, scenario_id=7, portfolio_id=1, is_default=True), + ] + with Session(db_engine) as session: + first_ids = PlanPostgresRepository(session).save_batch(requests) + session.commit() + + # Act — re-run the same batch; new Plans should become default, old ones demoted. + with Session(db_engine) as session: + second_ids = PlanPostgresRepository(session).save_batch(requests) + session.commit() + + # Assert — history is preserved (4 plan rows total); exactly one default per property. + with Session(db_engine) as session: + rows_7001 = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 7001)).all() + rows_7002 = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 7002)).all() + + by_id_7001 = {p.id: p for p in rows_7001} + by_id_7002 = {p.id: p for p in rows_7002} + + assert len(rows_7001) == 2 + assert len(rows_7002) == 2 + assert by_id_7001[first_ids[0]].is_default is False + assert by_id_7001[second_ids[0]].is_default is True + assert by_id_7002[first_ids[1]].is_default is False + assert by_id_7002[second_ids[1]].is_default is True + + +# --------------------------------------------------------------------------- +# Non-default batch: existing default Plan is untouched +# --------------------------------------------------------------------------- + +def test_non_default_save_batch_does_not_demote_existing_default(db_engine: Engine) -> None: + # Arrange — a default Plan already exists for the property. + plan = _plan() + with Session(db_engine) as session: + default_id = PlanPostgresRepository(session).save( + plan, property_id=8001, scenario_id=7, portfolio_id=1, is_default=True + ) + session.commit() + + # Act — save a non-default Plan via save_batch(); no demote UPDATE should fire. + with Session(db_engine) as session: + PlanPostgresRepository(session).save_batch([ + PlanSaveRequest(plan, property_id=8001, scenario_id=7, portfolio_id=1, is_default=False), + ]) + session.commit() + + # Assert — the original default Plan is still the default. + with Session(db_engine) as session: + rows = session.exec(select(PlanModel).where(col(PlanModel.property_id) == 8001)).all() + by_id = {p.id: p for p in rows} + + assert len(rows) == 2 + assert by_id[default_id].is_default is True + assert sum(1 for p in rows if p.is_default) == 1