From 3f0faa17cbd59c9421c6a7e32d03eb3d9b5b0666 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 15:39:32 +0000 Subject: [PATCH] reverse everything and save properly --- .../services/hubspot_client_async.py | 147 ++++-------------- run_quick.sh | 2 +- 2 files changed, 29 insertions(+), 120 deletions(-) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index ed0de4a..7f7f3f4 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -3,16 +3,11 @@ import asyncio from hubspot.crm.associations import ApiException import hubspot from datetime import datetime -import random class HubSpotClientAsync: API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe - BASE_BACKOFF = 10.00 # secon - MIN_BACKOFF = 10.00 # cap sle - MAX_RETRIES = 5 - def __init__(self): self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6" @@ -25,60 +20,22 @@ class HubSpotClientAsync: format="%(asctime)s [HubSpot] %(levelname)s: %(message)s" ) self.logger = logging.getLogger(__name__) - self._company_cache = {} - # ----------------------------------- # Core Safe Request Wrapper # ----------------------------------- async def _run(self, func, *args, **kwargs): - attempt = 0 - - while True: + async with self.API_CONCURRENCY: try: - async with self.API_CONCURRENCY: - return await asyncio.to_thread(func, *args, **kwargs) - + result = await asyncio.to_thread(func, *args, **kwargs) + await asyncio.sleep(self.RATE_LIMIT_DELAY) + return result except Exception as e: - attempt += 1 - error_str = str(e) - - is_rate_limit = "429" in error_str - is_server_error = "5" in error_str # defensive - - if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error): - raise - - # Exponential backoff with jitter - backoff = self.BASE_BACKOFF * (2 ** (attempt - 1)) - jitter = random.uniform(self.MIN_BACKOFF, self.MIN_BACKOFF + backoff) - - self.logger.warning( - f"HubSpot retry {attempt}/{self.MAX_RETRIES} " - f"after {jitter:.2f}s ({error_str})" - ) - - await asyncio.sleep(jitter) - - async def get_meeting_meta(self, meeting_id: str): - meetings_api = self.client.crm.objects.basic_api - - meeting = await self._run( - meetings_api.get_by_id, - "0-421", - meeting_id, - properties=[ - "hs_lastmodifieddate", - "submission_date", - ], - ) - - return { - "id": meeting_id, - "submission_date": meeting.properties.get("submission_date"), - "last_modified": meeting.properties.get("hs_lastmodifieddate"), - } - + if "429" in str(e): + self.logger.warning("Hit HubSpot rate limit → sleeping 10s") + await asyncio.sleep(10) + return await self._run(func, *args, **kwargs) + raise # ----------------------------------- # Deals @@ -178,36 +135,13 @@ class HubSpotClientAsync: deal_info = dict(deal.properties) deal_info["deal_id"] = deal_id - company_id_task = asyncio.create_task( - self.from_deal_get_associated_company_id(deal_id) - ) - - expected_task = asyncio.create_task( - self.get_expected_commencement_history(deal_id) - ) - - line_items_task = asyncio.create_task( - self.from_deal_get_line_items(deal_id) - ) - - appointments_task = asyncio.create_task( - self.from_deal_get_appointments(deal_id) - ) - - company_id = await company_id_task - company_info_task = asyncio.create_task( - self.get_company_information(company_id) - ) - - expected_commencement_history, line_items, appointments, company_info = await asyncio.gather( - expected_task, - line_items_task, - appointments_task, - company_info_task, - ) - - + expected_commencement_history = await self.get_expected_commencement_history(deal_id) deal_info["expected_commencement_history"] = expected_commencement_history + + line_items = await self.from_deal_get_line_items(deal_id) + company_id = await self.from_deal_get_associated_company_id(deal_id) + company_info = await self.get_company_information(company_id) + appointments = await self.from_deal_get_appointments(deal_id) return { "deal_properties": deal_info, @@ -220,20 +154,17 @@ class HubSpotClientAsync: # Company Info # ----------------------------------- async def get_company_information(self, company_id): - if not company_id: - return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"} - - if company_id in self._company_cache: - return self._company_cache[company_id] + if company_id is None: + return { + "name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME" + } company = await self._run( self.client.crm.companies.basic_api.get_by_id, company_id, - properties=["name"] + properties=['name'] ) - - self._company_cache[company_id] = company.properties - return company.properties + return company.properties @@ -295,7 +226,7 @@ class HubSpotClientAsync: "deals", deal_id, "line_items", - limit=10, + limit=100, ) if not response.results: @@ -334,7 +265,7 @@ class HubSpotClientAsync: "deals", deal_id, "0-421", - limit=20, + limit=100, ) if not response.results: @@ -363,39 +294,17 @@ class HubSpotClientAsync: return meeting.properties + async def from_deal_get_appointments(self, deal_id: str): meeting_ids = await self.from_deal_get_associated_meetings(deal_id) - if not meeting_ids: - return [] - - # 1) fetch metadata in parallel - meta_tasks = [ - asyncio.create_task(self.get_meeting_meta(mid)) - for mid in meeting_ids - ] - - meta_results = [] - for task in asyncio.as_completed(meta_tasks): - meta_results.append(await task) - - # 2) sort newest first - meta_results.sort( - key=lambda m: m["submission_date"] or m["last_modified"] or "", - reverse=True, - ) - - # 3) take ONLY latest 5 - latest_ids = [m["id"] for m in meta_results[:5]] - - # 4) fetch full meeting objects ONLY for those 5 - detail_tasks = [ - asyncio.create_task(self.get_meeting_info(mid)) - for mid in latest_ids + tasks = [ + asyncio.create_task(self.get_meeting_info(meeting_id)) + for meeting_id in meeting_ids ] results = [] - for task in asyncio.as_completed(detail_tasks): + for task in asyncio.as_completed(tasks): results.append(await task) return results diff --git a/run_quick.sh b/run_quick.sh index d344dfa..956e573 100755 --- a/run_quick.sh +++ b/run_quick.sh @@ -1 +1 @@ -cd backend && poetry run python src/dashboard/scripts/quick_one.py +cd backend && poetry run python src/dashboard/scripts/hubspot_to_s3.py \ No newline at end of file