Batch EPC writes via save_batch() on EpcPostgresRepository 🟩

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Roth 2026-06-29 13:04:30 +00:00
parent fe69bccf22
commit 587465bff7

View file

@ -130,81 +130,195 @@ class EpcPostgresRepository(EpcRepository):
portfolio_id: Optional[int] = None,
source: EpcSource = "lodged",
) -> int:
# Idempotent on (property_id, source): a re-run replaces the property's
# EPC graph for THAT source rather than duplicating it (ADR-0012), and a
# predicted save leaves the lodged one intact, and vice versa (ADR-0031).
# Anonymous saves (no property_id) always insert.
if property_id is not None:
self._delete_for_property(property_id, source)
parent = EpcPropertyModel.from_epc_property_data(
data, property_id=property_id, portfolio_id=portfolio_id, source=source
)
self._session.add(parent)
self._session.flush()
epc_property_id = _require(parent.id, "id")
self._session.add(
EpcPropertyEnergyPerformanceModel.from_epc_property_data(
data, epc_property_id=epc_property_id
)
)
for detail in data.sap_heating.main_heating_details:
self._session.add(
EpcMainHeatingDetailModel.from_domain(detail, epc_property_id)
)
for part in data.sap_building_parts:
bp = EpcBuildingPartModel.from_domain(part, epc_property_id)
self._session.add(bp)
self._session.flush()
bp_id = _require(bp.id, "epc_building_part.id")
for dim in part.sap_floor_dimensions:
self._session.add(EpcFloorDimensionModel.from_domain(dim, bp_id))
for window in data.sap_windows:
self._session.add(EpcWindowModel.from_domain(window, epc_property_id))
for index, array in enumerate(data.sap_energy_source.photovoltaic_arrays or []):
self._session.add(
EpcPhotovoltaicArrayModel.from_domain(array, index, epc_property_id)
)
for element_type, elements in (
("roof", data.roofs),
("wall", data.walls),
("floor", data.floors),
("main_heating", data.main_heating),
):
for el in elements:
self._session.add(
EpcEnergyElementModel.from_domain(el, element_type, epc_property_id)
)
for el, element_type in (
(data.window, "window"),
(data.lighting, "lighting"),
(data.hot_water, "hot_water"),
(data.secondary_heating, "secondary_heating"),
(data.main_heating_controls, "main_heating_controls"),
):
if el is not None:
self._session.add(
EpcEnergyElementModel.from_domain(el, element_type, epc_property_id)
)
if data.sap_flat_details is not None:
self._session.add(
EpcFlatDetailsModel.from_domain(data.sap_flat_details, epc_property_id)
)
if data.renewable_heat_incentive is not None:
self._session.add(
EpcRenewableHeatIncentiveModel.from_domain(
data.renewable_heat_incentive, epc_property_id
)
)
return epc_property_id
return self.save_batch([EpcSaveRequest(data, property_id, portfolio_id, source)])[0]
def save_batch(self, requests: list[EpcSaveRequest]) -> list[int]:
raise NotImplementedError
"""Insert all EPCs in `requests` in one pass per table, returning one
epc_property_id per request in the same order as the input.
Deletes are batched first (one IN-query per child table per source),
then all parent rows are inserted with a single RETURNING statement so
positional ordering maps each returned id to its request. Building-part
ids are captured the same way so floor-dimension FKs are resolved without
any per-property flush round-trips (ADR-0012).
"""
if not requests:
return []
# Batch-delete existing rows grouped by source so the lodged and predicted
# slots remain independent (ADR-0031).
pids_by_source: dict[EpcSource, list[int]] = {}
for r in requests:
if r.property_id is not None:
pids_by_source.setdefault(r.source, []).append(r.property_id)
for src, pids in pids_by_source.items():
self._delete_for_properties(pids, src)
# Insert all parent (epc_property) rows; capture returned ids positionally.
parent_rows = [
_col_values(
EpcPropertyModel.from_epc_property_data(
r.data, property_id=r.property_id, portfolio_id=r.portfolio_id, source=r.source
),
exclude=frozenset({"id"}),
)
for r in requests
]
returned_parents = self._session.execute(
_sa_insert(EpcPropertyModel).returning(EpcPropertyModel.id),
parent_rows,
).all()
epc_property_ids = [row[0] for row in returned_parents]
# Collect child rows, accumulating building parts in an ordered list so
# the positional RETURNING trick can map part objects to their new ids.
perf_rows: list[dict[str, Any]] = []
heating_rows: list[dict[str, Any]] = []
parts_ordered: list[tuple[Any, int]] = [] # (SapBuildingPart, epc_property_id)
window_rows: list[dict[str, Any]] = []
pv_rows: list[dict[str, Any]] = []
element_rows: list[dict[str, Any]] = []
flat_rows: list[dict[str, Any]] = []
rhi_rows: list[dict[str, Any]] = []
for r, epc_pid in zip(requests, epc_property_ids):
d = r.data
perf_rows.append(
_col_values(
EpcPropertyEnergyPerformanceModel.from_epc_property_data(d, epc_pid),
exclude=frozenset({"id"}),
)
)
for detail in d.sap_heating.main_heating_details:
heating_rows.append(
_col_values(EpcMainHeatingDetailModel.from_domain(detail, epc_pid), frozenset({"id"}))
)
for part in d.sap_building_parts:
parts_ordered.append((part, epc_pid))
for window in d.sap_windows:
window_rows.append(
_col_values(EpcWindowModel.from_domain(window, epc_pid), frozenset({"id"}))
)
for idx, array in enumerate(d.sap_energy_source.photovoltaic_arrays or []):
pv_rows.append(
_col_values(EpcPhotovoltaicArrayModel.from_domain(array, idx, epc_pid), frozenset({"id"}))
)
for etype, els in (
("roof", d.roofs),
("wall", d.walls),
("floor", d.floors),
("main_heating", d.main_heating),
):
for el in els:
element_rows.append(
_col_values(EpcEnergyElementModel.from_domain(el, etype, epc_pid), frozenset({"id"}))
)
for el, etype in (
(d.window, "window"),
(d.lighting, "lighting"),
(d.hot_water, "hot_water"),
(d.secondary_heating, "secondary_heating"),
(d.main_heating_controls, "main_heating_controls"),
):
if el is not None:
element_rows.append(
_col_values(EpcEnergyElementModel.from_domain(el, etype, epc_pid), frozenset({"id"}))
)
if d.sap_flat_details is not None:
flat_rows.append(
_col_values(EpcFlatDetailsModel.from_domain(d.sap_flat_details, epc_pid), frozenset({"id"}))
)
if d.renewable_heat_incentive is not None:
rhi_rows.append(
_col_values(EpcRenewableHeatIncentiveModel.from_domain(d.renewable_heat_incentive, epc_pid), frozenset({"id"}))
)
# Bulk-insert all simple child tables (no downstream FK dependency).
if perf_rows:
self._session.execute(_sa_insert(EpcPropertyEnergyPerformanceModel), perf_rows)
if heating_rows:
self._session.execute(_sa_insert(EpcMainHeatingDetailModel), heating_rows)
if window_rows:
self._session.execute(_sa_insert(EpcWindowModel), window_rows)
if pv_rows:
self._session.execute(_sa_insert(EpcPhotovoltaicArrayModel), pv_rows)
if element_rows:
self._session.execute(_sa_insert(EpcEnergyElementModel), element_rows)
if flat_rows:
self._session.execute(_sa_insert(EpcFlatDetailsModel), flat_rows)
if rhi_rows:
self._session.execute(_sa_insert(EpcRenewableHeatIncentiveModel), rhi_rows)
# Building parts: insert with RETURNING and zip positionally to resolve
# floor-dimension FKs. Do NOT key by id(part) — the same EpcPropertyData
# object can appear in multiple requests (same epc, different property_ids),
# giving identical object ids that collapse the dict and mis-wire FKs.
# Positional zip is safe because PostgreSQL preserves VALUES order in RETURNING.
if parts_ordered:
bp_rows = [
_col_values(EpcBuildingPartModel.from_domain(part, epc_pid), frozenset({"id"}))
for part, epc_pid in parts_ordered
]
returned_bps = self._session.execute(
_sa_insert(EpcBuildingPartModel).returning(EpcBuildingPartModel.id),
bp_rows,
).all()
floor_rows: list[dict[str, Any]] = [
_col_values(EpcFloorDimensionModel.from_domain(dim, bp_row[0]), frozenset({"id"}))
for (part, _), bp_row in zip(parts_ordered, returned_bps)
for dim in part.sap_floor_dimensions
]
if floor_rows:
self._session.execute(_sa_insert(EpcFloorDimensionModel), floor_rows)
return epc_property_ids
def _delete_for_properties(self, property_ids: list[int], source: EpcSource) -> None:
raise NotImplementedError
"""Batch-delete every EPC graph for the given property_ids and source in
one pass per child table (IN queries), replacing the per-property flush
loop that drove RDS CPU to saturation during bulk modelling runs."""
epc_ids = [
i
for i in self._session.exec(
select(EpcPropertyModel.id)
.where(col(EpcPropertyModel.property_id).in_(property_ids))
.where(EpcPropertyModel.source == source)
).all()
if i is not None
]
if not epc_ids:
return
part_ids = [
i
for i in self._session.exec(
select(EpcBuildingPartModel.id).where(
col(EpcBuildingPartModel.epc_property_id).in_(epc_ids)
)
).all()
if i is not None
]
if part_ids:
self._session.exec( # type: ignore[call-overload]
delete(EpcFloorDimensionModel).where(
col(EpcFloorDimensionModel.epc_building_part_id).in_(part_ids)
)
)
for child in (
EpcPropertyEnergyPerformanceModel,
EpcEnergyElementModel,
EpcMainHeatingDetailModel,
EpcBuildingPartModel,
EpcWindowModel,
EpcPhotovoltaicArrayModel,
EpcFlatDetailsModel,
EpcRenewableHeatIncentiveModel,
):
self._session.exec( # type: ignore[call-overload]
delete(child).where(col(child.epc_property_id).in_(epc_ids))
)
self._session.exec( # type: ignore[call-overload]
delete(EpcPropertyModel).where(col(EpcPropertyModel.id).in_(epc_ids))
)
def _delete_for_property(self, property_id: int, source: EpcSource) -> None:
"""Remove the property's existing EPC graph for `source` (parent + child