from collections import defaultdict
from datetime import datetime
from enum import Enum
from pprint import pprint
from typing import Any, Optional

import pandas as pd


class ProductType(Enum):
    """Named product (line item) types."""

    EMPTY_CAVITY_ECO_4 = "Empty Cavity - ECO4"


class jsonReader:
    """Index a list of deal dicts by line item name and turn them into DataFrames.

    Attributes set by ``__init__``:
        raw_data: the deal list exactly as passed in.
        deals_by_line_item: ``defaultdict(list)`` mapping a line item name to
            the deals containing an item with that name. Deals without any
            line items are stored under the ``"__empty__"`` key.
        line_item_names: the keys of ``deals_by_line_item`` as a list,
            captured at the end of ``initial_setup``.
    """

    # Key under which deals with no line items are grouped.
    _EMPTY_KEY = "__empty__"

    # Deals whose company has this name never produce rows.
    _EXCLUDED_COMPANY = "Apple"

    # Formats tried, in order, once a trailing "Z" has been removed.
    _TIMESTAMP_FORMATS = (
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d",
    )

    # Key order of rows built from ``deal["attempts"]``. The order is kept
    # distinct from the history order below because dict key order decides
    # the column order of the resulting DataFrame.
    _ATTEMPT_COLUMNS = (
        "submission_date",
        "hubspot_id",
        "expected_commencement_date",
        "work_type",
        "price",
        "deal_name",
        "company_name",
    )

    # Key order of rows built from the expected-commencement history.
    _HISTORY_COLUMNS = (
        "submission_date",
        "expected_commencement_date",
        "hubspot_id",
        "work_type",
        "price",
        "deal_name",
        "company_name",
    )

    # (substring of the lower-cased work type, planned-date property,
    # completion-date property). The first matching entry wins.
    _STAGE_DATE_FIELDS = (
        ("coordination stage", "mtp_planned_week", "mtp_completion_date"),
        ("design", "design_planned_week", "design_completion_date"),
    )

    def __init__(self, json_data):
        """Store ``json_data`` (an iterable of deal dicts) and build the index."""
        self.raw_data = json_data
        self.deals_by_line_item = defaultdict(list)
        # Start with a real empty list; initial_setup() replaces it.
        self.line_item_names = []
        self.initial_setup()

    @classmethod
    def _parse_timestamp(cls, timestamp: str) -> datetime:
        """Parse an ISO-like timestamp string into a naive ``datetime``.

        A trailing ``"Z"`` is dropped. Seconds with or without a fractional
        part and plain dates are accepted. Anything else (for example a value
        carrying a UTC offset) falls back to parsing only the part before
        ``"T"`` as a date.

        Raises:
            ValueError: if not even the date part can be parsed.
        """
        text = timestamp[:-1] if timestamp.endswith("Z") else timestamp
        for fmt in cls._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return datetime.strptime(text.split("T")[0], "%Y-%m-%d")

    def to_date_only(self, timestamp: Optional[str]) -> Optional[str]:
        """Return the ``YYYY-MM-DD`` date of ``timestamp``.

        Returns ``None`` when ``timestamp`` is ``None`` or empty.

        Raises:
            ValueError: if the value cannot be parsed as a timestamp or date.
        """
        if not timestamp:
            return None
        return self._parse_timestamp(timestamp).strftime("%Y-%m-%d")

    def initial_setup(self):
        """Build the mapping of line item name -> list of deals.

        A deal is appended once per named line item it contains, so a deal
        with two items of the same name appears twice under that name. Line
        items without a name are skipped.
        """
        for deal in self.raw_data:
            line_items = deal.get("line_items", [])
            if not line_items:
                # Keep deals without line items reachable under a special key.
                self.deals_by_line_item[self._EMPTY_KEY].append(deal)
                continue
            for item in line_items:
                name = item.get("name")
                if name:
                    self.deals_by_line_item[name].append(deal)
        self.line_item_names = list(self.deals_by_line_item.keys())

    def generate_df_via_product_type(self, product_type):
        """Return one DataFrame with the rows of every deal under ``product_type``.

        Returns ``None`` when the product type is unknown or none of its deals
        yields a row.
        """
        frames = []
        # .get() rather than indexing: indexing a defaultdict would insert an
        # empty entry for every unknown product type that is asked for.
        for deal in self.deals_by_line_item.get(product_type, ()):
            frame = self._return_df_from_deal_info(deal, product_type)
            if frame is not None:
                frames.append(frame)
        if frames:
            return pd.concat(frames, ignore_index=True)
        return None

    def _return_df_from_deal_info(self, deal, product_type):
        """Return a DataFrame of rows for a single deal, or ``None`` if it has none.

        Deals of the excluded company yield ``None``. A deal with a non-empty
        ``attempts`` list yields one row per attempt; otherwise one row is
        produced per entry of its expected-commencement history.
        """
        if deal["company_info"]["name"] == self._EXCLUDED_COMPANY:
            return None
        if deal["attempts"]:
            rows = self._rows_from_attempts(deal, product_type)
        else:
            rows = self._rows_from_history(deal, product_type)
        return pd.DataFrame(rows) if rows else None

    @staticmethod
    def _price_for(deal, product_type):
        """Return the price of the first line item whose name contains ``product_type``.

        Returns ``None`` when no line item matches. Items without a name never
        match.
        """
        for item in deal.get("line_items") or []:
            if product_type in (item.get("name") or ""):
                return item["price"]
        return None

    def _build_row(
        self,
        deal,
        product_type,
        price,
        submission_date,
        expected_commencement_date,
        columns,
    ) -> dict[str, Any]:
        """Assemble one row dict with its keys in ``columns`` order.

        The finished row is passed through
        ``_use_different_expected_commencement_data`` before being returned.
        """
        properties = deal["deal_properties"]
        values = {
            "submission_date": submission_date,
            "expected_commencement_date": expected_commencement_date,
            "hubspot_id": properties["deal_id"],
            "work_type": product_type,
            "price": price,
            "deal_name": properties["dealname"],
            "company_name": deal["company_info"]["name"],
        }
        row = {column: values[column] for column in columns}
        return self._use_different_expected_commencement_data(row, deal)

    def _rows_from_attempts(self, deal, product_type) -> list[dict[str, Any]]:
        """Return one row per entry of ``deal["attempts"]``."""
        price = self._price_for(deal, product_type)
        return [
            self._build_row(
                deal,
                product_type,
                price,
                self.to_date_only(attempt["submission_date"]),
                self.to_date_only(attempt["expected_commencement_date"]),
                self._ATTEMPT_COLUMNS,
            )
            for attempt in deal["attempts"]
        ]

    def _rows_from_history(self, deal, product_type) -> list[dict[str, Any]]:
        """Return one row per expected-commencement history entry, latest first.

        Only the latest entry can carry a submission date, and only when the
        deal's ``last_submission_date`` is strictly later than that entry's
        expected commencement date. An empty or missing history yields no
        rows.
        """
        properties = deal["deal_properties"]
        history = properties.get("expected_commencement_history") or []
        if not history:
            return []

        # Sort on the full timestamp so that several changes made on the same
        # day are still ordered by time of day.
        ordered = sorted(
            history,
            key=lambda entry: self._parse_timestamp(entry["timestamp"]),
            reverse=True,
        )

        latest_ecd = self.to_date_only(ordered[0]["value"])
        submission_date = self.to_date_only(properties.get("last_submission_date"))
        if submission_date and latest_ecd:
            submitted = datetime.strptime(submission_date, "%Y-%m-%d")
            commencing = datetime.strptime(latest_ecd, "%Y-%m-%d")
            if submitted <= commencing:
                submission_date = None
        else:
            # Without both dates there is nothing to compare against.
            submission_date = None

        price = self._price_for(deal, product_type)
        rows = [
            self._build_row(
                deal,
                product_type,
                price,
                submission_date,
                latest_ecd,
                self._HISTORY_COLUMNS,
            )
        ]
        # Older history entries never carry a submission date.
        rows.extend(
            self._build_row(
                deal,
                product_type,
                price,
                None,
                self.to_date_only(entry["value"]),
                self._HISTORY_COLUMNS,
            )
            for entry in ordered[1:]
        )
        return rows

    def _use_different_expected_commencement_data(self, org_data, deal):
        """Override the two date fields for coordination-stage and design work types.

        When the lower-cased ``org_data["work_type"]`` contains
        ``"coordination stage"``, or otherwise ``"design"``, both
        ``expected_commencement_date`` and ``submission_date`` are replaced by
        the matching planned/completion properties of the deal. ``org_data``
        is modified in place and also returned.
        """
        work_type = org_data["work_type"].lower()
        for marker, planned_key, completion_key in self._STAGE_DATE_FIELDS:
            if marker in work_type:
                properties = deal["deal_properties"]
                org_data.update({
                    "expected_commencement_date": self.to_date_only(properties[planned_key]),
                    "submission_date": self.to_date_only(properties[completion_key]),
                })
                break
        return org_data

    def find_all_job_with_line_item(self):
        """Print the first deal that has line items, followed by its index."""
        for index, deal in enumerate(self.raw_data):
            if deal.get("line_items"):
                print(deal)
                print(index)
                break

    def print_raw_data(self):
        """Pretty-print the raw deal list."""
        pprint(self.raw_data)