add this to dev so i can test

This commit is contained in:
Jun-te Kim 2025-11-14 15:36:58 +00:00
parent 68a5de28e2
commit d5b7fb21b3
2 changed files with 62 additions and 1 deletions

View file

@ -9,6 +9,7 @@ from backend.app.tasks.schema import (
CreateSubTaskRequest,
UpdateSubTaskStatusRequest,
FinalizeSubTaskRequest,
TaskSqsTriggerRequest
)
# Correct location of interfaces
@ -131,3 +132,58 @@ async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest):
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# for testing:
import boto3
import json
from backend.app.tasks.schema import TaskSqsTriggerRequest
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
from backend.app.config import get_settings
sqs = boto3.client("sqs")
@router.post("/trigger", summary="Create task + subtask and publish to SQS", status_code=202)
async def trigger_task(req: TaskSqsTriggerRequest):
"""
Creates a Task + SubTask, then pushes the SubTask into SQS so a Lambda can process it.
If inputs are empty, automatically replaced with {}.
"""
settings = get_settings()
tasks = TasksInterface()
# ---- Normalize empty inputs ----
inputs = req.inputs or {} # ensures {} even if null
# ---- 1. Create Task + SubTask ----
task_id, subtask_id = tasks.create_task(
task_source=req.task_source,
service=req.service,
inputs=inputs,
)
# ---- 2. Prepare SQS payload ----
sqs_payload = {
"subtask_id": str(subtask_id),
"params": inputs,
}
try:
response = sqs.send_message(
QueueUrl=f"https://sqs.{settings.AWS_REGION}.amazonaws.com/"
f"{settings.AWS_ACCOUNT_ID}/lambda-example-queue",
MessageBody=json.dumps(sqs_payload)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"SQS error: {e}")
return {
"message": "Task triggered",
"task_id": task_id,
"subtask_id": subtask_id,
"sqs_message_id": response.get("MessageId"),
"inputs_sent": inputs,
}

View file

@ -23,4 +23,9 @@ class UpdateSubTaskStatusRequest(BaseModel):
class FinalizeSubTaskRequest(BaseModel):
status: str # "complete" or "failed"
outputs: Optional[Dict[str, Any]] = None
cloud_logs_url: Optional[str] = None
cloud_logs_url: Optional[str] = None
class TaskSqsTriggerRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message