Batch plan saves reduce RDS CPU during bulk modelling runs 🟪

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Roth 2026-06-29 15:08:47 +00:00
parent 46ca714ef9
commit 4764bc7c15
2 changed files with 15 additions and 41 deletions

View file

@ -93,6 +93,7 @@ from repositories.comparable_properties.epc_comparable_properties_repository imp
SkippedCohortCert,
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository, EpcSaveRequest
from repositories.plan.plan_repository import PlanSaveRequest
from repositories.geospatial.geospatial_s3_repository import (
GeospatialS3Repository,
ParquetReader,
@ -201,6 +202,17 @@ def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None:
uow.epc.save_batch(lodged_requests)
if predicted_requests:
uow.epc.save_batch(predicted_requests)
plan_requests = [
PlanSaveRequest(
w.plan,
property_id=w.property_id,
scenario_id=w.scenario_id,
portfolio_id=w.portfolio_id,
is_default=w.is_default,
)
for w in writes
]
uow.plan.save_batch(plan_requests)
for w in writes:
if w.spatial is not None:
uow.spatial.save(w.uprn, w.spatial)
@ -211,13 +223,6 @@ def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None:
latitude=w.solar.latitude,
insights=w.solar.insights,
)
uow.plan.save(
w.plan,
property_id=w.property_id,
scenario_id=w.scenario_id,
portfolio_id=w.portfolio_id,
is_default=w.is_default,
)
uow.property.mark_modelled(
w.property_id, has_recommendations=w.has_recommendations
)

View file

@ -41,40 +41,9 @@ class PlanPostgresRepository(PlanRepository):
portfolio_id: int,
is_default: bool,
) -> int:
# Soft-replace (ADR-0012): keep prior Plans as history rather than DELETEing
# them — the cascade delete of recommendation rows was the slow part. When
# this Plan is the default, demote every prior Plan for the same
# (property_id, scenario_id) to is_default=False, so exactly one Plan for
# the pair stays default (the one just inserted).
if is_default:
self._session.exec( # type: ignore[call-overload]
update(PlanModel)
.where(
col(PlanModel.property_id) == property_id,
col(PlanModel.scenario_id) == scenario_id,
)
.values(is_default=False)
)
plan_row = PlanModel.from_domain(
plan,
property_id=property_id,
scenario_id=scenario_id,
portfolio_id=portfolio_id,
is_default=is_default,
)
self._session.add(plan_row)
self._session.flush()
if plan_row.id is None:
raise ValueError("plan row did not receive an id")
for measure in plan.measures:
self._session.add(
RecommendationModel.from_domain(
measure, property_id=property_id, plan_id=plan_row.id
)
)
return plan_row.id
return self.save_batch(
[PlanSaveRequest(plan, property_id=property_id, scenario_id=scenario_id, portfolio_id=portfolio_id, is_default=is_default)]
)[0]
def save_batch(self, requests: list[PlanSaveRequest]) -> list[int]:
"""Persist all Plans in three statements regardless of batch size.