feat(epc-prediction): slice-5c predicted-EPC persistence slot

Add a `source` discriminator (lodged | predicted) to the EPC store so a Property
holds a lodged EPC and a predicted one (EPC Prediction gap-fill) at once
(ADR-0031). EpcRepository.save gains source="lodged"; idempotent delete is now
per-source (a predicted save no longer wipes lodged, and vice versa);
get_for_property/get_for_properties filter lodged; new get_predicted_for_property
/ get_predicted_for_properties read predicted. PropertyPostgresRepository.get +
get_many hydrate Property.predicted_epc, so the predicted picture reaches the
modelling read (both load via get_many). FakeEpcRepo mirrors the dual slot.

EpcPropertyModel gains `source` (default "lodged"); the test DB builds from the
SQLModel mirror so this is exercised without the prod migration. The matching
Drizzle change (column + per-(property_id,source) uniqueness) is the team's to
action before merge — docs/MIGRATION_NOTE_predicted_epc_source.md.

3 store tests (coexist, idempotent predicted re-save leaves lodged, lodged-only
has no predicted) + property-repo wiring; 85 pass across affected suites; new
code pyright-clean (2 pre-existing wwhrs errors in epc_property_table untouched).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-16 03:50:19 +00:00
parent 6979607ace
commit fd43cf2d23
7 changed files with 237 additions and 21 deletions

View file

@ -0,0 +1,51 @@
# Migration note — `epc_property.source` (predicted-EPC slot)
**For the team to action before merging the EPC Prediction production-wiring
branch.** The model-side code is done and tested against the SQLModel-built test
DB; the **production Drizzle schema needs a matching column** that this repo does
not own.
## What changed in code (this branch)
Per **ADR-0031**, a Property can now hold a **lodged** EPC and a **predicted** EPC
(EPC Prediction gap-fill) at the same time. The two are distinguished by a new
`source` discriminator on the `epc_property` row:
- `infrastructure/postgres/epc_property_table.py``EpcPropertyModel` gains
`source: str = Field(default="lodged")`.
- `repositories/epc/epc_postgres_repository.py``save(..., source="lodged")`
writes it; `_delete_for_property` is now per-source (idempotency no longer wipes
the other slot); `get_for_property` / `get_for_properties` filter `source =
'lodged'`; new `get_predicted_for_property` / `get_predicted_for_properties`
read `source = 'predicted'`.
The test database is built from the SQLModel mirrors via `create_all`, so tests
already exercise the column. **Production is not** — hence this note.
## Required Drizzle migration
On the `epc_property` table:
1. **Add column** `source``text` (or your enum), **NOT NULL**, **default
`'lodged'`**. The default backfills every existing row as a real EPC, which is
correct (all current rows are lodged).
2. **Relax any single-row-per-property uniqueness.** If a unique constraint /
index exists on `epc_property(property_id)`, it must become
**`(property_id, source)`** — a property may now have one `lodged` row and one
`predicted` row. (Verify whether such a constraint exists; the SQLModel mirror
has none, but the production schema may.)
3. **Recommended index** `(property_id, source)` — every predicted/lodged read
filters on both columns.
## Allowed values
`source ∈ {'lodged', 'predicted'}` (see `EpcSource` in
`repositories/epc/epc_repository.py`). No other values are written.
## Why
ADR-0031: predicted EPCs are stored in their own slot rather than overwriting the
lodged `epc`, so (a) provenance is structural — the Validation Cohort excludes
predicted-sourced Properties and the UI flags them — and (b) lodged + predicted
coexist, which the planned **EPC Anomaly Flags** feature needs (compare a
Property's lodged EPC against its predicted one).

View file

@ -25,6 +25,12 @@ class EpcPropertyModel(SQLModel, table=True):
property_id: Optional[int] = Field(default=None)
portfolio_id: Optional[int] = Field(default=None)
uploaded_file_id: Optional[int] = Field(default=None)
# Provenance of this EPC picture: "lodged" (a real public/landlord EPC) or
# "predicted" (EPC Prediction gap-fill, ADR-0031). A property may hold one of
# each, so reads filter on it. Defaults to "lodged" — every existing row is a
# real EPC. (Requires a matching `source` column in the Drizzle schema — see
# docs/handover; the SQLModel mirror is what the test DB builds from.)
source: str = Field(default="lodged")
# Identity / admin
uprn: Optional[int] = Field(default=None)
@ -190,6 +196,7 @@ class EpcPropertyModel(SQLModel, table=True):
data: EpcPropertyData,
property_id: Optional[int] = None,
portfolio_id: Optional[int] = None,
source: str = "lodged",
) -> EpcPropertyModel:
es = data.sap_energy_source
h = data.sap_heating
@ -202,6 +209,7 @@ class EpcPropertyModel(SQLModel, table=True):
return cls(
property_id=property_id,
portfolio_id=portfolio_id,
source=source,
uprn=data.uprn,
uprn_source=data.uprn_source,
report_reference=data.report_reference,

View file

@ -45,7 +45,7 @@ from infrastructure.postgres.epc_property_table import (
EpcRenewableHeatIncentiveModel,
EpcWindowModel,
)
from repositories.epc.epc_repository import EpcRepository
from repositories.epc.epc_repository import EpcRepository, EpcSource
from utilities.private import private
_T = TypeVar("_T")
@ -88,14 +88,16 @@ class EpcPostgresRepository(EpcRepository):
data: EpcPropertyData,
property_id: Optional[int] = None,
portfolio_id: Optional[int] = None,
source: EpcSource = "lodged",
) -> int:
# Idempotent on property_id: a re-run replaces the property's EPC graph
# rather than duplicating it (ADR-0012). Anonymous saves (no property_id)
# always insert.
# Idempotent on (property_id, source): a re-run replaces the property's
# EPC graph for THAT source rather than duplicating it (ADR-0012), and a
# predicted save leaves the lodged one intact, and vice versa (ADR-0031).
# Anonymous saves (no property_id) always insert.
if property_id is not None:
self._delete_for_property(property_id)
self._delete_for_property(property_id, source)
parent = EpcPropertyModel.from_epc_property_data(
data, property_id=property_id, portfolio_id=portfolio_id
data, property_id=property_id, portfolio_id=portfolio_id, source=source
)
self._session.add(parent)
self._session.flush()
@ -154,15 +156,16 @@ class EpcPostgresRepository(EpcRepository):
)
return epc_property_id
def _delete_for_property(self, property_id: int) -> None:
"""Remove the property's existing EPC graph (parent + child tables) so a
re-save replaces rather than duplicates (ADR-0012)."""
def _delete_for_property(self, property_id: int, source: EpcSource) -> None:
"""Remove the property's existing EPC graph for `source` (parent + child
tables) so a re-save replaces rather than duplicates (ADR-0012), without
disturbing the other source's slot (ADR-0031)."""
epc_ids = [
i
for i in self._session.exec(
select(EpcPropertyModel.id).where(
EpcPropertyModel.property_id == property_id
)
select(EpcPropertyModel.id)
.where(EpcPropertyModel.property_id == property_id)
.where(EpcPropertyModel.source == source)
).all()
if i is not None
]
@ -200,9 +203,20 @@ class EpcPostgresRepository(EpcRepository):
)
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
return self._get_for_property(property_id, source="lodged")
def get_predicted_for_property(
self, property_id: int
) -> Optional[EpcPropertyData]:
return self._get_for_property(property_id, source="predicted")
def _get_for_property(
self, property_id: int, source: EpcSource
) -> Optional[EpcPropertyData]:
row = self._session.exec(
select(EpcPropertyModel)
.where(EpcPropertyModel.property_id == property_id)
.where(EpcPropertyModel.source == source)
.order_by(EpcPropertyModel.id) # type: ignore[arg-type]
).first()
if row is None or row.id is None:
@ -212,13 +226,26 @@ class EpcPostgresRepository(EpcRepository):
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's EPCs in a handful of per-table IN queries
(ADR-0012), not N x per-property. Load-whole per ADR-0002."""
"""Bulk-hydrate a batch's LODGED EPCs, keyed by property_id."""
return self._for_properties(property_ids, source="lodged")
def get_predicted_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's PREDICTED EPCs (ADR-0031), keyed by property_id."""
return self._for_properties(property_ids, source="predicted")
def _for_properties(
self, property_ids: list[int], source: EpcSource
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's EPCs of one `source` in a handful of per-table IN
queries (ADR-0012), not N x per-property. Load-whole per ADR-0002."""
if not property_ids:
return {}
parents = self._session.exec(
select(EpcPropertyModel)
.where(col(EpcPropertyModel.property_id).in_(property_ids))
.where(EpcPropertyModel.source == source)
.order_by(EpcPropertyModel.id) # type: ignore[arg-type]
).all()
parent_by_property: dict[int, EpcPropertyModel] = {}

View file

@ -1,10 +1,14 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
from typing import Literal, Optional
from datatypes.epc.domain.epc_property_data import EpcPropertyData
# Provenance of a persisted EPC picture (ADR-0031): a real "lodged" EPC, or a
# "predicted" one synthesised by EPC Prediction. A property can hold one of each.
EpcSource = Literal["lodged", "predicted"]
class EpcRepository(ABC):
"""Persists and loads the structured EPC Property Data slice.
@ -12,7 +16,8 @@ class EpcRepository(ABC):
`save` writes the `EpcPropertyData` to the `epc_property` parent row and its
child tables; `get` reconstructs the persisted projection back into an
`EpcPropertyData`. Round-trip fidelity over that projection is pinned by the
Slice-1 round-trip test (Hestia-Homes/Model#1129).
Slice-1 round-trip test (Hestia-Homes/Model#1129). Each EPC carries a
`source` so a lodged and a predicted picture coexist per property (ADR-0031).
"""
@abstractmethod
@ -21,18 +26,36 @@ class EpcRepository(ABC):
data: EpcPropertyData,
property_id: int | None = None,
portfolio_id: int | None = None,
source: EpcSource = "lodged",
) -> int: ...
@abstractmethod
def get(self, epc_property_id: int) -> EpcPropertyData: ...
@abstractmethod
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]: ...
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
"""The property's LODGED EPC (the predicted slot is read separately)."""
...
@abstractmethod
def get_predicted_for_property(
self, property_id: int
) -> Optional[EpcPropertyData]:
"""The property's PREDICTED EPC (EPC Prediction gap-fill), or None."""
...
@abstractmethod
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's EPCs, keyed by property_id (only those with an
EPC are present). A handful of per-table queries, not N per property."""
"""Bulk-hydrate a batch's LODGED EPCs, keyed by property_id (only those
with one are present). A handful of per-table queries, not N per property."""
...
@abstractmethod
def get_predicted_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
"""Bulk-hydrate a batch's PREDICTED EPCs (ADR-0031), keyed by property_id
(only those with one are present)."""
...

View file

@ -72,6 +72,7 @@ class PropertyPostgresRepository(PropertyRepository):
return Property(
identity=identity,
epc=self._epc().get_for_property(property_id),
predicted_epc=self._epc().get_predicted_for_property(property_id),
planning_restrictions=_restrictions_of(row.uprn, restrictions),
)
@ -83,6 +84,7 @@ class PropertyPostgresRepository(PropertyRepository):
).all()
row_by_id = {row.id: row for row in rows}
epcs = self._epc().get_for_properties(property_ids)
predicted_epcs = self._epc().get_predicted_for_properties(property_ids)
restrictions: dict[int, PlanningRestrictions] = self._restrictions_for(
[row.uprn for row in rows if row.uprn is not None]
)
@ -101,6 +103,7 @@ class PropertyPostgresRepository(PropertyRepository):
landlord_property_id=row.landlord_property_id,
),
epc=epcs.get(property_id),
predicted_epc=predicted_epcs.get(property_id),
planning_restrictions=_restrictions_of(row.uprn, restrictions),
)
)

View file

@ -20,7 +20,7 @@ from domain.property.property import Property
from repositories.plan.plan_repository import PlanRepository
from repositories.product.product_repository import ProductRepository
from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.epc.epc_repository import EpcRepository, EpcSource
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
@ -76,16 +76,24 @@ class FakeEpcRepo(EpcRepository):
def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None:
self.saved: list[tuple[EpcPropertyData, Optional[int]]] = []
self._by_property = by_property or {}
# Predicted EPCs live in their own slot, coexisting with lodged (ADR-0031).
self._predicted_by_property: dict[int, EpcPropertyData] = {}
def save(
self,
data: EpcPropertyData,
property_id: Optional[int] = None,
portfolio_id: Optional[int] = None,
source: EpcSource = "lodged",
) -> int:
self.saved.append((data, property_id))
if property_id is not None:
self._by_property[property_id] = data
slot = (
self._predicted_by_property
if source == "predicted"
else self._by_property
)
slot[property_id] = data
return len(self.saved)
def get(self, epc_property_id: int) -> EpcPropertyData: # pragma: no cover
@ -94,6 +102,11 @@ class FakeEpcRepo(EpcRepository):
def get_for_property(self, property_id: int) -> Optional[EpcPropertyData]:
return self._by_property.get(property_id)
def get_predicted_for_property(
self, property_id: int
) -> Optional[EpcPropertyData]:
return self._predicted_by_property.get(property_id)
def get_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
@ -103,6 +116,15 @@ class FakeEpcRepo(EpcRepository):
if property_id in self._by_property
}
def get_predicted_for_properties(
self, property_ids: list[int]
) -> dict[int, EpcPropertyData]:
return {
property_id: self._predicted_by_property[property_id]
for property_id in property_ids
if property_id in self._predicted_by_property
}
class FakeSolarRepo(SolarRepository):
"""In-memory Google Solar insights store keyed by UPRN. Seed `by_uprn` to

View file

@ -0,0 +1,82 @@
"""EPC store gains a `source` discriminator so a lodged EPC and a predicted EPC
(EPC Prediction gap-fill) coexist per property ADR-0031 slice-5c."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from sqlalchemy import Engine
from sqlmodel import Session
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
_JSON_SAMPLES = Path(__file__).resolve().parents[3] / "backend/epc_api/json_samples"
def _epc() -> EpcPropertyData:
raw: dict[str, Any] = json.loads(
(_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
)
return EpcPropertyDataMapper.from_api_response(raw)
def test_lodged_and_predicted_epcs_coexist_per_property(db_engine: Engine) -> None:
# Arrange — the same property gets a lodged EPC and a predicted EPC.
epc = _epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(epc, property_id=42)
repo.save(epc, property_id=42, source="predicted")
session.commit()
# Act
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
lodged = repo.get_for_property(42)
predicted = repo.get_predicted_for_property(42)
# Assert — both slots are populated independently; neither overwrote the other.
assert lodged == epc
assert predicted == epc
def test_re_saving_a_predicted_epc_leaves_the_lodged_one_intact(
db_engine: Engine,
) -> None:
# Arrange — a lodged EPC, then a predicted one saved twice (idempotent re-run).
epc = _epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(epc, property_id=7)
repo.save(epc, property_id=7, source="predicted")
repo.save(epc, property_id=7, source="predicted")
session.commit()
# Act
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
lodged = repo.get_for_property(7)
predicted_bulk = repo.get_predicted_for_properties([7])
# Assert — the lodged slot survived the predicted re-saves; bulk read finds it.
assert lodged == epc
assert predicted_bulk == {7: epc}
def test_a_lodged_only_property_has_no_predicted_epc(db_engine: Engine) -> None:
# Arrange — only a lodged EPC is saved.
epc = _epc()
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
repo.save(epc, property_id=9)
session.commit()
# Act / Assert — the predicted slot is empty; the lodged one is unaffected.
with Session(db_engine) as session:
repo = EpcPostgresRepository(session)
assert repo.get_predicted_for_property(9) is None
assert repo.get_for_property(9) == epc