diff --git a/etl/db/hubSpotLoad.py b/etl/db/hubSpotLoad.py index 209a0e5..286aa44 100644 --- a/etl/db/hubSpotLoad.py +++ b/etl/db/hubSpotLoad.py @@ -1,50 +1,53 @@ from etl.db.db import get_db_session, init_db from etl.models.topLevel import HubspotDealData, HubspotCommpanyData from sqlmodel import select +from etl.s3.s3_uploader import S3Uploader +import hashlib +import os -class HubspotTodb(): + +class HubspotTodb: def __init__(self): init_db() + self.s3 = S3Uploader() def new_record_to_hubspot_data(self, deal_data, company, listing): - print("This has been depreciated using new interface") - self.upsert_hubspot_deal(deal_data, company, listing) + print("⚠️ Deprecated — use the new interface instead.") + return self.upsert_hubspot_deal(deal_data, company, listing) - def new_record_company(self, company_data): - """ - Adds a new records to the hubspot_compnay_data table - """ - + """Adds a new record to the hubspot_company_data table.""" with get_db_session() as session: new_record = HubspotCommpanyData( company_id=company_data.get("hs_object_id"), - company_name=company_data.get("name") + company_name=company_data.get("name"), ) session.add(new_record) session.commit() session.refresh(new_record) return new_record - + def find_all_deals_with_company_id(self, company_id): - """ - Returns a list of records that have a company_id from the hubspot_deal_data table - """ - """ - Returns a list of records that have a company_id from the hubspot_deal_data table - """ + """Returns a list of deals for a given company_id.""" with get_db_session() as session: - results = ( + return ( session.query(HubspotDealData) .filter(HubspotDealData.company_id == company_id) .all() ) - return results - + + def _sha256(self, file_path: str) -> str: + """Compute SHA-256 checksum of a file.""" + sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256.update(chunk) + return sha256.hexdigest() + def update_deal(self, deal_in_db, hubspot_client): """ - Checks if a deal needs updating and updates it in the db with the latest information. - Performs soft assertions for field-level consistency between DB and HubSpot data. + Checks if a deal needs updating and syncs it with HubSpot. + Also handles major_condition_issue_photos file upload to S3 with integrity check. """ def soft_assert(condition, message="Assertion Failed"): if not condition: @@ -52,70 +55,95 @@ class HubspotTodb(): return False return True - print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") + print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id) - - results = [ - soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), - "deal_id mismatch"), - soft_assert(deal_in_db.company_id == hs_company_id, - "company_id mismatch"), - soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), - "landlord_property_id mismatch"), - soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), - "outcome mismatch"), - soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), - "dealstage mismatch"), - soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), - "dealname mismatch"), - soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), - "project_code mismatch"), - soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), - "uprn mismatch"), - soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), - "outcome_notes mismatch"), - soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"), - "major condition description mismatch"), - soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"), - "major condition issue photos mismatch") + # Soft compare key fields + checks = [ + soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"), + soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"), + soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), "landlord_property_id mismatch"), + soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"), + soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"), + soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"), + soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), "project_code mismatch"), + soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"), + soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), "outcome_notes mismatch"), + soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"), "major condition description mismatch"), + soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"), "major condition issue photos mismatch"), ] - # if any of the soft asserts failed - if not all(results): - print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.") - self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing) - return False + # If discrepancies found, update from HubSpot + if not all(checks): + print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot.") + return self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client) + + # Handle photo upload if it exists but S3 URL is missing + if deal_in_db.major_condition_issue_photos and not deal_in_db.major_condition_issue_evidence_s3_url: + print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...") + + # Download from HubSpot + local_file = hubspot_client.download_file_from_url(deal_in_db.major_condition_issue_photos) + + # Upload to S3 + bucket = "retrofit-data-dev" + s3_url = self.s3.upload_file(local_file, bucket, prefix="hubspot/awaabs_law_evidence/") + + # Download again to verify integrity + downloaded = self.s3.download_from_url(s3_url) + if self._sha256(local_file) == self._sha256(downloaded): + print("✅ SHA256 match verified — upload successful.") + else: + print("❌ SHA256 mismatch — integrity check failed.") + raise ValueError("File integrity check failed after S3 upload.") + + # Update DB record with S3 URL + with get_db_session() as session: + db_record = session.get(HubspotDealData, deal_in_db.id) + db_record.major_condition_issue_evidence_s3_url = s3_url + session.add(db_record) + session.commit() + print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}") + else: - print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.") - return True - - def upsert_hubspot_deal(self, deal_data, company, listing): + print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") + + return True + + def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client): """ - Inserts a new record or updates an existing record in hubspot_deal_data. - Uses deal_id (hs_object_id) as the unique identifier. + Inserts or updates a deal record. + Also uploads photos if present and adds S3 URL. """ with get_db_session() as session: deal_id = deal_data.get("hs_object_id") - # Use SQLModel's modern query style statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id) existing = session.exec(statement).first() if existing: print(f"🔄 Updating existing deal (deal_id={deal_id})") - existing.dealname = deal_data.get("dealname", existing.dealname) - existing.dealstage = deal_data.get("dealstage", existing.dealstage) - existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id) - existing.uprn = listing.get("national_uprn", existing.uprn) - existing.outcome = deal_data.get("outcome", existing.outcome) - existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes) - existing.project_code = deal_data.get("project_code", existing.project_code) - existing.company_id = company or existing.company_id - existing.major_condition_issue_description = deal_data.get("major_condition_issue_description", existing.major_condition_issue_description) - existing.major_condition_issue_photos = deal_data.get("major_condition_issue_photos", existing.major_condition_issue_photos) + for attr, value in { + "dealname": deal_data.get("dealname"), + "dealstage": deal_data.get("dealstage"), + "landlord_property_id": listing.get("owner_property_id"), + "uprn": listing.get("national_uprn"), + "outcome": deal_data.get("outcome"), + "outcome_notes": deal_data.get("outcome_notes"), + "project_code": deal_data.get("project_code"), + "company_id": company, + "major_condition_issue_description": deal_data.get("major_condition_issue_description"), + "major_condition_issue_photos": deal_data.get("major_condition_issue_photos"), + }.items(): + setattr(existing, attr, value or getattr(existing, attr)) + + # Upload if photo exists but S3 link missing + if existing.major_condition_issue_photos and not existing.major_condition_issue_evidence_s3_url: + local_file = hubspot_client.download_file_from_url(existing.major_condition_issue_photos) + s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/") + existing.major_condition_issue_evidence_s3_url = s3_url session.add(existing) session.commit() @@ -124,7 +152,6 @@ class HubspotTodb(): else: print(f"🆕 Inserting new deal (deal_id={deal_id})") - new_record = HubspotDealData( deal_id=deal_id, dealname=deal_data.get("dealname"), @@ -135,10 +162,16 @@ class HubspotTodb(): outcome_notes=deal_data.get("outcome_notes"), project_code=deal_data.get("project_code"), company_id=company, - major_condition_issue_description = deal_data.get("major_condition_issue_description"), - major_condition_issue_photos = deal_data.get("major_condition_issue_photos"), + major_condition_issue_description=deal_data.get("major_condition_issue_description"), + major_condition_issue_photos=deal_data.get("major_condition_issue_photos"), ) + # Handle upload at insert time + if new_record.major_condition_issue_photos: + local_file = hubspot_client.download_file_from_url(new_record.major_condition_issue_photos) + s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/") + new_record.major_condition_issue_evidence_s3_url = s3_url + session.add(new_record) session.commit() session.refresh(new_record) diff --git a/etl/s3/s3_uploader.py b/etl/s3/s3_uploader.py index e820f18..32cb910 100644 --- a/etl/s3/s3_uploader.py +++ b/etl/s3/s3_uploader.py @@ -3,7 +3,7 @@ import boto3 from botocore.exceptions import ClientError from urllib.parse import urlparse from datetime import datetime - +import requests class S3Uploader: """ @@ -68,3 +68,46 @@ class S3Uploader: ) except ClientError as e: raise RuntimeError(f"❌ Failed to generate signed URL: {e}") + + + def download_from_url(self, s3_url: str, local_dir: str = ".", expires_in: int = 3600) -> str: + """ + Download a file from a public or private S3 URL. + If private, generates a presigned URL first. + + Args: + s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt) + local_dir (str): Folder to save the file in. + expires_in (int): Presigned URL lifetime (seconds). + + Returns: + str: Local file path of the downloaded file. + """ + parsed = urlparse(s3_url) + host_parts = parsed.netloc.split(".") + if len(host_parts) < 3 or host_parts[1] != "s3": + raise ValueError("❌ Not a valid S3 HTTPS URL") + + bucket = host_parts[0] + key = parsed.path.lstrip("/") + + # Generate presigned URL (whether public or private) + presigned_url = self.generate_presigned_url(bucket, key, expires_in) + + filename = os.path.basename(key) + local_path = os.path.join(local_dir, filename) + + try: + response = requests.get(presigned_url, stream=True) + response.raise_for_status() + + os.makedirs(local_dir, exist_ok=True) + with open(local_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + print(f"✅ Downloaded: {local_path}") + return local_path + + except requests.exceptions.RequestException as e: + raise RuntimeError(f"❌ Failed to download file: {e}") \ No newline at end of file