"""End-to-end through-repos integration for First Run (ADR-0012, #1138). Real PostgresUnitOfWork over an ephemeral DB: Ingestion writes the EPC, Baseline reads it back *through the repo* (not in memory), and a re-run replaces rather than duplicates. Stub Modelling. The source clients are faked (no IO).""" from __future__ import annotations import dataclasses import json from dataclasses import dataclass from pathlib import Path from typing import Any, Optional from sqlalchemy import Engine from sqlmodel import Session, col, select from datatypes.epc.domain.epc import Epc from datatypes.epc.domain.epc_property_data import EpcPropertyData from datatypes.epc.domain.mapper import EpcPropertyDataMapper from domain.property_baseline.rebaseliner import StubRebaseliner from domain.sap10_calculator.calculator import Sap10Calculator from domain.modelling.portfolio_goal import PortfolioGoal from infrastructure.postgres.modelling import ScenarioModel from domain.geospatial.coordinates import Coordinates from domain.geospatial.planning_restrictions import PlanningRestrictions from domain.geospatial.spatial_reference import SpatialReference from tests.domain.modelling._elmhurst_recommendation import ( parse_recommendation_summary, ) from infrastructure.postgres.property_baseline_performance_table import ( PropertyBaselinePerformanceModel, ) from infrastructure.postgres.epc_property_table import EpcPropertyModel from infrastructure.postgres.modelling import PlanModel, RecommendationModel from infrastructure.postgres.product_table import MaterialRow from infrastructure.postgres.property_table import PropertyRow from tests.domain.sap10_calculator.worksheet._elmhurst_worksheet_000490 import ( build_epc as _build_uninsulated_cavity_and_floor_epc, ) from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator from orchestration.ara_first_run_pipeline import AraFirstRunPipeline from orchestration.ingestion_orchestrator import IngestionOrchestrator from orchestration.modelling_orchestrator import ModellingOrchestrator from repositories.epc.epc_postgres_repository import EpcPostgresRepository from repositories.property_baseline.property_baseline_postgres_repository import ( PropertyBaselinePostgresRepository, ) from repositories.fuel_rates.fuel_rates_static_file_repository import ( FuelRatesStaticFileRepository, ) from repositories.geospatial.geospatial_repository import GeospatialRepository from repositories.postgres_unit_of_work import PostgresUnitOfWork _JSON_SAMPLES = Path(__file__).resolve().parents[2] / "backend/epc_api/json_samples" @dataclass class _FakeCommand: portfolio_id: int property_ids: list[int] scenario_ids: list[int] class _FetcherReturning: def __init__(self, epc: EpcPropertyData) -> None: self._epc = epc def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: return self._epc class _NoCoordinates(GeospatialRepository): def coordinates_for(self, uprn: int) -> Optional[Coordinates]: return None # skip the solar leg — not under test here class _UnusedSolarFetcher: def get_building_insights( self, longitude: float, latitude: float ) -> dict[str, Any]: # pragma: no cover return {} def _lodged_epc() -> EpcPropertyData: # A real, persistable EPC (so it round-trips through the EPC repo), with the # recorded-performance fields the sample leaves blank filled in so Baseline # can read its Lodged Performance. raw: dict[str, Any] = json.loads( (_JSON_SAMPLES / "RdSAP-Schema-21.0.0" / "epc.json").read_text() ) epc = EpcPropertyDataMapper.from_api_response(raw) return dataclasses.replace( epc, energy_rating_current=72, current_energy_efficiency_band=Epc.C, co2_emissions_current=1.8, energy_consumption_current=180, ) def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun( db_engine: Engine, ) -> None: # Arrange — a property row to ingest against, and the EPC its fetcher returns. with Session(db_engine) as session: session.add( PropertyRow( id=10, portfolio_id=1, postcode="A0 0AA", address="1 Some Street", uprn=12345, ) ) # Modelling now runs for real: it reads scenario 7 (the command's # scenario_ids) through the repo, so the row must exist. session.add( ScenarioModel( id=7, goal=PortfolioGoal.INCREASING_EPC, goal_value="C", is_default=True ) ) # The sample EPC's solid floor is uninsulated, so the floor generator # fires during candidate generation and prices against this Product. The # ventilation Measure Dependency is built for every not-yet-ventilated # dwelling, so its Product must exist too (ADR-0016). The EPC also lodges # a single-glazed window, so the glazing generator fires and reaches for # the double-glazing Product (ADR-0022). session.add_all( [ MaterialRow( id=5, type="air_source_heat_pump", total_cost=12000.0, cost_unit="gbp_per_unit", is_active=True, description="Air source heat pump", ), MaterialRow( id=1, type="solid_floor_insulation", total_cost=25.0, cost_unit="gbp_per_m2", is_active=True, description="Solid floor insulation", ), MaterialRow( id=2, type="mechanical_ventilation", total_cost=450.0, cost_unit="gbp_per_unit", is_active=True, description="Mechanical extract ventilation unit", ), MaterialRow( id=3, type="double_glazing", total_cost=600.0, cost_unit="gbp_per_unit", is_active=True, description="Double glazing", ), MaterialRow( id=4, type="low_energy_lighting_installation", total_cost=8.0, cost_unit="gbp_per_unit", is_active=True, description="LED bulb", ), MaterialRow( id=6, type="boiler_upgrade", total_cost=3000.0, cost_unit="gbp_per_unit", is_active=True, description="Gas condensing boiler", ), MaterialRow( id=7, type="roomstat_programmer_trvs", total_cost=500.0, cost_unit="gbp_per_unit", is_active=True, description="Heating controls + cylinder tune-up", ), MaterialRow( id=8, type="time_temperature_zone_control", total_cost=900.0, cost_unit="gbp_per_unit", is_active=True, description="Zoned heating controls + cylinder tune-up", ), # secondary_heating_removal is off-catalogue (priced from the # JSON overlay, not the pgEnum-constrained material table). ] ) session.commit() def unit_of_work() -> PostgresUnitOfWork: return PostgresUnitOfWork(lambda: Session(db_engine)) pipeline = AraFirstRunPipeline( ingestion=IngestionOrchestrator( unit_of_work=unit_of_work, epc_fetcher=_FetcherReturning(_lodged_epc()), geospatial_repo=_NoCoordinates(), solar_fetcher=_UnusedSolarFetcher(), ), baseline=PropertyBaselineOrchestrator( unit_of_work=unit_of_work, rebaseliner=StubRebaseliner(), fuel_rates=FuelRatesStaticFileRepository(), ), modelling=ModellingOrchestrator( unit_of_work=unit_of_work, calculator=Sap10Calculator(), fuel_rates=FuelRatesStaticFileRepository(), ), ) command = _FakeCommand(portfolio_id=1, property_ids=[10], scenario_ids=[7]) # Act — First Run, then a re-run over the same batch. pipeline.run(command) pipeline.run(command) # Assert — Baseline read the EPC Ingestion persisted (through the repo, only # property_ids crossed the stage boundary), and the re-run replaced rather # than duplicated either row. with Session(db_engine) as session: baseline = PropertyBaselinePostgresRepository(session).get_for_property(10) epc_rows = session.exec( select(EpcPropertyModel).where(EpcPropertyModel.property_id == 10) ).all() baseline_rows = session.exec( select(PropertyBaselinePerformanceModel).where( PropertyBaselinePerformanceModel.property_id == 10 ) ).all() assert baseline is not None assert baseline.lodged.sap_score == 72 assert baseline.space_heating_kwh == 13120.0 assert len(epc_rows) == 1 assert len(baseline_rows) == 1 def test_modelling_optimises_and_persists_a_multi_measure_plan( db_engine: Engine, ) -> None: # Arrange — an EPC with an uninsulated cavity wall AND an uninsulated # suspended floor (loft already at 300mm), so the wall + floor Generators # both fire and the Optimiser selects from two groups. We drive the # Modelling stage directly off a repo-seeded EPC rather than the full # pipeline: this calculator fixture has no lodged recorded-performance # fields, so the Baseline stage (not under test here) can't run on it. # SAP-numeric correctness is pinned in test_elmhurst_cascade_pins; here we # prove the multi-measure Plan is optimised, priced, attributed and # persisted. The property is band D (~57.4) and tops out at ~61, so the # goal-C target is unreachable — this exercises the least-cost-to-target # objective's **max-gain fallback** (ADR-0016 amendment): best effort, all # measures, below target. with Session(db_engine) as session: session.add( PropertyRow( id=30, portfolio_id=1, postcode="A0 0AA", address="3 Some Street", uprn=33333, ) ) session.add( ScenarioModel( id=7, goal=PortfolioGoal.INCREASING_EPC, goal_value="C", is_default=True ) ) session.add_all( [ MaterialRow( id=5, type="air_source_heat_pump", total_cost=12000.0, cost_unit="gbp_per_unit", is_active=True, description="Air source heat pump", ), MaterialRow( id=1, type="cavity_wall_insulation", total_cost=18.5, cost_unit="gbp_per_m2", is_active=True, description="Cavity wall insulation", ), MaterialRow( id=2, type="suspended_floor_insulation", total_cost=25.0, cost_unit="gbp_per_m2", is_active=True, description="Suspended floor insulation", ), MaterialRow( id=3, type="mechanical_ventilation", total_cost=450.0, cost_unit="gbp_per_unit", is_active=True, description="Mechanical extract ventilation unit", ), MaterialRow( id=4, type="low_energy_lighting_installation", total_cost=8.0, cost_unit="gbp_per_unit", is_active=True, description="LED bulb", ), # No secondary_heating_removal row: the FE-owned ``material.type`` # pgEnum cannot carry that Measure Type, so it is priced from the # committed off-catalogue JSON overlay (£270, no material_id) that # the Unit of Work layers over the catalogue, not from the DB. ] ) session.commit() EpcPostgresRepository(session).save( _build_uninsulated_cavity_and_floor_epc(), property_id=30, portfolio_id=1, ) session.commit() def unit_of_work() -> PostgresUnitOfWork: return PostgresUnitOfWork(lambda: Session(db_engine)) # Act ModellingOrchestrator( unit_of_work=unit_of_work, calculator=Sap10Calculator(), fuel_rates=FuelRatesStaticFileRepository(), ).run(property_ids=[30], scenario_ids=[7], portfolio_id=1) # Assert — one Plan with three Plan Measures: the wall + floor the Optimiser # chose, plus the ventilation Measure Dependency the wall forces in # (ADR-0016). Each is priced and attributed, linked by plan_id. with Session(db_engine) as session: plan = session.exec( select(PlanModel).where(col(PlanModel.property_id) == 30) ).first() assert plan is not None rec_rows = session.exec( select(RecommendationModel).where( col(RecommendationModel.plan_id) == plan.id ) ).all() assert plan.scenario_id == 7 assert plan.portfolio_id == 1 assert plan.is_default is True assert plan.post_sap_points is not None assert plan.post_epc_rating is not None assert plan.cost_of_works is not None assert plan.cost_of_works > 0.0 # Plan-level energy/bill figures derived from the post-package bill vs the # baseline bill at the run's Fuel Rates (ADR-0014 amendment). The max-gain # fallback package is ASHP-led on a *gas* dwelling whose suspended floor is # correctly modelled as exposed (is_exposed_floor now round-trips through # persistence — #TBD): it saves a large amount of delivered energy, but the # gas→electricity fuel switch trades cheap gas kWh for pricier electricity # kWh, so the *bill* is roughly neutral (marginally negative). The exact # value is pinned by the telescoping cascade below; here we only assert the # figure is produced (sign is not load-bearing for a SAP-max fallback). assert plan.post_energy_bill is not None and plan.post_energy_bill > 0.0 assert plan.post_energy_consumption is not None assert plan.post_energy_consumption > 0.0 assert plan.energy_bill_savings is not None assert plan.energy_consumption_savings is not None assert plan.energy_consumption_savings > 0.0 by_type = {rec.type: rec for rec in rec_rows} # The gain-maximising package: the efficient representative heat pump # (Vaillant aroTHERM plus 5 kW, ADR-0025) now raises SAP even on this gas # dwelling, plus the cheap positive-SAP fabric/lighting/secondary measures. # CAVITY-WALL INSULATION is NOT selected: it earns +2.9 SAP alone, but the # fabric→ventilation forced dependency (ADR-0016) drags the wall+ventilation # pair to a NET −1.8 SAP (−0.9 on top of the ASHP package), so the Optimiser # correctly leaves the wall — and therefore its forced ventilation — out. # (The forced wall→ventilation edge itself is covered by # test_measure_dependency / test_optimiser; here we prove the end-to-end # optimise→persist→telescope pipeline on the package the Optimiser keeps.) # The sample EPC lodges 8 low-energy-unknown bulbs (LED upgrade, ADR-0023) # and an electric secondary heater (SAP 691, removal offered per ADR-0028). assert set(by_type) == { "suspended_floor_insulation", "low_energy_lighting", "air_source_heat_pump", "secondary_heating_removal", } # Each catalogue-sourced measure carries the id of the Product it installs # (the MaterialRow ids seeded above), replacing the retired # recommendation_materials BOM with a single material_id on the row. assert by_type["air_source_heat_pump"].material_id == 5 assert by_type["suspended_floor_insulation"].material_id == 2 assert by_type["low_energy_lighting"].material_id == 4 # Secondary heating removal is priced from the off-catalogue JSON overlay # (£270 flat per-dwelling, ADR-0028), so it carries no catalogue material_id. assert by_type["secondary_heating_removal"].material_id is None assert by_type["secondary_heating_removal"].estimated_cost is not None assert abs(by_type["secondary_heating_removal"].estimated_cost - 270.0) <= 1e-6 for rec in rec_rows: assert rec.default is True assert rec.already_installed is False assert rec.sap_points is not None assert rec.estimated_cost is not None # Per-measure bill savings (telescoping cascade, ADR-0014 amendment): each # measure carries its delivered-kWh and £ saving, and they telescope exactly # to the Plan's headline savings. for rec in rec_rows: assert rec.kwh_savings is not None assert rec.energy_cost_savings is not None kwh_total: float = sum(rec.kwh_savings or 0.0 for rec in rec_rows) cost_total: float = sum(rec.energy_cost_savings or 0.0 for rec in rec_rows) assert plan.energy_consumption_savings is not None assert plan.energy_bill_savings is not None assert abs(kwh_total - plan.energy_consumption_savings) <= 1e-6 assert abs(cost_total - plan.energy_bill_savings) <= 1e-6 def test_modelling_recommends_nothing_when_already_at_the_target_band( db_engine: Engine, ) -> None: # Arrange — the same band-D property (~57.4), but a goal of band D, which it # already meets. Least-cost-to-target recommends the cheapest package that # *reaches* the target — and the target is already reached, so the cheapest # package is the empty one. (The old max-gain objective would have # recommended wall + floor + ventilation here, improving within the band the # property is already in — exactly the over-recommendation this objective # removes.) ADR-0016 amendment. with Session(db_engine) as session: session.add( PropertyRow( id=31, portfolio_id=1, postcode="A0 0AA", address="4 Some Street", uprn=44444, ) ) session.add( ScenarioModel( id=8, goal=PortfolioGoal.INCREASING_EPC, goal_value="D", is_default=True ) ) # The fabric Generators + the ventilation dependency builder still run # during candidate generation, so their Products must exist even though # nothing is ultimately selected. session.add_all( [ MaterialRow( id=14, type="air_source_heat_pump", total_cost=12000.0, cost_unit="gbp_per_unit", is_active=True, description="Air source heat pump", ), MaterialRow( id=10, type="cavity_wall_insulation", total_cost=18.5, cost_unit="gbp_per_m2", is_active=True, description="Cavity wall insulation", ), MaterialRow( id=11, type="suspended_floor_insulation", total_cost=25.0, cost_unit="gbp_per_m2", is_active=True, description="Suspended floor insulation", ), MaterialRow( id=12, type="mechanical_ventilation", total_cost=450.0, cost_unit="gbp_per_unit", is_active=True, description="Mechanical extract ventilation unit", ), MaterialRow( id=13, type="low_energy_lighting_installation", total_cost=8.0, cost_unit="gbp_per_unit", is_active=True, description="LED bulb", ), # secondary_heating_removal is off-catalogue (priced from the # JSON overlay, not the pgEnum-constrained material table). ] ) session.commit() EpcPostgresRepository(session).save( _build_uninsulated_cavity_and_floor_epc(), property_id=31, portfolio_id=1, ) session.commit() def unit_of_work() -> PostgresUnitOfWork: return PostgresUnitOfWork(lambda: Session(db_engine)) # Act ModellingOrchestrator( unit_of_work=unit_of_work, calculator=Sap10Calculator(), fuel_rates=FuelRatesStaticFileRepository(), ).run(property_ids=[31], scenario_ids=[8], portfolio_id=1) # Assert — a Plan is persisted with no measures and zero cost; the # post-retrofit figure is the unchanged baseline (still band D). with Session(db_engine) as session: plan = session.exec( select(PlanModel).where(col(PlanModel.property_id) == 31) ).first() assert plan is not None rec_rows = session.exec( select(RecommendationModel).where( col(RecommendationModel.plan_id) == plan.id ) ).all() assert rec_rows == [] assert plan.cost_of_works == 0.0 assert plan.post_epc_rating is Epc.D # No measures → post bill equals the baseline bill → zero savings, but the # post-retrofit bill/consumption are still the (non-zero) current figures. assert plan.post_energy_bill is not None and plan.post_energy_bill > 0.0 assert plan.post_energy_consumption is not None assert plan.post_energy_consumption > 0.0 assert plan.energy_bill_savings == 0.0 assert plan.energy_consumption_savings == 0.0 class _NoEpcFetcher: """An EPC fetcher that returns nothing — the EPC is seeded directly so this e2e drives only the spatial-reference half of Ingestion.""" def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]: return None class _SpatialByUprn(GeospatialRepository): """Resolves a per-UPRN spatial reference (coordinates nulled — the Solar leg is not under test).""" def __init__(self, by_uprn: dict[int, SpatialReference]) -> None: self._by_uprn = by_uprn def coordinates_for(self, uprn: int) -> Optional[Coordinates]: return None def spatial_for(self, uprn: int) -> Optional[SpatialReference]: return self._by_uprn.get(uprn) def test_listed_uprn_ingested_blocks_solid_wall_insulation_in_modelling( db_engine: Engine, ) -> None: # Arrange — two solid-brick uninsulated dwellings: one in a listed building, # one unrestricted. Ingestion caches each UPRN's planning protections; the # EPC is seeded directly (the solid-wall mechanics are pinned elsewhere). listed_reference = SpatialReference( coordinates=None, restrictions=PlanningRestrictions(is_listed=True) ) unrestricted_reference = SpatialReference( coordinates=None, restrictions=PlanningRestrictions() ) solid_brick_epc = parse_recommendation_summary("solid_brick_ewi_001431_before.pdf") with Session(db_engine) as session: session.add_all( [ PropertyRow( id=40, portfolio_id=1, postcode="A0 0AA", address="Listed House", uprn=44444, ), PropertyRow( id=41, portfolio_id=1, postcode="A0 0AA", address="Unrestricted House", uprn=55555, ), ScenarioModel( id=7, goal=PortfolioGoal.INCREASING_EPC, goal_value="C", is_default=True, ), ] ) # The solid-brick EPC fires the floor + solid-wall Generators and the # ventilation dependency, so every Product they reach for must exist. session.add_all( [ MaterialRow( id=5, type="air_source_heat_pump", total_cost=12000.0, cost_unit="gbp_per_unit", is_active=True, description="Air source heat pump", ), MaterialRow( id=1, type="external_wall_insulation", total_cost=100.0, cost_unit="gbp_per_m2", is_active=True, description="External wall insulation", ), MaterialRow( id=2, type="internal_wall_insulation", total_cost=90.0, cost_unit="gbp_per_m2", is_active=True, description="Internal wall insulation", ), MaterialRow( id=3, type="solid_floor_insulation", total_cost=25.0, cost_unit="gbp_per_m2", is_active=True, description="Solid floor insulation", ), MaterialRow( id=4, type="mechanical_ventilation", total_cost=450.0, cost_unit="gbp_per_unit", is_active=True, description="Mechanical extract ventilation unit", ), ] ) session.commit() epc_repo = EpcPostgresRepository(session) epc_repo.save(solid_brick_epc, property_id=40, portfolio_id=1) epc_repo.save(solid_brick_epc, property_id=41, portfolio_id=1) session.commit() def unit_of_work() -> PostgresUnitOfWork: return PostgresUnitOfWork(lambda: Session(db_engine)) geospatial_repo = _SpatialByUprn( {44444: listed_reference, 55555: unrestricted_reference} ) # Act — Ingestion caches the protections per UPRN, then Modelling reads them # back off the Property (through the repo) and gates the solid-wall measures. IngestionOrchestrator( unit_of_work=unit_of_work, epc_fetcher=_NoEpcFetcher(), geospatial_repo=geospatial_repo, solar_fetcher=_UnusedSolarFetcher(), ).run([40, 41]) ModellingOrchestrator( unit_of_work=unit_of_work, calculator=Sap10Calculator(), fuel_rates=FuelRatesStaticFileRepository(), ).run(property_ids=[40, 41], scenario_ids=[7], portfolio_id=1) # Assert — a listed building blocks the fabric-protected measures: both # solid-wall Options AND the ASHP bundle (all gated on `blocks_internal`, # ADR-0024). So the listed dwelling gets neither, while the unrestricted one # gets the ASHP bundle (which the efficient Vaillant now makes the Optimiser # select — ADR-0025, so walls are no longer needed to reach the band). The # only difference between them is the planning status Ingestion cached, # proving the gate end to end (ADR-0019/0020/0024). _PROTECTED_TYPES = { "external_wall_insulation", "internal_wall_insulation", "air_source_heat_pump", } with Session(db_engine) as session: listed_types = _plan_measure_types(session, property_id=40) unrestricted_types = _plan_measure_types(session, property_id=41) assert _PROTECTED_TYPES.isdisjoint(listed_types) assert "air_source_heat_pump" in unrestricted_types def _plan_measure_types(session: Session, *, property_id: int) -> set[str]: plan = session.exec( select(PlanModel).where(col(PlanModel.property_id) == property_id) ).first() assert plan is not None rec_rows = session.exec( select(RecommendationModel).where(col(RecommendationModel.plan_id) == plan.id) ).all() return {rec.type for rec in rec_rows}