Merge pull request #133 from Hestia-Homes/feature/do_not_merge_this_branch
Some checks failed
Lambda Main Workflow / extractor-and-loader (push) Has been cancelled
Lambda Main Workflow / walthamforest-etl (push) Has been cancelled
Run Database Migration on Production (aws-dev) / db_migration (push) Has been cancelled
Lambda Main Workflow / shared-lambda-terraform (push) Has been cancelled
Lambda Main Workflow / lambda-ecr-example (push) Has been cancelled

fail gracefully
This commit is contained in:
Jun-te Kim 2026-03-20 13:10:01 +00:00 committed by GitHub
commit b05b8dd6f0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 278 additions and 157 deletions

View file

@ -35,6 +35,13 @@ class HubspotTodb:
.filter(HubspotDealData.company_id == company_id)
.all()
)
def find_deal_with_deal_id(self, deal_id):
with get_db_session() as session:
return(
session.query(HubspotDealData)
.filter(HubspotDealData.deal_id == deal_id).one()
)
def _sha256(self, file_path: str) -> str:
"""Compute SHA-256 checksum of a file."""
@ -130,33 +137,39 @@ class HubspotTodb:
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
# Download from HubSpot
local_file = hubspot_client.download_file_from_url(
deal_in_db.major_condition_issue_photos
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
try:
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
local_file = hubspot_client.download_file_from_url(photo_url)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with get_db_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
return False
except Exception as e:
print(f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}")
# Continue without the file — don't crash the entire update
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with get_db_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
return False
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
@ -213,15 +226,24 @@ class HubspotTodb:
existing.major_condition_issue_photos
and not existing.major_condition_issue_evidence_s3_url
):
local_file = hubspot_client.download_file_from_url(
existing.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
fresh_deal = hubspot_client.from_deal_get_info(existing.deal_id)
photo_url = fresh_deal.get("major_condition_issue_photos")
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(photo_url)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}")
# Continue without the file — don't crash the update
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
session.add(existing)
session.commit()
@ -252,17 +274,23 @@ class HubspotTodb:
# Handle upload at insert time
if new_record.major_condition_issue_photos:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
try:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}")
# Continue without the file — don't crash the insert
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record

View file

@ -10,6 +10,7 @@ from hubspot.crm.objects import SimplePublicObjectInput
from hubspot.crm.associations.v4 import AssociationSpec
from hubspot.crm.associations import ApiException
class Companies(Enum):
ABRI = "237615001799"
SOUTHERN_HOUSING_GROUP = "109343619305"
@ -19,6 +20,7 @@ class Companies(Enum):
APPLE = "184769046716"
THE_GUINESS_PARTNERSHIP = "86970043613"
class DealStage(Enum):
SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
@ -26,14 +28,16 @@ class DealStage(Enum):
SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
FILES_MISSING_FROM_ASSESSOR = "1887736000"
class Pipeline(Enum):
OPERATIONS_SOCIAL_HOUSING = "1167582403"
class HubSpotClient():
class HubSpotClient:
def __init__(self):
self.access_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"
self.client = hubspot.Client.create(access_token=self.access_token)
self.logger = Logger(name='HubSpotClient', level=logging.INFO).get_logger()
self.logger = Logger(name="HubSpotClient", level=logging.INFO).get_logger()
self.all_deals = None
def get_all_deals(self):
@ -49,7 +53,8 @@ class HubSpotClient():
# Filter deals where properties['pipeline'] matches the given pipeline_id
filtered_deals = [
deal for deal in self.all_deals
deal
for deal in self.all_deals
if deal.properties["pipeline"] == str(pipeline_id)
]
@ -57,35 +62,7 @@ class HubSpotClient():
deal_ids = [deal.id for deal in filtered_deals]
return deal_ids
def from_deal_get_associated_company_id(self, deal_id: str):
"""
Get the associated company ID from a given deal ID.
Returns the associated company ID, or None if not found.
"""
try:
associations_api = self.client.crm.associations.v4.basic_api
# Fetch associations for this specific deal only
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1 # Expect only one associated company
)
if not response.results:
self.logger.info(f"No company association found for deal {deal_id}")
return None
company_id = response.results[0].to_object_id
self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
return company_id
except ApiException as e:
self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}")
return None
def get_deals_from_company(self, company_id: str) -> list[str]:
associations_api = self.client.crm.associations.v4.basic_api
@ -98,12 +75,10 @@ class HubSpotClient():
object_id=company_id,
to_object_type="deals",
limit=100,
after=after
after=after,
)
deal_ids.extend(
assoc.to_object_id for assoc in response.results
)
deal_ids.extend(assoc.to_object_id for assoc in response.results)
if not response.paging or not response.paging.next:
break
@ -111,21 +86,23 @@ class HubSpotClient():
after = response.paging.next.after
return deal_ids
def from_deal_get_associated_listing(self, deal_id: str):
"""
Get the associated listing information for a given deal.
Returns a dictionary of listing properties, or None if not found.
"""
associations_api = self.client.crm.associations.v4.basic_api
listings_api = self.client.crm.objects.basic_api # works for custom objects like "listing"
listings_api = (
self.client.crm.objects.basic_api
) # works for custom objects like "listing"
# Fetch associated listing(s)
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="0-420", # <-- use your exact custom object name slug here
limit=1
limit=1,
)
if not response.results:
@ -143,31 +120,60 @@ class HubSpotClient():
"national_uprn",
"domna_property_id",
"owner_property_id",
]
],
)
listing_info = listing.properties
self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
return listing_info
def from_deal_get_info(self, deal_id):
deal = self.client.crm.deals.basic_api.get_by_id(deal_id,
deal = self.client.crm.deals.basic_api.get_by_id(
deal_id,
properties=[
'dealname',
'dealstage',
'pipeline',
'outcome', #outcome,
'outcome_notes', #outcome notes
'project_code',
'major_condition_issue_description',
'major_condition_issue_photos',
'coordination_status__stage_1_', # Coordiantion Status (Stage 1),
'retrofit_design_status', # Retrofit Design Status
]
"dealname",
"dealstage",
"pipeline",
"outcome", # outcome,
"outcome_notes", # outcome notes
"project_code",
"major_condition_issue_description",
"major_condition_issue_photos",
"coordination_status__stage_1_", # Coordiantion Status (Stage 1),
"retrofit_design_status", # Retrofit Design Status
],
)
return deal.properties
def from_deal_get_associated_company_id(self, deal_id: str):
"""
Get the associated company ID from a given deal ID.
Returns the associated company ID, or None if not found.
"""
try:
associations_api = self.client.crm.associations.v4.basic_api
# Fetch associations for this specific deal only
response = associations_api.get_page(
object_type="deals",
object_id=deal_id,
to_object_type="companies",
limit=1 # Expect only one associated company
)
if not response.results:
self.logger.info(f"No company association found for deal {deal_id}")
return None
company_id = response.results[0].to_object_id
self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
return company_id
except ApiException as e:
self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}")
return None
def get_deal_info_for_db(self, deal_id):
deal = self.from_deal_get_info(deal_id)
company = self.from_deal_get_associated_company_id(deal_id)
@ -175,17 +181,16 @@ class HubSpotClient():
return deal, company, listing
def get_company_information(self, company_id):
company = self.client.crm.companies.basic_api.get_by_id(
company_id,
properties=[
'name',
]
"name",
],
)
company_info = company.properties
return company_info
def get_all_pipelines(self):
"""
Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs.
@ -195,10 +200,7 @@ class HubSpotClient():
response = pipelines_api.get_all(object_type="deals")
pipelines = [
{
"name": pipeline.label,
"id": pipeline.id
}
{"name": pipeline.label, "id": pipeline.id}
for pipeline in response.results
]
@ -208,7 +210,7 @@ class HubSpotClient():
except Exception as e:
self.logger.error(f"Error retrieving pipelines: {e}")
return []
def get_deal_stages(self, pipeline_id=None):
"""
Retrieve all deal stages for a given pipeline.
@ -231,7 +233,7 @@ class HubSpotClient():
"pipeline_name": pipeline.label,
"pipeline_id": pipeline.id,
"stage_name": stage.label,
"stage_id": stage.id
"stage_id": stage.id,
}
for stage in pipeline.stages
]
@ -239,7 +241,9 @@ class HubSpotClient():
all_stages.extend(stages)
if not all_stages:
self.logger.info(f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}")
self.logger.info(
f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}"
)
else:
self.logger.info(f"Retrieved {len(all_stages)} deal stages.")
@ -248,70 +252,137 @@ class HubSpotClient():
except Exception as e:
self.logger.error(f"Error retrieving deal stages: {e}")
return []
def download_file_from_url(self, download_url: str, save_path: str = None) -> str:
"""
Download a file from a HubSpot file URL (public or private), keeping its original file type.
Includes retry logic for transient failures and normalization of URL-encoded special characters.
"""
import mimetypes
import requests
import os
import time
import re
from urllib.parse import urlparse, urlunparse, unquote, quote
try:
headers = {}
if "hubspotusercontent" not in download_url:
headers["Authorization"] = f"Bearer {self.access_token}"
# Strip signature and expiration from CDN URLs to get a fresh one with response-content-disposition
# This is how HubSpot UI generates longer-lived URLs
if "cdnp1.hubspotusercontent" in download_url:
from urllib.parse import urlparse, urlunparse
parsed = urlparse(download_url)
# Keep only the path, strip all query params to force CDN to generate fresh signature
download_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?response-content-disposition=attachment"
self.logger.info(
f"Requesting fresh CDN signature with response-content-disposition: {download_url}"
)
self.logger.info(f"Downloading HubSpot file: {download_url}")
response = requests.get(download_url, headers=headers, stream=True, allow_redirects=True)
response.raise_for_status()
# Normalize URL-encoded special characters in the path (not query string)
# to avoid signature validation issues with special characters like %2c, %20
parsed = urlparse(download_url)
clean_path = quote(unquote(parsed.path), safe="/@:")
download_url = urlunparse(parsed._replace(path=clean_path))
# Try to infer filename from Content-Disposition header
content_disposition = response.headers.get("content-disposition")
if content_disposition and "filename=" in content_disposition:
filename = content_disposition.split("filename=")[1].strip('"')
else:
# fallback: extract from URL or content-type
filename = os.path.basename(download_url.split("?")[0]) or "hubspot_download"
if "." not in filename:
content_type = response.headers.get("content-type")
ext = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
if ext:
filename += ext
max_attempts = 3
retry_delays = [1, 2, 4] # exponential backoff in seconds
last_exception = None
# Make sure save_path is valid
if save_path is None:
save_path = os.path.abspath(filename)
elif os.path.isdir(save_path):
save_path = os.path.join(save_path, filename)
else:
# if user passes a file path directly, leave it
save_path = os.path.abspath(save_path)
for attempt in range(max_attempts):
try:
headers = {}
# Add auth token for API endpoints (not direct CDN URLs)
if "hubspot.com/form-integrations" in download_url or "api-eu1.hubspot.com" in download_url:
headers["Authorization"] = f"Bearer {self.access_token}"
with open(save_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
self.logger.info(
f"Downloading HubSpot file (attempt {attempt + 1}/{max_attempts}): {download_url}"
)
response = requests.get(
download_url,
headers=headers,
stream=True,
allow_redirects=True,
timeout=(10, 30),
)
response.raise_for_status()
self.logger.info(f"File downloaded successfully → {save_path}")
return save_path
# Try to infer filename from Content-Disposition header
content_disposition = response.headers.get("content-disposition")
if content_disposition and "filename=" in content_disposition:
filename = content_disposition.split("filename=")[1].strip('"')
else:
# fallback: extract from URL or content-type
filename = (
os.path.basename(download_url.split("?")[0]) or "hubspot_download"
)
if "." not in filename:
content_type = response.headers.get("content-type")
ext = (
mimetypes.guess_extension(content_type.split(";")[0])
if content_type
else None
)
if ext:
filename += ext
except requests.exceptions.RequestException as e:
self.logger.error(f"Failed to download file from HubSpot: {e}")
raise
# Make sure save_path is valid
if save_path is None:
save_path = os.path.abspath(filename)
elif os.path.isdir(save_path):
save_path = os.path.join(save_path, filename)
else:
# if user passes a file path directly, leave it
save_path = os.path.abspath(save_path)
with open(save_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
self.logger.info(f"File downloaded successfully → {save_path}")
return save_path
except requests.exceptions.HTTPError as e:
# Don't retry on 404 — file genuinely doesn't exist
if e.response is not None and e.response.status_code == 404:
self.logger.error(f"Failed to download file from HubSpot: {e}")
raise
last_exception = e
if attempt < max_attempts - 1:
delay = retry_delays[attempt]
self.logger.warning(
f"HTTP error (attempt {attempt + 1}/{max_attempts}): {e} — retrying in {delay}s"
)
time.sleep(delay)
except (
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
) as e:
last_exception = e
if attempt < max_attempts - 1:
delay = retry_delays[attempt]
self.logger.warning(
f"Connection/timeout error (attempt {attempt + 1}/{max_attempts}): {e} — retrying in {delay}s"
)
time.sleep(delay)
except requests.exceptions.RequestException as e:
# Other request errors (e.g., invalid URL) — don't retry
self.logger.error(f"Failed to download file from HubSpot: {e}")
raise
# If we got here, all retries failed
self.logger.error(
f"Failed to download file after {max_attempts} attempts: {last_exception}"
)
raise last_exception
def create_line_item_from_product(self, product_id: str, quantity: int = 1):
# Fetch product mapping
product = self.client.crm.products.basic_api.get_by_id(
product_id,
properties=["name", "price", "hs_price"]
product_id, properties=["name", "price", "hs_price"]
)
name = product.properties.get("name")
price = (
product.properties.get("price")
or product.properties.get("hs_price")
or "0"
product.properties.get("price") or product.properties.get("hs_price") or "0"
)
# Build line item payload
@ -322,33 +393,35 @@ class HubSpotClient():
"quantity": str(quantity),
"price": price,
"amount": str(float(price) * quantity),
"invoiced": "Outstanding"
"invoiced": "Outstanding",
}
)
# Create line item
line_item = self.client.crm.line_items.basic_api.create(line_item_input)
return line_item.id
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str):
self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}")
association_api = self.client.crm.associations.v4.basic_api
association_api.create(
"0-3", # to object type
deal_id, # to object id
"line_items", # from object type
line_item_id, # from object id
"0-3", # to object type
deal_id, # to object id
"line_items", # from object type
line_item_id, # from object id
[
AssociationSpec(
association_category="HUBSPOT_DEFINED",
association_type_id=19 # line_item → deal
association_type_id=19, # line_item → deal
)
]
],
)
def add_product_line_item_to_deal(self, deal_id: str, product_id: str, quantity: int = 1):
def add_product_line_item_to_deal(
self, deal_id: str, product_id: str, quantity: int = 1
):
# Step 1: Create the line item from product mapping
line_item_id = self.create_line_item_from_product(product_id, quantity)
@ -363,7 +436,7 @@ class HubSpotClient():
"""
try:
self.logger.info(f"Deleting line item {line_item_id}...")
self.client.crm.line_items.basic_api.archive(line_item_id)
self.logger.info(f"Line item {line_item_id} deleted successfully.")
@ -371,4 +444,4 @@ class HubSpotClient():
except ApiException as e:
self.logger.error(f"Failed to delete line item {line_item_id}: {e}")
return False
return False

View file

@ -0,0 +1,19 @@
from etl.hubSpotClient.hubspotClient import HubSpotClient
from etl.db.hubSpotLoad import HubspotTodb
working_deal_id = "319174821072"
deal_id = "484368267483"
hubspot = HubSpotClient()
db = HubspotTodb()
deal = db.find_deal_with_deal_id(deal_id)
deal2 = db.find_deal_with_deal_id(working_deal_id)
db.update_deal(deal2, hubspot)
db.update_deal(deal, hubspot)

View file

@ -130,6 +130,7 @@ class SharePointScraper():
site_id=self.sharepoint_drive.value,
)
folders = self.get_folders_in_path(at_path)
# Check if folder already exists (case-insensitive match)