added task and sub task interface

This commit is contained in:
Jun-te Kim 2025-11-14 14:25:28 +00:00
parent c617d603a3
commit d98ae7db7b
5 changed files with 320 additions and 6 deletions

View file

@ -1,10 +1,180 @@
from __future__ import annotations
class TasksInterface:
def __init__(self):
pass
# ---- Standard Library ----
from typing import Optional, Dict, Any
from datetime import datetime, timezone
from uuid import UUID
import json
# ---- SQLModel / SQLAlchemy ----
from sqlmodel import Session, select
# ---- DB Session ----
from backend.app.db.connection import get_db_session
# ---- Models ----
from backend.app.db.models.tasks import Task, SubTask
# ============================================================
# SubTask Interface
# ============================================================
class SubTaskInterface:
def __init__(self)
pass
"""
CRUD operations for SubTask + cascading Task progress updates.
"""
def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None):
now = datetime.now(timezone.utc)
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
subtask = SubTask(
taskId=task_id,
inputs=json.dumps(inputs) if inputs else None,
jobStarted=now,
)
session.add(subtask)
session.commit()
session.refresh(subtask)
# Recalculate the parent task status
self._update_task_progress(session, task_id)
return subtask
def update_subtask_status(self, subtask_id: UUID, status: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
normalized = status.lower()
# Start time
if normalized == "in progress" and subtask.jobStarted is None:
subtask.jobStarted = now
# Completed time
if normalized == "complete":
subtask.jobCompleted = now
subtask.status = normalized
subtask.updatedAt = now
session.add(subtask)
session.commit()
# Re-evaluate the task
self._update_task_progress(session, subtask.taskId)
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# Task Progress Calculation
# --------------------------------------------------------
def _update_task_progress(self, session: Session, task_id: UUID):
task = session.get(Task, task_id)
if not task:
return
subtasks = session.exec(
select(SubTask).where(SubTask.taskId == task_id)
).all()
if not subtasks:
return
statuses = [st.status.lower() for st in subtasks]
now = datetime.now(timezone.utc)
# Priority:
# failed > in progress > complete
if "failed" in statuses:
task.status = "failed"
task.jobCompleted = now
elif all(s == "complete" for s in statuses):
task.status = "complete"
task.jobCompleted = now
else:
task.status = "in progress"
if task.jobStarted is None:
task.jobStarted = now
task.jobCompleted = None # still running
task.updatedAt = now
session.add(task)
session.commit()
# ============================================================
# Task Interface
# ============================================================
class TasksInterface:
"""
High-level operations for Task records.
"""
def create_task(
self,
*,
task_source: str,
service: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
):
now = datetime.now(timezone.utc)
# Step 1: Create the task
with get_db_session() as session:
task = Task(
taskSource=task_source,
service=service,
jobStarted=now,
)
session.add(task)
session.commit()
session.refresh(task)
# Step 2: Create first subtask using SubTaskInterface
subtask_interface = SubTaskInterface()
subtask = subtask_interface.create_subtask(
task_id=task.id,
inputs=inputs
)
return task.id, subtask.id
def update_task_status(self, task_id: UUID, status: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
normalized = status.lower()
if normalized == "in progress" and task.jobStarted is None:
task.jobStarted = now
if normalized == "complete":
task.jobCompleted = now
task.status = normalized
task.updatedAt = now
session.add(task)
session.commit()
session.refresh(task)
return task

View file

@ -35,3 +35,39 @@ class Task(SQLModel, table=True):
# Relationship
subTasks: List["SubTask"] = Relationship(back_populates="task")
class SubTask(SQLModel, table=True):
__tablename__ = "sub_task"
id: UUID = Field(
default_factory=uuid4,
primary_key=True,
index=True,
)
taskId: UUID = Field(
foreign_key="tasks.id",
alias="task_id",
)
jobStarted: Optional[datetime] = Field(
default=None, alias="job_started"
)
jobCompleted: Optional[datetime] = Field(
default=None, alias="job_completed"
)
status: str = Field(default="In Progress")
inputs: Optional[str] = None
outputs: Optional[str] = None
cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url")
updatedAt: datetime = Field(
default_factory=datetime.utcnow,
alias="updated_at",
)
# Relationship
task: Optional[Task] = Relationship(back_populates="subTasks")

View file

View file

@ -0,0 +1,87 @@
from fastapi import APIRouter, Depends, HTTPException
from uuid import UUID
from backend.app.dependencies import validate_token
from backend.app.tasks.schema import (
CreateTaskRequest,
UpdateTaskStatusRequest,
CreateSubTaskRequest,
UpdateSubTaskStatusRequest,
)
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
from backend.app.db.connection import get_db_session
from backend.app.db.models.tasks import Task, SubTask
from sqlmodel import select
router = APIRouter(
prefix="/tasks",
tags=["tasks"],
dependencies=[Depends(validate_token)],
)
@router.post("/", summary="Create a new task and its first subtask")
async def create_task(req: CreateTaskRequest):
tasks = TasksInterface()
task_id, subtask_id = tasks.create_task(
task_source=req.task_source,
service=req.service,
inputs=req.inputs,
)
return {"task_id": task_id, "subtask_id": subtask_id}
@router.get("/{task_id}", summary="Get a task and its subtasks")
async def get_task(task_id: UUID):
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
subtasks = session.exec(
select(SubTask).where(SubTask.taskId == task_id)
).all()
# Deserialize JSON inputs back to dict
formatted = []
for st in subtasks:
formatted.append({
**st.dict(),
"inputs": json.loads(st.inputs) if st.inputs else None
})
return {
"task": task,
"subtasks": formatted,
}
@router.put("/{task_id}/status", summary="Update a task's status")
async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest):
tasks = TasksInterface()
try:
updated = tasks.update_task_status(task_id, req.status)
return {"task_id": updated.id, "status": updated.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task")
async def create_subtask(task_id: UUID, req: CreateSubTaskRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.create_subtask(task_id, req.inputs)
return {"subtask_id": st.id, "task_id": task_id}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status")
async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.update_subtask_status(subtask_id, req.status)
return {"subtask_id": st.id, "status": st.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))

View file

@ -0,0 +1,21 @@
from typing import Optional, Any, Dict
from uuid import UUID
from pydantic import BaseModel
class CreateTaskRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateTaskStatusRequest(BaseModel):
status: str
class CreateSubTaskRequest(BaseModel):
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateSubTaskStatusRequest(BaseModel):
status: str