Model/domain/tasks/subtasks.py
2026-05-19 16:35:09 +00:00

55 lines
1.7 KiB
Python

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import UUID, uuid4
class SubTaskStatus(str, Enum):
    """Status values a subtask can hold.

    Every member is also a ``str`` instance, so it compares equal to its
    plain string value (for example ``SubTaskStatus.WAITING == "waiting"``).
    """

    # Status given to a subtask that has not been started.
    WAITING = "waiting"

    # NB: the stored value uses a space, not an underscore.
    IN_PROGRESS = "in progress"

    # Finished states.
    COMPLETE = "complete"
    FAILED = "failed"
@dataclass
class SubTask:
    """A subtask record, linked to a task via ``task_id``, and its lifecycle.

    New subtasks default to ``WAITING``. :meth:`start` moves them to
    ``IN_PROGRESS``; :meth:`complete` and :meth:`fail` move them to
    ``COMPLETE`` / ``FAILED`` and stamp ``job_completed``.

    NOTE(review): only :meth:`start` validates the current status.
    :meth:`complete` and :meth:`fail` overwrite whatever state the subtask is
    in, including one that has already finished. Confirm this is intended
    before adding guards, since callers may rely on it.
    """

    id: UUID  # identifier of this subtask; create() generates it with uuid4()
    task_id: UUID  # presumably the id of the owning task -- verify with callers
    status: SubTaskStatus = SubTaskStatus.WAITING
    inputs: Optional[dict[str, Any]] = None  # caller-supplied input mapping
    outputs: Optional[dict[str, Any]] = None  # {"result": ...} or {"error": ...}
    cloud_logs_url: Optional[str] = None  # set by start() when one is passed
    job_started: Optional[datetime] = None  # UTC time of the first start()
    job_completed: Optional[datetime] = None  # UTC time of complete()/fail()

    @classmethod
    def create(
        cls, *, task_id: UUID, inputs: Optional[dict[str, Any]] = None
    ) -> "SubTask":
        """Build a new ``WAITING`` subtask with a freshly generated id.

        Args:
            task_id: Stored on the new subtask as ``task_id``.
            inputs: Optional input mapping; stored as-is (not copied).

        Returns:
            The new subtask. Outputs, log URL and both timestamps are ``None``.
        """
        return cls(
            id=uuid4(),
            task_id=task_id,
            status=SubTaskStatus.WAITING,
            inputs=inputs,
        )

    def start(self, cloud_logs_url: Optional[str] = None) -> None:
        """Mark the subtask as ``IN_PROGRESS``.

        Calling this on a subtask that is already ``IN_PROGRESS`` is allowed:
        the existing ``job_started`` is kept and only the log URL may change.

        Args:
            cloud_logs_url: When not ``None``, replaces the stored log URL.

        Raises:
            ValueError: If the status is neither ``WAITING`` nor
                ``IN_PROGRESS``.
        """
        if self.status not in (SubTaskStatus.WAITING, SubTaskStatus.IN_PROGRESS):
            # Format the raw value rather than the member itself: f-string
            # rendering of a str-mixin Enum member differs across Python
            # versions ("complete" vs "SubTaskStatus.COMPLETE"), which made
            # this message unstable. A non-enum status is formatted as-is.
            current = self.status
            label = current.value if isinstance(current, SubTaskStatus) else current
            raise ValueError(f"cannot start subtask in status {label}")

        # Keep the first start time when start() is called again.
        if self.job_started is None:
            self.job_started = datetime.now(timezone.utc)
        self.status = SubTaskStatus.IN_PROGRESS
        if cloud_logs_url is not None:
            self.cloud_logs_url = cloud_logs_url

    def complete(self, result: Any = None) -> None:
        """Mark the subtask ``COMPLETE`` and stamp ``job_completed`` (UTC).

        Args:
            result: When not ``None``, stored as ``{"result": result}`` in
                ``outputs``; otherwise ``outputs`` is left untouched.
        """
        self.status = SubTaskStatus.COMPLETE
        self.job_completed = datetime.now(timezone.utc)
        if result is not None:
            self.outputs = {"result": result}

    def fail(self, error: BaseException) -> None:
        """Mark the subtask ``FAILED`` and record the error text.

        Stamps ``job_completed`` (UTC) and replaces ``outputs`` with
        ``{"error": <text>}``.

        Args:
            error: The exception describing the failure. Its ``str()`` is
                stored; if that is empty, the exception class name is stored
                instead.
        """
        self.status = SubTaskStatus.FAILED
        self.job_completed = datetime.now(timezone.utc)
        # Exceptions raised without a message (e.g. ``TimeoutError()``)
        # stringify to "", which would leave the failure unidentifiable.
        self.outputs = {"error": str(error) or type(error).__name__}