added task handelr

This commit is contained in:
Jun-te Kim 2026-04-01 14:59:28 +00:00
parent cee4e40bb1
commit 955dffd74d
3 changed files with 82 additions and 23 deletions

View file

@ -5,7 +5,8 @@ from typing import Callable, Any
from uuid import UUID
import json
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.functions.tasks.Tasks import SubTaskInterface, TasksInterface
from utils.logger import setup_logger
def subtask_handler():
@ -93,3 +94,76 @@ def subtask_handler():
return wrapper
return decorator
def task_handler():
"""
Decorator that wraps a Lambda handler and automatically:
- Parses body from the first SQS record (or uses the event dict directly)
- Creates a fresh Task + SubTask in the database
- Marks the subtask as in progress
- Executes the handler, passing the parsed body
- Marks complete on success, failed on exception (and re-raises)
"""
def decorator(func: Callable[..., Any]):
task_source = f"{func.__module__}.{func.__qualname__}"
@wraps(func)
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
logger = setup_logger()
# Parse body: Records-style SQS or plain dict event
if "Records" in event:
raw_body = event["Records"][0].get("body", {})
if isinstance(raw_body, str):
try:
body = json.loads(raw_body)
except Exception:
body = {}
else:
body = raw_body or {}
else:
body = event
# Create fresh task + subtask
logger.info("Creating task for source: %s", task_source)
task_id, subtask_id = TasksInterface.create_task(
task_source=task_source,
inputs=body,
)
logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)
interface = SubTaskInterface()
interface.update_subtask_status(
subtask_id=subtask_id,
status="in progress",
)
try:
result = func(body, context, *args, **kwargs)
interface.update_subtask_status(
subtask_id=subtask_id,
status="complete",
outputs={"result": result} if result else None,
)
logger.info("Task %s completed successfully", task_id)
return result
except Exception as e:
logger.exception("Task %s failed: %s", task_id, e)
interface.update_subtask_status(
subtask_id=subtask_id,
status="failed",
outputs={"error": str(e)},
)
raise
return wrapper
return decorator

View file

@ -9,26 +9,13 @@
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from backend.utils.subtasks import task_handler
from typing import Any
import json
# @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id
def handler(event: dict[str, Any], context: Any, local: bool = False) -> None:
if local is True:
event = {
"Records": [
{
"body": json.dumps(
{
"hubspot_deal_id": "483651713260",
}
)
}
]
}
body = json.loads(event["Records"][0]["body"])
@task_handler()
def handler(body: dict[str, Any], context: Any) -> None:
hubspot_deal_id = body.get("hubspot_deal_id", "")
if hubspot_deal_id == "":
@ -46,5 +33,3 @@ def handler(event: dict[str, Any], context: Any, local: bool = False) -> None:
else:
deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id)
dbloader.upsert_deal(deal, company, listing, hubspot)
print("Finsihed running")

View file

@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict
from sqlalchemy import func
PORTFOLIO_ID = 640
SCENARIOS = [1154]
PORTFOLIO_ID = 656
SCENARIOS = [1177]
scenario_names = {
1154: "EPC - 10k Budget",
1177: "EPC C; Proposed Measures",
}
project_name = "First Charterhouse Investments"
project_name = "Walsall Council | WH:LG"
def get_data(portfolio_id, scenario_ids):