mirror of
https://github.com/Hestia-Homes/insight.git
synced 2026-06-08 11:17:25 +00:00
Merge pull request #31 from Hestia-Homes/feature/make_it_live_ready
Feature/make it live ready
This commit is contained in:
commit
9e4e46fd62
3 changed files with 15 additions and 135 deletions
|
|
@ -56,7 +56,7 @@ async def main():
|
|||
await queue.put(deal_id)
|
||||
|
||||
# PROPER concurrency — same as semaphore limit
|
||||
NUM_WORKERS = 10
|
||||
NUM_WORKERS = 5
|
||||
|
||||
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ from datetime import datetime
|
|||
|
||||
|
||||
class HubSpotClientAsync:
|
||||
API_CONCURRENCY = asyncio.Semaphore(10) # globally limit concurrency
|
||||
API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency
|
||||
RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe
|
||||
|
||||
def __init__(self):
|
||||
|
|
@ -20,8 +20,6 @@ class HubSpotClientAsync:
|
|||
format="%(asctime)s [HubSpot] %(levelname)s: %(message)s"
|
||||
)
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self._company_cache = {}
|
||||
|
||||
|
||||
# -----------------------------------
|
||||
# Core Safe Request Wrapper
|
||||
|
|
@ -137,36 +135,13 @@ class HubSpotClientAsync:
|
|||
deal_info = dict(deal.properties)
|
||||
deal_info["deal_id"] = deal_id
|
||||
|
||||
company_id_task = asyncio.create_task(
|
||||
self.from_deal_get_associated_company_id(deal_id)
|
||||
)
|
||||
|
||||
expected_task = asyncio.create_task(
|
||||
self.get_expected_commencement_history(deal_id)
|
||||
)
|
||||
|
||||
line_items_task = asyncio.create_task(
|
||||
self.from_deal_get_line_items(deal_id)
|
||||
)
|
||||
|
||||
appointments_task = asyncio.create_task(
|
||||
self.from_deal_get_appointments(deal_id)
|
||||
)
|
||||
|
||||
company_id = await company_id_task
|
||||
company_info_task = asyncio.create_task(
|
||||
self.get_company_information(company_id)
|
||||
)
|
||||
|
||||
expected_commencement_history, line_items, appointments, company_info = await asyncio.gather(
|
||||
expected_task,
|
||||
line_items_task,
|
||||
appointments_task,
|
||||
company_info_task,
|
||||
)
|
||||
|
||||
|
||||
expected_commencement_history = await self.get_expected_commencement_history(deal_id)
|
||||
deal_info["expected_commencement_history"] = expected_commencement_history
|
||||
|
||||
line_items = await self.from_deal_get_line_items(deal_id)
|
||||
company_id = await self.from_deal_get_associated_company_id(deal_id)
|
||||
company_info = await self.get_company_information(company_id)
|
||||
appointments = await self.from_deal_get_appointments(deal_id)
|
||||
|
||||
return {
|
||||
"deal_properties": deal_info,
|
||||
|
|
@ -179,20 +154,17 @@ class HubSpotClientAsync:
|
|||
# Company Info
|
||||
# -----------------------------------
|
||||
async def get_company_information(self, company_id):
|
||||
if not company_id:
|
||||
return {"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"}
|
||||
|
||||
if company_id in self._company_cache:
|
||||
return self._company_cache[company_id]
|
||||
if company_id is None:
|
||||
return {
|
||||
"name": "NO COMPANY ASSOCIATION IN HUBSPOT - FIX ME"
|
||||
}
|
||||
|
||||
company = await self._run(
|
||||
self.client.crm.companies.basic_api.get_by_id,
|
||||
company_id,
|
||||
properties=["name"]
|
||||
properties=['name']
|
||||
)
|
||||
|
||||
self._company_cache[company_id] = company.properties
|
||||
return company.properties
|
||||
return company.properties
|
||||
|
||||
|
||||
|
||||
|
|
@ -359,95 +331,3 @@ class HubSpotClientAsync:
|
|||
json_ready_history.append(safe_entry)
|
||||
|
||||
return json_ready_history
|
||||
import asyncio
|
||||
import json
|
||||
from tqdm import tqdm
|
||||
from datetime import datetime
|
||||
|
||||
from dashboard.services.hubspot_client import Pipeline
|
||||
from dashboard.services.hubspot_client_async import HubSpotClientAsync
|
||||
from dashboard.services.file_manager import FileManager
|
||||
|
||||
OUTPUT_FILE = "hubspot_deals.json"
|
||||
|
||||
# -------------------------------------------------------
|
||||
# WORKER — pulls deals from the queue and fetches info
|
||||
# -------------------------------------------------------
|
||||
async def worker(id, queue, hubspot, results, pbar):
    """Consume deal ids from ``queue`` and collect their details in ``results``.

    The worker loops until it receives ``None`` (the poison pill). For every
    other item it awaits ``hubspot.from_deal_get_info(deal_id)`` and appends
    the returned value to ``results``. A failing deal is logged and skipped so
    one bad record does not stop the whole run.

    Args:
        id: Identifier of this worker; used only in log messages.
        queue: ``asyncio.Queue`` yielding deal ids, terminated by ``None``.
        hubspot: Object exposing an awaitable ``from_deal_get_info(deal_id)``.
        results: List that successful fetch results are appended to.
        pbar: Progress reporter exposing ``update(n)``.
    """
    # Imported locally so the block does not depend on a file-level import.
    import logging

    logger = logging.getLogger(__name__)

    while True:
        deal_id = await queue.get()

        if deal_id is None:  # poison pill = stop worker
            queue.task_done()
            break

        try:
            data = await hubspot.from_deal_get_info(deal_id)
        except Exception:
            # Best-effort: keep going, but leave a traceback instead of
            # silently dropping the deal.
            logger.exception("Worker %s failed to fetch deal %s", id, deal_id)
        else:
            results.append(data)

        # Progress and queue accounting advance for failed deals as well,
        # otherwise queue.join() in the caller would never return.
        pbar.update(1)
        queue.task_done()
|
||||
|
||||
|
||||
# -------------------------------------------------------
|
||||
# MAIN EXECUTION
|
||||
# -------------------------------------------------------
|
||||
async def main():
    """Fetch all deals of the Operations Social Housing pipeline and save them as JSON.

    Deal ids are loaded through ``HubSpotClientAsync``, pushed onto a queue and
    processed by a fixed pool of ``worker`` tasks. The collected results are
    written to ``OUTPUT_FILE``.
    """
    client = HubSpotClientAsync()

    # Fetch all deals in the pipeline
    deal_ids = await client.get_deal_ids_by_pipeline(
        [Pipeline.OPERATIONS_SOCIAL_HOUSING.value]
    )

    total = len(deal_ids)
    print(f"Total deals: {total}")

    pending = asyncio.Queue()
    collected = []

    # Prefill the (unbounded) queue, so putting never has to wait.
    for deal_id in deal_ids:
        pending.put_nowait(deal_id)

    # PROPER concurrency — same as semaphore limit
    worker_count = 5

    progress = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)

    tasks = []
    for index in range(worker_count):
        tasks.append(
            asyncio.create_task(worker(index, pending, client, collected, progress))
        )

    # Wait until every queued deal has been marked done.
    await pending.join()

    # Stop workers: one poison pill per task.
    for _ in tasks:
        pending.put_nowait(None)

    await asyncio.gather(*tasks)
    progress.close()

    # Save output
    with open(OUTPUT_FILE, "w") as out_file:
        json.dump(collected, out_file, indent=2)

    print(f"Done! Saved {len(collected)} deals.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: run the export, then upload the resulting file to S3.
    # Local import: the file-level imports only bring in ``datetime`` itself.
    from datetime import timezone

    asyncio.run(main())

    # NOTE(review): the original indentation was lost in this view; the upload
    # is assumed to belong to the script entry point — confirm.
    fm = FileManager()

    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted UTC timestamp is identical.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    s3_filename = f"hubspot_deals_{timestamp}.json"

    fm.upload_to_s3(
        OUTPUT_FILE,
        bucket="retrofit-data-dev",
        object_name=f"hubspot_insight/{s3_filename}"
    )
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
cd backend && poetry run python src/dashboard/scripts/quick_one.py
|
||||
cd backend && poetry run python src/dashboard/scripts/hubspot_to_s3.py
|
||||
Loading…
Add table
Reference in a new issue