mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Slice 5a: the promotion. Replaces StubRebaseliner in production and collapses the
shadow runner into the rebaseliner (ADR-0013 amendment).
- CalculatorRebaseliner runs Sap10Calculator on every Property:
* sap_version < 10.2 -> Effective Performance IS the calculator output
(band via Epc.from_sap_score, CO2 kg->t, PEUI rounded), reason "pre_sap10".
* sap_version >= 10.2 -> Effective = lodged (API figures on-target), and the
calculator only logs divergence (SAP>0.5, PEUI/CO2 1%) as a validation signal.
* a calculator raise propagates -> batch aborts (ADR-0012); fix the cert at once.
- Rebaseliner.rebaseline gains property_id (for the divergence log).
- LoggingCalculatorShadow / the calculator_shadow seam removed from the
orchestrator; its divergence-comparison logic now lives in the rebaseliner.
- StubRebaseliner kept (signature updated) for orchestrator/repo unit tests.
The SapResult->EnergyBreakdown adapter + BillDerivation wiring (to populate the
bill block) follow once the appliances/cooking SapResult fields land.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
146 lines
5.2 KiB
Python
146 lines
5.2 KiB
Python
"""End-to-end through-repos integration for First Run (ADR-0012, #1138).
|
|
|
|
Real PostgresUnitOfWork over an ephemeral DB: Ingestion writes the EPC, Baseline
|
|
reads it back *through the repo* (not in memory), and a re-run replaces rather
|
|
than duplicates. Stub Modelling. The source clients are faked (no IO)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
from sqlalchemy import Engine
|
|
from sqlmodel import Session, select
|
|
|
|
from datatypes.epc.domain.epc import Epc
|
|
from datatypes.epc.domain.epc_property_data import EpcPropertyData
|
|
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
|
|
from domain.property_baseline.rebaseliner import StubRebaseliner
|
|
from domain.geospatial.coordinates import Coordinates
|
|
from infrastructure.postgres.property_baseline_performance_table import (
|
|
PropertyBaselinePerformanceModel,
|
|
)
|
|
from infrastructure.postgres.epc_property_table import EpcPropertyModel
|
|
from infrastructure.postgres.property_table import PropertyRow
|
|
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
|
|
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
|
|
from orchestration.ingestion_orchestrator import IngestionOrchestrator
|
|
from orchestration.modelling_orchestrator import ModellingOrchestrator
|
|
from repositories.property_baseline.property_baseline_postgres_repository import (
|
|
PropertyBaselinePostgresRepository,
|
|
)
|
|
from repositories.geospatial.geospatial_repository import GeospatialRepository
|
|
from repositories.materials.materials_repository import MaterialsRepository
|
|
from repositories.postgres_unit_of_work import PostgresUnitOfWork
|
|
from repositories.scenario.scenario_repository import ScenarioRepository
|
|
|
|
_JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples"
|
|
|
|
|
|
@dataclass
|
|
class _FakeCommand:
|
|
portfolio_id: int
|
|
property_ids: list[int]
|
|
scenario_ids: list[int]
|
|
|
|
|
|
class _FetcherReturning:
|
|
def __init__(self, epc: EpcPropertyData) -> None:
|
|
self._epc = epc
|
|
|
|
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
|
|
return self._epc
|
|
|
|
|
|
class _NoCoordinates(GeospatialRepository):
|
|
def coordinates_for(self, uprn: int) -> Optional[Coordinates]:
|
|
return None # skip the solar leg — not under test here
|
|
|
|
|
|
class _UnusedSolarFetcher:
|
|
def get_building_insights(
|
|
self, longitude: float, latitude: float
|
|
) -> dict[str, Any]: # pragma: no cover
|
|
return {}
|
|
|
|
|
|
def _lodged_epc() -> EpcPropertyData:
|
|
# A real, persistable EPC (so it round-trips through the EPC repo), with the
|
|
# recorded-performance fields the sample leaves blank filled in so Baseline
|
|
# can read its Lodged Performance.
|
|
raw: dict[str, Any] = json.loads(
|
|
(_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text()
|
|
)
|
|
epc = EpcPropertyDataMapper.from_api_response(raw)
|
|
return dataclasses.replace(
|
|
epc,
|
|
energy_rating_current=72,
|
|
current_energy_efficiency_band=Epc.C,
|
|
co2_emissions_current=1.8,
|
|
energy_consumption_current=180,
|
|
)
|
|
|
|
|
|
def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
|
|
db_engine: Engine,
|
|
) -> None:
|
|
# Arrange — a property row to ingest against, and the EPC its fetcher returns.
|
|
with Session(db_engine) as session:
|
|
session.add(
|
|
PropertyRow(
|
|
id=10,
|
|
portfolio_id=1,
|
|
postcode="A0 0AA",
|
|
address="1 Some Street",
|
|
uprn=12345,
|
|
)
|
|
)
|
|
session.commit()
|
|
|
|
def unit_of_work() -> PostgresUnitOfWork:
|
|
return PostgresUnitOfWork(lambda: Session(db_engine))
|
|
|
|
pipeline = AraFirstRunPipeline(
|
|
ingestion=IngestionOrchestrator(
|
|
unit_of_work=unit_of_work,
|
|
epc_fetcher=_FetcherReturning(_lodged_epc()),
|
|
geospatial_repo=_NoCoordinates(),
|
|
solar_fetcher=_UnusedSolarFetcher(),
|
|
),
|
|
baseline=PropertyBaselineOrchestrator(
|
|
unit_of_work=unit_of_work,
|
|
rebaseliner=StubRebaseliner(),
|
|
),
|
|
modelling=ModellingOrchestrator(
|
|
scenario_repo=ScenarioRepository(),
|
|
materials_repo=MaterialsRepository(),
|
|
),
|
|
)
|
|
command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7])
|
|
|
|
# Act — First Run, then a re-run over the same batch.
|
|
pipeline.run(command)
|
|
pipeline.run(command)
|
|
|
|
# Assert — Baseline read the EPC Ingestion persisted (through the repo, only
|
|
# property_ids crossed the stage boundary), and the re-run replaced rather
|
|
# than duplicated either row.
|
|
with Session(db_engine) as session:
|
|
baseline = PropertyBaselinePostgresRepository(session).get_for_property(10)
|
|
epc_rows = session.exec(
|
|
select(EpcPropertyModel).where(EpcPropertyModel.property_id == 10)
|
|
).all()
|
|
baseline_rows = session.exec(
|
|
select(PropertyBaselinePerformanceModel).where(
|
|
PropertyBaselinePerformanceModel.property_id == 10
|
|
)
|
|
).all()
|
|
|
|
assert baseline is not None
|
|
assert baseline.lodged.sap_score == 72
|
|
assert baseline.space_heating_kwh == 13120.0
|
|
assert len(epc_rows) == 1
|
|
assert len(baseline_rows) == 1
|