Define new plan route and modify trigger request object

This commit is contained in:
Daniel Roth 2026-02-23 17:40:06 +00:00
parent cda31420e8
commit 2a4fb23f5f
2 changed files with 27 additions and 12 deletions

View file

@ -12,6 +12,9 @@ from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.config import get_settings
from sqlalchemy.orm import sessionmaker
from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest,
)
from utils.logger import setup_logger
from backend.app.db.connection import db_engine
@ -24,7 +27,7 @@ router = APIRouter(
prefix="/plan",
tags=["plan"],
dependencies=[Depends(validate_token)],
responses={404: {"description": "Not found"}}
responses={404: {"description": "Not found"}},
)
sqs_client = boto3.client("sqs")
@ -43,6 +46,13 @@ def db_session():
session.close()
@router.post("/categoisation", status_code=202)
async def trigger_categorisation(body: CategorisationTriggerRequest):
payload = CategorisationTriggerRequest.model_validate(body)
logger.info("API triggered with body: %s", payload)
@router.post("/trigger", status_code=202)
async def trigger_plan_entrypoint(body: PlanTriggerRequest):
"""
@ -59,7 +69,10 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
return {"message": "Invalid request"}, 400
# If file_format is domna_asset_list and type is xlsx, read and chunk it
if data.get("file_format") == "domna_asset_list" and data.get("file_type") == "xlsx":
if (
data.get("file_format") == "domna_asset_list"
and data.get("file_type") == "xlsx"
):
try:
total_rows = data.get("sheet_count", 0)
@ -88,8 +101,8 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
"multi_plan": body.multi_plan,
},
)
# Insert the scenario ID into the data payload
data["scenario_id"] = scenario_id
@ -99,7 +112,7 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
task_source="backend/plan/router.py:trigger_plan_entrypoint",
service="plan_engine",
inputs=data,
task_only=True
task_only=True,
)
subtask_interface = SubTaskInterface()
@ -109,13 +122,14 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
index_end = min((i + 1) * chunk_size, total_rows)
message_payload = {
**data, "index_start": index_start, "index_end": index_end,
**data,
"index_start": index_start,
"index_end": index_end,
}
# Create a subtask for this chunk
subtask_id = subtask_interface.create_subtask(
task_id=task_id,
inputs=message_payload
task_id=task_id, inputs=message_payload
)
# Add task and subtask to message
@ -125,8 +139,7 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
message_body = json.dumps(message_payload)
response = sqs_client.send_message(
QueueUrl=settings.ENGINE_SQS_URL,
MessageBody=message_body
QueueUrl=settings.ENGINE_SQS_URL, MessageBody=message_body
)
logger.info(
f"Chunk {i} sent to SQS. Rows {index_start}{index_end}. Message ID: {response.get('MessageId')}"
@ -153,8 +166,7 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
data["subtask_id"] = str(subtask_id)
message_body = json.dumps(data)
response = sqs_client.send_message(
QueueUrl=settings.ENGINE_SQS_URL,
MessageBody=message_body
QueueUrl=settings.ENGINE_SQS_URL, MessageBody=message_body
)
logger.info(f"SQS message sent. Message ID: {response.get('MessageId')}")
except Exception as e:

View file

@ -8,5 +8,8 @@ class CategorisationTriggerRequest(BaseModel):
scenarios_to_consider: Optional[List[int]] = None
scenario_priority_order: Optional[List[int]] = None
property_bucket_index: Optional[int] = None
num_property_buckets: Optional[int] = None
# {"portfolio_id": 556, "scenarios_to_consider": [1039,1041], "scenario_priority_order": [1041,1039]}