From 64e21adefb8726282b89af859a46bf1ebca01eb0 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 24 Nov 2025 17:39:25 +0000 Subject: [PATCH] added async limit --- .../src/dashboard/scripts/hubspot_to_s3.py | 78 +++++++++++++------ 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3.py b/backend/src/dashboard/scripts/hubspot_to_s3.py index 7df4e3d..d2a2e86 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3.py @@ -1,64 +1,92 @@ import asyncio import json from tqdm import tqdm +from datetime import datetime + from dashboard.services.hubspot_client import Pipeline from dashboard.services.hubspot_client_async import HubSpotClientAsync from dashboard.services.file_manager import FileManager -from datetime import datetime OUTPUT_FILE = "hubspot_deals.json" +# ------------------------------------------------------- +# WORKER — pulls deals from the queue and fetches info +# ------------------------------------------------------- +async def worker(id, queue, hubspot, results, pbar): + while True: + deal_id = await queue.get() + if deal_id is None: # poison pill = stop worker + queue.task_done() + break + try: + data = await hubspot.from_deal_get_info(deal_id) + results.append(data) + except Exception as e: + # You can add logging here if needed + pass + + pbar.update(1) + queue.task_done() + + +# ------------------------------------------------------- +# MAIN EXECUTION +# ------------------------------------------------------- async def main(): hubspot = HubSpotClientAsync() # Fetch all deals in the pipeline - deals = await hubspot.get_deal_ids_by_pipeline(Pipeline.OPERATIONS_SOCIAL_HOUSING.value) + deals = await hubspot.get_deal_ids_by_pipeline( + Pipeline.OPERATIONS_SOCIAL_HOUSING.value + ) total = len(deals) - print(f"Total deals to fetch: {total}") + print(f"Total deals: {total}") + queue = asyncio.Queue() results = [] - tasks = [asyncio.create_task(hubspot.from_deal_get_info(deal_id)) for deal_id in deals] - success = 0 - failed = 0 + # prefill queue + for deal_id in deals: + await queue.put(deal_id) + + # PROPER concurrency — same as semaphore limit + NUM_WORKERS = 5 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) - for task in asyncio.as_completed(tasks): - try: - result = await task - results.append(result) - success += 1 - except Exception as e: - failed += 1 - finally: - pbar.set_postfix({ - "ok": success, - "fail": failed, - "active": len(asyncio.all_tasks()) # shows concurrent load - }) - pbar.update(1) + workers = [ + asyncio.create_task(worker(i, queue, hubspot, results, pbar)) + for i in range(NUM_WORKERS) + ] + + await queue.join() + + # Stop workers + for _ in range(NUM_WORKERS): + await queue.put(None) + + await asyncio.gather(*workers) pbar.close() - # Save final results + # Save output with open(OUTPUT_FILE, "w") as f: json.dump(results, f, indent=2) - print(f"Done! Saved {len(results)} deals. Failed: {failed}") - return results + print(f"Done! Saved {len(results)} deals.") if __name__ == "__main__": asyncio.run(main()) + fm = FileManager() timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") s3_filename = f"hubspot_deals_{timestamp}.json" - s3_uri = fm.upload_to_s3( + fm.upload_to_s3( OUTPUT_FILE, bucket="retrofit-data-dev", object_name=f"hubspot_insight/{s3_filename}" - ) \ No newline at end of file + )