Model/orchestration/ingestion_orchestrator.py
Khalim Conn-Kowlessar 8685f8ba3a perf(repos): bulk get_many / get_for_properties — batch reads, not N round-trips (#1138)
Final slice of ADR-0012: collapse the per-property read round-trips a batch
made (Baseline hydrated ~8 queries x 30 properties one at a time) into a
handful of per-table IN queries.

- EpcPostgresRepository: extracted a shared `_compose(rows)` from `get` (the
  windows + floor-dim fetches are now passed in, not fetched inline), so both
  `get` and the new `get_for_properties(property_ids)` build EpcPropertyData
  from pre-fetched rows. `get_for_properties` fetches each child table once
  (`WHERE epc_property_id IN ...`), groups in memory, and composes — load-whole
  per ADR-0002.
- PropertyRepository.get_many(property_ids) -> Properties: one query for the
  property rows + one bulk EPC hydration, composed in input order.
- BaselineOrchestrator / IngestionOrchestrator read the batch via get_many
  instead of N x get.
- Ports + fakes gain the bulk methods.

The #1129 round-trip fidelity test stays green (the compose extraction is
behaviour-preserving). New tests: bulk hydration correctness + round-trips are
constant w.r.t. batch size (one-per-table, proven by query count). 123 pass;
pyright strict clean; AAA.
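
A minimal sketch of the bulk-read shape described above, not the repository's actual code: one `IN` query per child table, grouped in memory and returned in input order, with the query count checked directly. The `epc_window` table, its columns, and the `windows_for_properties` helper are hypothetical names chosen for illustration, and SQLite stands in for Postgres.

```python
import sqlite3
from collections import defaultdict


def windows_for_properties(
    conn: sqlite3.Connection, property_ids: list[int]
) -> dict[int, list[float]]:
    """Hypothetical bulk read: one IN query, grouped in memory by parent id."""
    if not property_ids:
        return {}
    placeholders = ", ".join("?" for _ in property_ids)
    rows = conn.execute(
        "SELECT epc_property_id, area FROM epc_window "
        f"WHERE epc_property_id IN ({placeholders}) ORDER BY id",
        list(property_ids),
    ).fetchall()
    grouped: dict[int, list[float]] = defaultdict(list)
    for epc_property_id, area in rows:
        grouped[epc_property_id].append(area)
    # Compose in input order; parents with no child rows get an empty list.
    return {pid: grouped.get(pid, []) for pid in property_ids}


# Seed an in-memory database (illustrative data only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE epc_window ("
    "id INTEGER PRIMARY KEY, epc_property_id INTEGER, area REAL)"
)
conn.executemany(
    "INSERT INTO epc_window (epc_property_id, area) VALUES (?, ?)",
    [(1, 1.5), (1, 2.0), (3, 0.9)],
)
conn.commit()

# Record every SQL statement issued during the bulk read.
statements: list[str] = []
conn.set_trace_callback(statements.append)
result = windows_for_properties(conn, [3, 2, 1])
conn.set_trace_callback(None)

print(result)
print(len(statements), "statement(s) for", len(result), "properties")
```

The statement count stays at one regardless of how many ids are passed, which is the property the query-count test in this commit is described as proving for each table.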

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 10:33:24 +00:00

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Optional, Protocol

from datatypes.epc.domain.epc_property_data import EpcPropertyData
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.unit_of_work import UnitOfWork


class EpcFetcher(Protocol):
    """The slice of the New-EPC-API client Ingestion needs (e.g. EpcClientService)."""

    def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: ...


class SolarFetcher(Protocol):
    """The slice of the Google Solar client Ingestion needs (e.g. GoogleSolarApiClient)."""

    def get_building_insights(
        self, longitude: float, latitude: float
    ) -> dict[str, Any]: ...


@dataclass
class _Fetched:
    """One property's externally-fetched source data, awaiting the write phase."""

    property_id: int
    epc: Optional[EpcPropertyData]
    solar_insights: Optional[dict[str, Any]]


class IngestionOrchestrator:
    """Stage 1: acquire a batch's external source data and persist it.

    Runs in two phases so a DB connection is never held during external IO
    (ADR-0012): **fetch** the whole batch (read each UPRN, fetch its EPC, resolve
    coordinates from the Geospatial reference Repo, thread those into the Solar
    fetcher) with *no unit open*; then **write** the batch in one Unit of Work
    and commit once. Fetchers never call each other (ADR-0011); the orchestrator
    threads the coordinate. Coordinates are reference data (deterministic from
    UPRN), resolved transiently to drive the Solar fetch, never persisted.

    The geospatial repo reads S3 reference data, not the transactional store, so
    it is injected separately rather than taken from the unit.
    """

    def __init__(
        self,
        *,
        unit_of_work: Callable[[], UnitOfWork],
        epc_fetcher: EpcFetcher,
        geospatial_repo: GeospatialRepository,
        solar_fetcher: SolarFetcher,
    ) -> None:
        self._unit_of_work = unit_of_work
        self._epc_fetcher = epc_fetcher
        self._geospatial_repo = geospatial_repo
        self._solar_fetcher = solar_fetcher

    def run(self, property_ids: list[int]) -> None:
        """Read the batch's UPRNs, fetch with no unit open, then write once."""
        uprns = self._uprns_for(property_ids)
        fetched = [self._fetch(property_id, uprn) for property_id, uprn in uprns]
        self._persist(fetched)

    def _uprns_for(self, property_ids: list[int]) -> list[tuple[int, int]]:
        # A short read unit; properties with no UPRN (e.g. landlord_property_id
        # only) are skipped; a later Site-Notes path covers them.
        with self._unit_of_work() as uow:
            properties = uow.property.get_many(property_ids)
        # get_many composes in input order, so a strict zip pairs ids with rows.
        return [
            (property_id, prop.identity.uprn)
            for property_id, prop in zip(property_ids, properties, strict=True)
            if prop.identity.uprn is not None
        ]

    def _fetch(self, property_id: int, uprn: int) -> _Fetched:
        # No unit open here: this is the external-IO phase.
        epc = self._epc_fetcher.get_by_uprn(uprn)
        solar_insights: Optional[dict[str, Any]] = None
        coordinates = self._geospatial_repo.coordinates_for(uprn)
        if coordinates is not None:
            solar_insights = self._solar_fetcher.get_building_insights(
                coordinates.longitude, coordinates.latitude
            )
        return _Fetched(property_id, epc, solar_insights)

    def _persist(self, fetched: list[_Fetched]) -> None:
        # One write unit for the whole batch, committed once.
        with self._unit_of_work() as uow:
            for item in fetched:
                if item.epc is not None:
                    uow.epc.save(item.epc, property_id=item.property_id)
                if item.solar_insights is not None:
                    uow.solar.save(item.property_id, item.solar_insights)
            uow.commit()