From 8eed0f1745069ac2a85aed0a91f499e9d6c54fd2 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 17 Dec 2025 13:50:25 +0000 Subject: [PATCH] see if that improved --- .../scripts/hubspot_to_s3_sales_forecast.py | 2 +- .../services/hubspot_client_async.py | 148 ++++++++++++++++-- 2 files changed, 135 insertions(+), 15 deletions(-) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py index ed4c2f6..8dadb23 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py @@ -56,7 +56,7 @@ async def main(): await queue.put(deal_id) # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 5 + NUM_WORKERS = 10 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index 7f7f3f4..a55e4e8 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -6,7 +6,7 @@ from datetime import datetime class HubSpotClientAsync: - API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency + API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe def __init__(self): @@ -20,6 +20,8 @@ class HubSpotClientAsync: format="%(asctime)s [HubSpot] %(levelname)s: %(message)s" ) self.logger = logging.getLogger(__name__) + self._company_cache = {} + # ----------------------------------- # Core Safe Request Wrapper @@ -28,7 +30,7 @@ class HubSpotClientAsync: async with self.API_CONCURRENCY: try: result = await asyncio.to_thread(func, *args, **kwargs) - await asyncio.sleep(self.RATE_LIMIT_DELAY) + # await asyncio.sleep(self.RATE_LIMIT_DELAY) return result except Exception as e: if "429" in str(e): @@ -135,13 +137,36 @@ class HubSpotClientAsync: deal_info = dict(deal.properties) deal_info["deal_id"] = deal_id - expected_commencement_history = await self.get_expected_commencement_history(deal_id) - deal_info["expected_commencement_history"] = expected_commencement_history + company_id_task = asyncio.create_task( + self.from_deal_get_associated_company_id(deal_id) + ) - line_items = await self.from_deal_get_line_items(deal_id) - company_id = await self.from_deal_get_associated_company_id(deal_id) - company_info = await self.get_company_information(company_id) - appointments = await self.from_deal_get_appointments(deal_id) + expected_task = asyncio.create_task( + self.get_expected_commencement_history(deal_id) + ) + + line_items_task = asyncio.create_task( + self.from_deal_get_line_items(deal_id) + ) + + appointments_task = asyncio.create_task( + self.from_deal_get_appointments(deal_id) + ) + + company_id = await company_id_task + company_info_task = asyncio.create_task( + self.get_company_information(company_id) + ) + + expected_commencement_history, line_items, appointments, company_info = await asyncio.gather( + expected_task, + line_items_task, + appointments_task, + company_info_task, + ) + + + deal_info["expected_commencement_history"] = expected_commencement_history return { "deal_properties": deal_info, @@ -154,17 +179,20 @@ class HubSpotClientAsync: # Company Info # ----------------------------------- async def get_company_information(self, company_id): - if company_id is None: - return { - "name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME" - } + if not company_id: + return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"} + + if company_id in self._company_cache: + return self._company_cache[company_id] company = await self._run( self.client.crm.companies.basic_api.get_by_id, company_id, - properties=['name'] + properties=["name"] ) - return company.properties + + self._company_cache[company_id] = company.properties + return company.properties @@ -331,3 +359,95 @@ class HubSpotClientAsync: json_ready_history.append(safe_entry) return json_ready_history +import asyncio +import json +from tqdm import tqdm +from datetime import datetime + +from dashboard.services.hubspot_client import Pipeline +from dashboard.services.hubspot_client_async import HubSpotClientAsync +from dashboard.services.file_manager import FileManager + +OUTPUT_FILE = "hubspot_deals.json" + +# ------------------------------------------------------- +# WORKER — pulls deals from the queue and fetches info +# ------------------------------------------------------- +async def worker(id, queue, hubspot, results, pbar): + while True: + deal_id = await queue.get() + if deal_id is None: # poison pill = stop worker + queue.task_done() + break + + try: + data = await hubspot.from_deal_get_info(deal_id) + results.append(data) + except Exception as e: + # You can add logging here if needed + pass + + pbar.update(1) + queue.task_done() + + +# ------------------------------------------------------- +# MAIN EXECUTION +# ------------------------------------------------------- +async def main(): + hubspot = HubSpotClientAsync() + + # Fetch all deals in the pipeline + deals = await hubspot.get_deal_ids_by_pipeline( + [Pipeline.OPERATIONS_SOCIAL_HOUSING.value] + ) + + total = len(deals) + print(f"Total deals: {total}") + + queue = asyncio.Queue() + results = [] + + # prefill queue + for deal_id in deals: + await queue.put(deal_id) + + # PROPER concurrency — same as semaphore limit + NUM_WORKERS = 5 + + pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) + + workers = [ + asyncio.create_task(worker(i, queue, hubspot, results, pbar)) + for i in range(NUM_WORKERS) + ] + + await queue.join() + + # Stop workers + for _ in range(NUM_WORKERS): + await queue.put(None) + + await asyncio.gather(*workers) + + pbar.close() + + # Save output + with open(OUTPUT_FILE, "w") as f: + json.dump(results, f, indent=2) + + print(f"Done! Saved {len(results)} deals.") + + +if __name__ == "__main__": + asyncio.run(main()) + + fm = FileManager() + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + s3_filename = f"hubspot_deals_{timestamp}.json" + + fm.upload_to_s3( + OUTPUT_FILE, + bucket="retrofit-data-dev", + object_name=f"hubspot_insight/{s3_filename}" + )