This commit is contained in:
Jun-te Kim 2026-03-17 13:14:36 +00:00 committed by GitHub
commit d2abd27ad5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 852 additions and 62 deletions

View file

@ -35,7 +35,8 @@ ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1
ADD backend/engine/requirements.txt requirements1.txt
ADD backend/app/requirements/requirements.txt requirements2.txt
ADD .devcontainer/backend/requirements.txt requirements3.txt
RUN cat requirements1.txt requirements2.txt requirements3.txt > requirements.txt
ADD etl/hubspot/requirements.txt requirements4.txt
RUN cat requirements1.txt requirements2.txt requirements3.txt requirements4.txt > requirements.txt
RUN pip install -r requirements.txt
# 5) Workdir

View file

@ -23,4 +23,4 @@ psycopg[binary]
pytest-postgresql
# Formatting
black==26.1.0
boto3-stubs
boto3-stubs

View file

@ -14,17 +14,16 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Build test image
run: docker build -f Dockerfile.test -t model-test .
- name: Install tox via Makefile
run: |
make setup
- name: Run tests with tox via Makefile
- name: Run tests
env:
EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }}
HUBSPOT_API_KEY: ${{ secrets.HUBSPOT_API_KEY }}
run: |
make test
docker run --rm \
-e EPC_AUTH_TOKEN=${{ secrets.DEV_EPC_AUTH_TOKEN }} \
-e HUBSPOT_API_KEY=${{ secrets.HUBSPOT_API_KEY }} \
model-test pytest

28
Dockerfile.test Normal file
View file

@ -0,0 +1,28 @@
# Test image: runs the pytest suite in a container that ships its own PostgreSQL binaries.
# Build with: docker build -f Dockerfile.test -t model-test .
FROM python:3.11-slim
# Install PostgreSQL binaries — required by pytest-postgresql to spawn ephemeral test databases
RUN apt-get update \
    && apt-get install -y --no-install-recommends postgresql \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Make the repository root importable as the top-level package path
ENV PYTHONPATH=/app
# Copy requirements first so Docker can cache the install layer
COPY backend/engine/requirements.txt backend/engine/requirements.txt
COPY backend/app/requirements/requirements.txt backend/app/requirements/requirements.txt
COPY test.requirements.txt test.requirements.txt
RUN pip install --no-cache-dir \
    -r backend/engine/requirements.txt \
    -r backend/app/requirements/requirements.txt \
    -r test.requirements.txt
# Copy source
COPY . .
# pg_ctl refuses to run as root — create an unprivileged user
# and hand it /app so the test run can write inside the working directory
RUN useradd -m testuser && chown -R testuser /app
USER testuser
# Default command; secrets are supplied at `docker run` time via -e, not baked into the image
CMD ["pytest"]

View file

@ -0,0 +1,12 @@
# We need this file otherwise it'll use .dockerignore
# Exclude large/irrelevant directories that are not needed for testing
model_data/local_data/
backend/node_modules/
backend/.idea/
backend/.env
infrastructure/
data_collection/
node_modules/
conservation_areas/
open_uprn/
land_registry/

View file

@ -39,3 +39,4 @@ pytest --cov-config=model_data/.coveragerc --cov=model_data
This will produce the test results and coverage reports

View file

@ -5,10 +5,11 @@ Before you run:
Step 1) Get the list and ensure the following columns exist.
I believe the column names are case-sensitive, so match the capitalisation exactly:
* Address 1
* Address 2
* Address 3
* postcode
* Postcode
And save it as a .csv file
@ -23,16 +24,17 @@ For this example I'll be using "s3://retrofit-data-dev/ara_raw_inputs/calico/Cal
Go to Ara DB and make a new task_id with a randomly generated uuid as the primary key
task_id = a7b70a02-4df4-45b5-a50b-196e095910bb
sub_task_id = 567cf73b-1210-4909-9ecc-36ae7e23420e
task_id = 169ea9b0-01b5-48dc-9f90-ae1989491d09
sub_task_id = e5704f9e-29fe-43c8-8913-05be09f2440f
s3 => s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching Rerun After Address Fix.csv
Step 3) Alright, now let's make the input for postcode-splitter sqs to get the ball rolling
postcode-splitter-sqs => https://eu-west-2.console.aws.amazon.com/sqs/v3/home?region=eu-west-2#/queues/https%3A%2F%2Fsqs.eu-west-2.amazonaws.com%2F337213553626%2Fpostcode-splitter-queue-dev
{
"task_id": "a7b70a02-4df4-45b5-a50b-196e095910bb",
"sub_task_id": "567cf73b-1210-4909-9ecc-36ae7e23420e",
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico Homes Full list EPC Properties(Sheet2) (1) (1).csv"
"task_id": "169ea9b0-01b5-48dc-9f90-ae1989491d09",
"sub_task_id": "e5704f9e-29fe-43c8-8913-05be09f2440f",
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico UPRN Matching (1)(Sheet1).csv"
}
Each batch of csv should be saved in retrofit-data-dev/ara_postcode_splitter_batches/<task-id>/<sub-task-id>/<timestamp:uuid4>.csv

View file

@ -1,11 +1,13 @@
from typing import Optional
from epc_api.client import EpcClient
import os
from urllib.parse import urlencode
import pandas as pd
from difflib import SequenceMatcher
from utils.logger import setup_logger
import re
from typing import Set
import json
import requests
from uuid import UUID
import uuid
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
@ -16,8 +18,6 @@ from utils.s3 import (
)
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
logger = setup_logger()
@ -29,6 +29,191 @@ if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def is_valid_postcode(postcode_clean: str) -> bool:
    """
    Ask postcodes.io whether a postcode exists.

    Args:
        postcode_clean: Sanitised postcode without spaces (e.g. ``E84SQ``).

    Returns:
        The ``result`` field of the postcodes.io validate response, or False
        when the input is empty or the request fails for any reason.
    """
    # Nothing to validate — avoid a pointless network round trip.
    if not postcode_clean:
        return False

    validate_url = f"https://api.postcodes.io/postcodes/{postcode_clean}/validate"
    try:
        response = requests.get(validate_url, timeout=5)
        response.raise_for_status()
        payload = response.json()
    except requests.RequestException:
        # Network issues, rate limits, etc. are treated as "not valid".
        return False

    return payload.get("result", False)
def levenshtein(a: str, b: str) -> float:
    """
    Address similarity score in [0, 1].

    Despite the name this is not an edit distance: the score blends Jaccard
    token overlap with ``difflib.SequenceMatcher`` character similarity, after
    a series of hard guards on house/flat numbers.

    Strategy:
    - Normalise both addresses with ``normalise_address``
    - Return 0.0 outright when house/flat numbers contradict each other
    - Otherwise combine token overlap (65%) + character similarity (35%)

    Args:
        a: The user-supplied address.
        b: The candidate address to compare against (the EPC address at the
            call sites visible in this module).

    Returns:
        Score rounded to 4 decimal places; 0.0 when any hard guard rejects
        the pair.
    """
    # Flat markers must be matched as whole words. A plain substring test
    # treats "community" (contains "unit"), "captain" ("apt") or "flatley"
    # as flat markers and silently disables the order-sensitive guard below.
    # The negative lookahead still accepts glued forms such as "flat3".
    user_flat_marker = re.compile(r"\b(?:flat|apt|apartment|unit)(?![a-z])")
    epc_flat_marker = re.compile(r"\bflat(?![a-z])")

    def extract_number_sequence(s: str) -> list[str]:
        # Numbers in order of appearance, optionally with a letter suffix.
        return re.findall(r"\d+[a-z]?", s)

    def extract_building_number(s: str) -> str | None:
        """
        Extract the main building number (NOT flat/unit).

        Assumes formats like:
        - '42 moreton road'
        - 'flat 3 42 moreton road'
        """
        remaining: list[str] = []
        skip_next = False
        for tok in s.split():
            if tok in ("flat", "apt", "apartment", "unit"):
                # Drop the marker and the flat number that follows it.
                skip_next = True
                continue
            if skip_next:
                skip_next = False
                continue
            remaining.append(tok)

        # First remaining number is the building number.
        for tok in remaining:
            if re.fullmatch(r"\d+[a-z]?", tok):
                return tok
        return None

    a_norm = normalise_address(a)
    b_norm = normalise_address(b)

    seq_a = extract_number_sequence(a_norm)
    seq_b = extract_number_sequence(b_norm)
    nums_a = set(seq_a)
    nums_b = set(seq_b)

    # --- hard signal: numbers ---
    # The user gave a number but the candidate has none at all.
    if nums_a and not nums_b:
        return 0.0

    # No shared numbers at all -> impossible match.
    if nums_a and nums_b and nums_a.isdisjoint(nums_b):
        return 0.0

    # HARD GUARD: building number must match when both sides have one.
    bld_a = extract_building_number(a_norm)
    bld_b = extract_building_number(b_norm)
    if bld_a and bld_b and bld_a != bld_b:
        return 0.0

    # --- order-sensitive flat/building guard ---
    # A user writing "3 42 some road" (no flat marker) means flat 3 at
    # number 42; reject a candidate whose first two numbers are in a
    # different order.
    has_flat_token_user = user_flat_marker.search(a_norm) is not None
    has_flat_token_epc = epc_flat_marker.search(b_norm) is not None
    if (
        len(seq_a) == 2
        and len(seq_b) >= 2
        and has_flat_token_epc
        and not has_flat_token_user
        and seq_a != seq_b[:2]
    ):
        return 0.0

    # --- token similarity (order-independent Jaccard index) ---
    toks_a = set(a_norm.split())
    toks_b = set(b_norm.split())
    if not toks_a or not toks_b:
        token_score = 0.0
    else:
        token_score = len(toks_a & toks_b) / len(toks_a | toks_b)

    # --- character similarity (soft signal) ---
    char_score = SequenceMatcher(None, a_norm, b_norm).ratio()

    # --- weighted blend ---
    return round(0.65 * token_score + 0.35 * char_score, 4)
def normalise_address(s: str) -> str:
    """
    Canonical UK-focused address normalisation.

    - Lowercases
    - Splits digit-letter suffixes ("42a" -> "42 a")
    - Removes punctuation (keeps / for flats)
    - Normalises whitespace
    - Applies synonym compression at token level

    Args:
        s: Raw address text. Anything that is not a non-empty string
            (``None``, ``""``, a float NaN from a missing DataFrame cell, ...)
            is treated as a missing address.

    Returns:
        The normalised address as space-separated tokens, or ``""`` when the
        input is missing.
    """
    # This function is applied over DataFrame columns; a missing cell is
    # typically a float NaN, which is truthy and would crash on .lower().
    if not isinstance(s, str) or not s:
        return ""

    # Dotted variants ("rd.", "st.", "no.") are deliberately absent: the
    # punctuation pass below strips the dot before the token lookup, so
    # such keys could never match.
    address_synonyms = {
        # street types
        "rd": "road",
        "st": "street",
        "ave": "avenue",
        "ln": "lane",
        "cres": "crescent",
        "ct": "court",
        "dr": "drive",
        # flats / units
        "apt": "flat",
        "apartment": "flat",
        "unit": "flat",
        "ste": "suite",
        # numbering noise — mapped to "" and therefore dropped
        "no": "",
    }

    # 1. lowercase
    s = s.lower()

    # 1.5 split digit-letter suffixes
    s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)

    # 2. remove punctuation except /
    s = re.sub(r"[^\w\s/]", " ", s)

    # 3 + 4. str.split() already collapses runs of whitespace, so tokenise
    # directly and apply the synonym table, dropping empty replacements.
    replaced = (address_synonyms.get(tok, tok) for tok in s.split())
    return " ".join(tok for tok in replaced if tok)
def score_addresses(
df: pd.DataFrame,
user_address: str,
@ -37,7 +222,7 @@ def score_addresses(
if column not in df.columns:
raise ValueError(f"Missing column: {column}")
return df[column].apply(lambda x: AddressMatch.score(user_address, x))
return df[column].apply(lambda x: levenshtein(user_address, x))
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
@ -129,11 +314,9 @@ def get_uprn_candidates(
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
user_norm = normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))
# Normalise UPRN to string
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
@ -297,10 +480,7 @@ def resolve_uprns_for_postcode_group(
def save_results_to_s3(
results_df: pd.DataFrame,
task_id: str,
sub_task_id: str,
bucket_name: Optional[str] = None,
results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> bool:
"""
Save results DataFrame to S3 as CSV.
@ -353,7 +533,7 @@ def handler(event, context, local=False):
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d",
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-18T11:47:00.822579_f95467f5.csv",
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv",
}
)
}
@ -441,6 +621,19 @@ def handler(event, context, local=False):
# Process the rows
logger.info(f"Processing {len(df)} rows for task {task_id}")
# Create user_input column by concatenating Address columns if not already present
if "user_input" not in df.columns:
df["user_input"] = (
df["Address 1"].fillna("")
+ " "
+ df["Address 2"].fillna("")
+ " "
+ df["Address 3"].fillna("")
).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
else:
logger.info(f"user_input column already present in data")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
@ -460,7 +653,7 @@ def handler(event, context, local=False):
)
# Validate postcode before processing
if not AddressMatch.is_valid_postcode(postcode):
if not is_valid_postcode(postcode):
logger.warning(f"Postcode {postcode} is invalid, skipping")
continue
@ -479,67 +672,57 @@ def handler(event, context, local=False):
# Process each address in this postcode with the same EPC data
for row in postcode_rows:
try:
# Concatenate Address columns directly
address2uprn_user_input = (
str(row.get("Address 1", "")).strip()
+ " "
+ str(row.get("Address 2", "")).strip()
+ " "
+ str(row.get("Address 3", "")).strip()
).strip()
if not address2uprn_user_input:
user_input = row.get("user_input", "")
if not user_input:
logger.warning(
f"Skipping row with missing address components for postcode {postcode}"
f"Skipping row with missing user_input for postcode {postcode}"
)
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
user_inputed_address=address2uprn_user_input,
epc_df=epc_df,
verbose=True,
user_inputed_address=user_input, epc_df=epc_df, verbose=True
)
# Parse result tuple if successful
if result:
uprn, found_address, score = result
logger.info(
f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})"
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
)
results_data.append(
{
**row, # Include all original data
"address2uprn_uprn": uprn,
"address2uprn_address": found_address,
"address2uprn_lexiscore": score,
"uprn": uprn,
"domna_found_address": found_address,
"domna_lexiscore": score,
}
)
else:
logger.warning(
f"No UPRN found for {address2uprn_user_input} in {postcode}"
f"No UPRN found for {user_input} in {postcode}"
)
results_data.append(
{
**row, # Include all original data
"address2uprn_uprn": None,
"address2uprn_address": None,
"address2uprn_lexiscore": None,
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
}
)
except Exception as e:
logger.error(
f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}"
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
)
# Still add the row with error markers
results_data.append(
{
**row,
"address2uprn_uprn": None,
"address2uprn_address": None,
"address2uprn_lexiscore": None,
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
"error": str(e),
}
)

View file

@ -65,6 +65,8 @@ class Settings(BaseSettings):
ORDNANCE_SURVEY_API_KEY: str = "changeme"
HUBSPOT_API_KEY: Optional[str] = None
# Optional AWS creds (only required in local)
AWS_ACCESS_KEY_ID: Optional[str] = None
AWS_SECRET_KEY_ID: Optional[str] = None

View file

@ -30,6 +30,7 @@ DEFAULT_ENV = {
"HEATING_KWH_PREDICTIONS_BUCKET": "test",
"HOTWATER_KWH_PREDICTIONS_BUCKET": "test",
"ENERGY_ASSESSMENTS_BUCKET": "test",
"HUBSPOT_API_KEY": "changeme",
}
# runs immediately when pytest starts, BEFORE collection

View file

@ -0,0 +1,441 @@
import os
from enum import Enum
from typing import Optional, cast
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.products.api.basic_api import BasicApi as ProductsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.line_items.api.basic_api import BasicApi as LineItemsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.models import ( # type: ignore[reportMissingTypeStubs]
CollectionResponsePipelineNoPaging as PipelinesResponse,
)
from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline # type: ignore[reportMissingTypeStubs]
from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage # type: ignore[reportMissingTypeStubs]
from hubspot.crm.objects.models import SimplePublicObject as HubspotObject # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4 import AssociationSpec # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi # type: ignore[reportMissingTypeStubs]
from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTypeStubs]
CollectionResponseMultiAssociatedObjectWithLabelForwardPaging as AssociationsPageResponse,
MultiAssociatedObjectWithLabel as AssociationsResult,
ForwardPaging as AssociationsPaging,
NextPage as AssociationsPagingNext,
)
from backend.app.config import get_settings
from utils.logger import setup_logger
import mimetypes
import requests
class Companies(Enum):
    """Known companies; each value is the ID string passed as ``company_id`` to HubspotClient."""

    ABRI = "237615001799"
    SOUTHERN_HOUSING_GROUP = "109343619305"
    LIVEWEST = "86205872354"
    SURESERVE = "301745289413"
    HOMEGROUP = "94946071794"
    APPLE = "184769046716"
    # NOTE(review): "GUINESS" looks like a misspelling of "Guinness"; renaming
    # the member would change the public name, so it is left as-is.
    THE_GUINESS_PARTNERSHIP = "86970043613"
class DealStage(Enum):
    """Deal stage IDs; values are compared against the ``stage_id`` entries returned for a pipeline."""

    SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
    SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
    CUSTOMER_CONTACTED = "888730834"
    SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
    FILES_MISSING_FROM_ASSESSOR = "1887736000"
class Pipeline(Enum):
    """Deal pipeline IDs; values are compared against the ``id`` entries returned by ``get_all_pipelines``."""

    OPERATIONS_SOCIAL_HOUSING = "1167582403"
# TODO get guiness working from here
class HubspotClient:
def __init__(self):
    """
    Build a HubSpot API client from the application settings.

    Reads ``HUBSPOT_API_KEY`` from ``get_settings()`` and uses it as the
    access token for the HubSpot SDK client.

    Note for the tech team: the HubSpot library does not do type hinting.
    We have type hinted everything to the best of our ability, but Pylance
    never becomes fully happy (hence the ``# type: ignore`` comments).
    You will still get sensible IDE suggestions.

    Raises:
        RuntimeError: If ``HUBSPOT_API_KEY`` is not set.
    """
    settings = get_settings()
    access_token = settings.HUBSPOT_API_KEY
    if access_token is None:
        raise RuntimeError("Missing HUBSPOT_API_KEY in env")
    self.access_token: str = access_token
    self.logger = setup_logger()
    self.client: Client = Client.create(access_token=self.access_token)  # type: ignore[reportUnknownMemberType]
    # [Developer Only]
    # Add a dot after ``self.client`` (see the line below) to get IDE suggestions.
    # This wouldn't work if we didn't add ': Client' to self.client.
    # Sorry - not sorry but enjoy, Past Junte 13/03/2026
    # self.client
def get_deal_ids_from_company(self, company_id: str) -> list[str]:
    """
    Collect the IDs of every deal associated with a company.

    Walks the associations API page by page (100 per page), following the
    ``after`` cursor until no further page is reported.

    Args:
        company_id: HubSpot company ID.

    Returns:
        Deal IDs in the order the API returned them.
    """
    api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    collected: list[str] = []
    cursor: Optional[str] = None
    has_more = True

    while has_more:
        page: AssociationsPageResponse = api.get_page(  # type: ignore[reportUnknownMemberType]
            object_type="companies",
            object_id=company_id,
            to_object_type="deals",
            limit=100,
            after=cursor,
        )

        associations: list[AssociationsResult] = cast(list[AssociationsResult], page.results)  # type: ignore[reportUnknownMemberType]
        collected.extend(
            cast(str, item.to_object_id)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
            for item in associations
        )

        # The cursor for the next page lives at paging.next.after; either
        # level may be absent on the last page.
        paging: Optional[AssociationsPaging] = cast(Optional[AssociationsPaging], page.paging)  # type: ignore[reportUnknownMemberType]
        next_page: Optional[AssociationsPagingNext] = (
            cast(Optional[AssociationsPagingNext], paging.next)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
            if paging
            else None
        )
        if next_page:
            cursor = cast(str, next_page.after)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
        else:
            has_more = False

    return collected
def from_deal_id_get_associated_company_id(self, deal_id: str) -> Optional[str]:
    """
    Look up the company associated with a deal.

    Args:
        deal_id: HubSpot deal ID.

    Returns:
        The ID of the first associated company, or None when the deal has
        no company association or the API call raises ``ApiException``.
    """
    try:
        api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
        # Only this deal's associations; a single company is expected.
        page: AssociationsPageResponse = api.get_page(  # type: ignore[reportUnknownMemberType]
            object_type="deals",
            object_id=deal_id,
            to_object_type="companies",
            limit=1,
        )
    except ApiException as e:
        self.logger.error(
            f"Error fetching associated company for deal {deal_id}: {e}"
        )
        return None

    associations: list[AssociationsResult] = cast(list[AssociationsResult], page.results)  # type: ignore[reportUnknownMemberType]
    if not associations:
        self.logger.info(f"No company association found for deal {deal_id}")
        return None

    company_id: str = cast(str, associations[0].to_object_id)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
    self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
    return company_id
def from_deal_id_get_associated_listing(
    self, deal_id: str
) -> Optional[dict[str, str]]:
    """
    Fetch the properties of the listing associated with a deal.

    Args:
        deal_id: HubSpot deal ID.

    Returns:
        The listing's property dictionary, or None when the deal has no
        associated listing.
    """
    # Object type identifier used for listings in both API calls below.
    listing_object_type = "0-420"

    associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    # The generic objects API also serves objects like "listing".
    listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api  # type: ignore[reportUnknownMemberType]

    # Step 1: find the listing linked to the deal.
    page: AssociationsPageResponse = associations_api.get_page(  # type: ignore[reportUnknownMemberType]
        object_type="deals",
        object_id=deal_id,
        to_object_type=listing_object_type,
        limit=1,
    )
    associations: list[AssociationsResult] = cast(list[AssociationsResult], page.results)  # type: ignore[reportUnknownMemberType]
    if not associations:
        self.logger.info(f"No listing association found for deal {deal_id}")
        return None

    listing_id: str = cast(str, associations[0].to_object_id)  # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
    self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")

    # Step 2: fetch the listing's details.
    wanted_properties = [
        "national_uprn",
        "domna_property_id",
        "owner_property_id",
    ]
    listing: HubspotObject = listings_api.get_by_id(  # type: ignore[reportUnknownMemberType]
        object_type=listing_object_type,
        object_id=listing_id,
        properties=wanted_properties,
    )
    listing_info: dict[str, str] = cast(dict[str, str], listing.properties)  # type: ignore[reportUnknownMemberType]
    self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
    return listing_info
def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]:
    """
    Fetch a fixed set of properties for a deal.

    Args:
        deal_id: HubSpot deal ID.

    Returns:
        The deal's property dictionary as returned by the deals API.
    """
    requested_properties = [
        "dealname",
        "dealstage",
        "pipeline",
        "outcome",
        "outcome_notes",
        "project_code",
        "major_condition_issue_description",
        "major_condition_issue_photos",
        "coordination_status__stage_1_",  # Coordination Status (Stage 1)
        "retrofit_design_status",  # Retrofit Design Status
    ]
    api: DealsBasicApi = self.client.crm.deals.basic_api  # type: ignore[reportUnknownMemberType]
    fetched: HubspotObject = api.get_by_id(  # type: ignore[reportUnknownMemberType]
        deal_id,
        properties=requested_properties,
    )
    return cast(dict[str, str], fetched.properties)  # type: ignore[reportUnknownMemberType]
def get_deal_info_for_db(
self, deal_id: str
) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]:
deal: dict[str, str] = self.from_deal_id_get_info(deal_id)
company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id)
listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing(
deal_id
)
return deal, company, listing
def get_company_information(self, company_id: str) -> dict[str, str]:
    """
    Fetch a company's properties, requesting only its ``name``.

    Args:
        company_id: HubSpot company ID.

    Returns:
        The company's property dictionary as returned by the companies API.
    """
    api: CompaniesBasicApi = self.client.crm.companies.basic_api  # type: ignore[reportUnknownMemberType]
    fetched: HubspotObject = api.get_by_id(company_id, properties=["name"])  # type: ignore[reportUnknownMemberType]
    return cast(dict[str, str], fetched.properties)  # type: ignore[reportUnknownMemberType]
def get_all_pipelines(self) -> list[dict[str, str]]:
    """
    List every deal pipeline.

    Returns:
        One ``{"name": ..., "id": ...}`` dict per pipeline, or an empty
        list when anything goes wrong (the error is logged, not raised).
    """
    try:
        api: PipelinesApi = self.client.crm.pipelines.pipelines_api  # type: ignore[reportUnknownMemberType]
        response: PipelinesResponse = api.get_all(object_type="deals")  # type: ignore[reportUnknownMemberType]
        found: list[dict[str, str]] = [
            {
                "name": cast(str, entry.label),  # type: ignore[reportUnknownMemberType]
                "id": cast(str, entry.id),  # type: ignore[reportUnknownMemberType]
            }
            for entry in cast(list[HubspotPipeline], response.results)  # type: ignore[reportUnknownMemberType]
        ]
        self.logger.info(f"Retrieved {len(found)} pipelines.")
        return found
    except Exception as e:
        self.logger.error(f"Error retrieving pipelines: {e}")
        return []
def get_deal_stages_from_pipeline_id(
    self, pipeline_id: Optional[str] = None
) -> list[dict[str, str]]:
    """
    List deal stages, optionally restricted to one pipeline.

    Args:
        pipeline_id: Pipeline to restrict to; when omitted (or empty) the
            stages of all deal pipelines are returned.

    Returns:
        One dict per stage with ``pipeline_name``, ``pipeline_id``,
        ``stage_name`` and ``stage_id``; an empty list when nothing matches
        or anything goes wrong (the error is logged, not raised).
    """
    try:
        api: PipelinesApi = self.client.crm.pipelines.pipelines_api  # type: ignore[reportUnknownMemberType]
        response: PipelinesResponse = api.get_all(object_type="deals")  # type: ignore[reportUnknownMemberType]

        # None means "no filter"; otherwise compare IDs as strings.
        wanted_id: Optional[str] = str(pipeline_id) if pipeline_id else None
        collected: list[dict[str, str]] = []

        for entry in cast(list[HubspotPipeline], response.results):  # type: ignore[reportUnknownMemberType]
            entry_id: str = cast(str, entry.id)  # type: ignore[reportUnknownMemberType]
            if wanted_id is not None and entry_id != wanted_id:
                continue

            collected.extend(
                {
                    "pipeline_name": cast(str, entry.label),  # type: ignore[reportUnknownMemberType]
                    "pipeline_id": entry_id,
                    "stage_name": cast(str, step.label),  # type: ignore[reportUnknownMemberType]
                    "stage_id": cast(str, step.id),  # type: ignore[reportUnknownMemberType]
                }
                for step in cast(list[HubspotPipelineStage], entry.stages)  # type: ignore[reportUnknownMemberType]
            )

        if collected:
            self.logger.info(f"Retrieved {len(collected)} deal stages.")
        else:
            self.logger.info(
                f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}"
            )
        return collected
    except Exception as e:
        self.logger.error(f"Error retrieving deal stages: {e}")
        return []
def download_file_from_url(
    self,
    download_url: str,
    save_path: Optional[str] = None,
    timeout: float = 30.0,
) -> str:
    """
    Download a file from a HubSpot file URL (public or private), keeping its original file type.

    The filename comes from the ``Content-Disposition`` header when present,
    otherwise from the URL path, with an extension guessed from the
    ``Content-Type`` header if the name has none.

    Args:
        download_url: URL of the file to download.
        save_path: Target directory or file path. When omitted the file is
            written to the current working directory under its inferred name.
        timeout: Seconds to wait for the server before giving up; without
            a timeout ``requests`` can block indefinitely.

    Returns:
        Path of the written file.

    Raises:
        requests.exceptions.RequestException: On any HTTP/network failure
            (logged, then re-raised).
    """
    headers: dict[str, str] = {}
    # NOTE(review): every URL that does not contain "hubspotusercontent"
    # receives the bearer token — confirm callers only pass HubSpot URLs.
    if "hubspotusercontent" not in download_url:
        headers["Authorization"] = f"Bearer {self.access_token}"

    self.logger.info(f"Downloading HubSpot file: {download_url}")
    try:
        # The context manager releases the streamed connection even when
        # writing the file fails part-way through.
        with requests.get(
            download_url,
            headers=headers,
            stream=True,
            allow_redirects=True,
            timeout=timeout,
        ) as response:
            response.raise_for_status()

            # Try to infer filename from Content-Disposition header
            filename = ""
            content_disposition = response.headers.get("content-disposition")
            if content_disposition and "filename=" in content_disposition:
                # Keep only the filename value (drop trailing "; param=..."
                # parts and quotes), then strip any directory components:
                # the header is server-controlled and must not be able to
                # steer the write outside the target directory.
                raw_name = content_disposition.split("filename=", 1)[1]
                raw_name = raw_name.split(";", 1)[0].strip().strip("\"'")
                filename = os.path.basename(raw_name.replace("\\", "/"))

            if not filename:
                # fallback: extract from URL or content-type
                filename = (
                    os.path.basename(download_url.split("?")[0])
                    or "hubspot_download"
                )
                if "." not in filename:
                    content_type = response.headers.get("content-type")
                    ext = (
                        mimetypes.guess_extension(
                            content_type.split(";")[0].strip()
                        )
                        if content_type
                        else None
                    )
                    if ext:
                        filename += ext

            # Make sure save_path is valid
            if save_path is None:
                save_path = os.path.abspath(filename)
            elif os.path.isdir(save_path):
                save_path = os.path.join(save_path, filename)
            else:
                # if user passes a file path directly, leave it
                save_path = os.path.abspath(save_path)

            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        self.logger.info(f"File downloaded successfully → {save_path}")
        return save_path
    except requests.exceptions.RequestException as e:
        self.logger.error(f"Failed to download file from HubSpot: {e}")
        raise
def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str:
    """
    Create a line item that mirrors an existing product.

    The product's name and price are copied onto the new line item; the
    price falls back from ``price`` to ``hs_price`` to ``"0"``.

    Args:
        product_id: HubSpot product ID to copy from.
        quantity: Number of units on the line item.

    Returns:
        ID of the newly created line item.
    """
    # Look up the product the line item is based on.
    products_api: ProductsBasicApi = self.client.crm.products.basic_api  # type: ignore[reportUnknownMemberType]
    source_product: HubspotObject = products_api.get_by_id(  # type: ignore[reportUnknownMemberType]
        product_id, properties=["name", "price", "hs_price"]
    )
    product_props: dict[str, str] = cast(dict[str, str], source_product.properties)  # type: ignore[reportUnknownMemberType]

    unit_price: str = (
        product_props.get("price") or product_props.get("hs_price") or "0"
    )
    payload: dict[str, str] = {
        "hs_product_id": product_id,
        "name": product_props.get("name") or "",
        "quantity": str(quantity),
        "price": unit_price,
        "amount": str(float(unit_price) * quantity),
        "invoiced": "Outstanding",
    }
    line_item_input = SimplePublicObjectInput(properties=payload)

    # Create the line item and hand back its ID.
    line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api  # type: ignore[reportUnknownMemberType]
    created: HubspotObject = line_items_api.create(line_item_input)  # type: ignore[reportUnknownMemberType]
    return cast(str, created.id)  # type: ignore[reportUnknownMemberType]
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None:
    """
    Link an existing line item to a deal.

    Args:
        line_item_id: ID of the line item to attach.
        deal_id: ID of the deal to attach it to.
    """
    self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}")
    association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api  # type: ignore[reportUnknownMemberType]
    # NOTE(review): in the HubSpot v4 associations API the first type/id pair
    # is the source object and the second pair is the target, so this call
    # reads deal → line item. Confirm against the installed client version.
    association_api.create(  # type: ignore[reportUnknownMemberType]
        "0-3",  # source object type ("0-3" is the deal object type)
        deal_id,  # source object id
        "line_items",  # target object type
        line_item_id,  # target object id
        [
            AssociationSpec(
                association_category="HUBSPOT_DEFINED",
                association_type_id=19,  # presumably HubSpot's "deal to line item" type — TODO confirm
            )
        ],
    )
def add_product_line_item_to_deal(
self, deal_id: str, product_id: str, quantity: int = 1
) -> str:
# Step 1: Create the line item from product mapping
line_item_id: str = self.create_line_item_from_product(product_id, quantity)
# Step 2: Associate the created line item to the deal
self.associate_line_item_to_deal(line_item_id, deal_id)
return line_item_id
def delete_line_item(self, line_item_id: str) -> bool:
    """
    Delete (archive) a line item in HubSpot by its ID.

    Args:
        line_item_id: ID of the line item to archive.

    Returns:
        True when the archive call succeeded, False when it raised
        ``ApiException`` (the error is logged).
    """
    self.logger.info(f"Deleting line item {line_item_id}...")
    try:
        api: LineItemsBasicApi = self.client.crm.line_items.basic_api  # type: ignore[reportUnknownMemberType]
        api.archive(line_item_id)  # type: ignore[reportUnknownMemberType]
    except ApiException as e:
        self.logger.error(f"Failed to delete line item {line_item_id}: {e}")
        return False

    self.logger.info(f"Line item {line_item_id} deleted successfully.")
    return True

View file

@ -0,0 +1 @@
hubspot-api-client

View file

View file

@ -0,0 +1,117 @@
import os
from typing import Optional
import pytest
from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline, DealStage
class TestHubspotClientIntegration:
    """Integration tests using real HubSpot API calls.

    These tests talk to a live HubSpot account and assert on hard-coded
    record IDs, so they presumably need a real ``HUBSPOT_API_KEY`` in the
    environment — verify against the CI configuration.
    """

    @pytest.fixture
    def client(self):
        """Initialize HubSpot client with env variables."""
        return HubspotClient()

    def test_client_initialization(self, client: HubspotClient):
        """Checks initialisation of HubspotClient and fails early if env variables are not set."""
        assert client.access_token is not None
        assert client.client is not None
        assert client.logger is not None

    def test_get_deal_ids_from_company(self, client: HubspotClient):
        """Test getting deal IDs from Apple company includes expected deal."""
        company_id: str = Companies.APPLE.value
        deal_ids: list[str] = client.get_deal_ids_from_company(company_id)
        # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
        assert "263490768079" in deal_ids

    def test_get_company_id_from_deal_id(self, client: HubspotClient):
        """The known deal resolves back to the Apple company."""
        deal_id: str = "263490768079"
        company_id: Optional[str] = client.from_deal_id_get_associated_company_id(
            deal_id
        )
        # https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
        assert company_id == Companies.APPLE.value

    def test_from_deal_id_get_associated_listing(self, client: HubspotClient):
        """The known deal has a listing exposing the requested properties."""
        deal_id: str = "263490768079"
        listing_info: Optional[dict[str, str]] = (
            client.from_deal_id_get_associated_listing(deal_id)
        )
        assert listing_info is not None
        assert "hs_object_id" in listing_info
        assert "national_uprn" in listing_info
        assert "owner_property_id" in listing_info
        assert "domna_property_id" in listing_info

    def test_from_deal_id_get_info(self, client: HubspotClient):
        """Every property requested by from_deal_id_get_info is present in the result."""
        deal_id: str = "263490768079"
        deal_info: dict[str, str] = client.from_deal_id_get_info(deal_id)
        assert "dealname" in deal_info
        assert "dealstage" in deal_info
        assert "pipeline" in deal_info
        assert "outcome" in deal_info  # outcome
        assert "outcome_notes" in deal_info  # outcome notes
        assert "project_code" in deal_info
        assert "major_condition_issue_description" in deal_info
        assert "major_condition_issue_photos" in deal_info
        assert (
            "coordination_status__stage_1_" in deal_info
        )  # Coordination Status (Stage 1)
        assert "retrofit_design_status" in deal_info  # Retrofit Design Status

    def test_get_deal_info_for_db(self, client: HubspotClient):
        """The combined lookup returns deal properties, the Apple company ID and an optional listing."""
        deal_id: str = "263490768079"
        deal, company, listing = client.get_deal_info_for_db(deal_id)
        assert "dealname" in deal
        assert "dealstage" in deal
        assert "pipeline" in deal
        assert company == Companies.APPLE.value
        assert listing is None or "hs_object_id" in listing

    def test_get_company_information(self, client: HubspotClient):
        """The Apple company record is named "Apple" (case-insensitive)."""
        company_id: str = Companies.APPLE.value
        company_info: dict[str, str] = client.get_company_information(company_id)
        assert "name" in company_info
        assert company_info["name"].lower() == "Apple".lower()

    def test_get_all_pipelines(self, client: HubspotClient):
        """The pipeline listing contains the social-housing operations pipeline."""
        pipelines: list[dict[str, str]] = client.get_all_pipelines()
        assert len(pipelines) > 0
        pipeline_ids: list[str] = [p["id"] for p in pipelines]
        assert Pipeline.OPERATIONS_SOCIAL_HOUSING.value in pipeline_ids

    def test_get_deal_stages_from_pipeline_id(self, client: HubspotClient):
        """The social-housing pipeline exposes the "surveyed, needs sign-off" stage."""
        stages: list[dict[str, str]] = client.get_deal_stages_from_pipeline_id(
            Pipeline.OPERATIONS_SOCIAL_HOUSING.value
        )
        assert len(stages) > 0
        stage_ids: list[str] = [s["stage_id"] for s in stages]
        assert DealStage.SURVEYED_COMPLETE_NEEDS_SIGN_OFF.value in stage_ids

    def test_download_file_from_url(
        self, client: HubspotClient, tmp_path: os.PathLike[str]
    ):
        """A deal's photo URL downloads to a non-empty file inside the temp directory."""
        deal_info: dict[str, str] = client.from_deal_id_get_info("254427203793")
        download_url: str = deal_info["major_condition_issue_photos"]
        # tmp_path is converted to str because download_file_from_url takes a string path.
        save_path: str = client.download_file_from_url(download_url, str(tmp_path))
        assert os.path.exists(save_path)
        assert os.path.getsize(save_path) > 0

View file

@ -2,7 +2,7 @@
"typeCheckingMode": "strict",
"venvPath": "/Users/khalimconn-kowlessar/opt/anaconda3/envs/",
"venv": "Fastapi-backend",
"include": [
"include": [
"."
]
}

View file

@ -3,4 +3,4 @@ pythonpath = .
log_cli = true
log_cli_level = INFO
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests

View file

@ -4,4 +4,6 @@ pytest-cov
pytest-mock
dotenv
psycopg[binary]
pytest-postgresql
pytest-postgresql
hubspot-api-client
fuzzywuzzy