From d5b7fb21b3e405e20d05acf498d399f602a12bff Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 15:36:58 +0000 Subject: [PATCH] add this to dev so i can test --- backend/app/tasks/router.py | 56 +++++++++++++++++++++++++++++++++++++ backend/app/tasks/schema.py | 7 ++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py index 2a45a303..90b62dd1 100644 --- a/backend/app/tasks/router.py +++ b/backend/app/tasks/router.py @@ -9,6 +9,7 @@ from backend.app.tasks.schema import ( CreateSubTaskRequest, UpdateSubTaskStatusRequest, FinalizeSubTaskRequest, + TaskSqsTriggerRequest ) # Correct location of interfaces @@ -131,3 +132,58 @@ async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest): except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) + + +# for testing: + +import boto3 +import json +from backend.app.tasks.schema import TaskSqsTriggerRequest +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface +from backend.app.config import get_settings + +sqs = boto3.client("sqs") + +@router.post("/trigger", summary="Create task + subtask and publish to SQS", status_code=202) +async def trigger_task(req: TaskSqsTriggerRequest): + """ + Creates a Task + SubTask, then pushes the SubTask into SQS so a Lambda can process it. + If inputs are empty, automatically replaced with {}. + """ + + settings = get_settings() + + tasks = TasksInterface() + + # ---- Normalize empty inputs ---- + inputs = req.inputs or {} # ensures {} even if null + + # ---- 1. Create Task + SubTask ---- + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=inputs, + ) + + # ---- 2. Prepare SQS payload ---- + sqs_payload = { + "subtask_id": str(subtask_id), + "params": inputs, + } + + try: + response = sqs.send_message( + QueueUrl=f"https://sqs.{settings.AWS_REGION}.amazonaws.com/" + f"{settings.AWS_ACCOUNT_ID}/lambda-example-queue", + MessageBody=json.dumps(sqs_payload) + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "message": "Task triggered", + "task_id": task_id, + "subtask_id": subtask_id, + "sqs_message_id": response.get("MessageId"), + "inputs_sent": inputs, + } \ No newline at end of file diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py index b1a923b3..a5b4424b 100644 --- a/backend/app/tasks/schema.py +++ b/backend/app/tasks/schema.py @@ -23,4 +23,9 @@ class UpdateSubTaskStatusRequest(BaseModel): class FinalizeSubTaskRequest(BaseModel): status: str # "complete" or "failed" outputs: Optional[Dict[str, Any]] = None - cloud_logs_url: Optional[str] = None \ No newline at end of file + cloud_logs_url: Optional[str] = None + +class TaskSqsTriggerRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message \ No newline at end of file