get rid of comments

This commit is contained in:
Jun-te Kim 2026-05-20 13:21:11 +00:00
parent 8bb90a5aa5
commit d0cf3d14ad
22 changed files with 6 additions and 376 deletions

View file

@ -1,15 +1,3 @@
"""Lambda entrypoint for the postcode splitter slice.
The :func:`handler` function is decorated with ``@subtask_handler()`` so the
decorator owns the parent ``SubTask`` lifecycle (start/complete/fail) and
injects the decorator-owned :class:`TaskOrchestrator` as the third positional
argument. The handler itself does only two things:
1. Build a :class:`PostcodeSplitterOrchestrator` from env-driven config.
2. Delegate to ``split_and_dispatch`` and return its result so it lands in
``SubTask.outputs["result"]``.
"""
from __future__ import annotations
import os
@ -34,12 +22,6 @@ from utilities.aws_lambda.subtask_handler import subtask_handler
def handler(
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, list[str]]:
"""Validate the trigger body, build the splitter, dispatch children.
Reads ``S3_BUCKET_NAME`` and ``ADDRESS2UPRN_QUEUE_URL`` from the
environment to construct the typed S3/SQS clients. The return value
lands in ``SubTask.outputs["result"]`` via the decorator.
"""
trigger = PostcodeSplitterTriggerBody.model_validate(body)
bucket = os.environ["S3_BUCKET_NAME"]

View file

@ -1,13 +1,4 @@
#!/usr/bin/env python3
"""POST a single SQS-shaped event at the locally-running splitter Lambda.
The container built by docker-compose runs the AWS Lambda Runtime Interface
Emulator, which accepts invocations on the URL below. Replace the three
placeholder values with a real parent Task id, the splitter's own SubTask id
(both must already exist in the Postgres pointed at by .env.local), and the
s3://... URI of an uploaded address CSV.
"""
import json
import requests

View file

@ -1,30 +1,9 @@
"""Trigger payload model for the postcode splitter Lambda.
The decorator (``@subtask_handler``) already validates ``task_id`` and
``sub_task_id`` via :class:`SubtaskTriggerBody`; this model layers on the
splitter-specific ``s3_uri`` field while keeping ``extra="allow"`` so any
upstream-passthrough keys (e.g. ``portfolio_id``) survive untouched.
"""
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class PostcodeSplitterTriggerBody(BaseModel):
"""Validated body for the postcode splitter Lambda.
Attributes:
task_id: Parent ``Task`` id; used as the ``task_id`` input on each
child ``SubTask`` and as the ``parent_task_id`` on the fan-out
SQS messages.
sub_task_id: The splitter's own ``SubTask`` id; used as the path
segment under ``ara_postcode_splitter_batches/{task_id}/{...}``
so per-invocation outputs cannot collide.
s3_uri: ``s3://bucket/key`` URI of the uploaded address CSV the
splitter must read.
"""
model_config = ConfigDict(extra="allow")
task_id: UUID

View file

@ -1,22 +1,3 @@
"""Pure-Python postcode-grouped batching.
This module preserves the batching invariants from the legacy postcode
splitter (``backend/postcode_splitter/main.py``) without touching pandas,
S3, or SQS:
* Addresses are grouped by **Postcode** in *insertion order* -- the first
Postcode seen produces the first group.
* A Postcode group is never split across two batches.
* If a single Postcode group is larger than ``max_batch_size``, it is
flushed as its own oversize batch (any buffered groups go out first,
untouched).
* Adding a group that would push the buffer past ``max_batch_size`` first
flushes the existing buffer, then starts a new buffer with the group.
* Whatever remains in the buffer after the input is exhausted is flushed
as the final batch.
* Empty input yields no batches.
"""
from __future__ import annotations
from collections.abc import Iterable, Iterator
@ -30,19 +11,6 @@ def iter_postcode_grouped_batches(
*,
max_batch_size: int = 500,
) -> Iterator[list[UserAddress]]:
"""Yield batches of ``UserAddress`` grouped by Postcode.
Args:
addresses: An iterable of :class:`UserAddress`. Order is preserved
within each Postcode group, and groups are yielded in the order
their first member was seen.
max_batch_size: The soft upper bound on batch size, in number of
addresses. A single Postcode group larger than this cap is
dispatched whole (the cap is never used to split a group).
Yields:
Lists of ``UserAddress``. Each list is non-empty.
"""
if max_batch_size < 1:
raise ValueError("max_batch_size must be >= 1")
@ -77,12 +45,6 @@ def iter_postcode_grouped_batches(
def _group_by_postcode_in_order(
addresses: Iterable[UserAddress],
) -> dict[Postcode, list[UserAddress]]:
"""Group addresses by ``postcode`` preserving first-seen order.
Python dicts retain insertion order since 3.7, so a plain dict suffices
for the same effect as pandas ``groupby(..., sort=False)``. ``Postcode``
is a frozen value object, hence hashable and usable as the dict key.
"""
groups: dict[Postcode, list[UserAddress]] = {}
for address in addresses:
groups.setdefault(address.postcode, []).append(address)

View file

@ -1,11 +1,3 @@
"""The :class:`UserAddress` value object.
A frozen dataclass capturing the splitter's domain entity: the raw input
address line, a :class:`~domain.postcode.Postcode`, and an optional internal
reference from the customer dataset. The postcode is a value object that is
canonical by construction, so no caller can hold an un-normalised postcode.
"""
from __future__ import annotations
from dataclasses import dataclass, field
@ -15,32 +7,12 @@ from domain.postcode import Postcode
def _empty_source_row() -> dict[str, str]:
"""Typed default factory for :attr:`UserAddress.source_row`."""
return {}
@dataclass(frozen=True)
class UserAddress:
"""A user-supplied address paired with its canonical postcode.
Attributes:
user_address: The free-text address string as supplied upstream.
postcode: The postcode as a :class:`~domain.postcode.Postcode` value
object -- canonical (uppercased, whitespace stripped) by
construction.
internal_reference: Optional customer-side identifier preserved for
traceability through the matching pipeline.
source_row: The complete original CSV row this address was parsed
from, column name -> cell value. The splitter is a pass-through
router: it groups rows by postcode but must not drop the other
columns the downstream address2uprn stage relies on, so the raw
row travels alongside the parsed fields. Excluded from equality
and hashing -- identity stays defined by the parsed fields above.
"""
user_address: str
postcode: Postcode
internal_reference: Optional[str] = None
source_row: dict[str, str] = field(
default_factory=_empty_source_row, compare=False
)
source_row: dict[str, str] = field(default_factory=_empty_source_row, compare=False)

View file

@ -1,16 +1,3 @@
"""The :class:`Postcode` value object.
A frozen value object that owns postcode sanitisation. Constructing a
``Postcode`` always yields the canonical form -- uppercase with all
whitespace removed -- so no part of the domain can hold an un-normalised
postcode. This matches the legacy splitter's
``df["postcode"].str.upper().str.replace(" ", "")``.
``Postcode`` is the single sanitisation point: anywhere a postcode crosses a
domain boundary it should be wrapped in one, and ``str(postcode)`` gives the
canonical string back for serialisation.
"""
from __future__ import annotations
from dataclasses import dataclass
@ -18,18 +5,6 @@ from dataclasses import dataclass
@dataclass(frozen=True)
class Postcode:
"""A postcode held in canonical form.
The ``value`` passed to the constructor is sanitised eagerly in
:meth:`__post_init__` -- uppercased, with all whitespace (spaces, tabs,
newlines) removed -- so every ``Postcode`` instance is canonical by
construction. Two postcodes that differ only in surface whitespace or
case therefore compare equal.
Attributes:
value: The canonical postcode string (e.g. ``"SW1A1AA"``).
"""
value: str
def __post_init__(self) -> None:

View file

@ -4,12 +4,6 @@ from infrastructure.sqs_client import SqsClient
class Address2UprnQueueClient(SqsClient):
"""SQS client that publishes Address-to-UPRN fan-out messages.
The body shape is fixed by the downstream consumer:
``{"task_id": str, "sub_task_id": str, "s3_uri": str}``
"""
def publish(
self,
*,
@ -17,7 +11,6 @@ class Address2UprnQueueClient(SqsClient):
child_subtask_id: UUID,
s3_uri: str,
) -> str:
"""Send a typed Address-to-UPRN message. Returns the SQS ``MessageId``."""
return self.send(
{
"task_id": str(parent_task_id),

View file

@ -6,20 +6,7 @@ from infrastructure.s3_uri import parse_s3_uri
class CsvS3Client(S3Client):
""":class:`S3Client` subclass that round-trips CSV row dictionaries.
Rows are represented as ``list[dict[str, str]]`` the same shape used by
:func:`csv.DictReader`/``DictWriter`` which keeps the API trivially
compatible with existing CSV helpers in ``utils/s3.py``.
"""
def read_rows(self, s3_uri: str) -> list[dict[str, str]]:
"""Fetch the object at ``s3_uri`` and decode it as a CSV.
The bucket portion of the URI is validated against this client's
configured bucket so cross-bucket reads fail loudly rather than
silently fetching from the wrong place.
"""
bucket, key = parse_s3_uri(s3_uri)
if bucket != self.bucket:
raise ValueError(
@ -31,11 +18,6 @@ class CsvS3Client(S3Client):
return [dict(row) for row in reader]
def save_rows(self, rows: list[dict[str, str]], key: str) -> str:
"""Serialise ``rows`` to CSV under ``key`` and return the ``s3://`` URI.
An empty ``rows`` list is rejected because we cannot otherwise infer
a header row.
"""
if not rows:
raise ValueError("Cannot save an empty rows list: header is unknown")
buffer = StringIO()

View file

@ -2,13 +2,6 @@ from typing import Any
class S3Client:
"""Thin typed wrapper around a boto3 S3 client bound to a single bucket.
The class is deliberately small: it exposes only the byte-level
operations needed by the wider infrastructure layer. Serialisation
(CSV, JSON, etc.) lives in subclasses such as :class:`CsvS3Client`.
"""
def __init__(self, boto_s3_client: Any, bucket: str) -> None:
self._client = boto_s3_client
self._bucket = bucket
@ -18,7 +11,6 @@ class S3Client:
return self._bucket
def get_object(self, key: str) -> bytes:
"""Return the raw bytes stored at ``key`` in this client's bucket."""
response: dict[str, Any] = self._client.get_object(
Bucket=self._bucket, Key=key
)
@ -26,6 +18,5 @@ class S3Client:
return body
def put_object(self, key: str, body: bytes) -> str:
"""Write ``body`` to ``key`` and return the canonical ``s3://`` URI."""
self._client.put_object(Bucket=self._bucket, Key=key, Body=body)
return f"s3://{self._bucket}/{key}"

View file

@ -1,25 +1,7 @@
"""Parse S3 URIs into ``(bucket, key)`` pairs.
A pure-stdlib helper for the infrastructure layer. It deliberately pulls in
neither pandas, boto3, nor the legacy ``utils`` package, so slim Lambda images
that only need URI parsing do not drag the wider data stack along.
Two input shapes are supported:
* canonical S3 URIs --- ``s3://bucket/key``
* AWS S3 console URLs --- ``https://.../s3/object/bucket?prefix=key``
"""
from urllib.parse import unquote
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
"""Return the ``(bucket, key)`` pair addressed by ``s3_uri``.
Raises:
ValueError: if ``s3_uri`` is neither a well-formed ``s3://`` URI nor
an AWS console URL carrying a ``prefix`` query parameter.
"""
if s3_uri.startswith("s3://"):
parts = s3_uri[len("s3://") :].split("/", 1)
if len(parts) < 2 or not parts[0] or not parts[1]:

View file

@ -3,13 +3,6 @@ from typing import Any
class SqsClient:
"""Thin typed wrapper around a boto3 SQS client bound to one queue URL.
The body is JSON-serialised here so callers can pass plain dictionaries
instead of constructing message strings themselves. Typed publish
helpers (e.g. :class:`Address2UprnQueueClient`) build on this contract.
"""
def __init__(self, boto_sqs_client: Any, queue_url: str) -> None:
self._client = boto_sqs_client
self._queue_url = queue_url
@ -19,7 +12,6 @@ class SqsClient:
return self._queue_url
def send(self, body: dict[str, Any]) -> str:
"""JSON-serialise ``body`` and send it. Returns the SQS ``MessageId``."""
response: dict[str, Any] = self._client.send_message(
QueueUrl=self._queue_url,
MessageBody=json.dumps(body),

View file

@ -1,15 +1,3 @@
"""Use-case orchestrator for the postcode splitter Lambda.
Wires the slice-1 domain (``iter_postcode_grouped_batches``), the slice-3
``UserAddressRepository``, the slice-2 ``Address2UprnQueueClient``, and the
slice-4 ``TaskOrchestrator.create_child_subtask`` primitive together.
``split_and_dispatch`` loads the input batch, groups it into per-postcode
chunks, writes each chunk back to S3 under a deterministic prefix, creates a
WAITING child ``SubTask`` for it, and publishes the address-to-UPRN fan-out
message that downstream consumers pick up.
"""
from __future__ import annotations
from uuid import UUID
@ -21,15 +9,6 @@ from repositories.user_address.user_address_repository import UserAddressReposit
class PostcodeSplitterOrchestrator:
"""Split an uploaded address batch into postcode-grouped child SubTasks.
The orchestrator owns the algorithm; the IO collaborators
(:class:`UserAddressRepository`, :class:`Address2UprnQueueClient`) and
the :class:`TaskOrchestrator` lifecycle primitive are injected so the
same wiring can be exercised against moto/SQLite in tests and against
real AWS in the Lambda entrypoint.
"""
def __init__(
self,
task_orchestrator: TaskOrchestrator,
@ -49,19 +28,6 @@ class PostcodeSplitterOrchestrator:
parent_subtask_id: UUID,
input_s3_uri: str,
) -> list[UUID]:
"""Split ``input_s3_uri`` into postcode batches and dispatch each.
For each yielded batch:
1. Persist it under
``ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}``.
2. Create a WAITING child ``SubTask`` with
``inputs={"task_id": str(parent_task_id), "s3_uri": batch_uri}``.
3. Publish an ``address2UPRN`` SQS message referencing the new child.
Returns:
The list of child ``SubTask`` ids, in dispatch order.
"""
addresses = self._user_address_repo.load_batch(input_s3_uri)
path_prefix = (
f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"

View file

@ -54,12 +54,6 @@ class TaskOrchestrator:
*,
inputs: Optional[dict[str, Any]] = None,
) -> SubTask:
"""Add a new WAITING SubTask under an existing parent Task.
Skips `_cascade`: a new WAITING child against an IN_PROGRESS parent
leaves the parent's status unchanged per `Task.recalculate_from_subtasks`,
so calling it here would be a no-op.
"""
subtask = SubTask.create(task_id=parent_task_id, inputs=inputs)
self._subtasks.create(subtask)
return subtask

View file

@ -1,18 +1,3 @@
"""CSV-on-S3 adapter for :class:`UserAddressRepository`.
Reads upload CSVs that carry a ``postcode`` column (plus optional
``Address 1``/``Address 2``/``Address 3`` and ``Internal Reference``), and
writes batch CSVs that pass *every* original column through unchanged with
one column appended -- ``postcode_clean`` (uppercase, whitespace-stripped) --
which the downstream address2uprn stage groups on.
The splitter is a pass-through router: it must not reshape or drop columns,
because address2uprn has not been migrated and still consumes the legacy
splitter's full-row output. The frontend pre-applies the user's column
mapping at upload time, so this adapter does NOT consult any
``BulkAddressUpload.column_mapping``.
"""
from __future__ import annotations
import uuid
@ -31,33 +16,11 @@ _POSTCODE_CLEAN_COLUMN: str = "postcode_clean"
class UserAddressCsvS3Repository(UserAddressRepository):
"""Persist :class:`UserAddress` batches as CSV objects in S3.
The repo owns the unique-filename-within-prefix convention
(``{ISO datetime}_{8-char uuid}.csv``); callers own the directory
hierarchy supplied as ``path_prefix``.
"""
def __init__(self, csv_client: CsvS3Client, bucket: str) -> None:
self._csv_client = csv_client
self._bucket = bucket
def load_batch(self, s3_uri: str) -> list[UserAddress]:
"""Load upload CSV rows into :class:`UserAddress` objects.
Each row's complete column set is preserved on
:attr:`UserAddress.source_row` so :meth:`save_batch` can pass it
through untouched. The parsed convenience fields are also populated:
``Address 1``/``Address 2``/``Address 3`` are concatenated with
``", "`` (skipping missing/empty parts) into ``user_address``, and
``Internal Reference`` is threaded to
:attr:`UserAddress.internal_reference` (``None`` when missing/empty).
Raises:
ValueError: if the CSV has rows but no ``postcode`` column --
without it the splitter cannot group, and silently emitting
empty postcodes would corrupt every downstream batch.
"""
rows = self._csv_client.read_rows(s3_uri)
if rows and _POSTCODE_COLUMN not in rows[0]:
raise ValueError(
@ -86,16 +49,6 @@ class UserAddressCsvS3Repository(UserAddressRepository):
return addresses
def save_batch(self, addresses: list[UserAddress], path_prefix: str) -> str:
"""Write a pass-through batch CSV under a unique key.
Each output row is the address's original ``source_row`` with a
``postcode_clean`` column appended (the canonical postcode the
downstream address2uprn stage groups on). No original column is
dropped or reshaped.
The key is ``{path_prefix}/{ISO-8601 datetime}_{8-char uuid}.csv``.
Returns the full ``s3://bucket/key`` URI.
"""
rows: list[dict[str, str]] = [
{**addr.source_row, _POSTCODE_CLEAN_COLUMN: str(addr.postcode)}
for addr in addresses

View file

@ -1,10 +1,3 @@
"""Abstract repository for :class:`UserAddress` batches.
Persistence-agnostic interface for loading and saving batches of
:class:`domain.addresses.user_address.UserAddress`. Concrete adapters --
e.g. :class:`UserAddressCsvS3Repository` -- live alongside this module.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
@ -13,18 +6,10 @@ from domain.addresses.user_address import UserAddress
class UserAddressRepository(ABC):
"""Load and persist batches of :class:`UserAddress`.
Implementations choose the underlying storage (S3 CSV, Postgres,
in-memory, ...) but must preserve the canonical column semantics:
the address text, postcode (a :class:`~domain.postcode.Postcode` value
object), and an optional internal reference.
"""
@abstractmethod
def load_batch(self, s3_uri: str) -> list[UserAddress]:
"""Read a batch of addresses from ``s3_uri`` and return domain objects."""
...
@abstractmethod
def save_batch(self, addresses: list[UserAddress], path_prefix: str) -> str:
"""Persist ``addresses`` under ``path_prefix`` and return the URI written."""
...

View file

@ -6,7 +6,6 @@ from domain.postcode import Postcode
def _addrs(postcode: str, n: int) -> list[UserAddress]:
"""Build ``n`` addresses sharing a postcode, with distinct address lines."""
return [
UserAddress(
user_address=f"{i} {postcode} Street", postcode=Postcode(postcode)

View file

@ -6,12 +6,5 @@ REGION = "us-east-1"
def make_boto_client(service_name: str) -> Any:
"""Construct a boto3 client typed as ``Any``.
boto3's overloaded ``client`` signature uses ``Literal[...]`` per service
in the installed stubs, which forces every call site to satisfy
``reportArgumentType`` and ``reportUnknownMemberType`` under strict
pyright. Centralising the cast keeps each test file clean.
"""
factory: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
return factory(service_name, region_name=REGION)

View file

@ -7,10 +7,6 @@ import pytest
@pytest.fixture(autouse=True)
def _aws_creds() -> Iterator[None]: # pyright: ignore[reportUnusedFunction]
"""Stub AWS creds so botocore doesn't probe the host environment.
Applied automatically to every test in ``tests/infrastructure/``.
"""
keys = (
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",

View file

@ -1,13 +1,3 @@
"""Integration test: PostcodeSplitterOrchestrator wired end-to-end.
Combines moto S3 + moto SQS + an in-memory SQLite session for the
``TaskOrchestrator`` so the full slice-6 wiring is exercised through real
infrastructure adapters (not mocks). The fixture CSV spans three postcodes
with one oversize group, which forces both the buffer-flush-then-oversize
branch and the final-flush branch of
``iter_postcode_grouped_batches`` three batches in total.
"""
from __future__ import annotations
import json

View file

@ -7,10 +7,6 @@ import pytest
@pytest.fixture(autouse=True)
def _aws_creds() -> Iterator[None]: # pyright: ignore[reportUnusedFunction]
"""Stub AWS creds so botocore doesn't probe the host environment.
Applied automatically to every test in ``tests/repositories/user_address/``.
"""
keys = (
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",

View file

@ -1,11 +1,3 @@
"""Tests for the @subtask_handler decorator.
Covers the contract that the decorator owns the parent SubTask lifecycle and
injects the decorator-owned TaskOrchestrator as a third positional argument
to the wrapped function so the handler can compose its own use-case
orchestrator that shares the session.
"""
import logging
from collections.abc import Generator, Iterator
from contextlib import contextmanager
@ -14,8 +6,6 @@ from typing import Any
from uuid import UUID
import pytest
_LOGGER_NAME = "utilities.aws_lambda.subtask_handler"
from sqlmodel import Session, SQLModel, create_engine
from domain.tasks.subtasks import SubTaskStatus
@ -25,6 +15,8 @@ from repositories.tasks.subtask_postgres_repository import SubTaskPostgresReposi
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
from utilities.aws_lambda.subtask_handler import subtask_handler
_LOGGER_NAME = "utilities.aws_lambda.subtask_handler"
@dataclass
class Harness:
@ -58,8 +50,6 @@ def _direct_event(task_id: UUID, subtask_id: UUID) -> dict[str, Any]:
def test_subtask_handler_injects_orchestrator_as_third_positional_argument(
harness: Harness,
) -> None:
"""The wrapped function receives the decorator-owned TaskOrchestrator
so it can share the session with its own use-case orchestrator."""
_, subtask = harness.orchestrator.create_task_with_subtask(
task_source="manual:test"
)
@ -123,9 +113,6 @@ def test_subtask_handler_marks_parent_failed_and_reraises_on_error(
def test_subtask_handler_injected_orchestrator_can_create_child_subtask(
harness: Harness,
) -> None:
"""Smoke check the share-the-session promise: the injected orchestrator
is the same one the decorator owns, so a handler can use it to create
child SubTasks under the same session."""
task, subtask = harness.orchestrator.create_task_with_subtask(
task_source="manual:test"
)
@ -150,8 +137,6 @@ def test_subtask_handler_injected_orchestrator_can_create_child_subtask(
def test_subtask_handler_logs_subtask_lifecycle_on_success(
harness: Harness, caplog: pytest.LogCaptureFixture
) -> None:
"""Start and completion are logged at INFO so a successful invocation
leaves a CloudWatch breadcrumb (not just the Lambda runtime lines)."""
task, subtask = harness.orchestrator.create_task_with_subtask(
task_source="manual:test"
)
@ -172,8 +157,6 @@ def test_subtask_handler_logs_subtask_lifecycle_on_success(
def test_subtask_handler_logs_exception_on_failure(
harness: Harness, caplog: pytest.LogCaptureFixture
) -> None:
"""A failing subtask is logged at ERROR with the traceback attached,
before the exception propagates for the Lambda runtime to surface."""
task, subtask = harness.orchestrator.create_task_with_subtask(
task_source="manual:test"
)
@ -198,8 +181,6 @@ def test_subtask_handler_logs_exception_on_failure(
def test_subtask_handler_records_cloudwatch_url_on_subtask(
harness: Harness, monkeypatch: pytest.MonkeyPatch
) -> None:
"""With the AWS Lambda runtime's log env vars present, a CloudWatch deep
link is built and persisted on the SubTask."""
monkeypatch.setenv("AWS_REGION", "eu-west-2")
monkeypatch.setenv(
"AWS_LAMBDA_LOG_GROUP_NAME", "/aws/lambda/postcode-splitter"
@ -232,8 +213,6 @@ def test_subtask_handler_records_cloudwatch_url_on_subtask(
def test_subtask_handler_leaves_cloudwatch_url_unset_outside_lambda(
harness: Harness, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Outside a real Lambda (e.g. the local RIE) the runtime log env vars
are absent, so cloud_logs_url is left unset rather than storing junk."""
for var in (
"AWS_REGION",
"AWS_LAMBDA_LOG_GROUP_NAME",

View file

@ -1,15 +1,7 @@
"""@subtask_handler decorator for Lambdas that operate on existing SubTasks.
Translates an AWS Lambda invocation (SQS-shaped or direct) into
TaskOrchestrator.run_subtask(...) calls, emitting an INFO log line for each
subtask's start and completion and a logged exception on failure. Those lines
land in CloudWatch via the Lambda runtime's stdout/stderr capture.
Each subtask also records ``cloud_logs_url`` -- a deep link to this
invocation's CloudWatch log stream -- so an operator can jump from a SubTask
row straight to its logs. It is built from the environment variables the AWS
Lambda runtime sets, so it is populated only on real Lambda invocations and
left unset under the local RIE (which does not export them).
TaskOrchestrator.run_subtask(...) calls.
"""
import json
@ -94,24 +86,10 @@ def _records(event: dict[str, Any]) -> list[dict[str, Any]]:
def _console_encode(value: str) -> str:
"""Encode a value for a CloudWatch console deep link.
The console expects URL-encoding with the percent signs themselves
re-encoded as ``$25`` -- e.g. ``/`` becomes ``%2F`` becomes ``$252F``.
"""
return quote(value, safe="").replace("%", "$25")
def _cloudwatch_url() -> Optional[str]:
"""Build a CloudWatch console URL for this invocation's log stream.
Sourced entirely from the environment variables the AWS Lambda runtime
sets -- ``AWS_REGION``, ``AWS_LAMBDA_LOG_GROUP_NAME`` and
``AWS_LAMBDA_LOG_STREAM_NAME``. Returns None when any is absent, which is
the case outside a real Lambda (the local RIE does not export them) -- so
``SubTask.cloud_logs_url`` is left unset rather than storing a link that
points nowhere.
"""
region = os.environ.get("AWS_REGION")
log_group = os.environ.get("AWS_LAMBDA_LOG_GROUP_NAME")
log_stream = os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")