Model/backend/app/tasks/schema.py

31 lines
No EOL
817 B
Python

from typing import Optional, Any, Dict
from uuid import UUID
from pydantic import BaseModel
class CreateTaskRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateTaskStatusRequest(BaseModel):
status: str
class CreateSubTaskRequest(BaseModel):
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateSubTaskStatusRequest(BaseModel):
status: str
class FinalizeSubTaskRequest(BaseModel):
status: str # "complete" or "failed"
outputs: Optional[Dict[str, Any]] = None
cloud_logs_url: Optional[str] = None
class TaskSqsTriggerRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message