This commit is contained in:
Jun-te Kim 2025-12-10 17:29:26 +00:00
parent 26d5357b08
commit d5d9bad73a
5 changed files with 133 additions and 6 deletions

View file

@ -38,7 +38,7 @@ async def main():
# Fetch all deals in the pipeline
deals = await hubspot.get_deal_ids_by_pipeline(
Pipeline.OPERATIONS_SOCIAL_HOUSING.value
[Pipeline.OPERATIONS_SOCIAL_HOUSING.value]
)
total = len(deals)

View file

@ -0,0 +1,95 @@
import asyncio
import json
from tqdm import tqdm
from datetime import datetime
from dashboard.services.hubspot_client import Pipeline
from dashboard.services.hubspot_client_async import HubSpotClientAsync
from dashboard.services.file_manager import FileManager
OUTPUT_FILE = "hubspot_deals.json"
# -------------------------------------------------------
# WORKER — pulls deals from the queue and fetches info
# -------------------------------------------------------
async def worker(id, queue, hubspot, results, pbar):
while True:
deal_id = await queue.get()
if deal_id is None: # poison pill = stop worker
queue.task_done()
break
try:
data = await hubspot.from_deal_get_info(deal_id)
results.append(data)
except Exception as e:
# You can add logging here if needed
pass
pbar.update(1)
queue.task_done()
# -------------------------------------------------------
# MAIN EXECUTION
# -------------------------------------------------------
async def main():
hubspot = HubSpotClientAsync()
# Fetch all deals in the pipeline
deals = await hubspot.get_deal_ids_by_pipeline([
"2761590974",
"2774202608",
"2337194212",
"2870263028",
])
total = len(deals)
print(f"Total deals: {total}")
queue = asyncio.Queue()
results = []
# prefill queue
for deal_id in deals:
await queue.put(deal_id)
# PROPER concurrency — same as semaphore limit
NUM_WORKERS = 5
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
workers = [
asyncio.create_task(worker(i, queue, hubspot, results, pbar))
for i in range(NUM_WORKERS)
]
await queue.join()
# Stop workers
for _ in range(NUM_WORKERS):
await queue.put(None)
await asyncio.gather(*workers)
pbar.close()
# Save output
with open(OUTPUT_FILE, "w") as f:
json.dump(results, f, indent=2)
print(f"Done! Saved {len(results)} deals.")
if __name__ == "__main__":
asyncio.run(main())
fm = FileManager()
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
s3_filename = f"hubspot_deals_{timestamp}.json"
fm.upload_to_s3(
OUTPUT_FILE,
bucket="retrofit-data-dev",
object_name=f"hubspot_insight/sales_forecast/{s3_filename}"
)

View file

@ -59,14 +59,46 @@ class FileManager:
return sorted(files, key=lambda x: x[0], reverse=True)[0][1]
def download_and_read_latest(self, bucket: str="retrofit-data-dev", prefix: str = "hubspot_insight/"):
def download_and_read_latest(
self,
bucket: str = "retrofit-data-dev",
prefix: str = "hubspot_insight/"
):
os.makedirs(self.download_dir, exist_ok=True)
latest_key = self.get_latest_s3_file(bucket, prefix)
local_path = os.path.join(self.download_dir, latest_key.split("/")[-1])
# ------------------------------------------------------
# Find latest file only at the top level in prefix
# ------------------------------------------------------
response = self.s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
if "Contents" not in response:
raise FileNotFoundError(f"No files found in {bucket}/{prefix}")
# Filter ONLY files directly under hubspot_insight/
top_level_files = [
obj for obj in response["Contents"]
if obj["Key"].count("/") == prefix.count("/")
]
if not top_level_files:
raise FileNotFoundError(
f"No top-level files found in {bucket}/{prefix} (only subfolders exist)."
)
latest = max(top_level_files, key=lambda x: x["LastModified"])
latest_key = latest["Key"]
# ------------------------------------------------------
# Download
# ------------------------------------------------------
filename = latest_key.split("/")[-1]
local_path = os.path.join(self.download_dir, filename)
self.s3.download_file(bucket, latest_key, local_path)
# ------------------------------------------------------
# Read JSON
# ------------------------------------------------------
with open(local_path, "r") as f:
data = json.load(f)

View file

@ -51,7 +51,7 @@ class HubSpotClientAsync:
return [
deal.id
for deal in self.all_deals
if deal.properties.get("pipeline") == str(pipeline_id)
if deal.properties.get("pipeline") in pipeline_id
]
# -----------------------------------

View file

@ -1 +1 @@
cd backend && poetry run python src/dashboard/main.py
cd backend && poetry run python src/dashboard/app.py