Model/infrastructure/postgres/modelling/recommendation_table.py
Khalim Conn-Kowlessar c1c7b06f09 refactor(modelling): consolidate plan/recommendation models into infrastructure
Move the live plan, recommendation, recommendation_materials and (retiring)
plan_recommendations tables into a new infrastructure/postgres/modelling/
subpackage as single SQLModel definitions (the epc_property pattern), absorbing
the rebuild's partial PlanRow/RecommendationRow mirrors and carrying full
legacy column parity plus recommendation.plan_id. Out-of-cluster references are
plain indexed ints (mirror convention); the live FKs are owned by the Drizzle
schema. backend/app/db/models/recommendations.py becomes a re-export shim
(ScenarioModel/InstalledMeasure stay for a later slice).

Fix the export conftest to create SQLModel-first (so Base funding_package's FK
to the now-SQLModel plan resolves) and skip the redundant drop_all on its
function-scoped throwaway DB (the epc enum type is now shared across both
metadatas). Resolves the pre-existing dual-definition collision: the rebuild
and legacy export suites are now co-runnable. No behaviour change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 21:00:14 +00:00

139 lines
5.5 KiB
Python

from __future__ import annotations
from datetime import datetime
from typing import ClassVar, Optional
from sqlalchemy import BigInteger, Column, ForeignKey, TIMESTAMP
from sqlalchemy import Enum as SAEnum
from sqlalchemy.sql import func
from sqlmodel import Field, SQLModel
from datatypes.enums import QuantityUnits
from domain.modelling.plan import PlanMeasure
# Calculator metrics are in kg CO₂/yr; the live ``recommendation`` column is
# tonnes (legacy ``emissions_kg / 1000``). Convert on the way in.
_KG_PER_TONNE = 1000.0
class RecommendationRow(SQLModel, table=True):
"""The single SQLModel definition of the live ``recommendation`` table
(ADR-0017 amendment) — one row per persisted Plan Measure.
Carries full legacy column parity (the readers iterate the columns / sum
them) **plus** ``plan_id``, the FK that links a measure to its Plan and
replaces the retired ``plan_recommendations`` m2m. Out-of-cluster columns
(``property_id``) are plain indexed ints, not FK constraints, matching the
mirror convention so ``SQLModel.metadata.create_all`` needs no foreign
table to exist (the live FKs are owned by the Drizzle schema).
"""
__tablename__: ClassVar[str] = "recommendation" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
property_id: int = Field(index=True)
plan_id: Optional[int] = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("plan.id", ondelete="CASCADE"),
nullable=True,
index=True,
),
)
created_at: Optional[datetime] = Field(
default=None,
sa_column=Column(TIMESTAMP, nullable=False, server_default=func.now()),
)
type: str
measure_type: Optional[str] = Field(default=None)
description: str
estimated_cost: Optional[float] = Field(default=None)
starting_u_value: Optional[float] = Field(default=None)
new_u_value: Optional[float] = Field(default=None)
sap_points: Optional[float] = Field(default=None)
heat_demand: Optional[float] = Field(default=None)
kwh_savings: Optional[float] = Field(default=None) # delivered kWh/yr
co2_equivalent_savings: Optional[float] = Field(default=None) # tonnes/yr
energy_savings: Optional[float] = Field(default=None)
energy_cost_savings: Optional[float] = Field(default=None) # £/yr
property_valuation_increase: Optional[float] = Field(default=None)
rental_yield_increase: Optional[float] = Field(default=None)
total_work_hours: Optional[float] = Field(default=None)
labour_days: Optional[float] = Field(default=None)
default: bool = True
already_installed: bool = False
@classmethod
def from_domain(
cls, measure: PlanMeasure, *, property_id: int, plan_id: int
) -> "RecommendationRow":
return cls(
property_id=property_id,
plan_id=plan_id,
type=measure.measure_type,
measure_type=measure.measure_type,
description=measure.description,
estimated_cost=measure.cost.total,
sap_points=measure.impact.sap_points,
co2_equivalent_savings=(
measure.impact.co2_savings_kg_per_yr / _KG_PER_TONNE
),
kwh_savings=measure.kwh_savings,
energy_cost_savings=measure.energy_cost_savings,
default=True,
already_installed=False,
)
class RecommendationMaterialRow(SQLModel, table=True):
"""The live ``recommendation_materials`` table — one row per material used
by a Recommendation. ``recommendation_id`` is an intra-cluster FK;
``material_id`` is a plain int (out-of-cluster, mirror convention)."""
__tablename__: ClassVar[str] = "recommendation_materials" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
recommendation_id: int = Field(
sa_column=Column(
BigInteger, ForeignKey("recommendation.id"), nullable=False
)
)
material_id: int = Field(index=True)
created_at: Optional[datetime] = Field(
default=None,
sa_column=Column(TIMESTAMP, nullable=False, server_default=func.now()),
)
depth: Optional[float] = Field(default=None)
quantity: Optional[float] = Field(default=None)
quantity_unit: Optional[QuantityUnits] = Field(
default=None,
sa_column=Column(
SAEnum(
QuantityUnits,
values_callable=lambda cls: [m.value for m in cls], # pyright: ignore[reportUnknownLambdaType, reportUnknownMemberType, reportUnknownVariableType]
),
nullable=True,
),
)
estimated_cost: Optional[float] = Field(default=None)
class PlanRecommendationRow(SQLModel, table=True):
"""The legacy ``plan_recommendations`` m2m — **being retired** (ADR-0017
amendment). Kept as an intra-cluster SQLModel row only for the transition
window while readers/writers move onto ``recommendation.plan_id``; dropped
once no caller remains. Both FKs are intra-cluster."""
__tablename__: ClassVar[str] = "plan_recommendations" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
plan_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("plan.id"), nullable=False)
)
recommendation_id: int = Field(
sa_column=Column(
BigInteger, ForeignKey("recommendation.id"), nullable=False
)
)