"""Task handler that syncs a single HubSpot deal into the database.

After loading or updating the deal, it optionally queues a PasHub file fetch
by sending a message to the SQS queue configured as ``PASHUB_TO_ARA_SQS_URL``.
"""

import json
from typing import Any, Dict, Optional

import boto3

from backend.app.config import get_settings
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
from etl.hubspot.hubspot_trigger_orchestrator_trigger_request import (
    HubspotTriggerOrchestratorTriggerRequest,
)
from backend.utils.subtasks import task_handler
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from utils.logger import setup_logger

logger = setup_logger()


@task_handler()
def handler(body: dict[str, Any], context: Any) -> None:
    """Sync one HubSpot deal into the database and trigger follow-up work.

    The deal named in ``body`` is fetched from HubSpot and compared with the
    copy stored in the database (if any):

    * New deal: the associated company (when there is one) and the deal are
      upserted, and the PasHub file fetcher is triggered if the deal has a
      non-empty ``pashub_link``.
    * Known deal: the database is updated only when ``HubspotDealDiffer``
      reports that an update is needed; otherwise the handler returns early.
      After an update, the PasHub file fetcher is triggered only when the
      differ asks for it.

    Args:
        body: Raw request payload; validated against
            ``HubspotTriggerOrchestratorTriggerRequest``.
        context: Invocation context supplied by the caller; not used here.
    """
    db_client = HubspotDataToDb()
    hubspot_client = HubspotClient()
    sqs_client = boto3.client("sqs")

    payload = HubspotTriggerOrchestratorTriggerRequest.model_validate(body)
    hubspot_deal_id: str = payload.hubspot_deal_id

    db_deal: Optional[HubspotDealData] = db_client.find_deal_with_deal_id(
        hubspot_deal_id
    )

    # Declared up front so the tuple-unpacking assignment below is typed.
    hubspot_deal: Dict[str, str]
    company: Optional[str]
    listing: Optional[dict[str, str]]
    hubspot_deal, company, listing = hubspot_client.get_deal_and_company_and_listing(
        hubspot_deal_id
    )

    deal_changed = False

    if not db_deal:
        logger.info(f"New HubSpot deal of ID {hubspot_deal_id}. Loading to database...")

        if company:
            company_data: CompanyData = hubspot_client.get_company_information(company)
            # Reuse the client created above rather than building a second one.
            db_client.upsert_organisation(company_data)

        db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client)

        # ==============================
        # Orchestration of other lambdas
        # ==============================
        # ``.get`` so a deal without the property is skipped instead of
        # raising KeyError after the deal has already been stored.
        if hubspot_deal.get("pashub_link"):
            logger.info(
                f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}"
            )
            _trigger_pashub_fetcher(sqs_client, hubspot_deal)
    else:
        # Deal already in db, check whether anything has changed
        logger.info(
            f"HubSpot deal {hubspot_deal_id} already in database. Checking for changes..."
        )

        if HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=hubspot_deal,
            new_company=company,
            new_listing=listing,
            old_deal=db_deal,
        ):
            logger.info(
                f"Deal {hubspot_deal_id} has been changed, updating database..."
            )
            db_client.upsert_deal(
                deal_data=hubspot_deal,
                company=company,
                listing=listing,
                hubspot_client=hubspot_client,
            )
            deal_changed = True

        if not deal_changed:
            logger.info(f"No changes to deal {hubspot_deal_id}")
            return

        # ==============================
        # Orchestration of other lambdas
        # ==============================
        if HubspotDealDiffer.check_for_pashub_trigger(
            new_deal=hubspot_deal, old_deal=db_deal
        ):
            logger.info(
                f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}"
            )
            _trigger_pashub_fetcher(sqs_client, hubspot_deal)
        else:
            logger.info(
                f"Not Triggering PasHub file fetcher for HubSpot deal ID {hubspot_deal_id}"
            )

    print("done")


def _trigger_pashub_fetcher(sqs_client: Any, hubspot_deal: Dict[str, str]) -> None:
    """Queue a PasHub file fetch for ``hubspot_deal``.

    Builds a JSON message from the deal's fields and sends it to the queue
    configured as ``PASHUB_TO_ARA_SQS_URL``, then logs the returned message ID.

    Args:
        sqs_client: Client object exposing ``send_message`` (a boto3 SQS
            client in ``handler``).
        hubspot_deal: Deal properties; must contain ``pashub_link``. The other
            fields are optional and sent as ``None`` when absent.

    Raises:
        KeyError: If ``hubspot_deal`` has no ``pashub_link`` key.
    """
    message_body: Dict[str, Optional[str]] = {
        "pashub_link": hubspot_deal["pashub_link"],
        "address": None,  # potentially available from Listing, leave as None for now
        "sharepoint_link": hubspot_deal.get("sharepoint_link", None),
        "uprn": hubspot_deal.get("national_uprn", None),
        "landlord_property_id": hubspot_deal.get("owner_property_id", None),
        "deal_stage": hubspot_deal.get("deal_stage", None),
    }

    response = sqs_client.send_message(
        QueueUrl=get_settings().PASHUB_TO_ARA_SQS_URL,
        MessageBody=json.dumps(message_body),
    )

    logger.info(
        f"Sent message to Pashub To Ara queue. MessageId: {response['MessageId']}"
    )


if __name__ == "__main__":
    # Manual run against a hard-coded deal ID.
    handler({"hubspot_deal_id": "498926855369"}, "")
    print("beep")