save async method

This commit is contained in:
Jun-te Kim 2025-11-17 18:27:38 +00:00
parent 11319a8394
commit 8ccc3783b3
5 changed files with 1496 additions and 294 deletions

964
backend/poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -10,7 +10,10 @@ requires-python = ">=3.12"
dependencies = [
"requests (>=2.32.5,<3.0.0)",
"hubspot-api-client (>=12.0.0,<13.0.0)",
"pydantic (>=2.12.4,<3.0.0)"
"pydantic (>=2.12.4,<3.0.0)",
"ipykernel (>=7.1.0,<8.0.0)",
"pandas (>=2.3.3,<3.0.0)",
"tqdm (>=4.67.1,<5.0.0)"
]
[tool.poetry]

View file

@ -1 +1,53 @@
from dashboard.
import asyncio
import json
from tqdm import tqdm
from dashboard.services.hubspot_client import Pipeline
from dashboard.services.hubspot_client_async import HubSpotClientAsync
OUTPUT_FILE = "hubspot_deals.json"
async def main():
hubspot = HubSpotClientAsync()
# Fetch all deals in the pipeline
deals = await hubspot.get_deal_ids_by_pipeline(Pipeline.OPERATIONS_SOCIAL_HOUSING.value)
total = len(deals)
print(f"Total deals to fetch: {total}")
results = []
tasks = [asyncio.create_task(hubspot.from_deal_get_info(deal_id)) for deal_id in deals]
success = 0
failed = 0
pbar = tqdm(total=total, desc="Fetching Deals", unit="deal", dynamic_ncols=True)
for task in asyncio.as_completed(tasks):
try:
result = await task
results.append(result)
success += 1
except Exception as e:
failed += 1
finally:
pbar.set_postfix({
"ok": success,
"fail": failed,
"active": len(asyncio.all_tasks()) # shows concurrent load
})
pbar.update(1)
pbar.close()
# Save final results
with open(OUTPUT_FILE, "w") as f:
json.dump(results, f, indent=2)
print(f"Done! Saved {len(results)} deals. Failed: {failed}")
return results
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,14 +1,14 @@
import hubspot
from enum import Enum
from hubspot.crm.deals import PublicObjectSearchRequest
from hubspot.crm.deals.models import SimplePublicObjectInput
import time
import logging
import traceback
from hubspot.crm.objects.notes import SimplePublicObjectInput as NoteInput
from hubspot.crm.associations import BatchInputPublicAssociation, PublicAssociation
import time
from hubspot.crm.associations import ApiException
import os
import requests
class Companies(Enum):
ABRI = "237615001799"
SOUTHERN_HOUSING_GROUP = "109343619305"
LIVEWEST = "86205872354"
class DealStage(Enum):
SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
@ -17,306 +17,253 @@ class DealStage(Enum):
SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
FILES_MISSING_FROM_ASSESSOR = "1887736000"
class Pipeline(Enum):
OPERATIONS_SOCIAL_HOUSING = "1167582403"
class HubSpotClient():
def __init__(self):
self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
self.client = hubspot.Client.create(access_token=self.access_token)
self.all_deals = None
self.logger = logging
def get_all_deals(self):
return self.client.crm.deals.get_all()
self.all_deals = self.client.crm.deals.get_all()
return self.all_deals
def get_owner_name_from_id(self, owner_id):
owner = self.client.crm.owners.owners_api.get_by_id(owner_id)
time.sleep(1)
first_name = owner.first_name or ""
last_name = owner.last_name or ""
return f"{first_name} {last_name}".strip()
def get_deal_name_by_id(self, deal_id):
try:
deal = self.client.crm.deals.basic_api.get_by_id(deal_id)
time.sleep(1)
return deal.properties.get("dealname", "No deal name")
except Exception as e:
return "Unknown Deal" # Fallback if the deal name is not found
def get_listings_from_deals_id(self, deals_id):
from hubspot.crm.objects import PublicObjectSearchRequest
found_notes = []
after = None
while True:
# Correct filter for notes associated with the given deal ID
search_request = PublicObjectSearchRequest(
filter_groups=[{
"filters": [{
"propertyName": "associations.deal", # Filter by association to the deal
"operator": "EQ",
"value": deals_id,
}]
}],
properties=["domna_property_id", "owner_property_id", 'national_uprn'], # Properties of the note you need
limit=200,
after=after,
)
# Call the search API
response = self.client.crm.objects.search_api.do_search(object_type="0-420", public_object_search_request=search_request)
time.sleep(1)
# Add the results to the found_notes list
found_notes.extend(response.results)
# Handle pagination if more results are available
if not response.paging or not response.paging.next:
break
after = response.paging.next.after
if found_notes:
return found_notes[0]
return None
def get_domna_and_landlord_id(self, deals_id):
data = self.get_listings_from_deals_id(deals_id)
return data.properties['domna_property_id'], data.properties['owner_property_id'], data.properties.get('national_uprn', '') or ''
def get_notes_from_deals_id(self, deals_id):
from hubspot.crm.objects import PublicObjectSearchRequest
found_notes = []
after = None
while True:
# Correct filter for notes associated with the given deal ID
search_request = PublicObjectSearchRequest(
filter_groups=[{
"filters": [{
"propertyName": "associations.deal", # Filter by association to the deal
"operator": "EQ",
"value": deals_id,
}]
}],
properties=["hs_note_body", "hubspot_owner_id"], # Properties of the note you need
limit=200,
after=after,
)
# Call the search API
response = self.client.crm.objects.search_api.do_search(object_type="notes", public_object_search_request=search_request)
time.sleep(1)
# Add the results to the found_notes list
found_notes.extend(response.results)
# Handle pagination if more results are available
if not response.paging or not response.paging.next:
break
after = response.paging.next.after
all_notes = []
for note in found_notes:
# Extract note content and author information
note_body = note.properties.get("hs_note_body", "No content")
# Collect note details in a dictionary
all_notes.append({
"note_id": note.id,
"note": note_body,
"created_at": note.created_at.strftime("%Y-%m-%d %H:%M:%S"),
})
return all_notes
def get_all_deals_from_stage_id(self, stage_id):
found_deals = []
after = None
while True:
search_request = PublicObjectSearchRequest(
filter_groups=[{
"filters": [{
"propertyName": "dealstage",
"operator": "EQ",
"value": stage_id,
}]
}],
properties=[
"dealname",
"amount",
"hubspot_owner_id",
],
limit=200,
after=after,
)
response = self.client.crm.deals.search_api.do_search(search_request)
time.sleep(1)
found_deals.extend(response.results)
if not response.paging or not response.paging.next:
break
after = response.paging.next.after
all_deals = []
for deal in found_deals:
all_deals.append({
"deal_id": deal.id,
"value": deal.properties["amount"],
"deal_owner": deal.properties.get("hubspot_owner_id"),
})
return all_deals
def get_associations_for_deal(self, deal_id, to_object_type):
def get_deal_ids_by_pipeline(self, pipeline_id):
"""
Returns a list of associated object IDs of type `to_object_type`
(e.g. "contacts", "companies", "notes", etc.)
Get all deal IDs associated with a given pipeline.
"""
assoc_resp = self.client.crm.deals.associations_api.get_all(
deal_id=deal_id,
to_object_type=to_object_type
)
return [assoc.id for assoc in assoc_resp.results]
if self.all_deals is None:
self.get_all_deals()
def get_deals_from_deal_stage(self, deal_stage: DealStage):
found_deals = []
after = None
while True:
search_request = PublicObjectSearchRequest(
filter_groups=[{
"filters": [{
"propertyName": "dealstage",
"operator": "EQ",
"value": deal_stage.value,
}]
}],
properties=[
"dealname",
"number_of_wet_rooms_needing_ventilation",
"work_type",
"property_needs_trickle_vents",
"domna_survey_post_sap",
"existing_wall_insulation",
"installer",
"submission_folder",
],
limit=200,
after=after,
)
response = self.client.crm.deals.search_api.do_search(search_request)
found_deals.extend(response.results)
if not response.paging or not response.paging.next:
break
after = response.paging.next.after
# Filter deals where properties['pipeline'] matches the given pipeline_id
filtered_deals = [
deal for deal in self.all_deals
if deal.properties["pipeline"] == str(pipeline_id)
]
all_deals = []
for i,deal in enumerate(found_deals):
domna_id, landlord_id, uprn = self.get_domna_and_landlord_id(deal.id)
try:
deal_name = deal.properties['dealname']
self.logger.info(f"Validating <{deal_name}>")
# input(f"Press enter to verfiy <{deal_name}>")
all_deals.append(SubmissionInfoFromDeal(
deal_id= deal.properties["hs_object_id"],
deal_name=deal.properties["dealname"],
work_type=deal.properties["work_type"],
needs_trickle_ventilation=True if deal.properties.get("property_needs_trickle_vents", "NO").upper() == "YES" else False,
post_sap_score=int(deal.properties["domna_survey_post_sap"]),
existing_wall_insulation=deal.properties.get("existing_wall_insulation") if deal.properties.get("existing_wall_insulation") else "None",
no_of_wet_rooms=int(deal.properties["number_of_wet_rooms_needing_ventilation"]),
installer=deal.properties["installer"],
submission_folder_path = deal.properties["submission_folder"],
landlord_id = landlord_id,
domna_id = domna_id,
uprn = uprn,
))
except Exception as e:
def format_error_note(e):
note_text = "⚠️ <b>Automated Verification Failed:</b><br><br>"
# Extract and return only the deal IDs
deal_ids = [deal.id for deal in filtered_deals]
if hasattr(e, "errors") and callable(e.errors):
note_text += "❌ <b>Validation Errors:</b><br>"
for error in e.errors():
loc = error.get('loc', 'N/A')
msg = error.get('msg', 'N/A')
error_type = error.get('type', 'N/A')
error_input = error.get('input', 'N/A')
note_text += (
f"• <b>Field:</b> <code>{loc}</code><br>"
f"&nbsp;&nbsp;- <b>Message:</b> {msg}<br>"
f"&nbsp;&nbsp;- <b>Type:</b> {error_type}<br>"
f"&nbsp;&nbsp;- <b>Input:</b> {error_input}<br><br>"
)
else:
note_text += (
"❗ <b>Non-validation error:</b><br>"
f"<pre>{str(e)}</pre><br>"
)
note_text += (
"🛠️ Please review this error and take necessary actions.<br>"
"Contact <b>Jun-te</b> if help is needed: <b>+44 7519 530 549</b> or via Teams."
)
return note_text
deal_id = deal.properties['hs_object_id']
if hasattr(e, "errors"):
for error in e.errors():
self.add_note_to_deal(deal_id, format_error_note(e))
else:
self.logger.error(f"Non-validation error occurred: {str(e)}", exc_info=True)
self.logger.info(f"Deal name <{deal_name}> moving to 'needs additional information'")
self.move_deals_to_different_stage([deal_id], DealStage.FILES_MISSING_FROM_ASSESSOR.value)
return all_deals
return deal_ids
def print_all_pipeline_ids(self):
pipelines = self.client.crm.pipelines.pipelines_api.get_all(object_type="deals")
for pipeline in pipelines.results:
print(f"Pipeline: {pipeline.label}")
for stage in pipeline.stages:
print(f" - Label: {stage.label}")
print(f" ID: {stage.id}")
def move_deals_to_different_stage(self, list_of_deals_id, to_stage_id):
deal_properties = SimplePublicObjectInput(
properties={
"dealstage": to_stage_id
}
)
for deal_id in list_of_deals_id:
self.client.crm.deals.basic_api.update(
deal_id,
simple_public_object_input=deal_properties
)
self.logger.info(f"Deal {deal_id} moved to stage with ID {to_stage_id}.")
def add_note_to_deal(self, deal_id, note_text):
def from_deal_get_associated_company_id(self, deal_id: str):
"""
Get the associated company ID from a given deal ID.
Returns the associated company ID, or None if not found.
"""
try:
# Generate current time in milliseconds since epoch
hs_timestamp = int(time.time() * 1000)
associations_api = self.client.crm.associations.v4.basic_api
# Step 1: Create the note with hs_timestamp
note = NoteInput(
properties={
"hs_note_body": note_text,
"hs_timestamp": hs_timestamp # Required field in your HubSpot setup
# Fetch associations for this specific deal only
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1 # Expect only one associated company
)
if not response.results:
self.logger.info(f"No company association found for deal {deal_id}")
return None
company_id = response.results[0].to_object_id
self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
return company_id
except ApiException as e:
self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}")
return None
def from_deal_get_associated_listing(self, deal_id: str):
"""
Get the associated listing information for a given deal.
Returns a dictionary of listing properties, or None if not found.
"""
associations_api = self.client.crm.associations.v4.basic_api
listings_api = self.client.crm.objects.basic_api # works for custom objects like "listing"
# Fetch associated listing(s)
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="0-420", # <-- use your exact custom object name slug here
limit=1
)
if not response.results:
self.logger.info(f"No listing association found for deal {deal_id}")
return None
listing_id = response.results[0].to_object_id
self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")
# Fetch listing details (the "listing information")
listing = listings_api.get_by_id(
object_type="0-420", # again, must match your HubSpot object name
object_id=listing_id,
properties=[
"national_uprn",
"domna_property_id",
"owner_property_id",
]
)
listing_info = listing.properties
self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
return listing_info
def from_deal_get_info(self, deal_id):
# Fetch main deal properties
deal = self.client.crm.deals.basic_api.get_by_id(
deal_id,
properties=[
'dealname',
'dealstage',
'expected_commencement_date',
'mtp_planned_week',
'mtp_completion_date',
'design_planned_week',
'retrofit_design_status',
'design_completion_date',
]
)
deal_info = deal.properties
if deal_info:
deal_info.update({"deal_id": deal_id})
# Fetch line items
line_items = self.from_deal_get_line_items(deal_id)
company_id = self.from_deal_get_associated_company_id(deal_id)
company_info = {}
if company_id is not None:
company_info = self.get_company_information(company_id)
# Combine all into one dictionary
return {
"deal_properties": deal_info,
"line_items": line_items,
"company_info": company_info,
}
def get_company_information(self, company_id):
company = self.client.crm.companies.basic_api.get_by_id(
company_id,
properties=[
'name',
]
)
company_info = company.properties
return company_info
def get_all_pipelines(self):
"""
Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs.
"""
try:
pipelines_api = self.client.crm.pipelines.pipelines_api
response = pipelines_api.get_all(object_type="deals")
pipelines = [
{
"name": pipeline.label,
"id": pipeline.id
}
)
created_note = self.client.crm.objects.notes.basic_api.create(note)
note_id = created_note.id
for pipeline in response.results
]
# Step 2: Associate the note to the deal
association = PublicAssociation(
_from=note_id,
to=deal_id,
type="note_to_deal"
)
self.client.crm.associations.batch_api.create(
'notes',
'deals',
batch_input_public_association=BatchInputPublicAssociation(
inputs=[association]
)
)
self.logger.info(f"📝 Note added to deal {deal_id}: {note_text}")
self.logger.info(f"Retrieved {len(pipelines)} pipelines.")
return pipelines
except Exception as e:
self.logger.error(f"❌ Failed to add note to deal {deal_id}: {e}", exc_info=True)
self.logger.error(f"Error retrieving pipelines: {e}")
return []
def get_deal_stages(self, pipeline_id=None):
"""
Retrieve all deal stages for a given pipeline.
If no pipeline_id is provided, retrieves all stages for all pipelines.
Returns a list of dicts with pipeline name, stage name, and stage ID.
"""
try:
pipelines_api = self.client.crm.pipelines.pipelines_api
response = pipelines_api.get_all(object_type="deals")
all_stages = []
for pipeline in response.results:
# Skip other pipelines if a specific one is requested
if pipeline_id and pipeline.id != str(pipeline_id):
continue
stages = [
{
"pipeline_name": pipeline.label,
"pipeline_id": pipeline.id,
"stage_name": stage.label,
"stage_id": stage.id
}
for stage in pipeline.stages
]
all_stages.extend(stages)
if not all_stages:
self.logger.info(f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}")
else:
self.logger.info(f"Retrieved {len(all_stages)} deal stages.")
return all_stages
except Exception as e:
self.logger.error(f"Error retrieving deal stages: {e}")
return []
def from_deal_get_line_items(self, deal_id: str):
"""
Get all associated line items for a deal.
Returns a list of line item property dictionaries.
"""
try:
associations_api = self.client.crm.associations.v4.basic_api
line_items_api = self.client.crm.objects.basic_api
# Step 1: Get associated line item IDs
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="line_items",
limit=100 # increase if needed
)
if not response.results:
return []
line_item_ids = [item.to_object_id for item in response.results]
# Step 2: Fetch each line item
line_items = []
for line_item_id in line_item_ids:
item = line_items_api.get_by_id(
object_type="line_items",
object_id=line_item_id,
properties=[
"name",
"quantity",
"price",
"amount",
"hs_product_id",
"invoice_reference",
"invoiced"
]
)
line_items.append(item.properties)
return line_items
except Exception as e:
print(f"Error fetching line items for deal {deal_id}: {e}")
return []

View file

@ -0,0 +1,238 @@
import logging
import asyncio
from hubspot.crm.associations import ApiException
import hubspot
class HubSpotClientAsync:
API_CONCURRENCY = asyncio.Semaphore(5) # globally limit concurrency
RATE_LIMIT_DELAY = 0.25 # 4 requests/sec → safe
def __init__(self):
self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
self.client = hubspot.Client.create(access_token=self.access_token)
self.all_deals = None
# cleaner logging — only warning+error by default
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s [HubSpot] %(levelname)s: %(message)s"
)
self.logger = logging.getLogger(__name__)
# -----------------------------------
# Core Safe Request Wrapper
# -----------------------------------
async def _run(self, func, *args, **kwargs):
async with self.API_CONCURRENCY:
try:
result = await asyncio.to_thread(func, *args, **kwargs)
await asyncio.sleep(self.RATE_LIMIT_DELAY)
return result
except Exception as e:
if "429" in str(e):
self.logger.warning("Hit HubSpot rate limit → sleeping 10s")
await asyncio.sleep(10)
return await self._run(func, *args, **kwargs)
raise
# -----------------------------------
# Deals
# -----------------------------------
async def get_all_deals(self):
self.all_deals = await self._run(self.client.crm.deals.get_all)
return self.all_deals
async def get_deal_ids_by_pipeline(self, pipeline_id):
if self.all_deals is None:
await self.get_all_deals()
return [
deal.id
for deal in self.all_deals
if deal.properties.get("pipeline") == str(pipeline_id)
]
# -----------------------------------
# Associations
# -----------------------------------
async def from_deal_get_associated_company_id(self, deal_id: str):
try:
associations = self.client.crm.associations.v4.basic_api
response = await self._run(
associations.get_page,
"deals",
deal_id,
"companies",
limit=1,
)
if not response.results:
return None
return response.results[0].to_object_id
except ApiException:
self.logger.warning(f"Failed to fetch company for deal {deal_id}")
return None
async def from_deal_get_associated_listing(self, deal_id: str):
associations = self.client.crm.associations.v4.basic_api
listings_api = self.client.crm.objects.basic_api
response = await self._run(
associations.get_page,
"deals",
deal_id,
"0-420",
limit=1,
)
if not response.results:
return None
listing_id = response.results[0].to_object_id
listing = await self._run(
listings_api.get_by_id,
"0-420",
listing_id,
properties=[
"national_uprn",
"domna_property_id",
"owner_property_id",
],
)
return listing.properties
# -----------------------------------
# Deal Info
# -----------------------------------
async def from_deal_get_info(self, deal_id):
deal = await self._run(
self.client.crm.deals.basic_api.get_by_id,
deal_id,
properties=[
'dealname',
'dealstage',
'expected_commencement_date',
'mtp_planned_week',
'mtp_completion_date',
'design_planned_week',
'retrofit_design_status',
'design_completion_date',
]
)
deal_info = dict(deal.properties)
deal_info["deal_id"] = deal_id
line_items = await self.from_deal_get_line_items(deal_id)
company_id = await self.from_deal_get_associated_company_id(deal_id)
company_info = await self.get_company_information(company_id) if company_id else {}
return {
"deal_properties": deal_info,
"line_items": line_items,
"company_info": company_info,
}
# -----------------------------------
# Company Info
# -----------------------------------
async def get_company_information(self, company_id):
company = await self._run(
self.client.crm.companies.basic_api.get_by_id,
company_id,
properties=['name']
)
return company.properties
# -----------------------------------
# Pipelines
# -----------------------------------
async def get_all_pipelines(self):
try:
pipelines_api = self.client.crm.pipelines.pipelines_api
response = await self._run(
pipelines_api.get_all,
object_type="deals",
)
return [
{"name": pipeline.label, "id": pipeline.id}
for pipeline in response.results
]
except Exception:
self.logger.error("Failed to fetch pipelines")
return []
async def get_deal_stages(self, pipeline_id=None):
try:
pipelines_api = self.client.crm.pipelines.pipelines_api
response = await self._run(
pipelines_api.get_all,
object_type="deals",
)
stages = []
for pipeline in response.results:
if pipeline_id and pipeline.id != str(pipeline_id):
continue
for stage in pipeline.stages:
stages.append({
"pipeline_name": pipeline.label,
"pipeline_id": pipeline.id,
"stage_name": stage.label,
"stage_id": stage.id,
})
return stages
except Exception:
self.logger.error("Failed to fetch deal stages")
return []
# -----------------------------------
# Line Items
# -----------------------------------
async def from_deal_get_line_items(self, deal_id: str):
try:
associations = self.client.crm.associations.v4.basic_api
line_api = self.client.crm.objects.basic_api
response = await self._run(
associations.get_page,
"deals",
deal_id,
"line_items",
limit=100,
)
if not response.results:
return []
line_items = []
for row in response.results:
item = await self._run(
line_api.get_by_id,
"line_items",
row.to_object_id,
properties=[
"name",
"quantity",
"price",
"amount",
"hs_product_id",
"invoice_reference",
"invoiced",
],
)
line_items.append(item.properties)
return line_items
except Exception:
self.logger.warning(f"Line items missing for deal {deal_id}")
return []