mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
57 lines
2 KiB
Python
57 lines
2 KiB
Python
from __future__ import annotations
|
|
|
|
from uuid import UUID
|
|
|
|
from infrastructure.address2uprn_queue_client import Address2UprnQueueClient
|
|
from orchestration.task_orchestrator import TaskOrchestrator
|
|
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
|
|
from repositories.unstandardised_address.unstandardised_address_list_repository import (
|
|
UnstandardisedAddressListRepository,
|
|
)
|
|
|
|
|
|
class PostcodeSplitterOrchestrator:
    """Split an address list into batches and dispatch each one as a child subtask.

    Collaborators (all injected):

    * ``unstandardised_address_repo`` loads the input address list and saves
      each batch, returning the URI the batch was written to.
    * ``task_orchestrator`` creates one child subtask per saved batch.
    * ``queue_client`` publishes one message per created child subtask.
    """

    def __init__(
        self,
        task_orchestrator: TaskOrchestrator,
        unstandardised_address_repo: UnstandardisedAddressListRepository,
        queue_client: Address2UprnQueueClient,
        max_batch_size: int = 500,
    ) -> None:
        """Keep references to the collaborators and the batch-size setting.

        Args:
            task_orchestrator: Used to create a child subtask for every batch.
            unstandardised_address_repo: Loads the input addresses and saves
                each batch.
            queue_client: Publishes a message for every created child subtask.
            max_batch_size: Forwarded unchanged as ``max_batch_size`` to
                ``iter_postcode_grouped_batches``. Presumably an upper bound on
                addresses per batch (TODO confirm). It is not validated here.
        """
        self._max_batch_size = max_batch_size
        self._queue_client = queue_client
        self._unstandardised_address_repo = unstandardised_address_repo
        self._task_orchestrator = task_orchestrator

    def split_and_dispatch(
        self,
        *,
        parent_task_id: UUID,
        parent_subtask_id: UUID,
        input_s3_uri: str,
    ) -> list[UUID]:
        """Split the addresses stored at *input_s3_uri* and dispatch every batch.

        The address list is loaded via the repository and handed to
        ``iter_postcode_grouped_batches``. Each batch it yields is then
        processed by ``_dispatch_batch``, which saves it, creates a child
        subtask and publishes a queue message.

        Args:
            parent_task_id: Passed as the first argument to
                ``create_child_subtask``. It is also embedded (stringified) in
                the child inputs and sent in each queue message.
            parent_subtask_id: Used only to namespace the storage path of the
                saved batches.
                NOTE(review): child subtasks are created from
                ``parent_task_id``, not this id; confirm that is intended.
            input_s3_uri: Location of the unstandardised address list.

        Returns:
            The ids of the created child subtasks, in dispatch order. The list
            is empty when no batches are produced.

        Note:
            Nothing here rolls back on failure. If a step raises part-way
            through, the batches, subtasks and messages already produced are
            not undone by this method.
        """
        source_addresses = self._unstandardised_address_repo.load_batch(input_s3_uri)
        # All batches for a given (task, subtask) pair share this storage prefix.
        batch_prefix = (
            f"ara_postcode_splitter_batches/{parent_task_id}/{parent_subtask_id}"
        )
        batches = iter_postcode_grouped_batches(
            source_addresses, max_batch_size=self._max_batch_size
        )
        return [
            self._dispatch_batch(
                batch, parent_task_id=parent_task_id, path_prefix=batch_prefix
            )
            for batch in batches
        ]

    def _dispatch_batch(
        self, batch, *, parent_task_id: UUID, path_prefix: str
    ) -> UUID:
        """Save *batch*, create a child subtask for it, publish it, return its id."""
        saved_uri = self._unstandardised_address_repo.save_batch(batch, path_prefix)
        subtask = self._task_orchestrator.create_child_subtask(
            parent_task_id,
            inputs={
                "task_id": str(parent_task_id),
                "s3_uri": saved_uri,
            },
        )
        subtask_id = subtask.id
        self._queue_client.publish(
            parent_task_id=parent_task_id,
            child_subtask_id=subtask_id,
            s3_uri=saved_uri,
        )
        return subtask_id
|