diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index d3f06d33..18900c83 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -1,10 +1,180 @@ +from __future__ import annotations -class TasksInterface: - def __init__(self): - pass - +# ---- Standard Library ---- +from typing import Optional, Dict, Any +from datetime import datetime, timezone +from uuid import UUID +import json + +# ---- SQLModel / SQLAlchemy ---- +from sqlmodel import Session, select + +# ---- DB Session ---- +from backend.app.db.connection import get_db_session + +# ---- Models ---- +from backend.app.db.models.tasks import Task, SubTask +# ============================================================ +# SubTask Interface +# ============================================================ class SubTaskInterface: - def __init__(self) - pass \ No newline at end of file + """ + CRUD operations for SubTask + cascading Task progress updates. + """ + + def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + subtask = SubTask( + taskId=task_id, + inputs=json.dumps(inputs) if inputs else None, + jobStarted=now, + ) + + session.add(subtask) + session.commit() + session.refresh(subtask) + + # Recalculate the parent task status + self._update_task_progress(session, task_id) + + return subtask + + def update_subtask_status(self, subtask_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + + # Start time + if normalized == "in progress" and subtask.jobStarted is None: + subtask.jobStarted = now + + # Completed time + if normalized == "complete": + subtask.jobCompleted = now + + subtask.status = normalized + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Re-evaluate the task + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # Task Progress Calculation + # -------------------------------------------------------- + def _update_task_progress(self, session: Session, task_id: UUID): + task = session.get(Task, task_id) + if not task: + return + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + if not subtasks: + return + + statuses = [st.status.lower() for st in subtasks] + now = datetime.now(timezone.utc) + + # Priority: + # failed > in progress > complete + if "failed" in statuses: + task.status = "failed" + task.jobCompleted = now + + elif all(s == "complete" for s in statuses): + task.status = "complete" + task.jobCompleted = now + + else: + task.status = "in progress" + if task.jobStarted is None: + task.jobStarted = now + task.jobCompleted = None # still running + + task.updatedAt = now + session.add(task) + session.commit() + + +# ============================================================ +# Task Interface +# ============================================================ +class TasksInterface: + """ + High-level operations for Task records. + """ + + def create_task( + self, + *, + task_source: str, + service: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ): + now = datetime.now(timezone.utc) + + # Step 1: Create the task + with get_db_session() as session: + task = Task( + taskSource=task_source, + service=service, + jobStarted=now, + ) + + session.add(task) + session.commit() + session.refresh(task) + + # Step 2: Create first subtask using SubTaskInterface + subtask_interface = SubTaskInterface() + subtask = subtask_interface.create_subtask( + task_id=task.id, + inputs=inputs + ) + + return task.id, subtask.id + + def update_task_status(self, task_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + normalized = status.lower() + + if normalized == "in progress" and task.jobStarted is None: + task.jobStarted = now + + if normalized == "complete": + task.jobCompleted = now + + task.status = normalized + task.updatedAt = now + + session.add(task) + session.commit() + session.refresh(task) + + return task diff --git a/backend/app/db/models/tasks.py b/backend/app/db/models/tasks.py index ed5b3710..d8007dcd 100644 --- a/backend/app/db/models/tasks.py +++ b/backend/app/db/models/tasks.py @@ -35,3 +35,39 @@ class Task(SQLModel, table=True): # Relationship subTasks: List["SubTask"] = Relationship(back_populates="task") + + +class SubTask(SQLModel, table=True): + __tablename__ = "sub_task" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + index=True, + ) + + taskId: UUID = Field( + foreign_key="tasks.id", + alias="task_id", + ) + + jobStarted: Optional[datetime] = Field( + default=None, alias="job_started" + ) + jobCompleted: Optional[datetime] = Field( + default=None, alias="job_completed" + ) + + status: str = Field(default="In Progress") + + inputs: Optional[str] = None + outputs: Optional[str] = None + cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url") + + updatedAt: datetime = Field( + default_factory=datetime.utcnow, + alias="updated_at", + ) + + # Relationship + task: Optional[Task] = Relationship(back_populates="subTasks") diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py new file mode 100644 index 00000000..d324f9ba --- /dev/null +++ b/backend/app/tasks/router.py @@ -0,0 +1,87 @@ +from fastapi import APIRouter, Depends, HTTPException +from uuid import UUID + +from backend.app.dependencies import validate_token +from backend.app.tasks.schema import ( + CreateTaskRequest, + UpdateTaskStatusRequest, + CreateSubTaskRequest, + UpdateSubTaskStatusRequest, +) + +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface +from backend.app.db.connection import get_db_session +from backend.app.db.models.tasks import Task, SubTask +from sqlmodel import select + + +router = APIRouter( + prefix="/tasks", + tags=["tasks"], + dependencies=[Depends(validate_token)], +) + + +@router.post("/", summary="Create a new task and its first subtask") +async def create_task(req: CreateTaskRequest): + tasks = TasksInterface() + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=req.inputs, + ) + return {"task_id": task_id, "subtask_id": subtask_id} + + +@router.get("/{task_id}", summary="Get a task and its subtasks") +async def get_task(task_id: UUID): + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + # Deserialize JSON inputs back to dict + formatted = [] + for st in subtasks: + formatted.append({ + **st.dict(), + "inputs": json.loads(st.inputs) if st.inputs else None + }) + + return { + "task": task, + "subtasks": formatted, + } + + +@router.put("/{task_id}/status", summary="Update a task's status") +async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): + tasks = TasksInterface() + try: + updated = tasks.update_task_status(task_id, req.status) + return {"task_id": updated.id, "status": updated.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + +@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task") +async def create_subtask(task_id: UUID, req: CreateSubTaskRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.create_subtask(task_id, req.inputs) + return {"subtask_id": st.id, "task_id": task_id} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status") +async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.update_subtask_status(subtask_id, req.status) + return {"subtask_id": st.id, "status": st.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py new file mode 100644 index 00000000..66be61e7 --- /dev/null +++ b/backend/app/tasks/schema.py @@ -0,0 +1,21 @@ +from typing import Optional, Any, Dict +from uuid import UUID +from pydantic import BaseModel + + +class CreateTaskRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateTaskStatusRequest(BaseModel): + status: str + + +class CreateSubTaskRequest(BaseModel): + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateSubTaskStatusRequest(BaseModel): + status: str