From 5cc963c8b6f4a43ada99001dd500532dbabd86e4 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 28 Oct 2025 12:18:41 +0000 Subject: [PATCH] hubspot sync --- .devcontainer/docker-compose.yml | 4 +- etl/db/db.py | 2 + etl/db/hubSpotLoad.py | 135 +++++++++++++++--- etl/hubSpotClient/hubspotClient.py | 11 +- .../hubspot_abri_etl_first_time copy.py | 45 ------ etl/hubSpotClient/hubspot_company.py | 18 --- etl/hubSpotClient/hubspot_types.py | 20 +-- etl/models/topLevel.py | 28 ++-- migration_db.sh | 2 +- 9 files changed, 162 insertions(+), 103 deletions(-) delete mode 100644 etl/hubSpotClient/hubspot_abri_etl_first_time copy.py delete mode 100644 etl/hubSpotClient/hubspot_company.py diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index a0d477b..301ea1a 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -24,7 +24,7 @@ services: env_file: - ../.db-env volumes: - - postgres-data:/var/lib/postgresql/data + - postgres-data-two:/var/lib/postgresql/data networks: - survey-net @@ -46,5 +46,5 @@ networks: driver: bridge volumes: - postgres-data: + postgres-data-two: diff --git a/etl/db/db.py b/etl/db/db.py index 1fd7621..0bc1e6b 100644 --- a/etl/db/db.py +++ b/etl/db/db.py @@ -9,6 +9,7 @@ class Settings(BaseSettings): env_file = ".env" # Load from an optional .env file settings = Settings() +# engine to the dabatase, currently set up to connect via settings. database engine = create_engine(settings.DATABASE_URL) if settings.DATABASE_URL else None @@ -19,4 +20,5 @@ def get_db_session(): def init_db(): if engine: + # Links SQLModel and metadata defined in sqlmodel instance SQLModel.metadata.create_all(engine) \ No newline at end of file diff --git a/etl/db/hubSpotLoad.py b/etl/db/hubSpotLoad.py index 0582df6..4226446 100644 --- a/etl/db/hubSpotLoad.py +++ b/etl/db/hubSpotLoad.py @@ -1,31 +1,21 @@ from etl.db.db import get_db_session, init_db from etl.models.topLevel import HubspotDealData, HubspotCommpanyData +from sqlmodel import select class HubspotTodb(): def __init__(self): init_db() def new_record_to_hubspot_data(self, deal_data, company, listing): - with get_db_session() as session: - new_record = HubspotDealData( - deal_id=deal_data.get("hs_object_id"), - dealname=deal_data.get("dealname"), - dealstage=deal_data.get("dealstage"), - landlord_property_id=listing.get("owner_property_id"), - uprn=listing.get("national_uprn"), - outcome=deal_data.get("outcome"), - outcome_notes=deal_data.get("outcome_notes"), - project_code=deal_data.get("project_code"), - company_id = company, - ) + print("This has been depreciated using new interface") + self.upsert_hubspot_deal(self, deal_data, company, listing) - # Add and commit the record - session.add(new_record) - session.commit() - session.refresh(new_record) - return new_record def new_record_company(self, company_data): + """ + Adds a new records to the hubspot_compnay_data table + """ + with get_db_session() as session: new_record = HubspotCommpanyData( company_id=company_data.get("hs_object_id"), @@ -34,4 +24,113 @@ class HubspotTodb(): session.add(new_record) session.commit() session.refresh(new_record) - return new_record \ No newline at end of file + return new_record + + def find_all_deals_with_company_id(self, company_id): + """ + Returns a list of records that have a company_id from the hubspot_deal_data table + """ + """ + Returns a list of records that have a company_id from the hubspot_deal_data table + """ + with get_db_session() as session: + results = ( + session.query(HubspotDealData) + .filter(HubspotDealData.company_id == company_id) + .all() + ) + return results + + def update_deal(self, deal_in_db, hubspot_client): + """ + Checks if a deal needs updating and updates it in the db with the latest information. + Performs soft assertions for field-level consistency between DB and HubSpot data. + """ + def soft_assert(condition, message="Assertion Failed"): + if not condition: + print(f"⚠️ Soft Assert Failed: {message}") + return False + return True + + print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") + + hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id) + + results = [ + soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"), + "deal_id mismatch"), + soft_assert(deal_in_db.company_id == hs_company_id, + "company_id mismatch"), + soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), + "landlord_property_id mismatch"), + soft_assert(deal_in_db.outcome == hs_deal.get("outcome"), + "outcome mismatch"), + soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"), + "dealstage mismatch"), + soft_assert(deal_in_db.dealname == hs_deal.get("dealname"), + "dealname mismatch"), + soft_assert(deal_in_db.project_code == hs_deal.get("project_code"), + "project_code mismatch"), + soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"), + "uprn mismatch"), + soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), + "outcome_notes mismatch"), + ] + + # if any of the soft asserts failed + if not all(results): + print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.") + self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing) + return False + else: + print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.") + return True + + def upsert_hubspot_deal(self, deal_data, company, listing): + """ + Inserts a new record or updates an existing record in hubspot_deal_data. + Uses deal_id (hs_object_id) as the unique identifier. + """ + with get_db_session() as session: + deal_id = deal_data.get("hs_object_id") + + # Use SQLModel's modern query style + statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id) + existing = session.exec(statement).first() + + if existing: + print(f"🔄 Updating existing deal (deal_id={deal_id})") + + existing.dealname = deal_data.get("dealname", existing.dealname) + existing.dealstage = deal_data.get("dealstage", existing.dealstage) + existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id) + existing.uprn = listing.get("national_uprn", existing.uprn) + existing.outcome = deal_data.get("outcome", existing.outcome) + existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes) + existing.project_code = deal_data.get("project_code", existing.project_code) + existing.company_id = company or existing.company_id + + session.add(existing) + session.commit() + session.refresh(existing) + return existing + + else: + print(f"🆕 Inserting new deal (deal_id={deal_id})") + + new_record = HubspotDealData( + deal_id=deal_id, + dealname=deal_data.get("dealname"), + dealstage=deal_data.get("dealstage"), + landlord_property_id=listing.get("owner_property_id"), + uprn=listing.get("national_uprn"), + outcome=deal_data.get("outcome"), + outcome_notes=deal_data.get("outcome_notes"), + project_code=deal_data.get("project_code"), + company_id=company, + ) + + session.add(new_record) + session.commit() + session.refresh(new_record) + return new_record diff --git a/etl/hubSpotClient/hubspotClient.py b/etl/hubSpotClient/hubspotClient.py index f8809a0..822aff8 100644 --- a/etl/hubSpotClient/hubspotClient.py +++ b/etl/hubSpotClient/hubspotClient.py @@ -124,6 +124,14 @@ class HubSpotClient(): ) return deal.properties + + def get_deal_info_for_db(self, deal_id): + deal = self.from_deal_get_info(deal_id) + company = self.from_deal_get_associated_company_id(deal_id) + listing = self.from_deal_get_associated_listing(deal_id) + + return deal, company, listing + def get_company_information(self, company_id): company = self.client.crm.companies.basic_api.get_by_id( @@ -133,4 +141,5 @@ class HubSpotClient(): ] ) company_info = company.properties - return company_info \ No newline at end of file + return company_info + diff --git a/etl/hubSpotClient/hubspot_abri_etl_first_time copy.py b/etl/hubSpotClient/hubspot_abri_etl_first_time copy.py deleted file mode 100644 index 9dc4e4c..0000000 --- a/etl/hubSpotClient/hubspot_abri_etl_first_time copy.py +++ /dev/null @@ -1,45 +0,0 @@ -from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline -from tqdm import tqdm -from etl.db.hubSpotLoad import HubspotTodb - -''' -# TODO: - get one deal from db, from db - for avri only so far - add it to the db - show in frontend -''' - -# get ALL deals -hubspot = HubSpotClient() - -# All deals from a pipeline_id via filter -deals = hubspot.get_deal_ids_by_pipeline( - pipeline_id=Pipeline.OPERATIONS_SOCIAL_HOUSING.value, - ) - -# deals from companies we care about -valueable_deals = [ - Companies.ABRI.value -] -deals_to_add = [] - - -deal_to_companies = {} -loader = HubspotTodb() -# Get all deals we care about -for i,deal in enumerate(tqdm(deals)): - company = hubspot.from_deal_get_associated_company_id(deal) - if company in valueable_deals: - deals_to_add.append(deal) - deal_to_companies.update({deal: company}) - deal_data = hubspot.from_deal_get_info(deal_id=deal) - listing_data = hubspot.from_deal_get_associated_listing(deal_id=deal) - loader.new_record_to_hubspot_data(deal_data, deal_to_companies[deal], listing_data) - - - -#TODO check if database has abri data -# make companies table -# make a scrip that updates table - diff --git a/etl/hubSpotClient/hubspot_company.py b/etl/hubSpotClient/hubspot_company.py deleted file mode 100644 index 37c73ac..0000000 --- a/etl/hubSpotClient/hubspot_company.py +++ /dev/null @@ -1,18 +0,0 @@ -from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline -from tqdm import tqdm -from etl.db.hubSpotLoad import HubspotTodb - -hubspot = HubSpotClient() - -# All deals from a pipeline_id via filter -company = hubspot.get_company_information(Companies.ABRI.value) - -loader = HubspotTodb() -loader.new_record_company(company) - - - -#TODO check if database has abri data -# make companies table -# make a scrip that updates table - diff --git a/etl/hubSpotClient/hubspot_types.py b/etl/hubSpotClient/hubspot_types.py index f775215..2c79def 100644 --- a/etl/hubSpotClient/hubspot_types.py +++ b/etl/hubSpotClient/hubspot_types.py @@ -1,15 +1,15 @@ -from sqlmodel import Field, SQLModel -from sqlalchemy import Column -from sqlalchemy.dialects.postgresql import UUID -import uuid -from pydantic import Field +# from sqlmodel import Field, SQLModel +# from sqlalchemy import Column +# from sqlalchemy.dialects.postgresql import UUID +# import uuid +# from pydantic import Field -class BaseModel(SQLModel): - id: uuid.UUID = Field( - default_factory=uuid.uuid4, - sa_column=Column(UUID(as_uuid=True), primary_key=True) - ) +# class BaseModel(SQLModel): +# id: uuid.UUID = Field( +# default_factory=uuid.uuid4, +# sa_column=Column(UUID(as_uuid=True), primary_key=True) +# ) diff --git a/etl/models/topLevel.py b/etl/models/topLevel.py index ba963a8..8228d3a 100644 --- a/etl/models/topLevel.py +++ b/etl/models/topLevel.py @@ -1,4 +1,4 @@ -from sqlmodel import Field, SQLModel, Relationship, text +from sqlmodel import Field, SQLModel, Relationship, text, Column import uuid from typing import Optional, List from datetime import datetime @@ -7,13 +7,15 @@ from sqlalchemy import Enum as SAEnum from sqlalchemy import Column from sqlalchemy.dialects.postgresql import UUID from etl.fileReader.reportType import ReportType -from sqlalchemy import DateTime from sqlalchemy.dialects.postgresql import JSON -from sqlalchemy import Text +from sqlalchemy import Text, DateTime +from sqlalchemy.sql import func from enum import Enum from sqlalchemy import Column + + class BaseModel(SQLModel): # Generate a fresh Column per table (no shared Column instance) id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) @@ -87,8 +89,13 @@ class HubspotDealData(SQLModel, table=True): ) ) - updated_at: Optional[datetime] = Field( - sa_column=Column(DateTime(timezone=True), nullable=True) + updated_at: datetime = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("NOW() AT TIME ZONE 'utc'"), + onupdate=func.now(), + nullable=False, + ) ) class HubspotCommpanyData(SQLModel, table=True): @@ -109,6 +116,11 @@ class HubspotCommpanyData(SQLModel, table=True): ) ) - updated_at: Optional[datetime] = Field( - sa_column=Column(DateTime(timezone=True), nullable=True) - ) \ No newline at end of file + updated_at: datetime = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("NOW() AT TIME ZONE 'utc'"), + onupdate=func.now(), + nullable=False, + ) + ) diff --git a/migration_db.sh b/migration_db.sh index a65889a..5c596d4 100644 --- a/migration_db.sh +++ b/migration_db.sh @@ -1,4 +1,4 @@ -#poetry run alembic revision --autogenerate -m "add company info" +#poetry run alembic revision --autogenerate -m "auto update" poetry run alembic upgrade head