Batch EPC writes via save_batch() on EpcPostgresRepository 🟥

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Roth 2026-06-29 12:19:49 +00:00
parent 98ba8d89dc
commit fe69bccf22
4 changed files with 206 additions and 3 deletions

View file

@ -1,10 +1,12 @@
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Optional, Protocol, TypeVar
from typing import Any, Optional, Protocol, TypeVar
from sqlmodel import Session, col, delete, select
from sqlalchemy import insert as _sa_insert
from sqlmodel import Session, SQLModel, col, delete, select
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import (
@ -54,6 +56,23 @@ from utilities.private import private
_T = TypeVar("_T")
@dataclass(frozen=True)
class EpcSaveRequest:
data: EpcPropertyData
property_id: Optional[int] = None
portfolio_id: Optional[int] = None
source: EpcSource = field(default="lodged")
def _col_values(model: SQLModel, exclude: frozenset[str] = frozenset()) -> dict[str, Any]:
"""Extract column-keyed values from a SQLModel instance for Core INSERT."""
return {
c.name: getattr(model, c.name)
for c in model.__table__.c # type: ignore[attr-defined]
if c.name not in exclude
}
def _require(value: Optional[_T], field: str) -> _T:
if value is None:
raise ValueError(f"epc_property row is missing required field {field!r}")
@ -181,6 +200,12 @@ class EpcPostgresRepository(EpcRepository):
)
return epc_property_id
def save_batch(self, requests: list[EpcSaveRequest]) -> list[int]:
raise NotImplementedError
def _delete_for_properties(self, property_ids: list[int], source: EpcSource) -> None:
raise NotImplementedError
def _delete_for_property(self, property_id: int, source: EpcSource) -> None:
"""Remove the property's existing EPC graph for `source` (parent + child
tables) so a re-save replaces rather than duplicates (ADR-0012), without

View file

@ -1,10 +1,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Literal, Optional
from typing import TYPE_CHECKING, Literal, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
if TYPE_CHECKING:
from repositories.epc.epc_postgres_repository import EpcSaveRequest
# Provenance of a persisted EPC picture (ADR-0031): a real "lodged" EPC, or a
# "predicted" one synthesised by EPC Prediction. A property can hold one of each.
EpcSource = Literal["lodged", "predicted"]
@ -29,6 +32,9 @@ class EpcRepository(ABC):
source: EpcSource = "lodged",
) -> int: ...
@abstractmethod
def save_batch(self, requests: "list[EpcSaveRequest]") -> list[int]: ...
@abstractmethod
def get(self, epc_property_id: int) -> EpcPropertyData: ...

View file

@ -17,6 +17,7 @@ from domain.modelling.scenario import Scenario
from domain.property_baseline.property_baseline_performance import PropertyBaselinePerformance
from domain.property.properties import Properties
from domain.property.property import Property
from repositories.epc.epc_postgres_repository import EpcSaveRequest
from repositories.plan.plan_repository import PlanRepository
from repositories.product.product_repository import ProductRepository
from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository
@ -130,6 +131,9 @@ class FakeEpcRepo(EpcRepository):
if property_id in self._predicted_by_property
}
def save_batch(self, requests: list[EpcSaveRequest]) -> list[int]:
return [self.save(r.data, r.property_id, r.portfolio_id, r.source) for r in requests]
class FakeSolarRepo(SolarRepository):
"""In-memory Google Solar insights store keyed by UPRN. Seed `by_uprn` to

View file

@ -0,0 +1,168 @@
"""Batch EPC write path — save_batch() correctness and safety tests.
Guards the four user stories from #1348:
1. FK mis-wiring regression: building-part IDs must not be crossed between
properties in the same save_batch() call.
2. save()/save_batch() parity: the single-property delegation path is loss-free.
3. Batch idempotency: a second save_batch() with the same requests replaces,
not duplicates.
4. Source isolation: lodged and predicted slots coexist after separate
save_batch() calls on the same property IDs.
"""
from __future__ import annotations
import json
from dataclasses import replace
from pathlib import Path
from typing import Any
import pytest
from sqlalchemy import Engine
from sqlmodel import Session
from datatypes.epc.domain.epc_property_data import EpcPropertyData, SapFloorDimension
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from repositories.epc.epc_postgres_repository import EpcPostgresRepository, EpcSaveRequest
_JSON_SAMPLES = Path(__file__).resolve().parents[3] / "backend/epc_api/json_samples"
def _load_epc(schema_dir: str = "RdSAP-Schema-21.0.0") -> EpcPropertyData:
raw: dict[str, Any] = json.loads(
(_JSON_SAMPLES / schema_dir / "epc.json").read_text()
)
return EpcPropertyDataMapper.from_api_response(raw)
def _with_floor_areas(epc: EpcPropertyData, areas_m2: list[float]) -> EpcPropertyData:
"""Replace the building parts with variants that have a single floor dimension
carrying the given total_floor_area_m2 making them easy to distinguish after
a round-trip without changing anything else about the EPC."""
template_bp = epc.sap_building_parts[0]
template_dim = template_bp.sap_floor_dimensions[0]
new_parts = [
replace(template_bp, sap_floor_dimensions=[replace(template_dim, total_floor_area_m2=a)])
for a in areas_m2
]
return replace(epc, sap_building_parts=new_parts)
# ---------------------------------------------------------------------------
# Tracer bullet: single-request save_batch() is loss-free vs save()
# ---------------------------------------------------------------------------
def test_single_request_save_batch_matches_save(db_engine: Engine) -> None:
# Arrange
epc = _load_epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
epc_id_via_save = repo.save(epc, property_id=1001)
epc_id_via_batch = repo.save_batch([EpcSaveRequest(epc, property_id=1002)])[0]
session.commit()
# Act
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
via_save = repo.get(epc_id_via_save)
via_batch = repo.get(epc_id_via_batch)
# Assert — both paths reconstruct the original exactly.
assert via_save == epc
assert via_batch == epc
# ---------------------------------------------------------------------------
# FK mis-wiring regression: building-part IDs must not be crossed
# ---------------------------------------------------------------------------
def test_multi_property_building_part_ids_are_not_crossed(db_engine: Engine) -> None:
# Arrange — property A has 2 parts with distinctive areas; B has 1 with a
# third distinctive area. If part IDs are mis-wired the floor-dimension FK
# rows end up under the wrong property.
base = _load_epc()
epc_a = _with_floor_areas(base, [10.0, 20.0])
epc_b = _with_floor_areas(base, [99.0])
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save_batch([
EpcSaveRequest(epc_a, property_id=2001),
EpcSaveRequest(epc_b, property_id=2002),
])
session.commit()
# Act
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
reloaded_a = repo.get_for_property(2001)
reloaded_b = repo.get_for_property(2002)
# Assert — each property's building parts carry its own floor areas.
assert reloaded_a is not None
assert reloaded_b is not None
areas_a = sorted(
dim.total_floor_area_m2
for part in reloaded_a.sap_building_parts
for dim in part.sap_floor_dimensions
)
areas_b = sorted(
dim.total_floor_area_m2
for part in reloaded_b.sap_building_parts
for dim in part.sap_floor_dimensions
)
assert areas_a == [10.0, 20.0]
assert areas_b == [99.0]
# ---------------------------------------------------------------------------
# Idempotency: second save_batch() replaces, not duplicates
# ---------------------------------------------------------------------------
def test_save_batch_is_idempotent(db_engine: Engine) -> None:
# Arrange
epc = _load_epc()
requests = [EpcSaveRequest(epc, property_id=3001)]
with Session(db_engine) as session:
EpcPostgresRepository(session).save_batch(requests)
session.commit()
# Act — re-save the same batch.
with Session(db_engine) as session:
EpcPostgresRepository(session).save_batch(requests)
session.commit()
# Assert — exactly one EPC survives (no duplicate rows).
with Session(db_engine) as session:
result = EpcPostgresRepository(session).get_for_property(3001)
assert result == epc
# ---------------------------------------------------------------------------
# Source isolation: lodged and predicted slots survive separate batch saves
# ---------------------------------------------------------------------------
def test_lodged_and_predicted_batch_slots_are_independent(db_engine: Engine) -> None:
# Arrange — two properties each get a lodged EPC and then a predicted EPC
# via separate save_batch() calls.
epc = _load_epc()
property_ids = [4001, 4002]
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save_batch([EpcSaveRequest(epc, property_id=pid, source="lodged") for pid in property_ids])
repo.save_batch([EpcSaveRequest(epc, property_id=pid, source="predicted") for pid in property_ids])
session.commit()
# Act
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
lodged = repo.get_for_properties(property_ids)
predicted = repo.get_predicted_for_properties(property_ids)
# Assert — both slots are populated for both properties.
assert lodged == {4001: epc, 4002: epc}
assert predicted == {4001: epc, 4002: epc}