mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
197 lines
5.9 KiB
Python
197 lines
5.9 KiB
Python
from collections.abc import Iterator
|
|
from dataclasses import dataclass
|
|
|
|
import pytest
|
|
from sqlalchemy import Engine
|
|
from sqlmodel import Session
|
|
|
|
from domain.tasks.subtasks import SubTask, SubTaskStatus
|
|
from domain.tasks.tasks import Source, TaskStatus
|
|
from orchestration.task_orchestrator import TaskOrchestrator
|
|
from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository
|
|
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
|
|
|
|
|
|
@dataclass
|
|
class Harness:
|
|
orchestrator: TaskOrchestrator
|
|
tasks: TaskPostgresRepository
|
|
subtasks: SubTaskPostgresRepository
|
|
|
|
|
|
@pytest.fixture
|
|
def harness(db_engine: Engine) -> Iterator[Harness]:
|
|
with Session(db_engine) as session:
|
|
tasks = TaskPostgresRepository(session=session)
|
|
subtasks = SubTaskPostgresRepository(session=session)
|
|
yield Harness(
|
|
orchestrator=TaskOrchestrator(task_repo=tasks, subtask_repo=subtasks),
|
|
tasks=tasks,
|
|
subtasks=subtasks,
|
|
)
|
|
|
|
|
|
def test_create_task_with_subtask_creates_both_in_waiting(
|
|
harness: Harness,
|
|
) -> None:
|
|
# act
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test",
|
|
inputs={"foo": "bar"},
|
|
source=Source.PORTFOLIO,
|
|
source_id="abc",
|
|
)
|
|
|
|
# assert
|
|
assert task.status is TaskStatus.WAITING
|
|
assert subtask.status is SubTaskStatus.WAITING
|
|
assert subtask.task_id == task.id
|
|
assert subtask.inputs == {"foo": "bar"}
|
|
|
|
|
|
def test_start_subtask_cascades_to_in_progress(harness: Harness) -> None:
|
|
# arrange
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
|
|
# act
|
|
started = harness.orchestrator.start_subtask(
|
|
subtask.id, cloud_logs_url="https://example/log"
|
|
)
|
|
|
|
# assert
|
|
assert started.status is SubTaskStatus.IN_PROGRESS
|
|
assert started.cloud_logs_url == "https://example/log"
|
|
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
|
|
|
|
|
|
def test_complete_subtask_cascades_to_complete(harness: Harness) -> None:
|
|
# arrange
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
harness.orchestrator.start_subtask(subtask.id)
|
|
|
|
# act
|
|
harness.orchestrator.complete_subtask(subtask.id, {"value": 42})
|
|
|
|
# assert
|
|
done_subtask = harness.subtasks.get(subtask.id)
|
|
done_task = harness.tasks.get(task.id)
|
|
assert done_subtask.outputs == {"result": {"value": 42}}
|
|
assert done_task.status is TaskStatus.COMPLETE
|
|
assert done_task.job_completed is not None
|
|
|
|
|
|
def test_fail_subtask_cascades_to_failed(harness: Harness) -> None:
|
|
# arrange
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
|
|
# act
|
|
harness.orchestrator.fail_subtask(subtask.id, RuntimeError("boom"))
|
|
|
|
# assert
|
|
failed_subtask = harness.subtasks.get(subtask.id)
|
|
failed_task = harness.tasks.get(task.id)
|
|
assert failed_subtask.outputs == {"error": "boom"}
|
|
assert failed_task.status is TaskStatus.FAILED
|
|
|
|
|
|
def test_failed_subtask_locks_task_failed_even_with_others_complete(
|
|
harness: Harness,
|
|
) -> None:
|
|
# arrange
|
|
task, first = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
second = SubTask.create(task_id=task.id)
|
|
harness.subtasks.create(second)
|
|
|
|
# act
|
|
harness.orchestrator.complete_subtask(first.id)
|
|
harness.orchestrator.fail_subtask(second.id, RuntimeError("nope"))
|
|
|
|
# assert
|
|
assert harness.tasks.get(task.id).status is TaskStatus.FAILED
|
|
|
|
|
|
def test_mixed_complete_and_in_progress_keeps_task_in_progress(
|
|
harness: Harness,
|
|
) -> None:
|
|
# arrange
|
|
task, first = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
second = SubTask.create(task_id=task.id)
|
|
harness.subtasks.create(second)
|
|
|
|
# act
|
|
harness.orchestrator.complete_subtask(first.id)
|
|
harness.orchestrator.start_subtask(second.id)
|
|
|
|
# assert
|
|
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
|
|
|
|
|
|
def test_run_subtask_happy_path_returns_result_and_cascades_complete(
|
|
harness: Harness,
|
|
) -> None:
|
|
# arrange
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
|
|
# act
|
|
result = harness.orchestrator.run_subtask(subtask.id, work=lambda: {"answer": 42})
|
|
|
|
# assert
|
|
assert result == {"answer": 42}
|
|
assert harness.subtasks.get(subtask.id).status is SubTaskStatus.COMPLETE
|
|
assert harness.tasks.get(task.id).status is TaskStatus.COMPLETE
|
|
|
|
|
|
def test_create_child_subtask_adds_waiting_child_without_changing_parent_status(
|
|
harness: Harness,
|
|
) -> None:
|
|
# arrange
|
|
task, first = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
harness.orchestrator.start_subtask(first.id)
|
|
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
|
|
|
|
# act
|
|
child = harness.orchestrator.create_child_subtask(
|
|
task.id, inputs={"split": "a"}
|
|
)
|
|
|
|
# assert
|
|
persisted_child = harness.subtasks.get(child.id)
|
|
assert persisted_child.task_id == task.id
|
|
assert persisted_child.status is SubTaskStatus.WAITING
|
|
assert persisted_child.inputs == {"split": "a"}
|
|
assert persisted_child.id != first.id
|
|
# Cascade is a no-op: parent stays IN_PROGRESS.
|
|
assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS
|
|
|
|
|
|
def test_run_subtask_failing_work_marks_failed_and_reraises(
|
|
harness: Harness,
|
|
) -> None:
|
|
# arrange
|
|
task, subtask = harness.orchestrator.create_task_with_subtask(
|
|
task_source="manual:test"
|
|
)
|
|
|
|
def boom() -> None:
|
|
raise RuntimeError("boom")
|
|
|
|
# act / assert
|
|
with pytest.raises(RuntimeError, match="boom"):
|
|
harness.orchestrator.run_subtask(subtask.id, work=boom)
|
|
|
|
assert harness.subtasks.get(subtask.id).status is SubTaskStatus.FAILED
|
|
assert harness.tasks.get(task.id).status is TaskStatus.FAILED
|