orchestration: add TaskOrchestrator.create_child_subtask primitive

Adds a primitive for creating a new WAITING SubTask under an existing
parent Task, routing all SubTask creation through the orchestrator
(replacing the legacy SubTaskInterface path used by the splitter).
Skips _cascade because a new WAITING child against an IN_PROGRESS
parent is a no-op under Task.recalculate_from_subtasks.
This commit is contained in:
Jun-te Kim 2026-05-19 17:19:41 +00:00
parent 7b00a33cd2
commit d7f14033ba
2 changed files with 38 additions and 0 deletions

View file

@ -48,6 +48,22 @@ class TaskOrchestrator:
self._subtasks.create(subtask)
return task, subtask
def create_child_subtask(
self,
parent_task_id: UUID,
*,
inputs: Optional[dict[str, Any]] = None,
) -> SubTask:
"""Add a new WAITING SubTask under an existing parent Task.
Skips `_cascade`: a new WAITING child against an IN_PROGRESS parent
leaves the parent's status unchanged per `Task.recalculate_from_subtasks`,
so calling it here would be a no-op.
"""
subtask = SubTask.create(task_id=parent_task_id, inputs=inputs)
self._subtasks.create(subtask)
return subtask
def start_subtask(
self, subtask_id: UUID, cloud_logs_url: Optional[str] = None
) -> SubTask:

View file

@ -134,6 +134,28 @@ def test_run_subtask_happy_path_returns_result_and_cascades_complete(
assert harness.tasks.get(task.id).status is TaskStatus.COMPLETE
def test_create_child_subtask_adds_waiting_child_without_changing_parent_status(
harness: Harness,
) -> None:
task, first = harness.orchestrator.create_task_with_subtask(
task_source="manual:test"
)
harness.orchestrator.start_subtask(first.id)
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
child = harness.orchestrator.create_child_subtask(
task.id, inputs={"split": "a"}
)
persisted_child = harness.subtasks.get(child.id)
assert persisted_child.task_id == task.id
assert persisted_child.status is SubTaskStatus.WAITING
assert persisted_child.inputs == {"split": "a"}
assert persisted_child.id != first.id
# Cascade is a no-op: parent stays IN_PROGRESS.
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
def test_run_subtask_failing_work_marks_failed_and_reraises(
harness: Harness,
) -> None: