Merge pull request #1 from Hestia-Homes/feature/eco_dashboarding

Feature/eco dashboarding
This commit is contained in:
Jun-te Kim 2025-11-17 19:01:59 +00:00 committed by GitHub
commit c482b20631
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 2348 additions and 3 deletions

View file

@ -0,0 +1,24 @@
# Scheduled job: builds the dev-container image and runs the HubSpot export
# script (hubspot_to_s3.py) inside it.
name: Hubspot Data to S3
on:
  schedule:
    - cron: '0 5 * * *' # Every day at 05:00 UTC
  workflow_dispatch:
jobs:
  gather_hubspot_data_and_upload_to_s3:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4
      # Build Docker image using .devcontainer/Dockerfile
      - name: Build Docker image
        run: |
          docker build -f .devcontainer/Dockerfile -t latest-image .
      # Run the script inside the container
      # NOTE(review): no environment variables or secrets are passed to
      # `docker run` here; confirm how the container obtains its HubSpot and
      # AWS credentials.
      - name: Run script in container
        run: |
          docker run latest-image \
            bash -c "cd backend && poetry run python src/dashboard/src/scripts/hubspot_to_s3.py"

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
__pycache__
hubspot_deals.json

1636
backend/poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,14 @@ authors = [
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"requests (>=2.32.5,<3.0.0)"
"requests (>=2.32.5,<3.0.0)",
"hubspot-api-client (>=12.0.0,<13.0.0)",
"pydantic (>=2.12.4,<3.0.0)",
"ipykernel (>=7.1.0,<8.0.0)",
"pandas (>=2.3.3,<3.0.0)",
"tqdm (>=4.67.1,<5.0.0)",
"boto3 (>=1.40.74,<2.0.0)",
"dash (>=3.3.0,<4.0.0)"
]
[tool.poetry]

View file

@ -1 +1,108 @@
print("hello world")
from dash import Dash, html, dcc, callback, Output, Input, dash_table, html
import plotly.express as px
import pandas as pd
import json
import boto3
import re
import os
from datetime import datetime
# S3 bucket the dashboard reads its deal snapshot from.
BUCKET="retrofit-data-dev"
# Key prefix under which the timestamped snapshot files are looked up.
PREFIX="hubspot_insight/"
def get_latest_s3_file(bucket: str, prefix: str = "") -> str:
    """
    Returns the key of the latest timestamped file in S3.
    Files must contain a timestamp like: *_YYYYMMDD_HHMMSS.json

    Args:
        bucket: Name of the S3 bucket to search.
        prefix: Key prefix to restrict the listing to (default: whole bucket).

    Returns:
        The key whose embedded timestamp is the greatest. The timestamp format
        is fixed-width, so string comparison matches chronological order.

    Raises:
        FileNotFoundError: If nothing is listed under the prefix, or if none
            of the listed keys carries a timestamp suffix.
    """
    s3 = boto3.client("s3")
    timestamp_regex = re.compile(r".*_(\d{8}_\d{6})\.json$")

    # A single list_objects_v2 call returns at most 1000 keys, so walk every
    # page; otherwise the newest file is missed once the prefix grows.
    paginator = s3.get_paginator("list_objects_v2")
    found_any = False
    files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            found_any = True
            key = obj["Key"]
            match = timestamp_regex.match(key)
            if match:
                files.append((match.group(1), key))

    if not found_any:
        raise FileNotFoundError("No files found in bucket/prefix.")
    if not files:
        raise FileNotFoundError("No timestamped files found.")

    # Only the newest entry is needed, so take the max instead of sorting.
    return max(files, key=lambda item: item[0])[1]
def download_and_read_latest(bucket: str, prefix: str = "", download_dir="downloads"):
    """
    Download the newest timestamped JSON file from S3 and parse it.

    The file is saved inside ``download_dir`` (created when absent) under the
    final path segment of its S3 key.

    Returns:
        A ``(s3_key, local_path, parsed_json)`` tuple.
    """
    client = boto3.client("s3")
    os.makedirs(download_dir, exist_ok=True)

    newest_key = get_latest_s3_file(bucket, prefix)
    file_name = newest_key.split("/")[-1]
    target_path = os.path.join(download_dir, file_name)
    client.download_file(bucket, newest_key, target_path)

    with open(target_path, "r") as handle:
        payload = json.load(handle)
    return newest_key, target_path, payload
# Runs at import time: downloads the newest snapshot from S3 and parses it.
latest_key, file_path, data = download_and_read_latest(BUCKET, PREFIX)
# ---------------------------------------------------------
# Extract relevant info into a table
# ---------------------------------------------------------
# One dict per deal; filled by the loop below and turned into a DataFrame.
records = []
def iso_week(date_str):
    """Return the ISO week number of an ISO-format date string, or None if it is empty."""
    return datetime.fromisoformat(date_str).isocalendar().week if date_str else None
# Flatten each deal into one table row; the key order fixes the column order.
for deal in data:
    props = deal["deal_properties"]
    company = deal["company_info"]
    start_date = props.get("expected_commencement_date")
    row = {
        "Deal Name": props.get("dealname"),
        "Company": company.get("name"),
        "Expected Start Date": start_date,
        "Expected Week": iso_week(start_date),
        "Design Planned Week": props.get("design_planned_week"),
        "Design Completion": props.get("design_completion_date"),
        "MTP Planned Week": props.get("mtp_planned_week"),
        "MTP Completion": props.get("mtp_completion_date"),
        "Retrofit Status": props.get("retrofit_design_status"),
        "Deal ID": props.get("deal_id"),
    }
    records.append(row)

df = pd.DataFrame(records)
# ---------------------------------------------------------
# Dash App
# ---------------------------------------------------------
app = Dash(__name__)

# Page heading shown above the table.
_page_title = html.H1("Deal Scheduling Overview", style={"textAlign": "center"})

# One table column per DataFrame column, labelled with the column name.
_table_columns = [{"name": column, "id": column} for column in df.columns]

# Paged table with the built-in ("native") filtering and sorting.
_deal_table = dash_table.DataTable(
    id="deal-table",
    columns=_table_columns,
    data=df.to_dict("records"),
    page_size=20,
    filter_action="native",
    sort_action="native",
    style_table={"overflowX": "scroll"},
    style_header={"backgroundColor": "#f0f0f0", "fontWeight": "bold"},
    style_cell={"padding": "8px", "textAlign": "left"},
)

app.layout = html.Div([_page_title, _deal_table])

if __name__ == "__main__":
    # Start the Dash server with debug mode switched on.
    app.run(debug=True)

View file

@ -0,0 +1,64 @@
import asyncio
import json
from tqdm import tqdm
from dashboard.services.hubspot_client import Pipeline
from dashboard.services.hubspot_client_async import HubSpotClientAsync
from dashboard.services.file_manager import FileManager
from datetime import datetime
# Local JSON file that main() writes and the __main__ block uploads to S3.
OUTPUT_FILE = "hubspot_deals.json"
async def main():
    """
    Fetch every deal in the Operations / Social Housing pipeline and save it.

    One task is started per deal ID and results are collected as they finish.
    A deal whose fetch raises is counted as failed and reported, but does not
    stop the run. The successful results are written to ``OUTPUT_FILE`` as
    JSON.

    Returns:
        The list of successfully fetched deal dictionaries.
    """
    hubspot = HubSpotClientAsync()

    # Fetch all deals in the pipeline
    deals = await hubspot.get_deal_ids_by_pipeline(Pipeline.OPERATIONS_SOCIAL_HOUSING.value)
    total = len(deals)
    print(f"Total deals to fetch: {total}")

    tasks = [asyncio.create_task(hubspot.from_deal_get_info(deal_id)) for deal_id in deals]
    results = []
    success = 0
    failed = 0

    # The context manager closes the bar even if something unexpected raises.
    with tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True) as pbar:
        for task in asyncio.as_completed(tasks):
            try:
                results.append(await task)
                success += 1
            except Exception as e:
                failed += 1
                # Report the cause instead of swallowing it; tqdm.write prints
                # without breaking the progress bar rendering.
                tqdm.write(f"Failed to fetch deal: {e!r}")
            finally:
                pbar.set_postfix({
                    "ok": success,
                    "fail": failed,
                    "active": len(asyncio.all_tasks())  # shows concurrent load
                })
                pbar.update(1)

    # Save final results
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"Done! Saved {len(results)} deals. Failed: {failed}")
    return results
if __name__ == "__main__":
    from datetime import timezone

    # Produce a fresh export first: OUTPUT_FILE is git-ignored, so on a clean
    # checkout there is nothing to upload unless main() has just written it.
    asyncio.run(main())

    fm = FileManager()
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated in 3.12.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    s3_filename = f"hubspot_deals_{timestamp}.json"
    s3_uri = fm.upload_to_s3(
        OUTPUT_FILE,
        bucket="retrofit-data-dev",
        object_name=f"hubspot_insight/{s3_filename}"
    )

View file

@ -0,0 +1,269 @@
import hubspot
from enum import Enum
import logging
from hubspot.crm.associations import ApiException
import os
import requests
class Companies(Enum):
    """Named company identifiers, stored as numeric strings.

    NOTE(review): presumably HubSpot company record IDs; confirm against the
    HubSpot account.
    """
    ABRI = "237615001799"
    SOUTHERN_HOUSING_GROUP = "109343619305"
    LIVEWEST = "86205872354"
class DealStage(Enum):
    """Named deal-stage identifiers, stored as numeric strings.

    NOTE(review): presumably HubSpot deal stage IDs; confirm against the
    pipeline configuration.
    """
    SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
    SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
    CUSTOMER_CONTACTED = "888730834"
    SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
    FILES_MISSING_FROM_ASSESSOR = "1887736000"
class Pipeline(Enum):
    """Pipeline identifiers; the value is compared with a deal's "pipeline" property."""
    OPERATIONS_SOCIAL_HOUSING = "1167582403"
class HubSpotClient():
    """
    Synchronous helper around the HubSpot CRM SDK client.

    Provides lookups for deals, their associated companies, listings and line
    items, and for pipeline / deal-stage metadata.
    """

    def __init__(self, access_token=None):
        """
        Create the underlying HubSpot SDK client.

        Args:
            access_token: HubSpot access token. When omitted, the
                ``HUBSPOT_ACCESS_TOKEN`` environment variable is used, so the
                credential never has to live in source control.

        Raises:
            ValueError: If no token is supplied and the environment variable
                is unset or empty.
        """
        token = access_token or os.environ.get("HUBSPOT_ACCESS_TOKEN")
        if not token:
            raise ValueError(
                "HubSpot access token missing: pass access_token or set the "
                "HUBSPOT_ACCESS_TOKEN environment variable."
            )
        self.access_token = token
        self.client = hubspot.Client.create(access_token=self.access_token)
        # Cache for get_all_deals(); None means "not fetched yet".
        self.all_deals = None
        self.logger = logging.getLogger(__name__)

    def get_all_deals(self):
        """Fetch all deals, cache them on ``self.all_deals`` and return them."""
        self.all_deals = self.client.crm.deals.get_all()
        return self.all_deals

    def get_deal_ids_by_pipeline(self, pipeline_id):
        """
        Get all deal IDs associated with a given pipeline.

        Deals are fetched on first use and served from the cache afterwards.
        Deals without a "pipeline" property are skipped.
        """
        if self.all_deals is None:
            self.get_all_deals()
        wanted = str(pipeline_id)
        # .get() rather than [...] so a deal lacking the property is skipped
        # instead of raising KeyError.
        return [
            deal.id for deal in self.all_deals
            if deal.properties.get("pipeline") == wanted
        ]

    def from_deal_get_associated_company_id(self, deal_id: str):
        """
        Get the associated company ID from a given deal ID.
        Returns the associated company ID, or None if not found.
        """
        try:
            associations_api = self.client.crm.associations.v4.basic_api
            # Fetch associations for this specific deal only
            response = associations_api.get_page(
                object_type="deals",
                object_id=deal_id,
                to_object_type="companies",
                limit=1  # Expect only one associated company
            )
            if not response.results:
                self.logger.info(f"No company association found for deal {deal_id}")
                return None
            company_id = response.results[0].to_object_id
            self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
            return company_id
        # NOTE(review): ApiException is imported from hubspot.crm.associations
        # while the call goes through the v4 associations API; confirm that
        # the v4 client raises this same class.
        except ApiException as e:
            self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}")
            return None

    def from_deal_get_associated_listing(self, deal_id: str):
        """
        Get the associated listing information for a given deal.
        Returns a dictionary of listing properties, or None if not found.
        """
        associations_api = self.client.crm.associations.v4.basic_api
        listings_api = self.client.crm.objects.basic_api  # works for custom objects like "listing"
        # Fetch associated listing(s)
        response = associations_api.get_page(
            object_type="deals",
            object_id=deal_id,
            to_object_type="0-420",  # <-- use your exact custom object name slug here
            limit=1
        )
        if not response.results:
            self.logger.info(f"No listing association found for deal {deal_id}")
            return None
        listing_id = response.results[0].to_object_id
        self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")
        # Fetch listing details (the "listing information")
        listing = listings_api.get_by_id(
            object_type="0-420",  # again, must match your HubSpot object name
            object_id=listing_id,
            properties=[
                "national_uprn",
                "domna_property_id",
                "owner_property_id",
            ]
        )
        listing_info = listing.properties
        self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
        return listing_info

    def from_deal_get_info(self, deal_id):
        """
        Collect a deal's properties, line items and associated company.

        Returns:
            A dict with keys "deal_properties", "line_items" and
            "company_info" ("company_info" is ``{}`` when the deal has no
            associated company).
        """
        # Fetch main deal properties
        deal = self.client.crm.deals.basic_api.get_by_id(
            deal_id,
            properties=[
                'dealname',
                'dealstage',
                'expected_commencement_date',
                'mtp_planned_week',
                'mtp_completion_date',
                'design_planned_week',
                'retrofit_design_status',
                'design_completion_date',
            ]
        )
        deal_info = deal.properties
        if deal_info:
            deal_info.update({"deal_id": deal_id})
        # Fetch line items
        line_items = self.from_deal_get_line_items(deal_id)
        company_id = self.from_deal_get_associated_company_id(deal_id)
        company_info = {}
        if company_id is not None:
            company_info = self.get_company_information(company_id)
        # Combine all into one dictionary
        return {
            "deal_properties": deal_info,
            "line_items": line_items,
            "company_info": company_info,
        }

    def get_company_information(self, company_id):
        """Return the properties of a company, requesting only its 'name'."""
        company = self.client.crm.companies.basic_api.get_by_id(
            company_id,
            properties=[
                'name',
            ]
        )
        return company.properties

    def get_all_pipelines(self):
        """
        Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs.
        Returns an empty list if the request fails.
        """
        try:
            pipelines_api = self.client.crm.pipelines.pipelines_api
            response = pipelines_api.get_all(object_type="deals")
            pipelines = [
                {
                    "name": pipeline.label,
                    "id": pipeline.id
                }
                for pipeline in response.results
            ]
            self.logger.info(f"Retrieved {len(pipelines)} pipelines.")
            return pipelines
        except Exception as e:
            self.logger.error(f"Error retrieving pipelines: {e}")
            return []

    def get_deal_stages(self, pipeline_id=None):
        """
        Retrieve all deal stages for a given pipeline.
        If no pipeline_id is provided, retrieves all stages for all pipelines.
        Returns a list of dicts with pipeline name, stage name, and stage ID.
        Returns an empty list if the request fails.
        """
        try:
            pipelines_api = self.client.crm.pipelines.pipelines_api
            response = pipelines_api.get_all(object_type="deals")
            all_stages = []
            for pipeline in response.results:
                # Skip other pipelines if a specific one is requested
                if pipeline_id and pipeline.id != str(pipeline_id):
                    continue
                all_stages.extend(
                    {
                        "pipeline_name": pipeline.label,
                        "pipeline_id": pipeline.id,
                        "stage_name": stage.label,
                        "stage_id": stage.id
                    }
                    for stage in pipeline.stages
                )
            if not all_stages:
                self.logger.info(f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}")
            else:
                self.logger.info(f"Retrieved {len(all_stages)} deal stages.")
            return all_stages
        except Exception as e:
            self.logger.error(f"Error retrieving deal stages: {e}")
            return []

    def from_deal_get_line_items(self, deal_id: str):
        """
        Get all associated line items for a deal.
        Returns a list of line item property dictionaries.
        Returns an empty list when there are none or the request fails.
        """
        try:
            associations_api = self.client.crm.associations.v4.basic_api
            line_items_api = self.client.crm.objects.basic_api
            # Step 1: Get associated line item IDs
            response = associations_api.get_page(
                object_type="deals",
                object_id=deal_id,
                to_object_type="line_items",
                limit=100  # increase if needed
            )
            if not response.results:
                return []
            line_item_ids = [item.to_object_id for item in response.results]
            # Step 2: Fetch each line item
            line_items = []
            for line_item_id in line_item_ids:
                item = line_items_api.get_by_id(
                    object_type="line_items",
                    object_id=line_item_id,
                    properties=[
                        "name",
                        "quantity",
                        "price",
                        "amount",
                        "hs_product_id",
                        "invoice_reference",
                        "invoiced"
                    ]
                )
                line_items.append(item.properties)
            return line_items
        except Exception as e:
            # Log through the class logger, like every other method, rather
            # than printing to stdout.
            self.logger.error(f"Error fetching line items for deal {deal_id}: {e}")
            return []

View file

@ -0,0 +1,238 @@
import logging
import asyncio
from hubspot.crm.associations import ApiException
import hubspot
class HubSpotClientAsync:
    """
    Asyncio wrapper around the blocking HubSpot SDK client.

    Every SDK call goes through ``_run``, which executes it in a worker
    thread, caps the number of calls in flight and retries when HubSpot
    answers with HTTP 429.
    """

    API_CONCURRENCY = asyncio.Semaphore(5)  # globally limit concurrency
    RATE_LIMIT_DELAY = 0.25  # 4 requests/sec → safe
    RATE_LIMIT_BACKOFF = 10  # seconds to wait after a 429 before retrying
    MAX_RATE_LIMIT_RETRIES = 5  # give up after this many consecutive 429s

    def __init__(self, access_token=None):
        """
        Create the underlying HubSpot SDK client.

        Args:
            access_token: HubSpot access token. When omitted, the
                ``HUBSPOT_ACCESS_TOKEN`` environment variable is used, so the
                credential never has to live in source control.

        Raises:
            ValueError: If no token is supplied and the environment variable
                is unset or empty.
        """
        import os  # local import: this module does not import os at file level

        token = access_token or os.environ.get("HUBSPOT_ACCESS_TOKEN")
        if not token:
            raise ValueError(
                "HubSpot access token missing: pass access_token or set the "
                "HUBSPOT_ACCESS_TOKEN environment variable."
            )
        self.access_token = token
        self.client = hubspot.Client.create(access_token=self.access_token)
        # Cache for get_all_deals(); None means "not fetched yet".
        self.all_deals = None
        # cleaner logging — only warning+error by default
        logging.basicConfig(
            level=logging.WARNING,
            format="%(asctime)s [HubSpot] %(levelname)s: %(message)s"
        )
        self.logger = logging.getLogger(__name__)

    # -----------------------------------
    # Core Safe Request Wrapper
    # -----------------------------------
    @staticmethod
    def _is_rate_limited(exc):
        """Return True when the exception looks like an HTTP 429 response."""
        return getattr(exc, "status", None) == 429 or "429" in str(exc)

    async def _run(self, func, *args, **kwargs):
        """
        Run a blocking SDK call in a worker thread under the concurrency cap.

        On a 429 the call is retried after ``RATE_LIMIT_BACKOFF`` seconds, up
        to ``MAX_RATE_LIMIT_RETRIES`` times. Any other exception, or a 429
        after the last retry, propagates to the caller.
        """
        attempt = 0
        while True:
            try:
                async with self.API_CONCURRENCY:
                    result = await asyncio.to_thread(func, *args, **kwargs)
                    await asyncio.sleep(self.RATE_LIMIT_DELAY)
                    return result
            except Exception as e:
                if not self._is_rate_limited(e) or attempt >= self.MAX_RATE_LIMIT_RETRIES:
                    raise
                attempt += 1
            # The permit has been released by now. Retrying while still
            # holding it would deadlock once every permit holder is waiting
            # to acquire a second one.
            self.logger.warning(
                "Hit HubSpot rate limit → sleeping %ss", self.RATE_LIMIT_BACKOFF
            )
            await asyncio.sleep(self.RATE_LIMIT_BACKOFF)

    # -----------------------------------
    # Deals
    # -----------------------------------
    async def get_all_deals(self):
        """Fetch all deals, cache them on ``self.all_deals`` and return them."""
        self.all_deals = await self._run(self.client.crm.deals.get_all)
        return self.all_deals

    async def get_deal_ids_by_pipeline(self, pipeline_id):
        """Return the IDs of all deals whose "pipeline" property equals ``pipeline_id``."""
        if self.all_deals is None:
            await self.get_all_deals()
        wanted = str(pipeline_id)
        return [
            deal.id
            for deal in self.all_deals
            if deal.properties.get("pipeline") == wanted
        ]

    # -----------------------------------
    # Associations
    # -----------------------------------
    async def from_deal_get_associated_company_id(self, deal_id: str):
        """Return the ID of the first company associated with the deal, or None."""
        try:
            associations = self.client.crm.associations.v4.basic_api
            response = await self._run(
                associations.get_page,
                "deals",
                deal_id,
                "companies",
                limit=1,
            )
            if not response.results:
                return None
            return response.results[0].to_object_id
        except ApiException as e:
            self.logger.warning(f"Failed to fetch company for deal {deal_id}: {e}")
            return None

    async def from_deal_get_associated_listing(self, deal_id: str):
        """Return the properties of the first listing associated with the deal, or None."""
        associations = self.client.crm.associations.v4.basic_api
        listings_api = self.client.crm.objects.basic_api
        response = await self._run(
            associations.get_page,
            "deals",
            deal_id,
            "0-420",
            limit=1,
        )
        if not response.results:
            return None
        listing_id = response.results[0].to_object_id
        listing = await self._run(
            listings_api.get_by_id,
            "0-420",
            listing_id,
            properties=[
                "national_uprn",
                "domna_property_id",
                "owner_property_id",
            ],
        )
        return listing.properties

    # -----------------------------------
    # Deal Info
    # -----------------------------------
    async def from_deal_get_info(self, deal_id):
        """
        Collect a deal's properties, line items and associated company.

        Returns:
            A dict with keys "deal_properties", "line_items" and
            "company_info" ("company_info" is ``{}`` when the deal has no
            associated company).
        """
        deal = await self._run(
            self.client.crm.deals.basic_api.get_by_id,
            deal_id,
            properties=[
                'dealname',
                'dealstage',
                'expected_commencement_date',
                'mtp_planned_week',
                'mtp_completion_date',
                'design_planned_week',
                'retrofit_design_status',
                'design_completion_date',
            ]
        )
        # Copy so the SDK object's own dict is not mutated.
        deal_info = dict(deal.properties)
        deal_info["deal_id"] = deal_id
        line_items = await self.from_deal_get_line_items(deal_id)
        company_id = await self.from_deal_get_associated_company_id(deal_id)
        company_info = await self.get_company_information(company_id) if company_id else {}
        return {
            "deal_properties": deal_info,
            "line_items": line_items,
            "company_info": company_info,
        }

    # -----------------------------------
    # Company Info
    # -----------------------------------
    async def get_company_information(self, company_id):
        """Return the properties of a company, requesting only its 'name'."""
        company = await self._run(
            self.client.crm.companies.basic_api.get_by_id,
            company_id,
            properties=['name']
        )
        return company.properties

    # -----------------------------------
    # Pipelines
    # -----------------------------------
    async def get_all_pipelines(self):
        """Return ``[{"name", "id"}, ...]`` for all deal pipelines, or ``[]`` on failure."""
        try:
            pipelines_api = self.client.crm.pipelines.pipelines_api
            response = await self._run(
                pipelines_api.get_all,
                object_type="deals",
            )
            return [
                {"name": pipeline.label, "id": pipeline.id}
                for pipeline in response.results
            ]
        except Exception as e:
            self.logger.error(f"Failed to fetch pipelines: {e}")
            return []

    async def get_deal_stages(self, pipeline_id=None):
        """
        Return the stages of one deal pipeline, or of all pipelines when
        ``pipeline_id`` is not given. Returns ``[]`` on failure.
        """
        try:
            pipelines_api = self.client.crm.pipelines.pipelines_api
            response = await self._run(
                pipelines_api.get_all,
                object_type="deals",
            )
            stages = []
            for pipeline in response.results:
                if pipeline_id and pipeline.id != str(pipeline_id):
                    continue
                for stage in pipeline.stages:
                    stages.append({
                        "pipeline_name": pipeline.label,
                        "pipeline_id": pipeline.id,
                        "stage_name": stage.label,
                        "stage_id": stage.id,
                    })
            return stages
        except Exception as e:
            self.logger.error(f"Failed to fetch deal stages: {e}")
            return []

    # -----------------------------------
    # Line Items
    # -----------------------------------
    async def from_deal_get_line_items(self, deal_id: str):
        """
        Return the property dicts of the line items associated with the deal.
        Returns ``[]`` when there are none or when a request fails.
        """
        try:
            associations = self.client.crm.associations.v4.basic_api
            line_api = self.client.crm.objects.basic_api
            response = await self._run(
                associations.get_page,
                "deals",
                deal_id,
                "line_items",
                limit=100,
            )
            if not response.results:
                return []
            line_items = []
            for row in response.results:
                item = await self._run(
                    line_api.get_by_id,
                    "line_items",
                    row.to_object_id,
                    properties=[
                        "name",
                        "quantity",
                        "price",
                        "amount",
                        "hs_product_id",
                        "invoice_reference",
                        "invoiced",
                    ],
                )
                line_items.append(item.properties)
            return line_items
        except Exception as e:
            # Include the cause: this branch is reached on request errors,
            # not only when a deal has no line items.
            self.logger.warning(f"Line items missing for deal {deal_id}: {e}")
            return []