feat(modelling): plan/recommendation SQLModel mirrors + PlanRepository (#1157)

Slice 3 of #1157. Persists a Plan and its Plan Measures to the live
plan / recommendation tables via SQLModel mirrors (ADR-0017).

- infrastructure/postgres/plan_table.py: PlanRow (`plan`) + RecommendationRow
  (`recommendation`) mirrors. RecommendationRow adds the new `plan_id` FK
  (ON DELETE CASCADE) linking each Plan Measure to its Plan, replacing the
  plan_recommendations m2m for new writes. from_domain mappers convert CO2
  kg → tonnes to match the live column contract and derive post_epc_rating
  from the rounded SAP. Only the impact + cost + identity columns the tracer
  fills are declared; energy/bill, U-value, valuation, labour, plan_type are
  left to later slices.
- PlanRepository port + PlanPostgresRepository.save(plan, *, property_id,
  scenario_id, portfolio_id, is_default) -> plan id. Idempotent replace:
  deleting the Plan cascades to its recommendation rows via plan_id, so a
  re-run overwrites (ADR-0012). No commit — the UoW owns the transaction.

2 tests (persist + idempotent re-run); pyright strict clean; 73 pass across
repositories/modelling/orchestration with no regressions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-03 11:51:02 +00:00
parent 0ebd9cc7fd
commit d66f7eed84
6 changed files with 333 additions and 0 deletions

View file

@ -0,0 +1,118 @@
from __future__ import annotations
from typing import ClassVar, Optional
from sqlalchemy import BigInteger, Column, ForeignKey
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
from datatypes.epc.domain.epc import Epc
from domain.modelling.plan import Plan, PlanMeasure
# Calculator metrics are in kg CO₂/yr; the live `plan` / `recommendation`
# columns are tonnes (legacy `emissions_kg / 1000`). Convert on the way in.
_KG_PER_TONNE = 1000.0
class PlanRow(SQLModel, table=True):
"""SQLModel mirror of the live ``plan`` table (ADR-0017).
Declares only the columns the rebuild writes identity, the flat
post-retrofit headline figures, and the cost aggregates. The legacy
SQLAlchemy model owns the live reads and the columns left for later
slices (valuation, plan_type, the energy/bill cluster). The physical
table is the shared contract.
"""
__tablename__: ClassVar[str] = "plan" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int
property_id: int = Field(index=True)
scenario_id: Optional[int] = Field(default=None)
is_default: bool = False
post_sap_points: Optional[float] = Field(default=None)
post_epc_rating: Optional[Epc] = Field(
default=None,
sa_column=Column(SAEnum(Epc, name="epc"), nullable=True),
)
post_co2_emissions: Optional[float] = Field(default=None) # tonnes/yr
co2_savings: Optional[float] = Field(default=None) # tonnes/yr
cost_of_works: Optional[float] = Field(default=None)
contingency_cost: Optional[float] = Field(default=None)
@classmethod
def from_domain(
cls,
plan: Plan,
*,
property_id: int,
scenario_id: int,
portfolio_id: int,
is_default: bool,
) -> "PlanRow":
return cls(
portfolio_id=portfolio_id,
property_id=property_id,
scenario_id=scenario_id,
is_default=is_default,
post_sap_points=plan.post_sap_continuous,
post_epc_rating=plan.post_epc_rating,
post_co2_emissions=plan.post_retrofit.co2_kg_per_yr / _KG_PER_TONNE,
co2_savings=plan.co2_savings_kg_per_yr / _KG_PER_TONNE,
cost_of_works=plan.cost_of_works,
contingency_cost=plan.contingency_cost,
)
class RecommendationRow(SQLModel, table=True):
"""SQLModel mirror of the live ``recommendation`` table — one row per
persisted Plan Measure (ADR-0017). Adds the new ``plan_id`` FK linking the
measure to its Plan (ON DELETE CASCADE), replacing the ``plan_recommendations``
m2m for new writes. Only the impact + cost columns the tracer fills are
declared; the energy/bill, U-value, valuation and labour columns are left
to later slices.
"""
__tablename__: ClassVar[str] = "recommendation" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
property_id: int = Field(index=True)
plan_id: Optional[int] = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("plan.id", ondelete="CASCADE"),
nullable=True,
index=True,
),
)
type: str
measure_type: Optional[str] = Field(default=None)
description: str
estimated_cost: Optional[float] = Field(default=None)
sap_points: Optional[float] = Field(default=None)
co2_equivalent_savings: Optional[float] = Field(default=None) # tonnes/yr
default: bool = True
already_installed: bool = False
@classmethod
def from_domain(
cls, measure: PlanMeasure, *, property_id: int, plan_id: int
) -> "RecommendationRow":
return cls(
property_id=property_id,
plan_id=plan_id,
type=measure.measure_type,
measure_type=measure.measure_type,
description=measure.description,
estimated_cost=measure.cost.total,
sap_points=measure.impact.sap_points,
co2_equivalent_savings=(
measure.impact.co2_savings_kg_per_yr / _KG_PER_TONNE
),
default=True,
already_installed=False,
)

View file

View file

@ -0,0 +1,55 @@
from __future__ import annotations
from sqlmodel import Session, col, delete
from domain.modelling.plan import Plan
from infrastructure.postgres.plan_table import PlanRow, RecommendationRow
from repositories.plan.plan_repository import PlanRepository
class PlanPostgresRepository(PlanRepository):
"""Maps a Plan and its Plan Measures onto the live ``plan`` /
``recommendation`` tables (ADR-0017). Does not commit the Unit of Work
owns the transaction (ADR-0012)."""
def __init__(self, session: Session) -> None:
self._session = session
def save(
self,
plan: Plan,
*,
property_id: int,
scenario_id: int,
portfolio_id: int,
is_default: bool,
) -> int:
# Idempotent replace for (property_id, scenario_id): deleting the Plan
# cascades to its recommendation rows via the plan_id FK (ON DELETE
# CASCADE), so a re-run overwrites rather than duplicating (ADR-0012).
self._session.exec( # type: ignore[call-overload]
delete(PlanRow).where(
col(PlanRow.property_id) == property_id,
col(PlanRow.scenario_id) == scenario_id,
)
)
plan_row = PlanRow.from_domain(
plan,
property_id=property_id,
scenario_id=scenario_id,
portfolio_id=portfolio_id,
is_default=is_default,
)
self._session.add(plan_row)
self._session.flush()
if plan_row.id is None:
raise ValueError("plan row did not receive an id")
for measure in plan.measures:
self._session.add(
RecommendationRow.from_domain(
measure, property_id=property_id, plan_id=plan_row.id
)
)
return plan_row.id

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.modelling.plan import Plan
class PlanRepository(ABC):
"""Persists a Plan (and its Plan Measures) for a Property + Scenario.
One Plan per (Property, Scenario). The write is idempotent on re-run: it
replaces the existing Plan for that pair rather than duplicating (ADR-0012
/ ADR-0017). `portfolio_id` and `is_default` are supplied by the
orchestrator (the former from the trigger, the latter from the Scenario).
"""
@abstractmethod
def save(
self,
plan: Plan,
*,
property_id: int,
scenario_id: int,
portfolio_id: int,
is_default: bool,
) -> int:
"""Persist ``plan`` and return its Plan id, replacing any existing Plan
for ``(property_id, scenario_id)``."""
...

View file

View file

@ -0,0 +1,131 @@
"""Behaviour of the Postgres-backed PlanRepository: persisting a Plan and its
Plan Measures to the live ``plan`` / ``recommendation`` tables (ADR-0017).
The Plan is the parent; each selected Plan Measure is a ``recommendation`` row
linked by the new ``plan_id`` FK. A re-run replaces (delete the Plan for the
(property, scenario) cascade its recommendations insert fresh), so the
batch write is idempotent (ADR-0012). CO₂ is stored in tonnes (calculator kg
÷ 1000) to match the live column contract.
"""
from __future__ import annotations
from sqlalchemy import Engine
from sqlmodel import Session, col, select
from datatypes.epc.domain.epc import Epc
from domain.modelling.package_scorer import Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import Cost
from domain.modelling.scoring import MeasureImpact
from infrastructure.postgres.plan_table import PlanRow, RecommendationRow
from repositories.plan.plan_postgres_repository import PlanPostgresRepository
def _plan() -> Plan:
measures: tuple[PlanMeasure, ...] = (
PlanMeasure(
measure_type="cavity_wall_insulation",
description="Cavity wall insulation",
cost=Cost(total=1000.0, contingency_rate=0.10),
impact=MeasureImpact(
sap_points=8.0,
co2_savings_kg_per_yr=500.0,
energy_savings_kwh_per_yr=2000.0,
),
),
)
return Plan(
measures=measures,
baseline=Score(
sap_continuous=40.0,
co2_kg_per_yr=4000.0,
primary_energy_kwh_per_yr=20000.0,
),
post_retrofit=Score(
sap_continuous=70.0,
co2_kg_per_yr=3500.0,
primary_energy_kwh_per_yr=18000.0,
),
)
def test_save_persists_plan_and_its_measures_with_tonnes_and_band(
db_engine: Engine,
) -> None:
# Act
with Session(db_engine) as session:
plan_id: int = PlanPostgresRepository(session).save(
_plan(), property_id=10, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Assert
with Session(db_engine) as session:
plan_row = session.get(PlanRow, plan_id)
rec_rows = session.exec(
select(RecommendationRow).where(
col(RecommendationRow.plan_id) == plan_id
)
).all()
assert plan_row is not None
assert plan_row.property_id == 10
assert plan_row.scenario_id == 7
assert plan_row.portfolio_id == 1
assert plan_row.is_default is True
assert plan_row.post_sap_points is not None
assert plan_row.post_co2_emissions is not None
assert plan_row.co2_savings is not None
assert plan_row.cost_of_works is not None
assert plan_row.contingency_cost is not None
assert abs(plan_row.post_sap_points - 70.0) <= 1e-9
assert plan_row.post_epc_rating is Epc.C # SAP 70 → band C
assert abs(plan_row.post_co2_emissions - 3.5) <= 1e-9 # tonnes
assert abs(plan_row.co2_savings - 0.5) <= 1e-9 # (4000-3500)/1000
assert abs(plan_row.cost_of_works - 1000.0) <= 1e-9
assert abs(plan_row.contingency_cost - 100.0) <= 1e-9 # 1000 * 0.10
assert len(rec_rows) == 1
rec = rec_rows[0]
assert rec.estimated_cost is not None
assert rec.sap_points is not None
assert rec.co2_equivalent_savings is not None
assert rec.type == "cavity_wall_insulation"
assert rec.measure_type == "cavity_wall_insulation"
assert rec.description == "Cavity wall insulation"
assert abs(rec.estimated_cost - 1000.0) <= 1e-9
assert abs(rec.sap_points - 8.0) <= 1e-9
assert abs(rec.co2_equivalent_savings - 0.5) <= 1e-9 # tonnes
assert rec.default is True
assert rec.already_installed is False
def test_save_is_idempotent_on_rerun_for_the_same_property_and_scenario(
db_engine: Engine,
) -> None:
# Arrange — first run
with Session(db_engine) as session:
PlanPostgresRepository(session).save(
_plan(), property_id=10, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Act — re-run the same (property, scenario)
with Session(db_engine) as session:
PlanPostgresRepository(session).save(
_plan(), property_id=10, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Assert — replaced, not duplicated (cascade removed the old measures)
with Session(db_engine) as session:
plan_rows = session.exec(
select(PlanRow).where(col(PlanRow.property_id) == 10)
).all()
rec_rows = session.exec(
select(RecommendationRow).where(col(RecommendationRow.property_id) == 10)
).all()
assert len(plan_rows) == 1
assert len(rec_rows) == 1