added async limit

This commit is contained in:
Jun-te Kim 2025-11-24 17:39:25 +00:00
parent fabe4fac6b
commit 64e21adefb

View file

@ -1,64 +1,92 @@
import asyncio
import json
from tqdm import tqdm
from datetime import datetime
from dashboard.services.hubspot_client import Pipeline
from dashboard.services.hubspot_client_async import HubSpotClientAsync
from dashboard.services.file_manager import FileManager
from datetime import datetime
OUTPUT_FILE = "hubspot_deals.json"
# -------------------------------------------------------
# WORKER — pulls deals from the queue and fetches info
# -------------------------------------------------------
async def worker(id, queue, hubspot, results, pbar):
while True:
deal_id = await queue.get()
if deal_id is None: # poison pill = stop worker
queue.task_done()
break
try:
data = await hubspot.from_deal_get_info(deal_id)
results.append(data)
except Exception as e:
# You can add logging here if needed
pass
pbar.update(1)
queue.task_done()
# -------------------------------------------------------
# MAIN EXECUTION
# -------------------------------------------------------
async def main():
hubspot = HubSpotClientAsync()
# Fetch all deals in the pipeline
deals = await hubspot.get_deal_ids_by_pipeline(Pipeline.OPERATIONS_SOCIAL_HOUSING.value)
deals = await hubspot.get_deal_ids_by_pipeline(
Pipeline.OPERATIONS_SOCIAL_HOUSING.value
)
total = len(deals)
print(f"Total deals to fetch: {total}")
print(f"Total deals: {total}")
queue = asyncio.Queue()
results = []
tasks = [asyncio.create_task(hubspot.from_deal_get_info(deal_id)) for deal_id in deals]
success = 0
failed = 0
# prefill queue
for deal_id in deals:
await queue.put(deal_id)
# PROPER concurrency — same as semaphore limit
NUM_WORKERS = 5
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
for task in asyncio.as_completed(tasks):
try:
result = await task
results.append(result)
success += 1
except Exception as e:
failed += 1
finally:
pbar.set_postfix({
"ok": success,
"fail": failed,
"active": len(asyncio.all_tasks()) # shows concurrent load
})
pbar.update(1)
workers = [
asyncio.create_task(worker(i, queue, hubspot, results, pbar))
for i in range(NUM_WORKERS)
]
await queue.join()
# Stop workers
for _ in range(NUM_WORKERS):
await queue.put(None)
await asyncio.gather(*workers)
pbar.close()
# Save final results
# Save output
with open(OUTPUT_FILE, "w") as f:
json.dump(results, f, indent=2)
print(f"Done! Saved {len(results)} deals. Failed: {failed}")
return results
print(f"Done! Saved {len(results)} deals.")
if __name__ == "__main__":
asyncio.run(main())
fm = FileManager()
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
s3_filename = f"hubspot_deals_{timestamp}.json"
s3_uri = fm.upload_to_s3(
fm.upload_to_s3(
OUTPUT_FILE,
bucket="retrofit-data-dev",
object_name=f"hubspot_insight/{s3_filename}"
)
)