Model/etl/hubspot/hubspotClient.py
2026-04-20 15:44:54 +00:00

524 lines
23 KiB
Python

import os
import time
from enum import Enum
from typing import Optional, cast, Callable, Any
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.products.api.basic_api import BasicApi as ProductsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.line_items.api.basic_api import BasicApi as LineItemsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.models import ( # type: ignore[reportMissingTypeStubs]
CollectionResponsePipelineNoPaging as PipelinesResponse,
)
from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects.models import SimplePublicObject as HubspotObject # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4 import AssociationSpec # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTypeStubs]
CollectionResponseMultiAssociatedObjectWithLabelForwardPaging as AssociationsPageResponse,
MultiAssociatedObjectWithLabel as AssociationsResult,
ForwardPaging as AssociationsPaging,
NextPage as AssociationsPagingNext,
)
from backend.app.config import get_settings
from etl.hubspot.company_data import CompanyData
from utils.logger import setup_logger
import mimetypes
import requests
class Companies(Enum):
    """Symbolic names for a fixed set of company ID strings.

    NOTE(review): the values are presumably HubSpot company record IDs (the
    same kind of ID that ``HubspotClient`` methods take as ``company_id``) —
    confirm against the HubSpot portal.
    """

    ABRI = "237615001799"
    SOUTHERN_HOUSING_GROUP = "109343619305"
    LIVEWEST = "86205872354"
    SURESERVE = "301745289413"
    HOMEGROUP = "94946071794"
    APPLE = "184769046716"
    # NOTE(review): "GUINESS" is presumably a misspelling of "Guinness"; the
    # member name is public, so it is left unchanged here.
    THE_GUINESS_PARTNERSHIP = "86970043613"
    CALICO_HOMES = "86975437046"
class DealStage(Enum):
    """Symbolic names for a fixed set of deal-stage ID strings.

    NOTE(review): the values are presumably HubSpot stage IDs, i.e. the
    ``stage_id`` values reported by
    ``HubspotClient.get_deal_stages_from_pipeline_id`` — confirm.
    """

    SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
    SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
    CUSTOMER_CONTACTED = "888730834"
    SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
    FILES_MISSING_FROM_ASSESSOR = "1887736000"
class Pipeline(Enum):
    """Symbolic names for pipeline ID strings (currently a single pipeline).

    NOTE(review): the value is presumably a HubSpot deal-pipeline ID, i.e. an
    ``id`` as reported by ``HubspotClient.get_all_pipelines`` — confirm.
    """

    OPERATIONS_SOCIAL_HOUSING = "1167582403"
# TODO: get Guinness working from here
class HubspotClient:
def __init__(self):
    """
    Build an authenticated HubSpot SDK client from the application settings.

    Note for the tech team: the HubSpot library does not ship type hints, so
    Pylance is never fully satisfied. Attributes and locals in this class are
    annotated as far as possible anyway, because that is what makes the IDE
    offer sensible suggestions.

    Raises:
        RuntimeError: if the ``HUBSPOT_API_KEY`` setting is ``None``.
    """
    api_key = get_settings().HUBSPOT_API_KEY
    if api_key is None:
        raise RuntimeError("Missing HUBSPOT_API_KEY in env")

    self.access_token: str = api_key
    self.logger = setup_logger()

    # [Developer only] The explicit ``: Client`` annotation below is what lets
    # the IDE autocomplete after typing ``self.client.`` — without it there are
    # no suggestions. (Past Junte, 13/03/2026)
    self.client: Client = Client.create(access_token=self.access_token)  # type: ignore[reportUnknownMemberType]
def _call_with_retry(self, fn: Callable[[], Any], max_retries: int = 2) -> Any:
"""
Call fn(), retrying up to max_retries times on 429 rate-limit errors.
Waits the minimal amount: the remaining interval window reported by HubSpot headers.
Falls back to the full interval (10s) if headers are absent.
Note: each HubSpot sub-module (deals, companies, etc.) ships its own ApiException
class with no shared base beyond Exception, so we detect 429s via duck-typing.
"""
for attempt in range(max_retries + 1):
try:
return fn()
except Exception as e:
status = getattr(e, "status", None)
if status != 429 or attempt == max_retries:
raise
headers = getattr(e, "headers", None) or {}
interval_ms = int(
headers.get("x-hubspot-ratelimit-interval-milliseconds", 10000)
)
wait_s = interval_ms / 1000.0
self.logger.warning(
f"HubSpot 429 (attempt {attempt + 1}/{max_retries}), "
f"waiting {wait_s:.1f}s before retry."
)
time.sleep(wait_s)
raise RuntimeError("Unreachable") # pragma: no cover
def get_deal_ids_from_company(self, company_id: str) -> list[str]:
    """
    Collect the IDs of all deals associated with a company.

    Pages through the v4 associations API (companies -> deals) 100 results
    at a time, following the ``paging.next.after`` cursor until a page comes
    back without one. Every request goes through ``_call_with_retry``.
    """
    api: AssociationsBasicApi = (  # type: ignore[reportUnknownMemberType]
        self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    )
    collected: list[str] = []
    cursor: Optional[str] = None

    while True:
        # The lambda is invoked (and possibly re-invoked on retry) before
        # ``cursor`` is reassigned, so it always sees this iteration's value.
        page: AssociationsPageResponse = self._call_with_retry(
            lambda: api.get_page(  # type: ignore[reportUnknownMemberType]
                object_type="companies",
                object_id=company_id,
                to_object_type="deals",
                limit=100,
                after=cursor,
            )
        )

        rows: list[AssociationsResult] = cast(list[AssociationsResult], page.results)  # type: ignore[reportUnknownMemberType]
        collected.extend(
            cast(str, row.to_object_id)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
            for row in rows
        )

        page_info: Optional[AssociationsPaging] = cast(Optional[AssociationsPaging], page.paging)  # type: ignore[reportUnknownMemberType]
        next_page: Optional[AssociationsPagingNext] = (
            cast(Optional[AssociationsPagingNext], page_info.next)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
            if page_info
            else None
        )
        if not next_page:
            return collected
        cursor = cast(str, next_page.after)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
def from_deal_id_get_associated_company_id(self, deal_id: str) -> Optional[str]:
"""
Get the associated company ID from a given deal ID.
Returns the associated company ID, or None if not found.
"""
try:
associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
# Fetch associations for this specific deal only
response: AssociationsPageResponse = self._call_with_retry(
lambda: associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1,
)
)
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
if not results:
self.logger.info(f"No company association found for deal {deal_id}")
return None
first: AssociationsResult = results[0]
company_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
return company_id
except ApiException as e:
self.logger.error(
f"Error fetching associated company for deal {deal_id}: {e}"
)
return None
def from_deal_id_get_associated_listing(
    self, deal_id: str
) -> Optional[dict[str, str]]:
    """
    Look up the first listing associated with a deal and return its properties.

    Listings are addressed through the object type ID ``"0-420"``. Only the
    first association is requested, and the listing is then fetched with the
    ``national_uprn``, ``domna_property_id`` and ``owner_property_id``
    properties.

    Returns:
        The listing's properties with an added ``"listing_id"`` key, or
        ``None`` when the deal has no associated listing.
    """
    listing_object_type = "0-420"
    wanted_properties = [
        "national_uprn",
        "domna_property_id",
        "owner_property_id",
    ]
    associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    # The generic objects API also serves custom objects such as "listing".
    listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api  # type: ignore[reportUnknownMemberType]

    # Step 1: find the associated listing, if any.
    association_page: AssociationsPageResponse = self._call_with_retry(
        lambda: associations_api.get_page(  # type: ignore[reportUnknownMemberType]
            object_type="deals",
            object_id=deal_id,
            to_object_type=listing_object_type,
            limit=1,
        )
    )
    matches: list[AssociationsResult] = cast(list[AssociationsResult], association_page.results)  # type: ignore[reportUnknownMemberType]
    if not matches:
        self.logger.info(f"No listing association found for deal {deal_id}")
        return None

    listing_id: str = cast(str, matches[0].to_object_id)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
    self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")

    # Step 2: fetch the listing details (the "listing information").
    listing: HubspotObject = self._call_with_retry(
        lambda: listings_api.get_by_id(  # type: ignore[reportUnknownMemberType]
            object_type=listing_object_type,
            object_id=listing_id,
            properties=wanted_properties,
        )
    )
    listing_info: dict[str, str] = cast(dict[str, str], listing.properties)  # type: ignore[reportUnknownMemberType]
    listing_info["listing_id"] = listing_id
    self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
    return listing_info
def from_deal_id_get_info(
self, deal_id: str
) -> dict[str, str]: # TODO: add dataclass for this
deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType]
deal: HubspotObject = self._call_with_retry(
lambda: deals_api.get_by_id( # type: ignore[reportUnknownMemberType]
deal_id,
properties=[
"dealname",
"dealstage",
"pipeline",
"outcome",
"outcome_notes",
"project_code",
"major_condition_issue_description",
"major_condition_issue_photos",
"coordination_status__stage_1_",
"coordination_comments",
"retrofit_design_status",
"pashub_link",
"sharepoint_link",
"dampmould_growth",
"damp_mould_and_repairs_comments",
"pre_sap_score_dropdown",
"coordinator",
"mtp_completion_date",
"mtp_re_model_completion_date",
"ioe_v3_completion_date",
"proposed_measures_dropdown",
"approved_package",
"designer",
"design_completion_date",
"actual_measures_installed",
"installer",
"installer_handover",
"lodgement_status",
"measures_lodgement_date",
"lodgement_date",
"expected_commencement_date",
"surveyor",
"confirmed_survey_date",
"confirmed_survey_time",
"surveyed_date",
"design_type",
"batch",
"block_reference",
"epc_prn",
"potential_post_sap_score_dropdown",
"ei_score",
"ei_score__potential_",
"epc_sap_score",
"epc_sap_score__potential_",
],
)
)
deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType]
return deal_info
def get_deal_and_company_and_listing(
self, deal_id: str
) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]:
deal: dict[str, str] = self.from_deal_id_get_info(deal_id)
company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id)
listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing(
deal_id
)
return deal, company, listing
def get_company_information(self, company_id: str) -> CompanyData:
    """
    Fetch a company by ID, requesting only its ``name`` property.

    Returns:
        The ``properties`` attribute of the fetched HubSpot object, annotated
        as ``CompanyData``.
    """
    api: CompaniesBasicApi = self.client.crm.companies.basic_api  # type: ignore[reportUnknownMemberType]

    def fetch_company() -> Any:
        return api.get_by_id(  # type: ignore[reportUnknownMemberType]
            company_id,
            properties=["name"],
        )

    record: HubspotObject = self._call_with_retry(fetch_company)
    details: CompanyData = record.properties  # type: ignore[reportUnknownMemberType]
    return details
def get_all_pipelines(self) -> list[dict[str, str]]:
"""
Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs.
"""
try:
pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType]
response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType]
results: list[HubspotPipeline] = cast(list[HubspotPipeline], response.results) # type: ignore[reportUnknownMemberType]
pipelines: list[dict[str, str]] = []
for pipeline in results:
pipeline: HubspotPipeline
pipelines.append(
{
"name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType]
"id": cast(str, pipeline.id), # type: ignore[reportUnknownMemberType]
}
)
self.logger.info(f"Retrieved {len(pipelines)} pipelines.")
return pipelines
except Exception as e:
self.logger.error(f"Error retrieving pipelines: {e}")
return []
def get_deal_stages_from_pipeline_id(
self, pipeline_id: Optional[str] = None
) -> list[dict[str, str]]:
"""
Retrieve all deal stages for a given pipeline.
If no pipeline_id is provided, retrieves all stages for all pipelines.
Returns a list of dicts with pipeline name, stage name, and stage ID.
"""
try:
pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType]
response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType]
all_stages: list[dict[str, str]] = []
for pipeline in cast(list[HubspotPipeline], response.results): # type: ignore[reportUnknownMemberType]
pipeline: HubspotPipeline
# Skip other pipelines if a specific one is requested
pipeline_id_str: str = cast(str, pipeline.id) # type: ignore[reportUnknownMemberType]
if pipeline_id and pipeline_id_str != str(pipeline_id):
continue
for stage in cast(list[HubspotPipelineStage], pipeline.stages): # type: ignore[reportUnknownMemberType]
stage: HubspotPipelineStage
all_stages.append(
{
"pipeline_name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType]
"pipeline_id": pipeline_id_str,
"stage_name": cast(str, stage.label), # type: ignore[reportUnknownMemberType]
"stage_id": cast(str, stage.id), # type: ignore[reportUnknownMemberType]
}
)
if not all_stages:
self.logger.info(
f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}"
)
else:
self.logger.info(f"Retrieved {len(all_stages)} deal stages.")
return all_stages
except Exception as e:
self.logger.error(f"Error retrieving deal stages: {e}")
return []
def download_file_from_url(
    self,
    download_url: str,
    save_path: Optional[str] = None,
    timeout: float = 60.0,
) -> str:
    """
    Download a file from a HubSpot file URL (public or private), keeping its original file type.

    The file name is taken from the response's Content-Disposition header
    when present, otherwise from the last path segment of the URL (with an
    extension guessed from Content-Type if the segment has none). Directory
    components in a server-supplied name are discarded so the file cannot be
    written outside the chosen directory.

    Args:
        download_url: URL to fetch. The bearer token is attached unless the
            URL contains ``"hubspotusercontent"``.
        save_path: ``None`` saves into the current working directory; an
            existing directory saves inside it under the detected file name;
            anything else is treated as the target file path.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.

    Returns:
        The path the file was written to.

    Raises:
        requests.exceptions.RequestException: on any HTTP/transport failure
            (logged, then re-raised).
    """
    # Stdlib header parser; imported locally so this method carries its own
    # dependency.
    from email.message import Message

    request_headers: dict[str, str] = {}
    # NOTE(review): the access token is sent to ANY url that does not contain
    # "hubspotusercontent". Callers must only pass HubSpot URLs here; consider
    # an explicit host allow-list.
    if "hubspotusercontent" not in download_url:
        request_headers["Authorization"] = f"Bearer {self.access_token}"

    try:
        self.logger.info(f"Downloading HubSpot file: {download_url}")
        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(
            download_url,
            headers=request_headers,
            stream=True,
            allow_redirects=True,
            timeout=timeout,
        ) as response:
            response.raise_for_status()

            # Try to infer filename from Content-Disposition header
            filename = ""
            content_disposition = response.headers.get("content-disposition")
            if content_disposition:
                parsed = Message()
                parsed["content-disposition"] = content_disposition
                filename = parsed.get_filename() or ""
            # Keep only the final path component of a server-supplied name.
            filename = os.path.basename(filename.replace("\\", "/"))

            if filename in ("", ".", ".."):
                # fallback: extract from URL or content-type
                filename = (
                    os.path.basename(download_url.split("?")[0]) or "hubspot_download"
                )
                if "." not in filename:
                    content_type = response.headers.get("content-type")
                    ext = (
                        mimetypes.guess_extension(content_type.split(";")[0].strip())
                        if content_type
                        else None
                    )
                    if ext:
                        filename += ext

            # Make sure save_path is valid
            if save_path is None:
                target = os.path.abspath(filename)
            elif os.path.isdir(save_path):
                target = os.path.join(save_path, filename)
            else:
                # if user passes a file path directly, leave it
                target = os.path.abspath(save_path)

            with open(target, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        self.logger.info(f"File downloaded successfully → {target}")
        return target
    except requests.exceptions.RequestException as e:
        self.logger.error(f"Failed to download file from HubSpot: {e}")
        raise
def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str:
    """
    Create a HubSpot line item that mirrors a product.

    Reads the product's ``name`` and price (``price``, falling back to
    ``hs_price``, then ``"0"``) and creates a line item whose ``amount`` is
    price x quantity. The amount is computed with ``Decimal`` so that money
    values are not distorted by binary floating point (e.g. ``0.1 * 3``).

    Args:
        product_id: ID of the product to copy name and price from.
        quantity: Number of units on the line item.

    Returns:
        The ID of the created line item.

    Raises:
        ValueError: if the product's price is not a valid number.
    """
    # Stdlib; imported locally so this method carries its own dependency.
    from decimal import Decimal, InvalidOperation

    # Fetch product mapping
    products_api: ProductsBasicApi = self.client.crm.products.basic_api  # type: ignore[reportUnknownMemberType]
    product: HubspotObject = self._call_with_retry(
        lambda: products_api.get_by_id(  # type: ignore[reportUnknownMemberType]
            product_id, properties=["name", "price", "hs_price"]
        )
    )
    properties: dict[str, str] = cast(dict[str, str], product.properties)  # type: ignore[reportUnknownMemberType]
    name: str = properties.get("name") or ""
    price: str = properties.get("price") or properties.get("hs_price") or "0"

    try:
        amount = Decimal(price) * quantity
    except InvalidOperation as e:
        # Keep ValueError, which is what a non-numeric price raised before.
        raise ValueError(
            f"Product {product_id} has a non-numeric price: {price!r}"
        ) from e

    # Build line item payload
    line_item_input = SimplePublicObjectInput(
        properties={
            "hs_product_id": product_id,
            "name": name,
            "quantity": str(quantity),
            "price": price,
            # Fixed-point formatting avoids exponent notation such as "1E+3".
            "amount": format(amount, "f"),
            "invoiced": "Outstanding",
        }
    )

    # Create line item
    line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api  # type: ignore[reportUnknownMemberType]
    line_item: HubspotObject = self._call_with_retry(
        lambda: line_items_api.create(line_item_input)  # type: ignore[reportUnknownMemberType]
    )
    return cast(str, line_item.id)  # type: ignore[reportUnknownMemberType]
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None:
    """
    Link an existing line item to a deal via the v4 associations API.

    The association is created from object type ``"0-3"`` (``deal_id``) to
    ``"line_items"`` (``line_item_id``) with a HUBSPOT_DEFINED spec of type
    ID 19.

    NOTE(review): ``"0-3"`` is presumably HubSpot's object type ID for deals
    and 19 the deal -> line item association type — confirm against the
    HubSpot association type reference.
    """
    self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}")
    api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    deal_to_line_item = [
        AssociationSpec(
            association_category="HUBSPOT_DEFINED",
            association_type_id=19,
        )
    ]

    def create_association() -> Any:
        return api.create(  # type: ignore[reportUnknownMemberType]
            "0-3",
            deal_id,
            "line_items",
            line_item_id,
            deal_to_line_item,
        )

    self._call_with_retry(create_association)
def add_product_line_item_to_deal(
self, deal_id: str, product_id: str, quantity: int = 1
) -> str:
# Step 1: Create the line item from product mapping
line_item_id: str = self.create_line_item_from_product(product_id, quantity)
# Step 2: Associate the created line item to the deal
self.associate_line_item_to_deal(line_item_id, deal_id)
return line_item_id
def delete_line_item(self, line_item_id: str) -> bool:
"""
Delete (archive) a line item in HubSpot by its ID.
"""
try:
self.logger.info(f"Deleting line item {line_item_id}...")
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
line_items_api.archive(line_item_id) # type: ignore[reportUnknownMemberType]
self.logger.info(f"Line item {line_item_id} deleted successfully.")
return True
except ApiException as e:
self.logger.error(f"Failed to delete line item {line_item_id}: {e}")
return False