diff --git a/alembic/versions/c8af22cece92_s3_add.py b/alembic/versions/c8af22cece92_s3_add.py
new file mode 100644
index 0000000..9c3db63
--- /dev/null
+++ b/alembic/versions/c8af22cece92_s3_add.py
@@ -0,0 +1,33 @@
+"""s3 add
+
+Revision ID: c8af22cece92
+Revises: ed6aaa298de4
+Create Date: 2025-11-07 15:00:32.917157
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+import sqlmodel
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'c8af22cece92'
+down_revision: Union[str, None] = 'ed6aaa298de4'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Add the nullable major_condition_issue_evidence_s3_url column to hubspot_deal_data."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('hubspot_deal_data', sa.Column('major_condition_issue_evidence_s3_url', sqlmodel.sql.sqltypes.AutoString(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    """Drop the major_condition_issue_evidence_s3_url column from hubspot_deal_data."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('hubspot_deal_data', 'major_condition_issue_evidence_s3_url')
+    # ### end Alembic commands ###
diff --git a/deployment/database/provider.tf b/deployment/database/provider.tf
index 8f8274a..9ec544e 100644
--- a/deployment/database/provider.tf
+++ b/deployment/database/provider.tf
@@ -6,7 +6,7 @@ terraform {
     }
   }
   backend "s3" {
-    bucket = "survey-extractor-tf-state"
+    bucket = "survey-extractor-tf-state"
     region = "eu-west-2"
     key    = "env:/dev/terraform.tfstate"
   }
diff --git a/etl/db/hubSpotLoad.py b/etl/db/hubSpotLoad.py
index 209a0e5..286aa44 100644
--- a/etl/db/hubSpotLoad.py
+++ b/etl/db/hubSpotLoad.py
@@ -1,50 +1,59 @@
 from etl.db.db import get_db_session, init_db
 from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
 from sqlmodel import select
+from etl.s3.s3_uploader import S3Uploader
+import hashlib
+import os
 
-class HubspotTodb():
+
+class HubspotTodb:
+    """Writes HubSpot company and deal data to the database and mirrors evidence photos to S3."""
+
+    # S3 destination for major-condition-issue evidence files.
+    EVIDENCE_BUCKET = "retrofit-data-dev"
+    EVIDENCE_PREFIX = "hubspot/awaabs_law_evidence/"
+
     def __init__(self):
         init_db()
+        self.s3 = S3Uploader()
 
     def new_record_to_hubspot_data(self, deal_data, company, listing):
-        print("This has been depreciated using new interface")
-        self.upsert_hubspot_deal(deal_data, company, listing)
+        print("⚠️ Deprecated — use the new interface instead.")
+        return self.upsert_hubspot_deal(deal_data, company, listing)
 
-
     def new_record_company(self, company_data):
-        """
-        Adds a new records to the hubspot_compnay_data table
-        """
-        
+        """Adds a new record to the hubspot_company_data table."""
         with get_db_session() as session:
             new_record = HubspotCommpanyData(
                 company_id=company_data.get("hs_object_id"),
-                company_name=company_data.get("name")
+                company_name=company_data.get("name"),
             )
             session.add(new_record)
             session.commit()
             session.refresh(new_record)
             return new_record
-    
+
     def find_all_deals_with_company_id(self, company_id):
-        """
-        Returns a list of records that have a company_id from the hubspot_deal_data table
-        """
-        """
-        Returns a list of records that have a company_id from the hubspot_deal_data table
-        """
+        """Returns a list of deals for a given company_id."""
         with get_db_session() as session:
-            results = (
+            return (
                 session.query(HubspotDealData)
                 .filter(HubspotDealData.company_id == company_id)
                 .all()
             )
-        return results
-    
+
+    def _sha256(self, file_path: str) -> str:
+        """Return the hex SHA-256 digest of the file at file_path, read in 8 KiB chunks."""
+        sha256 = hashlib.sha256()
+        with open(file_path, "rb") as f:
+            for chunk in iter(lambda: f.read(8192), b""):
+                sha256.update(chunk)
+        return sha256.hexdigest()
+
     def update_deal(self, deal_in_db, hubspot_client):
         """
-        Checks if a deal needs updating and updates it in the db with the latest information.
-        Performs soft assertions for field-level consistency between DB and HubSpot data.
+        Checks if a deal needs updating and syncs it with HubSpot.
+        Also handles major_condition_issue_photos file upload to S3 with integrity check.
         """
         def soft_assert(condition, message="Assertion Failed"):
             if not condition:
@@ -52,70 +61,95 @@ class HubspotTodb():
                 return False
             return True
 
-        print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
+        print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
         hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id)
-        
-        results = [
-            soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"),
-                        "deal_id mismatch"),
-            soft_assert(deal_in_db.company_id == hs_company_id,
-                        "company_id mismatch"),
-            soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
-                        "landlord_property_id mismatch"),
-            soft_assert(deal_in_db.outcome == hs_deal.get("outcome"),
-                        "outcome mismatch"),
-            soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"),
-                        "dealstage mismatch"),
-            soft_assert(deal_in_db.dealname == hs_deal.get("dealname"),
-                        "dealname mismatch"),
-            soft_assert(deal_in_db.project_code == hs_deal.get("project_code"),
-                        "project_code mismatch"),
-            soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"),
-                        "uprn mismatch"),
-            soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
-                        "outcome_notes mismatch"),
-            soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"),
-                        "major condition description mismatch"),
-            soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"),
-                        "major condition issue photos mismatch")
+        # Soft compare key fields
+        checks = [
+            soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"),
+            soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
+            soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), "landlord_property_id mismatch"),
+            soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"),
+            soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"),
+            soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"),
+            soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), "project_code mismatch"),
+            soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"),
+            soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), "outcome_notes mismatch"),
+            soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"), "major condition description mismatch"),
+            soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"), "major condition issue photos mismatch"),
         ]
 
 
-        # if any of the soft asserts failed
-        if not all(results):
-            print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.")
-            self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing)
-            return False
+        # If discrepancies found, update from HubSpot
+        if not all(checks):
+            print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot.")
+            return self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
+
+        # Handle photo upload if it exists but S3 URL is missing
+        if deal_in_db.major_condition_issue_photos and not deal_in_db.major_condition_issue_evidence_s3_url:
+            print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...")
+
+            # Download from HubSpot
+            local_file = hubspot_client.download_file_from_url(deal_in_db.major_condition_issue_photos)
+
+            # Upload to S3
+            bucket = self.EVIDENCE_BUCKET
+            s3_url = self.s3.upload_file(local_file, bucket, prefix=self.EVIDENCE_PREFIX)
+
+            # Download again to verify integrity
+            downloaded = self.s3.download_from_url(s3_url)
+            if self._sha256(local_file) == self._sha256(downloaded):
+                print("✅ SHA256 match verified — upload successful.")
+            else:
+                print("❌ SHA256 mismatch — integrity check failed.")
+                raise ValueError("File integrity check failed after S3 upload.")
+
+            # Update DB record with S3 URL
+            with get_db_session() as session:
+                db_record = session.get(HubspotDealData, deal_in_db.id)
+                db_record.major_condition_issue_evidence_s3_url = s3_url
+                session.add(db_record)
+                session.commit()
+            print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
+
         else:
-            print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.")
-            return True
-        
-    def upsert_hubspot_deal(self, deal_data, company, listing):
+            print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
+
+        return True
+
+    def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client=None):
         """
-        Inserts a new record or updates an existing record in hubspot_deal_data.
-        Uses deal_id (hs_object_id) as the unique identifier.
+        Inserts or updates a deal record, using deal_id (hs_object_id) as the unique identifier.
+        When hubspot_client is given, also uploads the evidence photo to S3 if no S3 URL is stored yet.
         """
 
         with get_db_session() as session:
             deal_id = deal_data.get("hs_object_id")
 
-            # Use SQLModel's modern query style
             statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
             existing = session.exec(statement).first()
 
             if existing:
                 print(f"🔄 Updating existing deal (deal_id={deal_id})")
 
-                existing.dealname = deal_data.get("dealname", existing.dealname)
-                existing.dealstage = deal_data.get("dealstage", existing.dealstage)
-                existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id)
-                existing.uprn = listing.get("national_uprn", existing.uprn)
-                existing.outcome = deal_data.get("outcome", existing.outcome)
-                existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes)
-                existing.project_code = deal_data.get("project_code", existing.project_code)
-                existing.company_id = company or existing.company_id
-                existing.major_condition_issue_description = deal_data.get("major_condition_issue_description", existing.major_condition_issue_description)
-                existing.major_condition_issue_photos = deal_data.get("major_condition_issue_photos", existing.major_condition_issue_photos)
+                for attr, value in {
+                    "dealname": deal_data.get("dealname", existing.dealname),
+                    "dealstage": deal_data.get("dealstage", existing.dealstage),
+                    "landlord_property_id": listing.get("owner_property_id", existing.landlord_property_id),
+                    "uprn": listing.get("national_uprn", existing.uprn),
+                    "outcome": deal_data.get("outcome", existing.outcome),
+                    "outcome_notes": deal_data.get("outcome_notes", existing.outcome_notes),
+                    "project_code": deal_data.get("project_code", existing.project_code),
+                    "company_id": company or existing.company_id,
+                    "major_condition_issue_description": deal_data.get("major_condition_issue_description", existing.major_condition_issue_description),
+                    "major_condition_issue_photos": deal_data.get("major_condition_issue_photos", existing.major_condition_issue_photos),
+                }.items():
+                    setattr(existing, attr, value)  # only keys absent from HubSpot keep the stored value
+
+                # Upload if photo exists but S3 link missing
+                if hubspot_client and existing.major_condition_issue_photos and not existing.major_condition_issue_evidence_s3_url:
+                    local_file = hubspot_client.download_file_from_url(existing.major_condition_issue_photos)
+                    s3_url = self.s3.upload_file(local_file, self.EVIDENCE_BUCKET, prefix=self.EVIDENCE_PREFIX)
+                    existing.major_condition_issue_evidence_s3_url = s3_url
 
                 session.add(existing)
                 session.commit()
@@ -124,7 +158,6 @@ class HubspotTodb():
 
             else:
                 print(f"🆕 Inserting new deal (deal_id={deal_id})")
-
                 new_record = HubspotDealData(
                     deal_id=deal_id,
                     dealname=deal_data.get("dealname"),
@@ -135,10 +168,16 @@ class HubspotTodb():
                     outcome_notes=deal_data.get("outcome_notes"),
                     project_code=deal_data.get("project_code"),
                     company_id=company,
-                    major_condition_issue_description = deal_data.get("major_condition_issue_description"),
-                    major_condition_issue_photos = deal_data.get("major_condition_issue_photos"),
+                    major_condition_issue_description=deal_data.get("major_condition_issue_description"),
+                    major_condition_issue_photos=deal_data.get("major_condition_issue_photos"),
                 )
 
+                # Handle upload at insert time
+                if hubspot_client and new_record.major_condition_issue_photos:
+                    local_file = hubspot_client.download_file_from_url(new_record.major_condition_issue_photos)
+                    s3_url = self.s3.upload_file(local_file, self.EVIDENCE_BUCKET, prefix=self.EVIDENCE_PREFIX)
+                    new_record.major_condition_issue_evidence_s3_url = s3_url
+
                 session.add(new_record)
                 session.commit()
                 session.refresh(new_record)
diff --git a/etl/hubSpotClient/hubspotClient.py b/etl/hubSpotClient/hubspotClient.py
index a913a60..adbec50 100644
--- a/etl/hubSpotClient/hubspotClient.py
+++ b/etl/hubSpotClient/hubspotClient.py
@@ -3,6 +3,8 @@ from enum import Enum
 from etl.utils.logger import Logger
 import logging
 from hubspot.crm.associations import ApiException
+import os
+import requests
 
 class Companies(Enum):
     ABRI = "237615001799"
@@ -208,4 +210,72 @@ class HubSpotClient():
 
         except Exception as e:
             self.logger.error(f"Error retrieving deal stages: {e}")
-            return []
\ No newline at end of file
+            return []
+
+    def download_file_from_url(self, download_url: str, save_path: str = None) -> str:
+        """
+        Download a file from a HubSpot file URL (public or private), keeping its original file type.
+
+        Args:
+            download_url: URL of the file to fetch.
+            save_path: Where to write the file. None saves under the current working
+                directory, an existing directory gets the inferred filename appended,
+                and any other value is used as the file path itself.
+
+        Returns:
+            Path of the file that was written.
+
+        Raises:
+            requests.exceptions.RequestException: On connection failure, timeout or HTTP error status.
+        """
+        import mimetypes
+        import requests
+        import os
+
+        try:
+            headers = {}
+            # NOTE(review): the access token is sent to every URL that lacks this
+            # substring — consider an explicit allow-list of HubSpot hosts instead.
+            if "hubspotusercontent" not in download_url:
+                headers["Authorization"] = f"Bearer {self.access_token}"
+
+            self.logger.info(f"Downloading HubSpot file: {download_url}")
+            # A timeout keeps a stalled connection from blocking the caller indefinitely.
+            response = requests.get(download_url, headers=headers, stream=True, allow_redirects=True, timeout=60)
+            response.raise_for_status()
+
+            # Try to infer filename from Content-Disposition header
+            content_disposition = response.headers.get("content-disposition")
+            if content_disposition and "filename=" in content_disposition:
+                # Drop trailing parameters and any directory part so a server-supplied
+                # name cannot point outside the target directory.
+                raw_name = content_disposition.split("filename=")[1].split(";")[0].strip().strip('"')
+                filename = os.path.basename(raw_name.replace("\\", "/")) or "hubspot_download"
+            else:
+                # fallback: extract from URL or content-type
+                filename = os.path.basename(download_url.split("?")[0]) or "hubspot_download"
+                if "." not in filename:
+                    content_type = response.headers.get("content-type")
+                    ext = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
+                    if ext:
+                        filename += ext
+
+            # Make sure save_path is valid
+            if save_path is None:
+                save_path = os.path.abspath(filename)
+            elif os.path.isdir(save_path):
+                save_path = os.path.join(save_path, filename)
+            else:
+                # if user passes a file path directly, leave it
+                save_path = os.path.abspath(save_path)
+
+            with open(save_path, "wb") as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    f.write(chunk)
+
+            self.logger.info(f"File downloaded successfully → {save_path}")
+            return save_path
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"Failed to download file from HubSpot: {e}")
+            raise
diff --git a/etl/models/topLevel.py b/etl/models/topLevel.py
index de7139b..cdf1f9f 100644
--- a/etl/models/topLevel.py
+++ b/etl/models/topLevel.py
@@ -83,6 +83,7 @@ class HubspotDealData(SQLModel, table=True):
 
     major_condition_issue_description: Optional[str] = Field(default=None)
     major_condition_issue_photos: Optional[str] = Field(default=None)
+    major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
 
     created_at: datetime = Field(
         sa_column=Column(
diff --git a/etl/s3/s3_uploader.py b/etl/s3/s3_uploader.py
new file mode 100644
index 0000000..32cb910
--- /dev/null
+++ b/etl/s3/s3_uploader.py
@@ -0,0 +1,142 @@
+import os
+import posixpath
+from datetime import datetime, timezone
+from typing import Optional
+from urllib.parse import quote, unquote, urlparse
+
+import boto3
+import requests
+from boto3.exceptions import S3UploadFailedError
+from botocore.exceptions import ClientError
+
+
+class S3Uploader:
+    """
+    Simple helper to upload local files to S3 and return their S3 HTTPS URI.
+
+    Credentials are deliberately not hard-coded: when no keys are passed, boto3
+    resolves them from its default chain (environment variables, shared
+    credentials/config files, or an attached IAM role).
+    """
+
+    # Seconds to wait on the HTTP download before giving up.
+    DOWNLOAD_TIMEOUT = 60
+
+    def __init__(
+        self,
+        aws_access_key: Optional[str] = None,
+        aws_secret_key: Optional[str] = None,
+        region: str = "eu-west-2",
+    ):
+        """
+        Args:
+            aws_access_key: Explicit access key ID, or None to use boto3's default credential chain.
+            aws_secret_key: Explicit secret access key, or None to use boto3's default credential chain.
+            region: AWS region used for the client and for building object URLs.
+        """
+        self.aws_access_key = aws_access_key
+        self.aws_secret_key = aws_secret_key
+        self.region = region
+
+        self.s3 = boto3.client(
+            "s3",
+            aws_access_key_id=self.aws_access_key,
+            aws_secret_access_key=self.aws_secret_key,
+            region_name=self.region,
+        )
+
+    def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str:
+        """
+        Upload a local file to an S3 bucket and return its HTTPS URI.
+
+        Args:
+            file_path (str): Path to the local file.
+            bucket (str): S3 bucket name.
+            prefix (str): Folder/prefix in the bucket.
+
+        Returns:
+            str: HTTPS-style S3 URI (not signed), with the object key percent-encoded.
+
+        Raises:
+            RuntimeError: If the upload fails.
+        """
+        filename = os.path.basename(file_path)
+        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
+        # S3 keys always use "/" separators, so do not build them with os.path.join.
+        s3_key = posixpath.join(prefix, f"{timestamp}_{filename}")
+
+        try:
+            self.s3.upload_file(file_path, bucket, s3_key)
+        except (ClientError, S3UploadFailedError) as e:
+            raise RuntimeError(f"❌ S3 upload failed: {e}") from e
+
+        # Percent-encode the key so names containing spaces, "?" or "#" survive urlparse later.
+        encoded_key = quote(s3_key, safe="/")
+        return f"https://{bucket}.s3.{self.region}.amazonaws.com/{encoded_key}"
+
+    def print_bucket(self, bucket: str = "retrofit-data-dev"):
+        """Print the head_bucket response for the given bucket."""
+        print(self.s3.head_bucket(Bucket=bucket))
+
+    def generate_presigned_url(self, bucket: str, key: str, expires_in: int = 3600) -> str:
+        """
+        Generate a temporary presigned URL for an S3 object.
+
+        Raises:
+            RuntimeError: If boto3 cannot produce the URL.
+        """
+        try:
+            return self.s3.generate_presigned_url(
+                "get_object",
+                Params={"Bucket": bucket, "Key": key},
+                ExpiresIn=expires_in,
+            )
+        except ClientError as e:
+            raise RuntimeError(f"❌ Failed to generate signed URL: {e}") from e
+
+    def download_from_url(self, s3_url: str, local_dir: str = ".", expires_in: int = 3600) -> str:
+        """
+        Download a file from a public or private S3 URL.
+        A presigned URL is always generated first, so private objects work too.
+
+        Args:
+            s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt)
+            local_dir (str): Folder to save the file in.
+            expires_in (int): Presigned URL lifetime (seconds).
+
+        Returns:
+            str: Local file path of the downloaded file.
+
+        Raises:
+            ValueError: If s3_url is not a virtual-hosted S3 URL or names no object.
+            RuntimeError: If the presigned URL cannot be created or the download fails.
+        """
+        parsed = urlparse(s3_url)
+        host_parts = parsed.netloc.split(".")
+        if len(host_parts) < 3 or host_parts[1] != "s3":
+            raise ValueError("❌ Not a valid S3 HTTPS URL")
+
+        bucket = host_parts[0]
+        key = unquote(parsed.path.lstrip("/"))
+        filename = posixpath.basename(key)
+        if not filename:
+            raise ValueError("❌ S3 URL does not point to an object")
+
+        # Generate presigned URL (whether public or private)
+        presigned_url = self.generate_presigned_url(bucket, key, expires_in)
+        local_path = os.path.join(local_dir, filename)
+
+        try:
+            response = requests.get(presigned_url, stream=True, timeout=self.DOWNLOAD_TIMEOUT)
+            response.raise_for_status()
+
+            os.makedirs(local_dir, exist_ok=True)
+            with open(local_path, "wb") as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    f.write(chunk)
+
+            print(f"✅ Downloaded: {local_path}")
+            return local_path
+
+        except requests.exceptions.RequestException as e:
+            raise RuntimeError(f"❌ Failed to download file: {e}") from e
diff --git a/migration_db.sh b/migration_db.sh
index 1443b18..c0046bc 100644
--- a/migration_db.sh
+++ b/migration_db.sh
@@ -1,4 +1,4 @@
-#poetry run alembic revision --autogenerate -m "added major condition issue things"
+#poetry run alembic revision --autogenerate -m "s3 add "
 
 poetry run alembic upgrade head
 