diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 662f53b0..6a1cc120 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -35,7 +35,8 @@ ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1 ADD backend/engine/requirements.txt requirements1.txt ADD backend/app/requirements/requirements.txt requirements2.txt ADD .devcontainer/backend/requirements.txt requirements3.txt -RUN cat requirements1.txt requirements2.txt requirements3.txt > requirements.txt +ADD etl/hubspot/requirements.txt requirements4.txt +RUN cat requirements1.txt requirements2.txt requirements3.txt requirements4.txt > requirements.txt RUN pip install -r requirements.txt # 5) Workdir diff --git a/.devcontainer/backend/requirements.txt b/.devcontainer/backend/requirements.txt index 029e5efa..cb90af18 100644 --- a/.devcontainer/backend/requirements.txt +++ b/.devcontainer/backend/requirements.txt @@ -25,4 +25,4 @@ psycopg[binary] pytest-postgresql # Formatting black==26.1.0 -boto3-stubs \ No newline at end of file +boto3-stubs diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 3263739f..cbcd88c4 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -484,4 +484,40 @@ jobs: - name: Terraform Apply if: env.TERRAFORM_APPLY == 'true' working-directory: infrastructure/terraform/cdn - run: terraform apply -auto-approve tfplan \ No newline at end of file + run: terraform apply -auto-approve tfplan + + # ============================================================ + # Build Hubspot ETL image + # ============================================================ + hubspot_etl_image: + needs: [determine_stage, shared_terraform] + uses: ./.github/workflows/_build_image.yml + with: + ecr_repo: hubspot-etl-${{ needs.determine_stage.outputs.stage }} + dockerfile_path: etl/hubspot/scripts/scraper/handler/Dockerfile + build_context: . 
+ secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + + # ============================================================ + # Deploy Hubspot ETL Lambda + # ============================================================ + hubspot_etl_lambda: + needs: [hubspot_etl_image, determine_stage] + uses: ./.github/workflows/_deploy_lambda.yml + with: + lambda_name: hubspot-etl-to-ara + lambda_path: infrastructure/terraform/lambda/hubspot_deal_etl + stage: ${{ needs.determine_stage.outputs.stage }} + ecr_repo: hubspot-etl-${{ needs.determine_stage.outputs.stage }} + image_digest: ${{ needs.hubspot_etl_image.outputs.image_digest }} + terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }} + secrets: + TF_VAR_db_host: ${{ secrets.DEV_DB_HOST }} + TF_VAR_db_name: ${{ secrets.DEV_DB_NAME }} + TF_VAR_db_port: ${{ secrets.DEV_DB_PORT }} + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 58bfdd7a..436428f9 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -7,24 +7,52 @@ on: jobs: - test: + test-docker: + name: Tests (Docker) runs-on: ubuntu-latest + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: test + POSTGRES_PASSWORD: test + POSTGRES_DB: test + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: - name: Checkout code uses: actions/checkout@v4 - - name: Set up Python 3.11 - uses: actions/setup-python@v4 - with: - python-version: '3.11' + - name: Build test image + run: docker build -f Dockerfile.test -t model-test . 
- - name: Install tox via Makefile + - name: Initialise database schema run: | - make setup + docker run --rm \ + --network host \ + -e DB_HOST=localhost \ + -e DB_NAME=test \ + -e DB_USERNAME=test \ + -e DB_PASSWORD=test \ + -e DB_PORT=5432 \ + model-test python scripts/init_db.py - - name: Run tests with tox via Makefile - env: - EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }} + - name: Run tests run: | - make test ARGS="-m 'not integration'" + docker run --rm \ + --network host \ + -e EPC_AUTH_TOKEN=${{ secrets.DEV_EPC_AUTH_TOKEN }} \ + -e HUBSPOT_API_KEY=${{ secrets.HUBSPOT_API_KEY }} \ + -e DB_HOST=localhost \ + -e DB_NAME=test \ + -e DB_USERNAME=test \ + -e DB_PASSWORD=test \ + -e DB_PORT=5432 \ + model-test pytest -vv -m 'not integration' diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 00000000..802eb3a4 --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,28 @@ +FROM python:3.11-slim + +# Install PostgreSQL binaries — required by pytest-postgresql to spawn ephemeral test databases +RUN apt-get update \ + && apt-get install -y --no-install-recommends postgresql \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +ENV PYTHONPATH=/app + +# Copy requirements first so Docker can cache the install layer +COPY backend/engine/requirements.txt backend/engine/requirements.txt +COPY backend/app/requirements/requirements.txt backend/app/requirements/requirements.txt +COPY test.requirements.txt test.requirements.txt + +RUN pip install --no-cache-dir \ + -r backend/engine/requirements.txt \ + -r backend/app/requirements/requirements.txt \ + -r test.requirements.txt + +# Copy source +COPY . . 
+ +# pg_ctl refuses to run as root — create an unprivileged user +RUN useradd -m testuser && chown -R testuser /app +USER testuser + +CMD ["pytest"] diff --git a/Dockerfile.test.dockerignore b/Dockerfile.test.dockerignore new file mode 100644 index 00000000..4f79c6ee --- /dev/null +++ b/Dockerfile.test.dockerignore @@ -0,0 +1,12 @@ +# We need this file otherwise it'll use .dockerignore +# Exclude large/irrelevant directories that are not needed for testing +model_data/local_data/ +backend/node_modules/ +backend/.idea/ +backend/.env +infrastructure/ +data_collection/ +node_modules/ +conservation_areas/ +open_uprn/ +land_registry/ diff --git a/README.md b/README.md index 9268ba25..b470e12c 100644 --- a/README.md +++ b/README.md @@ -39,3 +39,4 @@ pytest --cov-config=model_data/.coveragerc --cov=model_data This will produce the test results and coverage reports + diff --git a/asset_list/app.py b/asset_list/app.py index 5e821bb9..5794eaf3 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -73,59 +73,24 @@ def app(): Property UPRN """ - # data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/E.ON/202603 modelling project" - # # data_filename = "For Modelling - Final - reviewed.xlsx" - # data_filename = "eon - 20260323 address sanitisation.xlsx" - # sheet_name = "in" - # postcode_column = "postcode" - # address1_column = "Address 1" - # address1_method = None - # fulladdress_column = "Address 1" - # address_cols_to_concat = [] - # missing_postcodes_method = None - # landlord_year_built = None - # landlord_os_uprn = "address2uprn_uprn" - # landlord_property_type = "PropertyType" - # landlord_built_form = "BuiltForm" - # landlord_wall_construction = None - # landlord_roof_construction = None - # landlord_heating_system = None - # landlord_existing_pv = None - # landlord_property_id = "UPRN" - # landlord_sap = None - # outcomes_filename = None - # outcomes_sheetname = None - # outcomes_postcode = None - # outcomes_houseno = None - # outcomes_id = None 
- # outcomes_address = None - # master_filepaths = [] - # master_id_colnames = [] - # master_to_asset_list_filepath = None - # phase = False - # ecosurv_landlords = None - # asset_list_header = 0 - # landlord_block_reference = None - - data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/SMS" - # data_filename = "For Modelling - Final - reviewed.xlsx" - data_filename = "SMS Data sample to sense check before WHLG deploy.xlsx" - sheet_name = "All Darlaston Properties" + data_folder = "/workspaces/model/asset_list" + data_filename = "Calico ARA Upload Review.xlsx" + sheet_name = "Sheet1" postcode_column = "Postcode" - address1_column = "House Number" + address1_column = "Units" address1_method = None - fulladdress_column = None - address_cols_to_concat = ["House Number", "Street name"] + fulladdress_column = "Units" + address_cols_to_concat = ["Units"] missing_postcodes_method = None landlord_year_built = None landlord_os_uprn = None - landlord_property_type = None - landlord_built_form = None + landlord_property_type = None # Good to include if landlord gave + landlord_built_form = None # Good to include if landlord gave landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None - landlord_property_id = "id" + landlord_property_id = "llid" landlord_sap = None outcomes_filename = None outcomes_sheetname = None @@ -277,7 +242,7 @@ def app(): if skip is not None and not force_retrieve_data: if i <= skip: continue - chunk = asset_list.standardised_asset_list[i: i + chunk_size] + chunk = asset_list.standardised_asset_list[i : i + chunk_size] epc_data_chunk, errors_chunk, no_epc_chunk = get_data( df=chunk, row_id_name=asset_list.DOMNA_PROPERTY_ID, @@ -420,7 +385,7 @@ def app(): # Retrieve just the data we need epc_df = epc_df[ [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) - ].rename(columns=asset_list.EPC_API_DATA_NAMES) + 
].rename(columns=asset_list.EPC_API_DATA_NAMES) # Look for columns not in the find my EPC data, which will have happened if we didn't # retrieve it in the first place @@ -437,7 +402,7 @@ def app(): find_my_epc_data[ [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) - ].rename(columns=asset_list.FIND_EPC_DATA_NAMES), + ].rename(columns=asset_list.FIND_EPC_DATA_NAMES), how="left", on=asset_list.DOMNA_PROPERTY_ID, ) diff --git a/backend/address2UPRN/README.md b/backend/address2UPRN/README.md index 6d26f281..646fec01 100644 --- a/backend/address2UPRN/README.md +++ b/backend/address2UPRN/README.md @@ -5,6 +5,7 @@ Before you run: Step 1) Get the list and ensure the following columns exists +I believe lower and upper case matter: * Address 1 * Address 2 * Address 3 @@ -23,17 +24,19 @@ For this example I'll be using "s3://retrofit-data-dev/ara_raw_inputs/calico/Cal Go to Ara DB and make a new task_id with a randomly generated uuid as the primarily key -task_id = a7b70a02-4df4-45b5-a50b-196e095910bb -sub_task_id = 567cf73b-1210-4909-9ecc-36ae7e23420e +task_id = ea615ac3-ac28-46c4-8bff-2431c5b9c13d +sub_task_id = 85a23b67-8f18-4299-9bf0-69bfb87adbc7 +s3 => s3://retrofit-data-dev/ara_raw_inputs/eon/North Tyneside Council.csv Step 3) Alright, now lets make the input for postcode-splitter sqs to get the ball rolling postcode-splitter-sqs => https://eu-west-2.console.aws.amazon.com/sqs/v3/home?region=eu-west-2#/queues/https%3A%2F%2Fsqs.eu-west-2.amazonaws.com%2F337213553626%2Fpostcode-splitter-queue-dev { - "task_id": "a7b70a02-4df4-45b5-a50b-196e095910bb", - "sub_task_id": "567cf73b-1210-4909-9ecc-36ae7e23420e", - "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico Homes Full list EPC Properties(Sheet2) (1) (1).csv" + "task_id": "ea615ac3-ac28-46c4-8bff-2431c5b9c13d", + "sub_task_id": "85a23b67-8f18-4299-9bf0-69bfb87adbc7", + "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/eon/eon(Sheet1).csv" } + Each batch 
class HubspotDealData(SQLModel, table=True):
    """Row in the ``hubspot_deal_data`` table holding data copied from one HubSpot deal.

    Every HubSpot-derived column is an optional string; only ``deal_id`` and the two
    timestamps are NOT NULL. ``deal_id`` is indexed but not declared unique.
    """

    __tablename__ = "hubspot_deal_data"

    # Surrogate primary key generated client-side.
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)

    # HubSpot Deal identifiers
    deal_id: str = Field(index=True, nullable=False)
    dealname: Optional[str] = Field(default=None)
    dealstage: Optional[str] = Field(default=None)
    company_id: Optional[str] = Field(default=None)
    project_code: Optional[str] = Field(default=None)

    # HubSpot custom properties
    landlord_property_id: Optional[str] = Field(default=None)
    uprn: Optional[str] = Field(default=None)
    outcome: Optional[str] = Field(default=None)
    outcome_notes: Optional[str] = Field(default=None)

    major_condition_issue_description: Optional[str] = Field(default=None)
    major_condition_issue_photos: Optional[str] = Field(default=None)
    major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)

    coordination_status: Optional[str] = Field(default=None)
    design_status: Optional[str] = Field(default=None)

    # Both timestamps are timezone-aware columns filled in by the database.
    # The default is plain now(): on PostgreSQL it already returns a timestamptz.
    # "NOW() AT TIME ZONE 'utc'" produces a *naive* timestamp which the server then
    # re-interprets in the session time zone when storing it into a timestamptz
    # column, shifting the stored instant whenever the session is not in UTC.
    created_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=func.now(),
            nullable=False,
        )
    )

    # onupdate is applied by SQLAlchemy on ORM/Core UPDATE statements it emits.
    updated_at: datetime = Field(
        sa_column=Column(
            DateTime(timezone=True),
            server_default=func.now(),
            onupdate=func.now(),
            nullable=False,
        )
    )
1) setup and 2) teardown, the latter of which will run after all # tests have completed yield engine # Clean-up after entire test session + SQLModel.metadata.drop_all(engine) Base.metadata.drop_all(engine) engine.dispose() diff --git a/backend/scripts/combine_address2uprn_outputs.py b/backend/scripts/combine_address2uprn_outputs.py index be17f610..105b8639 100644 --- a/backend/scripts/combine_address2uprn_outputs.py +++ b/backend/scripts/combine_address2uprn_outputs.py @@ -53,13 +53,3 @@ def main(task_id, output): print(f"Combined CSV saved to {output}") print(f"Total rows: {len(combined)}") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("task_id", help="Task ID folder in S3") - parser.add_argument("--output", default="combined.csv") - - args = parser.parse_args() - - main(args.task_id, args.output) diff --git a/conftest.py b/conftest.py index d93f0023..2ea20ebb 100644 --- a/conftest.py +++ b/conftest.py @@ -30,6 +30,7 @@ DEFAULT_ENV = { "HEATING_KWH_PREDICTIONS_BUCKET": "test", "HOTWATER_KWH_PREDICTIONS_BUCKET": "test", "ENERGY_ASSESSMENTS_BUCKET": "test", + "HUBSPOT_API_KEY": "changeme", } # runs immediately when pytest starts, BEFORE collection diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py new file mode 100644 index 00000000..8bbe8a63 --- /dev/null +++ b/etl/hubspot/hubspotClient.py @@ -0,0 +1,448 @@ +import os +from enum import Enum +from typing import Optional, cast + +from hubspot.client import Client # type: ignore[reportMissingTypeStubs] +from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs] +from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs] +from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.companies.api.basic_api import 
class Companies(Enum):
    """Known companies, keyed by name, with a numeric ID string as the value.

    NOTE(review): the values look like HubSpot company object IDs (the client's
    company lookups take such IDs as strings) — confirm against HubSpot.
    """

    ABRI = "237615001799"
    SOUTHERN_HOUSING_GROUP = "109343619305"
    LIVEWEST = "86205872354"
    SURESERVE = "301745289413"
    HOMEGROUP = "94946071794"
    APPLE = "184769046716"
    # Misspelled name kept as the canonical member so existing references keep working.
    THE_GUINESS_PARTNERSHIP = "86970043613"
    # Correctly spelled alias: same value, so Enum treats it as another name for the
    # member above (it does not appear as an extra member when iterating).
    THE_GUINNESS_PARTNERSHIP = "86970043613"
    CALICO_HOMES = "86975437046"
SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915" + CUSTOMER_CONTACTED = "888730834" + SURVEYED_COMPLETED_SIGNED_OFF = "1617223916" + FILES_MISSING_FROM_ASSESSOR = "1887736000" + + +class Pipeline(Enum): + OPERATIONS_SOCIAL_HOUSING = "1167582403" + + +# TODO get guiness working from here + + +class HubspotClient: + + def __init__(self): + """ + Hey Tech Team, Hubspot Library doesn't do type hitting. + We have type hinted stuff but pylance never becomes happy. + However, because I added the type hinting to the best of ability + and you'll still get sensible ide suggestions. + """ + settings = get_settings() + access_token = settings.HUBSPOT_API_KEY + if access_token is None: + raise RuntimeError("Missing HUBSPOT_API_KEY in env") + self.access_token: str = access_token + self.logger = setup_logger() + self.client: Client = Client.create(access_token=self.access_token) # type: ignore[reportUnknownMemberType] + # [Developer Only] + # Add a dot in front of client and see the wonders of ide suggestions + # This wouldn't work if we didn't add ': Client' to self.client. 
+ # Sorry - not sorry but enjoy, Past Junte 13/03/2026 + # self.client + + def get_deal_ids_from_company(self, company_id: str) -> list[str]: + associations_api: AssociationsBasicApi = ( # type: ignore[reportUnknownMemberType] + self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + ) + + deal_ids: list[str] = [] + after: Optional[str] = None + + while True: + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="companies", + object_id=company_id, + to_object_type="deals", + limit=100, + after=after, + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + for assoc in results: + assoc: AssociationsResult + object_id: str = cast(str, assoc.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + deal_ids.append(object_id) + + paging: Optional[AssociationsPaging] = cast(Optional[AssociationsPaging], response.paging) # type: ignore[reportUnknownMemberType] + if not paging: + break + + paging_next: Optional[AssociationsPagingNext] = cast(Optional[AssociationsPagingNext], paging.next) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + if not paging_next: + break + + after = cast(str, paging_next.after) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + + return deal_ids + + def from_deal_id_get_associated_company_id(self, deal_id: str) -> Optional[str]: + """ + Get the associated company ID from a given deal ID. + Returns the associated company ID, or None if not found. 
+ """ + try: + associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + + # Fetch associations for this specific deal only + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="deals", + object_id=deal_id, + to_object_type="companies", + limit=1, # Expect only one associated company + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + if not results: + self.logger.info(f"No company association found for deal {deal_id}") + return None + + first: AssociationsResult = results[0] + company_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}") + return company_id + + except ApiException as e: + self.logger.error( + f"Error fetching associated company for deal {deal_id}: {e}" + ) + return None + + def from_deal_id_get_associated_listing( + self, deal_id: str + ) -> Optional[dict[str, str]]: + """ + Get the associated listing information for a given deal. + Returns a dictionary of listing properties, or None if not found. 
+ """ + associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api # type: ignore[reportUnknownMemberType] # works for custom objects like "listing" + + # Fetch associated listing(s) + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="deals", + object_id=deal_id, + to_object_type="0-420", # <-- to get an listing object + limit=1, + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + if not results: + self.logger.info(f"No listing association found for deal {deal_id}") + return None + + first: AssociationsResult = results[0] + listing_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}") + + # Fetch listing details (the "listing information") + listing: HubspotObject = listings_api.get_by_id( # type: ignore[reportUnknownMemberType] + object_type="0-420", # again, must match your HubSpot object name + object_id=listing_id, + properties=[ + "national_uprn", + "domna_property_id", + "owner_property_id", + ], + ) + + listing_info: dict[str, str] = cast(dict[str, str], listing.properties) # type: ignore[reportUnknownMemberType] + self.logger.info(f"Listing info for deal {deal_id}: {listing_info}") + return listing_info + + def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]: + deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType] + + deal: HubspotObject = deals_api.get_by_id( # type: ignore[reportUnknownMemberType] + deal_id, + properties=[ + "dealname", + "dealstage", + "pipeline", + "outcome", # outcome, + "outcome_notes", # outcome notes + "project_code", + "major_condition_issue_description", + 
"major_condition_issue_photos", + "coordination_status__stage_1_", # Coordiantion Status (Stage 1), + "retrofit_design_status", # Retrofit Design Status + ], + ) + + deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType] + return deal_info + + def get_deal_info_for_db( + self, deal_id: str + ) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]: + + deal: dict[str, str] = self.from_deal_id_get_info(deal_id) + company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id) + + if company: + company_data: CompanyData = self.get_company_information(company) + dbloader: HubspotDataToDb = HubspotDataToDb() + dbloader.upsert_company(company_data) + + listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing( + deal_id + ) + + return deal, company, listing + + def get_company_information(self, company_id: str) -> CompanyData: + companies_api: CompaniesBasicApi = self.client.crm.companies.basic_api # type: ignore[reportUnknownMemberType] + + company: HubspotObject = companies_api.get_by_id( # type: ignore[reportUnknownMemberType] + company_id, + properties=[ + "name", + ], + ) + + company_info: CompanyData = company.properties # type: ignore[reportUnknownMemberType] + return company_info + + def get_all_pipelines(self) -> list[dict[str, str]]: + """ + Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs. 
+ """ + try: + pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType] + response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType] + + results: list[HubspotPipeline] = cast(list[HubspotPipeline], response.results) # type: ignore[reportUnknownMemberType] + pipelines: list[dict[str, str]] = [] + for pipeline in results: + pipeline: HubspotPipeline + pipelines.append( + { + "name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType] + "id": cast(str, pipeline.id), # type: ignore[reportUnknownMemberType] + } + ) + + self.logger.info(f"Retrieved {len(pipelines)} pipelines.") + return pipelines + + except Exception as e: + self.logger.error(f"Error retrieving pipelines: {e}") + return [] + + def get_deal_stages_from_pipeline_id( + self, pipeline_id: Optional[str] = None + ) -> list[dict[str, str]]: + """ + Retrieve all deal stages for a given pipeline. + If no pipeline_id is provided, retrieves all stages for all pipelines. + Returns a list of dicts with pipeline name, stage name, and stage ID. 
+ """ + try: + pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType] + response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType] + + all_stages: list[dict[str, str]] = [] + + for pipeline in cast(list[HubspotPipeline], response.results): # type: ignore[reportUnknownMemberType] + pipeline: HubspotPipeline + # Skip other pipelines if a specific one is requested + pipeline_id_str: str = cast(str, pipeline.id) # type: ignore[reportUnknownMemberType] + if pipeline_id and pipeline_id_str != str(pipeline_id): + continue + + for stage in cast(list[HubspotPipelineStage], pipeline.stages): # type: ignore[reportUnknownMemberType] + stage: HubspotPipelineStage + all_stages.append( + { + "pipeline_name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType] + "pipeline_id": pipeline_id_str, + "stage_name": cast(str, stage.label), # type: ignore[reportUnknownMemberType] + "stage_id": cast(str, stage.id), # type: ignore[reportUnknownMemberType] + } + ) + + if not all_stages: + self.logger.info( + f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}" + ) + else: + self.logger.info(f"Retrieved {len(all_stages)} deal stages.") + + return all_stages + + except Exception as e: + self.logger.error(f"Error retrieving deal stages: {e}") + return [] + + def download_file_from_url( + self, download_url: str, save_path: Optional[str] = None + ) -> str: + """ + Download a file from a HubSpot file URL (public or private), keeping its original file type. 
+ """ + + try: + headers: dict[str, str] = {} + if "hubspotusercontent" not in download_url: + headers["Authorization"] = f"Bearer {self.access_token}" + + self.logger.info(f"Downloading HubSpot file: {download_url}") + response = requests.get( + download_url, headers=headers, stream=True, allow_redirects=True + ) + response.raise_for_status() + + # Try to infer filename from Content-Disposition header + content_disposition = response.headers.get("content-disposition") + if content_disposition and "filename=" in content_disposition: + filename = content_disposition.split("filename=")[1].strip('"') + else: + # fallback: extract from URL or content-type + filename = ( + os.path.basename(download_url.split("?")[0]) or "hubspot_download" + ) + if "." not in filename: + content_type = response.headers.get("content-type") + ext = ( + mimetypes.guess_extension(content_type.split(";")[0]) + if content_type + else None + ) + if ext: + filename += ext + + # Make sure save_path is valid + if save_path is None: + save_path = os.path.abspath(filename) + elif os.path.isdir(save_path): + save_path = os.path.join(save_path, filename) + else: + # if user passes a file path directly, leave it + save_path = os.path.abspath(save_path) + + with open(save_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + self.logger.info(f"File downloaded successfully → {save_path}") + return save_path + + except requests.exceptions.RequestException as e: + self.logger.error(f"Failed to download file from HubSpot: {e}") + raise + + def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str: + # Fetch product mapping + products_api: ProductsBasicApi = self.client.crm.products.basic_api # type: ignore[reportUnknownMemberType] + product: HubspotObject = products_api.get_by_id( # type: ignore[reportUnknownMemberType] + product_id, properties=["name", "price", "hs_price"] + ) + properties: dict[str, str] = cast(dict[str, str], 
product.properties) # type: ignore[reportUnknownMemberType] + + name: str = properties.get("name") or "" + price: str = properties.get("price") or properties.get("hs_price") or "0" + + # Build line item payload + line_item_input = SimplePublicObjectInput( + properties={ + "hs_product_id": product_id, + "name": name, + "quantity": str(quantity), + "price": price, + "amount": str(float(price) * quantity), + "invoiced": "Outstanding", + } + ) + + # Create line item + line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType] + line_item: HubspotObject = line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType] + return cast(str, line_item.id) # type: ignore[reportUnknownMemberType] + + def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None: + self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}") + + association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + + association_api.create( # type: ignore[reportUnknownMemberType] + "0-3", # to object type + deal_id, # to object id + "line_items", # from object type + line_item_id, # from object id + [ + AssociationSpec( + association_category="HUBSPOT_DEFINED", + association_type_id=19, # line_item → deal + ) + ], + ) + + def add_product_line_item_to_deal( + self, deal_id: str, product_id: str, quantity: int = 1 + ) -> str: + # Step 1: Create the line item from product mapping + line_item_id: str = self.create_line_item_from_product(product_id, quantity) + + # Step 2: Associate the created line item to the deal + self.associate_line_item_to_deal(line_item_id, deal_id) + + return line_item_id + + def delete_line_item(self, line_item_id: str) -> bool: + """ + Delete (archive) a line item in HubSpot by its ID. 
class CompanyData(TypedDict):
    """HubSpot company properties read by ``HubspotDataToDb.upsert_company``."""

    hs_object_id: str  # matched against Organisation.hubspot_company_id
    name: str  # stored as Organisation.name


class HubspotDataToDb:
    """Mirror HubSpot companies and deals into the application database.

    Deal evidence photos referenced by ``major_condition_issue_photos`` are
    additionally copied from HubSpot to S3, and the resulting URL is stored in
    ``major_condition_issue_evidence_s3_url``.
    """

    # Key prefix under which deal evidence photos are stored in S3.
    _EVIDENCE_PREFIX = "hubspot/awaabs_law_evidence/"

    # (HubspotDealData attribute, message printed when DB and HubSpot differ).
    _FIELD_CHECKS = (
        ("deal_id", "deal_id mismatch"),
        ("company_id", "company_id mismatch"),
        ("landlord_property_id", "landlord_property_id mismatch"),
        ("outcome", "outcome mismatch"),
        ("dealstage", "dealstage mismatch"),
        ("dealname", "dealname mismatch"),
        ("project_code", "project_code mismatch"),
        ("uprn", "uprn mismatch"),
        ("outcome_notes", "outcome_notes mismatch"),
        ("major_condition_issue_description", "major condition description mismatch"),
        ("major_condition_issue_photos", "major condition issue photos mismatch"),
        ("coordination_status", "coordination stage 1 status mismatch"),
        ("design_status", "retrofit design mismatch"),
    )

    def __init__(self):
        """Create the S3 helper from AWS_ACCESS_KEY / AWS_SECRET_KEY / AWS_REGION."""
        self.s3 = S3Uploader(
            aws_access_key=os.getenv("AWS_ACCESS_KEY"),
            aws_secret_key=os.getenv("AWS_SECRET_KEY"),
            region=os.getenv("AWS_REGION"),
        )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _evidence_bucket() -> str:
        """Return the evidence bucket for the current stage.

        The name follows ``retrofit-data-<STAGE>`` and falls back to the
        previously hard-coded ``retrofit-data-dev`` when STAGE is unset.
        """
        return f"retrofit-data-{os.getenv('STAGE') or 'dev'}"

    @staticmethod
    def _deal_fields(deal_data, company, listing) -> dict:
        """Map HubSpot deal/company/listing data onto HubspotDealData columns.

        ``listing`` may be None when a deal has no associated listing; the
        listing-derived columns are then None instead of raising.
        """
        listing = listing or {}
        return {
            "dealname": deal_data.get("dealname"),
            "dealstage": deal_data.get("dealstage"),
            "landlord_property_id": listing.get("owner_property_id"),
            "uprn": listing.get("national_uprn"),
            "outcome": deal_data.get("outcome"),
            "outcome_notes": deal_data.get("outcome_notes"),
            "project_code": deal_data.get("project_code"),
            "company_id": company,
            "major_condition_issue_description": deal_data.get(
                "major_condition_issue_description"
            ),
            "major_condition_issue_photos": deal_data.get(
                "major_condition_issue_photos"
            ),
            "coordination_status": deal_data.get("coordination_status__stage_1_"),
            "design_status": deal_data.get("retrofit_design_status"),
        }

    def _sha256(self, file_path: str) -> str:
        """Compute SHA-256 checksum of a file."""
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()

    def _upload_evidence(self, hubspot_client, photo_url: str, verify: bool = False) -> str:
        """Copy a HubSpot file to the evidence bucket and return its S3 URL.

        All intermediate files live in a temporary directory that is removed
        afterwards. This also keeps writes out of the working directory,
        which is not writable inside AWS Lambda.

        Args:
            hubspot_client: Client exposing ``download_file_from_url``.
            photo_url: HubSpot download URL of the photo.
            verify: When True, download the object back from S3 and compare
                SHA-256 checksums.

        Raises:
            ValueError: If ``verify`` is set and the checksums differ.
        """
        import tempfile

        with tempfile.TemporaryDirectory() as workdir:
            local_file = hubspot_client.download_file_from_url(photo_url, workdir)
            s3_url = self.s3.upload_file(
                local_file, self._evidence_bucket(), prefix=self._EVIDENCE_PREFIX
            )

            if verify:
                downloaded = self.s3.download_from_url(
                    s3_url, local_dir=os.path.join(workdir, "verify")
                )
                if self._sha256(local_file) == self._sha256(downloaded):
                    print("✅ SHA256 match verified — upload successful.")
                else:
                    print("❌ SHA256 mismatch — integrity check failed.")
                    raise ValueError("File integrity check failed after S3 upload.")

        return s3_url

    def _try_upload_evidence(self, hubspot_client, photo_url: str, deal_id):
        """Best-effort upload used by ``upsert_deal``; returns None on failure."""
        try:
            return self._upload_evidence(hubspot_client, photo_url)
        except Exception as e:
            # Continue without the file — don't crash the insert/update.
            print(f"⚠️ Failed to download photo for deal_id {deal_id}: {e}")
            return None

    # ------------------------------------------------------------------
    # Organisations
    # ------------------------------------------------------------------
    def read_org_table(self, limit: int = 10):
        """Return up to ``limit`` Organisation rows."""
        with db_read_session() as session:
            records = session.exec(select(Organisation).limit(limit)).all()
            return records

    def get_org_names(self, limit: int = 10) -> list[str]:
        """Returns a list of organisation names."""
        records = self.read_org_table(limit)
        return [org.name for org in records if org.name]

    def upsert_company(self, company_data: CompanyData) -> "Organisation":
        """Upserts a company record. Updates if hubspot_company_id exists, otherwise creates new."""
        with db_read_session() as session:
            hubspot_id = company_data.get("hs_object_id")
            company_name = company_data.get("name")

            # Check if company already exists
            existing = session.exec(
                select(Organisation).where(
                    Organisation.hubspot_company_id == hubspot_id
                )
            ).first()

            if existing:
                # Update existing record
                existing.name = company_name
                existing.updated_at = datetime.now(timezone.utc)
                record = existing
            else:
                # Create new record
                record = Organisation(
                    hubspot_company_id=hubspot_id,
                    name=company_name,
                )

            session.add(record)
            session.commit()
            # Reload attributes so the returned object stays readable after
            # the session closes (same pattern as upsert_deal).
            session.refresh(record)
            return record

    # ------------------------------------------------------------------
    # Deals
    # ------------------------------------------------------------------
    def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
        """Deprecated alias of ``upsert_deal``."""
        print("⚠️ Deprecated — use the new interface instead.")
        return self.upsert_deal(deal_data, company, listing, hubspot_client)

    def find_all_deals_with_company_id(self, company_id):
        """Returns a list of deals for a given company_id."""
        with db_read_session() as session:
            return session.exec(
                select(HubspotDealData).where(HubspotDealData.company_id == company_id)
            ).all()

    def find_deal_with_deal_id(self, deal_id):
        """Return the deal with this HubSpot deal_id, or None if absent."""
        with db_read_session() as session:
            return session.exec(
                select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
            ).one_or_none()

    def update_deal_with_checks(self, deal_in_db, hubspot_client) -> bool:
        """
        Checks if a deal needs updating and syncs it with HubSpot.
        Also handles major_condition_issue_photos file upload to S3 with integrity check.

        Returns:
            True when the stored deal already matched HubSpot and nothing was
            written; False when the record was re-synced or an S3 URL was
            added.
        """
        print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")

        hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
            deal_in_db.deal_id
        )

        expected = {"deal_id": hs_deal.get("hs_object_id")}
        expected.update(self._deal_fields(hs_deal, hs_company_id, hs_listing))

        # Evaluate every check (no short-circuit) so all mismatches are reported.
        mismatches = [
            message
            for attr, message in self._FIELD_CHECKS
            if getattr(deal_in_db, attr) != expected[attr]
        ]
        for message in mismatches:
            print(f"⚠️ Soft Assert Failed: {message}")

        # If discrepancies found, update from HubSpot
        if mismatches:
            print(
                f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
            )
            self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
            return False

        needs_upload = (
            deal_in_db.major_condition_issue_photos
            and not deal_in_db.major_condition_issue_evidence_s3_url
        )
        if not needs_upload:
            print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
            return True

        print(f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3...")

        # Use the fresh URL from hs_deal rather than the stored (possibly stale) one.
        photo_url = hs_deal.get("major_condition_issue_photos")
        if not photo_url:
            print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
            return True

        try:
            s3_url = self._upload_evidence(hubspot_client, photo_url, verify=True)

            # Update DB record with S3 URL
            with db_read_session() as session:
                db_record = session.get(HubspotDealData, deal_in_db.id)
                db_record.major_condition_issue_evidence_s3_url = s3_url
                session.add(db_record)
                session.commit()
            print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
            return False
        except Exception as e:
            # Continue without the file — don't crash the entire update.
            print(
                f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
            )
            return True

    def upsert_deal(self, deal_data, company, listing, hubspot_client):
        """
        Inserts or updates a deal record.
        Also uploads photos if present and adds S3 URL.

        Args:
            deal_data: HubSpot deal properties (dict-like).
            company: HubSpot company ID stored as ``company_id``.
            listing: Associated listing properties, or None.
            hubspot_client: Client used to download the evidence photo.

        Returns:
            The inserted or updated ``HubspotDealData`` row.
        """
        fields = self._deal_fields(deal_data, company, listing)

        with db_read_session() as session:
            deal_id = deal_data.get("hs_object_id")
            existing = session.exec(
                select(HubspotDealData).where(HubspotDealData.deal_id == deal_id)
            ).first()

            if existing:
                print(f"🔄 Updating existing deal (deal_id={deal_id})")
                record = existing

                # Empty HubSpot values never overwrite stored ones.
                for attr, value in fields.items():
                    setattr(record, attr, value or getattr(record, attr))

                # Upload if photo exists but S3 link missing
                if (
                    record.major_condition_issue_photos
                    and not record.major_condition_issue_evidence_s3_url
                ):
                    # Fetch fresh URL from HubSpot instead of using potentially expired stored URL
                    fresh_deal = hubspot_client.from_deal_id_get_info(record.deal_id)
                    photo_url = fresh_deal.get("major_condition_issue_photos")

                    if photo_url:
                        s3_url = self._try_upload_evidence(
                            hubspot_client, photo_url, record.deal_id
                        )
                        if s3_url:
                            record.major_condition_issue_evidence_s3_url = s3_url
                    else:
                        print(f"⚠️ Photo URL missing for deal_id {record.deal_id}")
            else:
                print(f"🆕 Inserting new deal (deal_id={deal_id})")
                record = HubspotDealData(deal_id=deal_id, **fields)

                # Handle upload at insert time
                if record.major_condition_issue_photos:
                    s3_url = self._try_upload_evidence(
                        hubspot_client,
                        record.major_condition_issue_photos,
                        record.deal_id,
                    )
                    if s3_url:
                        record.major_condition_issue_evidence_s3_url = s3_url

            session.add(record)
            session.commit()
            session.refresh(record)
            return record
class S3Uploader:
    """
    Simple helper to upload local files to S3 and return their S3 HTTPS URI.
    """

    def __init__(
        self,
        aws_access_key: str,
        aws_secret_key: str,
        region: str = "eu-west-2",
    ):
        """Create the underlying boto3 S3 client.

        Args:
            aws_access_key: AWS access key ID handed to boto3.
            aws_secret_key: AWS secret access key handed to boto3.
            region: AWS region; also used to build the URLs returned by
                ``upload_file``.
        """
        self.aws_access_key = aws_access_key
        self.aws_secret_key = aws_secret_key
        self.region = region

        self.s3 = boto3.client(
            "s3",
            aws_access_key_id=self.aws_access_key,
            aws_secret_access_key=self.aws_secret_key,
            region_name=self.region,
        )

    @staticmethod
    def _parse_s3_url(s3_url: str) -> tuple:
        """Split a virtual-hosted S3 URL into ``(bucket, key)``.

        Everything before the first ``.s3.`` in the host is the bucket, so
        bucket names that themselves contain dots are accepted.

        Raises:
            ValueError: If the URL is not of the form
                ``https://<bucket>.s3.<...>/<key>``.
        """
        parsed = urlparse(s3_url)
        bucket, separator, _ = parsed.netloc.partition(".s3.")
        key = parsed.path.lstrip("/")
        if not separator or not bucket or not key or key.endswith("/"):
            raise ValueError("❌ Not a valid S3 HTTPS URL")
        return bucket, key

    def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str:
        """
        Upload a local file to an S3 bucket and return its HTTPS URI.

        The object key is ``<prefix>/<UTC timestamp>_<file name>``.

        Args:
            file_path (str): Path to the local file.
            bucket (str): S3 bucket name.
            prefix (str): Folder/prefix in the bucket.

        Returns:
            str: HTTPS-style S3 URI (not signed).

        Raises:
            RuntimeError: If the upload fails.
        """
        # S3 keys always use "/", regardless of the local OS path separator.
        import posixpath
        from datetime import timezone

        filename = os.path.basename(file_path)
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        s3_key = posixpath.join(prefix, f"{timestamp}_{filename}")

        try:
            self.s3.upload_file(file_path, bucket, s3_key)
        # The managed transfer behind upload_file reports failures as
        # S3UploadFailedError rather than ClientError, so catch both.
        except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
            raise RuntimeError(f"❌ S3 upload failed: {e}") from e

        return f"https://{bucket}.s3.{self.region}.amazonaws.com/{s3_key}"

    def print_bucket(self, bucket: str = "retrofit-data-dev"):
        """Print the ``head_bucket`` response for ``bucket`` (debug helper)."""
        print(self.s3.head_bucket(Bucket=bucket))

    def generate_presigned_url(
        self, bucket: str, key: str, expires_in: int = 3600
    ) -> str:
        """
        Generate a temporary presigned URL for an S3 object.

        Raises:
            RuntimeError: If boto3 cannot generate the URL.
        """
        try:
            return self.s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": key},
                ExpiresIn=expires_in,
            )
        except ClientError as e:
            raise RuntimeError(f"❌ Failed to generate signed URL: {e}") from e

    def download_from_url(
        self,
        s3_url: str,
        local_dir: str = ".",
        expires_in: int = 3600,
        timeout: float = 60,
    ) -> str:
        """
        Download a file from a public or private S3 URL.
        A presigned URL is always generated first, so private objects work too.

        Args:
            s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt)
            local_dir (str): Folder to save the file in.
            expires_in (int): Presigned URL lifetime (seconds).
            timeout (float): Timeout in seconds for the HTTP request.

        Returns:
            str: Local file path of the downloaded file.

        Raises:
            ValueError: If ``s3_url`` is not a virtual-hosted S3 URL.
            RuntimeError: If the presigned URL cannot be created or the
                download fails.
        """
        bucket, key = self._parse_s3_url(s3_url)

        # Generate presigned URL (whether public or private)
        presigned_url = self.generate_presigned_url(bucket, key, expires_in)
        local_path = os.path.join(local_dir, os.path.basename(key))

        try:
            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(presigned_url, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                os.makedirs(local_dir, exist_ok=True)
                with open(local_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"❌ Failed to download file: {e}") from e

        print(f"✅ Downloaded: {local_path}")
        return local_path
"""
README.md

This is a simple script to showcase how a new organisation can be
added to AraDb.

This has been made redundant, because the same process now runs
whenever HubSpot fires a webhook.
"""

from etl.hubspot.hubspotClient import HubspotClient, Companies

from etl.hubspot.hubspotDataTodB import HubspotDataToDb, CompanyData

hubspot = HubspotClient()
dbRead = HubspotDataToDb()
companies_to_add_or_ensure_it_exists = [
    Companies.THE_GUINESS_PARTNERSHIP,
    Companies.SOUTHERN_HOUSING_GROUP,
    Companies.CALICO_HOMES,
]

# Upsert each company: existing organisations are updated, missing ones created.
for company in companies_to_add_or_ensure_it_exists:
    company_info: CompanyData = hubspot.get_company_information(company.value)
    dbRead.upsert_company(company_info)

# Reuse the loader created above; a second HubspotDataToDb() would only
# build another identical S3 client.
names = dbRead.get_org_names()
print(f"Organisations in database: {names}")
+ +# Install dependencies into Lambda runtime +RUN pip install --no-cache-dir -r requirements.txt + + +# Copy necessary files for database and utility imports +COPY backend/ backend/ +COPY utils/ utils/ +COPY datatypes/ datatypes/ +COPY etl/hubspot etl/hubspot + +# Copy the handler +COPY etl/hubspot/scripts/scraper/main.py . + +# ----------------------------- +# Lambda handler +# ----------------------------- +CMD ["main.handler"] \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/handler/requirements.txt b/etl/hubspot/scripts/scraper/handler/requirements.txt new file mode 100644 index 00000000..230b460e --- /dev/null +++ b/etl/hubspot/scripts/scraper/handler/requirements.txt @@ -0,0 +1,12 @@ +pandas==2.2.2 +numpy<2.0 +requests +tqdm +openpyxl +epc-api-python==1.0.2 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 +hubspot-api-client \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml b/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml new file mode 100644 index 00000000..77679650 --- /dev/null +++ b/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + hubspot-scraper: + build: + context: ../../../../../ + dockerfile: etl/hubspot/scripts/scraper/handler/Dockerfile + ports: + - "9000:8080" + env_file: + - ../../../../../.env \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py b/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py new file mode 100644 index 00000000..69580a93 --- /dev/null +++ b/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +import json +import requests + +HOST = "localhost" +PORT = "9000" + +LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations" + +payload = { + "Records": [ + { + "body": json.dumps( + { + "task_id": 
def _extract_deal_ids(event: dict[str, Any]) -> list[str]:
    """Return the ``hubspot_deal_id`` of every message in a Lambda event.

    Two event shapes are accepted:

    * an SQS event, ``{"Records": [{"body": "<json>"}, ...]}``, where each
      record body is a JSON document (or an already-decoded dict);
    * a direct payload, ``{"hubspot_deal_id": "..."}``.

    Missing IDs are returned as empty strings so the caller can reject them.

    Raises:
        RuntimeError: If a record body is not valid JSON.
    """
    import json

    if "Records" not in event:
        return [event.get("hubspot_deal_id", "")]

    deal_ids: list[str] = []
    for record in event["Records"]:
        payload = record.get("body") or {}
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError as e:
                raise RuntimeError(f"SQS record body is not valid JSON: {e}") from e
        deal_ids.append(payload.get("hubspot_deal_id", ""))
    return deal_ids


# @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
    """Lambda entry point: sync one or more HubSpot deals into the database.

    For each deal ID in the event, an existing DB row is checked and
    re-synced against HubSpot; an unknown deal is fetched from HubSpot and
    inserted.

    Args:
        body: The Lambda event, either an SQS event with ``Records`` or a
            dict carrying ``hubspot_deal_id`` directly.
        context: Lambda context object (unused).
        local: When True, ignore ``body`` and use a fixed sample deal ID.

    Raises:
        RuntimeError: If no deal ID can be found in the event.
    """
    if local is True:
        body = {
            "hubspot_deal_id": "254427203793",
        }

    deal_ids = _extract_deal_ids(body)

    # Validate before constructing any client, so malformed events fail fast.
    if not deal_ids or any(not deal_id for deal_id in deal_ids):
        raise RuntimeError(
            "Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'"
        )

    hubspot: HubspotClient = HubspotClient()
    dbloader: HubspotDataToDb = HubspotDataToDb()

    for hubspot_deal_id in deal_ids:
        deal = dbloader.find_deal_with_deal_id(hubspot_deal_id)

        if deal:
            dbloader.update_deal_with_checks(deal, hubspot)
        else:
            deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id)
            dbloader.upsert_deal(deal, company, listing, hubspot)

    print("Finsihed running")
# Every test here calls the live HubSpot API, so the class carries the
# "integration" marker that pytest.ini registers for such tests.
@pytest.mark.integration
class TestHubspotClientIntegration:
    """Integration tests using real HubSpot API calls."""

    @pytest.fixture
    def client(self):
        """Initialize HubSpot client with env variables."""
        return HubspotClient()

    def test_client_initialization(self, client: HubspotClient):
        """Check HubspotClient initialisation; fails early if env variables are not set."""
        assert client.access_token is not None
        assert client.client is not None
        assert client.logger is not None

    def test_get_deal_ids_from_company(self, client: HubspotClient):
        """Test getting deal IDs from Apple company includes expected deal."""
        company_id: str = Companies.APPLE.value

        deal_ids: list[str] = client.get_deal_ids_from_company(company_id)

        # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
        assert "263490768079" in deal_ids

    def test_get_company_id_from_deal_id(self, client: HubspotClient):
        """The known deal is associated with the Apple company."""
        deal_id: str = "263490768079"

        company_id: Optional[str] = client.from_deal_id_get_associated_company_id(
            deal_id
        )
        # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
        assert company_id == Companies.APPLE.value

    def test_from_deal_id_get_associated_listing(self, client: HubspotClient):
        """The associated listing exposes the properties the DB loader reads."""
        deal_id: str = "263490768079"

        listing_info: Optional[dict[str, str]] = (
            client.from_deal_id_get_associated_listing(deal_id)
        )

        assert listing_info is not None
        assert "hs_object_id" in listing_info
        assert "national_uprn" in listing_info
        assert "owner_property_id" in listing_info
        assert "domna_property_id" in listing_info

    def test_from_deal_id_get_info(self, client: HubspotClient):
        """Deal info contains every property the DB loader maps."""
        deal_id: str = "263490768079"

        deal_info: dict[str, str] = client.from_deal_id_get_info(deal_id)

        assert "dealname" in deal_info
        assert "dealstage" in deal_info
        assert "pipeline" in deal_info
        assert "outcome" in deal_info  # outcome
        assert "outcome_notes" in deal_info  # outcome notes
        assert "project_code" in deal_info
        assert "major_condition_issue_description" in deal_info
        assert "major_condition_issue_photos" in deal_info
        assert (
            "coordination_status__stage_1_" in deal_info
        )  # Coordination Status (Stage 1)
        assert "retrofit_design_status" in deal_info  # Retrofit Design Status

    def test_get_deal_info_for_db(self, client: HubspotClient):
        """get_deal_info_for_db returns (deal, company id, listing or None)."""
        deal_id: str = "263490768079"

        deal, company, listing = client.get_deal_info_for_db(deal_id)

        assert "dealname" in deal
        assert "dealstage" in deal
        assert "pipeline" in deal

        assert company == Companies.APPLE.value

        assert listing is None or "hs_object_id" in listing

    def test_get_company_information(self, client: HubspotClient):
        """Company information includes the (case-insensitive) company name."""
        company_id: str = Companies.APPLE.value

        company_info: dict[str, str] = client.get_company_information(company_id)

        assert "name" in company_info
        assert company_info["name"].lower() == "Apple".lower()

    def test_get_all_pipelines(self, client: HubspotClient):
        """The social-housing operations pipeline is among all pipelines."""
        pipelines: list[dict[str, str]] = client.get_all_pipelines()

        assert len(pipelines) > 0
        pipeline_ids: list[str] = [p["id"] for p in pipelines]
        assert Pipeline.OPERATIONS_SOCIAL_HOUSING.value in pipeline_ids

    def test_get_deal_stages_from_pipeline_id(self, client: HubspotClient):
        """The pipeline's stages include the expected deal stage."""
        stages: list[dict[str, str]] = client.get_deal_stages_from_pipeline_id(
            Pipeline.OPERATIONS_SOCIAL_HOUSING.value
        )

        assert len(stages) > 0
        stage_ids: list[str] = [s["stage_id"] for s in stages]
        assert DealStage.SURVEYED_COMPLETE_NEEDS_SIGN_OFF.value in stage_ids

    def test_download_file_from_url(
        self, client: HubspotClient, tmp_path: "os.PathLike[str]"
    ):
        """A deal's evidence photo downloads to a non-empty local file."""
        # pytest's tmp_path fixture supplies a directory path object, not an
        # optional string; it is converted with str() below.
        deal_info: dict[str, str] = client.from_deal_id_get_info("254427203793")
        download_url: str = deal_info["major_condition_issue_photos"]

        save_path: str = client.download_file_from_url(download_url, str(tmp_path))

        assert os.path.exists(save_path)
        assert os.path.getsize(save_path) > 0
# Remote state of the shared stack; supplies the ARN of the S3 policy that is
# attached to the Lambda role at the bottom of this file.
data "terraform_remote_state" "shared" {
  backend = "s3"
  config = {
    bucket = "assessment-model-terraform-state"
    key    = "env:/${var.stage}/terraform.tfstate"
    region = "eu-west-2"
  }
}

# Database credentials, stored as a JSON document in Secrets Manager.
data "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id = "${var.stage}/assessment_model/db_credentials"
}

locals {
  db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}

module "hubspot_deal_etl" {
  source = "../../modules/lambda_with_sqs"

  name  = "hubspot_deal_etl"
  stage = var.stage

  image_uri = local.image_uri

  # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000)
  maximum_concurrency = var.maximum_concurrency

  batch_size = var.batch_size

  # NOTE(review): var.db_host, var.db_name and var.db_port are declared in
  # variables.tf but are not passed to the function here. Confirm whether the
  # handler expects them in its environment.
  environment = {
    STAGE       = var.stage
    LOG_LEVEL   = "info"
    DB_USERNAME = local.db_credentials.db_assessment_model_username
    DB_PASSWORD = local.db_credentials.db_assessment_model_password
  }
}

resource "aws_iam_role_policy_attachment" "lambda_s3_policy" {
  # The Lambda module in this configuration is declared as "hubspot_deal_etl";
  # no module named "lambda" exists here, so the role must come from it.
  role       = module.hubspot_deal_etl.role_name
  policy_arn = data.terraform_remote_state.shared.outputs.hubspot_etl_s3_read_and_write_arn
}
+} + +variable "batch_size" { + type = number + default = 1 +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} + + +variable "db_host" { + type = string +} + +variable "db_name" { + type = string +} + +variable "db_port" { + type = string +} \ No newline at end of file diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index 3b12561c..9d272eb6 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -628,6 +628,8 @@ output "cdn_certificate_state_bucket" { value = module.cdn_certificate_state_bucket.bucket_name } + + ################################################ # CDN ################################################ @@ -639,3 +641,35 @@ module "cdn_state_bucket" { output "cdn_state_bucket" { value = module.cdn_state_bucket.bucket_name } + + +################################################ +# Hubspot ETL Lambda +################################################ +module "hubspot_etl_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "hubspot-etl-bucket-terraform-state" + +} + +module "hubspot_etl_registry" { + source = "../modules/container_registry" + name = "hubspot-etl" + stage = var.stage + +} + +# S3 policy for postcode splitter to read from retrofit data bucket +module "hubspot_etl_s3_read_and_write" { + source = "../modules/s3_iam_policy" + + policy_name = "HubspotETLReadandWriteS3" + policy_description = "Allow hubspot_etl_lambda Lambda to read and write from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] + resource_paths = ["/*"] +} + +output "hubspot_etl_s3_read_and_write_arn" { + value = module.hubspot_etl_s3_read_and_write.policy_arn +} \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index ecb17089..db7afaf5 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,6 
+3,6 @@ pythonpath = . log_cli = true log_cli_level = INFO addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial -testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests +testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests markers = integration: mark a test as an integration test diff --git a/scripts/init_db.py b/scripts/init_db.py new file mode 100644 index 00000000..69edf777 --- /dev/null +++ b/scripts/init_db.py @@ -0,0 +1,5 @@ +from sqlmodel import SQLModel +import backend.app.db.models.organisation # noqa: F401 +from backend.app.db.connection import db_engine + +SQLModel.metadata.create_all(db_engine) diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index 519636be..c89560cb 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -26,15 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials from collections import defaultdict from sqlalchemy import func -# PORTFOLIO_ID = 206 -# SCENARIOS = [389] -PORTFOLIO_ID = 581 -SCENARIOS = [1124] +PORTFOLIO_ID = 639 +SCENARIOS = [1157] scenario_names = { - 1124: "EPC C - Solar Focused", + 1157: "EPC C - no EWI solid floor", } -project_name = "WCHG EPC D rated properties" +project_name = "Instagroup Sample" def get_data(portfolio_id, scenario_ids): diff --git a/test.requirements.txt b/test.requirements.txt index d8b8b777..936e2f7d 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -4,4 +4,6 @@ pytest-cov pytest-mock dotenv psycopg[binary] -pytest-postgresql \ No newline at end of 
file +pytest-postgresql +hubspot-api-client +fuzzywuzzy \ No newline at end of file