added tasks so Khalim can reeview it

This commit is contained in:
Jun-te Kim 2025-11-14 15:02:33 +00:00
parent 47be3ffea3
commit 68a5de28e2
3 changed files with 189 additions and 25 deletions

View file

@ -24,6 +24,9 @@ class SubTaskInterface:
CRUD operations for SubTask + cascading Task progress updates.
"""
# --------------------------------------------------------
# CREATE SUBTASK
# --------------------------------------------------------
def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None):
now = datetime.now(timezone.utc)
@ -35,18 +38,22 @@ class SubTaskInterface:
subtask = SubTask(
taskId=task_id,
inputs=json.dumps(inputs) if inputs else None,
jobStarted=now,
status="waiting",
jobStarted=None,
jobCompleted=None,
)
session.add(subtask)
session.commit()
session.refresh(subtask)
# Recalculate the parent task status
# Recalculate parent task progress
self._update_task_progress(session, task_id)
return subtask
# --------------------------------------------------------
# UPDATE STATUS (in progress, complete, failed)
# --------------------------------------------------------
def update_subtask_status(self, subtask_id: UUID, status: str):
now = datetime.now(timezone.utc)
@ -57,12 +64,12 @@ class SubTaskInterface:
normalized = status.lower()
# Start time
# When job really starts
if normalized == "in progress" and subtask.jobStarted is None:
subtask.jobStarted = now
# Completed time
if normalized == "complete":
# Completed or failed
if normalized in ("complete", "failed"):
subtask.jobCompleted = now
subtask.status = normalized
@ -71,14 +78,80 @@ class SubTaskInterface:
session.add(subtask)
session.commit()
# Re-evaluate the task
# Recalculate task status
self._update_task_progress(session, subtask.taskId)
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# Task Progress Calculation
# UPDATE OUTPUTS
# --------------------------------------------------------
def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.outputs = json.dumps(outputs)
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# UPDATE CLOUD LOGS URL
# --------------------------------------------------------
def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.cloudLogsURL = cloud_logs_url
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# SET BOTH OUTPUT + LOGS
# --------------------------------------------------------
def set_subtask_result(
self,
subtask_id: UUID,
outputs: Optional[Dict[str, Any]] = None,
cloud_logs_url: Optional[str] = None,
):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
if outputs is not None:
subtask.outputs = json.dumps(outputs)
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# TASK PROGRESS CALCULATION
# --------------------------------------------------------
def _update_task_progress(self, session: Session, task_id: UUID):
task = session.get(Task, task_id)
@ -89,14 +162,9 @@ class SubTaskInterface:
select(SubTask).where(SubTask.taskId == task_id)
).all()
if not subtasks:
return
statuses = [st.status.lower() for st in subtasks]
statuses = [s.status.lower() for s in subtasks]
now = datetime.now(timezone.utc)
# Priority:
# failed > in progress > complete
if "failed" in statuses:
task.status = "failed"
task.jobCompleted = now
@ -105,16 +173,61 @@ class SubTaskInterface:
task.status = "complete"
task.jobCompleted = now
else:
elif "in progress" in statuses:
task.status = "in progress"
if task.jobStarted is None:
task.jobStarted = now
task.jobCompleted = None # still running
else:
# All waiting
task.status = "waiting"
task.jobStarted = None
task.jobCompleted = None
task.updatedAt = now
session.add(task)
session.commit()
def finalize_subtask(
self,
subtask_id: UUID,
status: str,
outputs: Optional[Dict[str, Any]],
cloud_logs_url: Optional[str]
):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
normalized = status.lower()
if normalized not in ("complete", "failed"):
raise ValueError("Status must be 'complete' or 'failed'")
# Set outputs
if outputs is not None:
subtask.outputs = json.dumps(outputs)
# Set logs
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
# Status + timestamps
subtask.status = normalized
subtask.jobCompleted = now
subtask.updatedAt = now
session.add(subtask)
session.commit()
# Update parent task (complete/failed)
self._update_task_progress(session, subtask.taskId)
session.refresh(subtask)
return subtask
# ============================================================
# Task Interface
@ -133,23 +246,24 @@ class TasksInterface:
):
now = datetime.now(timezone.utc)
# Step 1: Create the task
with get_db_session() as session:
task = Task(
taskSource=task_source,
service=service,
jobStarted=now,
status="waiting",
jobStarted=None,
jobCompleted=None,
)
session.add(task)
session.commit()
session.refresh(task)
# Step 2: Create first subtask using SubTaskInterface
# Create first subtask in waiting state
subtask_interface = SubTaskInterface()
subtask = subtask_interface.create_subtask(
task_id=task.id,
inputs=inputs
inputs=inputs,
)
return task.id, subtask.id
@ -176,5 +290,4 @@ class TasksInterface:
session.add(task)
session.commit()
session.refresh(task)
return task

View file

@ -1,5 +1,6 @@
from fastapi import APIRouter, Depends, HTTPException
from uuid import UUID
import json # ← REQUIRED for json.loads
from backend.app.dependencies import validate_token
from backend.app.tasks.schema import (
@ -7,9 +8,12 @@ from backend.app.tasks.schema import (
UpdateTaskStatusRequest,
CreateSubTaskRequest,
UpdateSubTaskStatusRequest,
FinalizeSubTaskRequest,
)
# Correct location of interfaces
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
from backend.app.db.connection import get_db_session
from backend.app.db.models.tasks import Task, SubTask
from sqlmodel import select
@ -22,6 +26,9 @@ router = APIRouter(
)
# ============================================================
# Create Task
# ============================================================
@router.post("/", summary="Create a new task and its first subtask")
async def create_task(req: CreateTaskRequest):
tasks = TasksInterface()
@ -33,6 +40,9 @@ async def create_task(req: CreateTaskRequest):
return {"task_id": task_id, "subtask_id": subtask_id}
# ============================================================
# Get Task + Subtasks
# ============================================================
@router.get("/{task_id}", summary="Get a task and its subtasks")
async def get_task(task_id: UUID):
with get_db_session() as session:
@ -44,12 +54,13 @@ async def get_task(task_id: UUID):
select(SubTask).where(SubTask.taskId == task_id)
).all()
# Deserialize JSON inputs back to dict
formatted = []
for st in subtasks:
formatted.append({
**st.dict(),
"inputs": json.loads(st.inputs) if st.inputs else None
"inputs": json.loads(st.inputs) if st.inputs else None,
"outputs": json.loads(st.outputs) if st.outputs else None,
"cloud_logs_url": st.cloudLogsURL,
})
return {
@ -58,6 +69,9 @@ async def get_task(task_id: UUID):
}
# ============================================================
# Update Task Status
# ============================================================
@router.put("/{task_id}/status", summary="Update a task's status")
async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest):
tasks = TasksInterface()
@ -67,17 +81,24 @@ async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest):
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ============================================================
# Create Additional Subtask
# ============================================================
@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task")
async def create_subtask(task_id: UUID, req: CreateSubTaskRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.create_subtask(task_id, req.inputs)
return {"subtask_id": st.id, "task_id": task_id}
return {"subtask_id": st.id, "task_id": task_id, "status": st.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status")
# ============================================================
# Update Subtask Status
# ============================================================
@router.put("/subtask/{subtask_id}/status", summary="Update a subtask's status")
async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest):
subtasks = SubTaskInterface()
try:
@ -85,3 +106,28 @@ async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusReques
return {"subtask_id": st.id, "status": st.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ===
# Sub task is complete
@router.post("/subtask/{subtask_id}/finalize", summary="Finalize a subtask with status, outputs, logs")
async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.finalize_subtask(
subtask_id=subtask_id,
status=req.status,
outputs=req.outputs,
cloud_logs_url=req.cloud_logs_url
)
return {
"subtask_id": st.id,
"status": st.status,
"outputs": req.outputs,
"cloud_logs_url": req.cloud_logs_url,
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

View file

@ -19,3 +19,8 @@ class CreateSubTaskRequest(BaseModel):
class UpdateSubTaskStatusRequest(BaseModel):
status: str
class FinalizeSubTaskRequest(BaseModel):
status: str # "complete" or "failed"
outputs: Optional[Dict[str, Any]] = None
cloud_logs_url: Optional[str] = None