mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Batch EPC writes in _flush_writes: two save_batch() calls instead of N save() calls 🟩
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
587465bff7
commit
0fa1b9001c
2 changed files with 37 additions and 52 deletions
|
|
@ -92,7 +92,7 @@ from repositories.comparable_properties.epc_comparable_properties_repository imp
|
|||
EpcComparablePropertiesRepository,
|
||||
SkippedCohortCert,
|
||||
)
|
||||
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
|
||||
from repositories.epc.epc_postgres_repository import EpcPostgresRepository, EpcSaveRequest
|
||||
from repositories.geospatial.geospatial_s3_repository import (
|
||||
GeospatialS3Repository,
|
||||
ParquetReader,
|
||||
|
|
@ -176,31 +176,30 @@ class _PropertyWrite:
|
|||
def _flush_writes(engine: Engine, writes: list[_PropertyWrite]) -> None:
|
||||
"""Persist a whole batch of modelled Properties in one Unit of Work.
|
||||
|
||||
Replays each Property's saves in dependency order (EPC → spatial → solar →
|
||||
Plan → mark-modelled) and commits once. All-or-nothing per batch: a failed
|
||||
save rolls the whole transaction back and propagates, so the SQS message is
|
||||
retried — every save is an idempotent upsert, so a retry is safe. This mirrors
|
||||
the PropertyBaselineOrchestrator's existing one-UoW-per-batch contract
|
||||
(ADR-0012); per-property failures are isolated earlier, in the modelling loop,
|
||||
EPC writes are batched by source (lodged group first, predicted group second)
|
||||
so each source emits one DELETE pass + one INSERT pass regardless of batch
|
||||
size, rather than N×per-property round-trips (ADR-0012). All other writes
|
||||
(spatial, solar, plan, mark-modelled) remain per-property inside the same
|
||||
transaction. All-or-nothing per batch: a failed save rolls the whole
|
||||
transaction back so the SQS message is retried — every save is an idempotent
|
||||
upsert. Per-property failures are isolated earlier, in the modelling loop,
|
||||
before a write is ever queued."""
|
||||
lodged_requests = [
|
||||
EpcSaveRequest(w.lodged_epc, property_id=w.property_id, portfolio_id=w.portfolio_id, source="lodged")
|
||||
for w in writes
|
||||
if w.lodged_epc is not None
|
||||
]
|
||||
predicted_requests = [
|
||||
EpcSaveRequest(w.predicted_epc, property_id=w.property_id, portfolio_id=w.portfolio_id, source="predicted")
|
||||
for w in writes
|
||||
if w.predicted_epc is not None
|
||||
]
|
||||
with PostgresUnitOfWork(lambda: Session(engine)) as uow:
|
||||
if lodged_requests:
|
||||
uow.epc.save_batch(lodged_requests)
|
||||
if predicted_requests:
|
||||
uow.epc.save_batch(predicted_requests)
|
||||
for w in writes:
|
||||
if w.lodged_epc is not None:
|
||||
uow.epc.save(
|
||||
w.lodged_epc,
|
||||
property_id=w.property_id,
|
||||
portfolio_id=w.portfolio_id,
|
||||
)
|
||||
elif w.predicted_epc is not None:
|
||||
# Persist the synthesised EPC in the predicted slot (ADR-0031), so
|
||||
# the Baseline stage can re-hydrate it and downstream sees the
|
||||
# picture the Plan was modelled from.
|
||||
uow.epc.save(
|
||||
w.predicted_epc,
|
||||
property_id=w.property_id,
|
||||
portfolio_id=w.portfolio_id,
|
||||
source="predicted",
|
||||
)
|
||||
if w.spatial is not None:
|
||||
uow.spatial.save(w.uprn, w.spatial)
|
||||
if w.solar is not None:
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ from applications.modelling_e2e.modelling_e2e_trigger_body import (
|
|||
ModellingE2ETriggerBody,
|
||||
)
|
||||
from domain.tasks.subtasks import SubTask
|
||||
from repositories.epc.epc_postgres_repository import EpcSaveRequest
|
||||
|
||||
PROPERTY_ID = 12345
|
||||
UPRN = 987654321
|
||||
|
|
@ -355,8 +356,8 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled(
|
|||
_call_handler(_BODY)
|
||||
|
||||
# Assert — EPC saved (lodged path), plan saved, property marked modelled
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
mock_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(mock_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="lodged")]
|
||||
)
|
||||
mock_uow.plan.save.assert_called_once()
|
||||
mock_uow.property.mark_modelled.assert_called_once_with(
|
||||
|
|
@ -631,11 +632,8 @@ def test_prediction_path_saves_predicted_epc_plan_and_baseline(
|
|||
_call_handler(_BODY)
|
||||
|
||||
# Assert — predicted EPC persisted in the predicted slot, plan saved, baseline run
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
mock_predicted_epc,
|
||||
property_id=PROPERTY_ID,
|
||||
portfolio_id=PORTFOLIO_ID,
|
||||
source="predicted",
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")]
|
||||
)
|
||||
mock_uow.plan.save.assert_called_once()
|
||||
mock_uow.commit.assert_called_once()
|
||||
|
|
@ -841,11 +839,8 @@ def test_empty_own_postcode_broadens_to_nearby_and_predicts() -> None:
|
|||
# Assert — broadening fired, and the broadened cohort produced a saved plan
|
||||
# with its predicted EPC persisted in the predicted slot.
|
||||
MockRepo.return_value.candidates_near.assert_called_once()
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
mock_predicted_epc,
|
||||
property_id=PROPERTY_ID,
|
||||
portfolio_id=PORTFOLIO_ID,
|
||||
source="predicted",
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")]
|
||||
)
|
||||
mock_uow.plan.save.assert_called_once()
|
||||
mock_uow.commit.assert_called_once()
|
||||
|
|
@ -1171,8 +1166,8 @@ def test_refetch_epc_false_with_stored_epc_skips_api_call() -> None:
|
|||
mock_epc_client.get_by_uprn.assert_not_called()
|
||||
mock_run_modelling.assert_called_once()
|
||||
# Stored lodged EPC is persisted in the lodged slot
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
stored_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(stored_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="lodged")]
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1258,11 +1253,8 @@ def test_refetch_epc_false_without_stored_epc_skips_api_and_goes_to_prediction()
|
|||
|
||||
# Assert — API was NOT called; prediction ran and its output was persisted
|
||||
mock_epc_client.get_by_uprn.assert_not_called()
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
mock_predicted_epc,
|
||||
property_id=PROPERTY_ID,
|
||||
portfolio_id=PORTFOLIO_ID,
|
||||
source="predicted",
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")]
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1398,11 +1390,8 @@ def test_repredict_epc_false_with_stored_predicted_epc_skips_prediction() -> Non
|
|||
|
||||
# Assert — EpcPrediction.predict never called; stored EPC persisted in predicted slot
|
||||
mock_predictor.predict.assert_not_called()
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
stored_predicted,
|
||||
property_id=PROPERTY_ID,
|
||||
portfolio_id=PORTFOLIO_ID,
|
||||
source="predicted",
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(stored_predicted, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")]
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1488,11 +1477,8 @@ def test_repredict_epc_false_without_stored_predicted_epc_falls_back_to_live_pre
|
|||
|
||||
# Assert — live prediction was used as fallback
|
||||
mock_predictor.predict.assert_called_once()
|
||||
mock_uow.epc.save.assert_called_once_with(
|
||||
mock_predicted_epc,
|
||||
property_id=PROPERTY_ID,
|
||||
portfolio_id=PORTFOLIO_ID,
|
||||
source="predicted",
|
||||
mock_uow.epc.save_batch.assert_called_once_with(
|
||||
[EpcSaveRequest(mock_predicted_epc, property_id=PROPERTY_ID, portfolio_id=PORTFOLIO_ID, source="predicted")]
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue