Model/tests/orchestration/test_ara_first_run_pipeline_integration.py
Khalim Conn-Kowlessar 15da2d3970 feat(baseline): CalculatorRebaseliner — calculator goes load-bearing (ADR-0013 amend)
Slice 5a: the promotion. Replaces StubRebaseliner in production and collapses the
shadow runner into the rebaseliner (ADR-0013 amendment).

- CalculatorRebaseliner runs Sap10Calculator on every Property:
  * sap_version < 10.2 -> Effective Performance IS the calculator output
    (band via Epc.from_sap_score, CO2 kg->t, PEUI rounded), reason "pre_sap10".
  * sap_version >= 10.2 -> Effective = lodged (API figures on-target), and the
    calculator only logs divergence (SAP>0.5, PEUI/CO2 1%) as a validation signal.
  * a calculator raise propagates -> batch aborts (ADR-0012); fix the cert at once.
- Rebaseliner.rebaseline gains property_id (for the divergence log).
- LoggingCalculatorShadow / the calculator_shadow seam removed from the
  orchestrator; its divergence-comparison logic now lives in the rebaseliner.
- StubRebaseliner kept (signature updated) for orchestrator/repo unit tests.

The SapResult->EnergyBreakdown adapter + BillDerivation wiring (to populate the
bill block) follow once the appliances/cooking SapResult fields land.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-02 10:04:24 +00:00

146 lines
5.2 KiB
Python

"""End-to-end through-repos integration for First Run (ADR-0012, #1138).
Real PostgresUnitOfWork over an ephemeral DB: Ingestion writes the EPC, Baseline
reads it back *through the repo* (not in memory), and a re-run replaces rather
than duplicates. Stub Modelling. The source clients are faked (no IO)."""
from __future__ import annotations
import dataclasses
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from sqlalchemy import Engine
from sqlmodel import Session, select
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.property_baseline.rebaseliner import StubRebaseliner
from domain.geospatial.coordinates import Coordinates
from infrastructure.postgres.property_baseline_performance_table import (
PropertyBaselinePerformanceModel,
)
from infrastructure.postgres.epc_property_table import EpcPropertyModel
from infrastructure.postgres.property_table import PropertyRow
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
from orchestration.ingestion_orchestrator import IngestionOrchestrator
from orchestration.modelling_orchestrator import ModellingOrchestrator
from repositories.property_baseline.property_baseline_postgres_repository import (
PropertyBaselinePostgresRepository,
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
# Directory of sample EPC API JSON payloads used as test input. ``parents[2]`` is
# the directory two levels above the one containing this file.
_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
@dataclass
class _FakeCommand:
portfolio_id: int
property_ids: list[int]
scenario_ids: list[int]
class _FetcherReturning:
def __init__(self, epc: EpcPropertyData) -> None:
self._epc = epc
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
return self._epc
class _NoCoordinates(GeospatialRepository):
    """Geospatial repository fake that never resolves a UPRN to coordinates."""

    def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
        """Return ``None`` for every UPRN."""
        # Skip the solar leg — it is not under test here.
        return None
class _UnusedSolarFetcher:
def get_building_insights(
self, longitude: float, latitude: float
) -> dict[str, Any]: # pragma: no cover
return {}
def _lodged_epc() -> EpcPropertyData:
    """Build a real, persistable EPC with its recorded-performance fields filled in.

    The RdSAP-Schema-21.0.0 sample payload is mapped through
    ``EpcPropertyDataMapper.from_api_response`` so the result round-trips through
    the EPC repo. The sample leaves the recorded-performance fields blank, so they
    are set here to let Baseline read the Lodged Performance.

    Returns:
        A copy of the mapped sample EPC with rating, band, CO2 emissions and
        energy consumption populated.
    """
    sample_path = _JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json"
    # Decode explicitly as UTF-8 rather than relying on the platform's locale
    # encoding, so the fixture loads the same way on every machine.
    raw: dict[str, Any] = json.loads(sample_path.read_text(encoding="utf-8"))
    epc = EpcPropertyDataMapper.from_api_response(raw)
    return dataclasses.replace(
        epc,
        energy_rating_current=72,
        current_energy_efficiency_band=Epc.C,
        co2_emissions_current=1.8,
        energy_consumption_current=180,
    )
def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
    db_engine: Engine,
) -> None:
    """Run First Run twice over one property and expect exactly one row of each kind.

    Ingestion persists the EPC returned by the fake fetcher, and Baseline must read
    it back through the repositories. A second run over the same command has to
    replace the EPC and baseline rows rather than duplicate them.
    """
    property_id = 10

    # Arrange — a property row to ingest against, and the EPC its fetcher returns.
    seeded_property = PropertyRow(
        id=property_id,
        portfolio_id=1,
        postcode="A0 0AA",
        address="1 Some Street",
        uprn=12345,
    )
    with Session(db_engine) as seed_session:
        seed_session.add(seeded_property)
        seed_session.commit()

    def new_unit_of_work() -> PostgresUnitOfWork:
        # Hand the unit of work a factory for fresh Sessions on the shared engine.
        return PostgresUnitOfWork(lambda: Session(db_engine))

    ingestion_stage = IngestionOrchestrator(
        unit_of_work=new_unit_of_work,
        epc_fetcher=_FetcherReturning(_lodged_epc()),
        geospatial_repo=_NoCoordinates(),
        solar_fetcher=_UnusedSolarFetcher(),
    )
    baseline_stage = PropertyBaselineOrchestrator(
        unit_of_work=new_unit_of_work,
        rebaseliner=StubRebaseliner(),
    )
    modelling_stage = ModellingOrchestrator(
        scenario_repo=ScenarioRepository(),
        materials_repo=MaterialsRepository(),
    )
    pipeline = AraFirstRunPipeline(
        ingestion=ingestion_stage,
        baseline=baseline_stage,
        modelling=modelling_stage,
    )
    first_run = _FakeCommand(
        portfolio_id=1, property_ids=[property_id], scenario_ids=[7]
    )

    # Act — First Run, then a re-run over the same batch.
    for _ in range(2):
        pipeline.run(first_run)

    # Assert — Baseline read the EPC Ingestion persisted (through the repo, only
    # property_ids crossed the stage boundary), and the re-run replaced rather
    # than duplicated either row.
    epc_query = select(EpcPropertyModel).where(
        EpcPropertyModel.property_id == property_id
    )
    baseline_query = select(PropertyBaselinePerformanceModel).where(
        PropertyBaselinePerformanceModel.property_id == property_id
    )
    with Session(db_engine) as check_session:
        baseline = PropertyBaselinePostgresRepository(check_session).get_for_property(
            property_id
        )
        epc_rows = check_session.exec(epc_query).all()
        baseline_rows = check_session.exec(baseline_query).all()

        assert baseline is not None
        assert baseline.lodged.sap_score == 72
        assert baseline.space_heating_kwh == 13120.0
        assert len(epc_rows) == 1
        assert len(baseline_rows) == 1