From 68a5de28e2b5b34d82e12eb3e1c869c00db4594b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 15:02:33 +0000 Subject: [PATCH] added tasks so Khalim can reeview it --- backend/app/db/functions/tasks/Tasks.py | 155 ++++++++++++++++++++---- backend/app/tasks/router.py | 54 ++++++++- backend/app/tasks/schema.py | 5 + 3 files changed, 189 insertions(+), 25 deletions(-) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 18900c83..06e1c6fe 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -24,6 +24,9 @@ class SubTaskInterface: CRUD operations for SubTask + cascading Task progress updates. """ + # -------------------------------------------------------- + # CREATE SUBTASK + # -------------------------------------------------------- def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None): now = datetime.now(timezone.utc) @@ -35,18 +38,22 @@ class SubTaskInterface: subtask = SubTask( taskId=task_id, inputs=json.dumps(inputs) if inputs else None, - jobStarted=now, + status="waiting", + jobStarted=None, + jobCompleted=None, ) session.add(subtask) session.commit() session.refresh(subtask) - # Recalculate the parent task status + # Recalculate parent task progress self._update_task_progress(session, task_id) - return subtask + # -------------------------------------------------------- + # UPDATE STATUS (in progress, complete, failed) + # -------------------------------------------------------- def update_subtask_status(self, subtask_id: UUID, status: str): now = datetime.now(timezone.utc) @@ -57,12 +64,12 @@ class SubTaskInterface: normalized = status.lower() - # Start time + # When job really starts if normalized == "in progress" and subtask.jobStarted is None: subtask.jobStarted = now - # Completed time - if normalized == "complete": + # Completed or failed + if normalized in ("complete", "failed"): subtask.jobCompleted = now subtask.status = normalized @@ -71,14 +78,80 @@ class SubTaskInterface: session.add(subtask) session.commit() - # Re-evaluate the task + # Recalculate task status self._update_task_progress(session, subtask.taskId) session.refresh(subtask) return subtask # -------------------------------------------------------- - # Task Progress Calculation + # UPDATE OUTPUTS + # -------------------------------------------------------- + def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.outputs = json.dumps(outputs) + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # UPDATE CLOUD LOGS URL + # -------------------------------------------------------- + def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.cloudLogsURL = cloud_logs_url + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # SET BOTH OUTPUT + LOGS + # -------------------------------------------------------- + def set_subtask_result( + self, + subtask_id: UUID, + outputs: Optional[Dict[str, Any]] = None, + cloud_logs_url: Optional[str] = None, + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + subtask.updatedAt = now + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # TASK PROGRESS CALCULATION # -------------------------------------------------------- def _update_task_progress(self, session: Session, task_id: UUID): task = session.get(Task, task_id) @@ -89,14 +162,9 @@ class SubTaskInterface: select(SubTask).where(SubTask.taskId == task_id) ).all() - if not subtasks: - return - - statuses = [st.status.lower() for st in subtasks] + statuses = [s.status.lower() for s in subtasks] now = datetime.now(timezone.utc) - # Priority: - # failed > in progress > complete if "failed" in statuses: task.status = "failed" task.jobCompleted = now @@ -105,16 +173,61 @@ class SubTaskInterface: task.status = "complete" task.jobCompleted = now - else: + elif "in progress" in statuses: task.status = "in progress" if task.jobStarted is None: task.jobStarted = now - task.jobCompleted = None # still running + + else: + # All waiting + task.status = "waiting" + task.jobStarted = None + task.jobCompleted = None task.updatedAt = now session.add(task) session.commit() + def finalize_subtask( + self, + subtask_id: UUID, + status: str, + outputs: Optional[Dict[str, Any]], + cloud_logs_url: Optional[str] + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + if normalized not in ("complete", "failed"): + raise ValueError("Status must be 'complete' or 'failed'") + + # Set outputs + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + # Set logs + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + # Status + timestamps + subtask.status = normalized + subtask.jobCompleted = now + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Update parent task (complete/failed) + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + # ============================================================ # Task Interface @@ -133,23 +246,24 @@ class TasksInterface: ): now = datetime.now(timezone.utc) - # Step 1: Create the task with get_db_session() as session: task = Task( taskSource=task_source, service=service, - jobStarted=now, + status="waiting", + jobStarted=None, + jobCompleted=None, ) session.add(task) session.commit() session.refresh(task) - # Step 2: Create first subtask using SubTaskInterface + # Create first subtask in waiting state subtask_interface = SubTaskInterface() subtask = subtask_interface.create_subtask( task_id=task.id, - inputs=inputs + inputs=inputs, ) return task.id, subtask.id @@ -176,5 +290,4 @@ class TasksInterface: session.add(task) session.commit() session.refresh(task) - return task diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py index d324f9ba..2a45a303 100644 --- a/backend/app/tasks/router.py +++ b/backend/app/tasks/router.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException from uuid import UUID +import json # ← REQUIRED for json.loads from backend.app.dependencies import validate_token from backend.app.tasks.schema import ( @@ -7,9 +8,12 @@ from backend.app.tasks.schema import ( UpdateTaskStatusRequest, CreateSubTaskRequest, UpdateSubTaskStatusRequest, + FinalizeSubTaskRequest, ) +# Correct location of interfaces from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface + from backend.app.db.connection import get_db_session from backend.app.db.models.tasks import Task, SubTask from sqlmodel import select @@ -22,6 +26,9 @@ router = APIRouter( ) +# ============================================================ +# Create Task +# ============================================================ @router.post("/", summary="Create a new task and its first subtask") async def create_task(req: CreateTaskRequest): tasks = TasksInterface() @@ -33,6 +40,9 @@ async def create_task(req: CreateTaskRequest): return {"task_id": task_id, "subtask_id": subtask_id} +# ============================================================ +# Get Task + Subtasks +# ============================================================ @router.get("/{task_id}", summary="Get a task and its subtasks") async def get_task(task_id: UUID): with get_db_session() as session: @@ -44,12 +54,13 @@ async def get_task(task_id: UUID): select(SubTask).where(SubTask.taskId == task_id) ).all() - # Deserialize JSON inputs back to dict formatted = [] for st in subtasks: formatted.append({ **st.dict(), - "inputs": json.loads(st.inputs) if st.inputs else None + "inputs": json.loads(st.inputs) if st.inputs else None, + "outputs": json.loads(st.outputs) if st.outputs else None, + "cloud_logs_url": st.cloudLogsURL, }) return { @@ -58,6 +69,9 @@ async def get_task(task_id: UUID): } +# ============================================================ +# Update Task Status +# ============================================================ @router.put("/{task_id}/status", summary="Update a task's status") async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): tasks = TasksInterface() @@ -67,17 +81,24 @@ async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) + +# ============================================================ +# Create Additional Subtask +# ============================================================ @router.post("/{task_id}/subtasks", summary="Create a new subtask under a task") async def create_subtask(task_id: UUID, req: CreateSubTaskRequest): subtasks = SubTaskInterface() try: st = subtasks.create_subtask(task_id, req.inputs) - return {"subtask_id": st.id, "task_id": task_id} + return {"subtask_id": st.id, "task_id": task_id, "status": st.status} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) -@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status") +# ============================================================ +# Update Subtask Status +# ============================================================ +@router.put("/subtask/{subtask_id}/status", summary="Update a subtask's status") async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest): subtasks = SubTaskInterface() try: @@ -85,3 +106,28 @@ async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusReques return {"subtask_id": st.id, "status": st.status} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) + + +# === +# Sub task is complete +@router.post("/subtask/{subtask_id}/finalize", summary="Finalize a subtask with status, outputs, logs") +async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest): + subtasks = SubTaskInterface() + + try: + st = subtasks.finalize_subtask( + subtask_id=subtask_id, + status=req.status, + outputs=req.outputs, + cloud_logs_url=req.cloud_logs_url + ) + + return { + "subtask_id": st.id, + "status": st.status, + "outputs": req.outputs, + "cloud_logs_url": req.cloud_logs_url, + } + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py index 66be61e7..b1a923b3 100644 --- a/backend/app/tasks/schema.py +++ b/backend/app/tasks/schema.py @@ -19,3 +19,8 @@ class CreateSubTaskRequest(BaseModel): class UpdateSubTaskStatusRequest(BaseModel): status: str + +class FinalizeSubTaskRequest(BaseModel): + status: str # "complete" or "failed" + outputs: Optional[Dict[str, Any]] = None + cloud_logs_url: Optional[str] = None \ No newline at end of file