Model/etl/hubspot/scripts/scraper/main.py
2026-04-09 16:25:44 +00:00

113 lines
3.9 KiB
Python

import json
import boto3
from typing import Any, Dict, Optional
from backend.app.config import get_settings
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
from etl.hubspot.hubspot_trigger_orchestrator_trigger_request import (
HubspotTriggerOrchestratorTriggerRequest,
)
from backend.utils.subtasks import task_handler
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from utils.logger import setup_logger
# Module-level logger used by `handler` for all progress messages in this file.
logger = setup_logger()
@task_handler()
def handler(body: dict[str, Any], context: Any) -> None:
    """Sync one HubSpot deal into the database and trigger follow-up work.

    The deal identified in ``body`` is fetched from HubSpot together with its
    company and listing, and compared against the copy stored in the database:

    * Deal not in the database: the company (if any) and the deal are upserted.
    * Deal already stored: it is upserted again only when
      ``HubspotDealDiffer.check_for_db_update_trigger`` reports a change.

    When an existing deal has changed and
    ``HubspotDealDiffer.check_for_pashub_trigger`` agrees, a JSON message
    describing the deal is sent to the SQS queue configured as
    ``PASHUB_TO_ARA_SQS_URL``.

    Args:
        body: Trigger payload, validated against
            ``HubspotTriggerOrchestratorTriggerRequest``; must provide
            ``hubspot_deal_id``.
        context: Invocation context. Not used by this function.

    Raises:
        KeyError: If the HubSpot deal lacks one of the fields copied into the
            queue message.
        Exception: Whatever ``model_validate`` raises for an invalid ``body``
            (presumably a pydantic ``ValidationError`` -- confirm).
    """
    db_client = HubspotDataToDb()
    hubspot_client = HubspotClient()
    sqs_client = boto3.client("sqs")
    pashub_trigger_queue_url = get_settings().PASHUB_TO_ARA_SQS_URL

    payload = HubspotTriggerOrchestratorTriggerRequest.model_validate(body)
    hubspot_deal_id: str = payload.hubspot_deal_id

    # Snapshot of the deal as currently stored (None when never seen before).
    db_deal: Optional[HubspotDealData] = db_client.find_deal_with_deal_id(
        hubspot_deal_id
    )

    # Current state of the deal as reported by HubSpot.
    hubspot_deal: Dict[str, str]
    company: Optional[str]
    listing: Optional[dict[str, str]]
    hubspot_deal, company, listing = hubspot_client.get_deal_and_company_and_listing(
        hubspot_deal_id
    )

    deal_changed = False

    if not db_deal:
        # New hubspot deal, no diffing to do
        logger.info(f"New HubSpot deal of ID {hubspot_deal_id}. Loading to database...")
        if company:
            company_data: CompanyData = hubspot_client.get_company_information(company)
            # Reuse the client created at the top of the function rather than
            # constructing a second HubspotDataToDb for this single call.
            db_client.upsert_organisation(company_data)
        # NOTE(review): the deal is stored whether or not it has a company,
        # mirroring the update branch below -- confirm this is intended.
        db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client)
    else:
        # Deal already in db, check whether anything has changed
        logger.info(
            f"HubSpot deal {hubspot_deal_id} already in database. Checking for changes..."
        )
        if HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=hubspot_deal,
            new_company=company,
            new_listing=listing,
            old_deal=db_deal,
        ):
            logger.info(
                f"Deal {hubspot_deal_id} has been changed, updating database..."
            )
            db_client.upsert_deal(
                deal_data=hubspot_deal,
                company=company,
                listing=listing,
                hubspot_client=hubspot_client,
            )
            deal_changed = True

    if not deal_changed:
        # NOTE(review): brand-new deals also leave through this exit, because
        # only the update branch sets deal_changed -- confirm that new deals
        # are not supposed to trigger the orchestration below.
        logger.info(f"No changes to deal {hubspot_deal_id}")
        return

    # ==============================
    # Orchestration of other lambdas
    # ==============================
    if HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=hubspot_deal, old_deal=db_deal
    ):
        logger.info(
            f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}"
        )
        message_body: Dict[str, Optional[str]] = {
            "pashub_link": hubspot_deal["pashub_link"],
            "address": None,  # potentially available from Listing, leave as None for now
            "sharepoint_link": hubspot_deal["sharepoint_link"],
            "uprn": hubspot_deal["national_uprn"],
            "landlord_property_id": hubspot_deal["owner_property_id"],
            "deal_stage": hubspot_deal["deal_stage"],
        }
        response = sqs_client.send_message(
            QueueUrl=pashub_trigger_queue_url, MessageBody=json.dumps(message_body)
        )
        logger.info(
            f"Sent message to Pashub To Ara queue. MessageId: {response['MessageId']}"
        )
    else:
        logger.info(
            f"Not Triggering PasHub file fetcher for HubSpot deal ID {hubspot_deal_id}"
        )

    print("done")
if __name__ == "__main__":
    # Manual run against a single hard-coded HubSpot deal ID; an empty string
    # stands in for the invocation context, which the handler does not use.
    handler({"hubspot_deal_id": "371470706915"}, "")
    print("beep")