From 4764bc7c155904a0ba2f6cb0ef9546bd7494a930 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Mon, 29 Jun 2026 15:08:47 +0000 Subject: [PATCH] =?UTF-8?q?Batch=20plan=20saves=20reduce=20RDS=20CPU=20dur?= =?UTF-8?q?ing=20bulk=20modelling=20runs=20=F0=9F=9F=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- applications/modelling_e2e/handler.py | 19 ++++++---- repositories/plan/plan_postgres_repository.py | 37 ++----------------- 2 files changed, 15 insertions(+), 41 deletions(-) diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 88ab14a0..916c2c24 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -93,6 +93,7 @@ from repositories.comparable_properties.epc_comparable_properties_repository imp SkippedCohortCert, ) from repositories.epc.epc_postgres_repository import EpcPostgresRepository, EpcSaveRequest +from repositories.plan.plan_repository import PlanSaveRequest from repositories.geospatial.geospatial_s3_repository import ( GeospatialS3Repository, ParquetReader, @@ -201,6 +202,17 @@ def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None: uow.epc.save_batch(lodged_requests) if predicted_requests: uow.epc.save_batch(predicted_requests) + plan_requests = [ + PlanSaveRequest( + w.plan, + property_id=w.property_id, + scenario_id=w.scenario_id, + portfolio_id=w.portfolio_id, + is_default=w.is_default, + ) + for w in writes + ] + uow.plan.save_batch(plan_requests) for w in writes: if w.spatial is not None: uow.spatial.save(w.uprn, w.spatial) @@ -211,13 +223,6 @@ def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None: latitude=w.solar.latitude, insights=w.solar.insights, ) - uow.plan.save( - w.plan, - property_id=w.property_id, - scenario_id=w.scenario_id, - portfolio_id=w.portfolio_id, - is_default=w.is_default, - ) uow.property.mark_modelled( w.property_id, has_recommendations=w.has_recommendations ) diff --git a/repositories/plan/plan_postgres_repository.py b/repositories/plan/plan_postgres_repository.py index 9bf22aea..e81e9084 100644 --- a/repositories/plan/plan_postgres_repository.py +++ b/repositories/plan/plan_postgres_repository.py @@ -41,40 +41,9 @@ class PlanPostgresRepository(PlanRepository): portfolio_id: int, is_default: bool, ) -> int: - # Soft-replace (ADR-0012): keep prior Plans as history rather than DELETEing - # them — the cascade delete of recommendation rows was the slow part. When - # this Plan is the default, demote every prior Plan for the same - # (property_id, scenario_id) to is_default=False, so exactly one Plan for - # the pair stays default (the one just inserted). - if is_default: - self._session.exec( # type: ignore[call-overload] - update(PlanModel) - .where( - col(PlanModel.property_id) == property_id, - col(PlanModel.scenario_id) == scenario_id, - ) - .values(is_default=False) - ) - - plan_row = PlanModel.from_domain( - plan, - property_id=property_id, - scenario_id=scenario_id, - portfolio_id=portfolio_id, - is_default=is_default, - ) - self._session.add(plan_row) - self._session.flush() - if plan_row.id is None: - raise ValueError("plan row did not receive an id") - - for measure in plan.measures: - self._session.add( - RecommendationModel.from_domain( - measure, property_id=property_id, plan_id=plan_row.id - ) - ) - return plan_row.id + return self.save_batch( + [PlanSaveRequest(plan, property_id=property_id, scenario_id=scenario_id, portfolio_id=portfolio_id, is_default=is_default)] + )[0] def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]: """Persist all Plans in three statements regardless of batch size.