reverse everything and save properly

This commit is contained in:
Jun-te Kim 2025-12-17 15:39:32 +00:00
parent 926dd9951f
commit 3f0faa17cb
2 changed files with 29 additions and 120 deletions

View file

@ -3,16 +3,11 @@ import asyncio
from hubspot.crm.associations import ApiException
import hubspot
from datetime import datetime
import random
class HubSpotClientAsync:
API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency
RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe
BASE_BACKOFF = 10.00 # seconds; base retry delay, doubled on each further attempt
MIN_BACKOFF = 10.00 # seconds; lower bound of the randomized retry sleep
MAX_RETRIES = 5
def __init__(self):
self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
@ -25,60 +20,22 @@ class HubSpotClientAsync:
format="%(asctime)s [HubSpot] %(levelname)s: %(message)s"
)
self.logger = logging.getLogger(__name__)
self._company_cache = {}
# -----------------------------------
# Core Safe Request Wrapper
# -----------------------------------
async def _run(self, func, *args, **kwargs):
attempt = 0
while True:
async with self.API_CONCURRENCY:
try:
async with self.API_CONCURRENCY:
return await asyncio.to_thread(func, *args, **kwargs)
result = await asyncio.to_thread(func, *args, **kwargs)
await asyncio.sleep(self.RATE_LIMIT_DELAY)
return result
except Exception as e:
attempt += 1
error_str = str(e)
is_rate_limit = "429" in error_str
is_server_error = "5" in error_str # defensive
if attempt > self.MAX_RETRIES or not (is_rate_limit or is_server_error):
raise
# Exponential backoff with jitter
backoff = self.BASE_BACKOFF * (2 ** (attempt - 1))
jitter = random.uniform(self.MIN_BACKOFF, self.MIN_BACKOFF + backoff)
self.logger.warning(
f"HubSpot retry {attempt}/{self.MAX_RETRIES} "
f"after {jitter:.2f}s ({error_str})"
)
await asyncio.sleep(jitter)
async def get_meeting_meta(self, meeting_id: str):
meetings_api = self.client.crm.objects.basic_api
meeting = await self._run(
meetings_api.get_by_id,
"0-421",
meeting_id,
properties=[
"hs_lastmodifieddate",
"submission_date",
],
)
return {
"id": meeting_id,
"submission_date": meeting.properties.get("submission_date"),
"last_modified": meeting.properties.get("hs_lastmodifieddate"),
}
if "429" in str(e):
self.logger.warning("Hit HubSpot rate limit → sleeping 10s")
await asyncio.sleep(10)
return await self._run(func, *args, **kwargs)
raise
# -----------------------------------
# Deals
@ -178,36 +135,13 @@ class HubSpotClientAsync:
deal_info = dict(deal.properties)
deal_info["deal_id"] = deal_id
company_id_task = asyncio.create_task(
self.from_deal_get_associated_company_id(deal_id)
)
expected_task = asyncio.create_task(
self.get_expected_commencement_history(deal_id)
)
line_items_task = asyncio.create_task(
self.from_deal_get_line_items(deal_id)
)
appointments_task = asyncio.create_task(
self.from_deal_get_appointments(deal_id)
)
company_id = await company_id_task
company_info_task = asyncio.create_task(
self.get_company_information(company_id)
)
expected_commencement_history, line_items, appointments, company_info = await asyncio.gather(
expected_task,
line_items_task,
appointments_task,
company_info_task,
)
expected_commencement_history = await self.get_expected_commencement_history(deal_id)
deal_info["expected_commencement_history"] = expected_commencement_history
line_items = await self.from_deal_get_line_items(deal_id)
company_id = await self.from_deal_get_associated_company_id(deal_id)
company_info = await self.get_company_information(company_id)
appointments = await self.from_deal_get_appointments(deal_id)
return {
"deal_properties": deal_info,
@ -220,20 +154,17 @@ class HubSpotClientAsync:
# Company Info
# -----------------------------------
async def get_company_information(self, company_id):
if not company_id:
return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"}
if company_id in self._company_cache:
return self._company_cache[company_id]
if company_id is None:
return {
"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"
}
company = await self._run(
self.client.crm.companies.basic_api.get_by_id,
company_id,
properties=["name"]
properties=['name']
)
self._company_cache[company_id] = company.properties
return company.properties
return company.properties
@ -295,7 +226,7 @@ class HubSpotClientAsync:
"deals",
deal_id,
"line_items",
limit=10,
limit=100,
)
if not response.results:
@ -334,7 +265,7 @@ class HubSpotClientAsync:
"deals",
deal_id,
"0-421",
limit=20,
limit=100,
)
if not response.results:
@ -363,39 +294,17 @@ class HubSpotClientAsync:
return meeting.properties
async def from_deal_get_appointments(self, deal_id: str):
meeting_ids = await self.from_deal_get_associated_meetings(deal_id)
if not meeting_ids:
return []
# 1) fetch metadata in parallel
meta_tasks = [
asyncio.create_task(self.get_meeting_meta(mid))
for mid in meeting_ids
]
meta_results = []
for task in asyncio.as_completed(meta_tasks):
meta_results.append(await task)
# 2) sort newest first
meta_results.sort(
key=lambda m: m["submission_date"] or m["last_modified"] or "",
reverse=True,
)
# 3) take ONLY latest 5
latest_ids = [m["id"] for m in meta_results[:5]]
# 4) fetch full meeting objects ONLY for those 5
detail_tasks = [
asyncio.create_task(self.get_meeting_info(mid))
for mid in latest_ids
tasks = [
asyncio.create_task(self.get_meeting_info(meeting_id))
for meeting_id in meeting_ids
]
results = []
for task in asyncio.as_completed(detail_tasks):
for task in asyncio.as_completed(tasks):
results.append(await task)
return results

View file

@ -1 +1 @@
cd backend && poetry run python src/dashboard/scripts/quick_one.py
cd backend && poetry run python src/dashboard/scripts/hubspot_to_s3.py