feat(repos): idempotent EPC + Baseline writes (replace by property_id) (#1138)

Re-runs of a First Run batch re-save a property's data; that must replace,
not duplicate (ADR-0012 idempotent batch writes).

- `EpcPostgresRepository.save` deletes the property's existing EPC graph
  (parent + all child tables, floor-dims via their building parts) before
  inserting, when a `property_id` is given. Anonymous saves still insert.
- `BaselinePostgresRepository.save` deletes the existing row for the
  `property_id` before inserting — no more unique-constraint violation on
  re-save; also what the re-score-on-override path needs.
- Solar already upserts, so it's unchanged.

The #1129 round-trip fidelity test stays green (delete-first is a no-op on
a first save). 2 new tests (re-save replaces, not duplicates). pyright
strict clean; AAA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-31 09:41:39 +00:00
parent 4daba1f7c5
commit 559ae1b4ec
4 changed files with 149 additions and 2 deletions

View file

@ -2,7 +2,7 @@ from __future__ import annotations
from typing import Optional
from sqlmodel import Session, select
from sqlmodel import Session, col, delete, select
from domain.baseline.baseline_performance import BaselinePerformance
from infrastructure.postgres.baseline_performance_table import (
@ -18,6 +18,13 @@ class BaselinePostgresRepository(BaselineRepository):
self._session = session
def save(self, baseline: BaselinePerformance, property_id: int) -> int:
# Idempotent on property_id: a re-run (or re-score) replaces the row
# rather than hitting the unique constraint (ADR-0012).
self._session.exec( # type: ignore[call-overload]
delete(BaselinePerformanceModel).where(
col(BaselinePerformanceModel.property_id) == property_id
)
)
row = BaselinePerformanceModel.from_domain(baseline, property_id)
self._session.add(row)
self._session.flush()

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import date
from typing import Optional, TypeVar
from sqlmodel import Session, select
from sqlmodel import Session, col, delete, select
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import (
@ -74,6 +74,11 @@ class EpcPostgresRepository(EpcRepository):
property_id: Optional[int] = None,
portfolio_id: Optional[int] = None,
) -> int:
# Idempotent on property_id: a re-run replaces the property's EPC graph
# rather than duplicating it (ADR-0012). Anonymous saves (no property_id)
# always insert.
if property_id is not None:
self._delete_for_property(property_id)
parent = EpcPropertyModel.from_epc_property_data(
data, property_id=property_id, portfolio_id=portfolio_id
)
@ -134,6 +139,51 @@ class EpcPostgresRepository(EpcRepository):
)
return epc_property_id
def _delete_for_property(self, property_id: int) -> None:
"""Remove the property's existing EPC graph (parent + child tables) so a
re-save replaces rather than duplicates (ADR-0012)."""
epc_ids = [
i
for i in self._session.exec(
select(EpcPropertyModel.id).where(
EpcPropertyModel.property_id == property_id
)
).all()
if i is not None
]
if not epc_ids:
return
part_ids = [
i
for i in self._session.exec(
select(EpcBuildingPartModel.id).where(
col(EpcBuildingPartModel.epc_property_id).in_(epc_ids)
)
).all()
if i is not None
]
if part_ids:
self._session.exec( # type: ignore[call-overload]
delete(EpcFloorDimensionModel).where(
col(EpcFloorDimensionModel.epc_building_part_id).in_(part_ids)
)
)
for child in (
EpcPropertyEnergyPerformanceModel,
EpcEnergyElementModel,
EpcMainHeatingDetailModel,
EpcBuildingPartModel,
EpcWindowModel,
EpcFlatDetailsModel,
EpcRenewableHeatIncentiveModel,
):
self._session.exec( # type: ignore[call-overload]
delete(child).where(col(child.epc_property_id).in_(epc_ids))
)
self._session.exec( # type: ignore[call-overload]
delete(EpcPropertyModel).where(col(EpcPropertyModel.id).in_(epc_ids))
)
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
row = self._session.exec(
select(EpcPropertyModel)

View file

@ -44,6 +44,44 @@ def test_baseline_performance_round_trips(db_engine: Engine) -> None:
assert loaded == baseline
def test_resaving_baseline_for_a_property_replaces_rather_than_duplicating(
db_engine: Engine,
) -> None:
# Arrange — a re-run re-establishes the same property's baseline with a
# different rating.
first = _baseline()
rerun = BaselinePerformance(
lodged=Performance(
sap_score=80,
epc_band=Epc.B,
co2_emissions=1.2,
primary_energy_intensity=150,
),
effective=Performance(
sap_score=80,
epc_band=Epc.B,
co2_emissions=1.2,
primary_energy_intensity=150,
),
rebaseline_reason="none",
space_heating_kwh=4000.0,
water_heating_kwh=1800.0,
)
# Act — save twice for the same property_id (must not hit the unique
# constraint, must overwrite).
with Session(db_engine) as session:
repo = BaselinePostgresRepository(session)
repo.save(first, property_id=10)
repo.save(rerun, property_id=10)
session.commit()
# Assert
with Session(db_engine) as session:
loaded = BaselinePostgresRepository(session).get_for_property(10)
assert loaded == rerun
def test_get_for_property_returns_none_when_absent(db_engine: Engine) -> None:
# Arrange / Act
with Session(db_engine) as session:

View file

@ -0,0 +1,52 @@
"""A re-run of First Run re-saves a property's EPC; that must replace the prior
row, not duplicate it (ADR-0012 idempotent batch writes, #1138)."""
from __future__ import annotations
import dataclasses
import json
from pathlib import Path
from typing import Any
from sqlalchemy import Engine
from sqlmodel import Session, select
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from infrastructure.postgres.epc_property_table import EpcPropertyModel
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
_JSON_SAMPLES = Path(__file__).resolve().parents[3] / "backend/epc_api/json_samples"
def _load_epc() -> EpcPropertyData:
raw: dict[str, Any] = json.loads(
(_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
)
return EpcPropertyDataMapper.from_api_response(raw)
def test_resaving_an_epc_for_a_property_replaces_rather_than_duplicates(
db_engine: Engine,
) -> None:
# Arrange — same property re-ingested with a changed field.
original = _load_epc()
updated = dataclasses.replace(original, status="re-run-sentinel")
# Act — save twice for the same property_id (a re-run).
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(original, property_id=10)
repo.save(updated, property_id=10)
session.commit()
# Assert — exactly one EPC row for the property, holding the latest data.
with Session(db_engine) as session:
rows = session.exec(
select(EpcPropertyModel).where(EpcPropertyModel.property_id == 10)
).all()
reloaded = EpcPostgresRepository(session).get_for_property(10)
assert len(rows) == 1
assert reloaded is not None
assert reloaded.status == "re-run-sentinel"