mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-08 11:17:29 +00:00
hubspot update fixed
This commit is contained in:
parent
4c7058b58a
commit
eb95dad5fe
2 changed files with 148 additions and 72 deletions
|
|
@ -1,50 +1,53 @@
|
|||
from etl.db.db import get_db_session, init_db
|
||||
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
|
||||
from sqlmodel import select
|
||||
from etl.s3.s3_uploader import S3Uploader
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
class HubspotTodb():
|
||||
|
||||
class HubspotTodb:
|
||||
def __init__(self):
    """Initialise the database layer and create the S3 client used by this class."""
    # init_db() runs before anything else, as in the original ordering.
    init_db()
    # Kept on the instance; other methods call self.s3.upload_file and
    # self.s3.download_from_url when handling deal photos.
    self.s3 = S3Uploader()
|
||||
|
||||
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client=None):
    """Deprecated wrapper that forwards to ``upsert_hubspot_deal``.

    Args:
        deal_data: Forwarded unchanged to ``upsert_hubspot_deal``.
        company: Forwarded unchanged to ``upsert_hubspot_deal``.
        listing: Forwarded unchanged to ``upsert_hubspot_deal``.
        hubspot_client: Optional client forwarded to ``upsert_hubspot_deal``,
            which calls ``download_file_from_url`` on it when a deal has
            photos that still need copying to S3. Defaults to ``None`` so
            existing three-argument callers keep working; callers whose
            deals may carry photos should pass a real client.

    Returns:
        Whatever ``upsert_hubspot_deal`` returns.
    """
    print("⚠️ Deprecated — use the new interface instead.")
    # upsert_hubspot_deal requires hubspot_client as its fourth argument;
    # forwarding only three arguments raised TypeError on every call.
    return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)
|
||||
|
||||
|
||||
def new_record_company(self, company_data):
    """Create, persist and return a new HubspotCommpanyData record.

    Args:
        company_data: Mapping read with ``.get``; ``hs_object_id`` becomes
            ``company_id`` and ``name`` becomes ``company_name``. A missing
            key is passed on as ``None``.

    Returns:
        The newly added record, refreshed from the database after commit.
    """
    with get_db_session() as session:
        company_id = company_data.get("hs_object_id")
        company_name = company_data.get("name")
        company_row = HubspotCommpanyData(
            company_id=company_id,
            company_name=company_name,
        )
        session.add(company_row)
        session.commit()
        # Reload the row after commit so the returned object carries the
        # values stored by the database.
        session.refresh(company_row)
        return company_row
|
||||
|
||||
|
||||
def find_all_deals_with_company_id(self, company_id):
    """Return every HubspotDealData record whose company_id matches.

    Args:
        company_id: Value compared for equality against
            ``HubspotDealData.company_id``.

    Returns:
        The matching records; empty when nothing matches.
    """
    with get_db_session() as session:
        # Use the same select()/session.exec() style as upsert_hubspot_deal
        # rather than the legacy session.query() API.
        statement = select(HubspotDealData).where(
            HubspotDealData.company_id == company_id
        )
        return session.exec(statement).all()
|
||||
|
||||
|
||||
def _sha256(self, file_path: str) -> str:
|
||||
"""Compute SHA-256 checksum of a file."""
|
||||
sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
sha256.update(chunk)
|
||||
return sha256.hexdigest()
|
||||
|
||||
def update_deal(self, deal_in_db, hubspot_client):
|
||||
"""
|
||||
Checks if a deal needs updating and updates it in the db with the latest information.
|
||||
Performs soft assertions for field-level consistency between DB and HubSpot data.
|
||||
Checks if a deal needs updating and syncs it with HubSpot.
|
||||
Also handles major_condition_issue_photos file upload to S3 with integrity check.
|
||||
"""
|
||||
def soft_assert(condition, message="Assertion Failed"):
|
||||
if not condition:
|
||||
|
|
@ -52,70 +55,95 @@ class HubspotTodb():
|
|||
return False
|
||||
return True
|
||||
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
|
||||
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id)
|
||||
|
||||
|
||||
results = [
|
||||
soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"),
|
||||
"deal_id mismatch"),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id,
|
||||
"company_id mismatch"),
|
||||
soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
|
||||
"landlord_property_id mismatch"),
|
||||
soft_assert(deal_in_db.outcome == hs_deal.get("outcome"),
|
||||
"outcome mismatch"),
|
||||
soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"),
|
||||
"dealstage mismatch"),
|
||||
soft_assert(deal_in_db.dealname == hs_deal.get("dealname"),
|
||||
"dealname mismatch"),
|
||||
soft_assert(deal_in_db.project_code == hs_deal.get("project_code"),
|
||||
"project_code mismatch"),
|
||||
soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"),
|
||||
"uprn mismatch"),
|
||||
soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
|
||||
"outcome_notes mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"),
|
||||
"major condition description mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"),
|
||||
"major condition issue photos mismatch")
|
||||
# Soft compare key fields
|
||||
checks = [
|
||||
soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
|
||||
soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), "landlord_property_id mismatch"),
|
||||
soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"),
|
||||
soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"),
|
||||
soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"),
|
||||
soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), "project_code mismatch"),
|
||||
soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"),
|
||||
soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), "outcome_notes mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"), "major condition description mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"), "major condition issue photos mismatch"),
|
||||
]
|
||||
|
||||
# if any of the soft asserts failed
|
||||
if not all(results):
|
||||
print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.")
|
||||
self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing)
|
||||
return False
|
||||
# If discrepancies found, update from HubSpot
|
||||
if not all(checks):
|
||||
print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot.")
|
||||
return self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
|
||||
|
||||
# Handle photo upload if it exists but S3 URL is missing
|
||||
if deal_in_db.major_condition_issue_photos and not deal_in_db.major_condition_issue_evidence_s3_url:
|
||||
print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...")
|
||||
|
||||
# Download from HubSpot
|
||||
local_file = hubspot_client.download_file_from_url(deal_in_db.major_condition_issue_photos)
|
||||
|
||||
# Upload to S3
|
||||
bucket = "retrofit-data-dev"
|
||||
s3_url = self.s3.upload_file(local_file, bucket, prefix="hubspot/awaabs_law_evidence/")
|
||||
|
||||
# Download again to verify integrity
|
||||
downloaded = self.s3.download_from_url(s3_url)
|
||||
if self._sha256(local_file) == self._sha256(downloaded):
|
||||
print("✅ SHA256 match verified — upload successful.")
|
||||
else:
|
||||
print("❌ SHA256 mismatch — integrity check failed.")
|
||||
raise ValueError("File integrity check failed after S3 upload.")
|
||||
|
||||
# Update DB record with S3 URL
|
||||
with get_db_session() as session:
|
||||
db_record = session.get(HubspotDealData, deal_in_db.id)
|
||||
db_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
session.add(db_record)
|
||||
session.commit()
|
||||
print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
|
||||
|
||||
else:
|
||||
print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.")
|
||||
return True
|
||||
|
||||
def upsert_hubspot_deal(self, deal_data, company, listing):
|
||||
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
|
||||
|
||||
return True
|
||||
|
||||
def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client):
|
||||
"""
|
||||
Inserts a new record or updates an existing record in hubspot_deal_data.
|
||||
Uses deal_id (hs_object_id) as the unique identifier.
|
||||
Inserts or updates a deal record.
|
||||
Also uploads photos if present and adds S3 URL.
|
||||
"""
|
||||
with get_db_session() as session:
|
||||
deal_id = deal_data.get("hs_object_id")
|
||||
|
||||
# Use SQLModel's modern query style
|
||||
statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
|
||||
existing = session.exec(statement).first()
|
||||
|
||||
if existing:
|
||||
print(f"🔄 Updating existing deal (deal_id={deal_id})")
|
||||
|
||||
existing.dealname = deal_data.get("dealname", existing.dealname)
|
||||
existing.dealstage = deal_data.get("dealstage", existing.dealstage)
|
||||
existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id)
|
||||
existing.uprn = listing.get("national_uprn", existing.uprn)
|
||||
existing.outcome = deal_data.get("outcome", existing.outcome)
|
||||
existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes)
|
||||
existing.project_code = deal_data.get("project_code", existing.project_code)
|
||||
existing.company_id = company or existing.company_id
|
||||
existing.major_condition_issue_description = deal_data.get("major_condition_issue_description", existing.major_condition_issue_description)
|
||||
existing.major_condition_issue_photos = deal_data.get("major_condition_issue_photos", existing.major_condition_issue_photos)
|
||||
for attr, value in {
|
||||
"dealname": deal_data.get("dealname"),
|
||||
"dealstage": deal_data.get("dealstage"),
|
||||
"landlord_property_id": listing.get("owner_property_id"),
|
||||
"uprn": listing.get("national_uprn"),
|
||||
"outcome": deal_data.get("outcome"),
|
||||
"outcome_notes": deal_data.get("outcome_notes"),
|
||||
"project_code": deal_data.get("project_code"),
|
||||
"company_id": company,
|
||||
"major_condition_issue_description": deal_data.get("major_condition_issue_description"),
|
||||
"major_condition_issue_photos": deal_data.get("major_condition_issue_photos"),
|
||||
}.items():
|
||||
setattr(existing, attr, value or getattr(existing, attr))
|
||||
|
||||
# Upload if photo exists but S3 link missing
|
||||
if existing.major_condition_issue_photos and not existing.major_condition_issue_evidence_s3_url:
|
||||
local_file = hubspot_client.download_file_from_url(existing.major_condition_issue_photos)
|
||||
s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/")
|
||||
existing.major_condition_issue_evidence_s3_url = s3_url
|
||||
|
||||
session.add(existing)
|
||||
session.commit()
|
||||
|
|
@ -124,7 +152,6 @@ class HubspotTodb():
|
|||
|
||||
else:
|
||||
print(f"🆕 Inserting new deal (deal_id={deal_id})")
|
||||
|
||||
new_record = HubspotDealData(
|
||||
deal_id=deal_id,
|
||||
dealname=deal_data.get("dealname"),
|
||||
|
|
@ -135,10 +162,16 @@ class HubspotTodb():
|
|||
outcome_notes=deal_data.get("outcome_notes"),
|
||||
project_code=deal_data.get("project_code"),
|
||||
company_id=company,
|
||||
major_condition_issue_description = deal_data.get("major_condition_issue_description"),
|
||||
major_condition_issue_photos = deal_data.get("major_condition_issue_photos"),
|
||||
major_condition_issue_description=deal_data.get("major_condition_issue_description"),
|
||||
major_condition_issue_photos=deal_data.get("major_condition_issue_photos"),
|
||||
)
|
||||
|
||||
# Handle upload at insert time
|
||||
if new_record.major_condition_issue_photos:
|
||||
local_file = hubspot_client.download_file_from_url(new_record.major_condition_issue_photos)
|
||||
s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/")
|
||||
new_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import boto3
|
|||
from botocore.exceptions import ClientError
|
||||
from urllib.parse import urlparse
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
|
||||
class S3Uploader:
|
||||
"""
|
||||
|
|
@ -68,3 +68,46 @@ class S3Uploader:
|
|||
)
|
||||
except ClientError as e:
|
||||
raise RuntimeError(f"❌ Failed to generate signed URL: {e}")
|
||||
|
||||
|
||||
def download_from_url(
    self,
    s3_url: str,
    local_dir: str = ".",
    expires_in: int = 3600,
    timeout: float = 30,
) -> str:
    """
    Download the object behind an S3 HTTPS URL to a local file.

    A presigned URL is always generated for the object (whether it is
    public or private) and the file is fetched through that URL.

    Args:
        s3_url (str): Full virtual-hosted S3 HTTPS URL
            (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt).
            The host must have at least three dot-separated parts with
            "s3" as the second one.
        local_dir (str): Folder to save the file in; created if missing.
        expires_in (int): Presigned URL lifetime (seconds).
        timeout (float): Seconds passed to ``requests.get`` as its timeout,
            so a stalled connection cannot block forever.

    Returns:
        str: Local file path of the downloaded file.

    Raises:
        ValueError: If the URL is not in the expected S3 form or does not
            name an object (empty key or key ending in "/").
        RuntimeError: If the HTTP download fails.
    """
    parsed = urlparse(s3_url)
    host_parts = parsed.netloc.split(".")
    if len(host_parts) < 3 or host_parts[1] != "s3":
        raise ValueError("❌ Not a valid S3 HTTPS URL")

    bucket = host_parts[0]
    key = parsed.path.lstrip("/")

    # Without a file name, os.path.join(local_dir, "") would point at the
    # directory itself and open() would fail later with a confusing error.
    filename = os.path.basename(key)
    if not filename:
        raise ValueError("❌ S3 URL does not point to an object key")

    # Generate presigned URL (whether public or private)
    presigned_url = self.generate_presigned_url(bucket, key, expires_in)
    local_path = os.path.join(local_dir, filename)

    try:
        # The context manager releases the streamed connection even when
        # the status check or the write loop raises.
        with requests.get(presigned_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()

            os.makedirs(local_dir, exist_ok=True)
            with open(local_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"❌ Failed to download file: {e}") from e

    print(f"✅ Downloaded: {local_path}")
    return local_path
|
||||
Loading…
Add table
Reference in a new issue