mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-30 13:10:56 +00:00
Merge pull request #106 from Hestia-Homes/feature/images_in_s3
Feature/images in s3
This commit is contained in:
commit
9a768d6239
7 changed files with 306 additions and 74 deletions
33
alembic/versions/c8af22cece92_s3_add.py
Normal file
33
alembic/versions/c8af22cece92_s3_add.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
"""s3 add
|
||||
|
||||
Revision ID: c8af22cece92
|
||||
Revises: ed6aaa298de4
|
||||
Create Date: 2025-11-07 15:00:32.917157
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
import sqlmodel
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'c8af22cece92'
|
||||
down_revision: Union[str, None] = 'ed6aaa298de4'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Upgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('hubspot_deal_data', sa.Column('major_condition_issue_evidence_s3_url', sqlmodel.sql.sqltypes.AutoString(), nullable=True))
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Downgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column('hubspot_deal_data', 'major_condition_issue_evidence_s3_url')
|
||||
# ### end Alembic commands ###
|
||||
|
|
@ -6,7 +6,7 @@ terraform {
|
|||
}
|
||||
}
|
||||
backend "s3" {
|
||||
bucket = "survey-extractor-tf-state"
|
||||
= "survey-extractor-tf-state"
|
||||
region = "eu-west-2"
|
||||
key = "env:/dev/terraform.tfstate"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,50 +1,53 @@
|
|||
from etl.db.db import get_db_session, init_db
|
||||
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
|
||||
from sqlmodel import select
|
||||
from etl.s3.s3_uploader import S3Uploader
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
class HubspotTodb():
|
||||
|
||||
class HubspotTodb:
|
||||
def __init__(self):
|
||||
init_db()
|
||||
self.s3 = S3Uploader()
|
||||
|
||||
def new_record_to_hubspot_data(self, deal_data, company, listing):
|
||||
print("This has been depreciated using new interface")
|
||||
self.upsert_hubspot_deal(deal_data, company, listing)
|
||||
print("⚠️ Deprecated — use the new interface instead.")
|
||||
return self.upsert_hubspot_deal(deal_data, company, listing)
|
||||
|
||||
|
||||
def new_record_company(self, company_data):
|
||||
"""
|
||||
Adds a new records to the hubspot_compnay_data table
|
||||
"""
|
||||
|
||||
"""Adds a new record to the hubspot_company_data table."""
|
||||
with get_db_session() as session:
|
||||
new_record = HubspotCommpanyData(
|
||||
company_id=company_data.get("hs_object_id"),
|
||||
company_name=company_data.get("name")
|
||||
company_name=company_data.get("name"),
|
||||
)
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
return new_record
|
||||
|
||||
|
||||
def find_all_deals_with_company_id(self, company_id):
|
||||
"""
|
||||
Returns a list of records that have a company_id from the hubspot_deal_data table
|
||||
"""
|
||||
"""
|
||||
Returns a list of records that have a company_id from the hubspot_deal_data table
|
||||
"""
|
||||
"""Returns a list of deals for a given company_id."""
|
||||
with get_db_session() as session:
|
||||
results = (
|
||||
return (
|
||||
session.query(HubspotDealData)
|
||||
.filter(HubspotDealData.company_id == company_id)
|
||||
.all()
|
||||
)
|
||||
return results
|
||||
|
||||
|
||||
def _sha256(self, file_path: str) -> str:
|
||||
"""Compute SHA-256 checksum of a file."""
|
||||
sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
sha256.update(chunk)
|
||||
return sha256.hexdigest()
|
||||
|
||||
def update_deal(self, deal_in_db, hubspot_client):
|
||||
"""
|
||||
Checks if a deal needs updating and updates it in the db with the latest information.
|
||||
Performs soft assertions for field-level consistency between DB and HubSpot data.
|
||||
Checks if a deal needs updating and syncs it with HubSpot.
|
||||
Also handles major_condition_issue_photos file upload to S3 with integrity check.
|
||||
"""
|
||||
def soft_assert(condition, message="Assertion Failed"):
|
||||
if not condition:
|
||||
|
|
@ -52,70 +55,95 @@ class HubspotTodb():
|
|||
return False
|
||||
return True
|
||||
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
|
||||
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id)
|
||||
|
||||
|
||||
results = [
|
||||
soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"),
|
||||
"deal_id mismatch"),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id,
|
||||
"company_id mismatch"),
|
||||
soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
|
||||
"landlord_property_id mismatch"),
|
||||
soft_assert(deal_in_db.outcome == hs_deal.get("outcome"),
|
||||
"outcome mismatch"),
|
||||
soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"),
|
||||
"dealstage mismatch"),
|
||||
soft_assert(deal_in_db.dealname == hs_deal.get("dealname"),
|
||||
"dealname mismatch"),
|
||||
soft_assert(deal_in_db.project_code == hs_deal.get("project_code"),
|
||||
"project_code mismatch"),
|
||||
soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"),
|
||||
"uprn mismatch"),
|
||||
soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
|
||||
"outcome_notes mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"),
|
||||
"major condition description mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"),
|
||||
"major condition issue photos mismatch")
|
||||
# Soft compare key fields
|
||||
checks = [
|
||||
soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
|
||||
soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), "landlord_property_id mismatch"),
|
||||
soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"),
|
||||
soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"),
|
||||
soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"),
|
||||
soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), "project_code mismatch"),
|
||||
soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"),
|
||||
soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), "outcome_notes mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_description == hs_deal.get("major_condition_issue_description"), "major condition description mismatch"),
|
||||
soft_assert(deal_in_db.major_condition_issue_photos == hs_deal.get("major_condition_issue_photos"), "major condition issue photos mismatch"),
|
||||
]
|
||||
|
||||
# if any of the soft asserts failed
|
||||
if not all(results):
|
||||
print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.")
|
||||
self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing)
|
||||
return False
|
||||
# If discrepancies found, update from HubSpot
|
||||
if not all(checks):
|
||||
print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot.")
|
||||
return self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
|
||||
|
||||
# Handle photo upload if it exists but S3 URL is missing
|
||||
if deal_in_db.major_condition_issue_photos and not deal_in_db.major_condition_issue_evidence_s3_url:
|
||||
print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...")
|
||||
|
||||
# Download from HubSpot
|
||||
local_file = hubspot_client.download_file_from_url(deal_in_db.major_condition_issue_photos)
|
||||
|
||||
# Upload to S3
|
||||
bucket = "retrofit-data-dev"
|
||||
s3_url = self.s3.upload_file(local_file, bucket, prefix="hubspot/awaabs_law_evidence/")
|
||||
|
||||
# Download again to verify integrity
|
||||
downloaded = self.s3.download_from_url(s3_url)
|
||||
if self._sha256(local_file) == self._sha256(downloaded):
|
||||
print("✅ SHA256 match verified — upload successful.")
|
||||
else:
|
||||
print("❌ SHA256 mismatch — integrity check failed.")
|
||||
raise ValueError("File integrity check failed after S3 upload.")
|
||||
|
||||
# Update DB record with S3 URL
|
||||
with get_db_session() as session:
|
||||
db_record = session.get(HubspotDealData, deal_in_db.id)
|
||||
db_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
session.add(db_record)
|
||||
session.commit()
|
||||
print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
|
||||
|
||||
else:
|
||||
print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.")
|
||||
return True
|
||||
|
||||
def upsert_hubspot_deal(self, deal_data, company, listing):
|
||||
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
|
||||
|
||||
return True
|
||||
|
||||
def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client):
|
||||
"""
|
||||
Inserts a new record or updates an existing record in hubspot_deal_data.
|
||||
Uses deal_id (hs_object_id) as the unique identifier.
|
||||
Inserts or updates a deal record.
|
||||
Also uploads photos if present and adds S3 URL.
|
||||
"""
|
||||
with get_db_session() as session:
|
||||
deal_id = deal_data.get("hs_object_id")
|
||||
|
||||
# Use SQLModel's modern query style
|
||||
statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
|
||||
existing = session.exec(statement).first()
|
||||
|
||||
if existing:
|
||||
print(f"🔄 Updating existing deal (deal_id={deal_id})")
|
||||
|
||||
existing.dealname = deal_data.get("dealname", existing.dealname)
|
||||
existing.dealstage = deal_data.get("dealstage", existing.dealstage)
|
||||
existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id)
|
||||
existing.uprn = listing.get("national_uprn", existing.uprn)
|
||||
existing.outcome = deal_data.get("outcome", existing.outcome)
|
||||
existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes)
|
||||
existing.project_code = deal_data.get("project_code", existing.project_code)
|
||||
existing.company_id = company or existing.company_id
|
||||
existing.major_condition_issue_description = deal_data.get("major_condition_issue_description", existing.major_condition_issue_description)
|
||||
existing.major_condition_issue_photos = deal_data.get("major_condition_issue_photos", existing.major_condition_issue_photos)
|
||||
for attr, value in {
|
||||
"dealname": deal_data.get("dealname"),
|
||||
"dealstage": deal_data.get("dealstage"),
|
||||
"landlord_property_id": listing.get("owner_property_id"),
|
||||
"uprn": listing.get("national_uprn"),
|
||||
"outcome": deal_data.get("outcome"),
|
||||
"outcome_notes": deal_data.get("outcome_notes"),
|
||||
"project_code": deal_data.get("project_code"),
|
||||
"company_id": company,
|
||||
"major_condition_issue_description": deal_data.get("major_condition_issue_description"),
|
||||
"major_condition_issue_photos": deal_data.get("major_condition_issue_photos"),
|
||||
}.items():
|
||||
setattr(existing, attr, value or getattr(existing, attr))
|
||||
|
||||
# Upload if photo exists but S3 link missing
|
||||
if existing.major_condition_issue_photos and not existing.major_condition_issue_evidence_s3_url:
|
||||
local_file = hubspot_client.download_file_from_url(existing.major_condition_issue_photos)
|
||||
s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/")
|
||||
existing.major_condition_issue_evidence_s3_url = s3_url
|
||||
|
||||
session.add(existing)
|
||||
session.commit()
|
||||
|
|
@ -124,7 +152,6 @@ class HubspotTodb():
|
|||
|
||||
else:
|
||||
print(f"🆕 Inserting new deal (deal_id={deal_id})")
|
||||
|
||||
new_record = HubspotDealData(
|
||||
deal_id=deal_id,
|
||||
dealname=deal_data.get("dealname"),
|
||||
|
|
@ -135,10 +162,16 @@ class HubspotTodb():
|
|||
outcome_notes=deal_data.get("outcome_notes"),
|
||||
project_code=deal_data.get("project_code"),
|
||||
company_id=company,
|
||||
major_condition_issue_description = deal_data.get("major_condition_issue_description"),
|
||||
major_condition_issue_photos = deal_data.get("major_condition_issue_photos"),
|
||||
major_condition_issue_description=deal_data.get("major_condition_issue_description"),
|
||||
major_condition_issue_photos=deal_data.get("major_condition_issue_photos"),
|
||||
)
|
||||
|
||||
# Handle upload at insert time
|
||||
if new_record.major_condition_issue_photos:
|
||||
local_file = hubspot_client.download_file_from_url(new_record.major_condition_issue_photos)
|
||||
s3_url = self.s3.upload_file(local_file, "retrofit-data-dev", prefix="hubspot/awaabs_law_evidence/")
|
||||
new_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ from enum import Enum
|
|||
from etl.utils.logger import Logger
|
||||
import logging
|
||||
from hubspot.crm.associations import ApiException
|
||||
import os
|
||||
import requests
|
||||
|
||||
class Companies(Enum):
|
||||
ABRI = "237615001799"
|
||||
|
|
@ -208,4 +210,54 @@ class HubSpotClient():
|
|||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error retrieving deal stages: {e}")
|
||||
return []
|
||||
return []
|
||||
|
||||
def download_file_from_url(self, download_url: str, save_path: str = None) -> str:
|
||||
"""
|
||||
Download a file from a HubSpot file URL (public or private), keeping its original file type.
|
||||
"""
|
||||
import mimetypes
|
||||
import requests
|
||||
import os
|
||||
|
||||
try:
|
||||
headers = {}
|
||||
if "hubspotusercontent" not in download_url:
|
||||
headers["Authorization"] = f"Bearer {self.access_token}"
|
||||
|
||||
self.logger.info(f"Downloading HubSpot file: {download_url}")
|
||||
response = requests.get(download_url, headers=headers, stream=True, allow_redirects=True)
|
||||
response.raise_for_status()
|
||||
|
||||
# Try to infer filename from Content-Disposition header
|
||||
content_disposition = response.headers.get("content-disposition")
|
||||
if content_disposition and "filename=" in content_disposition:
|
||||
filename = content_disposition.split("filename=")[1].strip('"')
|
||||
else:
|
||||
# fallback: extract from URL or content-type
|
||||
filename = os.path.basename(download_url.split("?")[0]) or "hubspot_download"
|
||||
if "." not in filename:
|
||||
content_type = response.headers.get("content-type")
|
||||
ext = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
|
||||
if ext:
|
||||
filename += ext
|
||||
|
||||
# Make sure save_path is valid
|
||||
if save_path is None:
|
||||
save_path = os.path.abspath(filename)
|
||||
elif os.path.isdir(save_path):
|
||||
save_path = os.path.join(save_path, filename)
|
||||
else:
|
||||
# if user passes a file path directly, leave it
|
||||
save_path = os.path.abspath(save_path)
|
||||
|
||||
with open(save_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
self.logger.info(f"File downloaded successfully → {save_path}")
|
||||
return save_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Failed to download file from HubSpot: {e}")
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ class HubspotDealData(SQLModel, table=True):
|
|||
|
||||
major_condition_issue_description: Optional[str] = Field(default=None)
|
||||
major_condition_issue_photos: Optional[str] = Field(default=None)
|
||||
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
|
||||
|
||||
created_at: datetime = Field(
|
||||
sa_column=Column(
|
||||
|
|
|
|||
113
etl/s3/s3_uploader.py
Normal file
113
etl/s3/s3_uploader.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
import os
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from urllib.parse import urlparse
|
||||
from datetime import datetime
|
||||
import requests
|
||||
|
||||
class S3Uploader:
|
||||
"""
|
||||
Simple helper to upload local files to S3 and return their S3 HTTPS URI.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
aws_access_key: str = "AKIAU5A36PPNK7RXX52V",
|
||||
aws_secret_key: str = "KRTjzoGVestZ0ifDwaAVqiPoXXZAvQKAjY5sVBtP",
|
||||
region: str = "eu-west-2",
|
||||
):
|
||||
self.aws_access_key = aws_access_key
|
||||
self.aws_secret_key = aws_secret_key
|
||||
self.region = region
|
||||
|
||||
self.s3 = boto3.client(
|
||||
"s3",
|
||||
aws_access_key_id=self.aws_access_key,
|
||||
aws_secret_access_key=self.aws_secret_key,
|
||||
region_name=self.region,
|
||||
)
|
||||
|
||||
def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str:
|
||||
"""
|
||||
Upload a local file to an S3 bucket and return its HTTPS URI.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the local file.
|
||||
bucket (str): S3 bucket name.
|
||||
prefix (str): Folder/prefix in the bucket.
|
||||
|
||||
Returns:
|
||||
str: HTTPS-style S3 URI (not signed).
|
||||
"""
|
||||
try:
|
||||
filename = os.path.basename(file_path)
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
||||
s3_key = os.path.join(prefix, f"{timestamp}_{filename}")
|
||||
|
||||
self.s3.upload_file(file_path, bucket, s3_key)
|
||||
|
||||
s3_uri = f"https://{bucket}.s3.{self.region}.amazonaws.com/{s3_key}"
|
||||
return s3_uri
|
||||
|
||||
except ClientError as e:
|
||||
raise RuntimeError(f"❌ S3 upload failed: {e}")
|
||||
|
||||
def print_bucket(self):
|
||||
print(self.s3.head_bucket(Bucket="retrofit-data-dev"))
|
||||
|
||||
|
||||
def generate_presigned_url(self, bucket: str, key: str, expires_in: int = 3600) -> str:
|
||||
"""
|
||||
Generate a temporary presigned URL for an S3 object.
|
||||
"""
|
||||
try:
|
||||
return self.s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": bucket, "Key": key},
|
||||
ExpiresIn=expires_in,
|
||||
)
|
||||
except ClientError as e:
|
||||
raise RuntimeError(f"❌ Failed to generate signed URL: {e}")
|
||||
|
||||
|
||||
def download_from_url(self, s3_url: str, local_dir: str = ".", expires_in: int = 3600) -> str:
|
||||
"""
|
||||
Download a file from a public or private S3 URL.
|
||||
If private, generates a presigned URL first.
|
||||
|
||||
Args:
|
||||
s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt)
|
||||
local_dir (str): Folder to save the file in.
|
||||
expires_in (int): Presigned URL lifetime (seconds).
|
||||
|
||||
Returns:
|
||||
str: Local file path of the downloaded file.
|
||||
"""
|
||||
parsed = urlparse(s3_url)
|
||||
host_parts = parsed.netloc.split(".")
|
||||
if len(host_parts) < 3 or host_parts[1] != "s3":
|
||||
raise ValueError("❌ Not a valid S3 HTTPS URL")
|
||||
|
||||
bucket = host_parts[0]
|
||||
key = parsed.path.lstrip("/")
|
||||
|
||||
# Generate presigned URL (whether public or private)
|
||||
presigned_url = self.generate_presigned_url(bucket, key, expires_in)
|
||||
|
||||
filename = os.path.basename(key)
|
||||
local_path = os.path.join(local_dir, filename)
|
||||
|
||||
try:
|
||||
response = requests.get(presigned_url, stream=True)
|
||||
response.raise_for_status()
|
||||
|
||||
os.makedirs(local_dir, exist_ok=True)
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
print(f"✅ Downloaded: {local_path}")
|
||||
return local_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise RuntimeError(f"❌ Failed to download file: {e}")
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
#poetry run alembic revision --autogenerate -m "added major condition issue things"
|
||||
#poetry run alembic revision --autogenerate -m "s3 add "
|
||||
|
||||
poetry run alembic upgrade head
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue