mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-08 11:17:29 +00:00
Merge pull request #98 from Hestia-Homes/feature/hubspot_sync
Feature/hubspot sync
This commit is contained in:
commit
623c110cfe
26 changed files with 755 additions and 593 deletions
|
|
@ -16,7 +16,7 @@ RUN apt update && apt install -y --no-install-recommends \
|
|||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Create the user and grant sudo privileges
|
||||
RUN useradd -m -s /usr/bin/bash ${USER} \
|
||||
RUN useradd -m -s /bin/bash ${USER} \
|
||||
&& echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \
|
||||
&& chmod 0440 /etc/sudoers.d/${USER}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ services:
|
|||
env_file:
|
||||
- ../.db-env
|
||||
volumes:
|
||||
- postgres-data:/var/lib/postgresql/data
|
||||
- postgres-data-two:/var/lib/postgresql/data
|
||||
networks:
|
||||
- survey-net
|
||||
|
||||
|
|
@ -46,5 +46,5 @@ networks:
|
|||
driver: bridge
|
||||
|
||||
volumes:
|
||||
postgres-data:
|
||||
postgres-data-two:
|
||||
|
||||
|
|
|
|||
30
.github/workflows/hubspot_abri_sync.yml
vendored
Normal file
30
.github/workflows/hubspot_abri_sync.yml
vendored
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
# Runs the HubSpot Abri update script on a weekly schedule or on demand.
name: Hubspot Sync Abri

on:
  schedule:
    - cron: '0 6 * * 1'  # Every Monday at 06:00 UTC
  workflow_dispatch:  # allows a manual run from the Actions tab

# Least privilege: the job only checks the repository out.
permissions:
  contents: read

jobs:
  surveyed-needs-sign-off:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --no-root

      - name: Run scripts
        env:
          # Lets the script import project packages from the checkout root.
          PYTHONPATH: ${{ github.workspace }}
        run: |
          pwd
          ls -la
          poetry run python etl/hubSpotClient/scripts/hubspot_update_abri_script.py
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
"""add company id in hubspot data and rename table
|
||||
|
||||
Revision ID: 20c418a7d5ec
|
||||
Revises: e72e15f7e0c3
|
||||
Create Date: 2025-10-27 16:20:11.362657
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '20c418a7d5ec'
|
||||
down_revision: Union[str, None] = 'e72e15f7e0c3'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
import sqlmodel
|
||||
|
||||
def upgrade() -> None:
    """Rename ``hubspot_data`` to ``hubspot_deal_data`` and add ``company_id``.

    The table is renamed in place instead of being dropped and recreated, so
    rows already stored in ``hubspot_data`` survive the migration.
    """
    # The index carries the old table name; drop it and recreate it under the
    # new name once the table has been renamed.
    op.drop_index('ix_hubspot_data_deal_id', table_name='hubspot_data')
    op.rename_table('hubspot_data', 'hubspot_deal_data')
    # Renaming a table does not rename its primary-key constraint. Free the
    # ``hubspot_data_pkey`` name so downgrade() can create it again.
    op.execute(
        'ALTER TABLE hubspot_deal_data '
        'RENAME CONSTRAINT hubspot_data_pkey TO hubspot_deal_data_pkey'
    )
    op.add_column(
        'hubspot_deal_data',
        sa.Column('company_id', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
    )
    op.create_index(
        op.f('ix_hubspot_deal_data_deal_id'),
        'hubspot_deal_data',
        ['deal_id'],
        unique=False,
    )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Rename ``hubspot_deal_data`` back to ``hubspot_data`` and drop ``company_id``.

    The table is renamed in place so existing rows are kept; only the
    ``company_id`` values are lost.
    """
    op.drop_index(op.f('ix_hubspot_deal_data_deal_id'), table_name='hubspot_deal_data')
    op.drop_column('hubspot_deal_data', 'company_id')
    op.rename_table('hubspot_deal_data', 'hubspot_data')
    # Restore the primary-key constraint name that matches the old table name.
    op.execute(
        'ALTER TABLE hubspot_data '
        'RENAME CONSTRAINT hubspot_deal_data_pkey TO hubspot_data_pkey'
    )
    op.create_index('ix_hubspot_data_deal_id', 'hubspot_data', ['deal_id'], unique=False)
|
||||
32
alembic/versions/23a4e2cc5467_added_project_code.py
Normal file
32
alembic/versions/23a4e2cc5467_added_project_code.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
"""added project code
|
||||
|
||||
Revision ID: 23a4e2cc5467
|
||||
Revises: 20c418a7d5ec
|
||||
Create Date: 2025-10-27 16:23:16.984274
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
import sqlmodel
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '23a4e2cc5467'
|
||||
down_revision: Union[str, None] = '20c418a7d5ec'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add the nullable ``project_code`` text column to ``hubspot_deal_data``."""
    project_code = sa.Column(
        'project_code',
        sqlmodel.sql.sqltypes.AutoString(),
        nullable=True,
    )
    op.add_column('hubspot_deal_data', project_code)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Remove the ``project_code`` column from ``hubspot_deal_data``."""
    table_name, column_name = 'hubspot_deal_data', 'project_code'
    op.drop_column(table_name, column_name)
|
||||
48
alembic/versions/2409147995c5_added_hubspot_table.py
Normal file
48
alembic/versions/2409147995c5_added_hubspot_table.py
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
"""added hubspot table
|
||||
|
||||
Revision ID: 2409147995c5
|
||||
Revises: 4c67501b7451
|
||||
Create Date: 2025-10-27 15:05:01.552689
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
import sqlmodel
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '2409147995c5'
|
||||
down_revision: Union[str, None] = '4c67501b7451'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
def upgrade() -> None:
    """Create the ``hubspot_data`` table."""
    text = sqlmodel.sql.sqltypes.AutoString
    # Optional free-text fields, in the order they appear in the table.
    optional_text_fields = (
        'dealname',
        'dealstage',
        'landlord_property_id',
        'uprn',
        'outcome',
        'outcome_notes',
    )
    columns = [
        sa.Column('id', sa.Uuid(), nullable=False),
        sa.Column('deal_id', text(), nullable=False),
    ]
    columns.extend(sa.Column(field, text(), nullable=True) for field in optional_text_fields)
    columns.append(sa.Column('raw_data', postgresql.JSON(astext_type=sa.Text()), nullable=True))
    columns.append(
        sa.Column(
            'created_at',
            sa.DateTime(timezone=True),
            server_default=sa.text('(CURRENT_TIMESTAMP AT TIME ZONE \'UTC\')'),
            nullable=False,
        )
    )
    columns.append(sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True))

    op.create_table('hubspot_data', *columns, sa.PrimaryKeyConstraint('id'))
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``hubspot_data`` table created by this revision."""
    # upgrade() in this revision creates no index. ``ix_hubspot_data_deal_id``
    # belongs to revision e72e15f7e0c3, whose own downgrade removes it, so
    # dropping it here would fail on an index that no longer exists.
    op.drop_table('hubspot_data')
|
||||
46
alembic/versions/57c0dc06cd25_add_company_info.py
Normal file
46
alembic/versions/57c0dc06cd25_add_company_info.py
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
"""add company info
|
||||
|
||||
Revision ID: 57c0dc06cd25
|
||||
Revises: 23a4e2cc5467
|
||||
Create Date: 2025-10-27 20:25:27.686455
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
import sqlmodel
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '57c0dc06cd25'
|
||||
down_revision: Union[str, None] = '23a4e2cc5467'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
def upgrade() -> None:
    """Create ``hubspot_company_data`` and a non-unique index on ``company_id``."""
    text = sqlmodel.sql.sqltypes.AutoString
    table_name = 'hubspot_company_data'

    created_at = sa.Column(
        'created_at',
        sa.DateTime(timezone=True),
        server_default=sa.text('(CURRENT_TIMESTAMP AT TIME ZONE \'UTC\')'),
        nullable=False,
    )
    op.create_table(
        table_name,
        sa.Column('id', sa.Uuid(), nullable=False),
        sa.Column('company_id', text(), nullable=False),
        sa.Column('company_name', text(), nullable=True),
        sa.Column('group_id', text(), nullable=True),
        created_at,
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(
        op.f('ix_hubspot_company_data_company_id'),
        table_name,
        ['company_id'],
        unique=False,
    )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``company_id`` index and then the ``hubspot_company_data`` table."""
    table_name = 'hubspot_company_data'
    index_name = op.f('ix_hubspot_company_data_company_id')
    op.drop_index(index_name, table_name=table_name)
    op.drop_table(table_name)
|
||||
57
alembic/versions/66d2c9c325d6_auto_update.py
Normal file
57
alembic/versions/66d2c9c325d6_auto_update.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
"""auto update
|
||||
|
||||
Revision ID: 66d2c9c325d6
|
||||
Revises: 57c0dc06cd25
|
||||
Create Date: 2025-10-28 11:58:28.356864
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '66d2c9c325d6'
|
||||
down_revision: Union[str, None] = '57c0dc06cd25'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Backfill ``updated_at`` and make it NOT NULL with a server default.

    Applies to ``hubspot_company_data`` and ``hubspot_deal_data``.
    """
    tables = ('hubspot_company_data', 'hubspot_deal_data')

    # 1. Fill existing NULLs so the NOT NULL constraint can be applied.
    # ``updated_at`` is a timestamp WITH time zone, so plain NOW() stores the
    # correct instant. ``NOW() AT TIME ZONE 'utc'`` would produce a naive
    # timestamp that PostgreSQL re-interprets in the session time zone.
    for table in tables:
        op.execute(f"UPDATE {table} SET updated_at = NOW() WHERE updated_at IS NULL;")

    # 2. Set the column default and nullability.
    for table in tables:
        op.alter_column(
            table,
            'updated_at',
            existing_type=sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        )
|
||||
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Make ``updated_at`` nullable again and remove its server default."""
    # Same order as before: deal table first, then company table.
    for table_name in ('hubspot_deal_data', 'hubspot_company_data'):
        op.alter_column(
            table_name,
            'updated_at',
            existing_type=postgresql.TIMESTAMP(timezone=True),
            server_default=None,
            nullable=True,
        )
|
||||
34
alembic/versions/e72e15f7e0c3_delete_raw_data.py
Normal file
34
alembic/versions/e72e15f7e0c3_delete_raw_data.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
"""delete raw_data
|
||||
|
||||
Revision ID: e72e15f7e0c3
|
||||
Revises: 2409147995c5
|
||||
Create Date: 2025-10-27 15:31:20.870827
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'e72e15f7e0c3'
|
||||
down_revision: Union[str, None] = '2409147995c5'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Index ``hubspot_data.deal_id`` and drop the ``raw_data`` column."""
    table_name = 'hubspot_data'
    index_name = op.f('ix_hubspot_data_deal_id')
    op.create_index(index_name, table_name, ['deal_id'], unique=False)
    op.drop_column(table_name, 'raw_data')
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Re-add the nullable JSON ``raw_data`` column and drop the ``deal_id`` index."""
    table_name = 'hubspot_data'
    raw_data = sa.Column(
        'raw_data',
        postgresql.JSON(astext_type=sa.Text()),
        autoincrement=False,
        nullable=True,
    )
    op.add_column(table_name, raw_data)
    op.drop_index(op.f('ix_hubspot_data_deal_id'), table_name=table_name)
|
||||
|
|
@ -9,6 +9,7 @@ class Settings(BaseSettings):
|
|||
env_file = ".env" # Load from an optional .env file
|
||||
|
||||
settings = Settings()
|
||||
# Database engine; created from settings.DATABASE_URL, or None when it is not set
|
||||
engine = create_engine(settings.DATABASE_URL) if settings.DATABASE_URL else None
|
||||
|
||||
|
||||
|
|
@ -19,4 +20,5 @@ def get_db_session():
|
|||
|
||||
def init_db():
    """Create the tables registered on ``SQLModel.metadata``.

    Does nothing when no engine is configured.
    """
    if not engine:
        return
    # create_all issues CREATE TABLE for the models known to SQLModel's metadata.
    SQLModel.metadata.create_all(engine)
|
||||
|
|
@ -1,136 +1,136 @@
|
|||
from etl.hubSpotClient.hubspot import HubSpotClient, DealStage
|
||||
from etl.surveyPrice.surveyPrice import SurveyPrice
|
||||
from etl.surveyedData.surveryedData import surveyedDataProcessor
|
||||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller
|
||||
from etl.db.db import get_db_session, init_db
|
||||
import pandas as pd
|
||||
from etl.db.db import get_db_session, init_db
|
||||
from etl.utils.utils import get_sharepoint_path
|
||||
from etl.models.topLevel import HubspotDealData, HubspotCommpanyData
|
||||
from sqlmodel import select
|
||||
|
||||
class HubspotTodb():
|
||||
def __init__(self):
    """Initialise the database, the HubSpot client and the survey pricing helper."""
    init_db()
    self.hubspot = HubSpotClient()
    # Filled by get_all_deals().
    self.deals_in_hubspot = None
    # Filled by gather_data_from_each_sharepoint().
    self.data_in_sharepoint = []
    self.sp = SurveyPrice()
|
||||
|
||||
def get_all_deals(self):
    """Fetch all surveys from HubSpot, cache them on the instance and return them."""
    deals = self.sp.get_all_surveys_from_hubspot()
    self.deals_in_hubspot = deals
    return deals
|
||||
|
||||
|
||||
def get_sharepoint_scraper(self, installer):
|
||||
sp = None
|
||||
if installer.upper() == "J & J CRUMP":
|
||||
sp = SharePointScraper(SharePointInstaller.JJC)
|
||||
elif installer.upper() == "SCIS":
|
||||
sp = SharePointScraper(SharePointInstaller.SOUTH_COAST_INSULATION)
|
||||
elif installer.upper() == "SGEC":
|
||||
sp = SharePointScraper(SharePointInstaller.SGEC)
|
||||
else:
|
||||
sp = None
|
||||
def new_record_to_hubspot_data(self, deal_data, company, listing):
    """Deprecated alias for ``upsert_hubspot_deal``.

    Forwards the arguments unchanged and returns whatever
    ``upsert_hubspot_deal`` returns.
    """
    print("This has been depreciated using new interface")
    # The method is already bound: passing ``self`` again would shift every
    # argument by one and raise TypeError.
    return self.upsert_hubspot_deal(deal_data, company, listing)
|
||||
|
||||
return sp
|
||||
|
||||
def create_files_locally(self, sp, path, address):
    """Download the non-media files in a SharePoint folder to temp files.

    Returns a dict mapping ``address`` to the list of values returned by
    ``sp.create_temp_file`` for each downloaded file.
    """
    # Image and video suffixes that are skipped.
    skipped_suffixes = [".jpg",".mov", ".JPG", ".heic", ".HEIC", ".png", ".PNG", ".jpeg", ".JPEG", ".mov", ".MOV", ".mp4", ".MP4"]

    listing = sp.get_folders_in_path(path)
    download_urls = {}
    for entry in listing['value']:
        # Entries without a 'file' key are ignored.
        if 'file' not in entry:
            continue
        if any(entry["name"].endswith(suffix) for suffix in skipped_suffixes):
            continue
        download_urls[entry["name"]] = entry['@microsoft.graph.downloadUrl']

    local_files = []
    for name, url in download_urls.items():
        payload = sp.get_file_content(url)
        local_files.append(sp.create_temp_file(payload, f"{address}/{name}"))

    return {address: local_files}
|
||||
|
||||
def string_to_installer(self, installer):
    """Map an installer name (case-insensitive) to a ``SharePointInstaller`` member.

    Returns ``None`` for any name that is not recognised.
    """
    key = installer.upper()
    if key == "J & J CRUMP":
        return SharePointInstaller.JJC
    if key == "SCIS":
        return SharePointInstaller.SOUTH_COAST_INSULATION
    if key == "SGEC":
        return SharePointInstaller.SGEC
    return None
|
||||
|
||||
def work_out_invoice(self, row):
    """Price one HubSpot deal row using its SharePoint survey data.

    Builds a one-row frame from the survey and another from ``row``, merges
    them through ``self.sp`` and returns ``self.sp.calculate_all_price`` of
    the merged frame.
    """
    survey_info = self.gather_data_from_sharepoint_url(row)
    installer_member = self.string_to_installer(row["HUBSPOT_INSTALLER"])
    survey_record = self.sp.survey_to_pandas_format(surveyInfo=survey_info, installer=installer_member)
    survey_frame = pd.DataFrame([survey_record])
    hubspot_frame = pd.DataFrame([row])
    combined = self.sp.merge_hub_spot_and_survey_information_from_sharepoint_url(hubspot_frame, survey_frame)
    return self.sp.calculate_all_price(combined)
|
||||
|
||||
|
||||
|
||||
# self.sp.calculate_one_price_with_sharepoint_url(row, )
|
||||
|
||||
def gather_data_from_sharepoint_url(self, row):
    """Download one deal's SharePoint files and load its pre-site note.

    Returns the ``surveyedDataProcessor`` built for the deal's address.
    """
    scraper = self.get_sharepoint_scraper(row["HUBSPOT_INSTALLER"])
    folder = get_sharepoint_path(row["HUBSPOT_SHAREPOINT_PATH"])
    files_by_address = self.create_files_locally(scraper, folder, row["HUBSPOT_DEAL_ADDRESS"])

    for address, file_paths in files_by_address.items():
        sdp = surveyedDataProcessor(address, file_paths)
        sdp.hubspot_deal_id = row["HUBSPOT_DEAL_ID"]
        with get_db_session() as session:
            self.load_one_pre_site_note(session, sdp, row)
    return sdp
|
||||
|
||||
|
||||
def gather_data_from_each_sharepoint(self):
    """Download SharePoint files for every HubSpot deal.

    Refreshes ``self.deals_in_hubspot`` and appends one
    ``surveyedDataProcessor`` per address to ``self.data_in_sharepoint``.
    """
    self.get_all_deals()
    for _, row in self.deals_in_hubspot.iterrows():
        sp = self.get_sharepoint_scraper(row["HUBSPOT_INSTALLER"])
        # get_sharepoint_path is the module-level helper imported from
        # etl.utils.utils; the class has no method of that name.
        path = get_sharepoint_path(row["HUBSPOT_SHAREPOINT_PATH"])
        data_loc = self.create_files_locally(sp, path, row["HUBSPOT_DEAL_ADDRESS"])

        for add, file_loc in data_loc.items():
            sdp = surveyedDataProcessor(add, file_loc)
            sdp.hubspot_deal_id = row["HUBSPOT_DEAL_ID"]
            self.data_in_sharepoint.append(sdp)
|
||||
|
||||
|
||||
def load_all(self, fast=False):
|
||||
if fast is False:
|
||||
self.gather_data_from_each_sharepoint()
|
||||
def new_record_company(self, company_data):
|
||||
"""
|
||||
Adds a new records to the hubspot_compnay_data table
|
||||
"""
|
||||
|
||||
with get_db_session() as session:
|
||||
self.load_all_pre_site_note(session)
|
||||
new_record = HubspotCommpanyData(
|
||||
company_id=company_data.get("hs_object_id"),
|
||||
company_name=company_data.get("name")
|
||||
)
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
return new_record
|
||||
|
||||
def find_all_deals_with_company_id(self, company_id):
    """
    Return the list of hubspot_deal_data records whose company_id matches.
    """
    with get_db_session() as session:
        deals_query = session.query(HubspotDealData)
        matching = deals_query.filter(HubspotDealData.company_id == company_id)
        return matching.all()
|
||||
|
||||
def update_deal(self, deal_in_db, hubspot_client):
|
||||
"""
|
||||
Checks if a deal needs updating and updates it in the db with the latest information.
|
||||
Performs soft assertions for field-level consistency between DB and HubSpot data.
|
||||
"""
|
||||
def soft_assert(condition, message="Assertion Failed"):
|
||||
if not condition:
|
||||
print(f"⚠️ Soft Assert Failed: {message}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def load_one_pre_site_note(self, db_session, surveyedData, hubspot_data):
|
||||
df = hubspot_data
|
||||
assessor = surveyedData.load_assessor_table(db_session)
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
|
||||
# Loads the pre site summary information
|
||||
summary_info = surveyedData.load_pre_site_notes_summary_table(db_session)
|
||||
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(deal_in_db.deal_id)
|
||||
|
||||
property_description = surveyedData.load_property_description(db_session)
|
||||
results = [
|
||||
soft_assert(deal_in_db.deal_id == hs_deal.get("hs_object_id"),
|
||||
"deal_id mismatch"),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id,
|
||||
"company_id mismatch"),
|
||||
soft_assert(deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
|
||||
"landlord_property_id mismatch"),
|
||||
soft_assert(deal_in_db.outcome == hs_deal.get("outcome"),
|
||||
"outcome mismatch"),
|
||||
soft_assert(deal_in_db.dealstage == hs_deal.get("dealstage"),
|
||||
"dealstage mismatch"),
|
||||
soft_assert(deal_in_db.dealname == hs_deal.get("dealname"),
|
||||
"dealname mismatch"),
|
||||
soft_assert(deal_in_db.project_code == hs_deal.get("project_code"),
|
||||
"project_code mismatch"),
|
||||
soft_assert(deal_in_db.uprn == hs_listing.get("national_uprn"),
|
||||
"uprn mismatch"),
|
||||
soft_assert(deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
|
||||
"outcome_notes mismatch"),
|
||||
]
|
||||
|
||||
# Creates the a final pre site note table that links all information
|
||||
presitenote = surveyedData.create_pre_site_note_table(db_session, assessor, summary_info, property_description)
|
||||
# if any of the soft asserts failed
|
||||
if not all(results):
|
||||
print(f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — database may need updating.")
|
||||
self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing)
|
||||
return False
|
||||
else:
|
||||
print(f"✅ All checks passed for deal_id {deal_in_db.deal_id}. No need to update.")
|
||||
return True
|
||||
|
||||
def upsert_hubspot_deal(self, deal_data, company, listing):
|
||||
"""
|
||||
Inserts a new record or updates an existing record in hubspot_deal_data.
|
||||
Uses deal_id (hs_object_id) as the unique identifier.
|
||||
"""
|
||||
with get_db_session() as session:
|
||||
deal_id = deal_data.get("hs_object_id")
|
||||
|
||||
building_table = surveyedData.create_buildings_table(
|
||||
db_session,
|
||||
df["HUBSPOT_LANDLORD_ID"],
|
||||
df["HUBSPOT_DOMNA_ID"],
|
||||
)
|
||||
documents = surveyedData.create_document_table_via_pre_site_note(db_session, presitenote, assessor, building_table)
|
||||
# Use SQLModel's modern query style
|
||||
statement = select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
|
||||
existing = session.exec(statement).first()
|
||||
|
||||
def load_all_pre_site_note(self, db_session):
|
||||
# Loads all pre
|
||||
for surveyedData in self.data_in_sharepoint:
|
||||
self.load_one_pre_site_note(surveyedData=surveyedData, db_session=db_session)
|
||||
if existing:
|
||||
print(f"🔄 Updating existing deal (deal_id={deal_id})")
|
||||
|
||||
existing.dealname = deal_data.get("dealname", existing.dealname)
|
||||
existing.dealstage = deal_data.get("dealstage", existing.dealstage)
|
||||
existing.landlord_property_id = listing.get("owner_property_id", existing.landlord_property_id)
|
||||
existing.uprn = listing.get("national_uprn", existing.uprn)
|
||||
existing.outcome = deal_data.get("outcome", existing.outcome)
|
||||
existing.outcome_notes = deal_data.get("outcome_notes", existing.outcome_notes)
|
||||
existing.project_code = deal_data.get("project_code", existing.project_code)
|
||||
existing.company_id = company or existing.company_id
|
||||
|
||||
session.add(existing)
|
||||
session.commit()
|
||||
session.refresh(existing)
|
||||
return existing
|
||||
|
||||
else:
|
||||
print(f"🆕 Inserting new deal (deal_id={deal_id})")
|
||||
|
||||
new_record = HubspotDealData(
|
||||
deal_id=deal_id,
|
||||
dealname=deal_data.get("dealname"),
|
||||
dealstage=deal_data.get("dealstage"),
|
||||
landlord_property_id=listing.get("owner_property_id"),
|
||||
uprn=listing.get("national_uprn"),
|
||||
outcome=deal_data.get("outcome"),
|
||||
outcome_notes=deal_data.get("outcome_notes"),
|
||||
project_code=deal_data.get("project_code"),
|
||||
company_id=company,
|
||||
)
|
||||
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
return new_record
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ from bs4 import BeautifulSoup
|
|||
from openpyxl import Workbook
|
||||
from openpyxl.styles import Font
|
||||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller, previous_monday
|
||||
from etl.hubSpotClient.hubspot import HubSpotClient, DealStage
|
||||
from etl.hubSpotClient.hubspotClient import HubSpotClient, DealStage
|
||||
from collections import defaultdict
|
||||
import time
|
||||
# Auth credentials
|
||||
|
|
|
|||
|
|
@ -1,326 +0,0 @@
|
|||
import hubspot
|
||||
from enum import Enum
|
||||
from hubspot.crm.deals import PublicObjectSearchRequest
|
||||
from hubspot.crm.deals.models import SimplePublicObjectInput
|
||||
from etl.hubSpotClient.types import SubmissionInfoFromDeal
|
||||
import time
|
||||
from pydantic import ValidationError
|
||||
from etl.utils.logger import Logger
|
||||
import logging
|
||||
import traceback
|
||||
from hubspot.crm.objects.notes import SimplePublicObjectInput as NoteInput
|
||||
from hubspot.crm.associations import BatchInputPublicAssociation, PublicAssociation
|
||||
import time
|
||||
|
||||
|
||||
class DealStage(Enum):
    """Named HubSpot deal stages.

    Each value is the stage identifier string used as the ``dealstage``
    filter value when searching deals.
    """
    SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
    SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
    CUSTOMER_CONTACTED = "888730834"
    SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
    FILES_MISSING_FROM_ASSESSOR = "1887736000"
|
||||
|
||||
class HubSpotClient():
|
||||
def __init__(self, access_token=None):
    """Create the HubSpot API client and its logger.

    The token is taken from ``access_token``, then from the
    ``HUBSPOT_ACCESS_TOKEN`` environment variable, and only then from the
    legacy built-in value.
    """
    import os

    # NOTE(security): this committed credential is kept only so existing
    # callers keep working. It should be rotated and removed once every
    # deployment supplies HUBSPOT_ACCESS_TOKEN.
    legacy_token = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"

    self.logger = Logger(name='HubSpotClient', level=logging.INFO).get_logger()
    token = access_token or os.environ.get("HUBSPOT_ACCESS_TOKEN")
    if not token:
        self.logger.warning(
            "HUBSPOT_ACCESS_TOKEN is not set; falling back to the built-in token"
        )
        token = legacy_token
    self.access_token = token
    self.client = hubspot.Client.create(access_token=self.access_token)
|
||||
|
||||
def get_all_deals(self):
    """Return the result of the client's ``crm.deals.get_all()`` call."""
    deals_api = self.client.crm.deals
    return deals_api.get_all()
|
||||
|
||||
def get_owner_name_from_id(self, owner_id):
    """Return the owner's "first last" name, with missing parts left out."""
    owner_record = self.client.crm.owners.owners_api.get_by_id(owner_id)
    # Pause after the API call, as the other HubSpot calls in this class do.
    time.sleep(1)
    name_parts = (owner_record.first_name or "", owner_record.last_name or "")
    return " ".join(name_parts).strip()
|
||||
|
||||
def get_deal_name_by_id(self, deal_id):
    """Return the deal's ``dealname`` property.

    Returns "No deal name" when the property is absent and "Unknown Deal"
    when the lookup fails for any reason. The failure is logged instead of
    being discarded.
    """
    try:
        deal = self.client.crm.deals.basic_api.get_by_id(deal_id)
        time.sleep(1)
        return deal.properties.get("dealname", "No deal name")
    except Exception as exc:
        # Best-effort lookup: keep the fallback value but record the cause.
        self.logger.warning("Could not fetch deal name for deal %s: %s", deal_id, exc)
        return "Unknown Deal"  # Fallback if the deal name is not found
|
||||
|
||||
|
||||
def get_listings_from_deals_id(self, deals_id):
    """Return the first object of type "0-420" associated with a deal.

    Returns ``None`` when the search finds nothing. Only the first match is
    used, so paging stops as soon as a page contains a result.
    """
    from hubspot.crm.objects import PublicObjectSearchRequest

    after = None
    while True:
        # Filter on the association to the given deal ID.
        search_request = PublicObjectSearchRequest(
            filter_groups=[{
                "filters": [{
                    "propertyName": "associations.deal",
                    "operator": "EQ",
                    "value": deals_id,
                }]
            }],
            properties=["domna_property_id", "owner_property_id", 'national_uprn'],
            limit=200,
            after=after,
        )
        response = self.client.crm.objects.search_api.do_search(
            object_type="0-420",
            public_object_search_request=search_request,
        )
        time.sleep(1)

        if response.results:
            return response.results[0]

        # Empty page: continue only while the API reports another page.
        if not response.paging or not response.paging.next:
            return None
        after = response.paging.next.after
|
||||
|
||||
def get_domna_and_landlord_id(self, deals_id):
    """Return ``(domna_property_id, owner_property_id, national_uprn)`` for a deal.

    A missing or empty ``national_uprn`` is returned as ``''``.
    """
    props = self.get_listings_from_deals_id(deals_id).properties
    domna_id = props['domna_property_id']
    landlord_id = props['owner_property_id']
    uprn = props.get('national_uprn', '') or ''
    return domna_id, landlord_id, uprn
|
||||
|
||||
def get_notes_from_deals_id(self, deals_id):
    """Return every note associated with a deal.

    Each item is a dict with ``note_id``, ``note`` (the note body, or
    "No content") and ``created_at`` formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    from hubspot.crm.objects import PublicObjectSearchRequest

    matched_notes = []
    cursor = None
    while True:
        # Search for notes whose deal association equals the given deal ID.
        request = PublicObjectSearchRequest(
            filter_groups=[{
                "filters": [{
                    "propertyName": "associations.deal",
                    "operator": "EQ",
                    "value": deals_id,
                }]
            }],
            properties=["hs_note_body", "hubspot_owner_id"],
            limit=200,
            after=cursor,
        )
        page = self.client.crm.objects.search_api.do_search(
            object_type="notes",
            public_object_search_request=request,
        )
        time.sleep(1)
        matched_notes.extend(page.results)

        # Keep paging until the API reports no further page.
        if not page.paging or not page.paging.next:
            break
        cursor = page.paging.next.after

    return [
        {
            "note_id": note.id,
            "note": note.properties.get("hs_note_body", "No content"),
            "created_at": note.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        }
        for note in matched_notes
    ]
|
||||
|
||||
|
||||
def get_all_deals_from_stage_id(self, stage_id):
    """Return every deal whose ``dealstage`` equals ``stage_id``.

    Each item is a dict with ``deal_id``, ``value`` (the deal's ``amount``
    property) and ``deal_owner`` (its ``hubspot_owner_id``, if present).
    """
    stage_deals = []
    cursor = None
    while True:
        request = PublicObjectSearchRequest(
            filter_groups=[{
                "filters": [{
                    "propertyName": "dealstage",
                    "operator": "EQ",
                    "value": stage_id,
                }]
            }],
            properties=[
                "dealname",
                "amount",
                "hubspot_owner_id",
            ],
            limit=200,
            after=cursor,
        )
        page = self.client.crm.deals.search_api.do_search(request)
        time.sleep(1)
        stage_deals.extend(page.results)

        # Keep paging until the API reports no further page.
        if not page.paging or not page.paging.next:
            break
        cursor = page.paging.next.after

    return [
        {
            "deal_id": deal.id,
            "value": deal.properties["amount"],
            "deal_owner": deal.properties.get("hubspot_owner_id"),
        }
        for deal in stage_deals
    ]
|
||||
|
||||
def get_associations_for_deal(self, deal_id, to_object_type):
    """
    List the IDs of the objects of type `to_object_type` associated with a deal.

    `to_object_type` is passed straight to the deals associations API
    (e.g. "contacts", "companies", "notes").
    """
    associations_api = self.client.crm.deals.associations_api
    page = associations_api.get_all(
        deal_id=deal_id,
        to_object_type=to_object_type,
    )
    return [linked.id for linked in page.results]
|
||||
|
||||
def get_deals_from_deal_stage(self, deal_stage: DealStage):
    """
    Fetch every deal in `deal_stage` and validate each into a `SubmissionInfoFromDeal`.

    Deals that validate are returned. A deal that fails is moved to
    `DealStage.FILES_MISSING_FROM_ASSESSOR`; when the failure exposes an
    `errors()` method (validation errors), a single note listing all of the
    errors is added to the deal, otherwise the error is only logged.

    Args:
        deal_stage: Stage whose `.value` is used as the `dealstage` search filter.

    Returns:
        List of `SubmissionInfoFromDeal` objects for the deals that validated.
    """

    def format_error_note(e):
        """Build the HTML note body describing why verification failed."""
        note_text = "⚠️ <b>Automated Verification Failed:</b><br><br>"

        if hasattr(e, "errors") and callable(e.errors):
            note_text += "❌ <b>Validation Errors:</b><br>"
            for error in e.errors():
                loc = error.get('loc', 'N/A')
                msg = error.get('msg', 'N/A')
                error_type = error.get('type', 'N/A')
                error_input = error.get('input', 'N/A')

                note_text += (
                    f"• <b>Field:</b> <code>{loc}</code><br>"
                    f" - <b>Message:</b> {msg}<br>"
                    f" - <b>Type:</b> {error_type}<br>"
                    f" - <b>Input:</b> {error_input}<br><br>"
                )
        else:
            note_text += (
                "❗ <b>Non-validation error:</b><br>"
                f"<pre>{str(e)}</pre><br>"
            )

        note_text += (
            "🛠️ Please review this error and take necessary actions.<br>"
            "Contact <b>Jun-te</b> if help is needed: <b>+44 7519 530 549</b> or via Teams."
        )

        return note_text

    found_deals = []
    after = None
    while True:
        search_request = PublicObjectSearchRequest(
            filter_groups=[{
                "filters": [{
                    "propertyName": "dealstage",
                    "operator": "EQ",
                    "value": deal_stage.value,
                }]
            }],
            properties=[
                "dealname",
                "number_of_wet_rooms_needing_ventilation",
                "work_type",
                "property_needs_trickle_vents",
                "domna_survey_post_sap",
                "existing_wall_insulation",
                "installer",
                "submission_folder",
            ],
            limit=200,
            after=after,
        )
        response = self.client.crm.deals.search_api.do_search(search_request)
        found_deals.extend(response.results)
        if not response.paging or not response.paging.next:
            break
        after = response.paging.next.after

    all_deals = []
    for deal in found_deals:
        domna_id, landlord_id, uprn = self.get_domna_and_landlord_id(deal.id)

        # Resolve the identifiers before the try block so the error path can
        # always name and move the deal, even when the failure happens on the
        # first line of validation.
        props = deal.properties
        deal_id = props.get("hs_object_id") or deal.id
        deal_name = props.get("dealname")

        try:
            self.logger.info(f"Validating <{deal_name}>")
            all_deals.append(SubmissionInfoFromDeal(
                deal_id=deal_id,
                deal_name=deal_name,
                work_type=props["work_type"],
                # A property that is present but None is treated like a missing one ("NO").
                needs_trickle_ventilation=(props.get("property_needs_trickle_vents") or "NO").upper() == "YES",
                post_sap_score=int(props["domna_survey_post_sap"]),
                existing_wall_insulation=props.get("existing_wall_insulation") or "None",
                no_of_wet_rooms=int(props["number_of_wet_rooms_needing_ventilation"]),
                installer=props["installer"],
                submission_folder_path=props["submission_folder"],
                landlord_id=landlord_id,
                domna_id=domna_id,
                uprn=uprn,
            ))
        except Exception as e:
            if callable(getattr(e, "errors", None)):
                # The note already lists every validation error, so post it
                # once per deal rather than once per error.
                self.add_note_to_deal(deal_id, format_error_note(e))
            else:
                self.logger.error(f"Non-validation error occurred: {str(e)}", exc_info=True)
            self.logger.info(f"Deal name <{deal_name}> moving to 'needs additional information'")
            self.move_deals_to_different_stage([deal_id], DealStage.FILES_MISSING_FROM_ASSESSOR.value)
    return all_deals
|
||||
|
||||
def print_all_pipeline_ids(self):
    """Print each deal pipeline's label, then the label and ID of each of its stages."""
    pipelines_api = self.client.crm.pipelines.pipelines_api
    for pipeline in pipelines_api.get_all(object_type="deals").results:
        print(f"Pipeline: {pipeline.label}")
        for stage in pipeline.stages:
            # Two output lines per stage: its label, then its ID.
            print(f" - Label: {stage.label}", f" ID: {stage.id}", sep="\n")
|
||||
|
||||
def move_deals_to_different_stage(self, list_of_deals_id, to_stage_id):
    """
    Set `dealstage` to `to_stage_id` on every deal in `list_of_deals_id`.

    Issues one update call per deal and logs each move.
    """
    # The same payload is sent for every deal.
    stage_change = SimplePublicObjectInput(properties={"dealstage": to_stage_id})
    for deal_id in list_of_deals_id:
        self.client.crm.deals.basic_api.update(
            deal_id,
            simple_public_object_input=stage_change,
        )
        self.logger.info(f"Deal {deal_id} moved to stage with ID {to_stage_id}.")
|
||||
|
||||
|
||||
|
||||
def add_note_to_deal(self, deal_id, note_text):
    """
    Create a note containing `note_text` and associate it with `deal_id`.

    Any exception raised along the way is logged with its traceback and
    swallowed, so this method does not raise to the caller.
    """
    try:
        note_properties = {
            "hs_note_body": note_text,
            # Current time in milliseconds since the epoch.
            "hs_timestamp": int(time.time() * 1000),
        }
        created_note = self.client.crm.objects.notes.basic_api.create(
            NoteInput(properties=note_properties)
        )

        # Link the freshly created note to the deal.
        note_to_deal = PublicAssociation(
            _from=created_note.id,
            to=deal_id,
            type="note_to_deal",
        )
        self.client.crm.associations.batch_api.create(
            'notes',
            'deals',
            batch_input_public_association=BatchInputPublicAssociation(
                inputs=[note_to_deal]
            ),
        )

        self.logger.info(f"📝 Note added to deal {deal_id}: {note_text}")
    except Exception as e:
        self.logger.error(f"❌ Failed to add note to deal {deal_id}: {e}", exc_info=True)
|
||||
145
etl/hubSpotClient/hubspotClient.py
Normal file
145
etl/hubSpotClient/hubspotClient.py
Normal file
|
|
@ -0,0 +1,145 @@
|
|||
import hubspot
|
||||
from enum import Enum
|
||||
from etl.utils.logger import Logger
|
||||
import logging
|
||||
from hubspot.crm.associations import ApiException
|
||||
|
||||
class Companies(Enum):
    """HubSpot company IDs (as strings) for the companies this module handles."""

    ABRI = "237615001799"
|
||||
|
||||
class DealStage(Enum):
    """Deal stage IDs (as strings), used as `dealstage` property values."""

    SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
    SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
    CUSTOMER_CONTACTED = "888730834"
    SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
    FILES_MISSING_FROM_ASSESSOR = "1887736000"
|
||||
|
||||
class Pipeline(Enum):
    """Pipeline IDs (as strings), compared against a deal's `pipeline` property."""

    OPERATIONS_SOCIAL_HOUSING = "1167582403"
|
||||
|
||||
class HubSpotClient():
    """
    Wrapper around the HubSpot SDK client for reading deals, their associated
    company and listing, and company details.
    """

    # Environment variable consulted for the access token when none is passed in.
    ACCESS_TOKEN_ENV_VAR = "HUBSPOT_ACCESS_TOKEN"

    # SECURITY: this token was committed to source control and should be treated
    # as compromised. It remains only so existing callers keep working until the
    # token is rotated and supplied via the environment; delete it after that.
    _LEGACY_ACCESS_TOKEN = "pat-eu1-064f7f5c-a7d8-4d93-a9b2-b604da6164a6"

    # Object type identifier passed to the associations/objects APIs for listings.
    LISTING_OBJECT_TYPE = "0-420"

    def __init__(self, access_token=None):
        """
        Create the SDK client and logger.

        Args:
            access_token: Token to authenticate with. When omitted, the
                `HUBSPOT_ACCESS_TOKEN` environment variable is used, and only
                if that is unset does the legacy built-in token apply.
        """
        self.access_token = self._resolve_access_token(access_token)
        self.client = hubspot.Client.create(access_token=self.access_token)
        self.logger = Logger(name='HubSpotClient', level=logging.INFO).get_logger()
        self.all_deals = None  # cache filled by get_all_deals()

        if self.access_token == self._LEGACY_ACCESS_TOKEN:
            self.logger.warning(
                "Using the access token embedded in source code; set "
                f"{self.ACCESS_TOKEN_ENV_VAR} and rotate the embedded token."
            )

    @staticmethod
    def _resolve_access_token(access_token=None):
        """Pick the token: explicit argument, then environment, then legacy fallback."""
        import os

        return (
            access_token
            or os.environ.get(HubSpotClient.ACCESS_TOKEN_ENV_VAR)
            or HubSpotClient._LEGACY_ACCESS_TOKEN
        )

    def get_all_deals(self):
        """Fetch all deals through the SDK, cache them on `self.all_deals`, and return them."""
        self.all_deals = self.client.crm.deals.get_all()
        return self.all_deals

    def get_deal_ids_by_pipeline(self, pipeline_id):
        """
        Get all deal IDs associated with a given pipeline.

        Uses the cached deals, fetching them first when the cache is empty.
        `pipeline_id` is compared as a string against each deal's `pipeline`
        property; deals without that property are skipped.
        """
        if self.all_deals is None:
            self.get_all_deals()

        wanted = str(pipeline_id)
        return [
            deal.id
            for deal in self.all_deals
            if deal.properties.get("pipeline") == wanted
        ]

    def from_deal_get_associated_company_id(self, deal_id: str):
        """
        Get the associated company ID from a given deal ID.

        Returns the ID of the first associated company, or None when there is
        no association or the associations API raises `ApiException` (the
        error is logged).
        """
        try:
            associations_api = self.client.crm.associations.v4.basic_api

            # Fetch associations for this specific deal only.
            response = associations_api.get_page(
                object_type="deals",
                object_id=deal_id,
                to_object_type="companies",
                limit=1,  # Expect only one associated company
            )

            if not response.results:
                self.logger.info(f"No company association found for deal {deal_id}")
                return None

            company_id = response.results[0].to_object_id
            self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
            return company_id

        except ApiException as e:
            self.logger.error(f"Error fetching associated company for deal {deal_id}: {e}")
            return None

    def from_deal_get_associated_listing(self, deal_id: str):
        """
        Get the associated listing information for a given deal.

        Returns the listing's properties (`national_uprn`, `domna_property_id`,
        `owner_property_id` are requested), or None when the deal has no
        associated listing. Unlike the company lookup, API errors are not
        caught here and propagate to the caller.
        """
        associations_api = self.client.crm.associations.v4.basic_api
        listings_api = self.client.crm.objects.basic_api

        # Fetch associated listing(s); only the first one is used.
        response = associations_api.get_page(
            object_type="deals",
            object_id=deal_id,
            to_object_type=self.LISTING_OBJECT_TYPE,
            limit=1,
        )

        if not response.results:
            self.logger.info(f"No listing association found for deal {deal_id}")
            return None

        listing_id = response.results[0].to_object_id
        self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")

        # Fetch listing details (the "listing information").
        listing = listings_api.get_by_id(
            object_type=self.LISTING_OBJECT_TYPE,
            object_id=listing_id,
            properties=[
                "national_uprn",
                "domna_property_id",
                "owner_property_id",
            ],
        )

        listing_info = listing.properties
        self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
        return listing_info

    def from_deal_get_info(self, deal_id):
        """Return the deal's properties, requesting name, stage, outcome, outcome notes and project code."""
        deal = self.client.crm.deals.basic_api.get_by_id(
            deal_id,
            properties=[
                'dealname',
                'dealstage',
                'outcome',
                'outcome_notes',
                'project_code',
            ],
        )
        return deal.properties

    def get_deal_info_for_db(self, deal_id):
        """Return `(deal properties, associated company ID, listing properties)` for a deal."""
        deal = self.from_deal_get_info(deal_id)
        company = self.from_deal_get_associated_company_id(deal_id)
        listing = self.from_deal_get_associated_listing(deal_id)
        return deal, company, listing

    def get_company_information(self, company_id):
        """Return the company's properties, requesting only `name`."""
        company = self.client.crm.companies.basic_api.get_by_id(
            company_id,
            properties=[
                'name',
            ],
        )
        return company.properties
|
||||
|
||||
15
etl/hubSpotClient/hubspot_types.py
Normal file
15
etl/hubSpotClient/hubspot_types.py
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
# from sqlmodel import Field, SQLModel
|
||||
# from sqlalchemy import Column
|
||||
# from sqlalchemy.dialects.postgresql import UUID
|
||||
# import uuid
|
||||
# from pydantic import Field
|
||||
|
||||
|
||||
|
||||
# class BaseModel(SQLModel):
|
||||
# id: uuid.UUID = Field(
|
||||
# default_factory=uuid.uuid4,
|
||||
# sa_column=Column(UUID(as_uuid=True), primary_key=True)
|
||||
# )
|
||||
|
||||
|
||||
45
etl/hubSpotClient/scripts/hubspot_abri_etl_first_time.py
Normal file
45
etl/hubSpotClient/scripts/hubspot_abri_etl_first_time.py
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline
|
||||
from tqdm import tqdm
|
||||
from etl.db.hubSpotLoad import HubspotTodb
|
||||
|
||||
# One-off load: copy every Abri deal in the social-housing operations
# pipeline from HubSpot into the database.
#
# TODO:
#   - get one deal from db, from db
#   - for Abri only so far
#   - add it to the db
#   - show in frontend

hubspot = HubSpotClient()

# All deal IDs in the pipeline (filtered client-side from the full deal list).
deals = hubspot.get_deal_ids_by_pipeline(
    pipeline_id=Pipeline.OPERATIONS_SOCIAL_HOUSING.value,
)

# Company IDs whose deals we care about.
valueable_deals = [
    Companies.ABRI.value
]
deals_to_add = []

deal_to_companies = {}
loader = HubspotTodb()

# Load every deal that belongs to one of the companies above.
for deal in tqdm(deals):
    company = hubspot.from_deal_get_associated_company_id(deal)
    if company not in valueable_deals:
        continue

    deals_to_add.append(deal)
    deal_to_companies[deal] = company
    deal_data = hubspot.from_deal_get_info(deal_id=deal)
    listing_data = hubspot.from_deal_get_associated_listing(deal_id=deal)
    loader.new_record_to_hubspot_data(deal_data, company, listing_data)

# TODO: check if database has Abri data
# make companies table
# make a script that updates table
|
||||
|
||||
15
etl/hubSpotClient/scripts/hubspot_company.py
Normal file
15
etl/hubSpotClient/scripts/hubspot_company.py
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline
|
||||
from tqdm import tqdm
|
||||
from etl.db.hubSpotLoad import HubspotTodb
|
||||
|
||||
# Fetch the Abri company record from HubSpot and store it in the database.
hubspot = HubSpotClient()

# Company properties for Abri, looked up by its HubSpot company ID.
company = hubspot.get_company_information(Companies.ABRI.value)

loader = HubspotTodb()
loader.new_record_company(company)

# TODO: make a script that updates the table, with Khalim scoping
|
||||
|
||||
23
etl/hubSpotClient/scripts/hubspot_update_abri_script.py
Normal file
23
etl/hubSpotClient/scripts/hubspot_update_abri_script.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
from etl.hubSpotClient.hubspotClient import HubSpotClient, Companies, Pipeline
|
||||
from tqdm import tqdm
|
||||
from etl.db.hubSpotLoad import HubspotTodb
|
||||
|
||||
# Re-check every stored Abri deal against HubSpot and report how many changed.
hubspot = HubSpotClient()
db = HubspotTodb()

records = db.find_all_deals_with_company_id(Companies.ABRI.value)

updated_count = 0  # deals that needed updating
checked_count = 0  # total deals processed

for deal in tqdm(records, desc="Checking HubSpot deals"):
    checked_count += 1
    was_up_to_date = db.update_deal(deal, hubspot)

    # A falsy result from update_deal() is counted as "had to be updated".
    updated_count += 0 if was_up_to_date else 1

print(f"\n✅ Finished checking {checked_count} deals.")
print(f"🧩 {updated_count} deal(s) were updated.")
print(f"📈 {checked_count - updated_count} deal(s) were already up to date.")
|
||||
|
|
@ -1,114 +0,0 @@
|
|||
from sqlmodel import Field, SQLModel
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
import uuid
|
||||
from pydantic import Field, field_validator, model_validator
|
||||
from etl.utils.utils import get_sharepoint_path
|
||||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller
|
||||
from etl.surveyedData.surveryedData import surveyedDataProcessor
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def string_to_installer(installer):
    """
    Map an installer name (case-insensitive) to a `SharePointInstaller` member.

    Returns None when the name is not one of the known installers.
    """
    # Upper-cased installer name -> name of the SharePointInstaller member.
    # NOTE(review): "SGEC" resolves to JJC although a SharePointInstaller.SGEC
    # member is used elsewhere in this file — confirm this is intentional.
    member_by_name = {
        "J & J CRUMP": "JJC",
        "SCIS": "SOUTH_COAST_INSULATION",
        "SGEC": "JJC",
        "WARM FRONT": "BAXTER_KELLY",
    }
    member = member_by_name.get(installer.upper())
    if member is None:
        return None
    return getattr(SharePointInstaller, member)
|
||||
|
||||
|
||||
class BaseModel(SQLModel):
    """Base model supplying a UUID primary key that defaults to a fresh `uuid4`."""

    # NOTE(review): the Column object is created once here, at class-definition
    # time — confirm subclasses are not affected by sharing it.
    id: uuid.UUID = Field(
        default_factory=uuid.uuid4,
        sa_column=Column(UUID(as_uuid=True), primary_key=True),
    )
|
||||
|
||||
|
||||
class SubmissionInfoFromDeal(BaseModel):
    """
    Submission details taken from a deal, validated on construction.

    Besides the per-field constraints, an after-validator checks that the
    submission folder can be reached on SharePoint and that the downloaded
    files contain every required report.
    """

    deal_id: str = Field(..., min_length=1)
    deal_name: str = Field(..., min_length=1)
    work_type: str = Field(..., min_length=1)
    needs_trickle_ventilation: bool
    post_sap_score: int
    existing_wall_insulation: str = Field(..., min_length=1)
    no_of_wet_rooms: int
    installer: str = Field(..., min_length=1)
    submission_folder_path: str = Field(..., min_length=1)
    landlord_id: str = Field(..., min_length=1)
    domna_id: str = Field(..., min_length=1)
    uprn: str

    @field_validator('post_sap_score', 'no_of_wet_rooms')
    @classmethod
    def must_be_non_negative(cls, v, info):
        """Reject negative values for the validated integer fields."""
        if v >= 0:
            return v
        raise ValueError(f"{info.field_name} must be non-negative")

    @model_validator(mode="after")
    def check_sharepoint_link_and_contents(self):
        """
        Verify the SharePoint folder is reachable and holds all required reports.

        Raises:
            ValueError: if the path cannot be resolved, the folder cannot be
                listed (or is empty on the SGEC retry), or a required item is
                missing from the downloaded files.
        """
        # Step 1: resolve the folder path and build a scraper for the installer.
        try:
            path = get_sharepoint_path(self.submission_folder_path)
            scraper = SharePointScraper(string_to_installer(self.installer))
        except Exception as e:
            raise ValueError(f"Error accessing SharePoint path: {self.submission_folder_path}. Error: {str(e)}")

        # Step 2: list the folder; if that raises, retry once with the SGEC scraper.
        try:
            try:
                listing = scraper.get_folders_in_path(path)
                # The lookup result is unused, but an error here still triggers the retry.
                listing.get("value")
            except Exception as e:
                print("Trying SGEC")
                scraper = SharePointScraper(SharePointInstaller.SGEC)
                listing = scraper.get_folders_in_path(path)
                if not listing.get("value"):
                    raise ValueError(f"[SharePoint Folder Empty] Folder has no contents after multiple attempts: {self.submission_folder_path}")
        except Exception as e:
            raise ValueError(f"[Folder Access Error] {str(e)}")

        # Step 3: download the files and check that every required item is present.
        try:
            files = scraper.download_files_from_path(path)
            print(files)
            sdp = surveyedDataProcessor("fake address", files)

            # (attribute on the processor, label used in the error message)
            required_items = (
                ("condition_report", "Condition Report"),
                ("epr_with_data", "EPR Energy report with data"),
                ("rd_sap_xml", "RDSAP XML"),
                ("lig_sap_xml", "LIG SAP XML"),
                ("epr_summary_information", "EPR Summary information"),
            )
            missing_items = [
                label
                for attribute, label in required_items
                if getattr(sdp, attribute) is None
            ]

            if missing_items:
                raise ValueError(f"Missing required items: {', '.join(missing_items)}")
        except Exception as e:
            raise ValueError(str(e))

        return self
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -13,7 +13,7 @@ os.environ["JJC_SERVICE_SHAREPOINT_ID"] = "7fdd0485-bbf3-4b29-b30f-98c81c2a6284"
|
|||
os.environ["SGEC_SERVICE_SHAREPOINT_ID"] = "52018e5c-3215-4fe4-a4e3-bbf0d0aa7cd9"
|
||||
|
||||
|
||||
from etl.hubSpotClient.hubspot import DealStage, HubSpotClient
|
||||
from etl.hubSpotClient.hubspotClient import DealStage, HubSpotClient
|
||||
# Local development
|
||||
os.environ["DATABASE_URL"] = "postgresql://postgres:makingwarmhomes@db:5432/postgres"
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
from etl.hubSpotClient.hubspot import HubSpotClient, DealStage
|
||||
from etl.hubSpotClient.hubspotClient import HubSpotClient, DealStage
|
||||
import pandas as pd
|
||||
from etl.jjc_old_lewis_manual_way_ import get_jjc_price_matrix, work_out_total_floor_area, type_of_work, get_band
|
||||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller, WEEK_COMMENCING
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
from sqlmodel import Field, SQLModel, Relationship, text
|
||||
from sqlmodel import Field, SQLModel, Relationship, text, Column
|
||||
import uuid
|
||||
from typing import Optional, List
|
||||
from datetime import datetime
|
||||
|
|
@ -7,40 +7,19 @@ from sqlalchemy import Enum as SAEnum
|
|||
from sqlalchemy import Column
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from etl.fileReader.reportType import ReportType
|
||||
from sqlalchemy import DateTime
|
||||
from sqlalchemy.dialects.postgresql import JSON
|
||||
from sqlalchemy import Text
|
||||
from sqlalchemy import Text, DateTime
|
||||
from sqlalchemy.sql import func
|
||||
from enum import Enum
|
||||
from sqlalchemy import Column
|
||||
|
||||
|
||||
|
||||
|
||||
class BaseModel(SQLModel):
    """Base model supplying a UUID primary key that defaults to a fresh `uuid4`."""

    # Declared through Field (not a shared Column instance) so each table
    # gets its own column.
    id: uuid.UUID = Field(
        default_factory=uuid.uuid4,
        primary_key=True,
    )
|
||||
|
||||
# class Buildings(BaseModel, table=True):
|
||||
# address: str
|
||||
# postcode: str
|
||||
# UPRN: str
|
||||
# landlord_id: str
|
||||
# domna_id: str
|
||||
|
||||
# documents: List["Documents"] = Relationship(back_populates="building")
|
||||
|
||||
# class Documents(BaseModel, table=True):
|
||||
# assessor_id: uuid.UUID = Field(
|
||||
# foreign_key="assessorinfo.id",
|
||||
# nullable=False
|
||||
# )
|
||||
# author: Optional["AssessorInfo"] = Relationship(back_populates="documents")
|
||||
# created_at: datetime
|
||||
# document_type: ReportType
|
||||
|
||||
# building_id: uuid.UUID = Field(foreign_key="buildings.id", nullable=False)
|
||||
# building: Optional["Buildings"] = Relationship(back_populates="documents")
|
||||
|
||||
# target_table: str
|
||||
# target_id: uuid.UUID
|
||||
|
||||
class ReportType(str, Enum):
|
||||
QUIDOS_PRESITE_NOTE = "QUIDOS_PRESITE_NOTE"
|
||||
|
|
@ -82,3 +61,66 @@ class uploaded_files(BaseModel, table=True):
|
|||
)
|
||||
|
||||
uprn: str = Field(index=True)
|
||||
|
||||
|
||||
class HubspotDealData(SQLModel, table=True):
    """Row of `hubspot_deal_data`: deal fields plus listing identifiers and timestamps."""

    __tablename__ = "hubspot_deal_data"

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)

    # HubSpot deal identifiers
    deal_id: str = Field(index=True, nullable=False)
    dealname: Optional[str] = None
    dealstage: Optional[str] = None
    company_id: Optional[str] = None
    project_code: Optional[str] = None

    # HubSpot custom properties
    landlord_property_id: Optional[str] = None
    uprn: Optional[str] = None
    outcome: Optional[str] = None
    outcome_notes: Optional[str] = None

    # NOTE(review): the server default is "NOW() AT TIME ZONE 'utc'" on a
    # timezone-aware column — confirm it stores the intended instant when the
    # database session time zone is not UTC.
    created_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("NOW() AT TIME ZONE 'utc'"),
            nullable=False,
        )
    )

    # Same server default as created_at, with `onupdate=func.now()` added.
    updated_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("NOW() AT TIME ZONE 'utc'"),
            onupdate=func.now(),
            nullable=False,
        )
    )
|
||||
|
||||
class HubspotCommpanyData(SQLModel, table=True):
    """Row of `hubspot_company_data`: company ID, name, group ID and timestamps."""

    __tablename__ = "hubspot_company_data"

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)

    # HubSpot company identifiers
    company_id: str = Field(index=True, nullable=False)
    company_name: Optional[str] = None
    group_id: Optional[str] = None

    # NOTE(review): the server default is "NOW() AT TIME ZONE 'utc'" on a
    # timezone-aware column — confirm it stores the intended instant when the
    # database session time zone is not UTC.
    created_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("NOW() AT TIME ZONE 'utc'"),
            nullable=False,
        )
    )

    # Same server default as created_at, with `onupdate=func.now()` added.
    updated_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("NOW() AT TIME ZONE 'utc'"),
            onupdate=func.now(),
            nullable=False,
        )
    )
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ os.environ["SHAREPOINT_TENANT_ID"] = "c3f7519c-2719-4547-af04-6da6cbfd8f8f"
|
|||
os.environ["SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID"] = "b5a51507-9427-4ee0-b03e-90ec7681e2d3"
|
||||
os.environ["JJC_SERVICE_SHAREPOINT_ID"] = "7fdd0485-bbf3-4b29-b30f-98c81c2a6284"
|
||||
|
||||
from etl.hubSpotClient.hubspot import DealStage, HubSpotClient
|
||||
from etl.hubSpotClient.hubspotClient import DealStage, HubSpotClient
|
||||
from etl.surveyedData.surveryedData import surveyedDataProcessor
|
||||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller
|
||||
from etl.utils.utils import get_sharepoint_path
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
from etl.scraper.scraper import SharePointScraper, SharePointInstaller, previous_monday
|
||||
from etl.hubSpotClient.hubspot import HubSpotClient, DealStage
|
||||
from etl.hubSpotClient.hubspotClient import HubSpotClient, DealStage
|
||||
from etl.surveyedData.surveryedData import surveyedDataProcessor
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
#poetry run alembic revision --autogenerate -m "added more enums"
|
||||
#poetry run alembic revision --autogenerate -m "auto update"
|
||||
|
||||
poetry run alembic upgrade head
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue