Model/tests/domain/tasks/test_tasks.py
2026-05-20 14:00:19 +00:00

131 lines
3.3 KiB
Python

import pytest
from domain.tasks.subtasks import SubTaskStatus
from domain.tasks.tasks import Source, Task, TaskStatus
def test_create_task_starts_waiting() -> None:
# arrange / act
t = Task.create(
task_source="manual:test", source=Source.PORTFOLIO, source_id="abc-123"
)
# assert
assert t.status is TaskStatus.WAITING
assert t.source is Source.PORTFOLIO
assert t.source_id == "abc-123"
assert t.job_started is not None
assert t.job_completed is None
def test_create_task_rejects_blank_task_source() -> None:
# act / assert
with pytest.raises(ValueError, match="task_source"):
Task.create(task_source=" ")
def test_start_transitions_to_in_progress() -> None:
# arrange
t = Task.create(task_source="manual:test")
# act
t.start()
# assert
assert t.status is TaskStatus.IN_PROGRESS
def test_complete_marks_job_completed() -> None:
# arrange
t = Task.create(task_source="manual:test")
t.start()
# act
t.complete()
# assert
assert t.status is TaskStatus.COMPLETE
assert t.job_completed is not None
def test_fail_marks_job_completed() -> None:
# arrange
t = Task.create(task_source="manual:test")
# act
t.fail()
# assert
assert t.status is TaskStatus.FAILED
assert t.job_completed is not None
def test_start_rejects_from_terminal_status() -> None:
# arrange
t = Task.create(task_source="manual:test")
t.complete()
# act / assert
with pytest.raises(ValueError):
t.start()
def test_recalculate_with_empty_statuses_is_noop() -> None:
# arrange
t = Task.create(task_source="manual:test")
original_status = t.status
original_completed = t.job_completed
# act
t.recalculate_from_subtasks([])
# assert
assert t.status is original_status
assert t.job_completed is original_completed
def test_recalculate_all_waiting_keeps_waiting() -> None:
# arrange
t = Task.create(task_source="manual:test")
t.start() # task moved to IN_PROGRESS earlier
t.complete() # then COMPLETE, with job_completed set
# act
t.recalculate_from_subtasks([SubTaskStatus.WAITING, SubTaskStatus.WAITING])
# assert
assert t.status is TaskStatus.WAITING
assert t.job_completed is None
def test_recalculate_any_in_progress_marks_in_progress() -> None:
# arrange
t = Task.create(task_source="manual:test")
# act
t.recalculate_from_subtasks(
[SubTaskStatus.WAITING, SubTaskStatus.IN_PROGRESS, SubTaskStatus.COMPLETE]
)
# assert
assert t.status is TaskStatus.IN_PROGRESS
assert t.job_completed is None
def test_recalculate_all_complete_marks_complete() -> None:
# arrange
t = Task.create(task_source="manual:test")
# act
t.recalculate_from_subtasks([SubTaskStatus.COMPLETE, SubTaskStatus.COMPLETE])
# assert
assert t.status is TaskStatus.COMPLETE
assert t.job_completed is not None
def test_recalculate_any_failed_marks_failed_even_with_others() -> None:
# arrange
t = Task.create(task_source="manual:test")
# act
t.recalculate_from_subtasks(
[SubTaskStatus.IN_PROGRESS, SubTaskStatus.COMPLETE, SubTaskStatus.FAILED]
)
# assert
assert t.status is TaskStatus.FAILED
assert t.job_completed is not None