pull diffing logic out of loading method

This commit is contained in:
Daniel Roth 2026-04-09 10:32:59 +00:00
parent 605652b309
commit 01636514aa
2 changed files with 118 additions and 100 deletions

View file

@ -96,13 +96,103 @@ class HubspotDataToDb:
return sha256.hexdigest()
def update_deal_with_checks(
    self,
    deal_in_db: HubspotDealData,
    hubspot_client: HubspotClient,
    hs_deal: Dict[str, str],
    hs_company_id: Optional[str],
    hs_listing: Optional[Dict[str, str]],
) -> bool:
    """
    Updates deal in database and handles major_condition_issue_photos file upload to S3 with integrity check.

    The HubSpot payload is always written through ``upsert_deal`` first; the
    photo step only runs when ``_needs_photo_upload(deal_in_db)`` is truthy.

    Args:
        deal_in_db: The database row for the deal as loaded by the caller.
        hubspot_client: Client forwarded to the upsert and upload helpers.
        hs_deal: HubSpot deal properties for this deal.
        hs_company_id: Associated HubSpot company id, if any.
        hs_listing: Associated HubSpot listing properties, if any.

    Returns:
        False when a photo was uploaded to S3 during this call, True otherwise.
    """
    self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)

    # NOTE(review): upsert_deal already runs a photo-upload helper on the row
    # it loads itself; the check below works on the caller-supplied
    # ``deal_in_db``, which upsert_deal never receives. Confirm this second
    # pass is intended.
    if not self._needs_photo_upload(deal_in_db):
        print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
        return True

    print(
        f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
    )
    photo_url = hs_deal.get("major_condition_issue_photos")
    if not photo_url:
        print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
        return True

    self._upload_photo_to_s3(
        deal_in_db,
        photo_url,
        hubspot_client,
        verify=True,
    )

    # Persist the S3 URL that the upload helper set on ``deal_in_db``.
    with db_read_session() as session:
        db_record = session.get(HubspotDealData, deal_in_db.id)
        if db_record is None:
            # session.get() yields None for an unknown primary key; without
            # this guard the attribute assignment below raises AttributeError.
            print(
                f"⚠️ Deal row id={deal_in_db.id} not found — S3 URL not persisted "
                f"for deal_id {deal_in_db.deal_id}"
            )
        else:
            db_record.major_condition_issue_evidence_s3_url = (
                deal_in_db.major_condition_issue_evidence_s3_url
            )
            session.add(db_record)
            session.commit()
    return False
def upsert_deal(
    self,
    deal_data: Dict[str, str],
    company: Optional[str],
    listing: Optional[dict[str, str]],
    hubspot_client: HubspotClient,
):
    """
    Insert a deal row, or update the one that already exists.

    The row is looked up by ``deal_data["hs_object_id"]``. Photo handling is
    delegated to ``_handle_existing_photo_upload`` / ``_handle_new_photo_upload``
    before the row is committed. Returns the refreshed row.
    """
    with db_read_session() as session:
        deal_id = deal_data.get("hs_object_id")
        lookup = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
        record = session.exec(lookup).first()

        if record:
            print(f"🔄 Updating existing deal (deal_id={deal_id})")
            self._update_existing_deal(record, deal_data, listing, company)
            self._handle_existing_photo_upload(record, hubspot_client)
        else:
            print(f"🆕 Inserting new deal (deal_id={deal_id})")
            record = self._build_new_deal(deal_id, deal_data, listing, company)
            # Upload happens before the first commit of a new row.
            self._handle_new_photo_upload(record, hubspot_client)

        # Shared tail: both paths stage, commit and reload the same way.
        session.add(record)
        session.commit()
        session.refresh(record)
        return record
def _deprecated_diff(
self,
deal_in_db: HubspotDealData,
hs_deal: Dict[str, str],
hs_company_id: Optional[str],
hs_listing: Optional[Dict[str, str]],
):
def soft_assert(condition: bool, message: str = "Assertion Failed"):
if not condition:
print(f"⚠️ Soft Assert Failed: {message}")
@ -111,14 +201,6 @@ class HubspotDataToDb:
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
hs_deal: Dict[str, str]
hs_company_id: Optional[str]
hs_listing: Optional[Dict[str, str]]
hs_deal, hs_company_id, hs_listing = (
hubspot_client.get_deal_and_company_and_listing(deal_in_db.deal_id)
)
# Soft compare key fields
checks = [
soft_assert(
@ -291,87 +373,10 @@ class HubspotDataToDb:
print(
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
)
self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
return False
# Handle photo upload if it exists but S3 URL is missing
if self._needs_photo_upload(deal_in_db):
print(
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
self._upload_photo_to_s3(
deal_in_db,
photo_url,
hubspot_client,
verify=True, # 👈 key difference
)
# persist change
with db_read_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = (
deal_in_db.major_condition_issue_evidence_s3_url
)
session.add(db_record)
session.commit()
return False
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
return True
def upsert_deal(
self,
deal_data: Dict[str, str],
company: Optional[str],
listing: Optional[dict[str, str]],
hubspot_client: HubspotClient,
):
"""
Inserts or updates a deal record and returns the refreshed row.
The row is matched on deal_data["hs_object_id"]. Photo handling is delegated
to _handle_existing_photo_upload / _handle_new_photo_upload before commit
(presumably these upload to S3 and set the S3 URL; verify in the helpers).
"""
with db_read_session() as session:
deal_id = deal_data.get("hs_object_id")
# Look up an existing row for this HubSpot deal id.
statement = select(HubspotDealData).where(
HubspotDealData.deal_id == deal_id
)
existing = session.exec(statement).first()
if existing:
print(f"🔄 Updating existing deal (deal_id={deal_id})")
self._update_existing_deal(existing, deal_data, listing, company)
self._handle_existing_photo_upload(existing, hubspot_client)
session.add(existing)
session.commit()
# Reload so the returned object reflects the committed state.
session.refresh(existing)
return existing
else:
print(f"🆕 Inserting new deal (deal_id={deal_id})")
new_record: HubspotDealData = self._build_new_deal(
deal_id, deal_data, listing, company
)
# Handle upload at insert time
self._handle_new_photo_upload(new_record, hubspot_client)
session.add(new_record)
session.commit()
# Reload so the returned object reflects the committed state.
session.refresh(new_record)
return new_record
def _update_existing_deal(
self,
existing: HubspotDealData,

View file

@ -1,8 +1,7 @@
from backend.app.db.models.organisation import HubspotDealData
from etl.hubspot.hubspotClient import HubspotClient
# from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from backend.utils.subtasks import task_handler
from typing import Any, Dict, Optional
@ -24,11 +23,6 @@ def handler(body: dict[str, Any], context: Any) -> None:
hubspot_deal_id
)
if not db_deal:
# New hubspot deal, no diffing to do
# TODO: Trigger hubspot to db ETL
return
hubspot_deal: Dict[str, str]
company: Optional[str]
listing: Optional[dict[str, str]]
@ -37,10 +31,14 @@ def handler(body: dict[str, Any], context: Any) -> None:
hubspot_deal_id
)
if HubspotDealDiffer.check_for_pashub_trigger(
new_deal=hubspot_deal, old_deal=db_deal
):
# TODO: trigger pashub file fetcher
if not db_deal:
# New hubspot deal, no diffing to do
if company:
company_data: CompanyData = hubspot_client.get_company_information(company)
db_client: HubspotDataToDb = HubspotDataToDb()
db_client.upsert_company(company_data)
db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client)
return
if HubspotDealDiffer.check_for_db_update_trigger(
@ -49,7 +47,22 @@ def handler(body: dict[str, Any], context: Any) -> None:
new_listing=listing,
old_deal=db_deal,
):
# TODO: trigger db upsert
db_client.update_deal_with_checks(
deal_in_db=db_deal,
hubspot_client=hubspot_client,
hs_deal=hubspot_deal,
hs_company_id=company,
hs_listing=listing,
)
return
# ==============================
# Orchestration of other lambdas
# ==============================
if HubspotDealDiffer.check_for_pashub_trigger(
new_deal=hubspot_deal, old_deal=db_deal
):
# TODO: trigger pashub file fetcher
return
# if db_deal: