see if that improved

This commit is contained in:
Jun-te Kim 2025-12-17 13:50:25 +00:00
parent d713ec30a3
commit 8eed0f1745
2 changed files with 135 additions and 15 deletions

View file

@ -56,7 +56,7 @@ async def main():
await queue.put(deal_id)
# PROPER concurrency — same as semaphore limit
NUM_WORKERS = 5
NUM_WORKERS = 10
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)

View file

@ -6,7 +6,7 @@ from datetime import datetime
class HubSpotClientAsync:
API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency
API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency
RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe
def __init__(self):
@ -20,6 +20,8 @@ class HubSpotClientAsync:
format="%(asctime)s [HubSpot] %(levelname)s: %(message)s"
)
self.logger = logging.getLogger(__name__)
self._company_cache = {}
# -----------------------------------
# Core Safe Request Wrapper
@ -28,7 +30,7 @@ class HubSpotClientAsync:
async with self.API_CONCURRENCY:
try:
result = await asyncio.to_thread(func, *args, **kwargs)
await asyncio.sleep(self.RATE_LIMIT_DELAY)
# await asyncio.sleep(self.RATE_LIMIT_DELAY)
return result
except Exception as e:
if "429" in str(e):
@ -135,13 +137,36 @@ class HubSpotClientAsync:
deal_info = dict(deal.properties)
deal_info["deal_id"] = deal_id
expected_commencement_history = await self.get_expected_commencement_history(deal_id)
deal_info["expected_commencement_history"] = expected_commencement_history
company_id_task = asyncio.create_task(
self.from_deal_get_associated_company_id(deal_id)
)
line_items = await self.from_deal_get_line_items(deal_id)
company_id = await self.from_deal_get_associated_company_id(deal_id)
company_info = await self.get_company_information(company_id)
appointments = await self.from_deal_get_appointments(deal_id)
expected_task = asyncio.create_task(
self.get_expected_commencement_history(deal_id)
)
line_items_task = asyncio.create_task(
self.from_deal_get_line_items(deal_id)
)
appointments_task = asyncio.create_task(
self.from_deal_get_appointments(deal_id)
)
company_id = await company_id_task
company_info_task = asyncio.create_task(
self.get_company_information(company_id)
)
expected_commencement_history, line_items, appointments, company_info = await asyncio.gather(
expected_task,
line_items_task,
appointments_task,
company_info_task,
)
deal_info["expected_commencement_history"] = expected_commencement_history
return {
"deal_properties": deal_info,
@ -154,17 +179,20 @@ class HubSpotClientAsync:
# Company Info
# -----------------------------------
async def get_company_information(self, company_id):
if company_id is None:
return {
"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"
}
if not company_id:
return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"}
if company_id in self._company_cache:
return self._company_cache[company_id]
company = await self._run(
self.client.crm.companies.basic_api.get_by_id,
company_id,
properties=['name']
properties=["name"]
)
return company.properties
self._company_cache[company_id] = company.properties
return company.properties
@ -331,3 +359,95 @@ class HubSpotClientAsync:
json_ready_history.append(safe_entry)
return json_ready_history
import asyncio
import json
from tqdm import tqdm
from datetime import datetime
from dashboard.services.hubspot_client import Pipeline
from dashboard.services.hubspot_client_async import HubSpotClientAsync
from dashboard.services.file_manager import FileManager
# Local JSON file that main() writes the fetched deals to; the __main__ block
# below passes the same path to FileManager.upload_to_s3.
OUTPUT_FILE = "hubspot_deals.json"
# -------------------------------------------------------
# WORKER — pulls deals from the queue and fetches info
# -------------------------------------------------------
async def worker(id, queue, hubspot, results, pbar):
    """Consume deal IDs from ``queue`` until a ``None`` sentinel is received.

    For every deal ID taken from the queue, ``hubspot.from_deal_get_info`` is
    awaited and its result appended to ``results``. A failure for one deal is
    logged (with traceback) and skipped so the remaining deals are still
    processed. The progress bar is advanced and ``queue.task_done()`` is called
    once per deal, whether the fetch succeeded or failed.

    Args:
        id: Identifier of this worker; only used in log messages.
        queue: ``asyncio.Queue`` yielding deal IDs, terminated by ``None``.
        hubspot: Object exposing an awaitable ``from_deal_get_info(deal_id)``.
        results: List that successful fetch results are appended to.
        pbar: Progress reporter exposing ``update(n)``.
    """
    # Local import: logging is not part of this script's import block.
    import logging

    log = logging.getLogger(__name__)

    while True:
        deal_id = await queue.get()

        if deal_id is None:  # poison pill = stop worker
            queue.task_done()
            break

        try:
            data = await hubspot.from_deal_get_info(deal_id)
            results.append(data)
        except Exception:
            # Best-effort: one bad deal must not stop the worker, but the
            # failure has to be visible instead of being silently dropped.
            log.exception("Worker %s failed to fetch deal %s", id, deal_id)

        # Count the deal as processed even when it failed, so the progress bar
        # reaches its total and queue.join() can return.
        pbar.update(1)
        queue.task_done()
# -------------------------------------------------------
# MAIN EXECUTION
# -------------------------------------------------------
async def main():
    """Fetch every deal of the social-housing operations pipeline and dump it to JSON.

    Steps: list the deal IDs for the pipeline, load them into a queue, let a
    fixed pool of ``worker`` tasks drain the queue, shut the workers down with
    one ``None`` sentinel each, then write the collected results to
    ``OUTPUT_FILE``.
    """
    client = HubSpotClientAsync()

    # All deal IDs belonging to the pipeline of interest.
    deal_ids = await client.get_deal_ids_by_pipeline(
        [Pipeline.OPERATIONS_SOCIAL_HOUSING.value]
    )
    total = len(deal_ids)
    print(f"Total deals: {total}")

    pending = asyncio.Queue()
    collected = []

    # Fill the queue up front; workers are only started afterwards.
    for deal_id in deal_ids:
        await pending.put(deal_id)

    # Size of the worker pool.
    NUM_WORKERS = 5

    progress = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)

    tasks = []
    for index in range(NUM_WORKERS):
        tasks.append(
            asyncio.create_task(worker(index, pending, client, collected, progress))
        )

    # Wait until every queued deal has been marked done.
    await pending.join()

    # One sentinel per worker so each of them leaves its loop.
    for _ in range(NUM_WORKERS):
        await pending.put(None)
    await asyncio.gather(*tasks)

    progress.close()

    # Persist the results.
    with open(OUTPUT_FILE, "w") as out_file:
        json.dump(collected, out_file, indent=2)

    print(f"Done! Saved {len(collected)} deals.")
if __name__ == "__main__":
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    # Run the fetch; main() writes its results to OUTPUT_FILE.
    asyncio.run(main())

    # Upload the freshly written file under a UTC-timestamped object name.
    fm = FileManager()

    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive value;
    # an aware UTC datetime formats to the same "%Y%m%d_%H%M%S" string.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    s3_filename = f"hubspot_deals_{timestamp}.json"

    fm.upload_to_s3(
        OUTPUT_FILE,
        bucket="retrofit-data-dev",
        object_name=f"hubspot_insight/{s3_filename}",
    )