erge branch 'main' of https://github.com/Hestia-Homes/Model into feature/match-on-lmk

This commit is contained in:
Khalim Conn-Kowlessar 2026-04-10 15:16:26 +01:00
commit 9f9cab35ef
34 changed files with 1429 additions and 627 deletions

View file

@ -505,7 +505,7 @@ jobs:
# Deploy Hubspot ETL Lambda
# ============================================================
hubspot_etl_lambda:
needs: [hubspot_etl_image, determine_stage]
needs: [hubspot_etl_image, determine_stage, pashub_to_ara_lambda]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: hubspot-etl-to-ara

View file

@ -38,6 +38,7 @@ class Settings(BaseSettings):
PLAN_TRIGGER_BUCKET: str = "changeme"
ENGINE_SQS_URL: str = "changeme"
CATEGORISATION_SQS_URL: str = "changeme"
PASHUB_TO_ARA_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str = "changeme"

View file

@ -0,0 +1,25 @@
from typing import Optional
from sqlalchemy import select
from backend.app.db.connection import db_read_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
def get_uploaded_file_by_listing_type_and_source(
    hubspot_listing_id: int,
    file_type: FileTypeEnum,
    file_source: FileSourceEnum,
) -> Optional[UploadedFile]:
    """Look up the uploaded-file record for a listing, file type and source.

    Args:
        hubspot_listing_id: HubSpot listing id the file is attached to.
        file_type: Kind of document to look for.
        file_source: System the document was obtained from.

    Returns:
        The matching ``UploadedFile`` entity, or ``None`` when no row matches.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: if more than one row matches
            (behaviour of ``one_or_none``).
    """
    statement = select(UploadedFile).where(
        UploadedFile.hubspot_listing_id == hubspot_listing_id,
        UploadedFile.file_type == file_type,
        UploadedFile.file_source == file_source,
    )
    with db_read_session() as session:
        # ``select`` here is SQLAlchemy's (not SQLModel's), so executing it
        # through ``exec`` would hand back ``Row`` tuples rather than
        # entities. ``scalars`` unwraps the single selected entity so the
        # return value matches the annotation.
        return session.scalars(statement).one_or_none()

View file

@ -0,0 +1,79 @@
import uuid
from sqlmodel import SQLModel, Field, Column, text
from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime
from sqlalchemy.sql import func
class HubspotDealData(SQLModel, table=True):
    """ORM model for the ``hubspot_deal_data`` table.

    Apart from the primary key, ``deal_id`` and the two audit timestamps,
    every column is optional and defaults to ``None``.
    """

    __tablename__ = "hubspot_deal_data"
    # Surrogate primary key generated client-side.
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    # HubSpot Deal identifiers
    deal_id: str = Field(index=True, nullable=False)
    dealname: Optional[str] = Field(default=None)
    dealstage: Optional[str] = Field(default=None)
    company_id: Optional[str] = Field(default=None)
    project_code: Optional[str] = Field(default=None)
    # HubSpot custom properties
    landlord_property_id: Optional[str] = Field(default=None)
    uprn: Optional[str] = Field(default=None)
    outcome: Optional[str] = Field(default=None)
    outcome_notes: Optional[str] = Field(default=None)
    major_condition_issue_description: Optional[str] = Field(default=None)
    major_condition_issue_photos: Optional[str] = Field(default=None)
    major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
    coordination_status: Optional[str] = Field(default=None)
    coordination_comments: Optional[str] = Field(default=None)
    design_status: Optional[str] = Field(default=None)
    listing_id: Optional[str] = Field(default=None)
    pashub_link: Optional[str] = Field(default=None)
    sharepoint_link: Optional[str] = Field(default=None)
    dampmould_growth: Optional[str] = Field(default=None)
    damp_mould_and_repairs_comments: Optional[str] = Field(default=None)
    pre_sap: Optional[str] = Field(default=None)
    coordinator: Optional[str] = Field(default=None)
    mtp_completion_date: Optional[datetime] = Field(default=None)
    mtp_re_model_completion_date: Optional[datetime] = Field(default=None)
    ioe_v3_completion_date: Optional[datetime] = Field(default=None)
    proposed_measures: Optional[str] = Field(default=None)
    approved_package: Optional[str] = Field(default=None)
    designer: Optional[str] = Field(default=None)
    design_completion_date: Optional[datetime] = Field(default=None)
    actual_measures_installed: Optional[str] = Field(default=None)
    installer: Optional[str] = Field(default=None)
    installer_handover: Optional[str] = Field(default=None)
    lodgement_status: Optional[str] = Field(default=None)
    measures_lodgement_date: Optional[datetime] = Field(default=None)
    lodgement_date: Optional[datetime] = Field(default=None)
    expected_commencement_date: Optional[datetime] = Field(default=None)
    surveyor: Optional[str] = Field(default=None)
    confirmed_survey_date: Optional[datetime] = Field(default=None)
    confirmed_survey_time: Optional[str] = Field(default=None)
    surveyed_date: Optional[datetime] = Field(default=None)
    design_type: Optional[str] = Field(default=None)
    # Audit timestamps. The DB columns are NOT NULL with a server-side UTC
    # default; they are typed Optional here only because the value is filled
    # in when a new record is saved.
    # NOTE(review): the Python-side default is the SQL expression
    # ``func.now()``, not a ``datetime`` value - confirm this is intended.
    created_at: Optional[datetime] = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("(NOW() AT TIME ZONE 'utc')"),
            nullable=False,
        ),
        default=func.now(),
    )  # Non-nullable in db but optional here as value is set on db save for new record
    updated_at: Optional[datetime] = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=text("(NOW() AT TIME ZONE 'utc')"),
            onupdate=func.now(),
            nullable=False,
        ),
        default=func.now(),
    )  # Non-nullable in db but optional here as value is set on db save for new record

View file

@ -1,9 +1,7 @@
from sqlmodel import SQLModel, Field, Column, text
import uuid
from sqlmodel import SQLModel, Field
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import DateTime
from sqlalchemy.sql import func
import uuid
class Organisation(SQLModel, table=True):
@ -13,74 +11,3 @@ class Organisation(SQLModel, table=True):
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
hubspot_company_id: Optional[str] = None
name: Optional[str] = None
class HubspotDealData(SQLModel, table=True):
__tablename__ = "hubspot_deal_data"
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
# HubSpot Deal identifiers
deal_id: str = Field(index=True, nullable=False)
dealname: Optional[str] = Field(default=None)
dealstage: Optional[str] = Field(default=None)
company_id: Optional[str] = Field(default=None)
project_code: Optional[str] = Field(default=None)
# HubSpot custom properties
landlord_property_id: Optional[str] = Field(default=None)
uprn: Optional[str] = Field(default=None)
outcome: Optional[str] = Field(default=None)
outcome_notes: Optional[str] = Field(default=None)
major_condition_issue_description: Optional[str] = Field(default=None)
major_condition_issue_photos: Optional[str] = Field(default=None)
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
coordination_status: Optional[str] = Field(default=None)
coordination_comments: Optional[str] = Field(default=None)
design_status: Optional[str] = Field(default=None)
listing_id: Optional[str] = Field(default=None)
pashub_link: Optional[str] = Field(default=None)
sharepoint_link: Optional[str] = Field(default=None)
dampmould_growth: Optional[str] = Field(default=None)
damp_mould_and_repairs_comments: Optional[str] = Field(default=None)
pre_sap: Optional[str] = Field(default=None)
coordinator: Optional[str] = Field(default=None)
mtp_completion_date: Optional[datetime] = Field(default=None)
mtp_re_model_completion_date: Optional[datetime] = Field(default=None)
ioe_v3_completion_date: Optional[datetime] = Field(default=None)
proposed_measures: Optional[str] = Field(default=None)
approved_package: Optional[str] = Field(default=None)
designer: Optional[str] = Field(default=None)
design_completion_date: Optional[datetime] = Field(default=None)
actual_measures_installed: Optional[str] = Field(default=None)
installer: Optional[str] = Field(default=None)
installer_handover: Optional[str] = Field(default=None)
lodgement_status: Optional[str] = Field(default=None)
measures_lodgement_date: Optional[datetime] = Field(default=None)
lodgement_date: Optional[datetime] = Field(default=None)
expected_commencement_date: Optional[datetime] = Field(default=None)
surveyor: Optional[str] = Field(default=None)
confirmed_survey_date: Optional[datetime] = Field(default=None)
confirmed_survey_time: Optional[str] = Field(default=None)
surveyed_date: Optional[datetime] = Field(default=None)
design_type: Optional[str] = Field(default=None)
created_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
nullable=False,
)
)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
server_default=text("(NOW() AT TIME ZONE 'utc')"),
onupdate=func.now(),
nullable=False,
)
)

View file

@ -14,6 +14,8 @@ class FileTypeEnum(enum.Enum):
PAR_PHOTO_PACK = "par_photo_pack"
PAS_2023_PROPERTY = "pas_2023_property"
PAS_2023_OCCUPANCY = "pas_2023_occupancy"
ECMK_SITE_NOTE = "ecmk_site_note"
ECMK_RD_SAP_SITE_NOTE = "ecmk_rd_sap_site_note"
class FileSourceEnum(enum.Enum):
@ -37,9 +39,21 @@ class UploadedFile(Base):
hubspot_listing_id = Column(BigInteger, nullable=True)
file_type = Column(
SqlEnum(FileTypeEnum, name="file_type", create_type=False), nullable=True
SqlEnum(
FileTypeEnum,
name="file_type",
create_type=False,
values_callable=lambda enum_cls: [e.value for e in enum_cls],
),
nullable=True,
)
file_source = Column(
SqlEnum(FileSourceEnum, name="file_source", create_type=False), nullable=True
SqlEnum(
FileSourceEnum,
name="file_source",
create_type=False,
values_callable=lambda enum_cls: [e.value for e in enum_cls],
),
nullable=True,
)

View file

@ -1,40 +1,59 @@
from typing import Dict, Optional
from openpyxl import load_workbook
import re
from dataclasses import dataclass
from typing import Any, Dict, Optional
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
wb = load_workbook(filepath, data_only=True)
ws = wb["Southern RA-Lite Programme 3103"]
@dataclass
class PropertyRow:
row_index: int
address: str
listing_id: str
properties: Dict[str, str] = {}
header_row = 1
id_col_index = None
deal_name_col_index = None
def extract_addresses_from_spreadsheet(
filepath: str,
) -> Dict[str, PropertyRow]:
wb: Workbook = load_workbook(filepath, data_only=True)
ws: Worksheet = wb["Southern RA-Lite Programme 3103"]
header_row: int = 1
id_col: Optional[int] = None
deal_name_col: Optional[int] = None
listing_id_col: Optional[int] = None
# find columns
for col in range(1, ws.max_column + 1):
value = ws.cell(row=header_row, column=col).value
raw_value: Any = ws.cell(row=header_row, column=col).value
value: str = str(raw_value).strip().lower() if raw_value else ""
if value and str(value).strip().lower() == "id":
id_col_index = col
if value == "id":
id_col = col
elif value == "deal name":
deal_name_col = col
elif value == "associated listing ids":
listing_id_col = col
if value and str(value).strip().lower() == "deal name":
deal_name_col_index = col
break
if id_col is None or deal_name_col is None or listing_id_col is None:
raise Exception("Missing required columns")
if id_col_index is None or deal_name_col_index is None:
raise Exception("Required columns not found")
properties: Dict[str, PropertyRow] = {}
for row in range(2, ws.max_row + 1):
id_val = ws.cell(row=row, column=id_col_index).value
deal_name = ws.cell(row=row, column=deal_name_col_index).value
id_val: Any = ws.cell(row=row, column=id_col).value
deal_name: Any = ws.cell(row=row, column=deal_name_col).value
listing_id: Any = ws.cell(row=row, column=listing_id_col).value
if not id_val or not deal_name:
if not id_val or not deal_name or not listing_id:
continue
properties[str(id_val).strip()] = extract_succinct_address(
str(deal_name).strip()
property_id: str = str(id_val).strip()
properties[property_id] = PropertyRow(
row_index=row,
address=extract_succinct_address(str(deal_name)),
listing_id=listing_id,
)
return properties

View file

@ -50,6 +50,7 @@ def get_first_row_signature(page: Page) -> str:
def go_to_next_page(page: Page) -> bool:
logger.info("Going to next page")
before = get_first_row_signature(page)
page.locator("#assessmentDatatable_next a").click()

View file

@ -0,0 +1,26 @@
# Base image: Playwright for Python, tag v1.58.0-jammy (matches the
# playwright==1.58.0 pin in requirements.txt).
FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy
# Install the AWS Lambda Runtime Interface Emulator (RIE). It is only used by
# the commented-out local entrypoint further down.
# NOTE(review): this fetches the "latest" release, so the RIE version is not pinned.
ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/local/bin/aws-lambda-rie
RUN chmod +x /usr/local/bin/aws-lambda-rie
# Set working directory (Lambda task root)
WORKDIR /var/task
# Install Python dependencies before copying the source tree.
COPY backend/ecmk_fetcher/handler/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the project packages the handler imports from.
COPY utils/ utils/
COPY backend/ backend/
COPY datatypes/ datatypes/
# Local Lambda entrypoint (runs the runtime interface client under the RIE);
# swap it with the ENTRYPOINT below to test locally.
# ENTRYPOINT ["/usr/local/bin/aws-lambda-rie", "python", "-m", "awslambdaric"]
# AWS Lambda entrypoint
ENTRYPOINT ["python", "-m", "awslambdaric"]
# -----------------------------
# Lambda handler (dotted path to the handler function)
# -----------------------------
CMD ["backend.ecmk_fetcher.handler.handler.handler"]

View file

@ -1,9 +1,13 @@
from typing import Any, Mapping
from backend.ecmk_fetcher.processor import run_job
from utils.logger import setup_logger
logger = setup_logger()
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Lambda entry point: log that the handler was entered, then run the job.

    Args:
        event: Invocation payload; not read by this function.
        context: Lambda context object; not read by this function.
    """
    logger.info("Entered handler")
    run_job()

View file

@ -0,0 +1,12 @@
awslambdaric
playwright==1.58.0
msal
openpyxl
sqlalchemy==2.0.36
sqlmodel
pytz==2024.2
psycopg2-binary==2.9.10
pydantic-settings==2.6.0
boto3==1.35.44
pandas==2.2.2
numpy<2.0

View file

@ -0,0 +1,11 @@
version: "3.9"
services:
ecmk-fetcher-lambda:
build:
context: ../../../
dockerfile: backend/ecmk_fetcher/handler/Dockerfile
ports:
- "9000:8080"
env_file:
- ../../../.env

View file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
# Manual smoke test: POST a sample event to a locally running Lambda container
# and print the HTTP status code and response body.
import json
import requests

# Address of the local Lambda invocation endpoint.
HOST = "localhost"
PORT = "9000"
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
# Sample event: a "Records" list whose single entry carries a JSON-encoded
# "body" string (presumably mimicking an SQS message - confirm against the
# real trigger).
payload = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "test": 123456,
                }
            )
        }
    ]
}
# NOTE(review): no timeout is passed, so this call blocks until the
# container responds.
response = requests.post(LAMBDA_URL, json=payload)
print("Status code:", response.status_code)
print("Response:")
print(response.text)

View file

@ -1,6 +1,5 @@
import os
from typing import Dict, List
from typing import Dict
from playwright.sync_api import (
sync_playwright,
Locator,
@ -9,7 +8,14 @@ from playwright.sync_api import (
BrowserContext,
)
from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet
from backend.app.db.functions.uploaded_files_functions import (
get_uploaded_file_by_listing_type_and_source,
)
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from backend.ecmk_fetcher.address_list import (
PropertyRow,
extract_addresses_from_spreadsheet,
)
from backend.ecmk_fetcher.browser import (
attach_debug_listeners,
download_with_retry,
@ -18,14 +24,25 @@ from backend.ecmk_fetcher.browser import (
go_to_next_page,
login,
)
from backend.ecmk_fetcher.reports import REPORT_TYPES, build_property_id
from backend.ecmk_fetcher.sharepoint import upload_file_to_sharepoint
from backend.ecmk_fetcher.reports import (
REPORT_TYPES,
build_property_id,
map_report_type_to_db_file_type,
)
from backend.ecmk_fetcher.upload import (
upload_file_to_s3_and_update_db,
upload_file_to_sharepoint,
)
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
logger = setup_logger()
def run_job() -> None:
username: str = ""
username: str = "" # TODO: get from github secrets
password: str = ""
property_list_file: str = (
@ -35,8 +52,7 @@ def run_job() -> None:
BASE_DIR: str = os.path.dirname(__file__)
filepath: str = os.path.join(BASE_DIR, property_list_file)
property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath)
property_ids: List[str] = list(property_map.keys())
property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(filepath)
sharepoint_client: DomnaSharepointClient = DomnaSharepointClient(
sharepoint_location=DomnaSites.PRIVATE_PAY
@ -44,6 +60,8 @@ def run_job() -> None:
sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
s3_bucket: str = "retrofit-energy-assessments-dev"
with sync_playwright() as p:
browser: Browser = p.chromium.launch(headless=True)
context: BrowserContext = browser.new_context()
@ -79,14 +97,38 @@ def run_job() -> None:
property_id: str = build_property_id(address, postcode)
if property_id not in property_ids:
property_row: PropertyRow | None = property_map.get(property_id)
if not property_row:
continue
sharepoint_address: str = property_map[property_id]
logger.info(f"Match found for property {address}")
sharepoint_address: str = property_row.address
go_to_assessment_details(page, row)
for report_type in REPORT_TYPES:
hubspot_listing_id: str = property_row.listing_id
try:
db_file_type: FileTypeEnum = (
map_report_type_to_db_file_type(report_type)
)
except ValueError:
logger.error(
f"Unknown report type {report_type}, skipping file"
)
continue
if get_uploaded_file_by_listing_type_and_source(
hubspot_listing_id=int(hubspot_listing_id),
file_type=db_file_type,
file_source=FileSourceEnum.ECMK,
):
logger.debug("File already uploaded to s3, skipping")
continue
file_path: str | None = download_with_retry(
page, report_type
)
@ -94,6 +136,10 @@ def run_job() -> None:
if not file_path:
continue
logger.info(
f"Successfully downloaded file {os.path.basename(file_path)} from ECMK"
)
try:
upload_file_to_sharepoint(
client=sharepoint_client,
@ -101,6 +147,20 @@ def run_job() -> None:
base_path=sharepoint_base_path,
subpath=sharepoint_address,
)
logger.info(
f"Successfully loaded {os.path.basename(file_path)} to sharepoint for {address}"
)
# Upload to s3 and update db
upload_file_to_s3_and_update_db(
bucket=s3_bucket,
file_path=file_path,
hubspot_listing_id=hubspot_listing_id,
file_type=db_file_type,
)
except Exception:
raise
finally:
if os.path.exists(file_path):
os.remove(file_path)

View file

@ -1,5 +1,7 @@
from enum import Enum
from backend.app.db.models.uploaded_file import FileTypeEnum
class FileDownloadButtonType(Enum):
ASSESSOR_HUB_SITENOTE_REPORT = 11
@ -15,6 +17,16 @@ REPORT_TYPES = [
]
def map_report_type_to_db_file_type(report_type: int) -> FileTypeEnum:
    """Translate an ECMK download-button report type into the DB file type.

    Args:
        report_type: Numeric value of a ``FileDownloadButtonType`` member.

    Returns:
        The ``FileTypeEnum`` member stored for that kind of report.

    Raises:
        ValueError: if ``report_type`` is not one of the mapped values.
    """
    if report_type == FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT.value:
        return FileTypeEnum.ECMK_SITE_NOTE
    if report_type == FileDownloadButtonType.SITENOTE_REPORT.value:
        return FileTypeEnum.ECMK_RD_SAP_SITE_NOTE
    raise ValueError("Unknown report type")
def build_report_selector(report_type: int) -> str:
    """Return the CSS selector of the download link for ``report_type``.

    The selector targets an ``a.download-report-btn`` element whose
    ``data-report-type`` attribute equals the given value.
    """
    selector: str = f"a.download-report-btn[data-report-type='{report_type}']"
    return selector

View file

@ -1,20 +0,0 @@
import os
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
def upload_file_to_sharepoint(
client: DomnaSharepointClient,
file_path: str,
base_path: str,
subpath: str,
) -> None:
filename = os.path.basename(file_path)
full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment"
client.upload_file(
file_path=file_path,
sharepoint_path=full_path,
file_name=filename,
)

View file

@ -0,0 +1,51 @@
from datetime import datetime, timezone
import os
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
def upload_file_to_sharepoint(
    client: DomnaSharepointClient,
    file_path: str,
    base_path: str,
    subpath: str,
) -> None:
    """Upload a local file into a property's assessment folder on SharePoint.

    The destination folder is
    ``<base_path>/<subpath>/1. Retrofit Assessment/A. Assessment`` and the
    uploaded file keeps the base name of ``file_path``.

    Args:
        client: SharePoint client used to perform the upload.
        file_path: Path of the local file to upload.
        base_path: Root SharePoint folder.
        subpath: Folder under ``base_path`` for the property.
    """
    client.upload_file(
        file_path=file_path,
        sharepoint_path=f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment",
        file_name=os.path.basename(file_path),
    )
def upload_file_to_s3_and_update_db(
    bucket: str, file_path: str, hubspot_listing_id: str, file_type: FileTypeEnum
) -> None:
    """Upload a file to S3, then record the upload in the database.

    The object key is
    ``documents/hubspot_listing_id/<hubspot_listing_id>/<file name>``. Once
    the upload call returns, an ``UploadedFile`` row with source ECMK and the
    given file type is added and committed.

    Args:
        bucket: Destination S3 bucket.
        file_path: Path of the local file to upload.
        hubspot_listing_id: HubSpot listing the file belongs to.
        file_type: DB file type recorded for the upload.
    """
    filename: str = os.path.basename(file_path)
    key: str = f"documents/hubspot_listing_id/{hubspot_listing_id}/{filename}"

    # Upload first so the DB row is only written after the S3 call returns.
    upload_file_to_s3(file_path, bucket, key)

    record = UploadedFile(
        s3_file_bucket=bucket,
        s3_file_key=key,
        s3_upload_timestamp=datetime.now(timezone.utc),
        hubspot_listing_id=hubspot_listing_id,
        file_source=FileSourceEnum.ECMK.value,
        file_type=file_type.value,
    )

    # TODO: we should do multiple files at once to reduce db trips
    with db_session() as db:
        db.add(record)
        db.commit()

View file

@ -0,0 +1,6 @@
from typing import TypedDict
class CompanyData(TypedDict):
    """Typed dictionary describing a company by its ``hs_object_id`` and ``name``."""

    # Both keys are plain strings; ``hs_object_id`` is presumably the HubSpot
    # object id of the company - confirm against the HubSpot client.
    hs_object_id: str
    name: str

View file

@ -26,10 +26,10 @@ from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTy
ForwardPaging as AssociationsPaging,
NextPage as AssociationsPagingNext,
)
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from backend.app.config import get_settings
from etl.hubspot.company_data import CompanyData
from utils.logger import setup_logger
import mimetypes
@ -230,7 +230,9 @@ class HubspotClient:
self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
return listing_info
def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]:
def from_deal_id_get_info(
self, deal_id: str
) -> dict[str, str]: # TODO: add dataclass for this
deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType]
deal: HubspotObject = self._call_with_retry(
@ -280,18 +282,12 @@ class HubspotClient:
deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType]
return deal_info
def get_deal_info_for_db(
def get_deal_and_company_and_listing(
self, deal_id: str
) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]:
deal: dict[str, str] = self.from_deal_id_get_info(deal_id)
company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id)
if company:
company_data: CompanyData = self.get_company_information(company)
dbloader: HubspotDataToDb = HubspotDataToDb()
dbloader.upsert_company(company_data)
listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing(
deal_id
)

View file

@ -1,16 +1,19 @@
from backend.app.db.connection import db_read_session
from backend.app.db.models.organisation import Organisation, HubspotDealData
import os
from sqlmodel import select
from datetime import datetime, timezone
from typing import TypedDict, Optional
from typing import Dict, Optional
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.company_data import CompanyData
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.s3_uploader import S3Uploader
import hashlib
import os
from backend.app.db.connection import db_read_session
from backend.app.db.models.organisation import Organisation
from etl.hubspot.utils import parse_hs_date
from utils.logger import setup_logger
class CompanyData(TypedDict):
hs_object_id: str
name: str
logger = setup_logger()
class HubspotDataToDb:
@ -31,7 +34,7 @@ class HubspotDataToDb:
records = self.read_org_table(limit)
return [org.name for org in records if org.name]
def upsert_company(self, company_data: CompanyData) -> Organisation:
def upsert_organisation(self, company_data: CompanyData) -> Organisation:
"""Upserts a company record. Updates if hubspot_company_id exists, otherwise creates new."""
with db_read_session() as session:
hubspot_id = company_data.get("hs_object_id")
@ -61,11 +64,7 @@ class HubspotDataToDb:
session.commit()
return record
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_deal(deal_data, company, listing, hubspot_client)
def find_all_deals_with_company_id(self, company_id):
def find_all_deals_with_company_id(self, company_id: str):
"""Returns a list of deals for a given company_id."""
with db_read_session() as session:
return (
@ -74,7 +73,7 @@ class HubspotDataToDb:
.all()
)
def find_deal_with_deal_id(self, deal_id):
def find_deal_with_deal_id(self, deal_id: str) -> Optional[HubspotDealData]:
with db_read_session() as session:
return (
session.query(HubspotDealData)
@ -82,275 +81,13 @@ class HubspotDataToDb:
.one_or_none()
)
def _parse_hs_date(self, value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _sha256(self, file_path: str) -> str:
"""Compute SHA-256 checksum of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
def update_deal_with_checks(self, deal_in_db, hubspot_client) -> bool:
"""
Checks if a deal needs updating and syncs it with HubSpot.
Also handles major_condition_issue_photos file upload to S3 with integrity check.
"""
def soft_assert(condition, message="Assertion Failed"):
if not condition:
print(f"⚠️ Soft Assert Failed: {message}")
return False
return True
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
deal_in_db.deal_id
)
# Soft compare key fields
checks = [
soft_assert(
deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"
),
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
soft_assert(
deal_in_db.listing_id == hs_listing.get("listing_id"),
"listing_id mismatch",
),
soft_assert(
deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
"landlord_property_id mismatch",
),
soft_assert(
deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"
),
soft_assert(
deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"
),
soft_assert(
deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"
),
soft_assert(
deal_in_db.project_code == hs_deal.get("project_code"),
"project_code mismatch",
),
soft_assert(
deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"
),
soft_assert(
deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
"outcome_notes mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_description
== hs_deal.get("major_condition_issue_description"),
"major condition description mismatch",
),
soft_assert(
deal_in_db.major_condition_issue_photos
== hs_deal.get("major_condition_issue_photos"),
"major condition issue photos mismatch",
),
soft_assert(
deal_in_db.coordination_status
== hs_deal.get("coordination_status__stage_1_"),
"coordination stage 1 status mismatch",
),
soft_assert(
deal_in_db.coordination_comments == hs_deal.get("coordination_comments"),
"coordination_comments mismatch",
),
soft_assert(
deal_in_db.design_status == hs_deal.get("retrofit_design_status"),
"retrofit design mismatch",
),
soft_assert(
deal_in_db.pashub_link == hs_deal.get("pashub_link"),
"pashub_link mismatch",
),
soft_assert(
deal_in_db.sharepoint_link == hs_deal.get("sharepoint_link"),
"sharepoint_link mismatch",
),
soft_assert(
deal_in_db.dampmould_growth == hs_deal.get("dampmould_growth"),
"dampmould_growth mismatch",
),
soft_assert(
deal_in_db.damp_mould_and_repairs_comments
== hs_deal.get("damp_mould_and_repairs_comments"),
"damp_mould_and_repairs_comments mismatch",
),
soft_assert(
deal_in_db.pre_sap == hs_deal.get("pre_sap"),
"pre_sap mismatch",
),
soft_assert(
deal_in_db.coordinator == hs_deal.get("coordinator"),
"coordinator mismatch",
),
soft_assert(
deal_in_db.mtp_completion_date
== self._parse_hs_date(hs_deal.get("mtp_completion_date")),
"mtp_completion_date mismatch",
),
soft_assert(
deal_in_db.mtp_re_model_completion_date
== self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")),
"mtp_re_model_completion_date mismatch",
),
soft_assert(
deal_in_db.ioe_v3_completion_date
== self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")),
"ioe_v3_completion_date mismatch",
),
soft_assert(
deal_in_db.proposed_measures == hs_deal.get("proposed_measures"),
"proposed_measures mismatch",
),
soft_assert(
deal_in_db.approved_package == hs_deal.get("approved_package"),
"approved_package mismatch",
),
soft_assert(
deal_in_db.designer == hs_deal.get("designer"),
"designer mismatch",
),
soft_assert(
deal_in_db.design_completion_date
== self._parse_hs_date(hs_deal.get("design_completion_date")),
"design_completion_date mismatch",
),
soft_assert(
deal_in_db.actual_measures_installed
== hs_deal.get("actual_measures_installed"),
"actual_measures_installed mismatch",
),
soft_assert(
deal_in_db.installer == hs_deal.get("installer"),
"installer mismatch",
),
soft_assert(
deal_in_db.installer_handover == hs_deal.get("installer_handover"),
"installer_handover mismatch",
),
soft_assert(
deal_in_db.lodgement_status == hs_deal.get("lodgement_status"),
"lodgement_status mismatch",
),
soft_assert(
deal_in_db.measures_lodgement_date
== self._parse_hs_date(hs_deal.get("measures_lodgement_date")),
"measures_lodgement_date mismatch",
),
soft_assert(
deal_in_db.lodgement_date
== self._parse_hs_date(hs_deal.get("lodgement_date")),
"lodgement_date mismatch",
),
soft_assert(
deal_in_db.expected_commencement_date
== self._parse_hs_date(hs_deal.get("expected_commencement_date")),
"expected_commencement_date mismatch",
),
soft_assert(
deal_in_db.surveyor == hs_deal.get("surveyor"),
"surveyor mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_date
== self._parse_hs_date(hs_deal.get("confirmed_survey_date")),
"confirmed_survey_date mismatch",
),
soft_assert(
deal_in_db.confirmed_survey_time
== hs_deal.get("confirmed_survey_time"),
"confirmed_survey_time mismatch",
),
soft_assert(
deal_in_db.surveyed_date
== self._parse_hs_date(hs_deal.get("surveyed_date")),
"surveyed_date mismatch",
),
soft_assert(
deal_in_db.design_type == hs_deal.get("design_type"),
"design_type mismatch",
),
]
# If discrepancies found, update from HubSpot
if not all(checks):
print(
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
)
self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
return False
# Handle photo upload if it exists but S3 URL is missing
if (
deal_in_db.major_condition_issue_photos
and not deal_in_db.major_condition_issue_evidence_s3_url
):
print(
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
)
photo_url = hs_deal.get("major_condition_issue_photos")
if photo_url:
try:
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
local_file = hubspot_client.download_file_from_url(photo_url)
# Upload to S3
bucket = "retrofit-data-dev"
s3_url = self.s3.upload_file(
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
)
# Download again to verify integrity
downloaded = self.s3.download_from_url(s3_url)
if self._sha256(local_file) == self._sha256(downloaded):
print("✅ SHA256 match verified — upload successful.")
else:
print("❌ SHA256 mismatch — integrity check failed.")
raise ValueError("File integrity check failed after S3 upload.")
# Update DB record with S3 URL
with db_read_session() as session:
db_record = session.get(HubspotDealData, deal_in_db.id)
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
)
return False
except Exception as e:
print(
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
)
# Continue without the file — don't crash the entire update
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
else:
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
return True
def upsert_deal(self, deal_data, company, listing, hubspot_client):
def upsert_deal(
self,
deal_data: Dict[str, str],
company: Optional[str],
listing: Optional[dict[str, str]],
hubspot_client: HubspotClient,
):
"""
Inserts or updates a deal record.
Also uploads photos if present and adds S3 URL.
@ -364,111 +101,10 @@ class HubspotDataToDb:
existing = session.exec(statement).first()
if existing:
self._handle_existing_photo_upload(existing, hubspot_client)
print(f"🔄 Updating existing deal (deal_id={deal_id})")
for attr, value in {
"dealname": deal_data.get("dealname"),
"dealstage": deal_data.get("dealstage"),
"listing_id": listing.get("listing_id", None) if listing else None,
"landlord_property_id": (
listing.get("owner_property_id", None) if listing else None
),
"uprn": listing.get("national_uprn", None) if listing else None,
"outcome": deal_data.get("outcome"),
"outcome_notes": deal_data.get("outcome_notes"),
"project_code": deal_data.get("project_code"),
"company_id": company,
"major_condition_issue_description": deal_data.get(
"major_condition_issue_description"
),
"major_condition_issue_photos": deal_data.get(
"major_condition_issue_photos"
),
"coordination_status": deal_data.get(
"coordination_status__stage_1_"
),
"coordination_comments": deal_data.get("coordination_comments"),
"design_status": deal_data.get("retrofit_design_status"),
"pashub_link": deal_data.get("pashub_link"),
"sharepoint_link": deal_data.get("sharepoint_link"),
"dampmould_growth": deal_data.get("dampmould_growth"),
"damp_mould_and_repairs_comments": deal_data.get(
"damp_mould_and_repairs_comments"
),
"pre_sap": deal_data.get("pre_sap"),
"coordinator": deal_data.get("coordinator"),
"mtp_completion_date": self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
"mtp_re_model_completion_date": self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
"ioe_v3_completion_date": self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
"proposed_measures": deal_data.get("proposed_measures"),
"approved_package": deal_data.get("approved_package"),
"designer": deal_data.get("designer"),
"design_completion_date": self._parse_hs_date(
deal_data.get("design_completion_date")
),
"actual_measures_installed": deal_data.get(
"actual_measures_installed"
),
"installer": deal_data.get("installer"),
"installer_handover": deal_data.get("installer_handover"),
"lodgement_status": deal_data.get("lodgement_status"),
"measures_lodgement_date": self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
"lodgement_date": self._parse_hs_date(
deal_data.get("lodgement_date")
),
"expected_commencement_date": self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
"surveyor": deal_data.get("surveyor"),
"confirmed_survey_date": self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
"confirmed_survey_time": deal_data.get("confirmed_survey_time"),
"surveyed_date": self._parse_hs_date(
deal_data.get("surveyed_date")
),
"design_type": deal_data.get("design_type"),
}.items():
setattr(existing, attr, value or getattr(existing, attr))
# Upload if photo exists but S3 link missing
if (
existing.major_condition_issue_photos
and not existing.major_condition_issue_evidence_s3_url
):
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id)
photo_url = fresh_deal.get("major_condition_issue_photos")
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(
photo_url
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
)
# Continue without the file — don't crash the update
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
self._update_existing_deal(existing, deal_data, listing, company)
session.add(existing)
session.commit()
@ -477,93 +113,213 @@ class HubspotDataToDb:
else:
print(f"🆕 Inserting new deal (deal_id={deal_id})")
new_record = HubspotDealData(
deal_id=deal_id,
dealname=deal_data.get("dealname"),
dealstage=deal_data.get("dealstage"),
listing_id=listing.get("listing_id", None) if listing else None,
landlord_property_id=listing.get("owner_property_id") if listing else None,
uprn=listing.get("national_uprn") if listing else None,
outcome=deal_data.get("outcome"),
outcome_notes=deal_data.get("outcome_notes"),
project_code=deal_data.get("project_code"),
company_id=company,
major_condition_issue_description=deal_data.get(
"major_condition_issue_description"
),
major_condition_issue_photos=deal_data.get(
"major_condition_issue_photos"
),
coordination_status=deal_data.get("coordination_status__stage_1_"),
coordination_comments=deal_data.get("coordination_comments"),
design_status=deal_data.get("retrofit_design_status"),
pashub_link=deal_data.get("pashub_link"),
sharepoint_link=deal_data.get("sharepoint_link"),
dampmould_growth=deal_data.get("dampmould_growth"),
damp_mould_and_repairs_comments=deal_data.get(
"damp_mould_and_repairs_comments"
),
pre_sap=deal_data.get("pre_sap"),
coordinator=deal_data.get("coordinator"),
mtp_completion_date=self._parse_hs_date(
deal_data.get("mtp_completion_date")
),
mtp_re_model_completion_date=self._parse_hs_date(
deal_data.get("mtp_re_model_completion_date")
),
ioe_v3_completion_date=self._parse_hs_date(
deal_data.get("ioe_v3_completion_date")
),
proposed_measures=deal_data.get("proposed_measures"),
approved_package=deal_data.get("approved_package"),
designer=deal_data.get("designer"),
design_completion_date=self._parse_hs_date(
deal_data.get("design_completion_date")
),
actual_measures_installed=deal_data.get(
"actual_measures_installed"
),
installer=deal_data.get("installer"),
installer_handover=deal_data.get("installer_handover"),
lodgement_status=deal_data.get("lodgement_status"),
measures_lodgement_date=self._parse_hs_date(
deal_data.get("measures_lodgement_date")
),
lodgement_date=self._parse_hs_date(deal_data.get("lodgement_date")),
expected_commencement_date=self._parse_hs_date(
deal_data.get("expected_commencement_date")
),
surveyor=deal_data.get("surveyor"),
confirmed_survey_date=self._parse_hs_date(
deal_data.get("confirmed_survey_date")
),
confirmed_survey_time=deal_data.get("confirmed_survey_time"),
surveyed_date=self._parse_hs_date(deal_data.get("surveyed_date")),
design_type=deal_data.get("design_type"),
new_record: HubspotDealData = self._build_new_deal(
deal_id, deal_data, listing, company
)
# Handle upload at insert time
if new_record.major_condition_issue_photos:
try:
local_file = hubspot_client.download_file_from_url(
new_record.major_condition_issue_photos
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
prefix="hubspot/awaabs_law_evidence/",
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
)
# Continue without the file — don't crash the insert
finally:
if "local_file" in locals() and os.path.exists(local_file):
os.remove(local_file)
self._handle_new_photo_upload(new_record, hubspot_client)
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record
def _update_existing_deal(
    self,
    existing: HubspotDealData,
    deal_data: Dict[str, str],
    listing: Optional[dict[str, str]],
    company: Optional[str],
):
    """Copy HubSpot values onto an already-stored deal row, in place.

    A falsy incoming value (``None``, ``""``) never overwrites what is
    stored: the attribute is re-assigned its current value instead.
    """
    read = deal_data.get
    listing_values = listing or {}

    def read_date(key: str):
        # Date properties are normalised through the shared parser.
        return parse_hs_date(read(key))

    incoming_values = {
        "dealname": read("dealname"),
        "dealstage": read("dealstage"),
        "listing_id": listing_values.get("listing_id", None),
        "landlord_property_id": listing_values.get("owner_property_id", None),
        "uprn": listing_values.get("national_uprn", None),
        "outcome": read("outcome"),
        "outcome_notes": read("outcome_notes"),
        "project_code": read("project_code"),
        "company_id": company,
        "major_condition_issue_description": read(
            "major_condition_issue_description"
        ),
        "major_condition_issue_photos": read("major_condition_issue_photos"),
        "coordination_status": read("coordination_status__stage_1_"),
        "design_status": read("retrofit_design_status"),
        "coordination_comments": read("coordination_comments"),
        "pashub_link": read("pashub_link"),
        "sharepoint_link": read("sharepoint_link"),
        "dampmould_growth": read("dampmould_growth"),
        "damp_mould_and_repairs_comments": read(
            "damp_mould_and_repairs_comments"
        ),
        "pre_sap": read("pre_sap"),
        "coordinator": read("coordinator"),
        "mtp_completion_date": read_date("mtp_completion_date"),
        "mtp_re_model_completion_date": read_date("mtp_re_model_completion_date"),
        "ioe_v3_completion_date": read_date("ioe_v3_completion_date"),
        "proposed_measures": read("proposed_measures"),
        "approved_package": read("approved_package"),
        "designer": read("designer"),
        "design_completion_date": read_date("design_completion_date"),
        "actual_measures_installed": read("actual_measures_installed"),
        "installer": read("installer"),
        "installer_handover": read("installer_handover"),
        "lodgement_status": read("lodgement_status"),
        "measures_lodgement_date": read_date("measures_lodgement_date"),
        "lodgement_date": read_date("lodgement_date"),
        "expected_commencement_date": read_date("expected_commencement_date"),
        "surveyor": read("surveyor"),
        "confirmed_survey_date": read_date("confirmed_survey_date"),
        "confirmed_survey_time": read("confirmed_survey_time"),
        "surveyed_date": read_date("surveyed_date"),
        "design_type": read("design_type"),
    }

    for field_name, incoming in incoming_values.items():
        if not incoming:
            # Keep the stored value when HubSpot sent nothing usable.
            incoming = getattr(existing, field_name)
        setattr(existing, field_name, incoming)
def _build_new_deal(
    self,
    deal_id: str,
    deal_data: Dict[str, str],
    listing: Optional[dict[str, str]],
    company: Optional[str],
) -> HubspotDealData:
    """Create an unsaved ``HubspotDealData`` row from HubSpot payloads.

    Listing-derived columns are ``None`` when no listing is supplied;
    date properties go through ``parse_hs_date``.
    """
    read = deal_data.get
    listing_values = listing or {}

    def read_date(key: str):
        return parse_hs_date(read(key))

    return HubspotDealData(
        deal_id=deal_id,
        dealname=read("dealname"),
        dealstage=read("dealstage"),
        listing_id=listing_values.get("listing_id"),
        landlord_property_id=listing_values.get("owner_property_id"),
        uprn=listing_values.get("national_uprn"),
        outcome=read("outcome"),
        outcome_notes=read("outcome_notes"),
        project_code=read("project_code"),
        company_id=company,
        major_condition_issue_description=read("major_condition_issue_description"),
        major_condition_issue_photos=read("major_condition_issue_photos"),
        coordination_status=read("coordination_status__stage_1_"),
        design_status=read("retrofit_design_status"),
        coordination_comments=read("coordination_comments"),
        pashub_link=read("pashub_link"),
        sharepoint_link=read("sharepoint_link"),
        dampmould_growth=read("dampmould_growth"),
        damp_mould_and_repairs_comments=read("damp_mould_and_repairs_comments"),
        pre_sap=read("pre_sap"),
        coordinator=read("coordinator"),
        mtp_completion_date=read_date("mtp_completion_date"),
        mtp_re_model_completion_date=read_date("mtp_re_model_completion_date"),
        ioe_v3_completion_date=read_date("ioe_v3_completion_date"),
        proposed_measures=read("proposed_measures"),
        approved_package=read("approved_package"),
        designer=read("designer"),
        design_completion_date=read_date("design_completion_date"),
        actual_measures_installed=read("actual_measures_installed"),
        installer=read("installer"),
        installer_handover=read("installer_handover"),
        lodgement_status=read("lodgement_status"),
        measures_lodgement_date=read_date("measures_lodgement_date"),
        lodgement_date=read_date("lodgement_date"),
        expected_commencement_date=read_date("expected_commencement_date"),
        surveyor=read("surveyor"),
        confirmed_survey_date=read_date("confirmed_survey_date"),
        confirmed_survey_time=read("confirmed_survey_time"),
        surveyed_date=read_date("surveyed_date"),
        design_type=read("design_type"),
    )
def _handle_existing_photo_upload(
    self,
    existing_deal: HubspotDealData,
    hubspot_client: HubspotClient,
):
    """Mirror the deal's HubSpot evidence photo to S3 when required.

    The photo URL is re-fetched from HubSpot rather than read from the
    stored row. An upload happens when either:

    * the fresh URL differs from the one stored on ``existing_deal``, or
    * a photo is stored but no S3 URL was ever recorded (for example a
      previous upload failed), so the copy is retried.

    Args:
        existing_deal: Stored deal row; its S3 URL attribute is set by the
            upload helper on success.
        hubspot_client: Client used to re-fetch the deal and download the file.
    """
    fresh_deal = hubspot_client.from_deal_id_get_info(existing_deal.deal_id) or {}
    fresh_photo_url = fresh_deal.get("major_condition_issue_photos")

    if not fresh_photo_url:
        print(f"⚠️ Photo URL missing for deal_id {existing_deal.deal_id}")
        return

    url_changed = fresh_photo_url != existing_deal.major_condition_issue_photos
    # Same rule as _needs_photo_upload: a photo is known but no S3 copy exists.
    s3_copy_missing = bool(
        existing_deal.major_condition_issue_photos
        and not existing_deal.major_condition_issue_evidence_s3_url
    )

    if url_changed:
        logger.info(
            f"Hubspot image URL changed from {existing_deal.major_condition_issue_photos} to {fresh_photo_url}"
        )
    elif s3_copy_missing:
        logger.info(
            f"Hubspot image URL unchanged but S3 copy missing, retrying upload: {fresh_photo_url}"
        )
    else:
        logger.info(f"Hubspot image URL unchanged: {fresh_photo_url}")
        return

    self._upload_photo_to_s3(existing_deal, fresh_photo_url, hubspot_client)
def _handle_new_photo_upload(
    self,
    record: HubspotDealData,
    hubspot_client: HubspotClient,
):
    """Upload the evidence photo of a not-yet-saved deal, if it has one."""
    photo_url = record.major_condition_issue_photos
    if not photo_url:
        # Nothing to mirror for deals without a photo.
        return
    self._upload_photo_to_s3(record, photo_url, hubspot_client)
def _upload_photo_to_s3(
    self,
    record: HubspotDealData,
    hubspot_photo_url: str,
    hubspot_client: HubspotClient,
    bucket: str = "retrofit-data-dev",
):
    """Download a HubSpot photo, upload it to S3 and record the S3 URL.

    This is best-effort: any failure is reported and swallowed so that the
    surrounding deal insert/update still goes ahead, leaving
    ``record.major_condition_issue_evidence_s3_url`` untouched.

    Args:
        record: Deal row that receives the resulting S3 URL.
        hubspot_photo_url: URL to download the photo from.
        hubspot_client: Client that performs the download.
        bucket: Destination bucket; defaults to the previously hard-coded one.
    """
    # Initialised up front so the cleanup below never has to inspect locals().
    local_file = None
    try:
        local_file = hubspot_client.download_file_from_url(hubspot_photo_url)
        s3_url = self.s3.upload_file(
            local_file,
            bucket,
            prefix="hubspot/awaabs_law_evidence/",
        )
        record.major_condition_issue_evidence_s3_url = s3_url
    except Exception as e:
        print(f"⚠️ Failed to upload photo for deal_id {record.deal_id}: {e}")
    finally:
        # Remove the temporary download whether or not the upload succeeded.
        if local_file and os.path.exists(local_file):
            os.remove(local_file)
def _needs_photo_upload(self, old_deal: HubspotDealData) -> bool:
    """Return True when the deal has a photo but no recorded S3 URL."""
    if not old_deal.major_condition_issue_photos:
        return False
    return not old_deal.major_condition_issue_evidence_s3_url

View file

@ -0,0 +1,177 @@
from typing import Dict, List, Optional
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.utils import parse_hs_date
class HubspotDealDiffer:
    """Compares a freshly fetched HubSpot deal with its stored database row.

    All methods are static; the class namespaces the comparison rules and
    the status values that count as "complete".
    """

    # Coordination statuses treated as a completed IOE/MTP version.
    COORDINATION_COMPLETE: List[str] = [
        "v1 ioe/mtp complete",
        "v2 ioe/mtp complete",
        "v3 ioe/mtp complete",
    ]
    # Retrofit design status treated as complete.
    RETROFIT_DESIGN_COMPLETE = "uploaded"
    # Lodgement statuses treated as complete.
    LODGEMENT_COMPLETE: List[str] = ["lodgement complete", "measures lodged"]

    @staticmethod
    def check_for_db_update_trigger(
        new_deal: Dict[str, str],
        new_company: Optional[str],
        new_listing: Optional[Dict[str, str]],
        old_deal: "HubspotDealData",
    ) -> bool:
        """
        Returns True if ANY difference exists between HubSpot data and DB.
        Returns False if everything matches (i.e. no update needed).

        Args:
            new_deal: Deal properties keyed by HubSpot property name.
            new_company: Company identifier; ``None`` skips the company check.
            new_listing: Listing properties; ``None`` is compared as if every
                listing value were ``None``.
            old_deal: The row currently stored for this deal.
        """
        # --- Deal ID ---
        if str(old_deal.deal_id) != str(new_deal.get("hs_object_id")):
            return True

        # --- Company ---
        if new_company is not None:
            if old_deal.company_id != new_company:
                return True

        # --- Listing ---
        hs_listing = new_listing or {}
        if old_deal.listing_id != hs_listing.get("listing_id"):
            return True
        if old_deal.landlord_property_id != hs_listing.get("owner_property_id"):
            return True
        if old_deal.uprn != hs_listing.get("national_uprn"):
            return True

        # --- Field mappings: HubSpot property name -> DB column name ---
        FIELD_MAP = {
            "outcome": "outcome",
            "dealstage": "dealstage",
            "dealname": "dealname",
            "project_code": "project_code",
            "outcome_notes": "outcome_notes",
            "major_condition_issue_description": "major_condition_issue_description",
            "major_condition_issue_photos": "major_condition_issue_photos",
            "coordination_status__stage_1_": "coordination_status",
            "coordination_comments": "coordination_comments",
            "retrofit_design_status": "design_status",
            "pashub_link": "pashub_link",
            "sharepoint_link": "sharepoint_link",
            "dampmould_growth": "dampmould_growth",
            "damp_mould_and_repairs_comments": "damp_mould_and_repairs_comments",
            "pre_sap": "pre_sap",
            "coordinator": "coordinator",
            "proposed_measures": "proposed_measures",
            "approved_package": "approved_package",
            "designer": "designer",
            "actual_measures_installed": "actual_measures_installed",
            "installer": "installer",
            "installer_handover": "installer_handover",
            "lodgement_status": "lodgement_status",
            "design_type": "design_type",
            "surveyor": "surveyor",
            # The survey time is compared here as a plain value.
            "confirmed_survey_time": "confirmed_survey_time",
        }
        for hs_field, db_field in FIELD_MAP.items():
            if getattr(old_deal, db_field) != new_deal.get(hs_field):
                return True

        # --- Date fields: parsed before comparison ---
        date_fields = [
            ("mtp_completion_date", "mtp_completion_date"),
            ("mtp_re_model_completion_date", "mtp_re_model_completion_date"),
            ("ioe_v3_completion_date", "ioe_v3_completion_date"),
            ("design_completion_date", "design_completion_date"),
            ("measures_lodgement_date", "measures_lodgement_date"),
            ("lodgement_date", "lodgement_date"),
            ("expected_commencement_date", "expected_commencement_date"),
            ("confirmed_survey_date", "confirmed_survey_date"),
            ("surveyed_date", "surveyed_date"),
        ]
        for hs_field, db_field in date_fields:
            if getattr(old_deal, db_field) != parse_hs_date(new_deal.get(hs_field)):
                return True

        # No differences found
        return False

    @staticmethod
    def check_for_pashub_trigger(
        new_deal: Dict[str, str], old_deal: "HubspotDealData"
    ) -> bool:
        """Return True when the PasHub file fetch should run for this deal.

        A non-empty PasHub link is required. Given one, the fetch is
        triggered by a new/changed link, or by the coordination, design or
        lodgement status newly reaching a "complete" value.
        """
        new_pashub_link: str = new_deal.get("pashub_link", "")
        if not HubspotDealDiffer._has_valid_pashub_link(new_pashub_link):
            return False
        return (
            HubspotDealDiffer._new_or_updated_pashub_link(new_pashub_link, old_deal)
            or HubspotDealDiffer._coordination_completed(new_deal, old_deal)
            or HubspotDealDiffer._design_completed(new_deal, old_deal)
            or HubspotDealDiffer._lodgement_completed(new_deal, old_deal)
        )

    @staticmethod
    def _has_valid_pashub_link(new_pashub_link: str) -> bool:
        """A link is considered valid when it is non-empty."""
        return bool(new_pashub_link)

    @staticmethod
    def _new_or_updated_pashub_link(
        new_pashub_link: str, old_deal: "HubspotDealData"
    ) -> bool:
        """True when the stored deal had no link, or a different one."""
        if not old_deal.pashub_link:
            return True
        return old_deal.pashub_link != new_pashub_link

    @staticmethod
    def _deal_status(new_deal: Dict[str, str], hubspot_key: str, db_key: str) -> str:
        """Read a status from ``new_deal``, normalising missing/None to ``""``.

        The HubSpot property name is preferred. The DB column name is still
        accepted as a fallback so callers that build the dict with column
        names keep working.
        """
        return new_deal.get(hubspot_key) or new_deal.get(db_key) or ""

    @staticmethod
    def _coordination_completed(
        new_deal: Dict[str, str], old_deal: "HubspotDealData"
    ) -> bool:
        """True when coordination newly reached a completed status."""
        new_status: str = HubspotDealDiffer._deal_status(
            new_deal, "coordination_status__stage_1_", "coordination_status"
        )
        return (
            new_status != ""
            and new_status in HubspotDealDiffer.COORDINATION_COMPLETE
            and new_status != old_deal.coordination_status
        )

    @staticmethod
    def _design_completed(
        new_deal: Dict[str, str], old_deal: "HubspotDealData"
    ) -> bool:
        """True when the retrofit design newly reached the completed status."""
        new_status: str = HubspotDealDiffer._deal_status(
            new_deal, "retrofit_design_status", "design_status"
        )
        return (
            new_status != ""
            and new_status == HubspotDealDiffer.RETROFIT_DESIGN_COMPLETE
            and new_status != old_deal.design_status
        )

    @staticmethod
    def _lodgement_completed(
        new_deal: Dict[str, str], old_deal: "HubspotDealData"
    ) -> bool:
        """True when lodgement newly reached a completed status."""
        new_status: str = new_deal.get("lodgement_status") or ""
        return (
            new_status != ""
            and new_status in HubspotDealDiffer.LODGEMENT_COMPLETE
            and new_status != old_deal.lodgement_status
        )

View file

@ -0,0 +1,5 @@
from pydantic import BaseModel
class HubspotTriggerOrchestratorTriggerRequest(BaseModel):
    """Request body validated by the HubSpot trigger-orchestrator handler."""

    # Identifier of the HubSpot deal to process, carried as a string.
    hubspot_deal_id: str

View file

@ -22,7 +22,7 @@ companies_to_add_or_ensure_it_exists = [
for company in companies_to_add_or_ensure_it_exists:
company_info: CompanyData = hubspot.get_company_information(company.value)
dbRead.upsert_company(company_info)
dbRead.upsert_organisation(company_info)
dbRead = HubspotDataToDb()

View file

@ -1,24 +1,113 @@
import json
import boto3
from typing import Any, Dict, Optional
from backend.app.config import get_settings
from etl.hubspot.hubspotClient import HubspotClient
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
from etl.hubspot.hubspot_trigger_orchestrator_trigger_request import (
HubspotTriggerOrchestratorTriggerRequest,
)
from backend.utils.subtasks import task_handler
from typing import Any
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from utils.logger import setup_logger
logger = setup_logger()
@task_handler()
def handler(body: dict[str, Any], context: Any) -> None:
    """Sync one HubSpot deal into the database and trigger follow-up work.

    Steps:
      1. Validate ``body`` and read ``hubspot_deal_id`` from it.
      2. Fetch the stored row (if any) and the live deal, company and
         listing from HubSpot.
      3. Insert the deal when it is new; otherwise update it only when the
         differ reports a change.
      4. For a changed existing deal, enqueue a PasHub-to-Ara message when
         the differ's PasHub rules are met.

    Args:
        body: Task payload; must satisfy
            ``HubspotTriggerOrchestratorTriggerRequest``.
        context: Invocation context; unused here.

    Raises:
        Whatever ``model_validate`` raises for a payload that does not match
        the request model (presumably pydantic's ValidationError — confirm).
    """
    payload = HubspotTriggerOrchestratorTriggerRequest.model_validate(body)
    hubspot_deal_id: str = payload.hubspot_deal_id

    db_client = HubspotDataToDb()
    hubspot_client = HubspotClient()

    db_deal: Optional[HubspotDealData] = db_client.find_deal_with_deal_id(
        hubspot_deal_id
    )

    hubspot_deal: Dict[str, str]
    company: Optional[str]
    listing: Optional[dict[str, str]]
    hubspot_deal, company, listing = hubspot_client.get_deal_and_company_and_listing(
        hubspot_deal_id
    )

    if not db_deal:
        # New hubspot deal, no diffing to do
        logger.info(f"New HubSpot deal of ID {hubspot_deal_id}. Loading to database...")
        if company:
            company_data: CompanyData = hubspot_client.get_company_information(company)
            db_client.upsert_organisation(company_data)
        db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client)
        # NOTE(review): as before, newly inserted deals stop here and never
        # reach the PasHub trigger (the differ needs a stored row to compare
        # against) — confirm this is intended for new deals that already
        # carry a pashub_link.
        logger.info(
            f"Loaded new deal {hubspot_deal_id}; downstream triggers not evaluated"
        )
        return

    # Deal already in db, check whether anything has changed
    logger.info(
        f"HubSpot deal {hubspot_deal_id} already in database. Checking for changes..."
    )
    if not HubspotDealDiffer.check_for_db_update_trigger(
        new_deal=hubspot_deal,
        new_company=company,
        new_listing=listing,
        old_deal=db_deal,
    ):
        logger.info(f"No changes to deal {hubspot_deal_id}")
        return

    logger.info(f"Deal {hubspot_deal_id} has been changed, updating database...")
    db_client.upsert_deal(
        deal_data=hubspot_deal,
        company=company,
        listing=listing,
        hubspot_client=hubspot_client,
    )

    # ==============================
    # Orchestration of other lambdas
    # ==============================
    # db_deal is the row as fetched before the update above, so the differ
    # compares the new HubSpot state against the previous stored state.
    if HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=hubspot_deal, old_deal=db_deal
    ):
        logger.info(
            f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}"
        )
        # UPRN and landlord property id are listing properties, and the deal
        # stage property is "dealstage"; the previously indexed deal keys are
        # kept only as fallbacks.
        listing_values = listing or {}
        message_body: Dict[str, Optional[str]] = {
            "pashub_link": hubspot_deal.get("pashub_link"),
            "address": None,  # potentially available from Listing, leave as None for now
            "sharepoint_link": hubspot_deal.get("sharepoint_link"),
            "uprn": listing_values.get("national_uprn")
            or hubspot_deal.get("national_uprn"),
            "landlord_property_id": listing_values.get("owner_property_id")
            or hubspot_deal.get("owner_property_id"),
            "deal_stage": hubspot_deal.get("dealstage")
            or hubspot_deal.get("deal_stage"),
        }
        sqs_client = boto3.client("sqs")
        response = sqs_client.send_message(
            QueueUrl=get_settings().PASHUB_TO_ARA_SQS_URL,
            MessageBody=json.dumps(message_body),
        )
        logger.info(
            f"Sent message to Pashub To Ara queue. MessageId: {response['MessageId']}"
        )
    else:
        logger.info(
            f"Not Triggering PasHub file fetcher for HubSpot deal ID {hubspot_deal_id}"
        )

    print("done")
if __name__ == "__main__":
handler({"hubspot_deal_id": "371470706915"}, "")
print("beep")

View file

@ -71,7 +71,7 @@ class TestHubspotClientIntegration:
def test_get_deal_info_for_db(self, client: HubspotClient):
deal_id: str = "263490768079"
deal, company, listing = client.get_deal_info_for_db(deal_id)
deal, company, listing = client.get_deal_and_company_and_listing(deal_id)
assert "dealname" in deal
assert "dealstage" in deal

View file

@ -0,0 +1,401 @@
from datetime import datetime, timezone
from typing import Any, Dict
import uuid
import pytest
from backend.app.db.models.hubspot_deal_data import HubspotDealData
from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer
BASE_TIME = datetime(2025, 12, 1, 12, 0, 0)
def make_old_deal(**overrides: Any) -> HubspotDealData:
    """Build a stored-deal fixture with deal_id "1" and fixed timestamps.

    ``id`` may be supplied via ``overrides``; otherwise a random UUID is
    used. Every other override is passed straight to the model.
    """
    extra_fields = dict(overrides)
    row_id = extra_fields.pop("id", uuid.uuid4())
    return HubspotDealData(
        id=row_id,
        deal_id="1",
        created_at=BASE_TIME,
        updated_at=BASE_TIME,
        **extra_fields,
    )
def make_new_deal(deal_id: uuid.UUID, **overrides: Any) -> Dict[str, str]:
    """Build a HubSpot-style deal dict fixture; ``overrides`` win over defaults."""
    deal: Dict[str, str] = {
        "id": str(deal_id),
        "deal_id": "1",
        "created_at": BASE_TIME.isoformat(),
        "updated_at": datetime(2025, 12, 1, 12, 30, 0).isoformat(),
    }
    deal.update(overrides)
    return deal
# ====================
# PASHUB TRIGGER TESTS
# ====================
@pytest.mark.parametrize(
    "new_overrides,expected",
    [
        ({"outcome_notes": "test note"}, False),
    ],
)
def test_pashub_trigger__outcome_note_added__returns_false(
    new_overrides: Dict[str, str],
    expected: bool,
) -> None:
    """A change unrelated to PasHub (no link set) must not trigger a fetch."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id)
    incoming = make_new_deal(row_id, **new_overrides)

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered == expected
@pytest.mark.parametrize(
    "old_overrides,new_overrides,expected",
    [
        (
            {"pashub_link": "www.google.co.uk"},
            {"pashub_link": "www.bbc.co.uk"},
            True,
        ),
    ],
)
def test_pashub_trigger__pashub_link_changed__returns_true(
    old_overrides: Dict[str, str],
    new_overrides: Dict[str, str],
    expected: bool,
) -> None:
    """Replacing the stored PasHub link with a different one triggers a fetch."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id, **old_overrides)
    incoming = make_new_deal(row_id, **new_overrides)

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered == expected
@pytest.mark.parametrize(
    "coordination_status,expected",
    [
        ("v1 ioe/mtp complete", True),
        ("v2 ioe/mtp complete", True),
    ],
)
def test_pashub_trigger__coordination_completed_and_pashub_link_set__returns_true(
    coordination_status: str,
    expected: bool,
) -> None:
    """With an unchanged link, a newly completed coordination status triggers."""
    row_id = uuid.uuid4()
    link = "www.google.co.uk"
    stored = make_old_deal(id=row_id, pashub_link=link, coordination_status="random")
    incoming = make_new_deal(
        row_id, pashub_link=link, coordination_status=coordination_status
    )

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered == expected
def test_pashub_trigger__coordination_completed_and_pashub_link_not_set__returns_false() -> (
    None
):
    """Without any PasHub link, completed coordination does not trigger."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id, coordination_status="random")
    incoming = make_new_deal(row_id, coordination_status="v2 ioe/mtp complete")

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered is False
def test_pashub_trigger__design_completed_and_pashub_link_set__returns_true() -> None:
    """With an unchanged link, a design status of "uploaded" triggers."""
    row_id = uuid.uuid4()
    link = "www.google.co.uk"
    stored = make_old_deal(id=row_id, pashub_link=link)
    incoming = make_new_deal(row_id, pashub_link=link, design_status="uploaded")

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered is True
def test_pashub_trigger__design_completed_and_pashub_link_not_set__returns_false() -> (
    None
):
    """Without any PasHub link, a completed design does not trigger."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id)
    incoming = make_new_deal(row_id, design_status="uploaded")

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered is False
@pytest.mark.parametrize(
    "lodgement_status,expected",
    [
        ("lodgement complete", True),
        ("measures lodged", True),
    ],
)
def test_pashub_trigger__lodgement_completed_and_pashub_link_set__returns_true(
    lodgement_status: str,
    expected: bool,
) -> None:
    """With an unchanged link, a newly completed lodgement status triggers."""
    row_id = uuid.uuid4()
    link = "www.google.co.uk"
    stored = make_old_deal(id=row_id, pashub_link=link)
    incoming = make_new_deal(
        row_id, pashub_link=link, lodgement_status=lodgement_status
    )

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered == expected
def test_pashub_trigger__lodgement_completed_and_pashub_link_not_set__returns_false() -> (
    None
):
    """Without any PasHub link, a completed lodgement does not trigger."""
    deal_id = uuid.uuid4()
    old_deal = make_old_deal(id=deal_id)
    new_deal = make_new_deal(
        deal_id,
        # The status under test is the lodgement status; this previously set
        # design_status, so the lodgement path was never exercised.
        lodgement_status="lodgement complete",
    )
    assert (
        HubspotDealDiffer.check_for_pashub_trigger(
            new_deal=new_deal,
            old_deal=old_deal,
        )
        is False
    )
def test_pashub_trigger__coordination_design_lodgement_not_completed_and_pashub_link_set__returns_false() -> (
    None
):
    """An unchanged link plus non-complete statuses must not trigger."""
    row_id = uuid.uuid4()
    link = "www.google.co.uk"
    stored = make_old_deal(id=row_id, pashub_link=link)
    incoming = make_new_deal(
        row_id,
        pashub_link=link,
        coordination_status="not uploaded",
        design_status="not uploaded",
        lodgement_status="not uploaded",
    )

    triggered = HubspotDealDiffer.check_for_pashub_trigger(
        new_deal=incoming,
        old_deal=stored,
    )

    assert triggered is False
# =======================
# DB UPDATE TRIGGER TESTS
# =======================
def test_db_update_trigger__no_changes__returns_false() -> None:
    """Identical stored and HubSpot values report no update needed."""
    row_id = uuid.uuid4()
    shared_fields = {
        "dealname": "Test Deal",
        "dealstage": "stage_1",
        "outcome": "won",
    }
    stored = make_old_deal(id=row_id, **shared_fields)
    incoming = make_new_deal(row_id, hs_object_id="1", **shared_fields)

    assert (
        HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=incoming,
            new_company=None,
            new_listing=None,
            old_deal=stored,
        )
        is False
    )
def test_db_update_trigger__dealname_changed__returns_true() -> None:
    """A different deal name is reported as a change."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id, dealname="Old Name")
    incoming = make_new_deal(row_id, hs_object_id="1", dealname="New Name")

    assert (
        HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=incoming,
            new_company=None,
            new_listing=None,
            old_deal=stored,
        )
        is True
    )
def test_db_update_trigger__company_changed__returns_true() -> None:
    """A different company id is reported as a change."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id, company_id="old_company")
    incoming = make_new_deal(row_id, hs_object_id="1")

    assert (
        HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=incoming,
            new_company="new_company",
            new_listing=None,
            old_deal=stored,
        )
        is True
    )
def test_db_update_trigger__missing_hubspot_timezone__returns_false() -> None:
    """A HubSpot date lacking a timezone matches the same stored UTC instant."""
    row_id = uuid.uuid4()
    stored_date = datetime(2025, 11, 3, 0, 0, tzinfo=timezone.utc)
    hubspot_date = datetime(2025, 11, 3, 0, 0).isoformat()
    stored = make_old_deal(id=row_id, design_completion_date=stored_date)
    incoming = make_new_deal(
        row_id, hs_object_id="1", design_completion_date=hubspot_date
    )

    assert (
        HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=incoming,
            new_company=None,
            new_listing=None,
            old_deal=stored,
        )
        is False
    )
def test_db_update_trigger__listing_changed__returns_true() -> None:
    """A different listing id is reported as a change."""
    row_id = uuid.uuid4()
    stored = make_old_deal(id=row_id, listing_id="abc")
    incoming = make_new_deal(row_id, hs_object_id="1")

    assert (
        HubspotDealDiffer.check_for_db_update_trigger(
            new_deal=incoming,
            new_company=None,
            new_listing={"listing_id": "xyz"},
            old_deal=stored,
        )
        is True
    )

16
etl/hubspot/utils.py Normal file
View file

@ -0,0 +1,16 @@
from datetime import datetime, timezone
from typing import Optional
def parse_hs_date(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string from HubSpot into a UTC-aware datetime.

    A trailing ``Z`` is accepted. Strings without an offset are taken to be
    UTC; strings with one are converted to UTC. Empty input and text that
    is not valid ISO-8601 both yield ``None``.
    """
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is not None:
            return parsed.astimezone(timezone.utc)
        # No offset in the source text: stamp it as UTC without shifting.
        return parsed.replace(tzinfo=timezone.utc)
    except ValueError:
        return None

View file

@ -0,0 +1,27 @@
# Remote state of the shared stack, read from S3 under the stage-specific key.
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
# One state object per deployment stage.
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
# ECMK-to-Ara lambda, built from the shared lambda_with_sqs module.
module "lambda" {
source = "../../modules/lambda_with_sqs"
name = "ecmk_to_ara" #"address2uprn" for example
stage = var.stage
# Image reference assembled in locals from the ECR repo URL and image digest.
image_uri = local.image_uri
# Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000)
maximum_concurrency = var.maximum_concurrency
batch_size = var.batch_size
# Environment variables passed to the module for the lambda.
environment = {
STAGE = var.stage
LOG_LEVEL = "info"
}
}

View file

@ -0,0 +1,16 @@
# Provider and Terraform version constraints, plus the S3 backend holding this stack's own state.
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
# State bucket dedicated to the ECMK-to-Ara stack.
backend "s3" {
bucket = "ecmk-to-ara-terraform-state"
key = "terraform.tfstate"
region = "eu-west-2"
}
required_version = ">= 1.2.0"
}

View file

@ -0,0 +1,37 @@
# Inputs for the lambda stack.
variable "lambda_name" {
type = string
description = "Logical name of the lambda (e.g. address2uprn)"
}
variable "stage" {
description = "Deployment stage (e.g. dev, prod)"
type = string
}
variable "ecr_repo_url" {
type = string
description = "ECR repository URL (no tag, no digest)"
}
variable "image_digest" {
type = string
description = "Image digest (sha256:...)"
}
# NOTE(review): the description mentions null meaning "no limit", but the default here is 2.
variable "maximum_concurrency" {
type = number
default = 2
description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit."
}
# Passed through to the lambda module as batch_size; defaults to 1.
variable "batch_size" {
type = number
default = 1
}
# Pins the image by digest rather than by tag.
locals {
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
}
# Exposes the fully resolved image reference.
output "resolved_image_uri" {
value = local.image_uri
}

View file

@ -7,6 +7,14 @@ data "terraform_remote_state" "shared" {
}
}
# Read-only view of the PasHub-to-Ara stack's Terraform state, selected per
# stage. Its "pashub_to_ara_queue_url" output is read further down to populate
# the PASHUB_TO_ARA_SQS_URL environment variable.
data "terraform_remote_state" "pashub_to_ara" {
backend = "s3"
config = {
bucket = "pashub-to-ara-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
@ -39,6 +47,8 @@ module "hubspot_deal_etl" {
DB_NAME = var.db_name
DB_PORT = var.db_port
HUBSPOT_API_KEY = var.hubspot_api_key
PASHUB_TO_ARA_SQS_URL = data.terraform_remote_state.pashub_to_ara.outputs.pashub_to_ara_queue_url
}
}

View file

@ -0,0 +1,4 @@
# Publishes the queue URL exposed by the lambda module as a stack output so
# that other stacks can read it through a terraform_remote_state data source.
output "pashub_to_ara_queue_url" {
value = module.lambda.queue_url
description = "URL of the PasHub to Ara SQS queue"
}

View file

@ -538,6 +538,20 @@ module "pashub_to_ara_registry" {
stage = var.stage
}
################################################
# ECMK to Ara Lambda
################################################
# Creates the bucket named "ecmk-to-ara-terraform-state" via the shared
# tf_state_bucket module.
# NOTE(review): presumably the bucket used as the S3 backend of the ECMK to
# Ara lambda stack - the names match; confirm.
module "ecmk_to_ara_state_bucket" {
source = "../modules/tf_state_bucket"
bucket_name = "ecmk-to-ara-terraform-state"
}
# Container registry for the "ecmk_to_ara" lambda, created per stage through
# the shared container_registry module.
module "ecmk_to_ara_registry" {
source = "../modules/container_registry"
name = "ecmk_to_ara"
stage = var.stage
}
################################################
# Engine Lambda ECR
################################################

View file

@ -3,6 +3,6 @@ pythonpath = .
log_cli = true
log_cli_level = INFO
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests
markers =
integration: mark a test as an integration test