diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py
--- a/backend/src/dashboard/services/hubspot_client_async.py
+++ b/backend/src/dashboard/services/hubspot_client_async.py
@@ -8,6 +8,10 @@ from datetime import datetime
 class HubSpotClientAsync:
     API_CONCURRENCY = asyncio.Semaphore(10)  # globally limit concurrency
     RATE_LIMIT_DELAY = 0.25  # 4 requests/sec → safe
+    BASE_BACKOFF = 0.5  # seconds
+    MAX_BACKOFF = 10.0  # cap sleep
+    MAX_RETRIES = 5
+
 
     def __init__(self):
         self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
@@ -27,17 +31,78 @@ class HubSpotClientAsync:
     # Core Safe Request Wrapper
     # -----------------------------------
     async def _run(self, func, *args, **kwargs):
-        async with self.API_CONCURRENCY:
+        """Run ``func`` in a worker thread, retrying transient HubSpot errors.
+
+        HTTP 429 and 5xx failures are retried up to ``MAX_RETRIES`` times with
+        jittered exponential backoff; any other error, or the error raised
+        after the last retry, propagates to the caller unchanged.
+        """
+        # Function-scope import: the module's import block is not part of this
+        # change, so do not rely on ``random`` already being imported there.
+        import random
+
+        attempt = 0
+
+        while True:
             try:
-                result = await asyncio.to_thread(func, *args, **kwargs)
-                # await asyncio.sleep(self.RATE_LIMIT_DELAY)
-                return result
+                # Hold the semaphore only for the call itself, never while
+                # sleeping between retries.
+                async with self.API_CONCURRENCY:
+                    return await asyncio.to_thread(func, *args, **kwargs)
+
             except Exception as e:
-                if "429" in str(e):
-                    self.logger.warning("Hit HubSpot rate limit → sleeping 10s")
-                    await asyncio.sleep(10)
-                    return await self._run(func, *args, **kwargs)
-                raise
+                attempt += 1
+                error_str = str(e)
+
+                # Classify by the HTTP status carried on the exception; a bare
+                # `"5" in error_str` test matches nearly every message (ids,
+                # timestamps, "405", ...) and would retry permanent failures.
+                # NOTE(review): assumes SDK errors expose an int `.status`.
+                status = getattr(e, "status", None)
+                is_rate_limit = status == 429 or (status is None and "429" in error_str)
+                is_server_error = isinstance(status, int) and 500 <= status <= 599
+
+                if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error):
+                    raise
+
+                # Exponential backoff with jitter
+                backoff = min(
+                    self.MAX_BACKOFF,
+                    self.BASE_BACKOFF * (2 ** (attempt - 1))
+                )
+                jitter = random.uniform(0, backoff)
+
+                self.logger.warning(
+                    f"HubSpot retry {attempt}/{self.MAX_RETRIES} "
+                    f"after {jitter:.2f}s ({error_str})"
+                )
+
+                await asyncio.sleep(jitter)
+
+    async def get_meeting_meta(self, meeting_id: str):
+        """Fetch only the date properties of one "0-421" object by id.
+
+        Returns a dict with ``id``, ``submission_date`` and ``last_modified``;
+        a date is ``None`` when the response carries no such property.
+        """
+        meetings_api = self.client.crm.objects.basic_api
+
+        meeting = await self._run(
+            meetings_api.get_by_id,
+            "0-421",
+            meeting_id,
+            properties=[
+                "hs_lastmodifieddate",
+                "submission_date",
+            ],
+        )
+
+        return {
+            "id": meeting_id,
+            "submission_date": meeting.properties.get("submission_date"),
+            "last_modified": meeting.properties.get("hs_lastmodifieddate"),
+        }
+
 
     # -----------------------------------
     # Deals
@@ -293,7 +358,7 @@ class HubSpotClientAsync:
             "deals",
             deal_id,
             "0-421",
-            limit=100,
+            limit=20,
         )
 
         if not response.results:
@@ -322,17 +387,46 @@ class HubSpotClientAsync:
 
         return meeting.properties
 
-
     async def from_deal_get_appointments(self, deal_id: str):
+        """Return ``get_meeting_info`` results for a deal's 5 newest meetings.
+
+        Date metadata is fetched for every associated meeting, sorted newest
+        first by ``submission_date`` (falling back to ``hs_lastmodifieddate``,
+        compared as the raw property values), and only the top 5 ids are then
+        fetched in full. The list is in task-completion order, not date order.
+        """
         meeting_ids = await self.from_deal_get_associated_meetings(deal_id)
 
-        tasks = [
-            asyncio.create_task(self.get_meeting_info(meeting_id))
-            for meeting_id in meeting_ids
+        if not meeting_ids:
+            return []
+
+        # 1) fetch metadata in parallel
+        meta_tasks = [
+            asyncio.create_task(self.get_meeting_meta(mid))
+            for mid in meeting_ids
+        ]
+
+        meta_results = []
+        for task in asyncio.as_completed(meta_tasks):
+            meta_results.append(await task)
+
+        # 2) sort newest first
+        meta_results.sort(
+            key=lambda m: m["submission_date"] or m["last_modified"] or "",
+            reverse=True,
+        )
+
+        # 3) take ONLY latest 5
+        latest_ids = [m["id"] for m in meta_results[:5]]
+
+        # 4) fetch full meeting objects ONLY for those 5
+        detail_tasks = [
+            asyncio.create_task(self.get_meeting_info(mid))
+            for mid in latest_ids
         ]
 
         results = []
-        for task in asyncio.as_completed(tasks):
+        for task in asyncio.as_completed(detail_tasks):
             results.append(await task)
 
         return results
@@ -359,95 +453,3 @@ class HubSpotClientAsync:
             json_ready_history.append(safe_entry)
 
         return json_ready_history
-import asyncio
-import json
-from tqdm import tqdm
-from datetime import datetime
-
-from dashboard.services.hubspot_client import Pipeline
-from dashboard.services.hubspot_client_async import HubSpotClientAsync
-from dashboard.services.file_manager import FileManager
-
-OUTPUT_FILE = "hubspot_deals.json"
-
-# -------------------------------------------------------
-# WORKER — pulls deals from the queue and fetches info
-# -------------------------------------------------------
-async def worker(id, queue, hubspot, results, pbar):
-    while True:
-        deal_id = await queue.get()
-        if deal_id is None:  # poison pill = stop worker
-            queue.task_done()
-            break
-
-        try:
-            data = await hubspot.from_deal_get_info(deal_id)
-            results.append(data)
-        except Exception as e:
-            # You can add logging here if needed
-            pass
-
-        pbar.update(1)
-        queue.task_done()
-
-
-# -------------------------------------------------------
-# MAIN EXECUTION
-# -------------------------------------------------------
-async def main():
-    hubspot = HubSpotClientAsync()
-
-    # Fetch all deals in the pipeline
-    deals = await hubspot.get_deal_ids_by_pipeline(
-        [Pipeline.OPERATIONS_SOCIAL_HOUSING.value]
-    )
-
-    total = len(deals)
-    print(f"Total deals: {total}")
-
-    queue = asyncio.Queue()
-    results = []
-
-    # prefill queue
-    for deal_id in deals:
-        await queue.put(deal_id)
-
-    # PROPER concurrency — same as semaphore limit
-    NUM_WORKERS = 5
-
-    pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
-
-    workers = [
-        asyncio.create_task(worker(i, queue, hubspot, results, pbar))
-        for i in range(NUM_WORKERS)
-    ]
-
-    await queue.join()
-
-    # Stop workers
-    for _ in range(NUM_WORKERS):
-        await queue.put(None)
-
-    await asyncio.gather(*workers)
-
-    pbar.close()
-
-    # Save output
-    with open(OUTPUT_FILE, "w") as f:
-        json.dump(results, f, indent=2)
-
-    print(f"Done! Saved {len(results)} deals.")
-
-
-if __name__ == "__main__":
-    asyncio.run(main())
-
-    fm = FileManager()
-    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
-    s3_filename = f"hubspot_deals_{timestamp}.json"
-
-    fm.upload_to_s3(
-        OUTPUT_FILE,
-        bucket="retrofit-data-dev",
-        object_name=f"hubspot_insight/{s3_filename}"
-    )