added hubspot to add one deal with deal id

This commit is contained in:
Jun-te Kim 2026-03-30 14:41:38 +00:00
parent da039b91b2
commit 934e666357
13 changed files with 610 additions and 12 deletions

View file

@ -75,22 +75,22 @@ def app():
data_folder = "/workspaces/model/asset_list"
data_filename = "Calico ARA Upload Review.xlsx"
sheet_name = "Upload to Ara - Needs Sign Off"
sheet_name = "Sheet1"
postcode_column = "Postcode"
address1_column = "Address 1"
address1_column = "Units"
address1_method = None
fulladdress_column = "Address 1"
address_cols_to_concat = []
fulladdress_column = "Units"
address_cols_to_concat = ["Units"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = "ara_found_uprn"
landlord_property_type = "Property Type"
landlord_built_form = "Property Type"
landlord_os_uprn = None
landlord_property_type = None # Good to include if landlord gave
landlord_built_form = None # Good to include if landlord gave
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Asset Reference"
landlord_property_id = "llid"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None

View file

@ -1,6 +1,8 @@
from sqlmodel import SQLModel, Field
from sqlmodel import SQLModel, Field, Column, text
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import DateTime
from sqlalchemy.sql import func
import uuid
@ -11,3 +13,46 @@ class Organisation(SQLModel, table=True):
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
hubspot_company_id: Optional[str] = None
name: Optional[str] = None
class HubspotDealData(SQLModel, table=True):
__tablename__ = "hubspot_deal_data"
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
# HubSpot Deal identifiers
deal_id: str = Field(index=True, nullable=False)
dealname: Optional[str] = Field(default=None)
dealstage: Optional[str] = Field(default=None)
company_id: Optional[str] = Field(default=None)
project_code: Optional[str] = Field(default=None)
# HubSpot custom properties
landlord_property_id: Optional[str] = Field(default=None)
uprn: Optional[str] = Field(default=None)
outcome: Optional[str] = Field(default=None)
outcome_notes: Optional[str] = Field(default=None)
major_condition_issue_description: Optional[str] = Field(default=None)
major_condition_issue_photos: Optional[str] = Field(default=None)
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
coordination_status: Optional[str] = Field(default=None)
design_status: Optional[str] = Field(default=None)
created_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("NOW() AT TIME ZONE 'utc'"),
nullable=False,
)
)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("NOW() AT TIME ZONE 'utc'"),
onupdate=func.now(),
nullable=False,
)
)

View file

@ -1,8 +1,10 @@
from backend.app.db.connection import db_read_session
from backend.app.db.models.organisation import Organisation
from backend.app.db.models.organisation import Organisation, HubspotDealData
from sqlmodel import select
from datetime import datetime, timezone
from typing import TypedDict
from etl.hubspot.s3_uploader import S3Uploader
import hashlib
class CompanyData(TypedDict):
@ -12,7 +14,7 @@ class CompanyData(TypedDict):
class HubspotDataToDb:
def __init__(self):
pass
self.s3 = S3Uploader()
def read_org_table(self, limit: int = 10):
with db_read_session() as session:
@ -53,3 +55,287 @@ class HubspotDataToDb:
session.commit()
return record
###
# Check from here
###
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)
def find_all_deals_with_company_id(self, company_id):
"""Returns a list of deals for a given company_id."""
with db_read_session() as session:
return (
session.query(HubspotDealData)
.filter(HubspotDealData.company_id == company_id)
.all()
)
def find_deal_with_deal_id(self, deal_id):
with db_read_session() as session:
return (
session.query(HubspotDealData)
.filter(HubspotDealData.deal_id == deal_id)
.one_or_none()
)
def _sha256(self, file_path: str) -> str:
"""Compute SHA-256 checksum of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
def update_deal(self, deal_in_db, hubspot_client):
"""
Checks if a deal needs updating and syncs it with HubSpot.
Also handles major_condition_issue_photos file upload to S3 with integrity check.
"""
def soft_assert(condition, message="Assertion Failed"):
if not condition:
print(f"⚠️ Soft Assert Failed: {message}")
return False
return True
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
deal_in_db.deal_id
)
# Soft compare key fields
checks = [
soft_assert(
deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"
),
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
soft_assert(
deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
"landlord_property_id mismatch",
),
soft_assert(
deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"
),
soft_assert(
deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"
),
soft_assert(
deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"
),
soft_assert(
deal_in_db.project_code == hs_deal.get("project_code"),
"project_code mismatch",
),
soft_assert(
deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"
),
soft_assert(
deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
"outcome_notes mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_description
== hs_deal.get("major_condition_issue_description"),
"major condition description mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_photos
== hs_deal.get("major_condition_issue_photos"),
"major condition issue photos mismatch",
),
soft_assert(
deal_in_db.coordination_status
== hs_deal.get("coordination_status__stage_1_"),
"coordination stage 1 status mismatch",
),
soft_assert(
deal_in_db.design_status == hs_deal.get("retrofit_design_status"),
"retrofit design mismatch",
),
]
# If discrepancies found, update from HubSpot
if not all(checks):
print(
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
)
self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
return False
# Handle photo upload if it exists but S3 URL is missing
if (
deal_in_db.major_condition_issue_photos
and not deal_in_db.major_condition_issue_evidence_s3_url
):
print(
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
try:
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
local_file = hubspot_client.download_file_from_url(photo_url)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with db_read_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
)
return False
except Exception as e:
print(
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
)
# Continue without the file — don't crash the entire update
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
return True
def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client):
"""
Inserts or updates a deal record.
Also uploads photos if present and adds S3 URL.
"""
with db_read_session() as session:
deal_id = deal_data.get("hs_object_id")
statement = select(HubspotDealData).where(
HubspotDealData.deal_id == deal_id
)
existing = session.exec(statement).first()
if existing:
print(f"🔄 Updating existing deal (deal_id={deal_id})")
for attr, value in {
"dealname": deal_data.get("dealname"),
"dealstage": deal_data.get("dealstage"),
"landlord_property_id": listing.get("owner_property_id"),
"uprn": listing.get("national_uprn"),
"outcome": deal_data.get("outcome"),
"outcome_notes": deal_data.get("outcome_notes"),
"project_code": deal_data.get("project_code"),
"company_id": company,
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"coordination_status": deal_data.get(
"coordination_status__stage_1_"
),
"design_status": deal_data.get("retrofit_design_status"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
# Upload if photo exists but S3 link missing
if (
existing.major_condition_issue_photos
and not existing.major_condition_issue_evidence_s3_url
):
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id)
photo_url = fresh_deal.get("major_condition_issue_photos")
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(
photo_url
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
)
# Continue without the file — don't crash the update
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
session.add(existing)
session.commit()
session.refresh(existing)
return existing
else:
print(f"🆕 Inserting new deal (deal_id={deal_id})")
new_record = HubspotDealData(
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
landlord_property_id=listing.get("owner_property_id"),
uprn=listing.get("national_uprn"),
outcome=deal_data.get("outcome"),
outcome_notes=deal_data.get("outcome_notes"),
project_code=deal_data.get("project_code"),
company_id=company,
major_condition_issue_description=deal_data.get(
"major_condition_issue_description"
),
major_condition_issue_photos=deal_data.get(
"major_condition_issue_photos"
),
coordination_status=deal_data.get("coordination_status__stage_1_"),
design_status=deal_data.get("retrofit_design_status"),
)
# Handle upload at insert time
if new_record.major_condition_issue_photos:
try:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
)
# Continue without the file — don't crash the insert
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record

View file

@ -1 +1 @@
hubspot-api-client
hubspot-api-client

116
etl/hubspot/s3_uploader.py Normal file
View file

@ -0,0 +1,116 @@
import os
import boto3
from botocore.exceptions import ClientError
from urllib.parse import urlparse
from datetime import datetime
import requests
class S3Uploader:
"""
Simple helper to upload local files to S3 and return their S3 HTTPS URI.
"""
def __init__(
self,
aws_access_key: str = "AKIAU5A36PPNK7RXX52V",
aws_secret_key: str = "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",
region: str = "eu-west-2",
):
self.aws_access_key = aws_access_key
self.aws_secret_key = aws_secret_key
self.region = region
self.s3 = boto3.client(
"s3",
aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_key,
region_name=self.region,
)
def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str:
"""
Upload a local file to an S3 bucket and return its HTTPS URI.
Args:
file_path (str): Path to the local file.
bucket (str): S3 bucket name.
prefix (str): Folder/prefix in the bucket.
Returns:
str: HTTPS-style S3 URI (not signed).
"""
try:
filename = os.path.basename(file_path)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
s3_key = os.path.join(prefix, f"{timestamp}_{filename}")
self.s3.upload_file(file_path, bucket, s3_key)
s3_uri = f"https://{bucket}.s3.{self.region}.amazonaws.com/{s3_key}"
return s3_uri
except ClientError as e:
raise RuntimeError(f"❌ S3 upload failed: {e}")
def print_bucket(self):
print(self.s3.head_bucket(Bucket="retrofit-data-dev"))
def generate_presigned_url(
self, bucket: str, key: str, expires_in: int = 3600
) -> str:
"""
Generate a temporary presigned URL for an S3 object.
"""
try:
return self.s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=expires_in,
)
except ClientError as e:
raise RuntimeError(f"❌ Failed to generate signed URL: {e}")
def download_from_url(
self, s3_url: str, local_dir: str = ".", expires_in: int = 3600
) -> str:
"""
Download a file from a public or private S3 URL.
If private, generates a presigned URL first.
Args:
s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt)
local_dir (str): Folder to save the file in.
expires_in (int): Presigned URL lifetime (seconds).
Returns:
str: Local file path of the downloaded file.
"""
parsed = urlparse(s3_url)
host_parts = parsed.netloc.split(".")
if len(host_parts) < 3 or host_parts[1] != "s3":
raise ValueError("❌ Not a valid S3 HTTPS URL")
bucket = host_parts[0]
key = parsed.path.lstrip("/")
# Generate presigned URL (whether public or private)
presigned_url = self.generate_presigned_url(bucket, key, expires_in)
filename = os.path.basename(key)
local_path = os.path.join(local_dir, filename)
try:
response = requests.get(presigned_url, stream=True)
response.raise_for_status()
os.makedirs(local_dir, exist_ok=True)
with open(local_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Downloaded: {local_path}")
return local_path
except requests.exceptions.RequestException as e:
raise RuntimeError(f"❌ Failed to download file: {e}")

View file

@ -0,0 +1,15 @@
Input:
<Hubspot Deal ID>
Function:
<Add hubspot deal/update to hubspot_deal_data>
Used in:
when changes are made in hubspot, this will trigger a workflow in make.
This in turn will trigger this sqs which I'm building from this directory

View file

View file

@ -0,0 +1,38 @@
FROM public.ecr.aws/lambda/python:3.10
# FROM python:3.11.10-bullseye
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV DB_HOST=${DEV_DB_HOST}
ENV DB_PORT=${DEV_DB_PORT}
ENV DB_NAME=${DEV_DB_NAME}
# Set working directory (Lambda task root)
WORKDIR /var/task
# -----------------------------
# Copy requirements FIRST (for Docker layer caching)
# -----------------------------
COPY etl/hubspot/scripts/scraper/handler/requirements.txt .
# Install dependencies into Lambda runtime
RUN pip install --no-cache-dir -r requirements.txt
# Copy necessary files for database and utility imports
COPY backend/ backend/
COPY utils/ utils/
COPY datatypes/ datatypes/
COPY etl/hubspot etl/hubspot
# Copy the handler
COPY etl/hubspot/scripts/scraper/main.py .
# -----------------------------
# Lambda handler
# -----------------------------
CMD ["main.handler"]

View file

@ -0,0 +1,12 @@
pandas==2.2.2
numpy<2.0
requests
tqdm
openpyxl
epc-api-python==1.0.2
boto3==1.35.44
sqlmodel
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
pydantic-settings==2.6.0
hubspot-api-client

View file

@ -0,0 +1,11 @@
version: "3.9"
services:
hubspot-scraper:
build:
context: ../../../../../
dockerfile: etl/hubspot/scripts/scraper/handler/Dockerfile
ports:
- "9000:8080"
env_file:
- ../../../../../.env

View file

@ -0,0 +1,28 @@
#!/usr/bin/env python3
import json
import requests
HOST = "localhost"
PORT = "9000"
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
payload = {
"Records": [
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"hubspot_deal_id": "254427203793",
}
)
}
]
}
response = requests.post(LAMBDA_URL, json=payload)
print("Status code:", response.status_code)
print("Response:")
print(response.text)

View file

@ -0,0 +1,2 @@
docker compose build --no-cache
docker compose up --force-recreate

View file

@ -0,0 +1,45 @@
"""
TODO:
1) [completed]Get hubspot deal properties from one deal
2) Put it in some class
3) [completed] Load the db and check if upsert it into the table
4) Getting working on a AWS lambda
5) [completed] subtask and tasks history
6) The new sexy deal properties, move it over
"""
from backend.utils.subtasks import subtask_handler
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from typing import Any
@subtask_handler()
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
if local is True:
body = {
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"hubspot_deal_id": "254427203793",
}
hubspot_deal_id = body.get("hubspot_deal_id", "")
if hubspot_deal_id == "":
raise RuntimeError(
"Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'"
)
hubspot = HubspotClient()
dbloader = HubspotDataToDb()
deal = dbloader.find_deal_with_deal_id(hubspot_deal_id)
if deal:
dbloader.update_deal(deal, hubspot)
else:
deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id)
dbloader.upsert_hubspot_deal(deal, company, listing, hubspot)
print("Finsihed running")