Model/orchestration/postcode_splitter_orchestrator.py
2026-05-20 13:21:11 +00:00

55 lines
1.8 KiB
Python

from __future__ import annotations
from uuid import UUID
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from orchestration.task_orchestrator import TaskOrchestrator
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
from repositories.user_address.user_address_repository import UserAddressRepository
class PostcodeSplitterOrchestrator:
    """Fan a stored address batch out into child subtasks, one per sub-batch.

    The addresses are loaded through the user-address repository, partitioned
    by ``iter_postcode_grouped_batches``, and every resulting sub-batch is
    saved, registered as a child subtask and published to the queue.
    """

    def __init__(
        self,
        task_orchestrator: TaskOrchestrator,
        user_address_repo: UserAddressRepository,
        queue_client: Address2UprnQueueClient,
        max_batch_size: int = 500,
    ) -> None:
        """Store the collaborators and the batch-size limit.

        Args:
            task_orchestrator: Used to create one child subtask per sub-batch.
            user_address_repo: Used to load the input batch and to save each
                sub-batch.
            queue_client: Used to publish one message per sub-batch.
            max_batch_size: Forwarded unchanged to
                ``iter_postcode_grouped_batches`` as ``max_batch_size``.
        """
        self._task_orchestrator = task_orchestrator
        self._user_address_repo = user_address_repo
        self._queue_client = queue_client
        self._max_batch_size = max_batch_size

    def split_and_dispatch(
        self,
        *,
        parent_task_id: UUID,
        parent_subtask_id: UUID,
        input_s3_uri: str,
    ) -> list[UUID]:
        """Split the batch at ``input_s3_uri`` and dispatch every sub-batch.

        Args:
            parent_task_id: Forms part of the storage prefix, is passed to
                ``create_child_subtask`` and is included in each published
                message.
            parent_subtask_id: Used only as part of the storage prefix under
                which sub-batches are saved.
            input_s3_uri: Location handed to the repository's ``load_batch``.

        Returns:
            The ``id`` of each created child subtask, in the order the
            sub-batches were yielded. Empty when no sub-batch is yielded.
        """
        loaded = self._user_address_repo.load_batch(input_s3_uri)
        storage_prefix = (
            f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"
        )
        return [
            self._dispatch_batch(chunk, storage_prefix, parent_task_id)
            for chunk in iter_postcode_grouped_batches(
                loaded, max_batch_size=self._max_batch_size
            )
        ]

    def _dispatch_batch(self, chunk, storage_prefix: str, parent_task_id: UUID):
        """Save one sub-batch, create its child subtask, publish it, return the id."""
        # Order matters: the saved URI feeds both the subtask inputs and the
        # queue message, and the message needs the child's id.
        saved_uri = self._user_address_repo.save_batch(chunk, storage_prefix)
        subtask = self._task_orchestrator.create_child_subtask(
            parent_task_id,
            inputs={"task_id": str(parent_task_id), "s3_uri": saved_uri},
        )
        self._queue_client.publish(
            parent_task_id=parent_task_id,
            child_subtask_id=subtask.id,
            s3_uri=saved_uri,
        )
        return subtask.id