from __future__ import annotations

import os
from collections.abc import Callable
from typing import Any, Optional, Protocol

from sqlalchemy import Engine
from sqlmodel import Session

from applications.ara_first_run.ara_first_run_trigger_body import (
    AraFirstRunTriggerBody,
)
from domain.property_baseline.calculator_rebaseliner import CalculatorRebaseliner
from domain.sap10_calculator.calculator import Sap10Calculator
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine
from orchestration.property_baseline_orchestrator import PropertyBaselineOrchestrator
from orchestration.ara_first_run_pipeline import AraFirstRunPipeline
from orchestration.ingestion_orchestrator import (
    EpcFetcher,
    IngestionOrchestrator,
    SolarFetcher,
)
from orchestration.modelling_orchestrator import ModellingOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.unit_of_work import UnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler

# Held at module scope on purpose: a warm Lambda container keeps the module
# loaded, so the engine (and its connection pool) is built once and reused by
# later invocations instead of being recreated every time (ADR-0012).
_engine: Optional[Engine] = None


def _get_engine() -> Engine:
    """Return the process-wide engine, creating it on first use.

    The first call builds the engine from a ``PostgresConfig`` read out of a
    snapshot of ``os.environ`` and caches it in the module-level ``_engine``;
    every later call returns that cached instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    config = PostgresConfig.from_env(dict(os.environ))
    _engine = make_engine(config)
    return _engine


class _RunsFirstRun(Protocol):
    """Structural type for the one ``AraFirstRunPipeline`` method the handler uses."""

    def run(self, command: AraFirstRunTriggerBody) -> None: ...


def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None:
    """Turn the raw event body into a command and run the pipeline with it.

    This is everything the handler decides, pulled out under its own name so
    it can be exercised without the Lambda runtime. It carries no business
    logic: the body is validated into an ``AraFirstRunTriggerBody`` and the
    result is passed straight to ``pipeline.run``.

    Args:
        body: The raw event payload to validate.
        pipeline: Any object exposing ``run(command)``.
    """
    pipeline.run(AraFirstRunTriggerBody.model_validate(body))


def build_first_run_pipeline(
    *,
    unit_of_work: Callable[[], UnitOfWork],
    epc_fetcher: EpcFetcher,
    geospatial_repo: GeospatialRepository,
    solar_fetcher: SolarFetcher,
) -> AraFirstRunPipeline:
    """Wire up the real three-stage pipeline around a Unit-of-Work factory.

    Every stage opens its own unit(s) and commits per batch (ADR-0012), so the
    handler itself never holds a session. The source clients arrive as
    arguments because their configuration is still undecided — see
    ``_source_clients_from_env``. The modelling stage is a stub for now
    (#1136); its Scenario / Materials ports act as seams.

    Args:
        unit_of_work: Zero-argument factory producing a fresh ``UnitOfWork``.
        epc_fetcher: EPC source client handed to the ingestion stage.
        geospatial_repo: Geospatial source handed to the ingestion stage.
        solar_fetcher: Solar source client handed to the ingestion stage.

    Returns:
        The composed ``AraFirstRunPipeline``.
    """
    ingestion_stage = IngestionOrchestrator(
        unit_of_work=unit_of_work,
        epc_fetcher=epc_fetcher,
        geospatial_repo=geospatial_repo,
        solar_fetcher=solar_fetcher,
    )

    # The calculator is not optional plumbing: for pre-10.2 certs the effective
    # value is the calculated one, while at/above 10.2 the lodged value is used
    # and the divergence is logged. If it raises, the batch is aborted
    # (ADR-0013 amendment).
    rebaseliner = CalculatorRebaseliner(Sap10Calculator())
    baseline_stage = PropertyBaselineOrchestrator(
        unit_of_work=unit_of_work,
        rebaseliner=rebaseliner,
    )

    modelling_stage = ModellingOrchestrator(
        scenario_repo=ScenarioRepository(),
        materials_repo=MaterialsRepository(),
    )

    return AraFirstRunPipeline(
        ingestion=ingestion_stage,
        baseline=baseline_stage,
        modelling=modelling_stage,
    )


def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarFetcher]:
    """Provide the ingestion source clients — EPC API, Google Solar, geospatial S3.

    TODO(deploy): the configuration for these clients (EPC auth token, Google
    Solar API key, geospatial S3 parquet reader), the env-var names, and the
    pandas/s3fs runtime dependencies have not been decided. That wiring is a
    separate Terraform piece and out of scope for #1136.

    Raises:
        NotImplementedError: Always, until the wiring exists, so the lambda
            fails loudly instead of half-running.
    """
    raise NotImplementedError(
        "ara_first_run source-client wiring (EPC / Google Solar / geospatial) "
        "is pending the deploy/Terraform piece; see #1136."
    )


@subtask_handler()
def handler(
    body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> None:
    """Lambda entry point: compose the first-run pipeline and dispatch ``body``.

    ``context`` and ``task_orchestrator`` are accepted but not used here;
    presumably they are part of the signature ``subtask_handler`` calls with —
    confirm against the decorator.

    Args:
        body: Raw event payload, validated inside ``dispatch_first_run``.
        context: Unused in this function.
        task_orchestrator: Unused in this function.

    Raises:
        NotImplementedError: From ``_source_clients_from_env`` while the
            source-client wiring is still pending.
    """
    engine = _get_engine()

    def open_session() -> Session:
        # Each session is bound to the shared, module-cached engine.
        return Session(engine)

    def make_unit_of_work() -> UnitOfWork:
        # A new unit of work per call, given the session factory above.
        return PostgresUnitOfWork(open_session)

    epc_fetcher, geospatial_repo, solar_fetcher = _source_clients_from_env()
    first_run_pipeline = build_first_run_pipeline(
        unit_of_work=make_unit_of_work,
        epc_fetcher=epc_fetcher,
        geospatial_repo=geospatial_repo,
        solar_fetcher=solar_fetcher,
    )
    dispatch_first_run(body, pipeline=first_run_pipeline)