From 093b93ec1c2d54809dd5d2f3161f31c088cd68dd Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 09:58:23 +0000 Subject: [PATCH 1/7] forgot to add records for handler --- etl/hubspot/scripts/scraper/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index f5afef52..570461d7 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -13,12 +13,13 @@ from typing import Any # @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id -def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: +def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: if local is True: body = { - "hubspot_deal_id": "254427203793", + "hubspot_deal_id": "409487859944", } + body = event["Records"][0]["body"] hubspot_deal_id = body.get("hubspot_deal_id", "") if hubspot_deal_id == "": From 0f892a69d989c45d219b70caa3465c4bafe58b6c Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 10:15:20 +0000 Subject: [PATCH 2/7] forgot json --- etl/hubspot/scripts/scraper/main.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 570461d7..d7f6add2 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -10,16 +10,25 @@ from etl.hubspot.hubspotClient import HubspotClient from etl.hubspot.hubspotDataTodB import HubspotDataToDb from typing import Any +import json # @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id -def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: +def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: if local is True: - body = { - "hubspot_deal_id": "409487859944", + event = { + "Records": [ + { + "body": json.dumps( + { + "hubspot_deal_id": "409487859944", + } + ) + } + ] } - body = event["Records"][0]["body"] + body = json.loads(event["Records"][0]["body"]) hubspot_deal_id = body.get("hubspot_deal_id", "") if hubspot_deal_id == "": From 60f3d25c29893cbe1cbec5b9e01f0ebd8285b250 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 10:18:37 +0000 Subject: [PATCH 3/7] forgot to call it eventr --- etl/hubspot/scripts/scraper/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index d7f6add2..459ea5c2 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -14,7 +14,7 @@ import json # @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id -def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: +def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: if local is True: event = { "Records": [ From 8a9f51f9e6efe1a9c9d250b7f9ebe4508a869b71 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 10:33:06 +0000 Subject: [PATCH 4/7] include hubspot api key --- .github/workflows/_deploy_lambda.yml | 2 ++ .github/workflows/deploy_terraform.yml | 1 + infrastructure/terraform/lambda/hubspot_deal_etl/main.tf | 1 + .../terraform/lambda/hubspot_deal_etl/variables.tf | 5 +++++ 4 files changed, 9 insertions(+) diff --git a/.github/workflows/_deploy_lambda.yml b/.github/workflows/_deploy_lambda.yml index 707c9e00..009b9ff8 100644 --- a/.github/workflows/_deploy_lambda.yml +++ b/.github/workflows/_deploy_lambda.yml @@ -80,6 +80,8 @@ on: required: false TF_VAR_pashub_password: required: false + TF_VAR_hubspot_api_key: + required: false jobs: deploy: runs-on: ubuntu-latest diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index cbcd88c4..fccc6da4 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -518,6 +518,7 @@ jobs: TF_VAR_db_host: ${{ secrets.DEV_DB_HOST }} TF_VAR_db_name: ${{ secrets.DEV_DB_NAME }} TF_VAR_db_port: ${{ secrets.DEV_DB_PORT }} + TF_VAR_hubspot_api_key: ${{ secrets.HUBSPOT_API_KEY }} AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf index 0f5cc6ba..6ce7a386 100644 --- a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf +++ b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf @@ -38,6 +38,7 @@ module "hubspot_deal_etl" { DB_HOST = var.db_host DB_NAME = var.db_name DB_PORT = var.db_port + HUBSPOT_API_KEY = var.hubspot_api_key } } diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf b/infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf index 2e7da609..285b6a4c 100644 --- a/infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf +++ b/infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf @@ -41,6 +41,11 @@ variable "db_host" { type = string } +variable "hubspot_api_key" { + type = string +} + + variable "db_name" { type = string } From 9b88eb3a94426cc2fb577ec0dd429d69c6fd20de Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 10:37:26 +0000 Subject: [PATCH 5/7] added to plan and destory --- .github/workflows/_deploy_lambda.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/_deploy_lambda.yml b/.github/workflows/_deploy_lambda.yml index 009b9ff8..3a407c5a 100644 --- a/.github/workflows/_deploy_lambda.yml +++ b/.github/workflows/_deploy_lambda.yml @@ -148,6 +148,7 @@ jobs: TF_VAR_social_housing_wave_3_sharepoint_id: ${{ secrets.TF_VAR_social_housing_wave_3_sharepoint_id }} TF_VAR_pashub_email: ${{ secrets.TF_VAR_pashub_email }} TF_VAR_pashub_password: ${{ secrets.TF_VAR_pashub_password }} + TF_VAR_hubspot_api_key: ${{ secrets.TF_VAR_hubspot_api_key }} run: | ECR_REPO_URL_VAR="" if [[ -n "${{ inputs.ecr_repo }}" ]]; then @@ -193,6 +194,7 @@ jobs: TF_VAR_social_housing_wave_3_sharepoint_id: ${{ secrets.TF_VAR_social_housing_wave_3_sharepoint_id }} TF_VAR_pashub_email: ${{ secrets.TF_VAR_pashub_email }} TF_VAR_pashub_password: ${{ secrets.TF_VAR_pashub_password }} + TF_VAR_hubspot_api_key: ${{ secrets.TF_VAR_hubspot_api_key }} run: | EXTRA_VARS="" if [[ -n "${{ inputs.ecr_repo }}" ]]; then From cee4e40bb1cc0f82ec45ba974c2ad63953e856ca Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 12:10:28 +0000 Subject: [PATCH 6/7] latest code --- backend/app/db/models/organisation.py | 2 ++ etl/hubspot/hubspotClient.py | 2 ++ etl/hubspot/hubspotDataTodB.py | 12 ++++++++++++ etl/hubspot/scripts/scraper/bulk_load.py | 3 ++- etl/hubspot/scripts/scraper/main.py | 2 +- 5 files changed, 19 insertions(+), 2 deletions(-) diff --git a/backend/app/db/models/organisation.py b/backend/app/db/models/organisation.py index a9718c42..3adc8e9c 100644 --- a/backend/app/db/models/organisation.py +++ b/backend/app/db/models/organisation.py @@ -63,6 +63,8 @@ class HubspotDealData(SQLModel, table=True): surveyor: Optional[str] = Field(default=None) confirmed_survey_date: Optional[datetime] = Field(default=None) confirmed_survey_time: Optional[str] = Field(default=None) + surveyed_date: Optional[datetime] = Field(default=None) + design_type: Optional[str] = Field(default=None) created_at: datetime = Field( sa_column=Column( diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index e5461c61..d74a5ed4 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -231,6 +231,8 @@ class HubspotClient: "surveyor", "confirmed_survey_date", "confirmed_survey_time", + "surveyed_date", + "design_type", ], ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index 0c38f483..7f06a29d 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -257,6 +257,14 @@ class HubspotDataToDb: deal_in_db.confirmed_survey_time == hs_deal.get("confirmed_survey_time"), "confirmed_survey_time mismatch", ), + soft_assert( + deal_in_db.surveyed_date == self._parse_hs_date(hs_deal.get("surveyed_date")), + "surveyed_date mismatch", + ), + soft_assert( + deal_in_db.design_type == hs_deal.get("design_type"), + "design_type mismatch", + ), ] # If discrepancies found, update from HubSpot @@ -380,6 +388,8 @@ class HubspotDataToDb: "surveyor": deal_data.get("surveyor"), "confirmed_survey_date": self._parse_hs_date(deal_data.get("confirmed_survey_date")), "confirmed_survey_time": deal_data.get("confirmed_survey_time"), + "surveyed_date": self._parse_hs_date(deal_data.get("surveyed_date")), + "design_type": deal_data.get("design_type"), }.items(): setattr(existing, attr, value or getattr(existing, attr)) @@ -462,6 +472,8 @@ class HubspotDataToDb: surveyor=deal_data.get("surveyor"), confirmed_survey_date=self._parse_hs_date(deal_data.get("confirmed_survey_date")), confirmed_survey_time=deal_data.get("confirmed_survey_time"), + surveyed_date=self._parse_hs_date(deal_data.get("surveyed_date")), + design_type=deal_data.get("design_type"), ) # Handle upload at insert time diff --git a/etl/hubspot/scripts/scraper/bulk_load.py b/etl/hubspot/scripts/scraper/bulk_load.py index 6fac23ea..5dc9570e 100644 --- a/etl/hubspot/scripts/scraper/bulk_load.py +++ b/etl/hubspot/scripts/scraper/bulk_load.py @@ -1,6 +1,7 @@ from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline from etl.hubspot.scripts.scraper.main import handler from tqdm import tqdm +import json PIPELINE_ID = Pipeline.OPERATIONS_SOCIAL_HOUSING.value @@ -29,7 +30,7 @@ def bulk_load(companies: list[Companies] | None = None) -> None: continue deal_bar.set_postfix({"status": "uploading", "deal": deal_id}) - handler({"hubspot_deal_id": deal_id}, context=None) + handler({"Records": [{"body": json.dumps({"hubspot_deal_id": deal_id})}]}, context=None) processed += 1 deal_bar.set_postfix({"status": "done", "deal": deal_id}) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 459ea5c2..4525c8cb 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -21,7 +21,7 @@ def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: { "body": json.dumps( { - "hubspot_deal_id": "409487859944", + "hubspot_deal_id": "483651713260", } ) } From 955dffd74d2e5877af6bc1eca576c716f15701c4 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 1 Apr 2026 14:59:28 +0000 Subject: [PATCH 7/7] added task handelr --- backend/utils/subtasks.py | 76 +++++++++++++++++++++++++++- etl/hubspot/scripts/scraper/main.py | 21 ++------ sfr/principal_pitch/2_export_data.py | 8 +-- 3 files changed, 82 insertions(+), 23 deletions(-) diff --git a/backend/utils/subtasks.py b/backend/utils/subtasks.py index 041494e9..e5668c53 100644 --- a/backend/utils/subtasks.py +++ b/backend/utils/subtasks.py @@ -5,7 +5,8 @@ from typing import Callable, Any from uuid import UUID import json -from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from backend.app.db.functions.tasks.Tasks import SubTaskInterface, TasksInterface +from utils.logger import setup_logger def subtask_handler(): @@ -93,3 +94,76 @@ def subtask_handler(): return wrapper return decorator + + +def task_handler(): + """ + Decorator that wraps a Lambda handler and automatically: + + - Parses body from the first SQS record (or uses the event dict directly) + - Creates a fresh Task + SubTask in the database + - Marks the subtask as in progress + - Executes the handler, passing the parsed body + - Marks complete on success, failed on exception (and re-raises) + """ + + def decorator(func: Callable[..., Any]): + + task_source = f"{func.__module__}.{func.__qualname__}" + + @wraps(func) + def wrapper(event: dict[str, Any], context: Any, *args, **kwargs): + + logger = setup_logger() + + # Parse body: Records-style SQS or plain dict event + if "Records" in event: + raw_body = event["Records"][0].get("body", {}) + if isinstance(raw_body, str): + try: + body = json.loads(raw_body) + except Exception: + body = {} + else: + body = raw_body or {} + else: + body = event + + # Create fresh task + subtask + logger.info("Creating task for source: %s", task_source) + task_id, subtask_id = TasksInterface.create_task( + task_source=task_source, + inputs=body, + ) + logger.info("Created task_id=%s subtask_id=%s", task_id, subtask_id) + + interface = SubTaskInterface() + + interface.update_subtask_status( + subtask_id=subtask_id, + status="in progress", + ) + + try: + result = func(body, context, *args, **kwargs) + + interface.update_subtask_status( + subtask_id=subtask_id, + status="complete", + outputs={"result": result} if result else None, + ) + logger.info("Task %s completed successfully", task_id) + return result + + except Exception as e: + logger.exception("Task %s failed: %s", task_id, e) + interface.update_subtask_status( + subtask_id=subtask_id, + status="failed", + outputs={"error": str(e)}, + ) + raise + + return wrapper + + return decorator diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 4525c8cb..55a7a372 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -9,26 +9,13 @@ from etl.hubspot.hubspotClient import HubspotClient from etl.hubspot.hubspotDataTodB import HubspotDataToDb +from backend.utils.subtasks import task_handler from typing import Any import json -# @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id -def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: - if local is True: - event = { - "Records": [ - { - "body": json.dumps( - { - "hubspot_deal_id": "483651713260", - } - ) - } - ] - } - - body = json.loads(event["Records"][0]["body"]) +@task_handler() +def handler(body: dict[str, Any], context: Any) -> None: hubspot_deal_id = body.get("hubspot_deal_id", "") if hubspot_deal_id == "": @@ -46,5 +33,3 @@ def handler(event: dict[str, Any], context: Any, local: bool = False) -> None: else: deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id) dbloader.upsert_deal(deal, company, listing, hubspot) - - print("Finsihed running") diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index fece17e0..3baa7a44 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials from collections import defaultdict from sqlalchemy import func -PORTFOLIO_ID = 640 -SCENARIOS = [1154] +PORTFOLIO_ID = 656 +SCENARIOS = [1177] scenario_names = { - 1154: "EPC - 10k Budget", + 1177: "EPC C; Proposed Measures", } -project_name = "First Charterhouse Investments" +project_name = "Walsall Council | WH:LG" def get_data(portfolio_id, scenario_ids):