From 01636514aaa58f8749af194fac0ac31cd8a79284 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Thu, 9 Apr 2026 10:32:59 +0000 Subject: [PATCH] pull diffing logic out of loading method --- etl/hubspot/hubspotDataTodB.py | 181 ++++++++++++++-------------- etl/hubspot/scripts/scraper/main.py | 37 ++++-- 2 files changed, 118 insertions(+), 100 deletions(-) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index 4f43f1f7..1c4b6b54 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -96,13 +96,103 @@ class HubspotDataToDb: return sha256.hexdigest() def update_deal_with_checks( - self, deal_in_db: HubspotDealData, hubspot_client: HubspotClient + self, + deal_in_db: HubspotDealData, + hubspot_client: HubspotClient, + hs_deal: Dict[str, str], + hs_company_id: Optional[str], + hs_listing: Optional[Dict[str, str]], ) -> bool: """ - Checks if a deal needs updating and syncs it with HubSpot. - Also handles major_condition_issue_photos file upload to S3 with integrity check. + Updates deal in database and handles major_condition_issue_photos file upload to S3 with integrity check. """ + self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client) + # Handle photo upload if it exists but S3 URL is missing + if self._needs_photo_upload(deal_in_db): + print( + f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..." + ) + + photo_url = hs_deal.get("major_condition_issue_photos") + + if photo_url: + self._upload_photo_to_s3( + deal_in_db, + photo_url, + hubspot_client, + verify=True, + ) + + # persist change + with db_read_session() as session: + db_record = session.get(HubspotDealData, deal_in_db.id) + db_record.major_condition_issue_evidence_s3_url = ( + deal_in_db.major_condition_issue_evidence_s3_url + ) + session.add(db_record) + session.commit() + + return False + else: + print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") + + else: + print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") + + return True + + def upsert_deal( + self, + deal_data: Dict[str, str], + company: Optional[str], + listing: Optional[dict[str, str]], + hubspot_client: HubspotClient, + ): + """ + Inserts or updates a deal record. + Also uploads photos if present and adds S3 URL. + """ + with db_read_session() as session: + deal_id = deal_data.get("hs_object_id") + + statement = select(HubspotDealData).where( + HubspotDealData.deal_id == deal_id + ) + existing = session.exec(statement).first() + + if existing: + print(f"🔄 Updating existing deal (deal_id={deal_id})") + self._update_existing_deal(existing, deal_data, listing, company) + + self._handle_existing_photo_upload(existing, hubspot_client) + + session.add(existing) + session.commit() + session.refresh(existing) + return existing + + else: + print(f"🆕 Inserting new deal (deal_id={deal_id})") + new_record: HubspotDealData = self._build_new_deal( + deal_id, deal_data, listing, company + ) + + # Handle upload at insert time + self._handle_new_photo_upload(new_record, hubspot_client) + + session.add(new_record) + session.commit() + session.refresh(new_record) + return new_record + + def _deprecated_diff( + self, + deal_in_db: HubspotDealData, + hs_deal: Dict[str, str], + hs_company_id: Optional[str], + hs_listing: Optional[Dict[str, str]], + ): def soft_assert(condition: bool, message: str = "Assertion Failed"): if not condition: print(f"⚠️ Soft Assert Failed: {message}") @@ -111,14 +201,6 @@ class HubspotDataToDb: print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") - hs_deal: Dict[str, str] - hs_company_id: Optional[str] - hs_listing: Optional[Dict[str, str]] - - hs_deal, hs_company_id, hs_listing = ( - hubspot_client.get_deal_and_company_and_listing(deal_in_db.deal_id) - ) - # Soft compare key fields checks = [ soft_assert( @@ -291,87 +373,10 @@ class HubspotDataToDb: print( f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot." ) - self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client) return False - # Handle photo upload if it exists but S3 URL is missing - if self._needs_photo_upload(deal_in_db): - print( - f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..." - ) - - photo_url = hs_deal.get("major_condition_issue_photos") - - if photo_url: - self._upload_photo_to_s3( - deal_in_db, - photo_url, - hubspot_client, - verify=True, # 👈 key difference - ) - - # persist change - with db_read_session() as session: - db_record = session.get(HubspotDealData, deal_in_db.id) - db_record.major_condition_issue_evidence_s3_url = ( - deal_in_db.major_condition_issue_evidence_s3_url - ) - session.add(db_record) - session.commit() - - return False - else: - print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") - - else: - print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") - return True - def upsert_deal( - self, - deal_data: Dict[str, str], - company: Optional[str], - listing: Optional[dict[str, str]], - hubspot_client: HubspotClient, - ): - """ - Inserts or updates a deal record. - Also uploads photos if present and adds S3 URL. - """ - with db_read_session() as session: - deal_id = deal_data.get("hs_object_id") - - statement = select(HubspotDealData).where( - HubspotDealData.deal_id == deal_id - ) - existing = session.exec(statement).first() - - if existing: - print(f"🔄 Updating existing deal (deal_id={deal_id})") - self._update_existing_deal(existing, deal_data, listing, company) - - self._handle_existing_photo_upload(existing, hubspot_client) - - session.add(existing) - session.commit() - session.refresh(existing) - return existing - - else: - print(f"🆕 Inserting new deal (deal_id={deal_id})") - new_record: HubspotDealData = self._build_new_deal( - deal_id, deal_data, listing, company - ) - - # Handle upload at insert time - self._handle_new_photo_upload(new_record, hubspot_client) - - session.add(new_record) - session.commit() - session.refresh(new_record) - return new_record - def _update_existing_deal( self, existing: HubspotDealData, diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 8c4af1a7..768a86eb 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -1,8 +1,7 @@ from backend.app.db.models.organisation import HubspotDealData from etl.hubspot.hubspotClient import HubspotClient -# from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb -from etl.hubspot.hubspotDataTodB import HubspotDataToDb +from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb from backend.utils.subtasks import task_handler from typing import Any, Dict, Optional @@ -24,11 +23,6 @@ def handler(body: dict[str, Any], context: Any) -> None: hubspot_deal_id ) - if not db_deal: - # New hubspot deal, no diffing to do - # TODO: Trigger hubspot to db ETL - return - hubspot_deal: Dict[str, str] company: Optional[str] listing: Optional[dict[str, str]] @@ -37,10 +31,14 @@ def handler(body: dict[str, Any], context: Any) -> None: hubspot_deal_id ) - if HubspotDealDiffer.check_for_pashub_trigger( - new_deal=hubspot_deal, old_deal=db_deal - ): - # TODO: trigger pashub file fetcher + if not db_deal: + # New hubspot deal, no diffing to do + if company: + company_data: CompanyData = hubspot_client.get_company_information(company) + db_client: HubspotDataToDb = HubspotDataToDb() + db_client.upsert_company(company_data) + + db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client) return if HubspotDealDiffer.check_for_db_update_trigger( @@ -49,7 +47,22 @@ def handler(body: dict[str, Any], context: Any) -> None: new_listing=listing, old_deal=db_deal, ): - # TODO: trigger db upsert + db_client.update_deal_with_checks( + deal_in_db=db_deal, + hubspot_client=hubspot_client, + hs_deal=hubspot_deal, + hs_company_id=company, + hs_listing=listing, + ) + return + + # ============================== + # Orchestration of other lambdas + # ============================== + if HubspotDealDiffer.check_for_pashub_trigger( + new_deal=hubspot_deal, old_deal=db_deal + ): + # TODO: trigger pashub file fetcher return # if db_deal: