from __future__ import annotations

from uuid import UUID

from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
from orchestration.task_orchestrator import TaskOrchestrator
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
from repositories.unstandardised_address.unstandardised_address_list_repository import (
    UnstandardisedAddressListRepository,
)


class PostcodeSplitterOrchestrator:
    """Fan a stored address batch out into child subtasks, one per sub-batch.

    The sub-batches are whatever ``iter_postcode_grouped_batches`` yields for
    the loaded addresses; each one is saved through the repository, registered
    as a child subtask and published on the queue client.
    """

    def __init__(
        self,
        task_orchestrator: TaskOrchestrator,
        unstandardised_address_repo: UnstandardisedAddressListRepository,
        queue_client: Address2UprnQueueClient,
        max_batch_size: int = 500,
    ) -> None:
        """Store the collaborators and the batch-size limit.

        Args:
            task_orchestrator: Used to create one child subtask per batch.
            unstandardised_address_repo: Used to load the input batch and to
                save each derived batch.
            queue_client: Used to publish one message per child subtask.
            max_batch_size: Forwarded as ``max_batch_size`` to
                ``iter_postcode_grouped_batches``.
        """
        self._task_orchestrator = task_orchestrator
        self._unstandardised_address_repo = unstandardised_address_repo
        self._queue_client = queue_client
        self._max_batch_size = max_batch_size

    def split_and_dispatch(
        self,
        *,
        parent_task_id: UUID,
        parent_subtask_id: UUID,
        input_s3_uri: str,
    ) -> list[UUID]:
        """Split the batch at ``input_s3_uri`` and dispatch every sub-batch.

        Args:
            parent_task_id: Task the child subtasks are created under; also
                part of the storage prefix for the saved sub-batches.
            parent_subtask_id: Only used to build the storage prefix.
            input_s3_uri: Location handed to the repository's ``load_batch``.

        Returns:
            The ids of the created child subtasks, in the order the batches
            were yielded. Empty when no batch is yielded.
        """
        loaded = self._unstandardised_address_repo.load_batch(input_s3_uri)
        storage_prefix = (
            f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"
        )

        dispatched_ids: list[UUID] = []
        groups = iter_postcode_grouped_batches(
            loaded, max_batch_size=self._max_batch_size
        )
        for group in groups:
            dispatched_ids.append(
                self._dispatch_batch(
                    group,
                    parent_task_id=parent_task_id,
                    path_prefix=storage_prefix,
                )
            )
        return dispatched_ids

    def _dispatch_batch(
        self,
        batch,
        *,
        parent_task_id: UUID,
        path_prefix: str,
    ) -> UUID:
        """Save one batch, create its child subtask, publish it; return the id."""
        saved_uri = self._unstandardised_address_repo.save_batch(batch, path_prefix)
        subtask_inputs = {
            "task_id": str(parent_task_id),
            "s3_uri": saved_uri,
        }
        subtask = self._task_orchestrator.create_child_subtask(
            parent_task_id,
            inputs=subtask_inputs,
        )
        # Publish only after the subtask exists, so the message carries its id.
        self._queue_client.publish(
            parent_task_id=parent_task_id,
            child_subtask_id=subtask.id,
            s3_uri=saved_uri,
        )
        return subtask.id