Model/applications/ara_first_run/handler.py
Khalim Conn-Kowlessar f179950519 feat(baseline): wire BillDerivation into the orchestrator and persist the Bill (ADR-0014)
The PropertyBaselineOrchestrator now reads the current Fuel Rates snapshot
once per batch, builds a BillDerivation, and prices each scored property's
SapResult -> EnergyBreakdown into a Bill carried on PropertyBaselinePerformance
(None only on the stub no-calculator path). The Bill is flattened onto nullable
bill_* flat columns (per-section kwh+cost, standing charges, SEG credit, total)
on the postgres table, with bill_total_annual_bill_gbp as the not-null
discriminator on read-back. Section absent from the bill stays None, not 0.

Updated all four orchestrator construction sites to inject the FuelRatesRepository
port (handler + three test sites), and the FE migration doc to reflect the
prefixed columns and that they are now populated.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-02 18:51:18 +00:00

129 lines
4.9 KiB
Python

from __future__ import annotations
import os
from collections.abc import Callable
from typing import Any, Optional, Protocol
from sqlalchemy import Engine
from sqlmodel import Session
from applications.ara_first_run.ara_first_run_trigger_body import (
AraFirstRunTriggerBody,
)
from domain.property_baseline.calculator_rebaseliner import CalculatorRebaseliner
from domain.sap10_calculator.calculator import Sap10Calculator
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
from orchestration.ingestion_orchestrator import (
EpcFetcher,
IngestionOrchestrator,
SolarFetcher,
)
from orchestration.modelling_orchestrator import ModellingOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.fuel_rates.fuel_rates_static_file_repository import (
FuelRatesStaticFileRepository,
)
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.unit_of_work import UnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
# Cached at module scope: a warm Lambda container reuses this engine (and its
# connection pool) across invocations instead of rebuilding it per invocation
# (ADR-0012).
_engine: Optional[Engine] = None


def _get_engine() -> Engine:
    """Return the module-level engine, creating it on first use.

    The first call builds the engine from a ``PostgresConfig`` read from a copy
    of the process environment and stores it in ``_engine``; later calls return
    that stored instance.
    """
    global _engine
    if _engine is not None:
        return _engine
    config = PostgresConfig.from_env(dict(os.environ))
    _engine = make_engine(config)
    return _engine
class _RunsFirstRun(Protocol):
    """Structural type for the part of AraFirstRunPipeline the handler uses.

    Only ``run`` is required, so any object exposing a matching ``run`` can be
    passed wherever this protocol is expected.
    """

    def run(self, command: AraFirstRunTriggerBody) -> None:
        """Run the pipeline for an already-validated trigger command."""
        ...
def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None:
    """Turn the raw event body into a trigger command and run the pipeline on it.

    This is all the deciding the handler does, pulled out as a named seam so it
    can be exercised without the Lambda runtime. It carries no business logic:
    ``body`` is validated with ``AraFirstRunTriggerBody.model_validate`` and the
    result is passed to ``pipeline.run``. Anything either call raises
    propagates to the caller unchanged.
    """
    # Validate first so a malformed body never reaches the pipeline.
    validated_command = AraFirstRunTriggerBody.model_validate(body)
    pipeline.run(validated_command)
def build_first_run_pipeline(
    *,
    unit_of_work: Callable[[], UnitOfWork],
    epc_fetcher: EpcFetcher,
    geospatial_repo: GeospatialRepository,
    solar_fetcher: SolarFetcher,
) -> AraFirstRunPipeline:
    """Wire the real three-stage pipeline around a Unit-of-Work factory.

    Every stage opens its own unit(s) and commits per batch (ADR-0012), so the
    handler no longer holds a session. The source clients arrive as arguments
    because their config is not settled — see ``_source_clients_from_env``.
    Modelling is stubbed (#1136); its Scenario / Materials ports are seams.
    """
    # Stage 1: ingestion, fed by the injected source clients.
    ingestion_stage = IngestionOrchestrator(
        unit_of_work=unit_of_work,
        epc_fetcher=epc_fetcher,
        geospatial_repo=geospatial_repo,
        solar_fetcher=solar_fetcher,
    )

    # Stage 2: baseline. The calculator is load-bearing: effective=calculated
    # for pre-10.2 certs, lodged + divergence-logged at/above 10.2; a raise
    # aborts the batch (ADR-0013 amendment).
    baseline_stage = PropertyBaselineOrchestrator(
        unit_of_work=unit_of_work,
        rebaseliner=CalculatorRebaseliner(Sap10Calculator()),
        fuel_rates=FuelRatesStaticFileRepository(),
    )

    # Stage 3: modelling (stubbed, see docstring).
    modelling_stage = ModellingOrchestrator(
        scenario_repo=ScenarioRepository(),
        materials_repo=MaterialsRepository(),
    )

    return AraFirstRunPipeline(
        ingestion=ingestion_stage,
        baseline=baseline_stage,
        modelling=modelling_stage,
    )
def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarFetcher]:
    """Placeholder for the Ingestion source clients (EPC API, Google Solar, geospatial S3).

    Currently always raises ``NotImplementedError`` and never returns, so the
    lambda fails loudly rather than half-running.

    TODO(deploy): the clients' config (EPC auth token, Google Solar API key,
    geospatial S3 parquet reader), the env-var names, and the pandas/s3fs
    runtime deps are not settled — that wiring is a separate Terraform piece,
    out of scope for #1136.
    """
    pending_message = (
        "ara_first_run source-client wiring (EPC / Google Solar / geospatial) "
        "is pending the deploy/Terraform piece; see #1136."
    )
    raise NotImplementedError(pending_message)
@subtask_handler()
def handler(
    body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> None:
    """Lambda entry point: compose the first-run pipeline and dispatch ``body`` to it.

    Steps, in order: obtain the shared engine, define a Unit-of-Work factory on
    top of it, obtain the source clients, build the pipeline, then validate and
    dispatch the event body via ``dispatch_first_run``.

    ``context`` and ``task_orchestrator`` are not used in this body; they are
    presumably part of the signature ``subtask_handler`` expects — confirm
    against the decorator.
    """
    engine = _get_engine()

    def open_session() -> Session:
        # A fresh Session bound to the shared engine on every call.
        return Session(engine)

    def new_unit_of_work() -> UnitOfWork:
        # Each unit of work receives the session factory, not an open session.
        return PostgresUnitOfWork(open_session)

    epc, geospatial, solar = _source_clients_from_env()
    first_run_pipeline = build_first_run_pipeline(
        unit_of_work=new_unit_of_work,
        epc_fetcher=epc,
        geospatial_repo=geospatial,
        solar_fetcher=solar,
    )
    dispatch_first_run(body, pipeline=first_run_pipeline)