From 7fb9ab718e694d79a9dcb7c9d0b32182a6bfbb0b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 13:54:50 +0000 Subject: [PATCH 1/7] see if that improved --- backend/src/dashboard/scripts/hubspot_to_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3.py b/backend/src/dashboard/scripts/hubspot_to_s3.py index 5f811fc..1feebf0 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3.py @@ -52,7 +52,7 @@ async def main(): await queue.put(deal_id) # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 5 + NUM_WORKERS = 10 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) From 98ad9930c439d082ad149590d3a12633fec235b7 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 14:10:34 +0000 Subject: [PATCH 2/7] re made appointments --- .../services/hubspot_client_async.py | 187 ++++++++---------- 1 file changed, 80 insertions(+), 107 deletions(-) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index a55e4e8..f85c68b 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -8,6 +8,10 @@ from datetime import datetime class HubSpotClientAsync: API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe + BASE_BACKOFF = 0.5 # seconds + MAX_BACKOFF = 10.0 # cap sleep + MAX_RETRIES = 5 + def __init__(self): self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6" @@ -27,17 +31,56 @@ class HubSpotClientAsync: # Core Safe Request Wrapper # ----------------------------------- async def _run(self, func, *args, **kwargs): - async with self.API_CONCURRENCY: + attempt = 0 + + while True: try: - result = await asyncio.to_thread(func, *args, **kwargs) - # await asyncio.sleep(self.RATE_LIMIT_DELAY) - return result + async with self.API_CONCURRENCY: + return await asyncio.to_thread(func, *args, **kwargs) + except Exception as e: - if "429" in str(e): - self.logger.warning("Hit HubSpot rate limit → sleeping 10s") - await asyncio.sleep(10) - return await self._run(func, *args, **kwargs) - raise + attempt += 1 + error_str = str(e) + + is_rate_limit = "429" in error_str + is_server_error = "5" in error_str # defensive + + if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error): + raise + + # Exponential backoff with jitter + backoff = min( + self.MAX_BACKOFF, + self.BASE_BACKOFF * (2 ** (attempt - 1)) + ) + jitter = random.uniform(0, backoff) + + self.logger.warning( + f"HubSpot retry {attempt}/{self.MAX_RETRIES} " + f"after {jitter:.2f}s ({error_str})" + ) + + await asyncio.sleep(jitter) + + async def get_meeting_meta(self, meeting_id: str): + meetings_api = self.client.crm.objects.basic_api + + meeting = await self._run( + meetings_api.get_by_id, + "0-421", + meeting_id, + properties=[ + "hs_lastmodifieddate", + "submission_date", + ], + ) + + return { + "id": meeting_id, + "submission_date": meeting.properties.get("submission_date"), + "last_modified": meeting.properties.get("hs_lastmodifieddate"), + } + # ----------------------------------- # Deals @@ -293,7 +336,7 @@ class HubSpotClientAsync: "deals", deal_id, "0-421", - limit=100, + limit=20, ) if not response.results: @@ -322,17 +365,39 @@ class HubSpotClientAsync: return meeting.properties - async def from_deal_get_appointments(self, deal_id: str): meeting_ids = await self.from_deal_get_associated_meetings(deal_id) - tasks = [ - asyncio.create_task(self.get_meeting_info(meeting_id)) - for meeting_id in meeting_ids + if not meeting_ids: + return [] + + # 1) fetch metadata in parallel + meta_tasks = [ + asyncio.create_task(self.get_meeting_meta(mid)) + for mid in meeting_ids + ] + + meta_results = [] + for task in asyncio.as_completed(meta_tasks): + meta_results.append(await task) + + # 2) sort newest first + meta_results.sort( + key=lambda m: m["submission_date"] or m["last_modified"] or "", + reverse=True, + ) + + # 3) take ONLY latest 5 + latest_ids = [m["id"] for m in meta_results[:5]] + + # 4) fetch full meeting objects ONLY for those 5 + detail_tasks = [ + asyncio.create_task(self.get_meeting_info(mid)) + for mid in latest_ids ] results = [] - for task in asyncio.as_completed(tasks): + for task in asyncio.as_completed(detail_tasks): results.append(await task) return results @@ -359,95 +424,3 @@ class HubSpotClientAsync: json_ready_history.append(safe_entry) return json_ready_history -import asyncio -import json -from tqdm import tqdm -from datetime import datetime - -from dashboard.services.hubspot_client import Pipeline -from dashboard.services.hubspot_client_async import HubSpotClientAsync -from dashboard.services.file_manager import FileManager - -OUTPUT_FILE = "hubspot_deals.json" - -# ------------------------------------------------------- -# WORKER — pulls deals from the queue and fetches info -# ------------------------------------------------------- -async def worker(id, queue, hubspot, results, pbar): - while True: - deal_id = await queue.get() - if deal_id is None: # poison pill = stop worker - queue.task_done() - break - - try: - data = await hubspot.from_deal_get_info(deal_id) - results.append(data) - except Exception as e: - # You can add logging here if needed - pass - - pbar.update(1) - queue.task_done() - - -# ------------------------------------------------------- -# MAIN EXECUTION -# ------------------------------------------------------- -async def main(): - hubspot = HubSpotClientAsync() - - # Fetch all deals in the pipeline - deals = await hubspot.get_deal_ids_by_pipeline( - [Pipeline.OPERATIONS_SOCIAL_HOUSING.value] - ) - - total = len(deals) - print(f"Total deals: {total}") - - queue = asyncio.Queue() - results = [] - - # prefill queue - for deal_id in deals: - await queue.put(deal_id) - - # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 5 - - pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) - - workers = [ - asyncio.create_task(worker(i, queue, hubspot, results, pbar)) - for i in range(NUM_WORKERS) - ] - - await queue.join() - - # Stop workers - for _ in range(NUM_WORKERS): - await queue.put(None) - - await asyncio.gather(*workers) - - pbar.close() - - # Save output - with open(OUTPUT_FILE, "w") as f: - json.dump(results, f, indent=2) - - print(f"Done! Saved {len(results)} deals.") - - -if __name__ == "__main__": - asyncio.run(main()) - - fm = FileManager() - timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") - s3_filename = f"hubspot_deals_{timestamp}.json" - - fm.upload_to_s3( - OUTPUT_FILE, - bucket="retrofit-data-dev", - object_name=f"hubspot_insight/{s3_filename}" - ) From 05b1a018b79e5af6d61479450dc5aa710100cf9a Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 14:15:20 +0000 Subject: [PATCH 3/7] re made appointments --- backend/src/dashboard/services/hubspot_client_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index f85c68b..0d2e2fd 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -3,6 +3,7 @@ import asyncio from hubspot.crm.associations import ApiException import hubspot from datetime import datetime +import random class HubSpotClientAsync: From f996ded11c86201767981e7c6c4abf42364af418 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 14:24:19 +0000 Subject: [PATCH 4/7] re made appointments --- backend/src/dashboard/scripts/hubspot_to_s3.py | 2 +- .../dashboard/scripts/hubspot_to_s3_sales_forecast.py | 2 +- backend/src/dashboard/services/hubspot_client_async.py | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3.py b/backend/src/dashboard/scripts/hubspot_to_s3.py index 1feebf0..5f811fc 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3.py @@ -52,7 +52,7 @@ async def main(): await queue.put(deal_id) # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 10 + NUM_WORKERS = 5 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py index 8dadb23..ed4c2f6 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py @@ -56,7 +56,7 @@ async def main(): await queue.put(deal_id) # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 10 + NUM_WORKERS = 5 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index 0d2e2fd..574d050 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -7,10 +7,10 @@ import random class HubSpotClientAsync: - API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency + API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe - BASE_BACKOFF = 0.5 # seconds - MAX_BACKOFF = 10.0 # cap sleep + BASE_BACKOFF = 10.00 # secon + MIN_BACKOFF = 10.00 # cap sle MAX_RETRIES = 5 @@ -54,7 +54,7 @@ class HubSpotClientAsync: self.MAX_BACKOFF, self.BASE_BACKOFF * (2 ** (attempt - 1)) ) - jitter = random.uniform(0, backoff) + jitter = random.uniform(10, 10 + backoff) self.logger.warning( f"HubSpot retry {attempt}/{self.MAX_RETRIES} " @@ -298,7 +298,7 @@ class HubSpotClientAsync: "deals", deal_id, "line_items", - limit=100, + limit=10, ) if not response.results: From 926dd9951f4e905a8a29b966cc51b285a4a34438 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 14:26:48 +0000 Subject: [PATCH 5/7] re made appointments --- backend/src/dashboard/services/hubspot_client_async.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index 574d050..ed0de4a 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -50,11 +50,8 @@ class HubSpotClientAsync: raise # Exponential backoff with jitter - backoff = min( - self.MAX_BACKOFF, - self.BASE_BACKOFF * (2 ** (attempt - 1)) - ) - jitter = random.uniform(10, 10 + backoff) + backoff = self.BASE_BACKOFF * (2 ** (attempt - 1)) + jitter = random.uniform(self.MIN_BACKOFF, self.MIN_BACKOFF + backoff) self.logger.warning( f"HubSpot retry {attempt}/{self.MAX_RETRIES} " From 3f0faa17cbd59c9421c6a7e32d03eb3d9b5b0666 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 15:39:32 +0000 Subject: [PATCH 6/7] reverse everything and save properly --- .../services/hubspot_client_async.py | 147 ++++-------------- run_quick.sh | 2 +- 2 files changed, 29 insertions(+), 120 deletions(-) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index ed0de4a..7f7f3f4 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -3,16 +3,11 @@ import asyncio from hubspot.crm.associations import ApiException import hubspot from datetime import datetime -import random class HubSpotClientAsync: API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe - BASE_BACKOFF = 10.00 # secon - MIN_BACKOFF = 10.00 # cap sle - MAX_RETRIES = 5 - def __init__(self): self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6" @@ -25,60 +20,22 @@ class HubSpotClientAsync: format="%(asctime)s [HubSpot] %(levelname)s: %(message)s" ) self.logger = logging.getLogger(__name__) - self._company_cache = {} - # ----------------------------------- # Core Safe Request Wrapper # ----------------------------------- async def _run(self, func, *args, **kwargs): - attempt = 0 - - while True: + async with self.API_CONCURRENCY: try: - async with self.API_CONCURRENCY: - return await asyncio.to_thread(func, *args, **kwargs) - + result = await asyncio.to_thread(func, *args, **kwargs) + await asyncio.sleep(self.RATE_LIMIT_DELAY) + return result except Exception as e: - attempt += 1 - error_str = str(e) - - is_rate_limit = "429" in error_str - is_server_error = "5" in error_str # defensive - - if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error): - raise - - # Exponential backoff with jitter - backoff = self.BASE_BACKOFF * (2 ** (attempt - 1)) - jitter = random.uniform(self.MIN_BACKOFF, self.MIN_BACKOFF + backoff) - - self.logger.warning( - f"HubSpot retry {attempt}/{self.MAX_RETRIES} " - f"after {jitter:.2f}s ({error_str})" - ) - - await asyncio.sleep(jitter) - - async def get_meeting_meta(self, meeting_id: str): - meetings_api = self.client.crm.objects.basic_api - - meeting = await self._run( - meetings_api.get_by_id, - "0-421", - meeting_id, - properties=[ - "hs_lastmodifieddate", - "submission_date", - ], - ) - - return { - "id": meeting_id, - "submission_date": meeting.properties.get("submission_date"), - "last_modified": meeting.properties.get("hs_lastmodifieddate"), - } - + if "429" in str(e): + self.logger.warning("Hit HubSpot rate limit → sleeping 10s") + await asyncio.sleep(10) + return await self._run(func, *args, **kwargs) + raise # ----------------------------------- # Deals @@ -178,36 +135,13 @@ class HubSpotClientAsync: deal_info = dict(deal.properties) deal_info["deal_id"] = deal_id - company_id_task = asyncio.create_task( - self.from_deal_get_associated_company_id(deal_id) - ) - - expected_task = asyncio.create_task( - self.get_expected_commencement_history(deal_id) - ) - - line_items_task = asyncio.create_task( - self.from_deal_get_line_items(deal_id) - ) - - appointments_task = asyncio.create_task( - self.from_deal_get_appointments(deal_id) - ) - - company_id = await company_id_task - company_info_task = asyncio.create_task( - self.get_company_information(company_id) - ) - - expected_commencement_history, line_items, appointments, company_info = await asyncio.gather( - expected_task, - line_items_task, - appointments_task, - company_info_task, - ) - - + expected_commencement_history = await self.get_expected_commencement_history(deal_id) deal_info["expected_commencement_history"] = expected_commencement_history + + line_items = await self.from_deal_get_line_items(deal_id) + company_id = await self.from_deal_get_associated_company_id(deal_id) + company_info = await self.get_company_information(company_id) + appointments = await self.from_deal_get_appointments(deal_id) return { "deal_properties": deal_info, @@ -220,20 +154,17 @@ class HubSpotClientAsync: # Company Info # ----------------------------------- async def get_company_information(self, company_id): - if not company_id: - return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"} - - if company_id in self._company_cache: - return self._company_cache[company_id] + if company_id is None: + return { + "name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME" + } company = await self._run( self.client.crm.companies.basic_api.get_by_id, company_id, - properties=["name"] + properties=['name'] ) - - self._company_cache[company_id] = company.properties - return company.properties + return company.properties @@ -295,7 +226,7 @@ class HubSpotClientAsync: "deals", deal_id, "line_items", - limit=10, + limit=100, ) if not response.results: @@ -334,7 +265,7 @@ class HubSpotClientAsync: "deals", deal_id, "0-421", - limit=20, + limit=100, ) if not response.results: @@ -363,39 +294,17 @@ class HubSpotClientAsync: return meeting.properties + async def from_deal_get_appointments(self, deal_id: str): meeting_ids = await self.from_deal_get_associated_meetings(deal_id) - if not meeting_ids: - return [] - - # 1) fetch metadata in parallel - meta_tasks = [ - asyncio.create_task(self.get_meeting_meta(mid)) - for mid in meeting_ids - ] - - meta_results = [] - for task in asyncio.as_completed(meta_tasks): - meta_results.append(await task) - - # 2) sort newest first - meta_results.sort( - key=lambda m: m["submission_date"] or m["last_modified"] or "", - reverse=True, - ) - - # 3) take ONLY latest 5 - latest_ids = [m["id"] for m in meta_results[:5]] - - # 4) fetch full meeting objects ONLY for those 5 - detail_tasks = [ - asyncio.create_task(self.get_meeting_info(mid)) - for mid in latest_ids + tasks = [ + asyncio.create_task(self.get_meeting_info(meeting_id)) + for meeting_id in meeting_ids ] results = [] - for task in asyncio.as_completed(detail_tasks): + for task in asyncio.as_completed(tasks): results.append(await task) return results diff --git a/run_quick.sh b/run_quick.sh index d344dfa..956e573 100755 --- a/run_quick.sh +++ b/run_quick.sh @@ -1 +1 @@ -cd backend && poetry run python src/dashboard/scripts/quick_one.py +cd backend && poetry run python src/dashboard/scripts/hubspot_to_s3.py \ No newline at end of file From 8ee94d72adf9f6839a426df89fa786b527a3db80 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 15:41:12 +0000 Subject: [PATCH 7/7] reverse everything and save properly --- backend/src/dashboard/services/hubspot_client_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index 7f7f3f4..0eb4144 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -28,7 +28,7 @@ class HubSpotClientAsync: async with self.API_CONCURRENCY: try: result = await asyncio.to_thread(func, *args, **kwargs) - await asyncio.sleep(self.RATE_LIMIT_DELAY) + # await asyncio.sleep(self.RATE_LIMIT_DELAY) return result except Exception as e: if "429" in str(e):