mirror of
https://github.com/Hestia-Homes/insight.git
synced 2026-06-08 11:17:25 +00:00
re made appointments
This commit is contained in:
parent
7fb9ab718e
commit
98ad9930c4
1 changed files with 80 additions and 107 deletions
|
|
@ -8,6 +8,10 @@ from datetime import datetime
|
|||
class HubSpotClientAsync:
|
||||
API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency
|
||||
RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe
|
||||
BASE_BACKOFF = 0.5 # seconds
|
||||
MAX_BACKOFF = 10.0 # cap sleep
|
||||
MAX_RETRIES = 5
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
|
||||
|
|
@ -27,17 +31,56 @@ class HubSpotClientAsync:
|
|||
# Core Safe Request Wrapper
|
||||
# -----------------------------------
|
||||
async def _run(self, func, *args, **kwargs):
|
||||
async with self.API_CONCURRENCY:
|
||||
attempt = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
result = await asyncio.to_thread(func, *args, **kwargs)
|
||||
# await asyncio.sleep(self.RATE_LIMIT_DELAY)
|
||||
return result
|
||||
async with self.API_CONCURRENCY:
|
||||
return await asyncio.to_thread(func, *args, **kwargs)
|
||||
|
||||
except Exception as e:
|
||||
if "429" in str(e):
|
||||
self.logger.warning("Hit HubSpot rate limit → sleeping 10s")
|
||||
await asyncio.sleep(10)
|
||||
return await self._run(func, *args, **kwargs)
|
||||
raise
|
||||
attempt += 1
|
||||
error_str = str(e)
|
||||
|
||||
is_rate_limit = "429" in error_str
|
||||
is_server_error = "5" in error_str # defensive
|
||||
|
||||
if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error):
|
||||
raise
|
||||
|
||||
# Exponential backoff with jitter
|
||||
backoff = min(
|
||||
self.MAX_BACKOFF,
|
||||
self.BASE_BACKOFF * (2 ** (attempt - 1))
|
||||
)
|
||||
jitter = random.uniform(0, backoff)
|
||||
|
||||
self.logger.warning(
|
||||
f"HubSpot retry {attempt}/{self.MAX_RETRIES} "
|
||||
f"after {jitter:.2f}s ({error_str})"
|
||||
)
|
||||
|
||||
await asyncio.sleep(jitter)
|
||||
|
||||
async def get_meeting_meta(self, meeting_id: str):
|
||||
meetings_api = self.client.crm.objects.basic_api
|
||||
|
||||
meeting = await self._run(
|
||||
meetings_api.get_by_id,
|
||||
"0-421",
|
||||
meeting_id,
|
||||
properties=[
|
||||
"hs_lastmodifieddate",
|
||||
"submission_date",
|
||||
],
|
||||
)
|
||||
|
||||
return {
|
||||
"id": meeting_id,
|
||||
"submission_date": meeting.properties.get("submission_date"),
|
||||
"last_modified": meeting.properties.get("hs_lastmodifieddate"),
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------------
|
||||
# Deals
|
||||
|
|
@ -293,7 +336,7 @@ class HubSpotClientAsync:
|
|||
"deals",
|
||||
deal_id,
|
||||
"0-421",
|
||||
limit=100,
|
||||
limit=20,
|
||||
)
|
||||
|
||||
if not response.results:
|
||||
|
|
@ -322,17 +365,39 @@ class HubSpotClientAsync:
|
|||
|
||||
return meeting.properties
|
||||
|
||||
|
||||
async def from_deal_get_appointments(self, deal_id: str):
|
||||
meeting_ids = await self.from_deal_get_associated_meetings(deal_id)
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(self.get_meeting_info(meeting_id))
|
||||
for meeting_id in meeting_ids
|
||||
if not meeting_ids:
|
||||
return []
|
||||
|
||||
# 1) fetch metadata in parallel
|
||||
meta_tasks = [
|
||||
asyncio.create_task(self.get_meeting_meta(mid))
|
||||
for mid in meeting_ids
|
||||
]
|
||||
|
||||
meta_results = []
|
||||
for task in asyncio.as_completed(meta_tasks):
|
||||
meta_results.append(await task)
|
||||
|
||||
# 2) sort newest first
|
||||
meta_results.sort(
|
||||
key=lambda m: m["submission_date"] or m["last_modified"] or "",
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
# 3) take ONLY latest 5
|
||||
latest_ids = [m["id"] for m in meta_results[:5]]
|
||||
|
||||
# 4) fetch full meeting objects ONLY for those 5
|
||||
detail_tasks = [
|
||||
asyncio.create_task(self.get_meeting_info(mid))
|
||||
for mid in latest_ids
|
||||
]
|
||||
|
||||
results = []
|
||||
for task in asyncio.as_completed(tasks):
|
||||
for task in asyncio.as_completed(detail_tasks):
|
||||
results.append(await task)
|
||||
|
||||
return results
|
||||
|
|
@ -359,95 +424,3 @@ class HubSpotClientAsync:
|
|||
json_ready_history.append(safe_entry)
|
||||
|
||||
return json_ready_history
|
||||
import asyncio
|
||||
import json
|
||||
from tqdm import tqdm
|
||||
from datetime import datetime
|
||||
|
||||
from dashboard.services.hubspot_client import Pipeline
|
||||
from dashboard.services.hubspot_client_async import HubSpotClientAsync
|
||||
from dashboard.services.file_manager import FileManager
|
||||
|
||||
OUTPUT_FILE = "hubspot_deals.json"
|
||||
|
||||
# -------------------------------------------------------
|
||||
# WORKER — pulls deals from the queue and fetches info
|
||||
# -------------------------------------------------------
|
||||
async def worker(id, queue, hubspot, results, pbar):
|
||||
while True:
|
||||
deal_id = await queue.get()
|
||||
if deal_id is None: # poison pill = stop worker
|
||||
queue.task_done()
|
||||
break
|
||||
|
||||
try:
|
||||
data = await hubspot.from_deal_get_info(deal_id)
|
||||
results.append(data)
|
||||
except Exception as e:
|
||||
# You can add logging here if needed
|
||||
pass
|
||||
|
||||
pbar.update(1)
|
||||
queue.task_done()
|
||||
|
||||
|
||||
# -------------------------------------------------------
|
||||
# MAIN EXECUTION
|
||||
# -------------------------------------------------------
|
||||
async def main():
|
||||
hubspot = HubSpotClientAsync()
|
||||
|
||||
# Fetch all deals in the pipeline
|
||||
deals = await hubspot.get_deal_ids_by_pipeline(
|
||||
[Pipeline.OPERATIONS_SOCIAL_HOUSING.value]
|
||||
)
|
||||
|
||||
total = len(deals)
|
||||
print(f"Total deals: {total}")
|
||||
|
||||
queue = asyncio.Queue()
|
||||
results = []
|
||||
|
||||
# prefill queue
|
||||
for deal_id in deals:
|
||||
await queue.put(deal_id)
|
||||
|
||||
# PROPER concurrency — same as semaphore limit
|
||||
NUM_WORKERS = 5
|
||||
|
||||
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
|
||||
|
||||
workers = [
|
||||
asyncio.create_task(worker(i, queue, hubspot, results, pbar))
|
||||
for i in range(NUM_WORKERS)
|
||||
]
|
||||
|
||||
await queue.join()
|
||||
|
||||
# Stop workers
|
||||
for _ in range(NUM_WORKERS):
|
||||
await queue.put(None)
|
||||
|
||||
await asyncio.gather(*workers)
|
||||
|
||||
pbar.close()
|
||||
|
||||
# Save output
|
||||
with open(OUTPUT_FILE, "w") as f:
|
||||
json.dump(results, f, indent=2)
|
||||
|
||||
print(f"Done! Saved {len(results)} deals.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
fm = FileManager()
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
||||
s3_filename = f"hubspot_deals_{timestamp}.json"
|
||||
|
||||
fm.upload_to_s3(
|
||||
OUTPUT_FILE,
|
||||
bucket="retrofit-data-dev",
|
||||
object_name=f"hubspot_insight/{s3_filename}"
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue