mirror of
https://github.com/Hestia-Homes/insight.git
synced 2026-06-08 11:17:25 +00:00
144 lines
5.6 KiB
Python
144 lines
5.6 KiB
Python
from pprint import pprint
|
||
from collections import defaultdict
|
||
import pandas as pd
|
||
|
||
from enum import Enum
|
||
from datetime import datetime
|
||
|
||
class ProductType(Enum):
|
||
EMPTY_CAVITY_ECO_4 = "Empty Cavity - ECO4"
|
||
|
||
|
||
|
||
class jsonReader:
|
||
def __init__(self, json_data):
|
||
self.raw_data = json_data
|
||
self.deals_by_line_item = defaultdict(list)
|
||
self.line_item_names = list
|
||
self.initial_setup()
|
||
|
||
def to_date_only(self, timestamp: str) -> str:
|
||
if timestamp is None:
|
||
return None
|
||
if timestamp.endswith("Z"):
|
||
timestamp = timestamp[:-1]
|
||
dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
|
||
return dt.strftime("%Y-%m-%d")
|
||
|
||
def initial_setup(self):
|
||
"""
|
||
Build a dictionary mapping line item names -> list of deals
|
||
"""
|
||
for deal in self.raw_data:
|
||
line_items = deal.get("line_items", [])
|
||
|
||
if not line_items:
|
||
# Store empty deals under a special key
|
||
self.deals_by_line_item["__empty__"].append(deal)
|
||
continue
|
||
|
||
# Add this deal under each line item name
|
||
for item in line_items:
|
||
name = item.get("name")
|
||
if name:
|
||
self.deals_by_line_item[name].append(deal)
|
||
self.line_item_names = list(self.deals_by_line_item.keys())
|
||
|
||
def generate_df_via_product_type(self, product_type):
|
||
rows = []
|
||
for deals in self.deals_by_line_item[product_type]:
|
||
row = self._return_df_from_deal_info(deals, product_type)
|
||
rows.append(row)
|
||
|
||
if rows:
|
||
return pd.concat(rows, ignore_index=True)
|
||
|
||
|
||
def _return_df_from_deal_info(self, deal, product_type):
|
||
rows = []
|
||
|
||
if "ECO" in product_type or "EPC" in product_type:
|
||
if deal["attempts"]:
|
||
# Multiple attempts => multiple rows
|
||
for attempt in deal["attempts"]:
|
||
rows.append({
|
||
"submission_date": self.to_date_only(deal["deal_properties"].get("submission_date")),
|
||
"hubspot_id": deal["deal_properties"]["deal_id"],
|
||
"expected_commencement_date": deal["deal_properties"].get("expected_commencement_date"),
|
||
"work_type": product_type,
|
||
"price": next(
|
||
(item["price"] for item in deal["line_items"] if product_type in item["name"]),
|
||
None
|
||
)
|
||
})
|
||
else:
|
||
def historical_ecd_value_processes(timestamp):
|
||
if timestamp is None or timestamp == '':
|
||
return None
|
||
dt = datetime.strptime(timestamp, "%Y-%m-%d")
|
||
return dt.strftime("%Y-%m-%d")
|
||
history = deal["deal_properties"]["expected_commencement_history"]
|
||
|
||
# ---- SORT HISTORY: latest first ----
|
||
history_sorted = sorted(
|
||
history,
|
||
key=lambda h: datetime.strptime(h["timestamp"].split("T")[0], "%Y-%m-%d"),
|
||
reverse=True
|
||
)
|
||
|
||
# Extract latest expected commencement date
|
||
latest = history_sorted[0]
|
||
latest_ecd = historical_ecd_value_processes(latest["value"]) # returns YYYY-MM-DD or None
|
||
|
||
# Convert submission date
|
||
raw_submission_date = deal["deal_properties"].get("last_submission_date")
|
||
submission_date = self.to_date_only(raw_submission_date) if raw_submission_date else None
|
||
|
||
# Convert both to datetime for comparison
|
||
if submission_date and latest_ecd:
|
||
dt_sub = datetime.strptime(submission_date, "%Y-%m-%d")
|
||
dt_ecd = datetime.strptime(latest_ecd, "%Y-%m-%d")
|
||
|
||
# Only keep submission date if submission_date > latest ECD
|
||
if dt_sub <= dt_ecd:
|
||
submission_date = None
|
||
else:
|
||
submission_date = None
|
||
|
||
# 1️⃣ Add latest expected commencement date WITH conditional submission date
|
||
rows.append({
|
||
"submission_date": submission_date,
|
||
"expected_commencement_date": latest_ecd,
|
||
"hubspot_id": deal["deal_properties"]["deal_id"],
|
||
"work_type": product_type,
|
||
"price": next(
|
||
(item["price"] for item in deal["line_items"] if product_type in item["name"]),
|
||
None
|
||
)
|
||
})
|
||
|
||
# 2️⃣ Add the remaining history WITHOUT submission date
|
||
for attempt in history_sorted[1:]:
|
||
rows.append({
|
||
"submission_date": None,
|
||
"expected_commencement_date": historical_ecd_value_processes(attempt["value"]),
|
||
"hubspot_id": deal["deal_properties"]["deal_id"],
|
||
"work_type": product_type,
|
||
"price": next(
|
||
(item["price"] for item in deal["line_items"] if product_type in item["name"]),
|
||
None
|
||
)
|
||
})
|
||
|
||
# Return a DataFrame or None
|
||
return pd.DataFrame(rows) if rows else None
|
||
|
||
def find_all_job_with_line_item(self):
|
||
for i, deal in enumerate(self.raw_data):
|
||
if len(deal["line_items"])>0:
|
||
print(deal)
|
||
print(i)
|
||
break
|
||
|
||
def print_raw_data(self):
|
||
pprint(self.raw_data)
|