Merge pull request #964 from Hestia-Homes/feature/hubspot-trigger-orchestrator

Trigger PasHub to Ara Lambda from Hubspot to DB ETL
This commit is contained in:
Daniel Roth 2026-04-10 11:03:12 +01:00 committed by GitHub
commit d4d9b03511
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 1040 additions and 573 deletions

View file

@ -505,7 +505,7 @@ jobs:
# Deploy Hubspot ETL Lambda
# ============================================================
hubspot_etl_lambda:
needs: [hubspot_etl_image, determine_stage]
needs: [hubspot_etl_image, determine_stage, pashub_to_ara_lambda]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: hubspot-etl-to-ara

View file

@ -38,6 +38,7 @@ class Settings(BaseSettings):
PLAN_TRIGGER_BUCKET: str = "changeme"
ENGINE_SQS_URL: str = "changeme"
CATEGORISATION_SQS_URL: str = "changeme"
PASHUB_TO_ARA_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str = "changeme"

View file

@ -0,0 +1,79 @@
import uuid
from sqlmodel import SQLModel, Field, Column, text
from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime
from sqlalchemy.sql import func
class HubspotDealData(SQLModel, table=True):
__tablename__ = "hubspot_deal_data"
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
# HubSpot Deal identifiers
deal_id: str = Field(index=True, nullable=False)
dealname: Optional[str] = Field(default=None)
dealstage: Optional[str] = Field(default=None)
company_id: Optional[str] = Field(default=None)
project_code: Optional[str] = Field(default=None)
# HubSpot custom properties
landlord_property_id: Optional[str] = Field(default=None)
uprn: Optional[str] = Field(default=None)
outcome: Optional[str] = Field(default=None)
outcome_notes: Optional[str] = Field(default=None)
major_condition_issue_description: Optional[str] = Field(default=None)
major_condition_issue_photos: Optional[str] = Field(default=None)
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
coordination_status: Optional[str] = Field(default=None)
coordination_comments: Optional[str] = Field(default=None)
design_status: Optional[str] = Field(default=None)
listing_id: Optional[str] = Field(default=None)
pashub_link: Optional[str] = Field(default=None)
sharepoint_link: Optional[str] = Field(default=None)
dampmould_growth: Optional[str] = Field(default=None)
damp_mould_and_repairs_comments: Optional[str] = Field(default=None)
pre_sap: Optional[str] = Field(default=None)
coordinator: Optional[str] = Field(default=None)
mtp_completion_date: Optional[datetime] = Field(default=None)
mtp_re_model_completion_date: Optional[datetime] = Field(default=None)
ioe_v3_completion_date: Optional[datetime] = Field(default=None)
proposed_measures: Optional[str] = Field(default=None)
approved_package: Optional[str] = Field(default=None)
designer: Optional[str] = Field(default=None)
design_completion_date: Optional[datetime] = Field(default=None)
actual_measures_installed: Optional[str] = Field(default=None)
installer: Optional[str] = Field(default=None)
installer_handover: Optional[str] = Field(default=None)
lodgement_status: Optional[str] = Field(default=None)
measures_lodgement_date: Optional[datetime] = Field(default=None)
lodgement_date: Optional[datetime] = Field(default=None)
expected_commencement_date: Optional[datetime] = Field(default=None)
surveyor: Optional[str] = Field(default=None)
confirmed_survey_date: Optional[datetime] = Field(default=None)
confirmed_survey_time: Optional[str] = Field(default=None)
surveyed_date: Optional[datetime] = Field(default=None)
design_type: Optional[str] = Field(default=None)
created_at: Optional[datetime] = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
nullable=False,
),
default=func.now(),
) # Nullable in db but optional here as value is set on db save for new record
updated_at: Optional[datetime] = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
onupdate=func.now(),
nullable=False,
),
default=func.now(),
) # Nullable in db but optional here as value is set on db save for new record

View file

@ -1,9 +1,7 @@
from sqlmodel import SQLModel, Field, Column, text
import uuid
from sqlmodel import SQLModel, Field
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import DateTime
from sqlalchemy.sql import func
import uuid
class Organisation(SQLModel, table=True):
@ -13,74 +11,3 @@ class Organisation(SQLModel, table=True):
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
hubspot_company_id: Optional[str] = None
name: Optional[str] = None
class HubspotDealData(SQLModel, table=True):
__tablename__ = "hubspot_deal_data"
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
# HubSpot Deal identifiers
deal_id: str = Field(index=True, nullable=False)
dealname: Optional[str] = Field(default=None)
dealstage: Optional[str] = Field(default=None)
company_id: Optional[str] = Field(default=None)
project_code: Optional[str] = Field(default=None)
# HubSpot custom properties
landlord_property_id: Optional[str] = Field(default=None)
uprn: Optional[str] = Field(default=None)
outcome: Optional[str] = Field(default=None)
outcome_notes: Optional[str] = Field(default=None)
major_condition_issue_description: Optional[str] = Field(default=None)
major_condition_issue_photos: Optional[str] = Field(default=None)
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
coordination_status: Optional[str] = Field(default=None)
coordination_comments: Optional[str] = Field(default=None)
design_status: Optional[str] = Field(default=None)
listing_id: Optional[str] = Field(default=None)
pashub_link: Optional[str] = Field(default=None)
sharepoint_link: Optional[str] = Field(default=None)
dampmould_growth: Optional[str] = Field(default=None)
damp_mould_and_repairs_comments: Optional[str] = Field(default=None)
pre_sap: Optional[str] = Field(default=None)
coordinator: Optional[str] = Field(default=None)
mtp_completion_date: Optional[datetime] = Field(default=None)
mtp_re_model_completion_date: Optional[datetime] = Field(default=None)
ioe_v3_completion_date: Optional[datetime] = Field(default=None)
proposed_measures: Optional[str] = Field(default=None)
approved_package: Optional[str] = Field(default=None)
designer: Optional[str] = Field(default=None)
design_completion_date: Optional[datetime] = Field(default=None)
actual_measures_installed: Optional[str] = Field(default=None)
installer: Optional[str] = Field(default=None)
installer_handover: Optional[str] = Field(default=None)
lodgement_status: Optional[str] = Field(default=None)
measures_lodgement_date: Optional[datetime] = Field(default=None)
lodgement_date: Optional[datetime] = Field(default=None)
expected_commencement_date: Optional[datetime] = Field(default=None)
surveyor: Optional[str] = Field(default=None)
confirmed_survey_date: Optional[datetime] = Field(default=None)
confirmed_survey_time: Optional[str] = Field(default=None)
surveyed_date: Optional[datetime] = Field(default=None)
design_type: Optional[str] = Field(default=None)
created_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
nullable=False,
)
)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
onupdate=func.now(),
nullable=False,
)
)

View file

@ -0,0 +1,6 @@
from typing import TypedDict
class CompanyData(TypedDict):
hs_object_id: str
name: str

View file

@ -26,10 +26,10 @@ from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTy
ForwardPaging as AssociationsPaging,
NextPage as AssociationsPagingNext,
)
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from backend.app.config import get_settings
from etl.hubspot.company_data import CompanyData
from utils.logger import setup_logger
import mimetypes
@ -230,7 +230,9 @@ class HubspotClient:
self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
return listing_info
def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]:
def from_deal_id_get_info(
self, deal_id: str
) -> dict[str, str]: # TODO: add dataclass for this
deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType]
deal: HubspotObject = self._call_with_retry(
@ -280,18 +282,12 @@ class HubspotClient:
deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType]
return deal_info
def get_deal_info_for_db(
def get_deal_and_company_and_listing(
self, deal_id: str
) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]:
deal: dict[str, str] = self.from_deal_id_get_info(deal_id)
company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id)
if company:
company_data: CompanyData = self.get_company_information(company)
dbloader: HubspotDataToDb = HubspotDataToDb()
dbloader.upsert_company(company_data)
listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing(
deal_id
)

View file

@ -1,16 +1,19 @@
from backend.app.db.connection import db_read_session
from backend.app.db.models.organisation import Organisation, HubspotDealData
import os
from sqlmodel import select
from datetime import datetime, timezone
from typing import TypedDict, Optional
from typing import Dict, Optional
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.company_data import CompanyData
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.s3_uploader import S3Uploader
import hashlib
import os
from backend.app.db.connection import db_read_session
from backend.app.db.models.organisation import Organisation
from etl.hubspot.utils import parse_hs_date
from utils.logger import setup_logger
class CompanyData(TypedDict):
hs_object_id: str
name: str
logger = setup_logger()
class HubspotDataToDb:
@ -31,7 +34,7 @@ class HubspotDataToDb:
records = self.read_org_table(limit)
return [org.name for org in records if org.name]
def upsert_company(self, company_data: CompanyData) -> Organisation:
def upsert_organisation(self, company_data: CompanyData) -> Organisation:
"""Upserts a company record. Updates if hubspot_company_id exists, otherwise creates new."""
with db_read_session() as session:
hubspot_id = company_data.get("hs_object_id")
@ -61,11 +64,7 @@ class HubspotDataToDb:
session.commit()
return record
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_deal(deal_data, company, listing, hubspot_client)
def find_all_deals_with_company_id(self, company_id):
def find_all_deals_with_company_id(self, company_id: str):
"""Returns a list of deals for a given company_id."""
with db_read_session() as session:
return (
@ -74,7 +73,7 @@ class HubspotDataToDb:
.all()
)
def find_deal_with_deal_id(self, deal_id):
def find_deal_with_deal_id(self, deal_id: str) -> Optional[HubspotDealData]:
with db_read_session() as session:
return (
session.query(HubspotDealData)
@ -82,275 +81,13 @@ class HubspotDataToDb:
.one_or_none()
)
def _parse_hs_date(self, value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _sha256(self, file_path: str) -> str:
"""Compute SHA-256 checksum of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
def update_deal_with_checks(self, deal_in_db, hubspot_client) -> bool:
"""
Checks if a deal needs updating and syncs it with HubSpot.
Also handles major_condition_issue_photos file upload to S3 with integrity check.
"""
def soft_assert(condition, message="Assertion Failed"):
if not condition:
print(f"⚠️ Soft Assert Failed: {message}")
return False
return True
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
deal_in_db.deal_id
)
# Soft compare key fields
checks = [
soft_assert(
deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"
),
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
soft_assert(
deal_in_db.listing_id == hs_listing.get("listing_id"),
"listing_id mismatch",
),
soft_assert(
deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
"landlord_property_id mismatch",
),
soft_assert(
deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"
),
soft_assert(
deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"
),
soft_assert(
deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"
),
soft_assert(
deal_in_db.project_code == hs_deal.get("project_code"),
"project_code mismatch",
),
soft_assert(
deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"
),
soft_assert(
deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
"outcome_notes mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_description
== hs_deal.get("major_condition_issue_description"),
"major condition description mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_photos
== hs_deal.get("major_condition_issue_photos"),
"major condition issue photos mismatch",
),
soft_assert(
deal_in_db.coordination_status
== hs_deal.get("coordination_status__stage_1_"),
"coordination stage 1 status mismatch",
),
soft_assert(
deal_in_db.coordination_comments == hs_deal.get("coordination_comments"),
"coordination_comments mismatch",
),
soft_assert(
deal_in_db.design_status == hs_deal.get("retrofit_design_status"),
"retrofit design mismatch",
),
soft_assert(
deal_in_db.pashub_link == hs_deal.get("pashub_link"),
"pashub_link mismatch",
),
soft_assert(
deal_in_db.sharepoint_link == hs_deal.get("sharepoint_link"),
"sharepoint_link mismatch",
),
soft_assert(
deal_in_db.dampmould_growth == hs_deal.get("dampmould_growth"),
"dampmould_growth mismatch",
),
soft_assert(
deal_in_db.damp_mould_and_repairs_comments
== hs_deal.get("damp_mould_and_repairs_comments"),
"damp_mould_and_repairs_comments mismatch",
),
soft_assert(
deal_in_db.pre_sap == hs_deal.get("pre_sap"),
"pre_sap mismatch",
),
soft_assert(
deal_in_db.coordinator == hs_deal.get("coordinator"),
"coordinator mismatch",
),
soft_assert(
deal_in_db.mtp_completion_date
== self._parse_hs_date(hs_deal.get("mtp_completion_date")),
"mtp_completion_date mismatch",
),
soft_assert(
deal_in_db.mtp_re_model_completion_date
== self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")),
"mtp_re_model_completion_date mismatch",
),
soft_assert(
deal_in_db.ioe_v3_completion_date
== self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")),
"ioe_v3_completion_date mismatch",
),
soft_assert(
deal_in_db.proposed_measures == hs_deal.get("proposed_measures"),
"proposed_measures mismatch",
),
soft_assert(
deal_in_db.approved_package == hs_deal.get("approved_package"),
"approved_package mismatch",
),
soft_assert(
deal_in_db.designer == hs_deal.get("designer"),
"designer mismatch",
),
soft_assert(
deal_in_db.design_completion_date
== self._parse_hs_date(hs_deal.get("design_completion_date")),
"design_completion_date mismatch",
),
soft_assert(
deal_in_db.actual_measures_installed
== hs_deal.get("actual_measures_installed"),
"actual_measures_installed mismatch",
),
soft_assert(
deal_in_db.installer == hs_deal.get("installer"),
"installer mismatch",
),
soft_assert(
deal_in_db.installer_handover == hs_deal.get("installer_handover"),
"installer_handover mismatch",
),
soft_assert(
deal_in_db.lodgement_status == hs_deal.get("lodgement_status"),
"lodgement_status mismatch",
),
soft_assert(
deal_in_db.measures_lodgement_date
== self._parse_hs_date(hs_deal.get("measures_lodgement_date")),
"measures_lodgement_date mismatch",
),
soft_assert(
deal_in_db.lodgement_date
== self._parse_hs_date(hs_deal.get("lodgement_date")),
"lodgement_date mismatch",
),
soft_assert(
deal_in_db.expected_commencement_date
== self._parse_hs_date(hs_deal.get("expected_commencement_date")),
"expected_commencement_date mismatch",
),
soft_assert(
deal_in_db.surveyor == hs_deal.get("surveyor"),
"surveyor mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_date
== self._parse_hs_date(hs_deal.get("confirmed_survey_date")),
"confirmed_survey_date mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_time
== hs_deal.get("confirmed_survey_time"),
"confirmed_survey_time mismatch",
),
soft_assert(
deal_in_db.surveyed_date
== self._parse_hs_date(hs_deal.get("surveyed_date")),
"surveyed_date mismatch",
),
soft_assert(
deal_in_db.design_type == hs_deal.get("design_type"),
"design_type mismatch",
),
]
# If discrepancies found, update from HubSpot
if not all(checks):
print(
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
)
self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
return False
# Handle photo upload if it exists but S3 URL is missing
if (
deal_in_db.major_condition_issue_photos
and not deal_in_db.major_condition_issue_evidence_s3_url
):
print(
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
try:
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
local_file = hubspot_client.download_file_from_url(photo_url)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with db_read_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
)
return False
except Exception as e:
print(
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
)
# Continue without the file — don't crash the entire update
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
return True
def upsert_deal(self, deal_data, company, listing, hubspot_client):
def upsert_deal(
self,
deal_data: Dict[str, str],
company: Optional[str],
listing: Optional[dict[str, str]],
hubspot_client: HubspotClient,
):
"""
Inserts or updates a deal record.
Also uploads photos if present and adds S3 URL.
@ -364,111 +101,10 @@ class HubspotDataToDb:
existing = session.exec(statement).first()
if existing:
self._handle_existing_photo_upload(existing, hubspot_client)
print(f"🔄 Updating existing deal (deal_id={deal_id})")
for attr, value in {
"dealname": deal_data.get("dealname"),
"dealstage": deal_data.get("dealstage"),
"listing_id": listing.get("listing_id", None) if listing else None,
"landlord_property_id": (
listing.get("owner_property_id", None) if listing else None
),
"uprn": listing.get("national_uprn", None) if listing else None,
"outcome": deal_data.get("outcome"),
"outcome_notes": deal_data.get("outcome_notes"),
"project_code": deal_data.get("project_code"),
"company_id": company,
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"coordination_status": deal_data.get(
"coordination_status__stage_1_"
),
"coordination_comments": deal_data.get("coordination_comments"),
"design_status": deal_data.get("retrofit_design_status"),
"pashub_link": deal_data.get("pashub_link"),
"sharepoint_link": deal_data.get("sharepoint_link"),
"dampmould_growth": deal_data.get("dampmould_growth"),
"damp_mould_and_repairs_comments": deal_data.get(
"damp_mould_and_repairs_comments"
),
"pre_sap": deal_data.get("pre_sap"),
"coordinator": deal_data.get("coordinator"),
"mtp_completion_date": self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
"mtp_re_model_completion_date": self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
"ioe_v3_completion_date": self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
"proposed_measures": deal_data.get("proposed_measures"),
"approved_package": deal_data.get("approved_package"),
"designer": deal_data.get("designer"),
"design_completion_date": self._parse_hs_date(
deal_data.get("design_completion_date")
),
"actual_measures_installed": deal_data.get(
"actual_measures_installed"
),
"installer": deal_data.get("installer"),
"installer_handover": deal_data.get("installer_handover"),
"lodgement_status": deal_data.get("lodgement_status"),
"measures_lodgement_date": self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
"lodgement_date": self._parse_hs_date(
deal_data.get("lodgement_date")
),
"expected_commencement_date": self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
"surveyor": deal_data.get("surveyor"),
"confirmed_survey_date": self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
"confirmed_survey_time": deal_data.get("confirmed_survey_time"),
"surveyed_date": self._parse_hs_date(
deal_data.get("surveyed_date")
),
"design_type": deal_data.get("design_type"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
# Upload if photo exists but S3 link missing
if (
existing.major_condition_issue_photos
and not existing.major_condition_issue_evidence_s3_url
):
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id)
photo_url = fresh_deal.get("major_condition_issue_photos")
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(
photo_url
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
)
# Continue without the file — don't crash the update
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
self._update_existing_deal(existing, deal_data, listing, company)
session.add(existing)
session.commit()
@ -477,93 +113,213 @@ class HubspotDataToDb:
else:
print(f"🆕 Inserting new deal (deal_id={deal_id})")
new_record = HubspotDealData(
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
listing_id=listing.get("listing_id", None) if listing else None,
landlord_property_id=listing.get("owner_property_id") if listing else None,
uprn=listing.get("national_uprn") if listing else None,
outcome=deal_data.get("outcome"),
outcome_notes=deal_data.get("outcome_notes"),
project_code=deal_data.get("project_code"),
company_id=company,
major_condition_issue_description=deal_data.get(
"major_condition_issue_description"
),
major_condition_issue_photos=deal_data.get(
"major_condition_issue_photos"
),
coordination_status=deal_data.get("coordination_status__stage_1_"),
coordination_comments=deal_data.get("coordination_comments"),
design_status=deal_data.get("retrofit_design_status"),
pashub_link=deal_data.get("pashub_link"),
sharepoint_link=deal_data.get("sharepoint_link"),
dampmould_growth=deal_data.get("dampmould_growth"),
damp_mould_and_repairs_comments=deal_data.get(
"damp_mould_and_repairs_comments"
),
pre_sap=deal_data.get("pre_sap"),
coordinator=deal_data.get("coordinator"),
mtp_completion_date=self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
mtp_re_model_completion_date=self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
ioe_v3_completion_date=self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
proposed_measures=deal_data.get("proposed_measures"),
approved_package=deal_data.get("approved_package"),
designer=deal_data.get("designer"),
design_completion_date=self._parse_hs_date(
deal_data.get("design_completion_date")
),
actual_measures_installed=deal_data.get(
"actual_measures_installed"
),
installer=deal_data.get("installer"),
installer_handover=deal_data.get("installer_handover"),
lodgement_status=deal_data.get("lodgement_status"),
measures_lodgement_date=self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
lodgement_date=self._parse_hs_date(deal_data.get("lodgement_date")),
expected_commencement_date=self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
surveyor=deal_data.get("surveyor"),
confirmed_survey_date=self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
confirmed_survey_time=deal_data.get("confirmed_survey_time"),
surveyed_date=self._parse_hs_date(deal_data.get("surveyed_date")),
design_type=deal_data.get("design_type"),
new_record: HubspotDealData = self._build_new_deal(
deal_id, deal_data, listing, company
)
# Handle upload at insert time
if new_record.major_condition_issue_photos:
try:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
)
# Continue without the file — don't crash the insert
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
self._handle_new_photo_upload(new_record, hubspot_client)
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record
def _update_existing_deal(
self,
existing: HubspotDealData,
deal_data: Dict[str, str],
listing: Optional[dict[str, str]],
company: Optional[str],
):
for attr, value in {
"dealname": deal_data.get("dealname"),
"dealstage": deal_data.get("dealstage"),
"listing_id": listing.get("listing_id", None) if listing else None,
"landlord_property_id": (
listing.get("owner_property_id", None) if listing else None
),
"uprn": listing.get("national_uprn", None) if listing else None,
"outcome": deal_data.get("outcome"),
"outcome_notes": deal_data.get("outcome_notes"),
"project_code": deal_data.get("project_code"),
"company_id": company,
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"coordination_status": deal_data.get("coordination_status__stage_1_"),
"design_status": deal_data.get("retrofit_design_status"),
"coordination_comments": deal_data.get("coordination_comments"),
"pashub_link": deal_data.get("pashub_link"),
"sharepoint_link": deal_data.get("sharepoint_link"),
"dampmould_growth": deal_data.get("dampmould_growth"),
"damp_mould_and_repairs_comments": deal_data.get(
"damp_mould_and_repairs_comments"
),
"pre_sap": deal_data.get("pre_sap"),
"coordinator": deal_data.get("coordinator"),
"mtp_completion_date": parse_hs_date(deal_data.get("mtp_completion_date")),
"mtp_re_model_completion_date": parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
"ioe_v3_completion_date": parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
"proposed_measures": deal_data.get("proposed_measures"),
"approved_package": deal_data.get("approved_package"),
"designer": deal_data.get("designer"),
"design_completion_date": parse_hs_date(
deal_data.get("design_completion_date")
),
"actual_measures_installed": deal_data.get("actual_measures_installed"),
"installer": deal_data.get("installer"),
"installer_handover": deal_data.get("installer_handover"),
"lodgement_status": deal_data.get("lodgement_status"),
"measures_lodgement_date": parse_hs_date(
deal_data.get("measures_lodgement_date")
),
"lodgement_date": parse_hs_date(deal_data.get("lodgement_date")),
"expected_commencement_date": parse_hs_date(
deal_data.get("expected_commencement_date")
),
"surveyor": deal_data.get("surveyor"),
"confirmed_survey_date": parse_hs_date(
deal_data.get("confirmed_survey_date")
),
"confirmed_survey_time": deal_data.get("confirmed_survey_time"),
"surveyed_date": parse_hs_date(deal_data.get("surveyed_date")),
"design_type": deal_data.get("design_type"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
def _build_new_deal(
self,
deal_id: str,
deal_data: Dict[str, str],
listing: Optional[dict[str, str]],
company: Optional[str],
) -> HubspotDealData:
return HubspotDealData(
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
listing_id=listing.get("listing_id") if listing else None,
landlord_property_id=(
listing.get("owner_property_id") if listing else None
),
uprn=listing.get("national_uprn") if listing else None,
outcome=deal_data.get("outcome"),
outcome_notes=deal_data.get("outcome_notes"),
project_code=deal_data.get("project_code"),
company_id=company,
major_condition_issue_description=deal_data.get(
"major_condition_issue_description"
),
major_condition_issue_photos=deal_data.get("major_condition_issue_photos"),
coordination_status=deal_data.get("coordination_status__stage_1_"),
design_status=deal_data.get("retrofit_design_status"),
coordination_comments=deal_data.get("coordination_comments"),
pashub_link=deal_data.get("pashub_link"),
sharepoint_link=deal_data.get("sharepoint_link"),
dampmould_growth=deal_data.get("dampmould_growth"),
damp_mould_and_repairs_comments=deal_data.get(
"damp_mould_and_repairs_comments"
),
pre_sap=deal_data.get("pre_sap"),
coordinator=deal_data.get("coordinator"),
mtp_completion_date=parse_hs_date(deal_data.get("mtp_completion_date")),
mtp_re_model_completion_date=parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
ioe_v3_completion_date=parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
proposed_measures=deal_data.get("proposed_measures"),
approved_package=deal_data.get("approved_package"),
designer=deal_data.get("designer"),
design_completion_date=parse_hs_date(
deal_data.get("design_completion_date")
),
actual_measures_installed=deal_data.get("actual_measures_installed"),
installer=deal_data.get("installer"),
installer_handover=deal_data.get("installer_handover"),
lodgement_status=deal_data.get("lodgement_status"),
measures_lodgement_date=parse_hs_date(
deal_data.get("measures_lodgement_date")
),
lodgement_date=parse_hs_date(deal_data.get("lodgement_date")),
expected_commencement_date=parse_hs_date(
deal_data.get("expected_commencement_date")
),
surveyor=deal_data.get("surveyor"),
confirmed_survey_date=parse_hs_date(deal_data.get("confirmed_survey_date")),
confirmed_survey_time=deal_data.get("confirmed_survey_time"),
surveyed_date=parse_hs_date(deal_data.get("surveyed_date")),
design_type=deal_data.get("design_type"),
)
def _handle_existing_photo_upload(
self,
existing_deal: HubspotDealData,
hubspot_client: HubspotClient,
):
# if self._needs_photo_upload(existing):
fresh_deal = hubspot_client.from_deal_id_get_info(existing_deal.deal_id)
fresh_photo_url = fresh_deal.get("major_condition_issue_photos")
if not fresh_photo_url:
print(f"⚠️ Photo URL missing for deal_id {existing_deal.deal_id}")
return
if fresh_photo_url != existing_deal.major_condition_issue_photos:
logger.info(
f"Hubspot image URL changed from {existing_deal.major_condition_issue_photos} to {fresh_photo_url}"
)
self._upload_photo_to_s3(existing_deal, fresh_photo_url, hubspot_client)
else:
logger.info(f"Hubspot image URL unchanged: {fresh_photo_url}")
def _handle_new_photo_upload(
self,
record: HubspotDealData,
hubspot_client: HubspotClient,
):
if record.major_condition_issue_photos:
self._upload_photo_to_s3(
record,
record.major_condition_issue_photos,
hubspot_client,
)
def _upload_photo_to_s3(
self,
record: HubspotDealData,
hubspot_photo_url: str,
hubspot_client: HubspotClient,
):
try:
local_file = hubspot_client.download_file_from_url(hubspot_photo_url)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(f"⚠️ Failed to upload photo for deal_id {record.deal_id}: {e}")
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
def _needs_photo_upload(self, old_deal: HubspotDealData) -> bool:
return bool(
old_deal.major_condition_issue_photos
and not old_deal.major_condition_issue_evidence_s3_url
)

View file

@ -0,0 +1,177 @@
from typing import Dict, List, Optional
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.utils import parse_hs_date
class HubspotDealDiffer:
COORDINATION_COMPLETE: List[str] = [
"v1 ioe/mtp complete",
"v2 ioe/mtp complete",
"v3 ioe/mtp complete",
]
RETROFIT_DESIGN_COMPLETE = "uploaded"
LODGEMENT_COMPLETE: List[str] = ["lodgement complete", "measures lodged"]
@staticmethod
def check_for_db_update_trigger(
new_deal: Dict[str, str],
new_company: Optional[str],
new_listing: Optional[Dict[str, str]],
old_deal: HubspotDealData,
) -> bool:
"""
Returns True if ANY difference exists between HubSpot data and DB.
Returns False if everything matches (i.e. no update needed).
"""
# --- Deal ID ---
if str(old_deal.deal_id) != str(new_deal.get("hs_object_id")):
return True
# --- Company ---
if new_company is not None:
if old_deal.company_id != new_company:
return True
# --- Listing ---
hs_listing = new_listing or {}
if old_deal.listing_id != hs_listing.get("listing_id"):
return True
if old_deal.landlord_property_id != hs_listing.get("owner_property_id"):
return True
if old_deal.uprn != hs_listing.get("national_uprn"):
return True
# --- Field mappings ---
FIELD_MAP = {
"outcome": "outcome",
"dealstage": "dealstage",
"dealname": "dealname",
"project_code": "project_code",
"outcome_notes": "outcome_notes",
"major_condition_issue_description": "major_condition_issue_description",
"major_condition_issue_photos": "major_condition_issue_photos",
"coordination_status__stage_1_": "coordination_status",
"coordination_comments": "coordination_comments",
"retrofit_design_status": "design_status",
"pashub_link": "pashub_link",
"sharepoint_link": "sharepoint_link",
"dampmould_growth": "dampmould_growth",
"damp_mould_and_repairs_comments": "damp_mould_and_repairs_comments",
"pre_sap": "pre_sap",
"coordinator": "coordinator",
"proposed_measures": "proposed_measures",
"approved_package": "approved_package",
"designer": "designer",
"actual_measures_installed": "actual_measures_installed",
"installer": "installer",
"installer_handover": "installer_handover",
"lodgement_status": "lodgement_status",
"design_type": "design_type",
"surveyor": "surveyor",
"confirmed_survey_time": "confirmed_survey_time",
}
for hs_field, db_field in FIELD_MAP.items():
old_value = getattr(old_deal, db_field)
new_value = new_deal.get(hs_field)
if old_value != new_value:
return True
# --- Date fields ---
date_fields = [
("mtp_completion_date", "mtp_completion_date"),
("mtp_re_model_completion_date", "mtp_re_model_completion_date"),
("ioe_v3_completion_date", "ioe_v3_completion_date"),
("design_completion_date", "design_completion_date"),
("measures_lodgement_date", "measures_lodgement_date"),
("lodgement_date", "lodgement_date"),
("expected_commencement_date", "expected_commencement_date"),
("confirmed_survey_date", "confirmed_survey_date"),
("surveyed_date", "surveyed_date"),
]
for hs_field, db_field in date_fields:
old_value = getattr(old_deal, db_field)
new_value = parse_hs_date(new_deal.get(hs_field))
if old_value != new_value:
return True
# --- Time field ---
if old_deal.confirmed_survey_time != new_deal.get("confirmed_survey_time"):
return True
# No differences found
return False
@staticmethod
def check_for_pashub_trigger(
new_deal: Dict[str, str], old_deal: HubspotDealData
) -> bool:
new_pashub_link: str = new_deal.get("pashub_link", "")
if not HubspotDealDiffer._has_valid_pashub_link(new_pashub_link):
return False
if HubspotDealDiffer._new_or_updated_pashub_link(new_pashub_link, old_deal):
return True
if HubspotDealDiffer._coordination_completed(new_deal, old_deal):
return True
if HubspotDealDiffer._design_completed(new_deal, old_deal):
return True
if HubspotDealDiffer._lodgement_completed(new_deal, old_deal):
return True
return False
@staticmethod
def _has_valid_pashub_link(new_pashub_link: str) -> bool:
return bool(new_pashub_link)
@staticmethod
def _new_or_updated_pashub_link(
new_pashub_link: str, old_deal: HubspotDealData
) -> bool:
if not old_deal.pashub_link:
return True
return old_deal.pashub_link != new_pashub_link
@staticmethod
def _coordination_completed(
new_deal: Dict[str, str], old_deal: HubspotDealData
) -> bool:
new_status: str = new_deal.get("coordination_status", "")
return (
new_status != ""
and new_status in HubspotDealDiffer.COORDINATION_COMPLETE
and new_status != old_deal.coordination_status
)
@staticmethod
def _design_completed(new_deal: Dict[str, str], old_deal: HubspotDealData) -> bool:
new_status: str = new_deal.get("design_status", "")
return (
new_status != ""
and new_status == HubspotDealDiffer.RETROFIT_DESIGN_COMPLETE
and new_status != old_deal.design_status
)
@staticmethod
def _lodgement_completed(
new_deal: Dict[str, str], old_deal: HubspotDealData
) -> bool:
new_status: str = new_deal.get("lodgement_status", "")
return (
new_status != ""
and new_status in HubspotDealDiffer.LODGEMENT_COMPLETE
and new_status != old_deal.lodgement_status
)

View file

@ -0,0 +1,5 @@
from pydantic import BaseModel
class HubspotTriggerOrchestratorTriggerRequest(BaseModel):
hubspot_deal_id: str

View file

@ -22,7 +22,7 @@ companies_to_add_or_ensure_it_exists = [
for company in companies_to_add_or_ensure_it_exists:
company_info: CompanyData = hubspot.get_company_information(company.value)
dbRead.upsert_company(company_info)
dbRead.upsert_organisation(company_info)
dbRead = HubspotDataToDb()

View file

@ -1,24 +1,113 @@
import json
import boto3
from typing import Any, Dict, Optional
from backend.app.config import get_settings
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
from etl.hubspot.hubspot_trigger_orchestrator_trigger_request import (
HubspotTriggerOrchestratorTriggerRequest,
)
from backend.utils.subtasks import task_handler
from typing import Any
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from utils.logger import setup_logger
logger = setup_logger()
@task_handler()
def handler(body: dict[str, Any], context: Any) -> None:
hubspot_deal_id = body.get("hubspot_deal_id", "")
db_client = HubspotDataToDb()
hubspot_client = HubspotClient()
if hubspot_deal_id == "":
raise RuntimeError(
"Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'"
)
hubspot_deal_id = "327170793707"
sqs_client = boto3.client("sqs")
PASHUB_TRIGGER_QUEUE_URL = get_settings().PASHUB_TO_ARA_SQS_URL
hubspot: HubspotClient = HubspotClient()
dbloader: HubspotDataToDb = HubspotDataToDb()
deal = dbloader.find_deal_with_deal_id(hubspot_deal_id)
if deal:
dbloader.update_deal_with_checks(deal, hubspot)
payload = HubspotTriggerOrchestratorTriggerRequest.model_validate(body)
hubspot_deal_id: str = payload.hubspot_deal_id
db_deal: Optional[HubspotDealData] = db_client.find_deal_with_deal_id(
hubspot_deal_id
)
hubspot_deal: Dict[str, str]
company: Optional[str]
listing: Optional[dict[str, str]]
hubspot_deal, company, listing = hubspot_client.get_deal_and_company_and_listing(
hubspot_deal_id
)
deal_changed = False
if not db_deal:
# New hubspot deal, no diffing to do
logger.info(f"New HubSpot deal of ID {hubspot_deal_id}. Loading to database...")
if company:
company_data: CompanyData = hubspot_client.get_company_information(company)
db_client: HubspotDataToDb = HubspotDataToDb()
db_client.upsert_organisation(company_data)
db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client)
else:
deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id)
dbloader.upsert_deal(deal, company, listing, hubspot)
# Deal already in db, check whether anything has changed
logger.info(
f"HubSpot deal {hubspot_deal_id} already in database. Checking for changes..."
)
if HubspotDealDiffer.check_for_db_update_trigger(
new_deal=hubspot_deal,
new_company=company,
new_listing=listing,
old_deal=db_deal,
):
logger.info(
f"Deal {hubspot_deal_id} has been changed, updating database..."
)
db_client.upsert_deal(
deal_data=hubspot_deal,
company=company,
listing=listing,
hubspot_client=hubspot_client,
)
deal_changed = True
if not deal_changed:
logger.info(f"No changes to deal {hubspot_deal_id}")
return
# ==============================
# Orchestration of other lambdas
# ==============================
if HubspotDealDiffer.check_for_pashub_trigger(
new_deal=hubspot_deal, old_deal=db_deal
):
logger.info(
f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}"
)
message_body: Dict[str, Optional[str]] = {
"pashub_link": hubspot_deal["pashub_link"],
"address": None, # potentially available from Listing, leave as None for now
"sharepoint_link": hubspot_deal["sharepoint_link"],
"uprn": hubspot_deal["national_uprn"],
"landlord_property_id": hubspot_deal["owner_property_id"],
"deal_stage": hubspot_deal["deal_stage"],
}
response = sqs_client.send_message(
QueueUrl=PASHUB_TRIGGER_QUEUE_URL, MessageBody=json.dumps(message_body)
)
logger.info(
f"Sent message to Pashub To Ara queue. MessageId: {response['MessageId']}"
)
else:
logger.info(
f"Not Triggering PasHub file fetcher for HubSpot deal ID {hubspot_deal_id}"
)
print("done")
if __name__ == "__main__":
handler({"hubspot_deal_id": "371470706915"}, "")
print("beep")

View file

@ -71,7 +71,7 @@ class TestHubspotClientIntegration:
def test_get_deal_info_for_db(self, client: HubspotClient):
deal_id: str = "263490768079"
deal, company, listing = client.get_deal_info_for_db(deal_id)
deal, company, listing = client.get_deal_and_company_and_listing(deal_id)
assert "dealname" in deal
assert "dealstage" in deal

View file

@ -0,0 +1,401 @@
from datetime import datetime, timezone
from typing import Any, Dict
import uuid
import pytest
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
BASE_TIME = datetime(2025, 12, 1, 12, 0, 0)
def make_old_deal(**overrides: Any) -> HubspotDealData:
return HubspotDealData(
id=overrides.get("id", uuid.uuid4()),
deal_id="1",
created_at=BASE_TIME,
updated_at=BASE_TIME,
**{k: v for k, v in overrides.items() if k != "id"},
)
def make_new_deal(deal_id: uuid.UUID, **overrides: Any) -> Dict[str, str]:
return {
"id": str(deal_id),
"deal_id": "1",
"created_at": BASE_TIME.isoformat(),
"updated_at": datetime(2025, 12, 1, 12, 30, 0).isoformat(),
**overrides,
}
# ====================
# PASHUB TRIGGER TESTS
# ====================
@pytest.mark.parametrize(
"new_overrides,expected",
[
({"outcome_notes": "test note"}, False),
],
)
def test_pashub_trigger__outcome_note_added__returns_false(
new_overrides: Dict[str, str],
expected: bool,
) -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(id=deal_id)
new_deal = make_new_deal(deal_id, **new_overrides)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
== expected
)
@pytest.mark.parametrize(
"old_overrides,new_overrides,expected",
[
(
{"pashub_link": "www.google.co.uk"},
{"pashub_link": "www.bbc.co.uk"},
True,
),
],
)
def test_pashub_trigger__pashub_link_changed__returns_true(
old_overrides: Dict[str, str],
new_overrides: Dict[str, str],
expected: bool,
) -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(id=deal_id, **old_overrides)
new_deal = make_new_deal(deal_id, **new_overrides)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
== expected
)
@pytest.mark.parametrize(
"coordination_status,expected",
[
("v1 ioe/mtp complete", True),
("v2 ioe/mtp complete", True),
],
)
def test_pashub_trigger__coordination_completed_and_pashub_link_set__returns_true(
coordination_status: str,
expected: bool,
) -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
pashub_link="www.google.co.uk",
coordination_status="random",
)
new_deal = make_new_deal(
deal_id,
pashub_link="www.google.co.uk",
coordination_status=coordination_status,
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
== expected
)
def test_pashub_trigger__coordination_completed_and_pashub_link_not_set__returns_false() -> (
None
):
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
coordination_status="random",
)
new_deal = make_new_deal(
deal_id,
coordination_status="v2 ioe/mtp complete",
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
is False
)
def test_pashub_trigger__design_completed_and_pashub_link_set__returns_true() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
pashub_link="www.google.co.uk",
)
new_deal = make_new_deal(
deal_id,
pashub_link="www.google.co.uk",
design_status="uploaded",
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
is True
)
def test_pashub_trigger__design_completed_and_pashub_link_not_set__returns_false() -> (
None
):
deal_id = uuid.uuid4()
old_deal = make_old_deal(id=deal_id)
new_deal = make_new_deal(
deal_id,
design_status="uploaded",
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
is False
)
@pytest.mark.parametrize(
"lodgement_status,expected",
[
("lodgement complete", True),
("measures lodged", True),
],
)
def test_pashub_trigger__lodgement_completed_and_pashub_link_set__returns_true(
lodgement_status: str,
expected: bool,
) -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
pashub_link="www.google.co.uk",
)
new_deal = make_new_deal(
deal_id,
pashub_link="www.google.co.uk",
lodgement_status=lodgement_status,
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
== expected
)
def test_pashub_trigger__lodgement_completed_and_pashub_link_not_set__returns_false() -> (
None
):
deal_id = uuid.uuid4()
old_deal = make_old_deal(id=deal_id)
new_deal = make_new_deal(
deal_id,
design_status="lodgement complete",
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
is False
)
def test_pashub_trigger__coordination_design_lodgement_not_completed_and_pashub_link_set__returns_false() -> (
None
):
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
pashub_link="www.google.co.uk",
)
new_deal = make_new_deal(
deal_id,
pashub_link="www.google.co.uk",
coordination_status="not uploaded",
design_status="not uploaded",
lodgement_status="not uploaded",
)
assert (
HubspotDealDiffer.check_for_pashub_trigger(
new_deal=new_deal,
old_deal=old_deal,
)
is False
)
# =======================
# DB UPDATE TRIGGER TESTS
# =======================
def test_db_update_trigger__no_changes__returns_false() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
dealname="Test Deal",
dealstage="stage_1",
outcome="won",
)
new_deal = make_new_deal(
deal_id,
hs_object_id="1",
dealname="Test Deal",
dealstage="stage_1",
outcome="won",
)
result = HubspotDealDiffer.check_for_db_update_trigger(
new_deal=new_deal,
new_company=None,
new_listing=None,
old_deal=old_deal,
)
assert result is False
def test_db_update_trigger__dealname_changed__returns_true() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
dealname="Old Name",
)
new_deal = make_new_deal(
deal_id,
hs_object_id="1",
dealname="New Name",
)
result = HubspotDealDiffer.check_for_db_update_trigger(
new_deal=new_deal,
new_company=None,
new_listing=None,
old_deal=old_deal,
)
assert result is True
def test_db_update_trigger__company_changed__returns_true() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
company_id="old_company",
)
new_deal = make_new_deal(
deal_id,
hs_object_id="1",
)
new_company = "new_company"
result = HubspotDealDiffer.check_for_db_update_trigger(
new_deal=new_deal,
new_company=new_company,
new_listing=None,
old_deal=old_deal,
)
assert result is True
def test_db_update_trigger__missing_hubspot_timezone__returns_false() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
design_completion_date=datetime(2025, 11, 3, 0, 0, tzinfo=timezone.utc),
)
new_deal = make_new_deal(
deal_id,
hs_object_id="1",
design_completion_date=datetime(2025, 11, 3, 0, 0).isoformat(),
)
result = HubspotDealDiffer.check_for_db_update_trigger(
new_deal=new_deal,
new_company=None,
new_listing=None,
old_deal=old_deal,
)
assert result is False
def test_db_update_trigger__listing_changed__returns_true() -> None:
deal_id = uuid.uuid4()
old_deal = make_old_deal(
id=deal_id,
listing_id="abc",
)
new_deal = make_new_deal(
deal_id,
hs_object_id="1",
)
new_listing = {"listing_id": "xyz"}
result = HubspotDealDiffer.check_for_db_update_trigger(
new_deal=new_deal,
new_company=None,
new_listing=new_listing,
old_deal=old_deal,
)
assert result is True

16
etl/hubspot/utils.py Normal file
View file

@ -0,0 +1,16 @@
from datetime import datetime, timezone
from typing import Optional
def parse_hs_date(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except ValueError:
return None

View file

@ -7,6 +7,14 @@ data "terraform_remote_state" "shared" {
}
}
data "terraform_remote_state" "pashub_to_ara" {
backend = "s3"
config = {
bucket = "pashub-to-ara-terraform-state"
key = "ev:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
@ -39,6 +47,8 @@ module "hubspot_deal_etl" {
DB_NAME = var.db_name
DB_PORT = var.db_port
HUBSPOT_API_KEY = var.hubspot_api_key
PASHUB_TO_ARA_SQS_URL = data.terraform_remote_state.pashub_to_ara.pashhub_to_ara_queue_url
}
}

View file

@ -0,0 +1,4 @@
output "pashhub_to_ara_queue_url" {
value = module.lambda.queue_url
description = "URL of the PasHub to Ara SQS queue"
}

View file

@ -3,6 +3,6 @@ pythonpath = .
log_cli = true
log_cli_level = INFO
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests
markers =
integration: mark a test as an integration test