diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 532d5fb4..caa8a7d7 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -92,7 +92,7 @@ from repositories.comparable_properties.epc_comparable_properties_repository imp EpcComparablePropertiesRepository, SkippedCohortCert, ) -from repositories.epc.epc_postgres_repository import EpcPostgresRepository +from repositories.epc.epc_postgres_repository import EpcPostgresRepository, EpcSaveRequest from repositories.geospatial.geospatial_s3_repository import ( GeospatialS3Repository, ParquetReader, @@ -176,31 +176,30 @@ class _PropertyWrite: def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None: """Persist a whole batch of modelled Properties in one Unit of Work. - Replays each Property's saves in dependency order (EPC → spatial → solar → - Plan → mark-modelled) and commits once. All-or-nothing per batch: a failed - save rolls the whole transaction back and propagates, so the SQS message is - retried — every save is an idempotent upsert, so a retry is safe. This mirrors - the PropertyBaselineOrchestrator's existing one-UoW-per-batch contract - (ADR-0012); per-property failures are isolated earlier, in the modelling loop, + EPC writes are batched by source (lodged group first, predicted group second) + so each source emits one DELETE pass + one INSERT pass regardless of batch + size, rather than N×per-property round-trips (ADR-0012). All other writes + (spatial, solar, plan, mark-modelled) remain per-property inside the same + transaction. All-or-nothing per batch: a failed save rolls the whole + transaction back so the SQS message is retried — every save is an idempotent + upsert. Per-property failures are isolated earlier, in the modelling loop, before a write is ever queued.""" + lodged_requests = [ + EpcSaveRequest(w.lodged_epc, property_id=w.property_id, portfolio_id=w.portfolio_id, source="lodged") + for w in writes + if w.lodged_epc is not None + ] + predicted_requests = [ + EpcSaveRequest(w.predicted_epc, property_id=w.property_id, portfolio_id=w.portfolio_id, source="predicted") + for w in writes + if w.predicted_epc is not None + ] with PostgresUnitOfWork(lambda: Session(engine)) as uow: + if lodged_requests: + uow.epc.save_batch(lodged_requests) + if predicted_requests: + uow.epc.save_batch(predicted_requests) for w in writes: - if w.lodged_epc is not None: - uow.epc.save( - w.lodged_epc, - property_id=w.property_id, - portfolio_id=w.portfolio_id, - ) - elif w.predicted_epc is not None: - # Persist the synthesised EPC in the predicted slot (ADR-0031), so - # the Baseline stage can re-hydrate it and downstream sees the - # picture the Plan was modelled from. - uow.epc.save( - w.predicted_epc, - property_id=w.property_id, - portfolio_id=w.portfolio_id, - source="predicted", - ) if w.spatial is not None: uow.spatial.save(w.uprn, w.spatial) if w.solar is not None: diff --git a/tests/applications/modelling_e2e/test_handler.py b/tests/applications/modelling_e2e/test_handler.py index 90b3e97c..880ac9f4 100644 --- a/tests/applications/modelling_e2e/test_handler.py +++ b/tests/applications/modelling_e2e/test_handler.py @@ -19,6 +19,7 @@ from applications.modelling_e2e.modelling_e2e_trigger_body import ( ModellingE2ETriggerBody, ) from domain.tasks.subtasks import SubTask +from repositories.epc.epc_postgres_repository import EpcSaveRequest PROPERTY_ID = 12345 UPRN = 987654321 @@ -355,8 +356,8 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled( _call_handler(_BODY) # Assert — EPC saved (lodged path), plan saved, property marked modelled - mock_uow.epc.save.assert_called_once_with( - mock_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(mock_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="lodged")] ) mock_uow.plan.save.assert_called_once() mock_uow.property.mark_modelled.assert_called_once_with( @@ -631,11 +632,8 @@ def test_prediction_path_saves_predicted_epc_plan_and_baseline( _call_handler(_BODY) # Assert — predicted EPC persisted in the predicted slot, plan saved, baseline run - mock_uow.epc.save.assert_called_once_with( - mock_predicted_epc, - property_id=PROPERTY_ID, - portfolio_id=PORTFOLIO_ID, - source="predicted", + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")] ) mock_uow.plan.save.assert_called_once() mock_uow.commit.assert_called_once() @@ -841,11 +839,8 @@ def test_empty_own_postcode_broadens_to_nearby_and_predicts() -> None: # Assert — broadening fired, and the broadened cohort produced a saved plan # with its predicted EPC persisted in the predicted slot. MockRepo.return_value.candidates_near.assert_called_once() - mock_uow.epc.save.assert_called_once_with( - mock_predicted_epc, - property_id=PROPERTY_ID, - portfolio_id=PORTFOLIO_ID, - source="predicted", + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")] ) mock_uow.plan.save.assert_called_once() mock_uow.commit.assert_called_once() @@ -1171,8 +1166,8 @@ def test_refetch_epc_false_with_stored_epc_skips_api_call() -> None: mock_epc_client.get_by_uprn.assert_not_called() mock_run_modelling.assert_called_once() # Stored lodged EPC is persisted in the lodged slot - mock_uow.epc.save.assert_called_once_with( - stored_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(stored_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="lodged")] ) @@ -1258,11 +1253,8 @@ def test_refetch_epc_false_without_stored_epc_skips_api_and_goes_to_prediction() # Assert — API was NOT called; prediction ran and its output was persisted mock_epc_client.get_by_uprn.assert_not_called() - mock_uow.epc.save.assert_called_once_with( - mock_predicted_epc, - property_id=PROPERTY_ID, - portfolio_id=PORTFOLIO_ID, - source="predicted", + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")] ) @@ -1398,11 +1390,8 @@ def test_repredict_epc_false_with_stored_predicted_epc_skips_prediction() -> Non # Assert — EpcPrediction.predict never called; stored EPC persisted in predicted slot mock_predictor.predict.assert_not_called() - mock_uow.epc.save.assert_called_once_with( - stored_predicted, - property_id=PROPERTY_ID, - portfolio_id=PORTFOLIO_ID, - source="predicted", + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(stored_predicted, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")] ) @@ -1488,11 +1477,8 @@ def test_repredict_epc_false_without_stored_predicted_epc_falls_back_to_live_pre # Assert — live prediction was used as fallback mock_predictor.predict.assert_called_once() - mock_uow.epc.save.assert_called_once_with( - mock_predicted_epc, - property_id=PROPERTY_ID, - portfolio_id=PORTFOLIO_ID, - source="predicted", + mock_uow.epc.save_batch.assert_called_once_with( + [EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")] )