survey-extraction/etl/db/hubSpotLoad.py
Jun-te Kim 50ea324ca5 test
2026-03-30 18:31:05 +00:00

305 lines
13 KiB
Python

from etl.db.db import get_db_session, init_db
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
from sqlmodel import select
from etl.s3.s3_uploader import S3Uploader
import hashlib
import os
class HubspotTodb:
def __init__(self):
init_db()
self.s3 = S3Uploader()
def new_record_company(self, company_data):
"""Adds a new record to the hubspot_company_data table."""
with get_db_session() as session:
new_record = HubspotCommpanyData(
company_id=company_data.get("hs_object_id"),
company_name=company_data.get("name"),
)
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)
def find_all_deals_with_company_id(self, company_id):
"""Returns a list of deals for a given company_id."""
with get_db_session() as session:
return (
session.query(HubspotDealData)
.filter(HubspotDealData.company_id == company_id)
.all()
)
def find_deal_with_deal_id(self, deal_id):
with get_db_session() as session:
return (
session.query(HubspotDealData)
.filter(HubspotDealData.deal_id == deal_id)
.one()
)
def _sha256(self, file_path: str) -> str:
"""Compute SHA-256 checksum of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
def update_deal(self, deal_in_db, hubspot_client):
"""
Checks if a deal needs updating and syncs it with HubSpot.
Also handles major_condition_issue_photos file upload to S3 with integrity check.
"""
def soft_assert(condition, message="Assertion Failed"):
if not condition:
print(f"⚠️ Soft Assert Failed: {message}")
return False
return True
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
deal_in_db.deal_id
)
# Soft compare key fields
checks = [
soft_assert(
deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"
),
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
soft_assert(
deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
"landlord_property_id mismatch",
),
soft_assert(
deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"
),
soft_assert(
deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"
),
soft_assert(
deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"
),
soft_assert(
deal_in_db.project_code == hs_deal.get("project_code"),
"project_code mismatch",
),
soft_assert(
deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"
),
soft_assert(
deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
"outcome_notes mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_description
== hs_deal.get("major_condition_issue_description"),
"major condition description mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_photos
== hs_deal.get("major_condition_issue_photos"),
"major condition issue photos mismatch",
),
soft_assert(
deal_in_db.coordination_status
== hs_deal.get("coordination_status__stage_1_"),
"coordination stage 1 status mismatch",
),
soft_assert(
deal_in_db.design_status == hs_deal.get("retrofit_design_status"),
"retrofit design mismatch",
),
]
# If discrepancies found, update from HubSpot
if not all(checks):
print(
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
)
self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
return False
# Handle photo upload if it exists but S3 URL is missing
if (
deal_in_db.major_condition_issue_photos
and not deal_in_db.major_condition_issue_evidence_s3_url
):
print(
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
try:
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
local_file = hubspot_client.download_file_from_url(photo_url)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with get_db_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
)
return False
except Exception as e:
print(
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
)
# Continue without the file — don't crash the entire update
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
return True
def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client):
"""
Inserts or updates a deal record.
Also uploads photos if present and adds S3 URL.
"""
with get_db_session() as session:
print(deal_data)
deal_id = deal_data.get("hs_object_id")
statement = select(HubspotDealData).where(
HubspotDealData.deal_id == deal_id
)
existing = session.exec(statement).first()
if existing:
print(f"🔄 Updating existing deal (deal_id={deal_id})")
for attr, value in {
"dealname": deal_data.get("dealname"),
"dealstage": deal_data.get("dealstage"),
"landlord_property_id": listing.get("owner_property_id"),
"uprn": listing.get("national_uprn"),
"outcome": deal_data.get("outcome"),
"outcome_notes": deal_data.get("outcome_notes"),
"project_code": deal_data.get("project_code"),
"company_id": company,
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"coordination_status": deal_data.get(
"coordination_status__stage_1_"
),
"design_status": deal_data.get("retrofit_design_status"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
# Upload if photo exists but S3 link missing
if (
existing.major_condition_issue_photos
and not existing.major_condition_issue_evidence_s3_url
):
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
fresh_deal = hubspot_client.from_deal_get_info(existing.deal_id)
photo_url = fresh_deal.get("major_condition_issue_photos")
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(
photo_url
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
)
# Continue without the file — don't crash the update
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
session.add(existing)
session.commit()
session.refresh(existing)
return existing
else:
print(f"🆕 Inserting new deal (deal_id={deal_id})")
new_record = HubspotDealData(
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
landlord_property_id=listing.get("owner_property_id"),
uprn=listing.get("national_uprn"),
outcome=deal_data.get("outcome"),
outcome_notes=deal_data.get("outcome_notes"),
project_code=deal_data.get("project_code"),
company_id=company,
major_condition_issue_description=deal_data.get(
"major_condition_issue_description"
),
major_condition_issue_photos=deal_data.get(
"major_condition_issue_photos"
),
coordination_status=deal_data.get("coordination_status__stage_1_"),
design_status=deal_data.get("retrofit_design_status"),
)
# Handle upload at insert time
if new_record.major_condition_issue_photos:
try:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
)
# Continue without the file — don't crash the insert
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record