added dampandmouldandrepaircomments, more resilence on retries with hubspot api

This commit is contained in:
Jun-te Kim 2026-04-02 10:12:09 +00:00
parent 955dffd74d
commit acb44dc60a
5 changed files with 227 additions and 128 deletions

View file

@ -44,6 +44,7 @@ class HubspotDealData(SQLModel, table=True):
pashub_link: Optional[str] = Field(default=None)
sharepoint_link: Optional[str] = Field(default=None)
dampmould_growth: Optional[str] = Field(default=None)
damp_mould_and_repairs_comments: Optional[str] = Field(default=None)
pre_sap: Optional[str] = Field(default=None)
coordinator: Optional[str] = Field(default=None)
mtp_completion_date: Optional[datetime] = Field(default=None)

View file

@ -1,9 +1,12 @@
import os
import time
from enum import Enum
from typing import Optional, cast
from typing import Optional, cast, Callable, TypeVar
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs]
T = TypeVar("T")
from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs]
@ -83,6 +86,30 @@ class HubspotClient:
# Sorry - not sorry but enjoy, Past Junte 13/03/2026
# self.client
def _call_with_retry(self, fn: Callable[[], T], max_retries: int = 2) -> T:
"""
Call fn(), retrying up to max_retries times on 429 rate-limit errors.
Waits the minimal amount: the remaining interval window reported by HubSpot headers.
Falls back to the full interval (10s) if headers are absent.
"""
for attempt in range(max_retries + 1):
try:
return fn()
except ApiException as e:
if e.status != 429 or attempt == max_retries:
raise
headers = e.headers or {}
interval_ms = int(
headers.get("x-hubspot-ratelimit-interval-milliseconds", 10000)
)
wait_s = interval_ms / 1000.0
self.logger.warning(
f"HubSpot 429 (attempt {attempt + 1}/{max_retries}), "
f"waiting {wait_s:.1f}s before retry."
)
time.sleep(wait_s)
raise RuntimeError("Unreachable") # pragma: no cover
def get_deal_ids_from_company(self, company_id: str) -> list[str]:
associations_api: AssociationsBasicApi = ( # type: ignore[reportUnknownMemberType]
self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
@ -92,12 +119,14 @@ class HubspotClient:
after: Optional[str] = None
while True:
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="companies",
object_id=company_id,
to_object_type="deals",
limit=100,
after=after,
response: AssociationsPageResponse = self._call_with_retry(
lambda: associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="companies",
object_id=company_id,
to_object_type="deals",
limit=100,
after=after,
)
)
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
@ -127,11 +156,13 @@ class HubspotClient:
associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
# Fetch associations for this specific deal only
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1, # Expect only one associated company
response: AssociationsPageResponse = self._call_with_retry(
lambda: associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1,
)
)
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
@ -161,11 +192,13 @@ class HubspotClient:
listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api # type: ignore[reportUnknownMemberType] # works for custom objects like "listing"
# Fetch associated listing(s)
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="deals",
object_id=deal_id,
to_object_type="0-420", # <-- to get an listing object
limit=1,
response: AssociationsPageResponse = self._call_with_retry(
lambda: associations_api.get_page( # type: ignore[reportUnknownMemberType]
object_type="deals",
object_id=deal_id,
to_object_type="0-420",
limit=1,
)
)
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
@ -178,14 +211,16 @@ class HubspotClient:
self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")
# Fetch listing details (the "listing information")
listing: HubspotObject = listings_api.get_by_id( # type: ignore[reportUnknownMemberType]
object_type="0-420", # again, must match your HubSpot object name
object_id=listing_id,
properties=[
"national_uprn",
"domna_property_id",
"owner_property_id",
],
listing: HubspotObject = self._call_with_retry(
lambda: listings_api.get_by_id( # type: ignore[reportUnknownMemberType]
object_type="0-420",
object_id=listing_id,
properties=[
"national_uprn",
"domna_property_id",
"owner_property_id",
],
)
)
listing_info: dict[str, str] = cast(dict[str, str], listing.properties) # type: ignore[reportUnknownMemberType]
@ -196,44 +231,47 @@ class HubspotClient:
def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]:
deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType]
deal: HubspotObject = deals_api.get_by_id( # type: ignore[reportUnknownMemberType]
deal_id,
properties=[
"dealname",
"dealstage",
"pipeline",
"outcome",
"outcome_notes",
"project_code",
"major_condition_issue_description",
"major_condition_issue_photos",
"coordination_status__stage_1_",
"retrofit_design_status",
"pashub_link",
"sharepoint_link",
"dampmould_growth",
"pre_sap",
"coordinator",
"mtp_completion_date",
"mtp_re_model_completion_date",
"ioe_v3_completion_date",
"proposed_measures",
"approved_package",
"designer",
"design_completion_date",
"actual_measures_installed",
"installer",
"installer_handover",
"lodgement_status",
"measures_lodgement_date",
"lodgement_date",
"expected_commencement_date",
"surveyor",
"confirmed_survey_date",
"confirmed_survey_time",
"surveyed_date",
"design_type",
],
deal: HubspotObject = self._call_with_retry(
lambda: deals_api.get_by_id( # type: ignore[reportUnknownMemberType]
deal_id,
properties=[
"dealname",
"dealstage",
"pipeline",
"outcome",
"outcome_notes",
"project_code",
"major_condition_issue_description",
"major_condition_issue_photos",
"coordination_status__stage_1_",
"retrofit_design_status",
"pashub_link",
"sharepoint_link",
"dampmould_growth",
"damp_mould_and_repairs_comments",
"pre_sap",
"coordinator",
"mtp_completion_date",
"mtp_re_model_completion_date",
"ioe_v3_completion_date",
"proposed_measures",
"approved_package",
"designer",
"design_completion_date",
"actual_measures_installed",
"installer",
"installer_handover",
"lodgement_status",
"measures_lodgement_date",
"lodgement_date",
"expected_commencement_date",
"surveyor",
"confirmed_survey_date",
"confirmed_survey_time",
"surveyed_date",
"design_type",
],
)
)
deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType]
@ -260,11 +298,11 @@ class HubspotClient:
def get_company_information(self, company_id: str) -> CompanyData:
companies_api: CompaniesBasicApi = self.client.crm.companies.basic_api # type: ignore[reportUnknownMemberType]
company: HubspotObject = companies_api.get_by_id( # type: ignore[reportUnknownMemberType]
company_id,
properties=[
"name",
],
company: HubspotObject = self._call_with_retry(
lambda: companies_api.get_by_id( # type: ignore[reportUnknownMemberType]
company_id,
properties=["name"],
)
)
company_info: CompanyData = company.properties # type: ignore[reportUnknownMemberType]
@ -401,8 +439,10 @@ class HubspotClient:
def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str:
# Fetch product mapping
products_api: ProductsBasicApi = self.client.crm.products.basic_api # type: ignore[reportUnknownMemberType]
product: HubspotObject = products_api.get_by_id( # type: ignore[reportUnknownMemberType]
product_id, properties=["name", "price", "hs_price"]
product: HubspotObject = self._call_with_retry(
lambda: products_api.get_by_id( # type: ignore[reportUnknownMemberType]
product_id, properties=["name", "price", "hs_price"]
)
)
properties: dict[str, str] = cast(dict[str, str], product.properties) # type: ignore[reportUnknownMemberType]
@ -423,7 +463,9 @@ class HubspotClient:
# Create line item
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
line_item: HubspotObject = line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType]
line_item: HubspotObject = self._call_with_retry(
lambda: line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType]
)
return cast(str, line_item.id) # type: ignore[reportUnknownMemberType]
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None:
@ -431,17 +473,19 @@ class HubspotClient:
association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
association_api.create( # type: ignore[reportUnknownMemberType]
"0-3", # to object type
deal_id, # to object id
"line_items", # from object type
line_item_id, # from object id
[
AssociationSpec(
association_category="HUBSPOT_DEFINED",
association_type_id=19, # line_item → deal
)
],
self._call_with_retry(
lambda: association_api.create( # type: ignore[reportUnknownMemberType]
"0-3",
deal_id,
"line_items",
line_item_id,
[
AssociationSpec(
association_category="HUBSPOT_DEFINED",
association_type_id=19,
)
],
)
)
def add_product_line_item_to_deal(

View file

@ -181,6 +181,11 @@ class HubspotDataToDb:
deal_in_db.dampmould_growth == hs_deal.get("dampmould_growth"),
"dampmould_growth mismatch",
),
soft_assert(
deal_in_db.damp_mould_and_repairs_comments
== hs_deal.get("damp_mould_and_repairs_comments"),
"damp_mould_and_repairs_comments mismatch",
),
soft_assert(
deal_in_db.pre_sap == hs_deal.get("pre_sap"),
"pre_sap mismatch",
@ -190,15 +195,18 @@ class HubspotDataToDb:
"coordinator mismatch",
),
soft_assert(
deal_in_db.mtp_completion_date == self._parse_hs_date(hs_deal.get("mtp_completion_date")),
deal_in_db.mtp_completion_date
== self._parse_hs_date(hs_deal.get("mtp_completion_date")),
"mtp_completion_date mismatch",
),
soft_assert(
deal_in_db.mtp_re_model_completion_date == self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")),
deal_in_db.mtp_re_model_completion_date
== self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")),
"mtp_re_model_completion_date mismatch",
),
soft_assert(
deal_in_db.ioe_v3_completion_date == self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")),
deal_in_db.ioe_v3_completion_date
== self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")),
"ioe_v3_completion_date mismatch",
),
soft_assert(
@ -214,11 +222,13 @@ class HubspotDataToDb:
"designer mismatch",
),
soft_assert(
deal_in_db.design_completion_date == self._parse_hs_date(hs_deal.get("design_completion_date")),
deal_in_db.design_completion_date
== self._parse_hs_date(hs_deal.get("design_completion_date")),
"design_completion_date mismatch",
),
soft_assert(
deal_in_db.actual_measures_installed == hs_deal.get("actual_measures_installed"),
deal_in_db.actual_measures_installed
== hs_deal.get("actual_measures_installed"),
"actual_measures_installed mismatch",
),
soft_assert(
@ -234,15 +244,18 @@ class HubspotDataToDb:
"lodgement_status mismatch",
),
soft_assert(
deal_in_db.measures_lodgement_date == self._parse_hs_date(hs_deal.get("measures_lodgement_date")),
deal_in_db.measures_lodgement_date
== self._parse_hs_date(hs_deal.get("measures_lodgement_date")),
"measures_lodgement_date mismatch",
),
soft_assert(
deal_in_db.lodgement_date == self._parse_hs_date(hs_deal.get("lodgement_date")),
deal_in_db.lodgement_date
== self._parse_hs_date(hs_deal.get("lodgement_date")),
"lodgement_date mismatch",
),
soft_assert(
deal_in_db.expected_commencement_date == self._parse_hs_date(hs_deal.get("expected_commencement_date")),
deal_in_db.expected_commencement_date
== self._parse_hs_date(hs_deal.get("expected_commencement_date")),
"expected_commencement_date mismatch",
),
soft_assert(
@ -250,15 +263,18 @@ class HubspotDataToDb:
"surveyor mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_date == self._parse_hs_date(hs_deal.get("confirmed_survey_date")),
deal_in_db.confirmed_survey_date
== self._parse_hs_date(hs_deal.get("confirmed_survey_date")),
"confirmed_survey_date mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_time == hs_deal.get("confirmed_survey_time"),
deal_in_db.confirmed_survey_time
== hs_deal.get("confirmed_survey_time"),
"confirmed_survey_time mismatch",
),
soft_assert(
deal_in_db.surveyed_date == self._parse_hs_date(hs_deal.get("surveyed_date")),
deal_in_db.surveyed_date
== self._parse_hs_date(hs_deal.get("surveyed_date")),
"surveyed_date mismatch",
),
soft_assert(
@ -369,26 +385,47 @@ class HubspotDataToDb:
"pashub_link": deal_data.get("pashub_link"),
"sharepoint_link": deal_data.get("sharepoint_link"),
"dampmould_growth": deal_data.get("dampmould_growth"),
"damp_mould_and_repairs_comments": deal_data.get("damp_mould_and_repairs_comments"),
"pre_sap": deal_data.get("pre_sap"),
"coordinator": deal_data.get("coordinator"),
"mtp_completion_date": self._parse_hs_date(deal_data.get("mtp_completion_date")),
"mtp_re_model_completion_date": self._parse_hs_date(deal_data.get("mtp_re_model_completion_date")),
"ioe_v3_completion_date": self._parse_hs_date(deal_data.get("ioe_v3_completion_date")),
"mtp_completion_date": self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
"mtp_re_model_completion_date": self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
"ioe_v3_completion_date": self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
"proposed_measures": deal_data.get("proposed_measures"),
"approved_package": deal_data.get("approved_package"),
"designer": deal_data.get("designer"),
"design_completion_date": self._parse_hs_date(deal_data.get("design_completion_date")),
"actual_measures_installed": deal_data.get("actual_measures_installed"),
"design_completion_date": self._parse_hs_date(
deal_data.get("design_completion_date")
),
"actual_measures_installed": deal_data.get(
"actual_measures_installed"
),
"installer": deal_data.get("installer"),
"installer_handover": deal_data.get("installer_handover"),
"lodgement_status": deal_data.get("lodgement_status"),
"measures_lodgement_date": self._parse_hs_date(deal_data.get("measures_lodgement_date")),
"lodgement_date": self._parse_hs_date(deal_data.get("lodgement_date")),
"expected_commencement_date": self._parse_hs_date(deal_data.get("expected_commencement_date")),
"measures_lodgement_date": self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
"lodgement_date": self._parse_hs_date(
deal_data.get("lodgement_date")
),
"expected_commencement_date": self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
"surveyor": deal_data.get("surveyor"),
"confirmed_survey_date": self._parse_hs_date(deal_data.get("confirmed_survey_date")),
"confirmed_survey_date": self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
"confirmed_survey_time": deal_data.get("confirmed_survey_time"),
"surveyed_date": self._parse_hs_date(deal_data.get("surveyed_date")),
"surveyed_date": self._parse_hs_date(
deal_data.get("surveyed_date")
),
"design_type": deal_data.get("design_type"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
@ -435,7 +472,7 @@ class HubspotDataToDb:
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
listing_id=listing.get("listing_id"),
listing_id=listing.get("listing_id", None),
landlord_property_id=listing.get("owner_property_id"),
uprn=listing.get("national_uprn"),
outcome=deal_data.get("outcome"),
@ -453,24 +490,41 @@ class HubspotDataToDb:
pashub_link=deal_data.get("pashub_link"),
sharepoint_link=deal_data.get("sharepoint_link"),
dampmould_growth=deal_data.get("dampmould_growth"),
damp_mould_and_repairs_comments=deal_data.get("damp_mould_and_repairs_comments"),
pre_sap=deal_data.get("pre_sap"),
coordinator=deal_data.get("coordinator"),
mtp_completion_date=self._parse_hs_date(deal_data.get("mtp_completion_date")),
mtp_re_model_completion_date=self._parse_hs_date(deal_data.get("mtp_re_model_completion_date")),
ioe_v3_completion_date=self._parse_hs_date(deal_data.get("ioe_v3_completion_date")),
mtp_completion_date=self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
mtp_re_model_completion_date=self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
ioe_v3_completion_date=self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
proposed_measures=deal_data.get("proposed_measures"),
approved_package=deal_data.get("approved_package"),
designer=deal_data.get("designer"),
design_completion_date=self._parse_hs_date(deal_data.get("design_completion_date")),
actual_measures_installed=deal_data.get("actual_measures_installed"),
design_completion_date=self._parse_hs_date(
deal_data.get("design_completion_date")
),
actual_measures_installed=deal_data.get(
"actual_measures_installed"
),
installer=deal_data.get("installer"),
installer_handover=deal_data.get("installer_handover"),
lodgement_status=deal_data.get("lodgement_status"),
measures_lodgement_date=self._parse_hs_date(deal_data.get("measures_lodgement_date")),
measures_lodgement_date=self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
lodgement_date=self._parse_hs_date(deal_data.get("lodgement_date")),
expected_commencement_date=self._parse_hs_date(deal_data.get("expected_commencement_date")),
expected_commencement_date=self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
surveyor=deal_data.get("surveyor"),
confirmed_survey_date=self._parse_hs_date(deal_data.get("confirmed_survey_date")),
confirmed_survey_date=self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
confirmed_survey_time=deal_data.get("confirmed_survey_time"),
surveyed_date=self._parse_hs_date(deal_data.get("surveyed_date")),
design_type=deal_data.get("design_type"),

View file

@ -2,11 +2,18 @@ from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline
from etl.hubspot.scripts.scraper.main import handler
from tqdm import tqdm
import json
import time
PIPELINE_ID = Pipeline.OPERATIONS_SOCIAL_HOUSING.value
companies = list([Companies.THE_GUINESS_PARTNERSHIP, Companies.SOUTHERN_HOUSING_GROUP])
companies = list(
[
# Companies.THE_GUINESS_PARTNERSHIP,
# Companies.SOUTHERN_HOUSING_GROUP,
Companies.CALICO_HOMES,
]
)
def bulk_load(companies: list[Companies] | None = None) -> None:
@ -17,20 +24,23 @@ def bulk_load(companies: list[Companies] | None = None) -> None:
hubspot = HubspotClient()
targets = companies or list(Companies)
for company in tqdm(targets, desc="Companies", unit="co"):
for company in tqdm(targets, desc="Companies", unit="co", leave=False):
company_id = company.value
deal_ids = hubspot.get_deal_ids_from_company(company_id)
processed = 0
with tqdm(deal_ids, desc=company.name, unit="deal", leave=False) as deal_bar:
with tqdm(deal_ids, desc=company.name, unit="deal", leave=True, position=0) as deal_bar:
for deal_id in deal_bar:
deal_data = hubspot.from_deal_id_get_info(deal_id)
if deal_data.get("pipeline") != PIPELINE_ID:
deal_bar.set_postfix({"status": "skip", "deal": deal_id})
continue
time.sleep(5)
deal_bar.set_postfix({"status": "uploading", "deal": deal_id})
handler({"Records": [{"body": json.dumps({"hubspot_deal_id": deal_id})}]}, context=None)
handler(
{"Records": [{"body": json.dumps({"hubspot_deal_id": deal_id})}]},
context=None,
)
processed += 1
deal_bar.set_postfix({"status": "done", "deal": deal_id})

View file

@ -1,17 +1,9 @@
"""
1) [completed]Get hubspot deal properties from one deal
2) Put it in some class
3) [completed] Load the db and check if upsert it into the table
4) [completed]Getting working on a AWS lambda
5) [completed] subtask and tasks history
6) [completed]The new sexy deal properties, move it over
"""
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from backend.utils.subtasks import task_handler
from typing import Any
import json
import time
@task_handler()
@ -25,9 +17,7 @@ def handler(body: dict[str, Any], context: Any) -> None:
hubspot: HubspotClient = HubspotClient()
dbloader: HubspotDataToDb = HubspotDataToDb()
deal = dbloader.find_deal_with_deal_id(hubspot_deal_id)
if deal:
dbloader.update_deal_with_checks(deal, hubspot)
else: