perf(repos): bulk get_many / get_for_properties — batch reads, not N round-trips (#1138)

Final slice of ADR-0012: collapse the per-property read round-trips a batch
made (Baseline hydrated ~8 queries x 30 properties one at a time) into a
handful of per-table IN queries.

- EpcPostgresRepository: extracted a shared `_compose(rows)` from `get` (the
  windows + floor-dim fetches are now passed in, not fetched inline), so both
  `get` and the new `get_for_properties(property_ids)` build EpcPropertyData
  from pre-fetched rows. `get_for_properties` fetches each child table once
  (`WHERE epc_property_id IN ...`), groups in memory, and composes — load-whole
  per ADR-0002.
- PropertyRepository.get_many(property_ids) -> Properties: one query for the
  property rows + one bulk EPC hydration, composed in input order.
- BaselineOrchestrator / IngestionOrchestrator read the batch via get_many
  instead of N x get.
- Ports + fakes gain the bulk methods.

The #1129 round-trip fidelity test stays green (the compose extraction is
behaviour-preserving). New tests: bulk hydration correctness + round-trips are
constant w.r.t. batch size (one-per-table, proven by query count). 123 pass;
pyright strict clean; AAA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-31 10:33:24 +00:00
parent 48a488d1e9
commit 8685f8ba3a
8 changed files with 313 additions and 20 deletions

View file

@ -38,8 +38,9 @@ class BaselineOrchestrator:
def run(self, property_ids: list[int]) -> None:
with self._unit_of_work() as uow:
for property_id in property_ids:
effective_epc = uow.property.get(property_id).effective_epc
properties = uow.property.get_many(property_ids)
for property_id, prop in zip(property_ids, properties, strict=True):
effective_epc = prop.effective_epc
lodged = lodged_performance(effective_epc)
effective, reason = self._rebaseliner.rebaseline(
effective_epc, lodged

View file

@ -69,12 +69,12 @@ class IngestionOrchestrator:
# A short read unit; properties with no UPRN (e.g. landlord_property_id
# only) are skipped — a later Site-Notes path covers them.
with self._unit_of_work() as uow:
pairs: list[tuple[int, int]] = []
for property_id in property_ids:
uprn = uow.property.get(property_id).identity.uprn
if uprn is not None:
pairs.append((property_id, uprn))
return pairs
properties = uow.property.get_many(property_ids)
return [
(property_id, prop.identity.uprn)
for property_id, prop in zip(property_ids, properties, strict=True)
if prop.identity.uprn is not None
]
def _fetch(self, property_id: int, uprn: int) -> _Fetched:
# No unit open here — this is the external-IO phase.

View file

@ -1,7 +1,8 @@
from __future__ import annotations
from collections.abc import Sequence
from datetime import date
from typing import Optional, TypeVar
from typing import Optional, Protocol, TypeVar
from sqlmodel import Session, col, delete, select
@ -56,6 +57,20 @@ def _require(value: Optional[_T], field: str) -> _T:
return value
class _HasEpcPropertyId(Protocol):
epc_property_id: int
_RowT = TypeVar("_RowT", bound=_HasEpcPropertyId)
def _group_by_epc(rows: Sequence[_RowT]) -> dict[int, list[_RowT]]:
grouped: dict[int, list[_RowT]] = {}
for row in rows:
grouped.setdefault(row.epc_property_id, []).append(row)
return grouped
class EpcPostgresRepository(EpcRepository):
"""Maps EpcPropertyData to/from the epc_property parent row + child tables.
@ -194,6 +209,117 @@ class EpcPostgresRepository(EpcRepository):
return None
return self.get(row.id)
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's EPCs in a handful of per-table IN queries
(ADR-0012), not N x per-property. Load-whole per ADR-0002."""
if not property_ids:
return {}
parents = self._session.exec(
select(EpcPropertyModel)
.where(col(EpcPropertyModel.property_id).in_(property_ids))
.order_by(EpcPropertyModel.id) # type: ignore[arg-type]
).all()
parent_by_property: dict[int, EpcPropertyModel] = {}
for parent in parents:
if parent.property_id is not None and parent.id is not None:
parent_by_property.setdefault(parent.property_id, parent)
epc_ids = [p.id for p in parent_by_property.values() if p.id is not None]
if not epc_ids:
return {}
perf_by = {
r.epc_property_id: r
for r in self._session.exec(
select(EpcPropertyEnergyPerformanceModel).where(
col(EpcPropertyEnergyPerformanceModel.epc_property_id).in_(epc_ids)
)
).all()
}
flat_by = {
r.epc_property_id: r
for r in self._session.exec(
select(EpcFlatDetailsModel).where(
col(EpcFlatDetailsModel.epc_property_id).in_(epc_ids)
)
).all()
}
rhi_by = {
r.epc_property_id: r
for r in self._session.exec(
select(EpcRenewableHeatIncentiveModel).where(
col(EpcRenewableHeatIncentiveModel.epc_property_id).in_(epc_ids)
)
).all()
}
elements_by = _group_by_epc(
self._session.exec(
select(EpcEnergyElementModel)
.where(col(EpcEnergyElementModel.epc_property_id).in_(epc_ids))
.order_by(EpcEnergyElementModel.id) # type: ignore[arg-type]
).all()
)
heating_by = _group_by_epc(
self._session.exec(
select(EpcMainHeatingDetailModel)
.where(col(EpcMainHeatingDetailModel.epc_property_id).in_(epc_ids))
.order_by(EpcMainHeatingDetailModel.id) # type: ignore[arg-type]
).all()
)
parts_by = _group_by_epc(
self._session.exec(
select(EpcBuildingPartModel)
.where(col(EpcBuildingPartModel.epc_property_id).in_(epc_ids))
.order_by(EpcBuildingPartModel.id) # type: ignore[arg-type]
).all()
)
windows_by = _group_by_epc(
self._session.exec(
select(EpcWindowModel)
.where(col(EpcWindowModel.epc_property_id).in_(epc_ids))
.order_by(EpcWindowModel.id) # type: ignore[arg-type]
).all()
)
part_ids = [
bp.id
for parts in parts_by.values()
for bp in parts
if bp.id is not None
]
floor_dims_by_part = self._floor_dims_by_part(part_ids)
result: dict[int, EpcPropertyData] = {}
for property_id, parent in parent_by_property.items():
epc_id = _require(parent.id, "id")
result[property_id] = self._compose(
p=parent,
perf=perf_by.get(epc_id),
elements=elements_by.get(epc_id, []),
heating_rows=heating_by.get(epc_id, []),
part_rows=parts_by.get(epc_id, []),
floor_dims_by_part=floor_dims_by_part,
window_rows=windows_by.get(epc_id, []),
flat_row=flat_by.get(epc_id),
rhi_row=rhi_by.get(epc_id),
)
return result
def _floor_dims_by_part(
self, part_ids: list[int]
) -> dict[int, list[EpcFloorDimensionModel]]:
if not part_ids:
return {}
rows = self._session.exec(
select(EpcFloorDimensionModel)
.where(col(EpcFloorDimensionModel.epc_building_part_id).in_(part_ids))
.order_by(EpcFloorDimensionModel.id) # type: ignore[arg-type]
).all()
grouped: dict[int, list[EpcFloorDimensionModel]] = {}
for row in rows:
grouped.setdefault(row.epc_building_part_id, []).append(row)
return grouped
def get(self, epc_property_id: int) -> EpcPropertyData:
p = self._session.get(EpcPropertyModel, epc_property_id)
if p is None:
@ -234,7 +360,35 @@ class EpcPostgresRepository(EpcRepository):
EpcRenewableHeatIncentiveModel.epc_property_id == epc_property_id
)
).first()
window_rows = self._windows(epc_property_id)
floor_dims_by_part = self._floor_dims_by_part(
[bp.id for bp in part_rows if bp.id is not None]
)
return self._compose(
p=p,
perf=perf,
elements=elements,
heating_rows=heating_rows,
part_rows=part_rows,
floor_dims_by_part=floor_dims_by_part,
window_rows=window_rows,
flat_row=flat_row,
rhi_row=rhi_row,
)
def _compose(
self,
*,
p: EpcPropertyModel,
perf: Optional[EpcPropertyEnergyPerformanceModel],
elements: list[EpcEnergyElementModel],
heating_rows: list[EpcMainHeatingDetailModel],
part_rows: list[EpcBuildingPartModel],
floor_dims_by_part: dict[int, list[EpcFloorDimensionModel]],
window_rows: list[EpcWindowModel],
flat_row: Optional[EpcFlatDetailsModel],
rhi_row: Optional[EpcRenewableHeatIncentiveModel],
) -> EpcPropertyData:
def _elements(element_type: str) -> list[EnergyElement]:
return [self._to_energy_element(e) for e in elements if e.element_type == element_type]
@ -256,9 +410,14 @@ class EpcPostgresRepository(EpcRepository):
main_heating=_elements("main_heating"),
door_count=p.door_count,
sap_heating=self._to_sap_heating(p, heating_rows),
sap_windows=[self._to_window(w) for w in self._windows(epc_property_id)],
sap_windows=[self._to_window(w) for w in window_rows],
sap_energy_source=self._to_energy_source(p),
sap_building_parts=[self._to_building_part(bp) for bp in part_rows],
sap_building_parts=[
self._to_building_part(
bp, floor_dims_by_part.get(bp.id, []) if bp.id is not None else []
)
for bp in part_rows
],
solar_water_heating=p.solar_water_heating,
has_hot_water_cylinder=p.has_hot_water_cylinder,
has_fixed_air_conditioning=p.has_fixed_air_conditioning,
@ -519,14 +678,9 @@ class EpcPostgresRepository(EpcRepository):
)
@private
def _to_building_part(self, bp: EpcBuildingPartModel) -> SapBuildingPart:
floor_rows = list(
self._session.exec(
select(EpcFloorDimensionModel)
.where(EpcFloorDimensionModel.epc_building_part_id == bp.id)
.order_by(EpcFloorDimensionModel.id) # type: ignore[arg-type]
).all()
)
def _to_building_part(
self, bp: EpcBuildingPartModel, floor_rows: list[EpcFloorDimensionModel]
) -> SapBuildingPart:
return SapBuildingPart(
identifier=BuildingPartIdentifier(bp.identifier),
construction_age_band=bp.construction_age_band,

View file

@ -28,3 +28,11 @@ class EpcRepository(ABC):
@abstractmethod
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: ...
@abstractmethod
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's EPCs, keyed by property_id (only those with an
EPC are present). A handful of per-table queries, not N per property."""
...

View file

@ -1,7 +1,8 @@
from __future__ import annotations
from sqlmodel import Session
from sqlmodel import Session, col, select
from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
@ -34,3 +35,30 @@ class PropertyPostgresRepository(PropertyRepository):
identity=identity,
epc=self._epc_repo.get_for_property(property_id),
)
def get_many(self, property_ids: list[int]) -> Properties:
if not property_ids:
return Properties([])
rows = self._session.exec(
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
).all()
row_by_id = {row.id: row for row in rows if row.id is not None}
epcs = self._epc_repo.get_for_properties(property_ids)
items: list[Property] = []
for property_id in property_ids:
row = row_by_id.get(property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
items.append(
Property(
identity=PropertyIdentity(
portfolio_id=row.portfolio_id,
postcode=row.postcode,
address=row.address,
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
),
epc=epcs.get(property_id),
)
)
return Properties(items)

View file

@ -2,6 +2,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from domain.property.properties import Properties
from domain.property.property import Property
@ -15,3 +16,10 @@ class PropertyRepository(ABC):
@abstractmethod
def get(self, property_id: int) -> Property: ...
@abstractmethod
def get_many(self, property_ids: list[int]) -> Properties:
"""Load a batch of Properties whole, in a handful of per-table queries
rather than one round-trip per property (ADR-0012). Order follows the
input ids."""
...

View file

@ -11,6 +11,7 @@ from typing import Any, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.baseline.baseline_performance import BaselinePerformance
from domain.property.properties import Properties
from domain.property.property import Property
from repositories.baseline.baseline_repository import BaselineRepository
from repositories.epc.epc_repository import EpcRepository
@ -26,6 +27,9 @@ class FakePropertyRepo(PropertyRepository):
def get(self, property_id: int) -> Property:
return self._by_id[property_id]
def get_many(self, property_ids: list[int]) -> Properties:
return Properties([self._by_id[property_id] for property_id in property_ids])
class FakeEpcRepo(EpcRepository):
def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None:
@ -49,6 +53,15 @@ class FakeEpcRepo(EpcRepository):
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
return self._by_property.get(property_id)
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
return {
property_id: self._by_property[property_id]
for property_id in property_ids
if property_id in self._by_property
}
class FakeSolarRepo(SolarRepository):
def __init__(self) -> None:

View file

@ -0,0 +1,81 @@
"""Bulk EPC read: get_for_properties hydrates a batch in a handful of per-table
queries, not N x per-property (ADR-0012, #1138)."""
from __future__ import annotations
import json
from collections.abc import Callable
from pathlib import Path
from typing import Any
from sqlalchemy import Engine, event
from sqlmodel import Session
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
_JSON_SAMPLES = Path(__file__).resolve().parents[3] / "backend/epc_api/json_samples"
def _load_epc() -> EpcPropertyData:
raw: dict[str, Any] = json.loads(
(_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
)
return EpcPropertyDataMapper.from_api_response(raw)
def _count_queries(engine: Engine, work: Callable[[], None]) -> int:
count = 0
def _before(*_args: Any, **_kwargs: Any) -> None:
nonlocal count
count += 1
event.listen(engine, "before_cursor_execute", _before)
try:
work()
finally:
event.remove(engine, "before_cursor_execute", _before)
return count
def test_get_for_properties_hydrates_the_whole_batch(db_engine: Engine) -> None:
# Arrange — the same sample EPC persisted for two properties.
epc = _load_epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(epc, property_id=10)
repo.save(epc, property_id=11)
session.commit()
# Act
with Session(db_engine) as session:
result = EpcPostgresRepository(session).get_for_properties([10, 11])
# Assert — both fully hydrated (load-whole, ADR-0002).
assert result == {10: epc, 11: epc}
def test_get_for_properties_round_trips_do_not_scale_with_batch_size(
db_engine: Engine,
) -> None:
# Arrange
epc = _load_epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(epc, property_id=10)
repo.save(epc, property_id=11)
session.commit()
def _read(property_ids: list[int]) -> None:
with Session(db_engine) as session:
EpcPostgresRepository(session).get_for_properties(property_ids)
# Act — count queries for a 1-property batch vs a 2-property batch.
one = _count_queries(db_engine, lambda: _read([10]))
two = _count_queries(db_engine, lambda: _read([10, 11]))
# Assert — same number of round-trips regardless of batch size (one query
# per table, not per property).
assert one == two