From b51a96e7d43dc31023bf6d473dd2df9d07fdb8ca Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 20 Mar 2026 11:53:01 +0000 Subject: [PATCH] fail gracefully --- etl/db/hubSpotLoad.py | 112 ++++--- etl/hubSpotClient/hubspotClient.py | 303 +++++++++++------- .../scripts/debug_one_address.py | 19 ++ etl/scraper/scraper.py | 1 + 4 files changed, 278 insertions(+), 157 deletions(-) create mode 100644 etl/hubSpotClient/scripts/debug_one_address.py diff --git a/etl/db/hubSpotLoad.py b/etl/db/hubSpotLoad.py index 617f2bc..55a1a59 100644 --- a/etl/db/hubSpotLoad.py +++ b/etl/db/hubSpotLoad.py @@ -35,6 +35,13 @@ class HubspotTodb: .filter(HubspotDealData.company_id == company_id) .all() ) + + def find_deal_with_deal_id(self, deal_id): + with get_db_session() as session: + return( + session.query(HubspotDealData) + .filter(HubspotDealData.deal_id == deal_id).one() + ) def _sha256(self, file_path: str) -> str: """Compute SHA-256 checksum of a file.""" @@ -130,33 +137,39 @@ class HubspotTodb: f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..." ) - # Download from HubSpot - local_file = hubspot_client.download_file_from_url( - deal_in_db.major_condition_issue_photos - ) + photo_url = hs_deal.get("major_condition_issue_photos") + if photo_url: + try: + # Download from HubSpot using fresh URL from hs_deal (not stale DB URL) + local_file = hubspot_client.download_file_from_url(photo_url) - # Upload to S3 - bucket = "retrofit-data-dev" - s3_url = self.s3.upload_file( - local_file, bucket, prefix="hubspot/awaabs_law_evidence/" - ) + # Upload to S3 + bucket = "retrofit-data-dev" + s3_url = self.s3.upload_file( + local_file, bucket, prefix="hubspot/awaabs_law_evidence/" + ) - # Download again to verify integrity - downloaded = self.s3.download_from_url(s3_url) - if self._sha256(local_file) == self._sha256(downloaded): - print("✅ SHA256 match verified — upload successful.") + # Download again to verify integrity + downloaded = self.s3.download_from_url(s3_url) + if self._sha256(local_file) == self._sha256(downloaded): + print("✅ SHA256 match verified — upload successful.") + else: + print("❌ SHA256 mismatch — integrity check failed.") + raise ValueError("File integrity check failed after S3 upload.") + + # Update DB record with S3 URL + with get_db_session() as session: + db_record = session.get(HubspotDealData, deal_in_db.id) + db_record.major_condition_issue_evidence_s3_url = s3_url + session.add(db_record) + session.commit() + print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}") + return False + except Exception as e: + print(f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}") + # Continue without the file — don't crash the entire update else: - print("❌ SHA256 mismatch — integrity check failed.") - raise ValueError("File integrity check failed after S3 upload.") - - # Update DB record with S3 URL - with get_db_session() as session: - db_record = session.get(HubspotDealData, deal_in_db.id) - db_record.major_condition_issue_evidence_s3_url = s3_url - session.add(db_record) - session.commit() - print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}") - return False + print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") else: print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") @@ -213,15 +226,24 @@ class HubspotTodb: existing.major_condition_issue_photos and not existing.major_condition_issue_evidence_s3_url ): - local_file = hubspot_client.download_file_from_url( - existing.major_condition_issue_photos - ) - s3_url = self.s3.upload_file( - local_file, - "retrofit-data-dev", - prefix="hubspot/awaabs_law_evidence/", - ) - existing.major_condition_issue_evidence_s3_url = s3_url + # Fetch fresh URL from HubSpot instead of using potentially expired stored URL + fresh_deal = hubspot_client.from_deal_get_info(existing.deal_id) + photo_url = fresh_deal.get("major_condition_issue_photos") + + if photo_url: + try: + local_file = hubspot_client.download_file_from_url(photo_url) + s3_url = self.s3.upload_file( + local_file, + "retrofit-data-dev", + prefix="hubspot/awaabs_law_evidence/", + ) + existing.major_condition_issue_evidence_s3_url = s3_url + except Exception as e: + print(f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}") + # Continue without the file — don't crash the update + else: + print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}") session.add(existing) session.commit() @@ -252,17 +274,23 @@ class HubspotTodb: # Handle upload at insert time if new_record.major_condition_issue_photos: - local_file = hubspot_client.download_file_from_url( - new_record.major_condition_issue_photos - ) - s3_url = self.s3.upload_file( - local_file, - "retrofit-data-dev", - prefix="hubspot/awaabs_law_evidence/", - ) - new_record.major_condition_issue_evidence_s3_url = s3_url + try: + local_file = hubspot_client.download_file_from_url( + new_record.major_condition_issue_photos + ) + s3_url = self.s3.upload_file( + local_file, + "retrofit-data-dev", + prefix="hubspot/awaabs_law_evidence/", + ) + new_record.major_condition_issue_evidence_s3_url = s3_url + except Exception as e: + print(f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}") + # Continue without the file — don't crash the insert session.add(new_record) session.commit() session.refresh(new_record) return new_record + + diff --git a/etl/hubSpotClient/hubspotClient.py b/etl/hubSpotClient/hubspotClient.py index 6bc3eab..a7cac44 100644 --- a/etl/hubSpotClient/hubspotClient.py +++ b/etl/hubSpotClient/hubspotClient.py @@ -10,6 +10,7 @@ from hubspot.crm.objects import SimplePublicObjectInput from hubspot.crm.associations.v4 import AssociationSpec from hubspot.crm.associations import ApiException + class Companies(Enum): ABRI = "237615001799" SOUTHERN_HOUSING_GROUP = "109343619305" @@ -19,6 +20,7 @@ class Companies(Enum): APPLE = "184769046716" THE_GUINESS_PARTNERSHIP = "86970043613" + class DealStage(Enum): SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914" SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915" @@ -26,14 +28,16 @@ class DealStage(Enum): SURVEYED_COMPLETED_SIGNED_OFF = "1617223916" FILES_MISSING_FROM_ASSESSOR = "1887736000" + class Pipeline(Enum): OPERATIONS_SOCIAL_HOUSING = "1167582403" -class HubSpotClient(): + +class HubSpotClient: def __init__(self): self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6" self.client = hubspot.Client.create(access_token=self.access_token) - self.logger = Logger(name='HubSpotClient', level=logging.INFO).get_logger() + self.logger = Logger(name="HubSpotClient", level=logging.INFO).get_logger() self.all_deals = None def get_all_deals(self): @@ -49,7 +53,8 @@ class HubSpotClient(): # Filter deals where properties['pipeline'] matches the given pipeline_id filtered_deals = [ - deal for deal in self.all_deals + deal + for deal in self.all_deals if deal.properties["pipeline"] == str(pipeline_id) ] @@ -57,35 +62,7 @@ class HubSpotClient(): deal_ids = [deal.id for deal in filtered_deals] return deal_ids - - def from_deal_get_associated_company_id(self, deal_id: str): - """ - Get the associated company ID from a given deal ID. - Returns the associated company ID, or None if not found. - """ - try: - associations_api = self.client.crm.associations.v4.basic_api - # Fetch associations for this specific deal only - response = associations_api.get_page( - object_type="deals", - object_id=deal_id, - to_object_type="companies", - limit=1 # Expect only one associated company - ) - - if not response.results: - self.logger.info(f"No company association found for deal {deal_id}") - return None - - company_id = response.results[0].to_object_id - self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}") - return company_id - - except ApiException as e: - self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}") - return None - def get_deals_from_company(self, company_id: str) -> list[str]: associations_api = self.client.crm.associations.v4.basic_api @@ -98,12 +75,10 @@ class HubSpotClient(): object_id=company_id, to_object_type="deals", limit=100, - after=after + after=after, ) - deal_ids.extend( - assoc.to_object_id for assoc in response.results - ) + deal_ids.extend(assoc.to_object_id for assoc in response.results) if not response.paging or not response.paging.next: break @@ -111,21 +86,23 @@ class HubSpotClient(): after = response.paging.next.after return deal_ids - + def from_deal_get_associated_listing(self, deal_id: str): """ Get the associated listing information for a given deal. Returns a dictionary of listing properties, or None if not found. """ associations_api = self.client.crm.associations.v4.basic_api - listings_api = self.client.crm.objects.basic_api # works for custom objects like "listing" + listings_api = ( + self.client.crm.objects.basic_api + ) # works for custom objects like "listing" # Fetch associated listing(s) response = associations_api.get_page( object_type="deals", object_id=deal_id, to_object_type="0-420", # <-- use your exact custom object name slug here - limit=1 + limit=1, ) if not response.results: @@ -143,31 +120,60 @@ class HubSpotClient(): "national_uprn", "domna_property_id", "owner_property_id", - ] + ], ) listing_info = listing.properties self.logger.info(f"Listing info for deal {deal_id}: {listing_info}") return listing_info - + def from_deal_get_info(self, deal_id): - deal = self.client.crm.deals.basic_api.get_by_id(deal_id, + deal = self.client.crm.deals.basic_api.get_by_id( + deal_id, properties=[ - 'dealname', - 'dealstage', - 'pipeline', - 'outcome', #outcome, - 'outcome_notes', #outcome notes - 'project_code', - 'major_condition_issue_description', - 'major_condition_issue_photos', - 'coordination_status__stage_1_', # Coordiantion Status (Stage 1), - 'retrofit_design_status', # Retrofit Design Status - ] + "dealname", + "dealstage", + "pipeline", + "outcome", # outcome, + "outcome_notes", # outcome notes + "project_code", + "major_condition_issue_description", + "major_condition_issue_photos", + "coordination_status__stage_1_", # Coordiantion Status (Stage 1), + "retrofit_design_status", # Retrofit Design Status + ], ) return deal.properties + def from_deal_get_associated_company_id(self, deal_id: str): + """ + Get the associated company ID from a given deal ID. + Returns the associated company ID, or None if not found. + """ + try: + associations_api = self.client.crm.associations.v4.basic_api + + # Fetch associations for this specific deal only + response = associations_api.get_page( + object_type="deals", + object_id=deal_id, + to_object_type="companies", + limit=1 # Expect only one associated company + ) + + if not response.results: + self.logger.info(f"No company association found for deal {deal_id}") + return None + + company_id = response.results[0].to_object_id + self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}") + return company_id + + except ApiException as e: + self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}") + return None + def get_deal_info_for_db(self, deal_id): deal = self.from_deal_get_info(deal_id) company = self.from_deal_get_associated_company_id(deal_id) @@ -175,17 +181,16 @@ class HubSpotClient(): return deal, company, listing - def get_company_information(self, company_id): company = self.client.crm.companies.basic_api.get_by_id( company_id, properties=[ - 'name', - ] + "name", + ], ) company_info = company.properties return company_info - + def get_all_pipelines(self): """ Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs. @@ -195,10 +200,7 @@ class HubSpotClient(): response = pipelines_api.get_all(object_type="deals") pipelines = [ - { - "name": pipeline.label, - "id": pipeline.id - } + {"name": pipeline.label, "id": pipeline.id} for pipeline in response.results ] @@ -208,7 +210,7 @@ class HubSpotClient(): except Exception as e: self.logger.error(f"Error retrieving pipelines: {e}") return [] - + def get_deal_stages(self, pipeline_id=None): """ Retrieve all deal stages for a given pipeline. @@ -231,7 +233,7 @@ class HubSpotClient(): "pipeline_name": pipeline.label, "pipeline_id": pipeline.id, "stage_name": stage.label, - "stage_id": stage.id + "stage_id": stage.id, } for stage in pipeline.stages ] @@ -239,7 +241,9 @@ class HubSpotClient(): all_stages.extend(stages) if not all_stages: - self.logger.info(f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}") + self.logger.info( + f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}" + ) else: self.logger.info(f"Retrieved {len(all_stages)} deal stages.") @@ -248,70 +252,137 @@ class HubSpotClient(): except Exception as e: self.logger.error(f"Error retrieving deal stages: {e}") return [] - + def download_file_from_url(self, download_url: str, save_path: str = None) -> str: """ Download a file from a HubSpot file URL (public or private), keeping its original file type. + Includes retry logic for transient failures and normalization of URL-encoded special characters. """ import mimetypes import requests import os + import time + import re + from urllib.parse import urlparse, urlunparse, unquote, quote - try: - headers = {} - if "hubspotusercontent" not in download_url: - headers["Authorization"] = f"Bearer {self.access_token}" + # Strip signature and expiration from CDN URLs to get a fresh one with response-content-disposition + # This is how HubSpot UI generates longer-lived URLs + if "cdnp1.hubspotusercontent" in download_url: + from urllib.parse import urlparse, urlunparse + parsed = urlparse(download_url) + # Keep only the path, strip all query params to force CDN to generate fresh signature + download_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?response-content-disposition=attachment" + self.logger.info( + f"Requesting fresh CDN signature with response-content-disposition: {download_url}" + ) - self.logger.info(f"Downloading HubSpot file: {download_url}") - response = requests.get(download_url, headers=headers, stream=True, allow_redirects=True) - response.raise_for_status() + # Normalize URL-encoded special characters in the path (not query string) + # to avoid signature validation issues with special characters like %2c, %20 + parsed = urlparse(download_url) + clean_path = quote(unquote(parsed.path), safe="/@:") + download_url = urlunparse(parsed._replace(path=clean_path)) - # Try to infer filename from Content-Disposition header - content_disposition = response.headers.get("content-disposition") - if content_disposition and "filename=" in content_disposition: - filename = content_disposition.split("filename=")[1].strip('"') - else: - # fallback: extract from URL or content-type - filename = os.path.basename(download_url.split("?")[0]) or "hubspot_download" - if "." not in filename: - content_type = response.headers.get("content-type") - ext = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None - if ext: - filename += ext + max_attempts = 3 + retry_delays = [1, 2, 4] # exponential backoff in seconds + last_exception = None - # Make sure save_path is valid - if save_path is None: - save_path = os.path.abspath(filename) - elif os.path.isdir(save_path): - save_path = os.path.join(save_path, filename) - else: - # if user passes a file path directly, leave it - save_path = os.path.abspath(save_path) + for attempt in range(max_attempts): + try: + headers = {} + # Add auth token for API endpoints (not direct CDN URLs) + if "hubspot.com/form-integrations" in download_url or "api-eu1.hubspot.com" in download_url: + headers["Authorization"] = f"Bearer {self.access_token}" - with open(save_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + self.logger.info( + f"Downloading HubSpot file (attempt {attempt + 1}/{max_attempts}): {download_url}" + ) + response = requests.get( + download_url, + headers=headers, + stream=True, + allow_redirects=True, + timeout=(10, 30), + ) + response.raise_for_status() - self.logger.info(f"File downloaded successfully → {save_path}") - return save_path + # Try to infer filename from Content-Disposition header + content_disposition = response.headers.get("content-disposition") + if content_disposition and "filename=" in content_disposition: + filename = content_disposition.split("filename=")[1].strip('"') + else: + # fallback: extract from URL or content-type + filename = ( + os.path.basename(download_url.split("?")[0]) or "hubspot_download" + ) + if "." not in filename: + content_type = response.headers.get("content-type") + ext = ( + mimetypes.guess_extension(content_type.split(";")[0]) + if content_type + else None + ) + if ext: + filename += ext - except requests.exceptions.RequestException as e: - self.logger.error(f"Failed to download file from HubSpot: {e}") - raise + # Make sure save_path is valid + if save_path is None: + save_path = os.path.abspath(filename) + elif os.path.isdir(save_path): + save_path = os.path.join(save_path, filename) + else: + # if user passes a file path directly, leave it + save_path = os.path.abspath(save_path) + with open(save_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + self.logger.info(f"File downloaded successfully → {save_path}") + return save_path + + except requests.exceptions.HTTPError as e: + # Don't retry on 404 — file genuinely doesn't exist + if e.response is not None and e.response.status_code == 404: + self.logger.error(f"Failed to download file from HubSpot: {e}") + raise + last_exception = e + if attempt < max_attempts - 1: + delay = retry_delays[attempt] + self.logger.warning( + f"HTTP error (attempt {attempt + 1}/{max_attempts}): {e} — retrying in {delay}s" + ) + time.sleep(delay) + except ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + ) as e: + last_exception = e + if attempt < max_attempts - 1: + delay = retry_delays[attempt] + self.logger.warning( + f"Connection/timeout error (attempt {attempt + 1}/{max_attempts}): {e} — retrying in {delay}s" + ) + time.sleep(delay) + except requests.exceptions.RequestException as e: + # Other request errors (e.g., invalid URL) — don't retry + self.logger.error(f"Failed to download file from HubSpot: {e}") + raise + + # If we got here, all retries failed + self.logger.error( + f"Failed to download file after {max_attempts} attempts: {last_exception}" + ) + raise last_exception def create_line_item_from_product(self, product_id: str, quantity: int = 1): # Fetch product mapping product = self.client.crm.products.basic_api.get_by_id( - product_id, - properties=["name", "price", "hs_price"] + product_id, properties=["name", "price", "hs_price"] ) name = product.properties.get("name") price = ( - product.properties.get("price") - or product.properties.get("hs_price") - or "0" + product.properties.get("price") or product.properties.get("hs_price") or "0" ) # Build line item payload @@ -322,33 +393,35 @@ class HubSpotClient(): "quantity": str(quantity), "price": price, "amount": str(float(price) * quantity), - "invoiced": "Outstanding" + "invoiced": "Outstanding", } ) # Create line item line_item = self.client.crm.line_items.basic_api.create(line_item_input) return line_item.id - + def associate_line_item_to_deal(self, line_item_id: str, deal_id: str): self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}") association_api = self.client.crm.associations.v4.basic_api association_api.create( - "0-3", # to object type - deal_id, # to object id - "line_items", # from object type - line_item_id, # from object id + "0-3", # to object type + deal_id, # to object id + "line_items", # from object type + line_item_id, # from object id [ AssociationSpec( association_category="HUBSPOT_DEFINED", - association_type_id=19 # line_item → deal + association_type_id=19, # line_item → deal ) - ] + ], ) - def add_product_line_item_to_deal(self, deal_id: str, product_id: str, quantity: int = 1): + def add_product_line_item_to_deal( + self, deal_id: str, product_id: str, quantity: int = 1 + ): # Step 1: Create the line item from product mapping line_item_id = self.create_line_item_from_product(product_id, quantity) @@ -363,7 +436,7 @@ class HubSpotClient(): """ try: self.logger.info(f"Deleting line item {line_item_id}...") - + self.client.crm.line_items.basic_api.archive(line_item_id) self.logger.info(f"Line item {line_item_id} deleted successfully.") @@ -371,4 +444,4 @@ class HubSpotClient(): except ApiException as e: self.logger.error(f"Failed to delete line item {line_item_id}: {e}") - return False \ No newline at end of file + return False diff --git a/etl/hubSpotClient/scripts/debug_one_address.py b/etl/hubSpotClient/scripts/debug_one_address.py new file mode 100644 index 0000000..1e5cd4d --- /dev/null +++ b/etl/hubSpotClient/scripts/debug_one_address.py @@ -0,0 +1,19 @@ +from etl.hubSpotClient.hubspotClient import HubSpotClient +from etl.db.hubSpotLoad import HubspotTodb + + + +working_deal_id = "319174821072" + +deal_id = "484368267483" + +hubspot = HubSpotClient() + + +db = HubspotTodb() +deal = db.find_deal_with_deal_id(deal_id) +deal2 = db.find_deal_with_deal_id(working_deal_id) + + +db.update_deal(deal2, hubspot) +db.update_deal(deal, hubspot) \ No newline at end of file diff --git a/etl/scraper/scraper.py b/etl/scraper/scraper.py index 542378f..b599f45 100644 --- a/etl/scraper/scraper.py +++ b/etl/scraper/scraper.py @@ -130,6 +130,7 @@ class SharePointScraper(): site_id=self.sharepoint_drive.value, ) + folders = self.get_folders_in_path(at_path) # Check if folder already exists (case-insensitive match)