From 587465bff77b47bc562cd586411d281b4f1897c1 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Mon, 29 Jun 2026 13:04:30 +0000 Subject: [PATCH] =?UTF-8?q?Batch=20EPC=20writes=20via=20save=5Fbatch()=20o?= =?UTF-8?q?n=20EpcPostgresRepository=20=F0=9F=9F=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- repositories/epc/epc_postgres_repository.py | 256 ++++++++++++++------ 1 file changed, 185 insertions(+), 71 deletions(-) diff --git a/repositories/epc/epc_postgres_repository.py b/repositories/epc/epc_postgres_repository.py index 4e15afb4..95f09fdf 100644 --- a/repositories/epc/epc_postgres_repository.py +++ b/repositories/epc/epc_postgres_repository.py @@ -130,81 +130,195 @@ class EpcPostgresRepository(EpcRepository): portfolio_id: Optional[int] = None, source: EpcSource = "lodged", ) -> int: - # Idempotent on (property_id, source): a re-run replaces the property's - # EPC graph for THAT source rather than duplicating it (ADR-0012), and a - # predicted save leaves the lodged one intact, and vice versa (ADR-0031). - # Anonymous saves (no property_id) always insert. - if property_id is not None: - self._delete_for_property(property_id, source) - parent = EpcPropertyModel.from_epc_property_data( - data, property_id=property_id, portfolio_id=portfolio_id, source=source - ) - self._session.add(parent) - self._session.flush() - epc_property_id = _require(parent.id, "id") - - self._session.add( - EpcPropertyEnergyPerformanceModel.from_epc_property_data( - data, epc_property_id=epc_property_id - ) - ) - for detail in data.sap_heating.main_heating_details: - self._session.add( - EpcMainHeatingDetailModel.from_domain(detail, epc_property_id) - ) - for part in data.sap_building_parts: - bp = EpcBuildingPartModel.from_domain(part, epc_property_id) - self._session.add(bp) - self._session.flush() - bp_id = _require(bp.id, "epc_building_part.id") - for dim in part.sap_floor_dimensions: - self._session.add(EpcFloorDimensionModel.from_domain(dim, bp_id)) - for window in data.sap_windows: - self._session.add(EpcWindowModel.from_domain(window, epc_property_id)) - for index, array in enumerate(data.sap_energy_source.photovoltaic_arrays or []): - self._session.add( - EpcPhotovoltaicArrayModel.from_domain(array, index, epc_property_id) - ) - - for element_type, elements in ( - ("roof", data.roofs), - ("wall", data.walls), - ("floor", data.floors), - ("main_heating", data.main_heating), - ): - for el in elements: - self._session.add( - EpcEnergyElementModel.from_domain(el, element_type, epc_property_id) - ) - for el, element_type in ( - (data.window, "window"), - (data.lighting, "lighting"), - (data.hot_water, "hot_water"), - (data.secondary_heating, "secondary_heating"), - (data.main_heating_controls, "main_heating_controls"), - ): - if el is not None: - self._session.add( - EpcEnergyElementModel.from_domain(el, element_type, epc_property_id) - ) - - if data.sap_flat_details is not None: - self._session.add( - EpcFlatDetailsModel.from_domain(data.sap_flat_details, epc_property_id) - ) - if data.renewable_heat_incentive is not None: - self._session.add( - EpcRenewableHeatIncentiveModel.from_domain( - data.renewable_heat_incentive, epc_property_id - ) - ) - return epc_property_id + return self.save_batch([EpcSaveRequest(data, property_id, portfolio_id, source)])[0] def save_batch(self, requests: list[EpcSaveRequest]) -> list[int]: - raise NotImplementedError + """Insert all EPCs in `requests` in one pass per table, returning one + epc_property_id per request in the same order as the input. + + Deletes are batched first (one IN-query per child table per source), + then all parent rows are inserted with a single RETURNING statement so + positional ordering maps each returned id to its request. Building-part + ids are captured the same way so floor-dimension FKs are resolved without + any per-property flush round-trips (ADR-0012). + """ + if not requests: + return [] + + # Batch-delete existing rows grouped by source so the lodged and predicted + # slots remain independent (ADR-0031). + pids_by_source: dict[EpcSource, list[int]] = {} + for r in requests: + if r.property_id is not None: + pids_by_source.setdefault(r.source, []).append(r.property_id) + for src, pids in pids_by_source.items(): + self._delete_for_properties(pids, src) + + # Insert all parent (epc_property) rows; capture returned ids positionally. + parent_rows = [ + _col_values( + EpcPropertyModel.from_epc_property_data( + r.data, property_id=r.property_id, portfolio_id=r.portfolio_id, source=r.source + ), + exclude=frozenset({"id"}), + ) + for r in requests + ] + returned_parents = self._session.execute( + _sa_insert(EpcPropertyModel).returning(EpcPropertyModel.id), + parent_rows, + ).all() + epc_property_ids = [row[0] for row in returned_parents] + + # Collect child rows, accumulating building parts in an ordered list so + # the positional RETURNING trick can map part objects to their new ids. + perf_rows: list[dict[str, Any]] = [] + heating_rows: list[dict[str, Any]] = [] + parts_ordered: list[tuple[Any, int]] = [] # (SapBuildingPart, epc_property_id) + window_rows: list[dict[str, Any]] = [] + pv_rows: list[dict[str, Any]] = [] + element_rows: list[dict[str, Any]] = [] + flat_rows: list[dict[str, Any]] = [] + rhi_rows: list[dict[str, Any]] = [] + + for r, epc_pid in zip(requests, epc_property_ids): + d = r.data + perf_rows.append( + _col_values( + EpcPropertyEnergyPerformanceModel.from_epc_property_data(d, epc_pid), + exclude=frozenset({"id"}), + ) + ) + for detail in d.sap_heating.main_heating_details: + heating_rows.append( + _col_values(EpcMainHeatingDetailModel.from_domain(detail, epc_pid), frozenset({"id"})) + ) + for part in d.sap_building_parts: + parts_ordered.append((part, epc_pid)) + for window in d.sap_windows: + window_rows.append( + _col_values(EpcWindowModel.from_domain(window, epc_pid), frozenset({"id"})) + ) + for idx, array in enumerate(d.sap_energy_source.photovoltaic_arrays or []): + pv_rows.append( + _col_values(EpcPhotovoltaicArrayModel.from_domain(array, idx, epc_pid), frozenset({"id"})) + ) + for etype, els in ( + ("roof", d.roofs), + ("wall", d.walls), + ("floor", d.floors), + ("main_heating", d.main_heating), + ): + for el in els: + element_rows.append( + _col_values(EpcEnergyElementModel.from_domain(el, etype, epc_pid), frozenset({"id"})) + ) + for el, etype in ( + (d.window, "window"), + (d.lighting, "lighting"), + (d.hot_water, "hot_water"), + (d.secondary_heating, "secondary_heating"), + (d.main_heating_controls, "main_heating_controls"), + ): + if el is not None: + element_rows.append( + _col_values(EpcEnergyElementModel.from_domain(el, etype, epc_pid), frozenset({"id"})) + ) + if d.sap_flat_details is not None: + flat_rows.append( + _col_values(EpcFlatDetailsModel.from_domain(d.sap_flat_details, epc_pid), frozenset({"id"})) + ) + if d.renewable_heat_incentive is not None: + rhi_rows.append( + _col_values(EpcRenewableHeatIncentiveModel.from_domain(d.renewable_heat_incentive, epc_pid), frozenset({"id"})) + ) + + # Bulk-insert all simple child tables (no downstream FK dependency). + if perf_rows: + self._session.execute(_sa_insert(EpcPropertyEnergyPerformanceModel), perf_rows) + if heating_rows: + self._session.execute(_sa_insert(EpcMainHeatingDetailModel), heating_rows) + if window_rows: + self._session.execute(_sa_insert(EpcWindowModel), window_rows) + if pv_rows: + self._session.execute(_sa_insert(EpcPhotovoltaicArrayModel), pv_rows) + if element_rows: + self._session.execute(_sa_insert(EpcEnergyElementModel), element_rows) + if flat_rows: + self._session.execute(_sa_insert(EpcFlatDetailsModel), flat_rows) + if rhi_rows: + self._session.execute(_sa_insert(EpcRenewableHeatIncentiveModel), rhi_rows) + + # Building parts: insert with RETURNING and zip positionally to resolve + # floor-dimension FKs. Do NOT key by id(part) — the same EpcPropertyData + # object can appear in multiple requests (same epc, different property_ids), + # giving identical object ids that collapse the dict and mis-wire FKs. + # Positional zip is safe because PostgreSQL preserves VALUES order in RETURNING. + if parts_ordered: + bp_rows = [ + _col_values(EpcBuildingPartModel.from_domain(part, epc_pid), frozenset({"id"})) + for part, epc_pid in parts_ordered + ] + returned_bps = self._session.execute( + _sa_insert(EpcBuildingPartModel).returning(EpcBuildingPartModel.id), + bp_rows, + ).all() + floor_rows: list[dict[str, Any]] = [ + _col_values(EpcFloorDimensionModel.from_domain(dim, bp_row[0]), frozenset({"id"})) + for (part, _), bp_row in zip(parts_ordered, returned_bps) + for dim in part.sap_floor_dimensions + ] + if floor_rows: + self._session.execute(_sa_insert(EpcFloorDimensionModel), floor_rows) + + return epc_property_ids def _delete_for_properties(self, property_ids: list[int], source: EpcSource) -> None: - raise NotImplementedError + """Batch-delete every EPC graph for the given property_ids and source in + one pass per child table (IN queries), replacing the per-property flush + loop that drove RDS CPU to saturation during bulk modelling runs.""" + epc_ids = [ + i + for i in self._session.exec( + select(EpcPropertyModel.id) + .where(col(EpcPropertyModel.property_id).in_(property_ids)) + .where(EpcPropertyModel.source == source) + ).all() + if i is not None + ] + if not epc_ids: + return + part_ids = [ + i + for i in self._session.exec( + select(EpcBuildingPartModel.id).where( + col(EpcBuildingPartModel.epc_property_id).in_(epc_ids) + ) + ).all() + if i is not None + ] + if part_ids: + self._session.exec( # type: ignore[call-overload] + delete(EpcFloorDimensionModel).where( + col(EpcFloorDimensionModel.epc_building_part_id).in_(part_ids) + ) + ) + for child in ( + EpcPropertyEnergyPerformanceModel, + EpcEnergyElementModel, + EpcMainHeatingDetailModel, + EpcBuildingPartModel, + EpcWindowModel, + EpcPhotovoltaicArrayModel, + EpcFlatDetailsModel, + EpcRenewableHeatIncentiveModel, + ): + self._session.exec( # type: ignore[call-overload] + delete(child).where(col(child.epc_property_id).in_(epc_ids)) + ) + self._session.exec( # type: ignore[call-overload] + delete(EpcPropertyModel).where(col(EpcPropertyModel.id).in_(epc_ids)) + ) def _delete_for_property(self, property_id: int, source: EpcSource) -> None: """Remove the property's existing EPC graph for `source` (parent + child