Model/applications/ara_first_run/handler.py
Khalim Conn-Kowlessar b77fe26892 feat(first-run): FirstRunPipeline E2E — Ingestion → Baseline → Modelling (#1136)
Completes the First Run spine. Replaces the #1130 stub FirstRunPipeline
with the real three-stage composition and wires it into the handler.

- `FirstRunPipeline.run(command)` sequences Ingestion → Baseline →
  Modelling, threading **only** `property_ids` between stages (and
  `scenario_ids` into Modelling, off the command — never a prior stage's
  output). Stages are injected behind thin `IngestionStage` /
  `BaselineStage` / `ModellingStage` Protocols (the EpcFetcher/SolarFetcher
  idiom), so the handler owns wiring and tests substitute fakes (ADR-0011).
- `ModellingOrchestrator` stub + `ScenarioRepository` / `MaterialsRepository`
  seam ports — `run(property_ids, scenario_ids)` reads through repos, does
  no scoring yet. Method shapes deferred to the Modelling per-service grills
  (Scenario / Scenario Phase / Snapshot / Optimised Package / Plans are rich
  — not pre-empted here).
- Handler delegates to the real pipeline via `build_first_run_pipeline`
  (Postgres-backed repos off the session). The Ingestion source clients
  (EPC API / Google Solar / geospatial S3) are isolated behind one
  `_source_clients_from_env` seam that raises until the deploy/Terraform
  config settles — out of scope for this slice. Subtask complete/failed +
  CloudWatch URL still come from `@subtask_handler`.

Integration test (the criterion's centrepiece): wires REAL Ingestion +
REAL Baseline + stub Modelling through a shared fake EPC repo, with a
repo-backed PropertyRepo composing the Property from that slice. Proves
Baseline reads the very EPC Ingestion persisted — the through-repos
hand-off, no in-memory coupling. Plus a composition test pinning stage
order + only-property_ids threading.

TDD, one test → one impl. pyright strict clean; AAA layout. 116 pass in
the tests/ tree, no regressions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-30 22:32:58 +00:00

121 lines
4.6 KiB
Python

from __future__ import annotations
import os
from typing import Any, Protocol
from sqlmodel import Session
from applications.ara_first_run.ara_first_run_trigger_body import (
AraFirstRunTriggerBody,
)
from domain.baseline.rebaseliner import StubRebaseliner
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine
from orchestration.baseline_orchestrator import BaselineOrchestrator
from orchestration.first_run_pipeline import FirstRunPipeline
from orchestration.ingestion_orchestrator import (
EpcFetcher,
IngestionOrchestrator,
SolarFetcher,
)
from orchestration.modelling_orchestrator import ModellingOrchestrator
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.baseline.baseline_postgres_repository import (
BaselinePostgresRepository,
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.geospatial.geospatial_repository import GeospatialRepository
from repositories.materials.materials_repository import MaterialsRepository
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from repositories.scenario.scenario_repository import ScenarioRepository
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from utilities.aws_lambda.subtask_handler import subtask_handler
class _RunsFirstRun(Protocol):
"""The slice of FirstRunPipeline the handler delegates to."""
def run(self, command: AraFirstRunTriggerBody) -> None: ...
def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None:
"""Validate the raw event body and hand the command to the pipeline.
The handler's entire decision logic — kept as a named seam so it is
exercised without the Lambda runtime. No business logic lives here: validate,
then delegate (issue #1130/#1136).
"""
trigger = AraFirstRunTriggerBody.model_validate(body)
pipeline.run(trigger)
def build_first_run_pipeline(
*,
session: Session,
epc_fetcher: EpcFetcher,
geospatial_repo: GeospatialRepository,
solar_fetcher: SolarFetcher,
) -> FirstRunPipeline:
"""Compose the real three-stage pipeline over Postgres-backed repos.
The stages share the session's repos and hand off only ``property_ids``
through them (ADR-0011). The source clients are passed in rather than built
here because their config is not settled — see ``_source_clients_from_env``.
Modelling is stubbed (#1136); its Scenario / Materials ports are seams.
"""
epc_repo = EpcPostgresRepository(session)
property_repo = PropertyPostgresRepository(session, epc_repo)
solar_repo = SolarPostgresRepository(session)
baseline_repo = BaselinePostgresRepository(session)
return FirstRunPipeline(
ingestion=IngestionOrchestrator(
property_repo=property_repo,
epc_fetcher=epc_fetcher,
geospatial_repo=geospatial_repo,
solar_fetcher=solar_fetcher,
epc_repo=epc_repo,
solar_repo=solar_repo,
),
baseline=BaselineOrchestrator(
property_repo=property_repo,
rebaseliner=StubRebaseliner(),
baseline_repo=baseline_repo,
),
modelling=ModellingOrchestrator(
scenario_repo=ScenarioRepository(),
materials_repo=MaterialsRepository(),
),
)
def _source_clients_from_env() -> tuple[EpcFetcher, GeospatialRepository, SolarFetcher]:
"""The Ingestion source clients — EPC API, Google Solar, geospatial S3.
TODO(deploy): their config (EPC auth token, Google Solar API key, geospatial
S3 parquet reader), env-var names, and the pandas/s3fs runtime deps are not
settled — that wiring is a separate Terraform piece, out of scope for #1136.
Raises until then so the lambda fails loudly rather than half-running.
"""
raise NotImplementedError(
"ara_first_run source-client wiring (EPC / Google Solar / geospatial) "
"is pending the deploy/Terraform piece; see #1136."
)
@subtask_handler()
def handler(
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> None:
engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
epc_fetcher, geospatial_repo, solar_fetcher = _source_clients_from_env()
with Session(engine) as session:
pipeline = build_first_run_pipeline(
session=session,
epc_fetcher=epc_fetcher,
geospatial_repo=geospatial_repo,
solar_fetcher=solar_fetcher,
)
dispatch_first_run(body, pipeline=pipeline)
session.commit()