diff --git a/orchestration/baseline_orchestrator.py b/orchestration/baseline_orchestrator.py index 4ae3a480..9a1138c8 100644 --- a/orchestration/baseline_orchestrator.py +++ b/orchestration/baseline_orchestrator.py @@ -38,8 +38,9 @@ class BaselineOrchestrator: def run(self, property_ids: list[int]) -> None: with self._unit_of_work() as uow: - for property_id in property_ids: - effective_epc = uow.property.get(property_id).effective_epc + properties = uow.property.get_many(property_ids) + for property_id, prop in zip(property_ids, properties, strict=True): + effective_epc = prop.effective_epc lodged = lodged_performance(effective_epc) effective, reason = self._rebaseliner.rebaseline( effective_epc, lodged diff --git a/orchestration/ingestion_orchestrator.py b/orchestration/ingestion_orchestrator.py index f2bce52b..1662ecf9 100644 --- a/orchestration/ingestion_orchestrator.py +++ b/orchestration/ingestion_orchestrator.py @@ -69,12 +69,12 @@ class IngestionOrchestrator: # A short read unit; properties with no UPRN (e.g. landlord_property_id # only) are skipped — a later Site-Notes path covers them. with self._unit_of_work() as uow: - pairs: list[tuple[int, int]] = [] - for property_id in property_ids: - uprn = uow.property.get(property_id).identity.uprn - if uprn is not None: - pairs.append((property_id, uprn)) - return pairs + properties = uow.property.get_many(property_ids) + return [ + (property_id, prop.identity.uprn) + for property_id, prop in zip(property_ids, properties, strict=True) + if prop.identity.uprn is not None + ] def _fetch(self, property_id: int, uprn: int) -> _Fetched: # No unit open here — this is the external-IO phase. diff --git a/repositories/epc/epc_postgres_repository.py b/repositories/epc/epc_postgres_repository.py index b1368916..525476ea 100644 --- a/repositories/epc/epc_postgres_repository.py +++ b/repositories/epc/epc_postgres_repository.py @@ -1,7 +1,8 @@ from __future__ import annotations +from collections.abc import Sequence from datetime import date -from typing import Optional, TypeVar +from typing import Optional, Protocol, TypeVar from sqlmodel import Session, col, delete, select @@ -56,6 +57,20 @@ def _require(value: Optional[_T], field: str) -> _T: return value +class _HasEpcPropertyId(Protocol): + epc_property_id: int + + +_RowT = TypeVar("_RowT", bound=_HasEpcPropertyId) + + +def _group_by_epc(rows: Sequence[_RowT]) -> dict[int, list[_RowT]]: + grouped: dict[int, list[_RowT]] = {} + for row in rows: + grouped.setdefault(row.epc_property_id, []).append(row) + return grouped + + class EpcPostgresRepository(EpcRepository): """Maps EpcPropertyData to/from the epc_property parent row + child tables. @@ -194,6 +209,117 @@ class EpcPostgresRepository(EpcRepository): return None return self.get(row.id) + def get_for_properties( + self, property_ids: list[int] + ) -> dict[int, EpcPropertyData]: + """Bulk-hydrate a batch's EPCs in a handful of per-table IN queries + (ADR-0012), not N x per-property. Load-whole per ADR-0002.""" + if not property_ids: + return {} + parents = self._session.exec( + select(EpcPropertyModel) + .where(col(EpcPropertyModel.property_id).in_(property_ids)) + .order_by(EpcPropertyModel.id) # type: ignore[arg-type] + ).all() + parent_by_property: dict[int, EpcPropertyModel] = {} + for parent in parents: + if parent.property_id is not None and parent.id is not None: + parent_by_property.setdefault(parent.property_id, parent) + epc_ids = [p.id for p in parent_by_property.values() if p.id is not None] + if not epc_ids: + return {} + + perf_by = { + r.epc_property_id: r + for r in self._session.exec( + select(EpcPropertyEnergyPerformanceModel).where( + col(EpcPropertyEnergyPerformanceModel.epc_property_id).in_(epc_ids) + ) + ).all() + } + flat_by = { + r.epc_property_id: r + for r in self._session.exec( + select(EpcFlatDetailsModel).where( + col(EpcFlatDetailsModel.epc_property_id).in_(epc_ids) + ) + ).all() + } + rhi_by = { + r.epc_property_id: r + for r in self._session.exec( + select(EpcRenewableHeatIncentiveModel).where( + col(EpcRenewableHeatIncentiveModel.epc_property_id).in_(epc_ids) + ) + ).all() + } + elements_by = _group_by_epc( + self._session.exec( + select(EpcEnergyElementModel) + .where(col(EpcEnergyElementModel.epc_property_id).in_(epc_ids)) + .order_by(EpcEnergyElementModel.id) # type: ignore[arg-type] + ).all() + ) + heating_by = _group_by_epc( + self._session.exec( + select(EpcMainHeatingDetailModel) + .where(col(EpcMainHeatingDetailModel.epc_property_id).in_(epc_ids)) + .order_by(EpcMainHeatingDetailModel.id) # type: ignore[arg-type] + ).all() + ) + parts_by = _group_by_epc( + self._session.exec( + select(EpcBuildingPartModel) + .where(col(EpcBuildingPartModel.epc_property_id).in_(epc_ids)) + .order_by(EpcBuildingPartModel.id) # type: ignore[arg-type] + ).all() + ) + windows_by = _group_by_epc( + self._session.exec( + select(EpcWindowModel) + .where(col(EpcWindowModel.epc_property_id).in_(epc_ids)) + .order_by(EpcWindowModel.id) # type: ignore[arg-type] + ).all() + ) + part_ids = [ + bp.id + for parts in parts_by.values() + for bp in parts + if bp.id is not None + ] + floor_dims_by_part = self._floor_dims_by_part(part_ids) + + result: dict[int, EpcPropertyData] = {} + for property_id, parent in parent_by_property.items(): + epc_id = _require(parent.id, "id") + result[property_id] = self._compose( + p=parent, + perf=perf_by.get(epc_id), + elements=elements_by.get(epc_id, []), + heating_rows=heating_by.get(epc_id, []), + part_rows=parts_by.get(epc_id, []), + floor_dims_by_part=floor_dims_by_part, + window_rows=windows_by.get(epc_id, []), + flat_row=flat_by.get(epc_id), + rhi_row=rhi_by.get(epc_id), + ) + return result + + def _floor_dims_by_part( + self, part_ids: list[int] + ) -> dict[int, list[EpcFloorDimensionModel]]: + if not part_ids: + return {} + rows = self._session.exec( + select(EpcFloorDimensionModel) + .where(col(EpcFloorDimensionModel.epc_building_part_id).in_(part_ids)) + .order_by(EpcFloorDimensionModel.id) # type: ignore[arg-type] + ).all() + grouped: dict[int, list[EpcFloorDimensionModel]] = {} + for row in rows: + grouped.setdefault(row.epc_building_part_id, []).append(row) + return grouped + def get(self, epc_property_id: int) -> EpcPropertyData: p = self._session.get(EpcPropertyModel, epc_property_id) if p is None: @@ -234,7 +360,35 @@ class EpcPostgresRepository(EpcRepository): EpcRenewableHeatIncentiveModel.epc_property_id == epc_property_id ) ).first() + window_rows = self._windows(epc_property_id) + floor_dims_by_part = self._floor_dims_by_part( + [bp.id for bp in part_rows if bp.id is not None] + ) + return self._compose( + p=p, + perf=perf, + elements=elements, + heating_rows=heating_rows, + part_rows=part_rows, + floor_dims_by_part=floor_dims_by_part, + window_rows=window_rows, + flat_row=flat_row, + rhi_row=rhi_row, + ) + def _compose( + self, + *, + p: EpcPropertyModel, + perf: Optional[EpcPropertyEnergyPerformanceModel], + elements: list[EpcEnergyElementModel], + heating_rows: list[EpcMainHeatingDetailModel], + part_rows: list[EpcBuildingPartModel], + floor_dims_by_part: dict[int, list[EpcFloorDimensionModel]], + window_rows: list[EpcWindowModel], + flat_row: Optional[EpcFlatDetailsModel], + rhi_row: Optional[EpcRenewableHeatIncentiveModel], + ) -> EpcPropertyData: def _elements(element_type: str) -> list[EnergyElement]: return [self._to_energy_element(e) for e in elements if e.element_type == element_type] @@ -256,9 +410,14 @@ class EpcPostgresRepository(EpcRepository): main_heating=_elements("main_heating"), door_count=p.door_count, sap_heating=self._to_sap_heating(p, heating_rows), - sap_windows=[self._to_window(w) for w in self._windows(epc_property_id)], + sap_windows=[self._to_window(w) for w in window_rows], sap_energy_source=self._to_energy_source(p), - sap_building_parts=[self._to_building_part(bp) for bp in part_rows], + sap_building_parts=[ + self._to_building_part( + bp, floor_dims_by_part.get(bp.id, []) if bp.id is not None else [] + ) + for bp in part_rows + ], solar_water_heating=p.solar_water_heating, has_hot_water_cylinder=p.has_hot_water_cylinder, has_fixed_air_conditioning=p.has_fixed_air_conditioning, @@ -519,14 +678,9 @@ class EpcPostgresRepository(EpcRepository): ) @private - def _to_building_part(self, bp: EpcBuildingPartModel) -> SapBuildingPart: - floor_rows = list( - self._session.exec( - select(EpcFloorDimensionModel) - .where(EpcFloorDimensionModel.epc_building_part_id == bp.id) - .order_by(EpcFloorDimensionModel.id) # type: ignore[arg-type] - ).all() - ) + def _to_building_part( + self, bp: EpcBuildingPartModel, floor_rows: list[EpcFloorDimensionModel] + ) -> SapBuildingPart: return SapBuildingPart( identifier=BuildingPartIdentifier(bp.identifier), construction_age_band=bp.construction_age_band, diff --git a/repositories/epc/epc_repository.py b/repositories/epc/epc_repository.py index fb83bdbc..171d098e 100644 --- a/repositories/epc/epc_repository.py +++ b/repositories/epc/epc_repository.py @@ -28,3 +28,11 @@ class EpcRepository(ABC): @abstractmethod def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: ... + + @abstractmethod + def get_for_properties( + self, property_ids: list[int] + ) -> dict[int, EpcPropertyData]: + """Bulk-hydrate a batch's EPCs, keyed by property_id (only those with an + EPC are present). A handful of per-table queries, not N per property.""" + ... diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index c1b631dd..e0b4f9ff 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -1,7 +1,8 @@ from __future__ import annotations -from sqlmodel import Session +from sqlmodel import Session, col, select +from domain.property.properties import Properties from domain.property.property import Property, PropertyIdentity from infrastructure.postgres.property_table import PropertyRow from repositories.epc.epc_repository import EpcRepository @@ -34,3 +35,30 @@ class PropertyPostgresRepository(PropertyRepository): identity=identity, epc=self._epc_repo.get_for_property(property_id), ) + + def get_many(self, property_ids: list[int]) -> Properties: + if not property_ids: + return Properties([]) + rows = self._session.exec( + select(PropertyRow).where(col(PropertyRow.id).in_(property_ids)) + ).all() + row_by_id = {row.id: row for row in rows if row.id is not None} + epcs = self._epc_repo.get_for_properties(property_ids) + items: list[Property] = [] + for property_id in property_ids: + row = row_by_id.get(property_id) + if row is None: + raise ValueError(f"property {property_id} not found") + items.append( + Property( + identity=PropertyIdentity( + portfolio_id=row.portfolio_id, + postcode=row.postcode, + address=row.address, + uprn=row.uprn, + landlord_property_id=row.landlord_property_id, + ), + epc=epcs.get(property_id), + ) + ) + return Properties(items) diff --git a/repositories/property/property_repository.py b/repositories/property/property_repository.py index 0a9045be..1f3df1da 100644 --- a/repositories/property/property_repository.py +++ b/repositories/property/property_repository.py @@ -2,6 +2,7 @@ from __future__ import annotations from abc import ABC, abstractmethod +from domain.property.properties import Properties from domain.property.property import Property @@ -15,3 +16,10 @@ class PropertyRepository(ABC): @abstractmethod def get(self, property_id: int) -> Property: ... + + @abstractmethod + def get_many(self, property_ids: list[int]) -> Properties: + """Load a batch of Properties whole, in a handful of per-table queries + rather than one round-trip per property (ADR-0012). Order follows the + input ids.""" + ... diff --git a/tests/orchestration/fakes.py b/tests/orchestration/fakes.py index 5891434a..24138520 100644 --- a/tests/orchestration/fakes.py +++ b/tests/orchestration/fakes.py @@ -11,6 +11,7 @@ from typing import Any, Optional from datatypes.epc.domain.epc_property_data import EpcPropertyData from domain.baseline.baseline_performance import BaselinePerformance +from domain.property.properties import Properties from domain.property.property import Property from repositories.baseline.baseline_repository import BaselineRepository from repositories.epc.epc_repository import EpcRepository @@ -26,6 +27,9 @@ class FakePropertyRepo(PropertyRepository): def get(self, property_id: int) -> Property: return self._by_id[property_id] + def get_many(self, property_ids: list[int]) -> Properties: + return Properties([self._by_id[property_id] for property_id in property_ids]) + class FakeEpcRepo(EpcRepository): def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None: @@ -49,6 +53,15 @@ class FakeEpcRepo(EpcRepository): def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: return self._by_property.get(property_id) + def get_for_properties( + self, property_ids: list[int] + ) -> dict[int, EpcPropertyData]: + return { + property_id: self._by_property[property_id] + for property_id in property_ids + if property_id in self._by_property + } + class FakeSolarRepo(SolarRepository): def __init__(self) -> None: diff --git a/tests/repositories/epc/test_epc_bulk_read.py b/tests/repositories/epc/test_epc_bulk_read.py new file mode 100644 index 00000000..8601bcf4 --- /dev/null +++ b/tests/repositories/epc/test_epc_bulk_read.py @@ -0,0 +1,81 @@ +"""Bulk EPC read: get_for_properties hydrates a batch in a handful of per-table +queries, not N x per-property (ADR-0012, #1138).""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from sqlalchemy import Engine, event +from sqlmodel import Session + +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from datatypes.epc.domain.mapper import EpcPropertyDataMapper +from repositories.epc.epc_postgres_repository import EpcPostgresRepository + +_JSON_SAMPLES = Path(__file__).resolve().parents[3] / "backend/epc_api/json_samples" + + +def _load_epc() -> EpcPropertyData: + raw: dict[str, Any] = json.loads( + (_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text() + ) + return EpcPropertyDataMapper.from_api_response(raw) + + +def _count_queries(engine: Engine, work: Callable[[], None]) -> int: + count = 0 + + def _before(*_args: Any, **_kwargs: Any) -> None: + nonlocal count + count += 1 + + event.listen(engine, "before_cursor_execute", _before) + try: + work() + finally: + event.remove(engine, "before_cursor_execute", _before) + return count + + +def test_get_for_properties_hydrates_the_whole_batch(db_engine: Engine) -> None: + # Arrange — the same sample EPC persisted for two properties. + epc = _load_epc() + with Session(db_engine) as session: + repo = EpcPostgresRepository(session) + repo.save(epc, property_id=10) + repo.save(epc, property_id=11) + session.commit() + + # Act + with Session(db_engine) as session: + result = EpcPostgresRepository(session).get_for_properties([10, 11]) + + # Assert — both fully hydrated (load-whole, ADR-0002). + assert result == {10: epc, 11: epc} + + +def test_get_for_properties_round_trips_do_not_scale_with_batch_size( + db_engine: Engine, +) -> None: + # Arrange + epc = _load_epc() + with Session(db_engine) as session: + repo = EpcPostgresRepository(session) + repo.save(epc, property_id=10) + repo.save(epc, property_id=11) + session.commit() + + def _read(property_ids: list[int]) -> None: + with Session(db_engine) as session: + EpcPostgresRepository(session).get_for_properties(property_ids) + + # Act — count queries for a 1-property batch vs a 2-property batch. + one = _count_queries(db_engine, lambda: _read([10])) + two = _count_queries(db_engine, lambda: _read([10, 11])) + + # Assert — same number of round-trips regardless of batch size (one query + # per table, not per property). + assert one == two