From d5d9bad73a776eda8608fa85494a027614791070 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 10 Dec 2025 17:29:26 +0000 Subject: [PATCH] added --- .../src/dashboard/scripts/hubspot_to_s3.py | 2 +- .../scripts/hubspot_to_s3_sales_forecast.py | 95 +++++++++++++++++++ .../src/dashboard/services/file_manager.py | 38 +++++++- .../services/hubspot_client_async.py | 2 +- run_backend.sh | 2 +- 5 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py diff --git a/backend/src/dashboard/scripts/hubspot_to_s3.py b/backend/src/dashboard/scripts/hubspot_to_s3.py index d2a2e86..5f811fc 100644 --- a/backend/src/dashboard/scripts/hubspot_to_s3.py +++ b/backend/src/dashboard/scripts/hubspot_to_s3.py @@ -38,7 +38,7 @@ async def main(): # Fetch all deals in the pipeline deals = await hubspot.get_deal_ids_by_pipeline( - Pipeline.OPERATIONS_SOCIAL_HOUSING.value + [Pipeline.OPERATIONS_SOCIAL_HOUSING.value] ) total = len(deals) diff --git a/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py new file mode 100644 index 0000000..162b439 --- /dev/null +++ b/backend/src/dashboard/scripts/hubspot_to_s3_sales_forecast.py @@ -0,0 +1,95 @@ +import asyncio +import json +from tqdm import tqdm +from datetime import datetime + +from dashboard.services.hubspot_client import Pipeline +from dashboard.services.hubspot_client_async import HubSpotClientAsync +from dashboard.services.file_manager import FileManager + +OUTPUT_FILE = "hubspot_deals.json" + +# ------------------------------------------------------- +# WORKER — pulls deals from the queue and fetches info +# ------------------------------------------------------- +async def worker(id, queue, hubspot, results, pbar): + while True: + deal_id = await queue.get() + if deal_id is None: # poison pill = stop worker + queue.task_done() + break + + try: + data = await hubspot.from_deal_get_info(deal_id) + results.append(data) + except Exception as e: + # You can add logging here if needed + pass + + pbar.update(1) + queue.task_done() + + +# ------------------------------------------------------- +# MAIN EXECUTION +# ------------------------------------------------------- +async def main(): + hubspot = HubSpotClientAsync() + + # Fetch all deals in the pipeline + deals = await hubspot.get_deal_ids_by_pipeline([ + "2761590974", + "2774202608", + "2337194212", + "2870263028", + ]) + + total = len(deals) + print(f"Total deals: {total}") + + queue = asyncio.Queue() + results = [] + + # prefill queue + for deal_id in deals: + await queue.put(deal_id) + + # PROPER concurrency — same as semaphore limit + NUM_WORKERS = 5 + + pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) + + workers = [ + asyncio.create_task(worker(i, queue, hubspot, results, pbar)) + for i in range(NUM_WORKERS) + ] + + await queue.join() + + # Stop workers + for _ in range(NUM_WORKERS): + await queue.put(None) + + await asyncio.gather(*workers) + + pbar.close() + + # Save output + with open(OUTPUT_FILE, "w") as f: + json.dump(results, f, indent=2) + + print(f"Done! Saved {len(results)} deals.") + + +if __name__ == "__main__": + asyncio.run(main()) + + fm = FileManager() + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + s3_filename = f"hubspot_deals_{timestamp}.json" + + fm.upload_to_s3( + OUTPUT_FILE, + bucket="retrofit-data-dev", + object_name=f"hubspot_insight/sales_forecast/{s3_filename}" + ) diff --git a/backend/src/dashboard/services/file_manager.py b/backend/src/dashboard/services/file_manager.py index 34884ec..8fb878a 100644 --- a/backend/src/dashboard/services/file_manager.py +++ b/backend/src/dashboard/services/file_manager.py @@ -59,14 +59,46 @@ class FileManager: return sorted(files, key=lambda x: x[0], reverse=True)[0][1] - def download_and_read_latest(self, bucket: str="retrofit-data-dev", prefix: str = "hubspot_insight/"): + def download_and_read_latest( + self, + bucket: str = "retrofit-data-dev", + prefix: str = "hubspot_insight/" + ): os.makedirs(self.download_dir, exist_ok=True) - latest_key = self.get_latest_s3_file(bucket, prefix) - local_path = os.path.join(self.download_dir, latest_key.split("/")[-1]) + # ------------------------------------------------------ + # Find latest file only at the top level in prefix + # ------------------------------------------------------ + response = self.s3.list_objects_v2(Bucket=bucket, Prefix=prefix) + + if "Contents" not in response: + raise FileNotFoundError(f"No files found in {bucket}/{prefix}") + + # Filter ONLY files directly under hubspot_insight/ + top_level_files = [ + obj for obj in response["Contents"] + if obj["Key"].count("/") == prefix.count("/") + ] + + if not top_level_files: + raise FileNotFoundError( + f"No top-level files found in {bucket}/{prefix} (only subfolders exist)." + ) + + latest = max(top_level_files, key=lambda x: x["LastModified"]) + latest_key = latest["Key"] + + # ------------------------------------------------------ + # Download + # ------------------------------------------------------ + filename = latest_key.split("/")[-1] + local_path = os.path.join(self.download_dir, filename) self.s3.download_file(bucket, latest_key, local_path) + # ------------------------------------------------------ + # Read JSON + # ------------------------------------------------------ with open(local_path, "r") as f: data = json.load(f) diff --git a/backend/src/dashboard/services/hubspot_client_async.py b/backend/src/dashboard/services/hubspot_client_async.py index e5c972a..aae90ff 100644 --- a/backend/src/dashboard/services/hubspot_client_async.py +++ b/backend/src/dashboard/services/hubspot_client_async.py @@ -51,7 +51,7 @@ class HubSpotClientAsync: return [ deal.id for deal in self.all_deals - if deal.properties.get("pipeline") == str(pipeline_id) + if deal.properties.get("pipeline") in pipeline_id ] # ----------------------------------- diff --git a/run_backend.sh b/run_backend.sh index 32e6b12..d2724b9 100755 --- a/run_backend.sh +++ b/run_backend.sh @@ -1 +1 @@ -cd backend && poetry run python src/dashboard/main.py +cd backend && poetry run python src/dashboard/app.py