diff --git a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py index 8dadb23..ed4c2f6 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py @@ -56,7 +56,7 @@ async def main(): await queue.put(deal_id) # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 10 + NUM_WORKERS = 5 pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index a55e4e8..0eb4144 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -6,7 +6,7 @@ from datetime import datetime class HubSpotClientAsync: - API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency + API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe def __init__(self): @@ -20,8 +20,6 @@ class HubSpotClientAsync: format="%(asctime)s [HubSpot] %(levelname)s: %(message)s" ) self.logger = logging.getLogger(__name__) - self._company_cache = {} - # ----------------------------------- # Core Safe Request Wrapper @@ -137,36 +135,13 @@ class HubSpotClientAsync: deal_info = dict(deal.properties) deal_info["deal_id"] = deal_id - company_id_task = asyncio.create_task( - self.from_deal_get_associated_company_id(deal_id) - ) - - expected_task = asyncio.create_task( - self.get_expected_commencement_history(deal_id) - ) - - line_items_task = asyncio.create_task( - self.from_deal_get_line_items(deal_id) - ) - - appointments_task = asyncio.create_task( - self.from_deal_get_appointments(deal_id) - ) - - company_id = await company_id_task - company_info_task = asyncio.create_task( - self.get_company_information(company_id) - ) - - expected_commencement_history, line_items, appointments, company_info = await asyncio.gather( - expected_task, - line_items_task, - appointments_task, - company_info_task, - ) - - + expected_commencement_history = await self.get_expected_commencement_history(deal_id) deal_info["expected_commencement_history"] = expected_commencement_history + + line_items = await self.from_deal_get_line_items(deal_id) + company_id = await self.from_deal_get_associated_company_id(deal_id) + company_info = await self.get_company_information(company_id) + appointments = await self.from_deal_get_appointments(deal_id) return { "deal_properties": deal_info, @@ -179,20 +154,17 @@ class HubSpotClientAsync: # Company Info # ----------------------------------- async def get_company_information(self, company_id): - if not company_id: - return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"} - - if company_id in self._company_cache: - return self._company_cache[company_id] + if company_id is None: + return { + "name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME" + } company = await self._run( self.client.crm.companies.basic_api.get_by_id, company_id, - properties=["name"] + properties=['name'] ) - - self._company_cache[company_id] = company.properties - return company.properties + return company.properties @@ -359,95 +331,3 @@ class HubSpotClientAsync: json_ready_history.append(safe_entry) return json_ready_history -import asyncio -import json -from tqdm import tqdm -from datetime import datetime - -from dashboard.services.hubspot_client import Pipeline -from dashboard.services.hubspot_client_async import HubSpotClientAsync -from dashboard.services.file_manager import FileManager - -OUTPUT_FILE = "hubspot_deals.json" - -# ------------------------------------------------------- -# WORKER — pulls deals from the queue and fetches info -# ------------------------------------------------------- -async def worker(id, queue, hubspot, results, pbar): - while True: - deal_id = await queue.get() - if deal_id is None: # poison pill = stop worker - queue.task_done() - break - - try: - data = await hubspot.from_deal_get_info(deal_id) - results.append(data) - except Exception as e: - # You can add logging here if needed - pass - - pbar.update(1) - queue.task_done() - - -# ------------------------------------------------------- -# MAIN EXECUTION -# ------------------------------------------------------- -async def main(): - hubspot = HubSpotClientAsync() - - # Fetch all deals in the pipeline - deals = await hubspot.get_deal_ids_by_pipeline( - [Pipeline.OPERATIONS_SOCIAL_HOUSING.value] - ) - - total = len(deals) - print(f"Total deals: {total}") - - queue = asyncio.Queue() - results = [] - - # prefill queue - for deal_id in deals: - await queue.put(deal_id) - - # PROPER concurrency — same as semaphore limit - NUM_WORKERS = 5 - - pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) - - workers = [ - asyncio.create_task(worker(i, queue, hubspot, results, pbar)) - for i in range(NUM_WORKERS) - ] - - await queue.join() - - # Stop workers - for _ in range(NUM_WORKERS): - await queue.put(None) - - await asyncio.gather(*workers) - - pbar.close() - - # Save output - with open(OUTPUT_FILE, "w") as f: - json.dump(results, f, indent=2) - - print(f"Done! Saved {len(results)} deals.") - - -if __name__ == "__main__": - asyncio.run(main()) - - fm = FileManager() - timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") - s3_filename = f"hubspot_deals_{timestamp}.json" - - fm.upload_to_s3( - OUTPUT_FILE, - bucket="retrofit-data-dev", - object_name=f"hubspot_insight/{s3_filename}" - ) diff --git a/run_quick.sh b/run_quick.sh index d344dfa..956e573 100755 --- a/run_quick.sh +++ b/run_quick.sh @@ -1 +1 @@ -cd backend && poetry run python src/dashboard/scripts/quick_one.py +cd backend && poetry run python src/dashboard/scripts/hubspot_to_s3.py \ No newline at end of file