mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
its now perfect
This commit is contained in:
parent
3970d70518
commit
cca72928d9
1 changed files with 32 additions and 28 deletions
|
|
@ -2,22 +2,22 @@ import os
|
|||
from enum import Enum
|
||||
from typing import Optional, cast
|
||||
|
||||
from hubspot.client import Client
|
||||
from hubspot.crm.associations import ApiException
|
||||
from hubspot.crm.objects import SimplePublicObjectInput
|
||||
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi
|
||||
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi
|
||||
from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi
|
||||
from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi
|
||||
from hubspot.crm.pipelines.models import (
|
||||
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.models import ( # type: ignore[reportMissingTypeStubs]
|
||||
CollectionResponsePipelineNoPaging as PipelinesResponse,
|
||||
)
|
||||
from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline
|
||||
from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage
|
||||
from hubspot.crm.objects.models import SimplePublicObject as HubspotObject
|
||||
from hubspot.crm.associations.v4 import AssociationSpec
|
||||
from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi
|
||||
from hubspot.crm.associations.v4.models import (
|
||||
from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects.models import SimplePublicObject as HubspotObject # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4 import AssociationSpec # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTypeStubs]
|
||||
CollectionResponseMultiAssociatedObjectWithLabelForwardPaging as AssociationsPageResponse,
|
||||
MultiAssociatedObjectWithLabel as AssociationsResult,
|
||||
ForwardPaging as AssociationsPaging,
|
||||
|
|
@ -362,15 +362,17 @@ class HubspotClient:
|
|||
self.logger.error(f"Failed to download file from HubSpot: {e}")
|
||||
raise
|
||||
|
||||
def create_line_item_from_product(self, product_id: str, quantity: int = 1):
|
||||
def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str:
|
||||
# Fetch product mapping
|
||||
product = self.client.crm.products.basic_api.get_by_id(
|
||||
products_api: ProductsBasicApi = self.client.crm.products.basic_api # type: ignore[reportUnknownMemberType]
|
||||
product: HubspotObject = products_api.get_by_id( # type: ignore[reportUnknownMemberType]
|
||||
product_id, properties=["name", "price", "hs_price"]
|
||||
)
|
||||
properties: dict[str, str] = cast(dict[str, str], product.properties) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
name = product.properties.get("name")
|
||||
price = (
|
||||
product.properties.get("price") or product.properties.get("hs_price") or "0"
|
||||
name: str = properties.get("name") or ""
|
||||
price: str = (
|
||||
properties.get("price") or properties.get("hs_price") or "0"
|
||||
)
|
||||
|
||||
# Build line item payload
|
||||
|
|
@ -386,15 +388,16 @@ class HubspotClient:
|
|||
)
|
||||
|
||||
# Create line item
|
||||
line_item = self.client.crm.line_items.basic_api.create(line_item_input)
|
||||
return line_item.id
|
||||
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
|
||||
line_item: HubspotObject = line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType]
|
||||
return cast(str, line_item.id) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str):
|
||||
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None:
|
||||
self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}")
|
||||
|
||||
association_api = self.client.crm.associations.v4.basic_api
|
||||
association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
|
||||
|
||||
association_api.create(
|
||||
association_api.create( # type: ignore[reportUnknownMemberType]
|
||||
"0-3", # to object type
|
||||
deal_id, # to object id
|
||||
"line_items", # from object type
|
||||
|
|
@ -409,23 +412,24 @@ class HubspotClient:
|
|||
|
||||
def add_product_line_item_to_deal(
|
||||
self, deal_id: str, product_id: str, quantity: int = 1
|
||||
):
|
||||
) -> str:
|
||||
# Step 1: Create the line item from product mapping
|
||||
line_item_id = self.create_line_item_from_product(product_id, quantity)
|
||||
line_item_id: str = self.create_line_item_from_product(product_id, quantity)
|
||||
|
||||
# Step 2: Associate the created line item to the deal
|
||||
self.associate_line_item_to_deal(line_item_id, deal_id)
|
||||
|
||||
return line_item_id
|
||||
|
||||
def delete_line_item(self, line_item_id: str):
|
||||
def delete_line_item(self, line_item_id: str) -> bool:
|
||||
"""
|
||||
Delete (archive) a line item in HubSpot by its ID.
|
||||
"""
|
||||
try:
|
||||
self.logger.info(f"Deleting line item {line_item_id}...")
|
||||
|
||||
self.client.crm.line_items.basic_api.archive(line_item_id)
|
||||
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
|
||||
line_items_api.archive(line_item_id) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
self.logger.info(f"Line item {line_item_id} deleted successfully.")
|
||||
return True
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue