diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index fccc6da4..22f16fee 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -505,7 +505,7 @@ jobs: # Deploy Hubspot ETL Lambda # ============================================================ hubspot_etl_lambda: - needs: [hubspot_etl_image, determine_stage] + needs: [hubspot_etl_image, determine_stage, pashub_to_ara_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: hubspot-etl-to-ara diff --git a/backend/app/config.py b/backend/app/config.py index 80a2d46a..9532ddd6 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -38,6 +38,7 @@ class Settings(BaseSettings): PLAN_TRIGGER_BUCKET: str = "changeme" ENGINE_SQS_URL: str = "changeme" CATEGORISATION_SQS_URL: str = "changeme" + PASHUB_TO_ARA_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/app/db/functions/uploaded_files_functions.py b/backend/app/db/functions/uploaded_files_functions.py new file mode 100644 index 00000000..3708813a --- /dev/null +++ b/backend/app/db/functions/uploaded_files_functions.py @@ -0,0 +1,25 @@ +from typing import Optional + +from sqlalchemy import select + +from backend.app.db.connection import db_read_session +from backend.app.db.models.uploaded_file import ( + FileSourceEnum, + FileTypeEnum, + UploadedFile, +) + + +def get_uploaded_file_by_listing_type_and_source( + hubspot_listing_id: int, + file_type: FileTypeEnum, + file_source: FileSourceEnum, +) -> Optional[UploadedFile]: + with db_read_session() as session: + statement = select(UploadedFile).where( + UploadedFile.hubspot_listing_id == hubspot_listing_id, + UploadedFile.file_type == file_type, + UploadedFile.file_source == file_source, + ) + + return session.exec(statement).one_or_none() diff --git a/backend/app/db/models/hubspot_deal_data.py b/backend/app/db/models/hubspot_deal_data.py new file mode 100644 index 00000000..758f688d --- /dev/null +++ b/backend/app/db/models/hubspot_deal_data.py @@ -0,0 +1,79 @@ +import uuid +from sqlmodel import SQLModel, Field, Column, text +from datetime import datetime +from typing import Optional +from sqlalchemy import DateTime +from sqlalchemy.sql import func + + +class HubspotDealData(SQLModel, table=True): + __tablename__ = "hubspot_deal_data" + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + + # HubSpot Deal identifiers + deal_id: str = Field(index=True, nullable=False) + dealname: Optional[str] = Field(default=None) + dealstage: Optional[str] = Field(default=None) + company_id: Optional[str] = Field(default=None) + project_code: Optional[str] = Field(default=None) + + # HubSpot custom properties + landlord_property_id: Optional[str] = Field(default=None) + uprn: Optional[str] = Field(default=None) + outcome: Optional[str] = Field(default=None) + outcome_notes: Optional[str] = Field(default=None) + + major_condition_issue_description: Optional[str] = Field(default=None) + major_condition_issue_photos: Optional[str] = Field(default=None) + major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None) + + coordination_status: Optional[str] = Field(default=None) + coordination_comments: Optional[str] = Field(default=None) + design_status: Optional[str] = Field(default=None) + + listing_id: Optional[str] = Field(default=None) + pashub_link: Optional[str] = Field(default=None) + sharepoint_link: Optional[str] = Field(default=None) + dampmould_growth: Optional[str] = Field(default=None) + damp_mould_and_repairs_comments: Optional[str] = Field(default=None) + pre_sap: Optional[str] = Field(default=None) + coordinator: Optional[str] = Field(default=None) + mtp_completion_date: Optional[datetime] = Field(default=None) + mtp_re_model_completion_date: Optional[datetime] = Field(default=None) + ioe_v3_completion_date: Optional[datetime] = Field(default=None) + proposed_measures: Optional[str] = Field(default=None) + approved_package: Optional[str] = Field(default=None) + designer: Optional[str] = Field(default=None) + design_completion_date: Optional[datetime] = Field(default=None) + actual_measures_installed: Optional[str] = Field(default=None) + installer: Optional[str] = Field(default=None) + installer_handover: Optional[str] = Field(default=None) + lodgement_status: Optional[str] = Field(default=None) + measures_lodgement_date: Optional[datetime] = Field(default=None) + lodgement_date: Optional[datetime] = Field(default=None) + expected_commencement_date: Optional[datetime] = Field(default=None) + surveyor: Optional[str] = Field(default=None) + confirmed_survey_date: Optional[datetime] = Field(default=None) + confirmed_survey_time: Optional[str] = Field(default=None) + surveyed_date: Optional[datetime] = Field(default=None) + design_type: Optional[str] = Field(default=None) + + created_at: Optional[datetime] = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("(NOW() AT TIME ZONE 'utc')"), + nullable=False, + ), + default=func.now(), + ) # Nullable in db but optional here as value is set on db save for new record + + updated_at: Optional[datetime] = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("(NOW() AT TIME ZONE 'utc')"), + onupdate=func.now(), + nullable=False, + ), + default=func.now(), + ) # Nullable in db but optional here as value is set on db save for new record diff --git a/backend/app/db/models/organisation.py b/backend/app/db/models/organisation.py index 784cc4ad..8afc5d63 100644 --- a/backend/app/db/models/organisation.py +++ b/backend/app/db/models/organisation.py @@ -1,9 +1,7 @@ -from sqlmodel import SQLModel, Field, Column, text +import uuid +from sqlmodel import SQLModel, Field from datetime import datetime, timezone from typing import Optional -from sqlalchemy import DateTime -from sqlalchemy.sql import func -import uuid class Organisation(SQLModel, table=True): @@ -13,74 +11,3 @@ class Organisation(SQLModel, table=True): updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) hubspot_company_id: Optional[str] = None name: Optional[str] = None - - -class HubspotDealData(SQLModel, table=True): - __tablename__ = "hubspot_deal_data" - - id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) - - # HubSpot Deal identifiers - deal_id: str = Field(index=True, nullable=False) - dealname: Optional[str] = Field(default=None) - dealstage: Optional[str] = Field(default=None) - company_id: Optional[str] = Field(default=None) - project_code: Optional[str] = Field(default=None) - - # HubSpot custom properties - landlord_property_id: Optional[str] = Field(default=None) - uprn: Optional[str] = Field(default=None) - outcome: Optional[str] = Field(default=None) - outcome_notes: Optional[str] = Field(default=None) - - major_condition_issue_description: Optional[str] = Field(default=None) - major_condition_issue_photos: Optional[str] = Field(default=None) - major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None) - - coordination_status: Optional[str] = Field(default=None) - coordination_comments: Optional[str] = Field(default=None) - design_status: Optional[str] = Field(default=None) - - listing_id: Optional[str] = Field(default=None) - pashub_link: Optional[str] = Field(default=None) - sharepoint_link: Optional[str] = Field(default=None) - dampmould_growth: Optional[str] = Field(default=None) - damp_mould_and_repairs_comments: Optional[str] = Field(default=None) - pre_sap: Optional[str] = Field(default=None) - coordinator: Optional[str] = Field(default=None) - mtp_completion_date: Optional[datetime] = Field(default=None) - mtp_re_model_completion_date: Optional[datetime] = Field(default=None) - ioe_v3_completion_date: Optional[datetime] = Field(default=None) - proposed_measures: Optional[str] = Field(default=None) - approved_package: Optional[str] = Field(default=None) - designer: Optional[str] = Field(default=None) - design_completion_date: Optional[datetime] = Field(default=None) - actual_measures_installed: Optional[str] = Field(default=None) - installer: Optional[str] = Field(default=None) - installer_handover: Optional[str] = Field(default=None) - lodgement_status: Optional[str] = Field(default=None) - measures_lodgement_date: Optional[datetime] = Field(default=None) - lodgement_date: Optional[datetime] = Field(default=None) - expected_commencement_date: Optional[datetime] = Field(default=None) - surveyor: Optional[str] = Field(default=None) - confirmed_survey_date: Optional[datetime] = Field(default=None) - confirmed_survey_time: Optional[str] = Field(default=None) - surveyed_date: Optional[datetime] = Field(default=None) - design_type: Optional[str] = Field(default=None) - - created_at: datetime = Field( - sa_column=Column( - DateTime(timezone=True), - server_default=text("(NOW() AT TIME ZONE 'utc')"), - nullable=False, - ) - ) - - updated_at: datetime = Field( - sa_column=Column( - DateTime(timezone=True), - server_default=text("(NOW() AT TIME ZONE 'utc')"), - onupdate=func.now(), - nullable=False, - ) - ) diff --git a/backend/app/db/models/uploaded_file.py b/backend/app/db/models/uploaded_file.py index 9b751d34..71763790 100644 --- a/backend/app/db/models/uploaded_file.py +++ b/backend/app/db/models/uploaded_file.py @@ -14,6 +14,8 @@ class FileTypeEnum(enum.Enum): PAR_PHOTO_PACK = "par_photo_pack" PAS_2023_PROPERTY = "pas_2023_property" PAS_2023_OCCUPANCY = "pas_2023_occupancy" + ECMK_SITE_NOTE = "ecmk_site_note" + ECMK_RD_SAP_SITE_NOTE = "ecmk_rd_sap_site_note" class FileSourceEnum(enum.Enum): @@ -37,9 +39,21 @@ class UploadedFile(Base): hubspot_listing_id = Column(BigInteger, nullable=True) file_type = Column( - SqlEnum(FileTypeEnum, name="file_type", create_type=False), nullable=True + SqlEnum( + FileTypeEnum, + name="file_type", + create_type=False, + values_callable=lambda enum_cls: [e.value for e in enum_cls], + ), + nullable=True, ) file_source = Column( - SqlEnum(FileSourceEnum, name="file_source", create_type=False), nullable=True + SqlEnum( + FileSourceEnum, + name="file_source", + create_type=False, + values_callable=lambda enum_cls: [e.value for e in enum_cls], + ), + nullable=True, ) diff --git a/backend/ecmk_fetcher/address_list.py b/backend/ecmk_fetcher/address_list.py index d273c45d..ba636a70 100644 --- a/backend/ecmk_fetcher/address_list.py +++ b/backend/ecmk_fetcher/address_list.py @@ -1,40 +1,59 @@ -from typing import Dict, Optional -from openpyxl import load_workbook import re +from dataclasses import dataclass +from typing import Any, Dict, Optional +from openpyxl import Workbook, load_workbook +from openpyxl.worksheet.worksheet import Worksheet -def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]: - wb = load_workbook(filepath, data_only=True) - ws = wb["Southern RA-Lite Programme 3103"] +@dataclass +class PropertyRow: + row_index: int + address: str + listing_id: str - properties: Dict[str, str] = {} - header_row = 1 - id_col_index = None - deal_name_col_index = None +def extract_addresses_from_spreadsheet( + filepath: str, +) -> Dict[str, PropertyRow]: + wb: Workbook = load_workbook(filepath, data_only=True) + ws: Worksheet = wb["Southern RA-Lite Programme 3103"] + header_row: int = 1 + id_col: Optional[int] = None + deal_name_col: Optional[int] = None + listing_id_col: Optional[int] = None + + # find columns for col in range(1, ws.max_column + 1): - value = ws.cell(row=header_row, column=col).value + raw_value: Any = ws.cell(row=header_row, column=col).value + value: str = str(raw_value).strip().lower() if raw_value else "" - if value and str(value).strip().lower() == "id": - id_col_index = col + if value == "id": + id_col = col + elif value == "deal name": + deal_name_col = col + elif value == "associated listing ids": + listing_id_col = col - if value and str(value).strip().lower() == "deal name": - deal_name_col_index = col - break + if id_col is None or deal_name_col is None or listing_id_col is None: + raise Exception("Missing required columns") - if id_col_index is None or deal_name_col_index is None: - raise Exception("Required columns not found") + properties: Dict[str, PropertyRow] = {} for row in range(2, ws.max_row + 1): - id_val = ws.cell(row=row, column=id_col_index).value - deal_name = ws.cell(row=row, column=deal_name_col_index).value + id_val: Any = ws.cell(row=row, column=id_col).value + deal_name: Any = ws.cell(row=row, column=deal_name_col).value + listing_id: Any = ws.cell(row=row, column=listing_id_col).value - if not id_val or not deal_name: + if not id_val or not deal_name or not listing_id: continue - properties[str(id_val).strip()] = extract_succinct_address( - str(deal_name).strip() + property_id: str = str(id_val).strip() + + properties[property_id] = PropertyRow( + row_index=row, + address=extract_succinct_address(str(deal_name)), + listing_id=listing_id, ) return properties diff --git a/backend/ecmk_fetcher/browser.py b/backend/ecmk_fetcher/browser.py index 6d018537..de349b92 100644 --- a/backend/ecmk_fetcher/browser.py +++ b/backend/ecmk_fetcher/browser.py @@ -50,6 +50,7 @@ def get_first_row_signature(page: Page) -> str: def go_to_next_page(page: Page) -> bool: + logger.info("Going to next page") before = get_first_row_signature(page) page.locator("#assessmentDatatable_next a").click() diff --git a/backend/ecmk_fetcher/handler/Dockerfile b/backend/ecmk_fetcher/handler/Dockerfile new file mode 100644 index 00000000..fa2126fd --- /dev/null +++ b/backend/ecmk_fetcher/handler/Dockerfile @@ -0,0 +1,26 @@ +FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy + +# Install AWS Lambda RIE +ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/local/bin/aws-lambda-rie +RUN chmod +x /usr/local/bin/aws-lambda-rie + +# Set working directory (Lambda task root) +WORKDIR /var/task + +COPY backend/ecmk_fetcher/handler/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +# Local lambda entrypoint +# ENTRYPOINT ["/usr/local/bin/aws-lambda-rie", "python", "-m", "awslambdaric"] + +#AWS lambda entrypoint +ENTRYPOINT ["python", "-m", "awslambdaric"] + +# ----------------------------- +# Lambda handler +# ----------------------------- +CMD ["backend.ecmk_fetcher.handler.handler.handler"] \ No newline at end of file diff --git a/backend/ecmk_fetcher/handler/handler.py b/backend/ecmk_fetcher/handler/handler.py index 4ce3a949..b777cc9f 100644 --- a/backend/ecmk_fetcher/handler/handler.py +++ b/backend/ecmk_fetcher/handler/handler.py @@ -1,9 +1,13 @@ from typing import Any, Mapping from backend.ecmk_fetcher.processor import run_job +from utils.logger import setup_logger + +logger = setup_logger() def handler(event: Mapping[str, Any], context: Any) -> None: + logger.info("Entered handler") run_job() diff --git a/backend/ecmk_fetcher/handler/requirements.txt b/backend/ecmk_fetcher/handler/requirements.txt new file mode 100644 index 00000000..2692484e --- /dev/null +++ b/backend/ecmk_fetcher/handler/requirements.txt @@ -0,0 +1,12 @@ +awslambdaric +playwright==1.58.0 +msal +openpyxl +sqlalchemy==2.0.36 +sqlmodel +pytz==2024.2 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 +boto3==1.35.44 +pandas==2.2.2 +numpy<2.0 \ No newline at end of file diff --git a/backend/ecmk_fetcher/local_handler/docker-compose.yml b/backend/ecmk_fetcher/local_handler/docker-compose.yml new file mode 100644 index 00000000..fd642499 --- /dev/null +++ b/backend/ecmk_fetcher/local_handler/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + ecmk-fetcher-lambda: + build: + context: ../../../ + dockerfile: backend/ecmk_fetcher/handler/Dockerfile + ports: + - "9000:8080" + env_file: + - ../../../.env \ No newline at end of file diff --git a/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py b/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py new file mode 100644 index 00000000..ba76301e --- /dev/null +++ b/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import json +import requests + +HOST = "localhost" +PORT = "9000" + +LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations" + +payload = { + "Records": [ + { + "body": json.dumps( + { + "test": 123456, + } + ) + } + ] +} + +response = requests.post(LAMBDA_URL, json=payload) + +print("Status code:", response.status_code) +print("Response:") +print(response.text) diff --git a/backend/ecmk_fetcher/processor.py b/backend/ecmk_fetcher/processor.py index 1852b867..2f122080 100644 --- a/backend/ecmk_fetcher/processor.py +++ b/backend/ecmk_fetcher/processor.py @@ -1,6 +1,5 @@ import os -from typing import Dict, List - +from typing import Dict from playwright.sync_api import ( sync_playwright, Locator, @@ -9,7 +8,14 @@ from playwright.sync_api import ( BrowserContext, ) -from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet +from backend.app.db.functions.uploaded_files_functions import ( + get_uploaded_file_by_listing_type_and_source, +) +from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum +from backend.ecmk_fetcher.address_list import ( + PropertyRow, + extract_addresses_from_spreadsheet, +) from backend.ecmk_fetcher.browser import ( attach_debug_listeners, download_with_retry, @@ -18,14 +24,25 @@ from backend.ecmk_fetcher.browser import ( go_to_next_page, login, ) -from backend.ecmk_fetcher.reports import REPORT_TYPES, build_property_id -from backend.ecmk_fetcher.sharepoint import upload_file_to_sharepoint +from backend.ecmk_fetcher.reports import ( + REPORT_TYPES, + build_property_id, + map_report_type_to_db_file_type, +) +from backend.ecmk_fetcher.upload import ( + upload_file_to_s3_and_update_db, + upload_file_to_sharepoint, +) +from utils.logger import setup_logger from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient from utils.sharepoint.domna_sites import DomnaSites +logger = setup_logger() + def run_job() -> None: - username: str = "" + + username: str = "" # TODO: get from github secrets password: str = "" property_list_file: str = ( @@ -35,8 +52,7 @@ def run_job() -> None: BASE_DIR: str = os.path.dirname(__file__) filepath: str = os.path.join(BASE_DIR, property_list_file) - property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath) - property_ids: List[str] = list(property_map.keys()) + property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(filepath) sharepoint_client: DomnaSharepointClient = DomnaSharepointClient( sharepoint_location=DomnaSites.PRIVATE_PAY @@ -44,6 +60,8 @@ def run_job() -> None: sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments" + s3_bucket: str = "retrofit-energy-assessments-dev" + with sync_playwright() as p: browser: Browser = p.chromium.launch(headless=True) context: BrowserContext = browser.new_context() @@ -79,14 +97,38 @@ def run_job() -> None: property_id: str = build_property_id(address, postcode) - if property_id not in property_ids: + property_row: PropertyRow | None = property_map.get(property_id) + + if not property_row: continue - sharepoint_address: str = property_map[property_id] + logger.info(f"Match found for property {address}") + + sharepoint_address: str = property_row.address go_to_assessment_details(page, row) for report_type in REPORT_TYPES: + hubspot_listing_id: str = property_row.listing_id + try: + db_file_type: FileTypeEnum = ( + map_report_type_to_db_file_type(report_type) + ) + + except ValueError: + logger.error( + f"Unknown report type {report_type}, skipping file" + ) + continue + + if get_uploaded_file_by_listing_type_and_source( + hubspot_listing_id=int(hubspot_listing_id), + file_type=db_file_type, + file_source=FileSourceEnum.ECMK, + ): + logger.debug("File already uploaded to s3, skipping") + continue + file_path: str | None = download_with_retry( page, report_type ) @@ -94,6 +136,10 @@ def run_job() -> None: if not file_path: continue + logger.info( + f"Successfully downloaded file {os.path.basename(file_path)} from ECMK" + ) + try: upload_file_to_sharepoint( client=sharepoint_client, @@ -101,6 +147,20 @@ def run_job() -> None: base_path=sharepoint_base_path, subpath=sharepoint_address, ) + logger.info( + f"Successfully loaded {os.path.basename(file_path)} to sharepoint for {address}" + ) + + # Upload to s3 and update db + upload_file_to_s3_and_update_db( + bucket=s3_bucket, + file_path=file_path, + hubspot_listing_id=hubspot_listing_id, + file_type=db_file_type, + ) + + except Exception: + raise finally: if os.path.exists(file_path): os.remove(file_path) diff --git a/backend/ecmk_fetcher/reports.py b/backend/ecmk_fetcher/reports.py index a8f12792..d8d11d50 100644 --- a/backend/ecmk_fetcher/reports.py +++ b/backend/ecmk_fetcher/reports.py @@ -1,5 +1,7 @@ from enum import Enum +from backend.app.db.models.uploaded_file import FileTypeEnum + class FileDownloadButtonType(Enum): ASSESSOR_HUB_SITENOTE_REPORT = 11 @@ -15,6 +17,16 @@ REPORT_TYPES = [ ] +def map_report_type_to_db_file_type(report_type: int) -> FileTypeEnum: + match report_type: + case FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT.value: + return FileTypeEnum.ECMK_SITE_NOTE + case FileDownloadButtonType.SITENOTE_REPORT.value: + return FileTypeEnum.ECMK_RD_SAP_SITE_NOTE + case _: + raise ValueError("Unknown report type") + + def build_report_selector(report_type: int) -> str: return f"a.download-report-btn[data-report-type='{report_type}']" diff --git a/backend/ecmk_fetcher/sharepoint.py b/backend/ecmk_fetcher/sharepoint.py deleted file mode 100644 index 79db1294..00000000 --- a/backend/ecmk_fetcher/sharepoint.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient - - -def upload_file_to_sharepoint( - client: DomnaSharepointClient, - file_path: str, - base_path: str, - subpath: str, -) -> None: - filename = os.path.basename(file_path) - - full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment" - - client.upload_file( - file_path=file_path, - sharepoint_path=full_path, - file_name=filename, - ) diff --git a/backend/ecmk_fetcher/upload.py b/backend/ecmk_fetcher/upload.py new file mode 100644 index 00000000..0a744e53 --- /dev/null +++ b/backend/ecmk_fetcher/upload.py @@ -0,0 +1,51 @@ +from datetime import datetime, timezone +import os + +from backend.app.db.connection import db_session +from backend.app.db.models.uploaded_file import ( + FileSourceEnum, + FileTypeEnum, + UploadedFile, +) +from utils.s3 import upload_file_to_s3 +from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient + + +def upload_file_to_sharepoint( + client: DomnaSharepointClient, + file_path: str, + base_path: str, + subpath: str, +) -> None: + filename = os.path.basename(file_path) + + full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment" + + client.upload_file( + file_path=file_path, + sharepoint_path=full_path, + file_name=filename, + ) + + +def upload_file_to_s3_and_update_db( + bucket: str, file_path: str, hubspot_listing_id: str, file_type: FileTypeEnum +) -> None: + filename: str = os.path.basename(file_path) + key: str = f"documents/hubspot_listing_id/{hubspot_listing_id}/{filename}" + + upload_file_to_s3(file_path, bucket, key) + + uploaded_file = UploadedFile( + s3_file_bucket=bucket, + s3_file_key=key, + s3_upload_timestamp=datetime.now(timezone.utc), + hubspot_listing_id=hubspot_listing_id, + file_source=FileSourceEnum.ECMK.value, + file_type=file_type.value, + ) + + with db_session() as session: + # TODO: we should do multiple files at once to reduce db trips + session.add(uploaded_file) + session.commit() diff --git a/etl/hubspot/company_data.py b/etl/hubspot/company_data.py new file mode 100644 index 00000000..13b2ee88 --- /dev/null +++ b/etl/hubspot/company_data.py @@ -0,0 +1,6 @@ +from typing import TypedDict + + +class CompanyData(TypedDict): + hs_object_id: str + name: str diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index dc2fc7fe..6bdf71ed 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -26,10 +26,10 @@ from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTy ForwardPaging as AssociationsPaging, NextPage as AssociationsPagingNext, ) -from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb from backend.app.config import get_settings +from etl.hubspot.company_data import CompanyData from utils.logger import setup_logger import mimetypes @@ -230,7 +230,9 @@ class HubspotClient: self.logger.info(f"Listing info for deal {deal_id}: {listing_info}") return listing_info - def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]: + def from_deal_id_get_info( + self, deal_id: str + ) -> dict[str, str]: # TODO: add dataclass for this deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType] deal: HubspotObject = self._call_with_retry( @@ -280,18 +282,12 @@ class HubspotClient: deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType] return deal_info - def get_deal_info_for_db( + def get_deal_and_company_and_listing( self, deal_id: str ) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]: deal: dict[str, str] = self.from_deal_id_get_info(deal_id) company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id) - - if company: - company_data: CompanyData = self.get_company_information(company) - dbloader: HubspotDataToDb = HubspotDataToDb() - dbloader.upsert_company(company_data) - listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing( deal_id ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index d7b47ef3..9756833b 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -1,16 +1,19 @@ -from backend.app.db.connection import db_read_session -from backend.app.db.models.organisation import Organisation, HubspotDealData +import os from sqlmodel import select from datetime import datetime, timezone -from typing import TypedDict, Optional +from typing import Dict, Optional + +from backend.app.db.models.hubspot_deal_data import HubspotDealData +from etl.hubspot.company_data import CompanyData +from etl.hubspot.hubspotClient import HubspotClient from etl.hubspot.s3_uploader import S3Uploader -import hashlib -import os +from backend.app.db.connection import db_read_session +from backend.app.db.models.organisation import Organisation +from etl.hubspot.utils import parse_hs_date +from utils.logger import setup_logger -class CompanyData(TypedDict): - hs_object_id: str - name: str +logger = setup_logger() class HubspotDataToDb: @@ -31,7 +34,7 @@ class HubspotDataToDb: records = self.read_org_table(limit) return [org.name for org in records if org.name] - def upsert_company(self, company_data: CompanyData) -> Organisation: + def upsert_organisation(self, company_data: CompanyData) -> Organisation: """Upserts a company record. Updates if hubspot_company_id exists, otherwise creates new.""" with db_read_session() as session: hubspot_id = company_data.get("hs_object_id") @@ -61,11 +64,7 @@ class HubspotDataToDb: session.commit() return record - def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client): - print("⚠️ Deprecated — use the new interface instead.") - return self.upsert_deal(deal_data, company, listing, hubspot_client) - - def find_all_deals_with_company_id(self, company_id): + def find_all_deals_with_company_id(self, company_id: str): """Returns a list of deals for a given company_id.""" with db_read_session() as session: return ( @@ -74,7 +73,7 @@ class HubspotDataToDb: .all() ) - def find_deal_with_deal_id(self, deal_id): + def find_deal_with_deal_id(self, deal_id: str) -> Optional[HubspotDealData]: with db_read_session() as session: return ( session.query(HubspotDealData) @@ -82,275 +81,13 @@ class HubspotDataToDb: .one_or_none() ) - def _parse_hs_date(self, value: Optional[str]) -> Optional[datetime]: - if not value: - return None - try: - return datetime.fromisoformat(value.replace("Z", "+00:00")) - except ValueError: - return None - - def _sha256(self, file_path: str) -> str: - """Compute SHA-256 checksum of a file.""" - sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - sha256.update(chunk) - return sha256.hexdigest() - - def update_deal_with_checks(self, deal_in_db, hubspot_client) -> bool: - """ - Checks if a deal needs updating and syncs it with HubSpot. - Also handles major_condition_issue_photos file upload to S3 with integrity check. - """ - - def soft_assert(condition, message="Assertion Failed"): - if not condition: - print(f"⚠️ Soft Assert Failed: {message}") - return False - return True - - print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") - - hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db( - deal_in_db.deal_id - ) - - # Soft compare key fields - checks = [ - soft_assert( - deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch" - ), - soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"), - soft_assert( - deal_in_db.listing_id == hs_listing.get("listing_id"), - "listing_id mismatch", - ), - soft_assert( - deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), - "landlord_property_id mismatch", - ), - soft_assert( - deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch" - ), - soft_assert( - deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch" - ), - soft_assert( - deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch" - ), - soft_assert( - deal_in_db.project_code == hs_deal.get("project_code"), - "project_code mismatch", - ), - soft_assert( - deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch" - ), - soft_assert( - deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), - "outcome_notes mismatch", - ), - soft_assert( - deal_in_db.major_condition_issue_description - == hs_deal.get("major_condition_issue_description"), - "major condition description mismatch", - ), - soft_assert( - deal_in_db.major_condition_issue_photos - == hs_deal.get("major_condition_issue_photos"), - "major condition issue photos mismatch", - ), - soft_assert( - deal_in_db.coordination_status - == hs_deal.get("coordination_status__stage_1_"), - "coordination stage 1 status mismatch", - ), - soft_assert( - deal_in_db.coordination_comments == hs_deal.get("coordination_comments"), - "coordination_comments mismatch", - ), - soft_assert( - deal_in_db.design_status == hs_deal.get("retrofit_design_status"), - "retrofit design mismatch", - ), - soft_assert( - deal_in_db.pashub_link == hs_deal.get("pashub_link"), - "pashub_link mismatch", - ), - soft_assert( - deal_in_db.sharepoint_link == hs_deal.get("sharepoint_link"), - "sharepoint_link mismatch", - ), - soft_assert( - deal_in_db.dampmould_growth == hs_deal.get("dampmould_growth"), - "dampmould_growth mismatch", - ), - soft_assert( - deal_in_db.damp_mould_and_repairs_comments - == hs_deal.get("damp_mould_and_repairs_comments"), - "damp_mould_and_repairs_comments mismatch", - ), - soft_assert( - deal_in_db.pre_sap == hs_deal.get("pre_sap"), - "pre_sap mismatch", - ), - soft_assert( - deal_in_db.coordinator == hs_deal.get("coordinator"), - "coordinator mismatch", - ), - soft_assert( - deal_in_db.mtp_completion_date - == self._parse_hs_date(hs_deal.get("mtp_completion_date")), - "mtp_completion_date mismatch", - ), - soft_assert( - deal_in_db.mtp_re_model_completion_date - == self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")), - "mtp_re_model_completion_date mismatch", - ), - soft_assert( - deal_in_db.ioe_v3_completion_date - == self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")), - "ioe_v3_completion_date mismatch", - ), - soft_assert( - deal_in_db.proposed_measures == hs_deal.get("proposed_measures"), - "proposed_measures mismatch", - ), - soft_assert( - deal_in_db.approved_package == hs_deal.get("approved_package"), - "approved_package mismatch", - ), - soft_assert( - deal_in_db.designer == hs_deal.get("designer"), - "designer mismatch", - ), - soft_assert( - deal_in_db.design_completion_date - == self._parse_hs_date(hs_deal.get("design_completion_date")), - "design_completion_date mismatch", - ), - soft_assert( - deal_in_db.actual_measures_installed - == hs_deal.get("actual_measures_installed"), - "actual_measures_installed mismatch", - ), - soft_assert( - deal_in_db.installer == hs_deal.get("installer"), - "installer mismatch", - ), - soft_assert( - deal_in_db.installer_handover == hs_deal.get("installer_handover"), - "installer_handover mismatch", - ), - soft_assert( - deal_in_db.lodgement_status == hs_deal.get("lodgement_status"), - "lodgement_status mismatch", - ), - soft_assert( - deal_in_db.measures_lodgement_date - == self._parse_hs_date(hs_deal.get("measures_lodgement_date")), - "measures_lodgement_date mismatch", - ), - soft_assert( - deal_in_db.lodgement_date - == self._parse_hs_date(hs_deal.get("lodgement_date")), - "lodgement_date mismatch", - ), - soft_assert( - deal_in_db.expected_commencement_date - == self._parse_hs_date(hs_deal.get("expected_commencement_date")), - "expected_commencement_date mismatch", - ), - soft_assert( - deal_in_db.surveyor == hs_deal.get("surveyor"), - "surveyor mismatch", - ), - soft_assert( - deal_in_db.confirmed_survey_date - == self._parse_hs_date(hs_deal.get("confirmed_survey_date")), - "confirmed_survey_date mismatch", - ), - soft_assert( - deal_in_db.confirmed_survey_time - == hs_deal.get("confirmed_survey_time"), - "confirmed_survey_time mismatch", - ), - soft_assert( - deal_in_db.surveyed_date - == self._parse_hs_date(hs_deal.get("surveyed_date")), - "surveyed_date mismatch", - ), - soft_assert( - deal_in_db.design_type == hs_deal.get("design_type"), - "design_type mismatch", - ), - ] - - # If discrepancies found, update from HubSpot - if not all(checks): - print( - f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot." - ) - self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client) - return False - - # Handle photo upload if it exists but S3 URL is missing - if ( - deal_in_db.major_condition_issue_photos - and not deal_in_db.major_condition_issue_evidence_s3_url - ): - print( - f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..." - ) - - photo_url = hs_deal.get("major_condition_issue_photos") - if photo_url: - try: - # Download from HubSpot using fresh URL from hs_deal (not stale DB URL) - local_file = hubspot_client.download_file_from_url(photo_url) - - # Upload to S3 - bucket = "retrofit-data-dev" - s3_url = self.s3.upload_file( - local_file, bucket, prefix="hubspot/awaabs_law_evidence/" - ) - - # Download again to verify integrity - downloaded = self.s3.download_from_url(s3_url) - if self._sha256(local_file) == self._sha256(downloaded): - print("✅ SHA256 match verified — upload successful.") - else: - print("❌ SHA256 mismatch — integrity check failed.") - raise ValueError("File integrity check failed after S3 upload.") - - # Update DB record with S3 URL - with db_read_session() as session: - db_record = session.get(HubspotDealData, deal_in_db.id) - db_record.major_condition_issue_evidence_s3_url = s3_url - session.add(db_record) - session.commit() - print( - f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}" - ) - return False - except Exception as e: - print( - f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}" - ) - # Continue without the file — don't crash the entire update - finally: - if "local_file" in locals() and os.path.exists(local_file): - os.remove(local_file) - else: - print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") - - else: - print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") - - return True - - def upsert_deal(self, deal_data, company, listing, hubspot_client): + def upsert_deal( + self, + deal_data: Dict[str, str], + company: Optional[str], + listing: Optional[dict[str, str]], + hubspot_client: HubspotClient, + ): """ Inserts or updates a deal record. Also uploads photos if present and adds S3 URL. @@ -364,111 +101,10 @@ class HubspotDataToDb: existing = session.exec(statement).first() if existing: + self._handle_existing_photo_upload(existing, hubspot_client) + print(f"🔄 Updating existing deal (deal_id={deal_id})") - - for attr, value in { - "dealname": deal_data.get("dealname"), - "dealstage": deal_data.get("dealstage"), - "listing_id": listing.get("listing_id", None) if listing else None, - "landlord_property_id": ( - listing.get("owner_property_id", None) if listing else None - ), - "uprn": listing.get("national_uprn", None) if listing else None, - "outcome": deal_data.get("outcome"), - "outcome_notes": deal_data.get("outcome_notes"), - "project_code": deal_data.get("project_code"), - "company_id": company, - "major_condition_issue_description": deal_data.get( - "major_condition_issue_description" - ), - "major_condition_issue_photos": deal_data.get( - "major_condition_issue_photos" - ), - "coordination_status": deal_data.get( - "coordination_status__stage_1_" - ), - "coordination_comments": deal_data.get("coordination_comments"), - "design_status": deal_data.get("retrofit_design_status"), - "pashub_link": deal_data.get("pashub_link"), - "sharepoint_link": deal_data.get("sharepoint_link"), - "dampmould_growth": deal_data.get("dampmould_growth"), - "damp_mould_and_repairs_comments": deal_data.get( - "damp_mould_and_repairs_comments" - ), - "pre_sap": deal_data.get("pre_sap"), - "coordinator": deal_data.get("coordinator"), - "mtp_completion_date": self._parse_hs_date( - deal_data.get("mtp_completion_date") - ), - "mtp_re_model_completion_date": self._parse_hs_date( - deal_data.get("mtp_re_model_completion_date") - ), - "ioe_v3_completion_date": self._parse_hs_date( - deal_data.get("ioe_v3_completion_date") - ), - "proposed_measures": deal_data.get("proposed_measures"), - "approved_package": deal_data.get("approved_package"), - "designer": deal_data.get("designer"), - "design_completion_date": self._parse_hs_date( - deal_data.get("design_completion_date") - ), - "actual_measures_installed": deal_data.get( - "actual_measures_installed" - ), - "installer": deal_data.get("installer"), - "installer_handover": deal_data.get("installer_handover"), - "lodgement_status": deal_data.get("lodgement_status"), - "measures_lodgement_date": self._parse_hs_date( - deal_data.get("measures_lodgement_date") - ), - "lodgement_date": self._parse_hs_date( - deal_data.get("lodgement_date") - ), - "expected_commencement_date": self._parse_hs_date( - deal_data.get("expected_commencement_date") - ), - "surveyor": deal_data.get("surveyor"), - "confirmed_survey_date": self._parse_hs_date( - deal_data.get("confirmed_survey_date") - ), - "confirmed_survey_time": deal_data.get("confirmed_survey_time"), - "surveyed_date": self._parse_hs_date( - deal_data.get("surveyed_date") - ), - "design_type": deal_data.get("design_type"), - }.items(): - setattr(existing, attr, value or getattr(existing, attr)) - - # Upload if photo exists but S3 link missing - if ( - existing.major_condition_issue_photos - and not existing.major_condition_issue_evidence_s3_url - ): - # Fetch fresh URL from HubSpot instead of using potentially expired stored URL - fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id) - photo_url = fresh_deal.get("major_condition_issue_photos") - - if photo_url: - try: - local_file = hubspot_client.download_file_from_url( - photo_url - ) - s3_url = self.s3.upload_file( - local_file, - "retrofit-data-dev", - prefix="hubspot/awaabs_law_evidence/", - ) - existing.major_condition_issue_evidence_s3_url = s3_url - except Exception as e: - print( - f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}" - ) - # Continue without the file — don't crash the update - finally: - if "local_file" in locals() and os.path.exists(local_file): - os.remove(local_file) - else: - print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}") + self._update_existing_deal(existing, deal_data, listing, company) session.add(existing) session.commit() @@ -477,93 +113,213 @@ class HubspotDataToDb: else: print(f"🆕 Inserting new deal (deal_id={deal_id})") - new_record = HubspotDealData( - deal_id=deal_id, - dealname=deal_data.get("dealname"), - dealstage=deal_data.get("dealstage"), - listing_id=listing.get("listing_id", None) if listing else None, - landlord_property_id=listing.get("owner_property_id") if listing else None, - uprn=listing.get("national_uprn") if listing else None, - outcome=deal_data.get("outcome"), - outcome_notes=deal_data.get("outcome_notes"), - project_code=deal_data.get("project_code"), - company_id=company, - major_condition_issue_description=deal_data.get( - "major_condition_issue_description" - ), - major_condition_issue_photos=deal_data.get( - "major_condition_issue_photos" - ), - coordination_status=deal_data.get("coordination_status__stage_1_"), - coordination_comments=deal_data.get("coordination_comments"), - design_status=deal_data.get("retrofit_design_status"), - pashub_link=deal_data.get("pashub_link"), - sharepoint_link=deal_data.get("sharepoint_link"), - dampmould_growth=deal_data.get("dampmould_growth"), - damp_mould_and_repairs_comments=deal_data.get( - "damp_mould_and_repairs_comments" - ), - pre_sap=deal_data.get("pre_sap"), - coordinator=deal_data.get("coordinator"), - mtp_completion_date=self._parse_hs_date( - deal_data.get("mtp_completion_date") - ), - mtp_re_model_completion_date=self._parse_hs_date( - deal_data.get("mtp_re_model_completion_date") - ), - ioe_v3_completion_date=self._parse_hs_date( - deal_data.get("ioe_v3_completion_date") - ), - proposed_measures=deal_data.get("proposed_measures"), - approved_package=deal_data.get("approved_package"), - designer=deal_data.get("designer"), - design_completion_date=self._parse_hs_date( - deal_data.get("design_completion_date") - ), - actual_measures_installed=deal_data.get( - "actual_measures_installed" - ), - installer=deal_data.get("installer"), - installer_handover=deal_data.get("installer_handover"), - lodgement_status=deal_data.get("lodgement_status"), - measures_lodgement_date=self._parse_hs_date( - deal_data.get("measures_lodgement_date") - ), - lodgement_date=self._parse_hs_date(deal_data.get("lodgement_date")), - expected_commencement_date=self._parse_hs_date( - deal_data.get("expected_commencement_date") - ), - surveyor=deal_data.get("surveyor"), - confirmed_survey_date=self._parse_hs_date( - deal_data.get("confirmed_survey_date") - ), - confirmed_survey_time=deal_data.get("confirmed_survey_time"), - surveyed_date=self._parse_hs_date(deal_data.get("surveyed_date")), - design_type=deal_data.get("design_type"), + new_record: HubspotDealData = self._build_new_deal( + deal_id, deal_data, listing, company ) # Handle upload at insert time - if new_record.major_condition_issue_photos: - try: - local_file = hubspot_client.download_file_from_url( - new_record.major_condition_issue_photos - ) - s3_url = self.s3.upload_file( - local_file, - "retrofit-data-dev", - prefix="hubspot/awaabs_law_evidence/", - ) - new_record.major_condition_issue_evidence_s3_url = s3_url - except Exception as e: - print( - f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}" - ) - # Continue without the file — don't crash the insert - finally: - if "local_file" in locals() and os.path.exists(local_file): - os.remove(local_file) + self._handle_new_photo_upload(new_record, hubspot_client) session.add(new_record) session.commit() session.refresh(new_record) return new_record + + def _update_existing_deal( + self, + existing: HubspotDealData, + deal_data: Dict[str, str], + listing: Optional[dict[str, str]], + company: Optional[str], + ): + for attr, value in { + "dealname": deal_data.get("dealname"), + "dealstage": deal_data.get("dealstage"), + "listing_id": listing.get("listing_id", None) if listing else None, + "landlord_property_id": ( + listing.get("owner_property_id", None) if listing else None + ), + "uprn": listing.get("national_uprn", None) if listing else None, + "outcome": deal_data.get("outcome"), + "outcome_notes": deal_data.get("outcome_notes"), + "project_code": deal_data.get("project_code"), + "company_id": company, + "major_condition_issue_description": deal_data.get( + "major_condition_issue_description" + ), + "major_condition_issue_photos": deal_data.get( + "major_condition_issue_photos" + ), + "coordination_status": deal_data.get("coordination_status__stage_1_"), + "design_status": deal_data.get("retrofit_design_status"), + "coordination_comments": deal_data.get("coordination_comments"), + "pashub_link": deal_data.get("pashub_link"), + "sharepoint_link": deal_data.get("sharepoint_link"), + "dampmould_growth": deal_data.get("dampmould_growth"), + "damp_mould_and_repairs_comments": deal_data.get( + "damp_mould_and_repairs_comments" + ), + "pre_sap": deal_data.get("pre_sap"), + "coordinator": deal_data.get("coordinator"), + "mtp_completion_date": parse_hs_date(deal_data.get("mtp_completion_date")), + "mtp_re_model_completion_date": parse_hs_date( + deal_data.get("mtp_re_model_completion_date") + ), + "ioe_v3_completion_date": parse_hs_date( + deal_data.get("ioe_v3_completion_date") + ), + "proposed_measures": deal_data.get("proposed_measures"), + "approved_package": deal_data.get("approved_package"), + "designer": deal_data.get("designer"), + "design_completion_date": parse_hs_date( + deal_data.get("design_completion_date") + ), + "actual_measures_installed": deal_data.get("actual_measures_installed"), + "installer": deal_data.get("installer"), + "installer_handover": deal_data.get("installer_handover"), + "lodgement_status": deal_data.get("lodgement_status"), + "measures_lodgement_date": parse_hs_date( + deal_data.get("measures_lodgement_date") + ), + "lodgement_date": parse_hs_date(deal_data.get("lodgement_date")), + "expected_commencement_date": parse_hs_date( + deal_data.get("expected_commencement_date") + ), + "surveyor": deal_data.get("surveyor"), + "confirmed_survey_date": parse_hs_date( + deal_data.get("confirmed_survey_date") + ), + "confirmed_survey_time": deal_data.get("confirmed_survey_time"), + "surveyed_date": parse_hs_date(deal_data.get("surveyed_date")), + "design_type": deal_data.get("design_type"), + }.items(): + setattr(existing, attr, value or getattr(existing, attr)) + + def _build_new_deal( + self, + deal_id: str, + deal_data: Dict[str, str], + listing: Optional[dict[str, str]], + company: Optional[str], + ) -> HubspotDealData: + return HubspotDealData( + deal_id=deal_id, + dealname=deal_data.get("dealname"), + dealstage=deal_data.get("dealstage"), + listing_id=listing.get("listing_id") if listing else None, + landlord_property_id=( + listing.get("owner_property_id") if listing else None + ), + uprn=listing.get("national_uprn") if listing else None, + outcome=deal_data.get("outcome"), + outcome_notes=deal_data.get("outcome_notes"), + project_code=deal_data.get("project_code"), + company_id=company, + major_condition_issue_description=deal_data.get( + "major_condition_issue_description" + ), + major_condition_issue_photos=deal_data.get("major_condition_issue_photos"), + coordination_status=deal_data.get("coordination_status__stage_1_"), + design_status=deal_data.get("retrofit_design_status"), + coordination_comments=deal_data.get("coordination_comments"), + pashub_link=deal_data.get("pashub_link"), + sharepoint_link=deal_data.get("sharepoint_link"), + dampmould_growth=deal_data.get("dampmould_growth"), + damp_mould_and_repairs_comments=deal_data.get( + "damp_mould_and_repairs_comments" + ), + pre_sap=deal_data.get("pre_sap"), + coordinator=deal_data.get("coordinator"), + mtp_completion_date=parse_hs_date(deal_data.get("mtp_completion_date")), + mtp_re_model_completion_date=parse_hs_date( + deal_data.get("mtp_re_model_completion_date") + ), + ioe_v3_completion_date=parse_hs_date( + deal_data.get("ioe_v3_completion_date") + ), + proposed_measures=deal_data.get("proposed_measures"), + approved_package=deal_data.get("approved_package"), + designer=deal_data.get("designer"), + design_completion_date=parse_hs_date( + deal_data.get("design_completion_date") + ), + actual_measures_installed=deal_data.get("actual_measures_installed"), + installer=deal_data.get("installer"), + installer_handover=deal_data.get("installer_handover"), + lodgement_status=deal_data.get("lodgement_status"), + measures_lodgement_date=parse_hs_date( + deal_data.get("measures_lodgement_date") + ), + lodgement_date=parse_hs_date(deal_data.get("lodgement_date")), + expected_commencement_date=parse_hs_date( + deal_data.get("expected_commencement_date") + ), + surveyor=deal_data.get("surveyor"), + confirmed_survey_date=parse_hs_date(deal_data.get("confirmed_survey_date")), + confirmed_survey_time=deal_data.get("confirmed_survey_time"), + surveyed_date=parse_hs_date(deal_data.get("surveyed_date")), + design_type=deal_data.get("design_type"), + ) + + def _handle_existing_photo_upload( + self, + existing_deal: HubspotDealData, + hubspot_client: HubspotClient, + ): + # if self._needs_photo_upload(existing): + + fresh_deal = hubspot_client.from_deal_id_get_info(existing_deal.deal_id) + fresh_photo_url = fresh_deal.get("major_condition_issue_photos") + + if not fresh_photo_url: + print(f"⚠️ Photo URL missing for deal_id {existing_deal.deal_id}") + return + + if fresh_photo_url != existing_deal.major_condition_issue_photos: + logger.info( + f"Hubspot image URL changed from {existing_deal.major_condition_issue_photos} to {fresh_photo_url}" + ) + self._upload_photo_to_s3(existing_deal, fresh_photo_url, hubspot_client) + else: + logger.info(f"Hubspot image URL unchanged: {fresh_photo_url}") + + def _handle_new_photo_upload( + self, + record: HubspotDealData, + hubspot_client: HubspotClient, + ): + if record.major_condition_issue_photos: + self._upload_photo_to_s3( + record, + record.major_condition_issue_photos, + hubspot_client, + ) + + def _upload_photo_to_s3( + self, + record: HubspotDealData, + hubspot_photo_url: str, + hubspot_client: HubspotClient, + ): + try: + local_file = hubspot_client.download_file_from_url(hubspot_photo_url) + + s3_url = self.s3.upload_file( + local_file, + "retrofit-data-dev", + prefix="hubspot/awaabs_law_evidence/", + ) + + record.major_condition_issue_evidence_s3_url = s3_url + + except Exception as e: + print(f"⚠️ Failed to upload photo for deal_id {record.deal_id}: {e}") + finally: + if "local_file" in locals() and os.path.exists(local_file): + os.remove(local_file) + + def _needs_photo_upload(self, old_deal: HubspotDealData) -> bool: + return bool( + old_deal.major_condition_issue_photos + and not old_deal.major_condition_issue_evidence_s3_url + ) diff --git a/etl/hubspot/hubspot_deal_differ.py b/etl/hubspot/hubspot_deal_differ.py new file mode 100644 index 00000000..b95b544c --- /dev/null +++ b/etl/hubspot/hubspot_deal_differ.py @@ -0,0 +1,177 @@ +from typing import Dict, List, Optional + +from backend.app.db.models.hubspot_deal_data import HubspotDealData +from etl.hubspot.utils import parse_hs_date + + +class HubspotDealDiffer: + COORDINATION_COMPLETE: List[str] = [ + "v1 ioe/mtp complete", + "v2 ioe/mtp complete", + "v3 ioe/mtp complete", + ] + RETROFIT_DESIGN_COMPLETE = "uploaded" + LODGEMENT_COMPLETE: List[str] = ["lodgement complete", "measures lodged"] + + @staticmethod + def check_for_db_update_trigger( + new_deal: Dict[str, str], + new_company: Optional[str], + new_listing: Optional[Dict[str, str]], + old_deal: HubspotDealData, + ) -> bool: + """ + Returns True if ANY difference exists between HubSpot data and DB. + Returns False if everything matches (i.e. no update needed). + """ + + # --- Deal ID --- + if str(old_deal.deal_id) != str(new_deal.get("hs_object_id")): + return True + + # --- Company --- + if new_company is not None: + if old_deal.company_id != new_company: + return True + + # --- Listing --- + hs_listing = new_listing or {} + + if old_deal.listing_id != hs_listing.get("listing_id"): + return True + + if old_deal.landlord_property_id != hs_listing.get("owner_property_id"): + return True + + if old_deal.uprn != hs_listing.get("national_uprn"): + return True + + # --- Field mappings --- + FIELD_MAP = { + "outcome": "outcome", + "dealstage": "dealstage", + "dealname": "dealname", + "project_code": "project_code", + "outcome_notes": "outcome_notes", + "major_condition_issue_description": "major_condition_issue_description", + "major_condition_issue_photos": "major_condition_issue_photos", + "coordination_status__stage_1_": "coordination_status", + "coordination_comments": "coordination_comments", + "retrofit_design_status": "design_status", + "pashub_link": "pashub_link", + "sharepoint_link": "sharepoint_link", + "dampmould_growth": "dampmould_growth", + "damp_mould_and_repairs_comments": "damp_mould_and_repairs_comments", + "pre_sap": "pre_sap", + "coordinator": "coordinator", + "proposed_measures": "proposed_measures", + "approved_package": "approved_package", + "designer": "designer", + "actual_measures_installed": "actual_measures_installed", + "installer": "installer", + "installer_handover": "installer_handover", + "lodgement_status": "lodgement_status", + "design_type": "design_type", + "surveyor": "surveyor", + "confirmed_survey_time": "confirmed_survey_time", + } + + for hs_field, db_field in FIELD_MAP.items(): + old_value = getattr(old_deal, db_field) + new_value = new_deal.get(hs_field) + + if old_value != new_value: + return True + + # --- Date fields --- + date_fields = [ + ("mtp_completion_date", "mtp_completion_date"), + ("mtp_re_model_completion_date", "mtp_re_model_completion_date"), + ("ioe_v3_completion_date", "ioe_v3_completion_date"), + ("design_completion_date", "design_completion_date"), + ("measures_lodgement_date", "measures_lodgement_date"), + ("lodgement_date", "lodgement_date"), + ("expected_commencement_date", "expected_commencement_date"), + ("confirmed_survey_date", "confirmed_survey_date"), + ("surveyed_date", "surveyed_date"), + ] + + for hs_field, db_field in date_fields: + old_value = getattr(old_deal, db_field) + new_value = parse_hs_date(new_deal.get(hs_field)) + + if old_value != new_value: + return True + + # --- Time field --- + if old_deal.confirmed_survey_time != new_deal.get("confirmed_survey_time"): + return True + + # No differences found + return False + + @staticmethod + def check_for_pashub_trigger( + new_deal: Dict[str, str], old_deal: HubspotDealData + ) -> bool: + new_pashub_link: str = new_deal.get("pashub_link", "") + + if not HubspotDealDiffer._has_valid_pashub_link(new_pashub_link): + return False + + if HubspotDealDiffer._new_or_updated_pashub_link(new_pashub_link, old_deal): + return True + + if HubspotDealDiffer._coordination_completed(new_deal, old_deal): + return True + + if HubspotDealDiffer._design_completed(new_deal, old_deal): + return True + + if HubspotDealDiffer._lodgement_completed(new_deal, old_deal): + return True + + return False + + @staticmethod + def _has_valid_pashub_link(new_pashub_link: str) -> bool: + return bool(new_pashub_link) + + @staticmethod + def _new_or_updated_pashub_link( + new_pashub_link: str, old_deal: HubspotDealData + ) -> bool: + if not old_deal.pashub_link: + return True + return old_deal.pashub_link != new_pashub_link + + @staticmethod + def _coordination_completed( + new_deal: Dict[str, str], old_deal: HubspotDealData + ) -> bool: + new_status: str = new_deal.get("coordination_status", "") + return ( + new_status != "" + and new_status in HubspotDealDiffer.COORDINATION_COMPLETE + and new_status != old_deal.coordination_status + ) + + @staticmethod + def _design_completed(new_deal: Dict[str, str], old_deal: HubspotDealData) -> bool: + new_status: str = new_deal.get("design_status", "") + return ( + new_status != "" + and new_status == HubspotDealDiffer.RETROFIT_DESIGN_COMPLETE + and new_status != old_deal.design_status + ) + + @staticmethod + def _lodgement_completed( + new_deal: Dict[str, str], old_deal: HubspotDealData + ) -> bool: + new_status: str = new_deal.get("lodgement_status", "") + return ( + new_status != "" + and new_status in HubspotDealDiffer.LODGEMENT_COMPLETE + and new_status != old_deal.lodgement_status + ) diff --git a/etl/hubspot/hubspot_trigger_orchestrator_trigger_request.py b/etl/hubspot/hubspot_trigger_orchestrator_trigger_request.py new file mode 100644 index 00000000..1adfa07c --- /dev/null +++ b/etl/hubspot/hubspot_trigger_orchestrator_trigger_request.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class HubspotTriggerOrchestratorTriggerRequest(BaseModel): + hubspot_deal_id: str diff --git a/etl/hubspot/scripts/onboarding/new_organisation.py b/etl/hubspot/scripts/onboarding/new_organisation.py index f8c6ba7a..0785949a 100644 --- a/etl/hubspot/scripts/onboarding/new_organisation.py +++ b/etl/hubspot/scripts/onboarding/new_organisation.py @@ -22,7 +22,7 @@ companies_to_add_or_ensure_it_exists = [ for company in companies_to_add_or_ensure_it_exists: company_info: CompanyData = hubspot.get_company_information(company.value) - dbRead.upsert_company(company_info) + dbRead.upsert_organisation(company_info) dbRead = HubspotDataToDb() diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 4f71c6d0..d754cbb1 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -1,24 +1,113 @@ +import json +import boto3 +from typing import Any, Dict, Optional + +from backend.app.config import get_settings from etl.hubspot.hubspotClient import HubspotClient -from etl.hubspot.hubspotDataTodB import HubspotDataToDb +from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb +from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer +from etl.hubspot.hubspot_trigger_orchestrator_trigger_request import ( + HubspotTriggerOrchestratorTriggerRequest, +) from backend.utils.subtasks import task_handler -from typing import Any +from backend.app.db.models.hubspot_deal_data import HubspotDealData +from utils.logger import setup_logger + +logger = setup_logger() @task_handler() def handler(body: dict[str, Any], context: Any) -> None: - hubspot_deal_id = body.get("hubspot_deal_id", "") + db_client = HubspotDataToDb() + hubspot_client = HubspotClient() - if hubspot_deal_id == "": - raise RuntimeError( - "Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'" - ) - hubspot_deal_id = "327170793707" + sqs_client = boto3.client("sqs") + PASHUB_TRIGGER_QUEUE_URL = get_settings().PASHUB_TO_ARA_SQS_URL - hubspot: HubspotClient = HubspotClient() - dbloader: HubspotDataToDb = HubspotDataToDb() - deal = dbloader.find_deal_with_deal_id(hubspot_deal_id) - if deal: - dbloader.update_deal_with_checks(deal, hubspot) + payload = HubspotTriggerOrchestratorTriggerRequest.model_validate(body) + hubspot_deal_id: str = payload.hubspot_deal_id + + db_deal: Optional[HubspotDealData] = db_client.find_deal_with_deal_id( + hubspot_deal_id + ) + + hubspot_deal: Dict[str, str] + company: Optional[str] + listing: Optional[dict[str, str]] + + hubspot_deal, company, listing = hubspot_client.get_deal_and_company_and_listing( + hubspot_deal_id + ) + + deal_changed = False + if not db_deal: + # New hubspot deal, no diffing to do + logger.info(f"New HubSpot deal of ID {hubspot_deal_id}. Loading to database...") + if company: + company_data: CompanyData = hubspot_client.get_company_information(company) + db_client: HubspotDataToDb = HubspotDataToDb() + db_client.upsert_organisation(company_data) + + db_client.upsert_deal(hubspot_deal, company, listing, hubspot_client) else: - deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id) - dbloader.upsert_deal(deal, company, listing, hubspot) + # Deal already in db, check whether anything has changed + logger.info( + f"HubSpot deal {hubspot_deal_id} already in database. Checking for changes..." + ) + if HubspotDealDiffer.check_for_db_update_trigger( + new_deal=hubspot_deal, + new_company=company, + new_listing=listing, + old_deal=db_deal, + ): + logger.info( + f"Deal {hubspot_deal_id} has been changed, updating database..." + ) + db_client.upsert_deal( + deal_data=hubspot_deal, + company=company, + listing=listing, + hubspot_client=hubspot_client, + ) + deal_changed = True + + if not deal_changed: + logger.info(f"No changes to deal {hubspot_deal_id}") + return + + # ============================== + # Orchestration of other lambdas + # ============================== + if HubspotDealDiffer.check_for_pashub_trigger( + new_deal=hubspot_deal, old_deal=db_deal + ): + logger.info( + f"Triggering Pas Hub file fetcher for HubSpot deal ID {hubspot_deal_id}" + ) + message_body: Dict[str, Optional[str]] = { + "pashub_link": hubspot_deal["pashub_link"], + "address": None, # potentially available from Listing, leave as None for now + "sharepoint_link": hubspot_deal["sharepoint_link"], + "uprn": hubspot_deal["national_uprn"], + "landlord_property_id": hubspot_deal["owner_property_id"], + "deal_stage": hubspot_deal["deal_stage"], + } + + response = sqs_client.send_message( + QueueUrl=PASHUB_TRIGGER_QUEUE_URL, MessageBody=json.dumps(message_body) + ) + + logger.info( + f"Sent message to Pashub To Ara queue. MessageId: {response['MessageId']}" + ) + else: + logger.info( + f"Not Triggering PasHub file fetcher for HubSpot deal ID {hubspot_deal_id}" + ) + + print("done") + + +if __name__ == "__main__": + handler({"hubspot_deal_id": "371470706915"}, "") + print("beep") diff --git a/etl/hubspot/tests/test_hubspot_client_integration.py b/etl/hubspot/tests/test_hubspot_client_integration.py index a3d8ae54..0f4b425c 100644 --- a/etl/hubspot/tests/test_hubspot_client_integration.py +++ b/etl/hubspot/tests/test_hubspot_client_integration.py @@ -71,7 +71,7 @@ class TestHubspotClientIntegration: def test_get_deal_info_for_db(self, client: HubspotClient): deal_id: str = "263490768079" - deal, company, listing = client.get_deal_info_for_db(deal_id) + deal, company, listing = client.get_deal_and_company_and_listing(deal_id) assert "dealname" in deal assert "dealstage" in deal diff --git a/etl/hubspot/tests/test_hubspot_deal_differ.py b/etl/hubspot/tests/test_hubspot_deal_differ.py new file mode 100644 index 00000000..69f7668b --- /dev/null +++ b/etl/hubspot/tests/test_hubspot_deal_differ.py @@ -0,0 +1,401 @@ +from datetime import datetime, timezone +from typing import Any, Dict +import uuid + +import pytest + +from backend.app.db.models.hubspot_deal_data import HubspotDealData +from etl.hubspot.hubspot_deal_differ import HubspotDealDiffer + + +BASE_TIME = datetime(2025, 12, 1, 12, 0, 0) + + +def make_old_deal(**overrides: Any) -> HubspotDealData: + return HubspotDealData( + id=overrides.get("id", uuid.uuid4()), + deal_id="1", + created_at=BASE_TIME, + updated_at=BASE_TIME, + **{k: v for k, v in overrides.items() if k != "id"}, + ) + + +def make_new_deal(deal_id: uuid.UUID, **overrides: Any) -> Dict[str, str]: + return { + "id": str(deal_id), + "deal_id": "1", + "created_at": BASE_TIME.isoformat(), + "updated_at": datetime(2025, 12, 1, 12, 30, 0).isoformat(), + **overrides, + } + + +# ==================== +# PASHUB TRIGGER TESTS +# ==================== + + +@pytest.mark.parametrize( + "new_overrides,expected", + [ + ({"outcome_notes": "test note"}, False), + ], +) +def test_pashub_trigger__outcome_note_added__returns_false( + new_overrides: Dict[str, str], + expected: bool, +) -> None: + deal_id = uuid.uuid4() + old_deal = make_old_deal(id=deal_id) + new_deal = make_new_deal(deal_id, **new_overrides) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + == expected + ) + + +@pytest.mark.parametrize( + "old_overrides,new_overrides,expected", + [ + ( + {"pashub_link": "www.google.co.uk"}, + {"pashub_link": "www.bbc.co.uk"}, + True, + ), + ], +) +def test_pashub_trigger__pashub_link_changed__returns_true( + old_overrides: Dict[str, str], + new_overrides: Dict[str, str], + expected: bool, +) -> None: + deal_id = uuid.uuid4() + old_deal = make_old_deal(id=deal_id, **old_overrides) + new_deal = make_new_deal(deal_id, **new_overrides) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + == expected + ) + + +@pytest.mark.parametrize( + "coordination_status,expected", + [ + ("v1 ioe/mtp complete", True), + ("v2 ioe/mtp complete", True), + ], +) +def test_pashub_trigger__coordination_completed_and_pashub_link_set__returns_true( + coordination_status: str, + expected: bool, +) -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + pashub_link="www.google.co.uk", + coordination_status="random", + ) + + new_deal = make_new_deal( + deal_id, + pashub_link="www.google.co.uk", + coordination_status=coordination_status, + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + == expected + ) + + +def test_pashub_trigger__coordination_completed_and_pashub_link_not_set__returns_false() -> ( + None +): + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + coordination_status="random", + ) + + new_deal = make_new_deal( + deal_id, + coordination_status="v2 ioe/mtp complete", + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + is False + ) + + +def test_pashub_trigger__design_completed_and_pashub_link_set__returns_true() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + pashub_link="www.google.co.uk", + ) + + new_deal = make_new_deal( + deal_id, + pashub_link="www.google.co.uk", + design_status="uploaded", + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + is True + ) + + +def test_pashub_trigger__design_completed_and_pashub_link_not_set__returns_false() -> ( + None +): + deal_id = uuid.uuid4() + + old_deal = make_old_deal(id=deal_id) + + new_deal = make_new_deal( + deal_id, + design_status="uploaded", + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + is False + ) + + +@pytest.mark.parametrize( + "lodgement_status,expected", + [ + ("lodgement complete", True), + ("measures lodged", True), + ], +) +def test_pashub_trigger__lodgement_completed_and_pashub_link_set__returns_true( + lodgement_status: str, + expected: bool, +) -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + pashub_link="www.google.co.uk", + ) + + new_deal = make_new_deal( + deal_id, + pashub_link="www.google.co.uk", + lodgement_status=lodgement_status, + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + == expected + ) + + +def test_pashub_trigger__lodgement_completed_and_pashub_link_not_set__returns_false() -> ( + None +): + deal_id = uuid.uuid4() + + old_deal = make_old_deal(id=deal_id) + + new_deal = make_new_deal( + deal_id, + design_status="lodgement complete", + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + is False + ) + + +def test_pashub_trigger__coordination_design_lodgement_not_completed_and_pashub_link_set__returns_false() -> ( + None +): + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + pashub_link="www.google.co.uk", + ) + + new_deal = make_new_deal( + deal_id, + pashub_link="www.google.co.uk", + coordination_status="not uploaded", + design_status="not uploaded", + lodgement_status="not uploaded", + ) + + assert ( + HubspotDealDiffer.check_for_pashub_trigger( + new_deal=new_deal, + old_deal=old_deal, + ) + is False + ) + + +# ======================= +# DB UPDATE TRIGGER TESTS +# ======================= + + +def test_db_update_trigger__no_changes__returns_false() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + dealname="Test Deal", + dealstage="stage_1", + outcome="won", + ) + + new_deal = make_new_deal( + deal_id, + hs_object_id="1", + dealname="Test Deal", + dealstage="stage_1", + outcome="won", + ) + + result = HubspotDealDiffer.check_for_db_update_trigger( + new_deal=new_deal, + new_company=None, + new_listing=None, + old_deal=old_deal, + ) + + assert result is False + + +def test_db_update_trigger__dealname_changed__returns_true() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + dealname="Old Name", + ) + + new_deal = make_new_deal( + deal_id, + hs_object_id="1", + dealname="New Name", + ) + + result = HubspotDealDiffer.check_for_db_update_trigger( + new_deal=new_deal, + new_company=None, + new_listing=None, + old_deal=old_deal, + ) + + assert result is True + + +def test_db_update_trigger__company_changed__returns_true() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + company_id="old_company", + ) + + new_deal = make_new_deal( + deal_id, + hs_object_id="1", + ) + + new_company = "new_company" + + result = HubspotDealDiffer.check_for_db_update_trigger( + new_deal=new_deal, + new_company=new_company, + new_listing=None, + old_deal=old_deal, + ) + + assert result is True + + +def test_db_update_trigger__missing_hubspot_timezone__returns_false() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + design_completion_date=datetime(2025, 11, 3, 0, 0, tzinfo=timezone.utc), + ) + + new_deal = make_new_deal( + deal_id, + hs_object_id="1", + design_completion_date=datetime(2025, 11, 3, 0, 0).isoformat(), + ) + + result = HubspotDealDiffer.check_for_db_update_trigger( + new_deal=new_deal, + new_company=None, + new_listing=None, + old_deal=old_deal, + ) + + assert result is False + + +def test_db_update_trigger__listing_changed__returns_true() -> None: + deal_id = uuid.uuid4() + + old_deal = make_old_deal( + id=deal_id, + listing_id="abc", + ) + + new_deal = make_new_deal( + deal_id, + hs_object_id="1", + ) + + new_listing = {"listing_id": "xyz"} + + result = HubspotDealDiffer.check_for_db_update_trigger( + new_deal=new_deal, + new_company=None, + new_listing=new_listing, + old_deal=old_deal, + ) + + assert result is True diff --git a/etl/hubspot/utils.py b/etl/hubspot/utils.py new file mode 100644 index 00000000..b7331f94 --- /dev/null +++ b/etl/hubspot/utils.py @@ -0,0 +1,16 @@ +from datetime import datetime, timezone +from typing import Optional + + +def parse_hs_date(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + + return dt.astimezone(timezone.utc) + except ValueError: + return None diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/main.tf b/infrastructure/terraform/lambda/ecmk_to_ara/main.tf new file mode 100644 index 00000000..357c2f87 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/main.tf @@ -0,0 +1,27 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +module "lambda" { + source = "../../modules/lambda_with_sqs" + + name = "ecmk_to_ara" #"address2uprn" for example + stage = var.stage + + image_uri = local.image_uri + + # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) + maximum_concurrency = var.maximum_concurrency + + batch_size = var.batch_size + + environment = { + STAGE = var.stage + LOG_LEVEL = "info" + } +} diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf b/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf new file mode 100644 index 00000000..87a94150 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf @@ -0,0 +1,16 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + } + + backend "s3" { + bucket = "ecmk-to-ara-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } + + required_version = ">= 1.2.0" +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf b/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf new file mode 100644 index 00000000..984e3908 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf @@ -0,0 +1,37 @@ +variable "lambda_name" { + type = string + description = "Logical name of the lambda (e.g. address2uprn)" +} + +variable "stage" { + description = "Deployment stage (e.g. dev, prod)" + type = string +} +variable "ecr_repo_url" { + type = string + description = "ECR repository URL (no tag, no digest)" +} + +variable "image_digest" { + type = string + description = "Image digest (sha256:...)" +} + +variable "maximum_concurrency" { + type = number + default = 2 + description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." +} + +variable "batch_size" { + type = number + default = 1 +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf index 6ce7a386..e8762337 100644 --- a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf +++ b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf @@ -7,6 +7,14 @@ data "terraform_remote_state" "shared" { } } +data "terraform_remote_state" "pashub_to_ara" { + backend = "s3" + config = { + bucket = "pashub-to-ara-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} data "aws_secretsmanager_secret_version" "db_credentials" { secret_id = "${var.stage}/assessment_model/db_credentials" @@ -39,6 +47,8 @@ module "hubspot_deal_etl" { DB_NAME = var.db_name DB_PORT = var.db_port HUBSPOT_API_KEY = var.hubspot_api_key + + PASHUB_TO_ARA_SQS_URL = data.terraform_remote_state.pashub_to_ara.outputs.pashub_to_ara_queue_url } } diff --git a/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf b/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf new file mode 100644 index 00000000..d44b8763 --- /dev/null +++ b/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf @@ -0,0 +1,4 @@ +output "pashub_to_ara_queue_url" { + value = module.lambda.queue_url + description = "URL of the PasHub to Ara SQS queue" +} diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index 9d272eb6..47866c92 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -538,6 +538,20 @@ module "pashub_to_ara_registry" { stage = var.stage } +################################################ +# ECMK to Ara – Lambda +################################################ +module "ecmk_to_ara_state_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "ecmk-to-ara-terraform-state" +} + +module "ecmk_to_ara_registry" { + source = "../modules/container_registry" + name = "ecmk_to_ara" + stage = var.stage +} + ################################################ # Engine – Lambda ECR ################################################ diff --git a/pytest.ini b/pytest.ini index db7afaf5..792b27e0 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,6 +3,6 @@ pythonpath = . log_cli = true log_cli_level = INFO addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial -testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests +testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests markers = integration: mark a test as an integration test