mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-30 13:10:56 +00:00
305 lines
13 KiB
Python
305 lines
13 KiB
Python
from etl.db.db import get_db_session, init_db
|
|
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
|
|
from sqlmodel import select
|
|
from etl.s3.s3_uploader import S3Uploader
|
|
import hashlib
|
|
import os
|
|
|
|
|
|
class HubspotTodb:
    """Sync HubSpot company and deal records into the local database.

    Photos referenced by a deal's ``major_condition_issue_photos`` property are
    copied to S3 and the resulting URL is stored on the deal row in
    ``major_condition_issue_evidence_s3_url``.
    """

    # Destination of the evidence photos copied out of HubSpot.
    EVIDENCE_BUCKET = "retrofit-data-dev"
    EVIDENCE_PREFIX = "hubspot/awaabs_law_evidence/"

    def __init__(self):
        """Initialise the database and create the S3 uploader."""
        init_db()
        self.s3 = S3Uploader()

    def new_record_company(self, company_data):
        """Add a new record to the hubspot_company_data table.

        Args:
            company_data: Mapping of HubSpot company properties; the
                ``hs_object_id`` and ``name`` keys are read.

        Returns:
            The inserted ``HubspotCommpanyData`` row, refreshed after commit.
        """
        with get_db_session() as session:
            new_record = HubspotCommpanyData(
                company_id=company_data.get("hs_object_id"),
                company_name=company_data.get("name"),
            )
            session.add(new_record)
            session.commit()
            session.refresh(new_record)
            return new_record

    def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
        """Deprecated alias that forwards to :meth:`upsert_hubspot_deal`."""
        print("⚠️ Deprecated — use the new interface instead.")
        return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)

    def find_all_deals_with_company_id(self, company_id):
        """Return a list of deals for a given company_id."""
        with get_db_session() as session:
            statement = select(HubspotDealData).where(
                HubspotDealData.company_id == company_id
            )
            return session.exec(statement).all()

    def find_deal_with_deal_id(self, deal_id):
        """Return the single deal row whose ``deal_id`` matches.

        Uses ``.one()``, so the lookup raises when no row or more than one row
        matches instead of returning ``None``.
        """
        with get_db_session() as session:
            statement = select(HubspotDealData).where(
                HubspotDealData.deal_id == deal_id
            )
            return session.exec(statement).one()

    def _sha256(self, file_path: str) -> str:
        """Compute SHA-256 checksum of a file."""
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read in fixed-size chunks so large files are never fully in memory.
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()

    @staticmethod
    def _deal_fields(deal_data, company, listing):
        """Map HubSpot deal/listing properties onto HubspotDealData column names."""
        return {
            "dealname": deal_data.get("dealname"),
            "dealstage": deal_data.get("dealstage"),
            "landlord_property_id": listing.get("owner_property_id"),
            "uprn": listing.get("national_uprn"),
            "outcome": deal_data.get("outcome"),
            "outcome_notes": deal_data.get("outcome_notes"),
            "project_code": deal_data.get("project_code"),
            "company_id": company,
            "major_condition_issue_description": deal_data.get(
                "major_condition_issue_description"
            ),
            "major_condition_issue_photos": deal_data.get(
                "major_condition_issue_photos"
            ),
            "coordination_status": deal_data.get("coordination_status__stage_1_"),
            "design_status": deal_data.get("retrofit_design_status"),
        }

    @staticmethod
    def _deal_mismatches(deal_in_db, hs_deal, hs_company_id, hs_listing):
        """Return the message of every tracked field that differs from HubSpot."""
        comparisons = (
            (deal_in_db.deal_id, hs_deal.get("hs_object_id"), "deal_id mismatch"),
            (deal_in_db.company_id, hs_company_id, "company_id mismatch"),
            (
                deal_in_db.landlord_property_id,
                hs_listing.get("owner_property_id"),
                "landlord_property_id mismatch",
            ),
            (deal_in_db.outcome, hs_deal.get("outcome"), "outcome mismatch"),
            (deal_in_db.dealstage, hs_deal.get("dealstage"), "dealstage mismatch"),
            (deal_in_db.dealname, hs_deal.get("dealname"), "dealname mismatch"),
            (
                deal_in_db.project_code,
                hs_deal.get("project_code"),
                "project_code mismatch",
            ),
            (deal_in_db.uprn, hs_listing.get("national_uprn"), "uprn mismatch"),
            (
                deal_in_db.outcome_notes,
                hs_deal.get("outcome_notes"),
                "outcome_notes mismatch",
            ),
            (
                deal_in_db.major_condition_issue_description,
                hs_deal.get("major_condition_issue_description"),
                "major condition description mismatch",
            ),
            (
                deal_in_db.major_condition_issue_photos,
                hs_deal.get("major_condition_issue_photos"),
                "major condition issue photos mismatch",
            ),
            (
                deal_in_db.coordination_status,
                hs_deal.get("coordination_status__stage_1_"),
                "coordination stage 1 status mismatch",
            ),
            (
                deal_in_db.design_status,
                hs_deal.get("retrofit_design_status"),
                "retrofit design mismatch",
            ),
        )
        return [
            message
            for db_value, hs_value, message in comparisons
            if not (db_value == hs_value)
        ]

    def _copy_photo_to_s3(self, hubspot_client, photo_url, verify=False):
        """Download a HubSpot file and upload it to the evidence bucket.

        Args:
            hubspot_client: Client exposing ``download_file_from_url``.
            photo_url: HubSpot URL of the file to copy.
            verify: When true, download the uploaded object again and compare
                SHA-256 checksums with the local copy.

        Returns:
            The URL returned by the S3 uploader.

        Raises:
            ValueError: If ``verify`` is true and the checksums differ.
        """
        # NOTE(review): the downloaded local files are not removed here —
        # confirm whether the download helpers clean up after themselves.
        local_file = hubspot_client.download_file_from_url(photo_url)
        s3_url = self.s3.upload_file(
            local_file, self.EVIDENCE_BUCKET, prefix=self.EVIDENCE_PREFIX
        )
        if verify:
            downloaded = self.s3.download_from_url(s3_url)
            if self._sha256(local_file) == self._sha256(downloaded):
                print("✅ SHA256 match verified — upload successful.")
            else:
                print("❌ SHA256 mismatch — integrity check failed.")
                raise ValueError("File integrity check failed after S3 upload.")
        return s3_url

    def update_deal(self, deal_in_db, hubspot_client):
        """Check whether a stored deal is in sync with HubSpot and repair it.

        Also handles the major_condition_issue_photos upload to S3, with an
        integrity check, when the row has a photo but no S3 URL yet.

        Args:
            deal_in_db: The stored deal row to check.
            hubspot_client: Client exposing ``get_deal_info_for_db`` and
                ``download_file_from_url``.

        Returns:
            ``False`` when the row was re-synced from HubSpot or its S3 URL was
            written; ``True`` when nothing was changed (including the case
            where the photo upload failed and was skipped).
        """
        print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")

        hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
            deal_in_db.deal_id
        )

        # Report every differing field, not just the first one.
        mismatches = self._deal_mismatches(
            deal_in_db, hs_deal, hs_company_id, hs_listing
        )
        for message in mismatches:
            print(f"⚠️ Soft Assert Failed: {message}")

        # If discrepancies found, update from HubSpot
        if mismatches:
            print(
                f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
            )
            self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
            return False

        needs_upload = (
            deal_in_db.major_condition_issue_photos
            and not deal_in_db.major_condition_issue_evidence_s3_url
        )
        if not needs_upload:
            print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
            return True

        # Handle photo upload if it exists but S3 URL is missing
        print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...")

        # Use the fresh URL from hs_deal rather than the one stored in the DB.
        photo_url = hs_deal.get("major_condition_issue_photos")
        if not photo_url:
            print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
            return True

        try:
            s3_url = self._copy_photo_to_s3(hubspot_client, photo_url, verify=True)

            # Update DB record with S3 URL
            with get_db_session() as session:
                db_record = session.get(HubspotDealData, deal_in_db.id)
                if db_record is None:
                    raise LookupError(
                        f"no HubspotDealData row with id={deal_in_db.id}"
                    )
                db_record.major_condition_issue_evidence_s3_url = s3_url
                session.add(db_record)
                session.commit()
            print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
            return False
        except Exception as e:
            # Best effort: continue without the file — don't crash the entire update.
            print(
                f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
            )

        return True

    def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client):
        """Insert or update the deal row identified by ``hs_object_id``.

        Also uploads the deal photo, if present, and stores its S3 URL. A
        failed photo transfer is reported and skipped; the row is still saved.

        Args:
            deal_data: Mapping of HubSpot deal properties.
            company: Value stored in the row's ``company_id`` column.
            listing: Mapping of HubSpot listing properties
                (``owner_property_id`` and ``national_uprn`` are read).
            hubspot_client: Client exposing ``download_file_from_url`` and
                ``from_deal_get_info``.

        Returns:
            The inserted or updated ``HubspotDealData`` row, refreshed after
            commit.
        """
        with get_db_session() as session:
            print(deal_data)
            deal_id = deal_data.get("hs_object_id")
            fields = self._deal_fields(deal_data, company, listing)

            statement = select(HubspotDealData).where(
                HubspotDealData.deal_id == deal_id
            )
            existing = session.exec(statement).first()

            if existing:
                print(f"🔄 Updating existing deal (deal_id={deal_id})")

                for attr, value in fields.items():
                    # Falsy HubSpot values (None, "") never overwrite what is
                    # already stored on the row.
                    if value:
                        setattr(existing, attr, value)

                # Upload if photo exists but S3 link missing
                if (
                    existing.major_condition_issue_photos
                    and not existing.major_condition_issue_evidence_s3_url
                ):
                    # Fetch fresh URL from HubSpot instead of using potentially
                    # expired stored URL
                    fresh_deal = hubspot_client.from_deal_get_info(existing.deal_id)
                    photo_url = fresh_deal.get("major_condition_issue_photos")

                    if photo_url:
                        try:
                            existing.major_condition_issue_evidence_s3_url = (
                                self._copy_photo_to_s3(hubspot_client, photo_url)
                            )
                        except Exception as e:
                            # Continue without the file — don't crash the update
                            print(
                                f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
                            )
                    else:
                        print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")

                record = existing
            else:
                print(f"🆕 Inserting new deal (deal_id={deal_id})")
                record = HubspotDealData(deal_id=deal_id, **fields)

                # Handle upload at insert time
                if record.major_condition_issue_photos:
                    try:
                        record.major_condition_issue_evidence_s3_url = (
                            self._copy_photo_to_s3(
                                hubspot_client, record.major_condition_issue_photos
                            )
                        )
                    except Exception as e:
                        # Continue without the file — don't crash the insert
                        print(
                            f"⚠️ Failed to download photo for deal_id {record.deal_id}: {e}"
                        )

            session.add(record)
            session.commit()
            session.refresh(record)
            return record
|