trigger lambda and subtasks from api

This commit is contained in:
Daniel Roth 2026-02-26 14:15:34 +00:00
parent 0165b36a2e
commit 432f050e69
2 changed files with 30 additions and 29 deletions

View file

@ -6,7 +6,8 @@
"workspaceFolder": "/workspaces/model",
"postStartCommand": "bash .devcontainer/backend/post-install.sh",
"mounts": [
"source=${localEnv:HOME},target=/home/vscode,type=bind"
// "source=${localEnv:HOME},target=/home/vscode,type=bind",
"source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached"
],
"customizations": {
"vscode": {

View file

@ -68,15 +68,15 @@ async def trigger_categorisation(
print("num_property_buckets", num_property_batches)
# Create task
# task_id, _ = TasksInterface.create_task(
# task_source="backend/plan/router.py:trigger_categorisation",
# service="plan_engine",
# inputs=payload.model_dump(),
# task_only=True,
# )
task_id, _ = TasksInterface.create_task(
task_source="backend/plan/router.py:trigger_categorisation",
service="plan_engine",
inputs=payload.model_dump(),
task_only=True,
)
# Dispatch requests to lambdas
# subtask_interface = SubTaskInterface()
subtask_interface = SubTaskInterface()
for batch_index in range(num_property_batches):
@ -88,30 +88,30 @@ async def trigger_categorisation(
if not batch_property_ids:
continue
# bucket_property_ids: List[int] = [
# pid for pid in property_ids if pid % num_buckets == bucket_index
# ]
# bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest(
# portfolio_id=payload.portfolio_id,
# scenarios_to_consider=payload.scenarios_to_consider,
# scenario_priority_order=payload.scenario_priority_order,
# min_property_id=min(bucket_property_ids),
# max_property_id=max(bucket_property_ids),
# )
# # Create sub-task for each
# subtask_id: UUID = subtask_interface.create_subtask(
# task_id=task_id, inputs=bucket_request.model_dump()
# )
# bucket_request.subtask_id = str(subtask_id)
batch_property_ids: List[int] = [
pid for pid in property_ids if pid % num_property_batches == batch_index
]
batch_request: CategorisationTriggerRequest = CategorisationTriggerRequest(
portfolio_id=payload.portfolio_id,
scenarios_to_consider=payload.scenarios_to_consider,
scenario_priority_order=payload.scenario_priority_order,
min_property_id=min(batch_property_ids),
max_property_id=max(batch_property_ids),
)
# Create sub-task for each
subtask_id: UUID = subtask_interface.create_subtask(
task_id=task_id, inputs=batch_request.model_dump()
)
batch_request.subtask_id = str(subtask_id)
# response = sqs_client.send_message(
# QueueUrl="categorisation-queue-dev",
# MessageBody=bucket_request.model_dump_json(),
# )
response = sqs_client.send_message(
QueueUrl="categorisation-queue-dev",
MessageBody=batch_request.model_dump_json(),
)
logger.info(
# f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}"
f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}{max(batch_property_ids)}"
f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}{max(batch_property_ids)}. Message ID: {response.get('MessageId')}"
# f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}{max(batch_property_ids)}"
)
await asyncio.sleep(0.05) # Small delay to avoid SQS throttling