hubspot sync

This commit is contained in:
Jun-te Kim 2025-10-28 12:18:41 +00:00
parent 15af734289
commit 5cc963c8b6
9 changed files with 162 additions and 103 deletions

View file

@ -24,7 +24,7 @@ services:
env_file:
- ../.db-env
volumes:
- postgres-data:/var/lib/postgresql/data
- postgres-data-two:/var/lib/postgresql/data
networks:
- survey-net
@ -46,5 +46,5 @@ networks:
driver: bridge
volumes:
postgres-data:
postgres-data-two:

View file

@ -9,6 +9,7 @@ class Settings(BaseSettings):
env_file = ".env" # Load from an optional .env file
settings = Settings()
# Engine for the database; created from settings.DATABASE_URL when it is set, otherwise None
engine = create_engine(settings.DATABASE_URL) if settings.DATABASE_URL else None
@ -19,4 +20,5 @@ def get_db_session():
def init_db():
    """Create all tables registered on SQLModel.metadata.

    Does nothing when no engine is configured (``engine`` is falsy).
    """
    if not engine:
        return
    # create_all binds the metadata collected by the SQLModel classes to the engine.
    SQLModel.metadata.create_all(engine)

View file

@ -1,31 +1,21 @@
from etl.db.db import get_db_session, init_db
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
from sqlmodel import select
class HubspotTodb():
def __init__(self):
    """Call init_db() when the loader is constructed."""
    init_db()
def new_record_to_hubspot_data(self, deal_data, company, listing):
    """
    Deprecated entry point kept for existing callers; delegates to upsert_hubspot_deal.

    Args:
        deal_data: deal properties, forwarded unchanged.
        company: company id, forwarded unchanged.
        listing: listing properties, forwarded unchanged.

    Returns:
        Whatever upsert_hubspot_deal returns for the same arguments.
    """
    print("This has been depreciated using new interface")
    # Bound-method call: `self` must not be passed again, otherwise the call
    # raises TypeError. The upsert also handles the insert, so no separate
    # record is created here.
    return self.upsert_hubspot_deal(deal_data, company, listing)
def new_record_company(self, company_data):
"""
Adds a new records to the hubspot_compnay_data table
"""
with get_db_session() as session:
new_record = HubspotCommpanyData(
company_id=company_data.get("hs_object_id"),
@ -34,4 +24,113 @@ class HubspotTodb():
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record
return new_record
def find_all_deals_with_company_id(self, company_id):
    """
    Returns a list of records that have a company_id from the hubspot_deal_data table

    Args:
        company_id: value compared for equality against HubspotDealData.company_id.

    Returns:
        All matching HubspotDealData rows; empty when nothing matches.
    """
    # Same select/exec query style as upsert_hubspot_deal, instead of the legacy session.query API.
    statement = select(HubspotDealData).where(HubspotDealData.company_id == company_id)
    with get_db_session() as session:
        return session.exec(statement).all()
def update_deal(self, deal_in_db, hubspot_client):
    """
    Compare a stored deal with HubSpot's current data and upsert it when they differ.

    Every field is compared (no short-circuit), and each mismatch is printed as a
    soft-assert warning before the decision is made.

    Args:
        deal_in_db: stored deal; its attributes are compared field by field.
        hubspot_client: object providing get_deal_info_for_db(deal_id), which
            returns (deal properties, company id, listing properties).

    Returns:
        True when every field already matched (nothing written), False when at
        least one field differed and upsert_hubspot_deal was called.
    """
    print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
    hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id)

    # (attribute on the stored deal, lazy getter for the HubSpot-side value),
    # listed in the order mismatches are reported.
    comparisons = (
        ("deal_id", lambda: hs_deal.get("hs_object_id")),
        ("company_id", lambda: hs_company_id),
        ("landlord_property_id", lambda: hs_listing.get("owner_property_id")),
        ("outcome", lambda: hs_deal.get("outcome")),
        ("dealstage", lambda: hs_deal.get("dealstage")),
        ("dealname", lambda: hs_deal.get("dealname")),
        ("project_code", lambda: hs_deal.get("project_code")),
        ("uprn", lambda: hs_listing.get("national_uprn")),
        ("outcome_notes", lambda: hs_deal.get("outcome_notes")),
    )

    in_sync = True
    for field, remote_value in comparisons:
        if getattr(deal_in_db, field) == remote_value():
            continue
        print(f"⚠️ Soft Assert Failed: {field} mismatch")
        in_sync = False

    if in_sync:
        print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.")
        return True

    print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.")
    self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing)
    return False
def upsert_hubspot_deal(self, deal_data, company, listing):
    """
    Insert or update the hubspot_deal_data row identified by the deal's hs_object_id.

    Args:
        deal_data: mapping of deal properties, read with .get().
        company: company id for the row; on update a falsy value keeps the stored one.
        listing: mapping of listing properties, read with .get().

    Returns:
        The HubspotDealData instance after commit and refresh.
    """
    with get_db_session() as session:
        deal_id = deal_data.get("hs_object_id")

        # (row attribute, source mapping, key in that mapping), applied in this order.
        column_sources = (
            ("dealname", deal_data, "dealname"),
            ("dealstage", deal_data, "dealstage"),
            ("landlord_property_id", listing, "owner_property_id"),
            ("uprn", listing, "national_uprn"),
            ("outcome", deal_data, "outcome"),
            ("outcome_notes", deal_data, "outcome_notes"),
            ("project_code", deal_data, "project_code"),
        )

        lookup = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
        record = session.exec(lookup).first()

        if record:
            print(f"🔄 Updating existing deal (deal_id={deal_id})")
            for attr, source, key in column_sources:
                # A key absent from the payload leaves the stored value unchanged.
                setattr(record, attr, source.get(key, getattr(record, attr)))
            record.company_id = company or record.company_id
        else:
            print(f"🆕 Inserting new deal (deal_id={deal_id})")
            record = HubspotDealData(
                deal_id=deal_id,
                company_id=company,
                **{attr: source.get(key) for attr, source, key in column_sources},
            )

        session.add(record)
        session.commit()
        session.refresh(record)
        return record

View file

@ -124,6 +124,14 @@ class HubSpotClient():
)
return deal.properties
def get_deal_info_for_db(self, deal_id):
    """
    Gather everything the database layer needs for one deal.

    Returns:
        A 3-tuple, in this order, of the results of from_deal_get_info,
        from_deal_get_associated_company_id and from_deal_get_associated_listing
        for ``deal_id``.
    """
    # Tuple elements are evaluated left to right, so the lookups run in the listed order.
    return (
        self.from_deal_get_info(deal_id),
        self.from_deal_get_associated_company_id(deal_id),
        self.from_deal_get_associated_listing(deal_id),
    )
def get_company_information(self, company_id):
company = self.client.crm.companies.basic_api.get_by_id(
@ -133,4 +141,5 @@ class HubSpotClient():
]
)
company_info = company.properties
return company_info
return company_info

View file

@ -1,45 +0,0 @@
from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline
from tqdm import tqdm
from etl.db.hubSpotLoad import HubspotTodb
'''
# TODO:
get one deal from db, from db
for avri only so far
add it to the db
show in frontend
'''
# Fetch the id of every deal in the OPERATIONS_SOCIAL_HOUSING pipeline.
hubspot = HubSpotClient()
deals = hubspot.get_deal_ids_by_pipeline(
    pipeline_id=Pipeline.OPERATIONS_SOCIAL_HOUSING.value,
)

# Only deals associated with one of these companies are loaded.
valueable_deals = [Companies.ABRI.value]

deals_to_add = []
deal_to_companies = {}
loader = HubspotTodb()

# Walk all deals and load the ones whose associated company is wanted.
for deal in tqdm(deals):
    company = hubspot.from_deal_get_associated_company_id(deal)
    if company not in valueable_deals:
        continue
    deals_to_add.append(deal)
    deal_to_companies[deal] = company
    deal_data = hubspot.from_deal_get_info(deal_id=deal)
    listing_data = hubspot.from_deal_get_associated_listing(deal_id=deal)
    loader.new_record_to_hubspot_data(deal_data, company, listing_data)
#TODO check if database has abri data
# make companies table
# make a script that updates the table

View file

@ -1,18 +0,0 @@
from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline
from tqdm import tqdm
from etl.db.hubSpotLoad import HubspotTodb
# Client used to read data from HubSpot.
hubspot = HubSpotClient()
# Fetch the company information for Companies.ABRI.
company = hubspot.get_company_information(Companies.ABRI.value)
# Hand the fetched company information to the database loader.
loader = HubspotTodb()
loader.new_record_company(company)
#TODO check if database has abri data
# make companies table
# make a script that updates the table

View file

@ -1,15 +1,15 @@
from sqlmodel import Field, SQLModel
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
import uuid
from pydantic import Field
# from sqlmodel import Field, SQLModel
# from sqlalchemy import Column
# from sqlalchemy.dialects.postgresql import UUID
# import uuid
# from pydantic import Field
class BaseModel(SQLModel):
id: uuid.UUID = Field(
default_factory=uuid.uuid4,
sa_column=Column(UUID(as_uuid=True), primary_key=True)
)
# class BaseModel(SQLModel):
# id: uuid.UUID = Field(
# default_factory=uuid.uuid4,
# sa_column=Column(UUID(as_uuid=True), primary_key=True)
# )

View file

@ -1,4 +1,4 @@
from sqlmodel import Field, SQLModel, Relationship, text
from sqlmodel import Field, SQLModel, Relationship, text, Column
import uuid
from typing import Optional, List
from datetime import datetime
@ -7,13 +7,15 @@ from sqlalchemy import Enum as SAEnum
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from etl.fileReader.reportType import ReportType
from sqlalchemy import DateTime
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy import Text
from sqlalchemy import Text, DateTime
from sqlalchemy.sql import func
from enum import Enum
from sqlalchemy import Column
class BaseModel(SQLModel):
# Generate a fresh Column per table (no shared Column instance)
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
@ -87,8 +89,13 @@ class HubspotDealData(SQLModel, table=True):
)
)
updated_at: Optional[datetime] = Field(
sa_column=Column(DateTime(timezone=True), nullable=True)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("NOW() AT TIME ZONE 'utc'"),
onupdate=func.now(),
nullable=False,
)
)
class HubspotCommpanyData(SQLModel, table=True):
@ -109,6 +116,11 @@ class HubspotCommpanyData(SQLModel, table=True):
)
)
updated_at: Optional[datetime] = Field(
sa_column=Column(DateTime(timezone=True), nullable=True)
)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("NOW() AT TIME ZONE 'utc'"),
onupdate=func.now(),
nullable=False,
)
)

View file

@ -1,4 +1,4 @@
#poetry run alembic revision --autogenerate -m "add company info"
#poetry run alembic revision --autogenerate -m "auto update"
poetry run alembic upgrade head