feat(ara): AraFirstRunTriggerBody + ara_first_run lambda skeleton (#1130)

Stage-2 entry point for the First Run use case. Adds the
`ara_first_run` Lambda package mirroring the `postcode_splitter`
template, its typed trigger contract, and a stub `FirstRunPipeline`.

- `AraFirstRunTriggerBody`: thin command of five fields — `task_id`,
  `sub_task_id` (UUID, lifecycle), `portfolio_id`, `property_ids`,
  `scenario_ids` (int business IDs). No `model_config` override, so
  Pydantic's default `extra="ignore"` lets the FastAPI backend add
  fields without breaking deployed lambdas. UPRNs / Scenario defs are
  deliberately off the event — read from source-of-truth tables.
- Thin `handler.py`: validate-and-delegate only, via a named
  `dispatch_first_run` seam (testable without the Lambda runtime).
  Subtask status (in-progress/complete/failed) + CloudWatch log URL
  come for free from the existing `@subtask_handler()` decorator.
- `FirstRunPipeline` (orchestration/) stub: `run(command)` receives the
  validated command. Declares a structural `FirstRunCommand` Protocol
  (the three business fields) that `AraFirstRunTriggerBody` satisfies,
  so orchestration needs no application-layer import — rhymes with the
  `EpcFetcher`/`SolarFetcher` Protocols on IngestionOrchestrator
  (ADR-0011). Full Ingestion→Baseline→Modelling composition lands in
  #1136.
- Dockerfile / requirements.txt / local_handler/ mirror postcode_splitter.

TDD: 7 new tests (trigger-body validation incl. forward-compat +
id-types, pipeline seam, handler delegation). pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-30 20:38:15 +00:00
parent 1696cccba6
commit 75fbba60fc
15 changed files with 382 additions and 0 deletions

View file

@ -0,0 +1,34 @@
FROM public.ecr.aws/lambda/python:3.11

# Build-time Postgres coordinates. The deploy workflow passes these in via
# --build-arg (GitHub Actions DEV_DB_* secrets), the same way
# applications/postcode_splitter/Dockerfile does, and they are re-exported
# under the POSTGRES_* names PostgresConfig.from_env reads. Username and
# password are deliberately NOT baked in -- Terraform injects those as Lambda
# env vars from Secrets Manager.
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV POSTGRES_HOST=${DEV_DB_HOST} \
    POSTGRES_PORT=${DEV_DB_PORT} \
    POSTGRES_DATABASE=${DEV_DB_NAME}

WORKDIR /var/task

# Install dependencies before copying source.
COPY applications/ara_first_run/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# The layered source the handler imports from. DDD-shaped packages only --
# no pandas, no legacy backend/.
# NOTE(review): applications/ is copied wholesale, which includes
# applications/ara_first_run/local_handler/. On a local build that directory
# may hold a filled-in .env.local -- confirm a .dockerignore excludes it.
COPY domain/ domain/
COPY infrastructure/ infrastructure/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY utilities/ utilities/
COPY applications/ applications/

# The handler is additionally placed at the Lambda task root as main.py so the
# runtime can resolve ``main.handler`` without an extra package prefix.
COPY applications/ara_first_run/handler.py /var/task/main.py
CMD ["main.handler"]

View file

View file

@ -0,0 +1,25 @@
from __future__ import annotations
from uuid import UUID
from pydantic import BaseModel
class AraFirstRunTriggerBody(BaseModel):
    """The SQS event the ``ara_first_run`` Lambda is triggered with.

    A thin command. ``task_id``/``sub_task_id`` drive the SubTask lifecycle (the
    ``@subtask_handler`` decorator reads them); the three business fields are what
    the pipeline threads downstream. UPRNs and Scenario definitions are
    deliberately absent — they are read from their source-of-truth tables, not
    carried on the event (issue #1130).

    No ``model_config`` override: Pydantic's default ``extra="ignore"`` lets the
    FastAPI backend add fields to the payload without breaking deployed lambdas.
    """

    # SubTask lifecycle identifiers.
    task_id: UUID
    sub_task_id: UUID

    # Integer business identifiers handed on to the pipeline.
    portfolio_id: int
    property_ids: list[int]
    scenario_ids: list[int]

View file

@ -0,0 +1,34 @@
from __future__ import annotations
from typing import Any, Protocol
from applications.ara_first_run.ara_first_run_trigger_body import (
AraFirstRunTriggerBody,
)
from orchestration.first_run_pipeline import FirstRunPipeline
from orchestration.task_orchestrator import TaskOrchestrator
from utilities.aws_lambda.subtask_handler import subtask_handler
class _RunsFirstRun(Protocol):
    """The slice of FirstRunPipeline the handler delegates to.

    Structural: any object exposing a compatible ``run`` satisfies it, which is
    what lets ``dispatch_first_run`` be handed a test double instead of the
    real pipeline.
    """

    def run(self, command: AraFirstRunTriggerBody) -> None: ...
def dispatch_first_run(body: dict[str, Any], *, pipeline: _RunsFirstRun) -> None:
    """Turn the raw event body into a typed command and pass it to ``pipeline``.

    This is everything the handler does, pulled out under its own name so it
    can be driven without the Lambda runtime. It holds no business logic:
    validation first, delegation second (issue #1130).

    Args:
        body: The decoded event body, not yet validated.
        pipeline: Whatever should run the command (keyword-only).

    Raises:
        pydantic.ValidationError: Propagated from ``model_validate`` when
            ``body`` does not satisfy ``AraFirstRunTriggerBody``; the pipeline
            is not called in that case.
    """
    pipeline.run(AraFirstRunTriggerBody.model_validate(body))
@subtask_handler()
def handler(
    body: dict[str, Any],
    context: Any,
    task_orchestrator: TaskOrchestrator,
) -> None:
    """Lambda entry point: build the pipeline and delegate to ``dispatch_first_run``.

    ``context`` and ``task_orchestrator`` are accepted but not used in this
    body; presumably they are part of the calling convention imposed by
    ``@subtask_handler()`` — confirm against the decorator.
    """
    first_run_pipeline = FirstRunPipeline()
    dispatch_first_run(body, pipeline=first_run_pipeline)

View file

@ -0,0 +1,28 @@
# Local-test environment for the ara_first_run Lambda.
#
# Copy this template, then fill in the values below:
#
#   cp .env.local.example .env.local
#
# .env.local is gitignored. The container hits a REAL Postgres (the SubTask
# lifecycle store), so every value here points at infrastructure that exists.
#
# NOTE: the DDD code uses different env var names than the repo root .env. The
# mapping (root .env name -> var here) is given per section. Keep comments on
# their own lines — docker-compose's env_file parser folds a trailing "# ..."
# into the value.

# --- Postgres (utilities/aws_lambda/default_orchestrator -> PostgresConfig.from_env) ---
# POSTGRES_HOST <- DB_HOST, POSTGRES_PORT <- DB_PORT,
# POSTGRES_USERNAME <- DB_USERNAME, POSTGRES_PASSWORD <- DB_PASSWORD,
# POSTGRES_DATABASE <- DB_NAME.
POSTGRES_HOST=
POSTGRES_PORT=5432
POSTGRES_USERNAME=
POSTGRES_PASSWORD=
POSTGRES_DATABASE=
# Optional; defaults to psycopg2 when unset.
# POSTGRES_DRIVER=psycopg2

# --- AWS credentials for boto3 (used by later slices; the SubTask lifecycle
# CloudWatch URL is read from the Lambda runtime's own AWS_* env in prod) ---
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=eu-west-2
# Only needed when using temporary/SSO credentials.
# AWS_SESSION_TOKEN=

View file

@ -0,0 +1,9 @@
# Local harness for the ara_first_run Lambda image.
services:
  ara-first-run:
    build:
      # Three levels up from this file, so the Dockerfile's COPY paths
      # (domain/, applications/, ...) resolve against the build context.
      context: ../../../
      dockerfile: applications/ara_first_run/Dockerfile
    ports:
      # Host port 9002 (the port the local invoke script posts to) -> container port 8080.
      - "9002:8080"
    env_file:
      - .env.local

View file

@ -0,0 +1,30 @@
#!/usr/bin/env python3
import json
import requests
# Where the locally running Lambda container is published (see the compose
# file's port mapping) and the invocation path the script posts to.
HOST = "localhost"
PORT = "9002"
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"

# (connect, read) timeouts in seconds. Without a timeout ``requests`` waits
# forever if the container accepts the connection but never answers. The read
# timeout is deliberately generous: 900 s is Lambda's maximum execution time.
REQUEST_TIMEOUT = (5, 900)

# A single-record SQS-shaped event. The record body is a JSON *string*, as it
# is on a real SQS message, carrying the five AraFirstRunTriggerBody fields.
payload = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "task_id": "e295d89b-a7c5-4a9a-8b4e-b405fab1f298",
                    "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
                    "portfolio_id": 42,
                    "property_ids": [101, 102, 103],
                    "scenario_ids": [7, 8],
                }
            )
        }
    ]
}

response = requests.post(LAMBDA_URL, json=payload, timeout=REQUEST_TIMEOUT)
print("Status code:", response.status_code)
print("Response:")
print(response.text)

View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Build and start the local ara_first_run Lambda container.
#
# On first use there is no .env.local yet: the template is copied into place
# and the script stops (exit 1) so the values can be filled in before a re-run.
set -euo pipefail

# Work from this script's own directory so the relative paths below resolve.
cd "$(dirname "$0")"

[[ -f .env.local ]] || {
    cp .env.local.example .env.local
    echo "Created .env.local from the template — fill it in, then re-run." >&2
    exit 1
}

# Always rebuild from scratch and recreate the container.
docker compose build --no-cache
docker compose up --force-recreate

View file

@ -0,0 +1,4 @@
boto3
pydantic
sqlmodel
psycopg2-binary

View file

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Protocol
class FirstRunCommand(Protocol):
    """The slice of the trigger the pipeline threads downstream.

    Only the business fields — UPRNs and Scenario definitions are read from
    their source-of-truth tables, not carried here. ``task_id``/``sub_task_id``
    are deliberately absent: the SubTask lifecycle is the decorator's concern,
    not the pipeline's. ``AraFirstRunTriggerBody`` satisfies this structurally,
    so ``orchestration`` need not import the application-layer event type.
    """

    # Declared as read-only properties: the pipeline only needs to read them.
    @property
    def portfolio_id(self) -> int: ...

    @property
    def property_ids(self) -> list[int]: ...

    @property
    def scenario_ids(self) -> list[int]: ...
class FirstRunPipeline:
    """End-to-end composition of the First Run stages.

    The intended shape is Ingestion -> Baseline -> Modelling, with only
    ``property_ids`` threaded between stages through repos (ADR-0011).

    At this stage (#1130) it is a stub: ``run`` accepts the validated command
    and does nothing with it. The real three-stage composition lands in #1136.
    """

    def run(self, command: FirstRunCommand) -> None:
        """Accept the validated command; no stage is invoked yet."""

View file

View file

@ -0,0 +1,97 @@
from __future__ import annotations
from uuid import UUID
import pytest
from pydantic import ValidationError
from applications.ara_first_run.ara_first_run_trigger_body import (
AraFirstRunTriggerBody,
)
def test_validates_well_formed_body_into_typed_fields() -> None:
    """A well-formed body validates into correctly typed attributes."""
    # Arrange
    raw_task_id = "e295d89b-a7c5-4a9a-8b4e-b405fab1f298"
    raw_sub_task_id = "f4a9944f-41f0-4a33-8669-5016ec574068"
    body = {
        "task_id": raw_task_id,
        "sub_task_id": raw_sub_task_id,
        "portfolio_id": 42,
        "property_ids": [101, 102, 103],
        "scenario_ids": [7, 8],
    }

    # Act
    trigger = AraFirstRunTriggerBody.model_validate(body)

    # Assert — the id strings come back as UUIDs, the business ids as given.
    assert trigger.task_id == UUID(raw_task_id)
    assert trigger.sub_task_id == UUID(raw_sub_task_id)
    assert trigger.portfolio_id == 42
    assert trigger.property_ids == [101, 102, 103]
    assert trigger.scenario_ids == [7, 8]
def test_ignores_unknown_extra_fields() -> None:
    """An unrecognised key in the body is dropped, not kept or rejected."""
    # Arrange — the FastAPI backend may add fields the deployed lambda predates.
    unknown_key = "a_field_added_later_by_the_backend"
    body = {
        "task_id": "e295d89b-a7c5-4a9a-8b4e-b405fab1f298",
        "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
        "portfolio_id": 42,
        "property_ids": [101],
        "scenario_ids": [7],
        unknown_key: "ignore me",
    }

    # Act
    trigger = AraFirstRunTriggerBody.model_validate(body)

    # Assert — validation succeeded and the extra key left no attribute behind.
    assert not hasattr(trigger, unknown_key)
    assert trigger.portfolio_id == 42
def test_rejects_body_missing_a_required_field() -> None:
    """Omitting ``scenario_ids`` fails validation and the error names it."""
    # Arrange — scenario_ids omitted.
    body = {
        "task_id": "e295d89b-a7c5-4a9a-8b4e-b405fab1f298",
        "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
        "portfolio_id": 42,
        "property_ids": [101],
    }

    # Act / Assert
    with pytest.raises(ValidationError) as raised:
        AraFirstRunTriggerBody.model_validate(body)

    error_text = str(raised.value)
    assert "scenario_ids" in error_text
def test_rejects_non_uuid_task_id() -> None:
    """A ``task_id`` that is not a UUID fails validation and is named in the error."""
    # Arrange
    body = {
        "task_id": "not-a-uuid",
        "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
        "portfolio_id": 42,
        "property_ids": [101],
        "scenario_ids": [7],
    }

    # Act / Assert
    with pytest.raises(ValidationError) as raised:
        AraFirstRunTriggerBody.model_validate(body)

    error_text = str(raised.value)
    assert "task_id" in error_text
def test_rejects_non_int_portfolio_id() -> None:
    """A non-numeric ``portfolio_id`` fails validation and is named in the error."""
    # Arrange — business IDs are integers, not strings.
    body = {
        "task_id": "e295d89b-a7c5-4a9a-8b4e-b405fab1f298",
        "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
        "portfolio_id": "not-an-int",
        "property_ids": [101],
        "scenario_ids": [7],
    }

    # Act / Assert
    with pytest.raises(ValidationError) as raised:
        AraFirstRunTriggerBody.model_validate(body)

    error_text = str(raised.value)
    assert "portfolio_id" in error_text

View file

@ -0,0 +1,44 @@
from __future__ import annotations
from typing import Optional
from uuid import UUID
from applications.ara_first_run.ara_first_run_trigger_body import (
AraFirstRunTriggerBody,
)
from applications.ara_first_run.handler import dispatch_first_run
from orchestration.first_run_pipeline import FirstRunCommand
class _SpyPipeline:
    """Records the command it is asked to run, instead of composing stages."""

    def __init__(self) -> None:
        # None until ``run`` has been called; then the last command received.
        self.received: Optional[FirstRunCommand] = None

    def run(self, command: FirstRunCommand) -> None:
        """Remember ``command`` so the test can inspect what was delegated."""
        self.received = command
def test_validates_the_event_body_and_delegates_the_command_to_the_pipeline() -> None:
    """``dispatch_first_run`` validates the body and hands the typed trigger on."""
    # Arrange — a raw SQS body, as the decorator hands it to the handler.
    spy = _SpyPipeline()
    raw_task_id = "e295d89b-a7c5-4a9a-8b4e-b405fab1f298"
    body = {
        "task_id": raw_task_id,
        "sub_task_id": "f4a9944f-41f0-4a33-8669-5016ec574068",
        "portfolio_id": 42,
        "property_ids": [101, 102],
        "scenario_ids": [7],
    }

    # Act
    dispatch_first_run(body, pipeline=spy)

    # Assert — the raw body was validated into the typed trigger and handed
    # straight on, untouched.
    delegated = spy.received
    assert isinstance(delegated, AraFirstRunTriggerBody)
    assert delegated.task_id == UUID(raw_task_id)
    assert delegated.portfolio_id == 42
    assert delegated.property_ids == [101, 102]
    assert delegated.scenario_ids == [7]

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from dataclasses import dataclass
from orchestration.first_run_pipeline import FirstRunCommand, FirstRunPipeline
@dataclass
class _FakeCommand:
    """A stand-in for AraFirstRunTriggerBody — structurally a FirstRunCommand."""

    # The same three business fields FirstRunCommand declares.
    portfolio_id: int
    property_ids: list[int]
    scenario_ids: list[int]
def test_run_accepts_the_validated_command() -> None:
    """The stub pipeline takes a FirstRunCommand and returns ``None``."""
    # Arrange
    pipeline = FirstRunPipeline()
    command: FirstRunCommand = _FakeCommand(
        portfolio_id=42,
        property_ids=[101, 102],
        scenario_ids=[7],
    )

    # Act
    outcome = pipeline.run(command)

    # Assert — the stub simply receives the command; full Ingestion -> Baseline
    # -> Modelling composition lands in #1136.
    assert outcome is None