diff --git a/.devcontainer/backend/devcontainer.json b/.devcontainer/backend/devcontainer.json index 3727d8a3..ac654ac1 100644 --- a/.devcontainer/backend/devcontainer.json +++ b/.devcontainer/backend/devcontainer.json @@ -6,7 +6,8 @@ "workspaceFolder": "/workspaces/model", "postStartCommand": "bash .devcontainer/backend/post-install.sh", "mounts": [ - "source=${localEnv:HOME},target=/home/vscode,type=bind" + // "source=${localEnv:HOME},target=/home/vscode,type=bind", + "source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached" ], "customizations": { "vscode": { diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 1ecd1f40..08138ff1 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -68,15 +68,15 @@ async def trigger_categorisation( print("num_property_buckets", num_property_batches) # Create task - # task_id, _ = TasksInterface.create_task( - # task_source="backend/plan/router.py:trigger_categorisation", - # service="plan_engine", - # inputs=payload.model_dump(), - # task_only=True, - # ) + task_id, _ = TasksInterface.create_task( + task_source="backend/plan/router.py:trigger_categorisation", + service="plan_engine", + inputs=payload.model_dump(), + task_only=True, + ) # Dispatch requests to lambdas - # subtask_interface = SubTaskInterface() + subtask_interface = SubTaskInterface() for batch_index in range(num_property_batches): @@ -88,30 +88,30 @@ async def trigger_categorisation( if not batch_property_ids: continue - # bucket_property_ids: List[int] = [ - # pid for pid in property_ids if pid % num_buckets == bucket_index - # ] - # bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest( - # portfolio_id=payload.portfolio_id, - # scenarios_to_consider=payload.scenarios_to_consider, - # scenario_priority_order=payload.scenario_priority_order, - # min_property_id=min(bucket_property_ids), - # max_property_id=max(bucket_property_ids), - # ) - # # Create sub-task for each - # subtask_id: UUID = subtask_interface.create_subtask( - # task_id=task_id, inputs=bucket_request.model_dump() - # ) - # bucket_request.subtask_id = str(subtask_id) + batch_property_ids: List[int] = [ + pid for pid in property_ids if pid % num_property_batches == batch_index + ] + batch_request: CategorisationTriggerRequest = CategorisationTriggerRequest( + portfolio_id=payload.portfolio_id, + scenarios_to_consider=payload.scenarios_to_consider, + scenario_priority_order=payload.scenario_priority_order, + min_property_id=min(batch_property_ids), + max_property_id=max(batch_property_ids), + ) + # Create sub-task for each + subtask_id: UUID = subtask_interface.create_subtask( + task_id=task_id, inputs=batch_request.model_dump() + ) + batch_request.subtask_id = str(subtask_id) - # response = sqs_client.send_message( - # QueueUrl="categorisation-queue-dev", - # MessageBody=bucket_request.model_dump_json(), - # ) + response = sqs_client.send_message( + QueueUrl="categorisation-queue-dev", + MessageBody=batch_request.model_dump_json(), + ) logger.info( - # f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}–{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}" - f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}–{max(batch_property_ids)}" + f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}–{max(batch_property_ids)}. Message ID: {response.get('MessageId')}" + # f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}–{max(batch_property_ids)}" ) await asyncio.sleep(0.05) # Small delay to avoid SQS throttling