def _parse_event_body(event: dict[str, Any], logger: Any) -> Any:
    """
    Extract the handler payload from a Lambda event.

    Args:
        event: Either an SQS-style event containing a ``"Records"`` list or a
            plain dict that is itself the payload.
        logger: Logger used to report payloads that cannot be parsed.

    Returns:
        The decoded body of the first record, the event itself when it has no
        ``"Records"`` key, or an empty dict when the record list is empty, the
        body is missing/falsy, or the body is not valid JSON.
    """
    if "Records" not in event:
        return event

    records = event["Records"]
    if not records:
        # An empty batch previously raised IndexError before any task existed,
        # so the failure was never recorded anywhere.
        logger.warning("Event has an empty 'Records' list; using empty body")
        return {}

    raw_body = records[0].get("body", {})
    if not isinstance(raw_body, str):
        return raw_body or {}

    try:
        return json.loads(raw_body)
    except json.JSONDecodeError:
        # Best-effort: keep going with an empty body, but leave a trace so a
        # malformed message is not silently indistinguishable from an empty one.
        logger.warning(
            "Record body is not valid JSON; using empty body", exc_info=True
        )
        return {}


def task_handler():
    """
    Decorator that wraps a Lambda handler and automatically:

    - Parses body from the first SQS record (or uses the event dict directly)
    - Creates a fresh Task + SubTask in the database
    - Marks the subtask as in progress
    - Executes the handler, passing the parsed body
    - Marks complete on success, failed on exception (and re-raises)

    Only the first entry of ``event["Records"]`` is read; any further records
    in the same batch are ignored.

    The wrapped handler is called as ``func(body, context, *args, **kwargs)``
    and its return value is passed through unchanged. Any non-``None`` result
    is stored as ``{"result": result}`` in the subtask outputs.

    Raises:
        Exception: whatever the wrapped handler raised, after the subtask has
            been marked as failed (on a best-effort basis).
    """

    def decorator(func: Callable[..., Any]):

        task_source = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):

            logger = setup_logger()

            # Parse body: Records-style SQS or plain dict event
            body = _parse_event_body(event, logger)

            # Create fresh task + subtask
            logger.info("Creating task for source: %s", task_source)
            task_id, subtask_id = TasksInterface.create_task(
                task_source=task_source,
                inputs=body,
            )
            logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id)

            interface = SubTaskInterface()

            interface.update_subtask_status(
                subtask_id=subtask_id,
                status="in progress",
            )

            try:
                result = func(body, context, *args, **kwargs)

                interface.update_subtask_status(
                    subtask_id=subtask_id,
                    status="complete",
                    # `is not None` so falsy-but-meaningful results
                    # (0, False, [], "") are still recorded.
                    outputs={"result": result} if result is not None else None,
                )

                logger.info("Task %s completed successfully", task_id)
                return result

            except Exception as e:
                logger.exception("Task %s failed: %s", task_id, e)
                try:
                    interface.update_subtask_status(
                        subtask_id=subtask_id,
                        status="failed",
                        outputs={"error": str(e)},
                    )
                except Exception:
                    # Do not let a bookkeeping failure hide the real error.
                    logger.exception(
                        "Could not mark subtask %s as failed", subtask_id
                    )
                raise

        return wrapper

    return decorator
Budget", + 1177: "EPC C; Proposed Measures", } -project_name = "First Charterhouse Investments" +project_name = "Walsall Council | WH:LG" def get_data(portfolio_id, scenario_ids):