diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 662f53b0..6a1cc120 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -35,7 +35,8 @@ ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1 ADD backend/engine/requirements.txt requirements1.txt ADD backend/app/requirements/requirements.txt requirements2.txt ADD .devcontainer/backend/requirements.txt requirements3.txt -RUN cat requirements1.txt requirements2.txt requirements3.txt > requirements.txt +ADD etl/hubspot/requirements.txt requirements4.txt +RUN cat requirements1.txt requirements2.txt requirements3.txt requirements4.txt > requirements.txt RUN pip install -r requirements.txt # 5) Workdir diff --git a/.devcontainer/backend/requirements.txt b/.devcontainer/backend/requirements.txt index 5cd40ced..f6e1f665 100644 --- a/.devcontainer/backend/requirements.txt +++ b/.devcontainer/backend/requirements.txt @@ -23,4 +23,4 @@ psycopg[binary] pytest-postgresql # Formatting black==26.1.0 -boto3-stubs \ No newline at end of file +boto3-stubs diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index cc6431b8..116bc265 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -14,17 +14,16 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Set up Python 3.11 - uses: actions/setup-python@v4 - with: - python-version: '3.11' + - name: Build test image + run: docker build -f Dockerfile.test -t model-test . - - name: Install tox via Makefile - run: | - make setup - - - name: Run tests with tox via Makefile + - name: Run tests env: EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }} + HUBSPOT_API_KEY: ${{ secrets.HUBSPOT_API_KEY }} + run: | - make test \ No newline at end of file + docker run --rm \ + -e EPC_AUTH_TOKEN=${{ secrets.DEV_EPC_AUTH_TOKEN }} \ + -e HUBSPOT_API_KEY=${{ secrets.HUBSPOT_API_KEY }} \ + model-test pytest diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 00000000..802eb3a4 --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,28 @@ +FROM python:3.11-slim + +# Install PostgreSQL binaries — required by pytest-postgresql to spawn ephemeral test databases +RUN apt-get update \ + && apt-get install -y --no-install-recommends postgresql \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +ENV PYTHONPATH=/app + +# Copy requirements first so Docker can cache the install layer +COPY backend/engine/requirements.txt backend/engine/requirements.txt +COPY backend/app/requirements/requirements.txt backend/app/requirements/requirements.txt +COPY test.requirements.txt test.requirements.txt + +RUN pip install --no-cache-dir \ + -r backend/engine/requirements.txt \ + -r backend/app/requirements/requirements.txt \ + -r test.requirements.txt + +# Copy source +COPY . . + +# pg_ctl refuses to run as root — create an unprivileged user +RUN useradd -m testuser && chown -R testuser /app +USER testuser + +CMD ["pytest"] diff --git a/Dockerfile.test.dockerignore b/Dockerfile.test.dockerignore new file mode 100644 index 00000000..4f79c6ee --- /dev/null +++ b/Dockerfile.test.dockerignore @@ -0,0 +1,12 @@ +# We need this file otherwise it'll use .dockerignore +# Exclude large/irrelevant directories that are not needed for testing +model_data/local_data/ +backend/node_modules/ +backend/.idea/ +backend/.env +infrastructure/ +data_collection/ +node_modules/ +conservation_areas/ +open_uprn/ +land_registry/ diff --git a/README.md b/README.md index 9268ba25..b470e12c 100644 --- a/README.md +++ b/README.md @@ -39,3 +39,4 @@ pytest --cov-config=model_data/.coveragerc --cov=model_data This will produce the test results and coverage reports + diff --git a/backend/address2UPRN/README.md b/backend/address2UPRN/README.md index 6d26f281..e34e45f6 100644 --- a/backend/address2UPRN/README.md +++ b/backend/address2UPRN/README.md @@ -5,10 +5,11 @@ Before you run: Step 1) Get the list and ensure the following columns exists +I believe lower and upper case matter: * Address 1 * Address 2 * Address 3 -* postcode +* Postcode And save it as a .csv file @@ -23,16 +24,17 @@ For this example I'll be using "s3://retrofit-data-dev/ara_raw_inputs/calico/Cal Go to Ara DB and make a new task_id with a randomly generated uuid as the primarily key -task_id = a7b70a02-4df4-45b5-a50b-196e095910bb -sub_task_id = 567cf73b-1210-4909-9ecc-36ae7e23420e +task_id = 169ea9b0-01b5-48dc-9f90-ae1989491d09 +sub_task_id = e5704f9e-29fe-43c8-8913-05be09f2440f +s3 => s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching Rerun After Address Fix.csv Step 3) Alright, now lets make the input for postcode-splitter sqs to get the ball rolling postcode-splitter-sqs => https://eu-west-2.console.aws.amazon.com/sqs/v3/home?region=eu-west-2#/queues/https%3A%2F%2Fsqs.eu-west-2.amazonaws.com%2F337213553626%2Fpostcode-splitter-queue-dev { - "task_id": "a7b70a02-4df4-45b5-a50b-196e095910bb", - "sub_task_id": "567cf73b-1210-4909-9ecc-36ae7e23420e", - "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico Homes Full list EPC Properties(Sheet2) (1) (1).csv" + "task_id": "169ea9b0-01b5-48dc-9f90-ae1989491d09", + "sub_task_id": "e5704f9e-29fe-43c8-8913-05be09f2440f", + "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching (1)(Sheet1).csv" } Each batch of csv should be saved in retrofit-data-dev/ara_postcode_splitter_batches///.csv diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index d0ba36e6..af29a095 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -1,11 +1,13 @@ -from typing import Optional - from epc_api.client import EpcClient import os from urllib.parse import urlencode import pandas as pd +from difflib import SequenceMatcher from utils.logger import setup_logger +import re +from typing import Set import json +import requests from uuid import UUID import uuid from backend.app.db.functions.tasks.Tasks import SubTaskInterface @@ -16,8 +18,6 @@ from utils.s3 import ( ) from datetime import datetime -from backend.utils.addressMatch import AddressMatch - logger = setup_logger() @@ -29,6 +29,191 @@ if EPC_AUTH_TOKEN is None: raise RuntimeError("EPC_AUTH_TOKEN not defined in env") +def is_valid_postcode(postcode_clean: str) -> bool: + """ + Validate postcode using postcodes.io. + + Expects a sanitised postcode (e.g. E84SQ). + Returns True if valid, False otherwise. + """ + POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate" + if not postcode_clean: + return False + + try: + resp = requests.get( + POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean), + timeout=5, + ) + resp.raise_for_status() + return resp.json().get("result", False) + except requests.RequestException: + # Network issues, rate limits, etc. + return False + + +def levenshtein(a: str, b: str) -> float: + """ + Address similarity score in [0, 1]. + + Strategy: + - Normalise + - Strongly penalise mismatched house/flat numbers + - Combine token overlap + character similarity + """ + + def extract_number_sequence(s: str) -> list[str]: + return re.findall(r"\d+[a-z]?", s) + + def extract_numbers(s: str) -> Set[str]: + return set(extract_number_sequence(s)) + + def tokenise(s: str) -> Set[str]: + return set(s.split()) + + def extract_building_number(s: str) -> str | None: + """ + Extract the main building number (NOT flat/unit). + Assumes formats like: + - '42 moreton road' + - 'flat 3 42 moreton road' + """ + tokens = s.split() + + # remove flat/unit context + cleaned = [] + skip_next = False + for t in tokens: + if t in ("flat", "apt", "apartment", "unit"): + skip_next = True + continue + if skip_next: + skip_next = False + continue + cleaned.append(t) + + # first remaining number is building number + for t in cleaned: + if re.fullmatch(r"\d+[a-z]?", t): + return t + + return None + + a_norm = normalise_address(a) + b_norm = normalise_address(b) + + # --- hard signal: numbers --- + nums_a = extract_numbers(a_norm) + nums_b = extract_numbers(b_norm) + + if nums_a and not nums_b: + return 0.0 + + # No shared numbers at all → impossible match + if nums_a and nums_b and nums_a.isdisjoint(nums_b): + return 0.0 + + # 🔒 HARD GUARD: building number must match + bld_a = extract_building_number(a_norm) + bld_b = extract_building_number(b_norm) + + if bld_a and bld_b and bld_a != bld_b: + return 0.0 + + # --- order-sensitive flat/building guard --- + seq_a = extract_number_sequence(a_norm) + seq_b = extract_number_sequence(b_norm) + + has_flat_token_user = any( + tok in a_norm for tok in ("flat", "apt", "apartment", "unit") + ) + has_flat_token_epc = "flat" in b_norm + + if ( + len(seq_a) == 2 + and len(seq_b) >= 2 + and has_flat_token_epc + and not has_flat_token_user + and seq_a != seq_b[:2] + ): + return 0.0 + + # --- token similarity (order-independent) --- + toks_a = tokenise(a_norm) + toks_b = tokenise(b_norm) + + if not toks_a or not toks_b: + token_score = 0.0 + else: + token_score = len(toks_a & toks_b) / len(toks_a | toks_b) + + # --- character similarity (soft signal) --- + char_score = SequenceMatcher(None, a_norm, b_norm).ratio() + + # --- weighted blend --- + return round( + 0.65 * token_score + 0.35 * char_score, + 4, + ) + + +def normalise_address(s: str) -> str: + """ + Canonical UK-focused address normalisation. + + - Lowercases + - Removes punctuation (keeps / for flats) + - Normalises whitespace + - Applies synonym compression at token level + """ + + if not s: + return "" + + ADDRESS_SYNONYMS = { + # street types + "rd": "road", + "rd.": "road", + "st": "street", + "st.": "street", + "ave": "avenue", + "ave.": "avenue", + "ln": "lane", + "ln.": "lane", + "cres": "crescent", + "ct": "court", + "dr": "drive", + # flats / units + "apt": "flat", + "apartment": "flat", + "unit": "flat", + "ste": "suite", + # numbering noise + "no": "", + "no.": "", + } + # 1. lowercase + s = s.lower() + + # 1.5 split digit-letter suffixes + s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s) + + # 2. remove punctuation except / + s = re.sub(r"[^\w\s/]", " ", s) + + # 3. normalise whitespace + s = re.sub(r"\s+", " ", s).strip() + + # 4. tokenise + synonym normalisation + tokens = [] + for tok in s.split(): + replacement = ADDRESS_SYNONYMS.get(tok, tok) + if replacement: + tokens.append(replacement) + + return " ".join(tokens) + + def score_addresses( df: pd.DataFrame, user_address: str, @@ -37,7 +222,7 @@ def score_addresses( if column not in df.columns: raise ValueError(f"Missing column: {column}") - return df[column].apply(lambda x: AddressMatch.score(user_address, x)) + return df[column].apply(lambda x: levenshtein(user_address, x)) def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3): @@ -129,11 +314,9 @@ def get_uprn_candidates( out = df.copy() - user_norm = AddressMatch.normalise_address(user_address) + user_norm = normalise_address(user_address) - out["lexiscore"] = out[address_column].apply( - lambda x: AddressMatch.levenshtein(user_norm, x) - ) + out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x)) # Normalise UPRN to string out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True) @@ -297,10 +480,7 @@ def resolve_uprns_for_postcode_group( def save_results_to_s3( - results_df: pd.DataFrame, - task_id: str, - sub_task_id: str, - bucket_name: Optional[str] = None, + results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None ) -> bool: """ Save results DataFrame to S3 as CSV. @@ -353,7 +533,7 @@ def handler(event, context, local=False): { "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", "sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d", - "s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-18T11:47:00.822579_f95467f5.csv", + "s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv", } ) } @@ -441,6 +621,19 @@ def handler(event, context, local=False): # Process the rows logger.info(f"Processing {len(df)} rows for task {task_id}") + # Create user_input column by concatenating Address columns if not already present + if "user_input" not in df.columns: + df["user_input"] = ( + df["Address 1"].fillna("") + + " " + + df["Address 2"].fillna("") + + " " + + df["Address 3"].fillna("") + ).str.strip() + logger.info(f"Created user_input column from Address 1 and Address 2") + else: + logger.info(f"user_input column already present in data") + clean_df = df.dropna(subset=["postcode_clean"]) postcode_to_addresses = { @@ -460,7 +653,7 @@ def handler(event, context, local=False): ) # Validate postcode before processing - if not AddressMatch.is_valid_postcode(postcode): + if not is_valid_postcode(postcode): logger.warning(f"Postcode {postcode} is invalid, skipping") continue @@ -479,67 +672,57 @@ def handler(event, context, local=False): # Process each address in this postcode with the same EPC data for row in postcode_rows: try: - # Concatenate Address columns directly - address2uprn_user_input = ( - str(row.get("Address 1", "")).strip() - + " " - + str(row.get("Address 2", "")).strip() - + " " - + str(row.get("Address 3", "")).strip() - ).strip() - - if not address2uprn_user_input: + user_input = row.get("user_input", "") + if not user_input: logger.warning( - f"Skipping row with missing address components for postcode {postcode}" + f"Skipping row with missing user_input for postcode {postcode}" ) continue # Get UPRN using the pre-fetched EPC data with all return options result = get_uprn_with_epc_df( - user_inputed_address=address2uprn_user_input, - epc_df=epc_df, - verbose=True, + user_inputed_address=user_input, epc_df=epc_df, verbose=True ) # Parse result tuple if successful if result: uprn, found_address, score = result logger.info( - f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})" + f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})" ) results_data.append( { **row, # Include all original data - "address2uprn_uprn": uprn, - "address2uprn_address": found_address, - "address2uprn_lexiscore": score, + "uprn": uprn, + "domna_found_address": found_address, + "domna_lexiscore": score, } ) else: logger.warning( - f"No UPRN found for {address2uprn_user_input} in {postcode}" + f"No UPRN found for {user_input} in {postcode}" ) results_data.append( { **row, # Include all original data - "address2uprn_uprn": None, - "address2uprn_address": None, - "address2uprn_lexiscore": None, + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, } ) except Exception as e: logger.error( - f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}" + f"Error processing address {row.get('user_input', 'unknown')}: {e}" ) # Still add the row with error markers results_data.append( { **row, - "address2uprn_uprn": None, - "address2uprn_address": None, - "address2uprn_lexiscore": None, + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, "error": str(e), } ) diff --git a/backend/app/config.py b/backend/app/config.py index 6604fec9..46301e30 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -65,6 +65,8 @@ class Settings(BaseSettings): ORDNANCE_SURVEY_API_KEY: str = "changeme" + HUBSPOT_API_KEY: Optional[str] = None + # Optional AWS creds (only required in local) AWS_ACCESS_KEY_ID: Optional[str] = None AWS_SECRET_KEY_ID: Optional[str] = None diff --git a/conftest.py b/conftest.py index d93f0023..2ea20ebb 100644 --- a/conftest.py +++ b/conftest.py @@ -30,6 +30,7 @@ DEFAULT_ENV = { "HEATING_KWH_PREDICTIONS_BUCKET": "test", "HOTWATER_KWH_PREDICTIONS_BUCKET": "test", "ENERGY_ASSESSMENTS_BUCKET": "test", + "HUBSPOT_API_KEY": "changeme", } # runs immediately when pytest starts, BEFORE collection diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py new file mode 100644 index 00000000..f93a736c --- /dev/null +++ b/etl/hubspot/hubspotClient.py @@ -0,0 +1,441 @@ +import os +from enum import Enum +from typing import Optional, cast + +from hubspot.client import Client # type: ignore[reportMissingTypeStubs] +from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs] +from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs] +from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.products.api.basic_api import BasicApi as ProductsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.line_items.api.basic_api import BasicApi as LineItemsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.pipelines.models import ( # type: ignore[reportMissingTypeStubs] + CollectionResponsePipelineNoPaging as PipelinesResponse, +) +from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline # type: ignore[reportMissingTypeStubs] +from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage # type: ignore[reportMissingTypeStubs] +from hubspot.crm.objects.models import SimplePublicObject as HubspotObject # type: ignore[reportMissingTypeStubs] +from hubspot.crm.associations.v4 import AssociationSpec # type: ignore[reportMissingTypeStubs] +from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi # type: ignore[reportMissingTypeStubs] +from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTypeStubs] + CollectionResponseMultiAssociatedObjectWithLabelForwardPaging as AssociationsPageResponse, + MultiAssociatedObjectWithLabel as AssociationsResult, + ForwardPaging as AssociationsPaging, + NextPage as AssociationsPagingNext, +) + + +from backend.app.config import get_settings +from utils.logger import setup_logger + +import mimetypes +import requests + + +class Companies(Enum): + ABRI = "237615001799" + SOUTHERN_HOUSING_GROUP = "109343619305" + LIVEWEST = "86205872354" + SURESERVE = "301745289413" + HOMEGROUP = "94946071794" + APPLE = "184769046716" + THE_GUINESS_PARTNERSHIP = "86970043613" + + +class DealStage(Enum): + SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914" + SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915" + CUSTOMER_CONTACTED = "888730834" + SURVEYED_COMPLETED_SIGNED_OFF = "1617223916" + FILES_MISSING_FROM_ASSESSOR = "1887736000" + + +class Pipeline(Enum): + OPERATIONS_SOCIAL_HOUSING = "1167582403" + + +# TODO get guiness working from here + + +class HubspotClient: + + def __init__(self): + """ + Hey Tech Team, Hubspot Library doesn't do type hitting. + We have type hinted stuff but pylance never becomes happy. + However, because I added the type hinting to the best of ability + and you'll still get sensible ide suggestions. + """ + settings = get_settings() + access_token = settings.HUBSPOT_API_KEY + if access_token is None: + raise RuntimeError("Missing HUBSPOT_API_KEY in env") + self.access_token: str = access_token + self.logger = setup_logger() + self.client: Client = Client.create(access_token=self.access_token) # type: ignore[reportUnknownMemberType] + # [Developer Only] + # Add a dot in front of client and see the wonders of ide suggestions + # This wouldn't work if we didn't add ': Client' to self.client. + # Sorry - not sorry but enjoy, Past Junte 13/03/2026 + # self.client + + def get_deal_ids_from_company(self, company_id: str) -> list[str]: + associations_api: AssociationsBasicApi = ( # type: ignore[reportUnknownMemberType] + self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + ) + + deal_ids: list[str] = [] + after: Optional[str] = None + + while True: + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="companies", + object_id=company_id, + to_object_type="deals", + limit=100, + after=after, + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + for assoc in results: + assoc: AssociationsResult + object_id: str = cast(str, assoc.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + deal_ids.append(object_id) + + paging: Optional[AssociationsPaging] = cast(Optional[AssociationsPaging], response.paging) # type: ignore[reportUnknownMemberType] + if not paging: + break + + paging_next: Optional[AssociationsPagingNext] = cast(Optional[AssociationsPagingNext], paging.next) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + if not paging_next: + break + + after = cast(str, paging_next.after) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + + return deal_ids + + def from_deal_id_get_associated_company_id(self, deal_id: str) -> Optional[str]: + """ + Get the associated company ID from a given deal ID. + Returns the associated company ID, or None if not found. + """ + try: + associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + + # Fetch associations for this specific deal only + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="deals", + object_id=deal_id, + to_object_type="companies", + limit=1, # Expect only one associated company + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + if not results: + self.logger.info(f"No company association found for deal {deal_id}") + return None + + first: AssociationsResult = results[0] + company_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}") + return company_id + + except ApiException as e: + self.logger.error( + f"Error fetching associated company for deal {deal_id}: {e}" + ) + return None + + def from_deal_id_get_associated_listing( + self, deal_id: str + ) -> Optional[dict[str, str]]: + """ + Get the associated listing information for a given deal. + Returns a dictionary of listing properties, or None if not found. + """ + associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api # type: ignore[reportUnknownMemberType] # works for custom objects like "listing" + + # Fetch associated listing(s) + response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType] + object_type="deals", + object_id=deal_id, + to_object_type="0-420", # <-- use your exact custom object name slug here + limit=1, + ) + + results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType] + if not results: + self.logger.info(f"No listing association found for deal {deal_id}") + return None + + first: AssociationsResult = results[0] + listing_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}") + + # Fetch listing details (the "listing information") + listing: HubspotObject = listings_api.get_by_id( # type: ignore[reportUnknownMemberType] + object_type="0-420", # again, must match your HubSpot object name + object_id=listing_id, + properties=[ + "national_uprn", + "domna_property_id", + "owner_property_id", + ], + ) + + listing_info: dict[str, str] = cast(dict[str, str], listing.properties) # type: ignore[reportUnknownMemberType] + self.logger.info(f"Listing info for deal {deal_id}: {listing_info}") + return listing_info + + def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]: + deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType] + + deal: HubspotObject = deals_api.get_by_id( # type: ignore[reportUnknownMemberType] + deal_id, + properties=[ + "dealname", + "dealstage", + "pipeline", + "outcome", # outcome, + "outcome_notes", # outcome notes + "project_code", + "major_condition_issue_description", + "major_condition_issue_photos", + "coordination_status__stage_1_", # Coordiantion Status (Stage 1), + "retrofit_design_status", # Retrofit Design Status + ], + ) + + deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType] + return deal_info + + def get_deal_info_for_db( + self, deal_id: str + ) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]: + deal: dict[str, str] = self.from_deal_id_get_info(deal_id) + company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id) + listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing( + deal_id + ) + + return deal, company, listing + + def get_company_information(self, company_id: str) -> dict[str, str]: + companies_api: CompaniesBasicApi = self.client.crm.companies.basic_api # type: ignore[reportUnknownMemberType] + + company: HubspotObject = companies_api.get_by_id( # type: ignore[reportUnknownMemberType] + company_id, + properties=[ + "name", + ], + ) + + company_info: dict[str, str] = cast(dict[str, str], company.properties) # type: ignore[reportUnknownMemberType] + return company_info + + def get_all_pipelines(self) -> list[dict[str, str]]: + """ + Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs. + """ + try: + pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType] + response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType] + + results: list[HubspotPipeline] = cast(list[HubspotPipeline], response.results) # type: ignore[reportUnknownMemberType] + pipelines: list[dict[str, str]] = [] + for pipeline in results: + pipeline: HubspotPipeline + pipelines.append( + { + "name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType] + "id": cast(str, pipeline.id), # type: ignore[reportUnknownMemberType] + } + ) + + self.logger.info(f"Retrieved {len(pipelines)} pipelines.") + return pipelines + + except Exception as e: + self.logger.error(f"Error retrieving pipelines: {e}") + return [] + + def get_deal_stages_from_pipeline_id( + self, pipeline_id: Optional[str] = None + ) -> list[dict[str, str]]: + """ + Retrieve all deal stages for a given pipeline. + If no pipeline_id is provided, retrieves all stages for all pipelines. + Returns a list of dicts with pipeline name, stage name, and stage ID. + """ + try: + pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType] + response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType] + + all_stages: list[dict[str, str]] = [] + + for pipeline in cast(list[HubspotPipeline], response.results): # type: ignore[reportUnknownMemberType] + pipeline: HubspotPipeline + # Skip other pipelines if a specific one is requested + pipeline_id_str: str = cast(str, pipeline.id) # type: ignore[reportUnknownMemberType] + if pipeline_id and pipeline_id_str != str(pipeline_id): + continue + + for stage in cast(list[HubspotPipelineStage], pipeline.stages): # type: ignore[reportUnknownMemberType] + stage: HubspotPipelineStage + all_stages.append( + { + "pipeline_name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType] + "pipeline_id": pipeline_id_str, + "stage_name": cast(str, stage.label), # type: ignore[reportUnknownMemberType] + "stage_id": cast(str, stage.id), # type: ignore[reportUnknownMemberType] + } + ) + + if not all_stages: + self.logger.info( + f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}" + ) + else: + self.logger.info(f"Retrieved {len(all_stages)} deal stages.") + + return all_stages + + except Exception as e: + self.logger.error(f"Error retrieving deal stages: {e}") + return [] + + def download_file_from_url( + self, download_url: str, save_path: Optional[str] = None + ) -> str: + """ + Download a file from a HubSpot file URL (public or private), keeping its original file type. + """ + + try: + headers: dict[str, str] = {} + if "hubspotusercontent" not in download_url: + headers["Authorization"] = f"Bearer {self.access_token}" + + self.logger.info(f"Downloading HubSpot file: {download_url}") + response = requests.get( + download_url, headers=headers, stream=True, allow_redirects=True + ) + response.raise_for_status() + + # Try to infer filename from Content-Disposition header + content_disposition = response.headers.get("content-disposition") + if content_disposition and "filename=" in content_disposition: + filename = content_disposition.split("filename=")[1].strip('"') + else: + # fallback: extract from URL or content-type + filename = ( + os.path.basename(download_url.split("?")[0]) or "hubspot_download" + ) + if "." not in filename: + content_type = response.headers.get("content-type") + ext = ( + mimetypes.guess_extension(content_type.split(";")[0]) + if content_type + else None + ) + if ext: + filename += ext + + # Make sure save_path is valid + if save_path is None: + save_path = os.path.abspath(filename) + elif os.path.isdir(save_path): + save_path = os.path.join(save_path, filename) + else: + # if user passes a file path directly, leave it + save_path = os.path.abspath(save_path) + + with open(save_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + self.logger.info(f"File downloaded successfully → {save_path}") + return save_path + + except requests.exceptions.RequestException as e: + self.logger.error(f"Failed to download file from HubSpot: {e}") + raise + + def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str: + # Fetch product mapping + products_api: ProductsBasicApi = self.client.crm.products.basic_api # type: ignore[reportUnknownMemberType] + product: HubspotObject = products_api.get_by_id( # type: ignore[reportUnknownMemberType] + product_id, properties=["name", "price", "hs_price"] + ) + properties: dict[str, str] = cast(dict[str, str], product.properties) # type: ignore[reportUnknownMemberType] + + name: str = properties.get("name") or "" + price: str = ( + properties.get("price") or properties.get("hs_price") or "0" + ) + + # Build line item payload + line_item_input = SimplePublicObjectInput( + properties={ + "hs_product_id": product_id, + "name": name, + "quantity": str(quantity), + "price": price, + "amount": str(float(price) * quantity), + "invoiced": "Outstanding", + } + ) + + # Create line item + line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType] + line_item: HubspotObject = line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType] + return cast(str, line_item.id) # type: ignore[reportUnknownMemberType] + + def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None: + self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}") + + association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType] + + association_api.create( # type: ignore[reportUnknownMemberType] + "0-3", # to object type + deal_id, # to object id + "line_items", # from object type + line_item_id, # from object id + [ + AssociationSpec( + association_category="HUBSPOT_DEFINED", + association_type_id=19, # line_item → deal + ) + ], + ) + + def add_product_line_item_to_deal( + self, deal_id: str, product_id: str, quantity: int = 1 + ) -> str: + # Step 1: Create the line item from product mapping + line_item_id: str = self.create_line_item_from_product(product_id, quantity) + + # Step 2: Associate the created line item to the deal + self.associate_line_item_to_deal(line_item_id, deal_id) + + return line_item_id + + def delete_line_item(self, line_item_id: str) -> bool: + """ + Delete (archive) a line item in HubSpot by its ID. + """ + try: + self.logger.info(f"Deleting line item {line_item_id}...") + + line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType] + line_items_api.archive(line_item_id) # type: ignore[reportUnknownMemberType] + + self.logger.info(f"Line item {line_item_id} deleted successfully.") + return True + + except ApiException as e: + self.logger.error(f"Failed to delete line item {line_item_id}: {e}") + return False diff --git a/etl/hubspot/requirements.txt b/etl/hubspot/requirements.txt new file mode 100644 index 00000000..ef8e3ebc --- /dev/null +++ b/etl/hubspot/requirements.txt @@ -0,0 +1 @@ +hubspot-api-client \ No newline at end of file diff --git a/etl/hubspot/tests/__init__.py b/etl/hubspot/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/etl/hubspot/tests/test_hubspot_client_integration.py b/etl/hubspot/tests/test_hubspot_client_integration.py new file mode 100644 index 00000000..a3d8ae54 --- /dev/null +++ b/etl/hubspot/tests/test_hubspot_client_integration.py @@ -0,0 +1,117 @@ +import os +from typing import Optional + +import pytest +from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline, DealStage + + +class TestHubspotClientIntegration: + """Integration tests using real HubSpot API calls.""" + + @pytest.fixture + def client(self): + """Initialize HubSpot client with env variables.""" + return HubspotClient() + + def test_client_initialization(self, client: HubspotClient): + """Checks initialisation of HubspotClient and fails early if env variables is not set""" + assert client.access_token is not None + assert client.client is not None + assert client.logger is not None + + def test_get_deal_ids_from_company(self, client: HubspotClient): + """Test getting deal IDs from Apple company includes expected deal.""" + company_id: str = Companies.APPLE.value + + deal_ids: list[str] = client.get_deal_ids_from_company(company_id) + + # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079 + assert "263490768079" in deal_ids + + def test_get_company_id_from_deal_id(self, client: HubspotClient): + deal_id: str = "263490768079" + + company_id: Optional[str] = client.from_deal_id_get_associated_company_id( + deal_id + ) + # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079 + assert company_id == Companies.APPLE.value + + def test_from_deal_id_get_associated_listing(self, client: HubspotClient): + deal_id: str = "263490768079" + + listing_info: Optional[dict[str, str]] = ( + client.from_deal_id_get_associated_listing(deal_id) + ) + + assert listing_info is not None + assert "hs_object_id" in listing_info + assert "national_uprn" in listing_info + assert "owner_property_id" in listing_info + assert "domna_property_id" in listing_info + + def test_from_deal_id_get_info(self, client: HubspotClient): + deal_id: str = "263490768079" + + deal_info: dict[str, str] = client.from_deal_id_get_info(deal_id) + + assert "dealname" in deal_info + assert "dealstage" in deal_info + assert "pipeline" in deal_info + assert "outcome" in deal_info # outcome + assert "outcome_notes" in deal_info # outcome notes + assert "project_code" in deal_info + assert "major_condition_issue_description" in deal_info + assert "major_condition_issue_photos" in deal_info + assert ( + "coordination_status__stage_1_" in deal_info + ) # Coordiantion Status (Stage 1) + assert "retrofit_design_status" in deal_info # Retrofit Design Status + + def test_get_deal_info_for_db(self, client: HubspotClient): + deal_id: str = "263490768079" + + deal, company, listing = client.get_deal_info_for_db(deal_id) + + assert "dealname" in deal + assert "dealstage" in deal + assert "pipeline" in deal + + assert company == Companies.APPLE.value + + assert listing is None or "hs_object_id" in listing + + def test_get_company_information(self, client: HubspotClient): + company_id: str = Companies.APPLE.value + + company_info: dict[str, str] = client.get_company_information(company_id) + + assert "name" in company_info + assert company_info["name"].lower() == "Apple".lower() + + def test_get_all_pipelines(self, client: HubspotClient): + pipelines: list[dict[str, str]] = client.get_all_pipelines() + + assert len(pipelines) > 0 + pipeline_ids: list[str] = [p["id"] for p in pipelines] + assert Pipeline.OPERATIONS_SOCIAL_HOUSING.value in pipeline_ids + + def test_get_deal_stages_from_pipeline_id(self, client: HubspotClient): + stages: list[dict[str, str]] = client.get_deal_stages_from_pipeline_id( + Pipeline.OPERATIONS_SOCIAL_HOUSING.value + ) + + assert len(stages) > 0 + stage_ids: list[str] = [s["stage_id"] for s in stages] + assert DealStage.SURVEYED_COMPLETE_NEEDS_SIGN_OFF.value in stage_ids + + def test_download_file_from_url( + self, client: HubspotClient, tmp_path: Optional[str] + ): + deal_info: dict[str, str] = client.from_deal_id_get_info("254427203793") + download_url: str = deal_info["major_condition_issue_photos"] + + save_path: str = client.download_file_from_url(download_url, str(tmp_path)) + + assert os.path.exists(save_path) + assert os.path.getsize(save_path) > 0 diff --git a/pyrightconfig.json b/pyrightconfig.json index d4e0e2a4..18f578a5 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -2,7 +2,7 @@ "typeCheckingMode": "strict", "venvPath": "/Users/khalimconn-kowlessar/opt/anaconda3/envs/", "venv": "Fastapi-backend", - "include": [ +"include": [ "." ] } \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index 608d5e0c..c9dd8ca8 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,4 +3,4 @@ pythonpath = . log_cli = true log_cli_level = INFO addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial -testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests +testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests diff --git a/test.requirements.txt b/test.requirements.txt index d8b8b777..936e2f7d 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -4,4 +4,6 @@ pytest-cov pytest-mock dotenv psycopg[binary] -pytest-postgresql \ No newline at end of file +pytest-postgresql +hubspot-api-client +fuzzywuzzy \ No newline at end of file