mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge branch 'main' into feature/pashub-to-ara
This commit is contained in:
commit
838765f5f5
41 changed files with 1559 additions and 89 deletions
|
|
@ -35,7 +35,8 @@ ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1
|
|||
ADD backend/engine/requirements.txt requirements1.txt
|
||||
ADD backend/app/requirements/requirements.txt requirements2.txt
|
||||
ADD .devcontainer/backend/requirements.txt requirements3.txt
|
||||
RUN cat requirements1.txt requirements2.txt requirements3.txt > requirements.txt
|
||||
ADD etl/hubspot/requirements.txt requirements4.txt
|
||||
RUN cat requirements1.txt requirements2.txt requirements3.txt requirements4.txt > requirements.txt
|
||||
RUN pip install -r requirements.txt
|
||||
|
||||
# 5) Workdir
|
||||
|
|
|
|||
|
|
@ -25,4 +25,4 @@ psycopg[binary]
|
|||
pytest-postgresql
|
||||
# Formatting
|
||||
black==26.1.0
|
||||
boto3-stubs
|
||||
boto3-stubs
|
||||
|
|
|
|||
38
.github/workflows/deploy_terraform.yml
vendored
38
.github/workflows/deploy_terraform.yml
vendored
|
|
@ -484,4 +484,40 @@ jobs:
|
|||
- name: Terraform Apply
|
||||
if: env.TERRAFORM_APPLY == 'true'
|
||||
working-directory: infrastructure/terraform/cdn
|
||||
run: terraform apply -auto-approve tfplan
|
||||
run: terraform apply -auto-approve tfplan
|
||||
|
||||
# ============================================================
|
||||
# Build Hubspot ETL image
|
||||
# ============================================================
|
||||
hubspot_etl_image:
|
||||
needs: [determine_stage, shared_terraform]
|
||||
uses: ./.github/workflows/_build_image.yml
|
||||
with:
|
||||
ecr_repo: hubspot-etl-${{ needs.determine_stage.outputs.stage }}
|
||||
dockerfile_path: etl/hubspot/scripts/scraper/handler/Dockerfile
|
||||
build_context: .
|
||||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
||||
# ============================================================
|
||||
# Deploy Hubspot ETL Lambda
|
||||
# ============================================================
|
||||
hubspot_etl_lambda:
|
||||
needs: [hubspot_etl_image, determine_stage]
|
||||
uses: ./.github/workflows/_deploy_lambda.yml
|
||||
with:
|
||||
lambda_name: hubspot-etl-to-ara
|
||||
lambda_path: infrastructure/terraform/lambda/hubspot_deal_etl
|
||||
stage: ${{ needs.determine_stage.outputs.stage }}
|
||||
ecr_repo: hubspot-etl-${{ needs.determine_stage.outputs.stage }}
|
||||
image_digest: ${{ needs.hubspot_etl_image.outputs.image_digest }}
|
||||
terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
|
||||
secrets:
|
||||
TF_VAR_db_host: ${{ secrets.DEV_DB_HOST }}
|
||||
TF_VAR_db_name: ${{ secrets.DEV_DB_NAME }}
|
||||
TF_VAR_db_port: ${{ secrets.DEV_DB_PORT }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
|
|
|||
50
.github/workflows/unit_tests.yml
vendored
50
.github/workflows/unit_tests.yml
vendored
|
|
@ -7,24 +7,52 @@ on:
|
|||
|
||||
|
||||
jobs:
|
||||
test:
|
||||
test-docker:
|
||||
name: Tests (Docker)
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:15
|
||||
env:
|
||||
POSTGRES_USER: test
|
||||
POSTGRES_PASSWORD: test
|
||||
POSTGRES_DB: test
|
||||
ports:
|
||||
- 5432:5432
|
||||
options: >-
|
||||
--health-cmd pg_isready
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.11'
|
||||
- name: Build test image
|
||||
run: docker build -f Dockerfile.test -t model-test .
|
||||
|
||||
- name: Install tox via Makefile
|
||||
- name: Initialise database schema
|
||||
run: |
|
||||
make setup
|
||||
docker run --rm \
|
||||
--network host \
|
||||
-e DB_HOST=localhost \
|
||||
-e DB_NAME=test \
|
||||
-e DB_USERNAME=test \
|
||||
-e DB_PASSWORD=test \
|
||||
-e DB_PORT=5432 \
|
||||
model-test python scripts/init_db.py
|
||||
|
||||
- name: Run tests with tox via Makefile
|
||||
env:
|
||||
EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }}
|
||||
- name: Run tests
|
||||
run: |
|
||||
make test ARGS="-m 'not integration'"
|
||||
docker run --rm \
|
||||
--network host \
|
||||
-e EPC_AUTH_TOKEN=${{ secrets.DEV_EPC_AUTH_TOKEN }} \
|
||||
-e HUBSPOT_API_KEY=${{ secrets.HUBSPOT_API_KEY }} \
|
||||
-e DB_HOST=localhost \
|
||||
-e DB_NAME=test \
|
||||
-e DB_USERNAME=test \
|
||||
-e DB_PASSWORD=test \
|
||||
-e DB_PORT=5432 \
|
||||
model-test pytest -vv -m 'not integration'
|
||||
|
|
|
|||
28
Dockerfile.test
Normal file
28
Dockerfile.test
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
FROM python:3.11-slim
|
||||
|
||||
# Install PostgreSQL binaries — required by pytest-postgresql to spawn ephemeral test databases
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends postgresql \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
# Copy requirements first so Docker can cache the install layer
|
||||
COPY backend/engine/requirements.txt backend/engine/requirements.txt
|
||||
COPY backend/app/requirements/requirements.txt backend/app/requirements/requirements.txt
|
||||
COPY test.requirements.txt test.requirements.txt
|
||||
|
||||
RUN pip install --no-cache-dir \
|
||||
-r backend/engine/requirements.txt \
|
||||
-r backend/app/requirements/requirements.txt \
|
||||
-r test.requirements.txt
|
||||
|
||||
# Copy source
|
||||
COPY . .
|
||||
|
||||
# pg_ctl refuses to run as root — create an unprivileged user
|
||||
RUN useradd -m testuser && chown -R testuser /app
|
||||
USER testuser
|
||||
|
||||
CMD ["pytest"]
|
||||
12
Dockerfile.test.dockerignore
Normal file
12
Dockerfile.test.dockerignore
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
# We need this file otherwise it'll use .dockerignore
|
||||
# Exclude large/irrelevant directories that are not needed for testing
|
||||
model_data/local_data/
|
||||
backend/node_modules/
|
||||
backend/.idea/
|
||||
backend/.env
|
||||
infrastructure/
|
||||
data_collection/
|
||||
node_modules/
|
||||
conservation_areas/
|
||||
open_uprn/
|
||||
land_registry/
|
||||
|
|
@ -39,3 +39,4 @@ pytest --cov-config=model_data/.coveragerc --cov=model_data
|
|||
|
||||
This will produce the test results and coverage reports
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -73,59 +73,24 @@ def app():
|
|||
Property UPRN
|
||||
"""
|
||||
|
||||
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/E.ON/202603 modelling project"
|
||||
# # data_filename = "For Modelling - Final - reviewed.xlsx"
|
||||
# data_filename = "eon - 20260323 address sanitisation.xlsx"
|
||||
# sheet_name = "in"
|
||||
# postcode_column = "postcode"
|
||||
# address1_column = "Address 1"
|
||||
# address1_method = None
|
||||
# fulladdress_column = "Address 1"
|
||||
# address_cols_to_concat = []
|
||||
# missing_postcodes_method = None
|
||||
# landlord_year_built = None
|
||||
# landlord_os_uprn = "address2uprn_uprn"
|
||||
# landlord_property_type = "PropertyType"
|
||||
# landlord_built_form = "BuiltForm"
|
||||
# landlord_wall_construction = None
|
||||
# landlord_roof_construction = None
|
||||
# landlord_heating_system = None
|
||||
# landlord_existing_pv = None
|
||||
# landlord_property_id = "UPRN"
|
||||
# landlord_sap = None
|
||||
# outcomes_filename = None
|
||||
# outcomes_sheetname = None
|
||||
# outcomes_postcode = None
|
||||
# outcomes_houseno = None
|
||||
# outcomes_id = None
|
||||
# outcomes_address = None
|
||||
# master_filepaths = []
|
||||
# master_id_colnames = []
|
||||
# master_to_asset_list_filepath = None
|
||||
# phase = False
|
||||
# ecosurv_landlords = None
|
||||
# asset_list_header = 0
|
||||
# landlord_block_reference = None
|
||||
|
||||
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/SMS"
|
||||
# data_filename = "For Modelling - Final - reviewed.xlsx"
|
||||
data_filename = "SMS Data sample to sense check before WHLG deploy.xlsx"
|
||||
sheet_name = "All Darlaston Properties"
|
||||
data_folder = "/workspaces/model/asset_list"
|
||||
data_filename = "Calico ARA Upload Review.xlsx"
|
||||
sheet_name = "Sheet1"
|
||||
postcode_column = "Postcode"
|
||||
address1_column = "House Number"
|
||||
address1_column = "Units"
|
||||
address1_method = None
|
||||
fulladdress_column = None
|
||||
address_cols_to_concat = ["House Number", "Street name"]
|
||||
fulladdress_column = "Units"
|
||||
address_cols_to_concat = ["Units"]
|
||||
missing_postcodes_method = None
|
||||
landlord_year_built = None
|
||||
landlord_os_uprn = None
|
||||
landlord_property_type = None
|
||||
landlord_built_form = None
|
||||
landlord_property_type = None # Good to include if landlord gave
|
||||
landlord_built_form = None # Good to include if landlord gave
|
||||
landlord_wall_construction = None
|
||||
landlord_roof_construction = None
|
||||
landlord_heating_system = None
|
||||
landlord_existing_pv = None
|
||||
landlord_property_id = "id"
|
||||
landlord_property_id = "llid"
|
||||
landlord_sap = None
|
||||
outcomes_filename = None
|
||||
outcomes_sheetname = None
|
||||
|
|
@ -277,7 +242,7 @@ def app():
|
|||
if skip is not None and not force_retrieve_data:
|
||||
if i <= skip:
|
||||
continue
|
||||
chunk = asset_list.standardised_asset_list[i: i + chunk_size]
|
||||
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
|
||||
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
|
||||
df=chunk,
|
||||
row_id_name=asset_list.DOMNA_PROPERTY_ID,
|
||||
|
|
@ -420,7 +385,7 @@ def app():
|
|||
# Retrieve just the data we need
|
||||
epc_df = epc_df[
|
||||
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
|
||||
# Look for columns not in the find my EPC data, which will have happened if we didn't
|
||||
# retrieve it in the first place
|
||||
|
|
@ -437,7 +402,7 @@ def app():
|
|||
find_my_epc_data[
|
||||
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
|
||||
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
how="left",
|
||||
on=asset_list.DOMNA_PROPERTY_ID,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ Before you run:
|
|||
|
||||
Step 1) Get the list and ensure the following columns exists
|
||||
|
||||
I believe lower and upper case matter:
|
||||
* Address 1
|
||||
* Address 2
|
||||
* Address 3
|
||||
|
|
@ -23,17 +24,19 @@ For this example I'll be using "s3://retrofit-data-dev/ara_raw_inputs/calico/Cal
|
|||
|
||||
Go to Ara DB and make a new task_id with a randomly generated uuid as the primarily key
|
||||
|
||||
task_id = a7b70a02-4df4-45b5-a50b-196e095910bb
|
||||
sub_task_id = 567cf73b-1210-4909-9ecc-36ae7e23420e
|
||||
task_id = ea615ac3-ac28-46c4-8bff-2431c5b9c13d
|
||||
sub_task_id = 85a23b67-8f18-4299-9bf0-69bfb87adbc7
|
||||
s3 => s3://retrofit-data-dev/ara_raw_inputs/eon/North Tyneside Council.csv
|
||||
|
||||
Step 3) Alright, now lets make the input for postcode-splitter sqs to get the ball rolling
|
||||
postcode-splitter-sqs => https://eu-west-2.console.aws.amazon.com/sqs/v3/home?region=eu-west-2#/queues/https%3A%2F%2Fsqs.eu-west-2.amazonaws.com%2F337213553626%2Fpostcode-splitter-queue-dev
|
||||
|
||||
{
|
||||
"task_id": "a7b70a02-4df4-45b5-a50b-196e095910bb",
|
||||
"sub_task_id": "567cf73b-1210-4909-9ecc-36ae7e23420e",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/Calico Homes Full list EPC Properties(Sheet2) (1) (1).csv"
|
||||
"task_id": "ea615ac3-ac28-46c4-8bff-2431c5b9c13d",
|
||||
"sub_task_id": "85a23b67-8f18-4299-9bf0-69bfb87adbc7",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/eon/eon(Sheet1).csv"
|
||||
}
|
||||
|
||||
Each batch of csv should be saved in retrofit-data-dev/ara_postcode_splitter_batches/<task-id>/<sub-task-id>/<timestamp:uuid4>.csv
|
||||
|
||||
outputs of address2uprn ( which is automatically triggered on postcodesplitter) will be saved on retrofit-data-dev/ara_raw_outputs/<task-id>/<subtask-id>/<timestamp:uuid4>.csv
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ class Settings(BaseSettings):
|
|||
|
||||
ORDNANCE_SURVEY_API_KEY: str = "changeme"
|
||||
|
||||
HUBSPOT_API_KEY: Optional[str] = None
|
||||
# Sharepoint
|
||||
SHAREPOINT_CLIENT_ID: Optional[str] = None
|
||||
SHAREPOINT_CLIENT_SECRET: Optional[str] = None
|
||||
|
|
@ -102,7 +103,6 @@ def get_prediction_buckets():
|
|||
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET,
|
||||
"heating_kwh_predictions": get_settings().HEATING_KWH_PREDICTIONS_BUCKET,
|
||||
"hotwater_kwh_predictions": get_settings().HOTWATER_KWH_PREDICTIONS_BUCKET,
|
||||
|
||||
# Score model - SAP re-baselining model
|
||||
"retrofit_sap_baseline_predictions": get_settings().SAP_BASELINE_PREDICTIONS_BUCKET,
|
||||
"retrofit_carbon_baseline_predictions": get_settings().CARBON_BASELINE_PREDICTIONS_BUCKET,
|
||||
|
|
|
|||
58
backend/app/db/models/organisation.py
Normal file
58
backend/app/db/models/organisation.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
from sqlmodel import SQLModel, Field, Column, text
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
from sqlalchemy import DateTime
|
||||
from sqlalchemy.sql import func
|
||||
import uuid
|
||||
|
||||
|
||||
class Organisation(SQLModel, table=True):
|
||||
__tablename__ = "organisation"
|
||||
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
hubspot_company_id: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
class HubspotDealData(SQLModel, table=True):
|
||||
__tablename__ = "hubspot_deal_data"
|
||||
|
||||
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
|
||||
|
||||
# HubSpot Deal identifiers
|
||||
deal_id: str = Field(index=True, nullable=False)
|
||||
dealname: Optional[str] = Field(default=None)
|
||||
dealstage: Optional[str] = Field(default=None)
|
||||
company_id: Optional[str] = Field(default=None)
|
||||
project_code: Optional[str] = Field(default=None)
|
||||
|
||||
# HubSpot custom properties
|
||||
landlord_property_id: Optional[str] = Field(default=None)
|
||||
uprn: Optional[str] = Field(default=None)
|
||||
outcome: Optional[str] = Field(default=None)
|
||||
outcome_notes: Optional[str] = Field(default=None)
|
||||
|
||||
major_condition_issue_description: Optional[str] = Field(default=None)
|
||||
major_condition_issue_photos: Optional[str] = Field(default=None)
|
||||
major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None)
|
||||
|
||||
coordination_status: Optional[str] = Field(default=None)
|
||||
design_status: Optional[str] = Field(default=None)
|
||||
|
||||
created_at: datetime = Field(
|
||||
sa_column=Column(
|
||||
DateTime(timezone=True),
|
||||
server_default=text("(NOW() AT TIME ZONE 'utc')"),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
|
||||
updated_at: datetime = Field(
|
||||
sa_column=Column(
|
||||
DateTime(timezone=True),
|
||||
server_default=text("(NOW() AT TIME ZONE 'utc')"),
|
||||
onupdate=func.now(),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
|
|
@ -32,7 +32,6 @@ from backend.ml_models.api import ModelApi
|
|||
from backend.ml_models.Valuation import PropertyValuation
|
||||
from backend.Property import Property
|
||||
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
||||
from backend.addresses.Addresses import Addresses
|
||||
|
||||
from recommendations.optimiser.CostOptimiser import CostOptimiser
|
||||
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
||||
|
|
@ -642,7 +641,9 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
|
||||
epc_records = patch_epc(patch, epc_records)
|
||||
|
||||
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
|
||||
prepared_epc = EPCRecord(
|
||||
epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, address_metadata=addr
|
||||
)
|
||||
|
||||
input_properties.append(
|
||||
Property(
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ import pytest
|
|||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from backend.app.db.base import Base
|
||||
from sqlmodel import SQLModel
|
||||
import backend.app.db.models.organisation # noqa: F401 — registers Organisation with SQLModel.metadata
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
|
@ -25,12 +27,14 @@ def engine(postgresql):
|
|||
|
||||
# Create tables once per test session
|
||||
Base.metadata.create_all(engine)
|
||||
SQLModel.metadata.create_all(engine)
|
||||
|
||||
# Yeild will split this function into two phase. 1) setup and 2) teardown, the latter of which will run after all
|
||||
# tests have completed
|
||||
yield engine
|
||||
|
||||
# Clean-up after entire test session
|
||||
SQLModel.metadata.drop_all(engine)
|
||||
Base.metadata.drop_all(engine)
|
||||
engine.dispose()
|
||||
|
||||
|
|
|
|||
|
|
@ -53,13 +53,3 @@ def main(task_id, output):
|
|||
|
||||
print(f"Combined CSV saved to {output}")
|
||||
print(f"Total rows: {len(combined)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("task_id", help="Task ID folder in S3")
|
||||
parser.add_argument("--output", default="combined.csv")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
main(args.task_id, args.output)
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ DEFAULT_ENV = {
|
|||
"HEATING_KWH_PREDICTIONS_BUCKET": "test",
|
||||
"HOTWATER_KWH_PREDICTIONS_BUCKET": "test",
|
||||
"ENERGY_ASSESSMENTS_BUCKET": "test",
|
||||
"HUBSPOT_API_KEY": "changeme",
|
||||
}
|
||||
|
||||
# runs immediately when pytest starts, BEFORE collection
|
||||
|
|
|
|||
|
|
@ -566,6 +566,7 @@ class EPCRecord:
|
|||
"multi_glaze_proportion": addr.landlord_multi_glaze_proportion,
|
||||
"construction_age_band": addr.landlord_construction_age_band,
|
||||
}
|
||||
landlord_remapping = {k: v for k, v in landlord_remapping.items() if v is not None}
|
||||
|
||||
# Sanity check - ensure valid keys
|
||||
if any(k not in self._prepared_epc for k in landlord_remapping):
|
||||
|
|
@ -573,6 +574,7 @@ class EPCRecord:
|
|||
|
||||
self.landlord_differences = {} # Anything actaully changed
|
||||
for k, v in landlord_remapping.items():
|
||||
|
||||
if k == "total_floor_area":
|
||||
existing = self._prepared_epc.get(k)
|
||||
if existing is not None and v is not None and abs(existing - v) > 1: # 1m tolerance
|
||||
|
|
|
|||
448
etl/hubspot/hubspotClient.py
Normal file
448
etl/hubspot/hubspotClient.py
Normal file
|
|
@ -0,0 +1,448 @@
|
|||
import os
|
||||
from enum import Enum
|
||||
from typing import Optional, cast
|
||||
|
||||
from hubspot.client import Client # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations import ApiException # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects import SimplePublicObjectInput # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects.api.basic_api import BasicApi as ObjectsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.deals.api.basic_api import BasicApi as DealsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.companies.api.basic_api import BasicApi as CompaniesBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.products.api.basic_api import BasicApi as ProductsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.line_items.api.basic_api import BasicApi as LineItemsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.api.pipelines_api import PipelinesApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.models import ( # type: ignore[reportMissingTypeStubs]
|
||||
CollectionResponsePipelineNoPaging as PipelinesResponse,
|
||||
)
|
||||
from hubspot.crm.pipelines.models import Pipeline as HubspotPipeline # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.pipelines.models import PipelineStage as HubspotPipelineStage # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.objects.models import SimplePublicObject as HubspotObject # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4 import AssociationSpec # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4.api.basic_api import BasicApi as AssociationsBasicApi # type: ignore[reportMissingTypeStubs]
|
||||
from hubspot.crm.associations.v4.models import ( # type: ignore[reportMissingTypeStubs]
|
||||
CollectionResponseMultiAssociatedObjectWithLabelForwardPaging as AssociationsPageResponse,
|
||||
MultiAssociatedObjectWithLabel as AssociationsResult,
|
||||
ForwardPaging as AssociationsPaging,
|
||||
NextPage as AssociationsPagingNext,
|
||||
)
|
||||
from etl.hubspot.hubspotDataTodB import CompanyData, HubspotDataToDb
|
||||
|
||||
|
||||
from backend.app.config import get_settings
|
||||
from utils.logger import setup_logger
|
||||
|
||||
import mimetypes
|
||||
import requests
|
||||
|
||||
|
||||
class Companies(Enum):
|
||||
ABRI = "237615001799"
|
||||
SOUTHERN_HOUSING_GROUP = "109343619305"
|
||||
LIVEWEST = "86205872354"
|
||||
SURESERVE = "301745289413"
|
||||
HOMEGROUP = "94946071794"
|
||||
APPLE = "184769046716"
|
||||
THE_GUINESS_PARTNERSHIP = "86970043613"
|
||||
CALICO_HOMES = "86975437046"
|
||||
|
||||
|
||||
class DealStage(Enum):
|
||||
SURVEYED_COMPLETE_NEEDS_SIGN_OFF = "1617223914"
|
||||
SURVEYED_NO_ACCESS_NEED_SIGN_OFF = "1617223915"
|
||||
CUSTOMER_CONTACTED = "888730834"
|
||||
SURVEYED_COMPLETED_SIGNED_OFF = "1617223916"
|
||||
FILES_MISSING_FROM_ASSESSOR = "1887736000"
|
||||
|
||||
|
||||
class Pipeline(Enum):
|
||||
OPERATIONS_SOCIAL_HOUSING = "1167582403"
|
||||
|
||||
|
||||
# TODO get guiness working from here
|
||||
|
||||
|
||||
class HubspotClient:
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Hey Tech Team, Hubspot Library doesn't do type hitting.
|
||||
We have type hinted stuff but pylance never becomes happy.
|
||||
However, because I added the type hinting to the best of ability
|
||||
and you'll still get sensible ide suggestions.
|
||||
"""
|
||||
settings = get_settings()
|
||||
access_token = settings.HUBSPOT_API_KEY
|
||||
if access_token is None:
|
||||
raise RuntimeError("Missing HUBSPOT_API_KEY in env")
|
||||
self.access_token: str = access_token
|
||||
self.logger = setup_logger()
|
||||
self.client: Client = Client.create(access_token=self.access_token) # type: ignore[reportUnknownMemberType]
|
||||
# [Developer Only]
|
||||
# Add a dot in front of client and see the wonders of ide suggestions
|
||||
# This wouldn't work if we didn't add ': Client' to self.client.
|
||||
# Sorry - not sorry but enjoy, Past Junte 13/03/2026
|
||||
# self.client
|
||||
|
||||
def get_deal_ids_from_company(self, company_id: str) -> list[str]:
|
||||
associations_api: AssociationsBasicApi = ( # type: ignore[reportUnknownMemberType]
|
||||
self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
|
||||
)
|
||||
|
||||
deal_ids: list[str] = []
|
||||
after: Optional[str] = None
|
||||
|
||||
while True:
|
||||
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
|
||||
object_type="companies",
|
||||
object_id=company_id,
|
||||
to_object_type="deals",
|
||||
limit=100,
|
||||
after=after,
|
||||
)
|
||||
|
||||
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
|
||||
for assoc in results:
|
||||
assoc: AssociationsResult
|
||||
object_id: str = cast(str, assoc.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
|
||||
deal_ids.append(object_id)
|
||||
|
||||
paging: Optional[AssociationsPaging] = cast(Optional[AssociationsPaging], response.paging) # type: ignore[reportUnknownMemberType]
|
||||
if not paging:
|
||||
break
|
||||
|
||||
paging_next: Optional[AssociationsPagingNext] = cast(Optional[AssociationsPagingNext], paging.next) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
|
||||
if not paging_next:
|
||||
break
|
||||
|
||||
after = cast(str, paging_next.after) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
|
||||
|
||||
return deal_ids
|
||||
|
||||
def from_deal_id_get_associated_company_id(self, deal_id: str) -> Optional[str]:
|
||||
"""
|
||||
Get the associated company ID from a given deal ID.
|
||||
Returns the associated company ID, or None if not found.
|
||||
"""
|
||||
try:
|
||||
associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
|
||||
|
||||
# Fetch associations for this specific deal only
|
||||
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
|
||||
object_type="deals",
|
||||
object_id=deal_id,
|
||||
to_object_type="companies",
|
||||
limit=1, # Expect only one associated company
|
||||
)
|
||||
|
||||
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
|
||||
if not results:
|
||||
self.logger.info(f"No company association found for deal {deal_id}")
|
||||
return None
|
||||
|
||||
first: AssociationsResult = results[0]
|
||||
company_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
|
||||
self.logger.info(f"Associated company ID for deal {deal_id}: {company_id}")
|
||||
return company_id
|
||||
|
||||
except ApiException as e:
|
||||
self.logger.error(
|
||||
f"Error fetching associated company for deal {deal_id}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
def from_deal_id_get_associated_listing(
|
||||
self, deal_id: str
|
||||
) -> Optional[dict[str, str]]:
|
||||
"""
|
||||
Get the associated listing information for a given deal.
|
||||
Returns a dictionary of listing properties, or None if not found.
|
||||
"""
|
||||
associations_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
|
||||
listings_api: ObjectsBasicApi = self.client.crm.objects.basic_api # type: ignore[reportUnknownMemberType] # works for custom objects like "listing"
|
||||
|
||||
# Fetch associated listing(s)
|
||||
response: AssociationsPageResponse = associations_api.get_page( # type: ignore[reportUnknownMemberType]
|
||||
object_type="deals",
|
||||
object_id=deal_id,
|
||||
to_object_type="0-420", # <-- to get an listing object
|
||||
limit=1,
|
||||
)
|
||||
|
||||
results: list[AssociationsResult] = cast(list[AssociationsResult], response.results) # type: ignore[reportUnknownMemberType]
|
||||
if not results:
|
||||
self.logger.info(f"No listing association found for deal {deal_id}")
|
||||
return None
|
||||
|
||||
first: AssociationsResult = results[0]
|
||||
listing_id: str = cast(str, first.to_object_id) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
|
||||
self.logger.info(f"Associated listing ID for deal {deal_id}: {listing_id}")
|
||||
|
||||
# Fetch listing details (the "listing information")
|
||||
listing: HubspotObject = listings_api.get_by_id( # type: ignore[reportUnknownMemberType]
|
||||
object_type="0-420", # again, must match your HubSpot object name
|
||||
object_id=listing_id,
|
||||
properties=[
|
||||
"national_uprn",
|
||||
"domna_property_id",
|
||||
"owner_property_id",
|
||||
],
|
||||
)
|
||||
|
||||
listing_info: dict[str, str] = cast(dict[str, str], listing.properties) # type: ignore[reportUnknownMemberType]
|
||||
self.logger.info(f"Listing info for deal {deal_id}: {listing_info}")
|
||||
return listing_info
|
||||
|
||||
def from_deal_id_get_info(self, deal_id: str) -> dict[str, str]:
|
||||
deals_api: DealsBasicApi = self.client.crm.deals.basic_api # type: ignore[reportUnknownMemberType]
|
||||
|
||||
deal: HubspotObject = deals_api.get_by_id( # type: ignore[reportUnknownMemberType]
|
||||
deal_id,
|
||||
properties=[
|
||||
"dealname",
|
||||
"dealstage",
|
||||
"pipeline",
|
||||
"outcome", # outcome,
|
||||
"outcome_notes", # outcome notes
|
||||
"project_code",
|
||||
"major_condition_issue_description",
|
||||
"major_condition_issue_photos",
|
||||
"coordination_status__stage_1_", # Coordiantion Status (Stage 1),
|
||||
"retrofit_design_status", # Retrofit Design Status
|
||||
],
|
||||
)
|
||||
|
||||
deal_info: dict[str, str] = cast(dict[str, str], deal.properties) # type: ignore[reportUnknownMemberType]
|
||||
return deal_info
|
||||
|
||||
def get_deal_info_for_db(
|
||||
self, deal_id: str
|
||||
) -> tuple[dict[str, str], Optional[str], Optional[dict[str, str]]]:
|
||||
|
||||
deal: dict[str, str] = self.from_deal_id_get_info(deal_id)
|
||||
company: Optional[str] = self.from_deal_id_get_associated_company_id(deal_id)
|
||||
|
||||
if company:
|
||||
company_data: CompanyData = self.get_company_information(company)
|
||||
dbloader: HubspotDataToDb = HubspotDataToDb()
|
||||
dbloader.upsert_company(company_data)
|
||||
|
||||
listing: Optional[dict[str, str]] = self.from_deal_id_get_associated_listing(
|
||||
deal_id
|
||||
)
|
||||
|
||||
return deal, company, listing
|
||||
|
||||
def get_company_information(self, company_id: str) -> CompanyData:
|
||||
companies_api: CompaniesBasicApi = self.client.crm.companies.basic_api # type: ignore[reportUnknownMemberType]
|
||||
|
||||
company: HubspotObject = companies_api.get_by_id( # type: ignore[reportUnknownMemberType]
|
||||
company_id,
|
||||
properties=[
|
||||
"name",
|
||||
],
|
||||
)
|
||||
|
||||
company_info: CompanyData = company.properties # type: ignore[reportUnknownMemberType]
|
||||
return company_info
|
||||
|
||||
def get_all_pipelines(self) -> list[dict[str, str]]:
|
||||
"""
|
||||
Retrieve all pipelines for deals, returning a list of dicts with pipeline names and IDs.
|
||||
"""
|
||||
try:
|
||||
pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType]
|
||||
response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType]
|
||||
|
||||
results: list[HubspotPipeline] = cast(list[HubspotPipeline], response.results) # type: ignore[reportUnknownMemberType]
|
||||
pipelines: list[dict[str, str]] = []
|
||||
for pipeline in results:
|
||||
pipeline: HubspotPipeline
|
||||
pipelines.append(
|
||||
{
|
||||
"name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType]
|
||||
"id": cast(str, pipeline.id), # type: ignore[reportUnknownMemberType]
|
||||
}
|
||||
)
|
||||
|
||||
self.logger.info(f"Retrieved {len(pipelines)} pipelines.")
|
||||
return pipelines
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error retrieving pipelines: {e}")
|
||||
return []
|
||||
|
||||
def get_deal_stages_from_pipeline_id(
|
||||
self, pipeline_id: Optional[str] = None
|
||||
) -> list[dict[str, str]]:
|
||||
"""
|
||||
Retrieve all deal stages for a given pipeline.
|
||||
If no pipeline_id is provided, retrieves all stages for all pipelines.
|
||||
Returns a list of dicts with pipeline name, stage name, and stage ID.
|
||||
"""
|
||||
try:
|
||||
pipelines_api: PipelinesApi = self.client.crm.pipelines.pipelines_api # type: ignore[reportUnknownMemberType]
|
||||
response: PipelinesResponse = pipelines_api.get_all(object_type="deals") # type: ignore[reportUnknownMemberType]
|
||||
|
||||
all_stages: list[dict[str, str]] = []
|
||||
|
||||
for pipeline in cast(list[HubspotPipeline], response.results): # type: ignore[reportUnknownMemberType]
|
||||
pipeline: HubspotPipeline
|
||||
# Skip other pipelines if a specific one is requested
|
||||
pipeline_id_str: str = cast(str, pipeline.id) # type: ignore[reportUnknownMemberType]
|
||||
if pipeline_id and pipeline_id_str != str(pipeline_id):
|
||||
continue
|
||||
|
||||
for stage in cast(list[HubspotPipelineStage], pipeline.stages): # type: ignore[reportUnknownMemberType]
|
||||
stage: HubspotPipelineStage
|
||||
all_stages.append(
|
||||
{
|
||||
"pipeline_name": cast(str, pipeline.label), # type: ignore[reportUnknownMemberType]
|
||||
"pipeline_id": pipeline_id_str,
|
||||
"stage_name": cast(str, stage.label), # type: ignore[reportUnknownMemberType]
|
||||
"stage_id": cast(str, stage.id), # type: ignore[reportUnknownMemberType]
|
||||
}
|
||||
)
|
||||
|
||||
if not all_stages:
|
||||
self.logger.info(
|
||||
f"No deal stages found for pipeline {pipeline_id if pipeline_id else 'ALL'}"
|
||||
)
|
||||
else:
|
||||
self.logger.info(f"Retrieved {len(all_stages)} deal stages.")
|
||||
|
||||
return all_stages
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error retrieving deal stages: {e}")
|
||||
return []
|
||||
|
||||
def download_file_from_url(
|
||||
self, download_url: str, save_path: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
Download a file from a HubSpot file URL (public or private), keeping its original file type.
|
||||
"""
|
||||
|
||||
try:
|
||||
headers: dict[str, str] = {}
|
||||
if "hubspotusercontent" not in download_url:
|
||||
headers["Authorization"] = f"Bearer {self.access_token}"
|
||||
|
||||
self.logger.info(f"Downloading HubSpot file: {download_url}")
|
||||
response = requests.get(
|
||||
download_url, headers=headers, stream=True, allow_redirects=True
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
# Try to infer filename from Content-Disposition header
|
||||
content_disposition = response.headers.get("content-disposition")
|
||||
if content_disposition and "filename=" in content_disposition:
|
||||
filename = content_disposition.split("filename=")[1].strip('"')
|
||||
else:
|
||||
# fallback: extract from URL or content-type
|
||||
filename = (
|
||||
os.path.basename(download_url.split("?")[0]) or "hubspot_download"
|
||||
)
|
||||
if "." not in filename:
|
||||
content_type = response.headers.get("content-type")
|
||||
ext = (
|
||||
mimetypes.guess_extension(content_type.split(";")[0])
|
||||
if content_type
|
||||
else None
|
||||
)
|
||||
if ext:
|
||||
filename += ext
|
||||
|
||||
# Make sure save_path is valid
|
||||
if save_path is None:
|
||||
save_path = os.path.abspath(filename)
|
||||
elif os.path.isdir(save_path):
|
||||
save_path = os.path.join(save_path, filename)
|
||||
else:
|
||||
# if user passes a file path directly, leave it
|
||||
save_path = os.path.abspath(save_path)
|
||||
|
||||
with open(save_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
self.logger.info(f"File downloaded successfully → {save_path}")
|
||||
return save_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Failed to download file from HubSpot: {e}")
|
||||
raise
|
||||
|
||||
def create_line_item_from_product(self, product_id: str, quantity: int = 1) -> str:
|
||||
# Fetch product mapping
|
||||
products_api: ProductsBasicApi = self.client.crm.products.basic_api # type: ignore[reportUnknownMemberType]
|
||||
product: HubspotObject = products_api.get_by_id( # type: ignore[reportUnknownMemberType]
|
||||
product_id, properties=["name", "price", "hs_price"]
|
||||
)
|
||||
properties: dict[str, str] = cast(dict[str, str], product.properties) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
name: str = properties.get("name") or ""
|
||||
price: str = properties.get("price") or properties.get("hs_price") or "0"
|
||||
|
||||
# Build line item payload
|
||||
line_item_input = SimplePublicObjectInput(
|
||||
properties={
|
||||
"hs_product_id": product_id,
|
||||
"name": name,
|
||||
"quantity": str(quantity),
|
||||
"price": price,
|
||||
"amount": str(float(price) * quantity),
|
||||
"invoiced": "Outstanding",
|
||||
}
|
||||
)
|
||||
|
||||
# Create line item
|
||||
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
|
||||
line_item: HubspotObject = line_items_api.create(line_item_input) # type: ignore[reportUnknownMemberType]
|
||||
return cast(str, line_item.id) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
def associate_line_item_to_deal(self, line_item_id: str, deal_id: str) -> None:
|
||||
self.logger.info(f"Associating line item {line_item_id} → deal {deal_id}")
|
||||
|
||||
association_api: AssociationsBasicApi = self.client.crm.associations.v4.basic_api # type: ignore[reportUnknownMemberType]
|
||||
|
||||
association_api.create( # type: ignore[reportUnknownMemberType]
|
||||
"0-3", # to object type
|
||||
deal_id, # to object id
|
||||
"line_items", # from object type
|
||||
line_item_id, # from object id
|
||||
[
|
||||
AssociationSpec(
|
||||
association_category="HUBSPOT_DEFINED",
|
||||
association_type_id=19, # line_item → deal
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
def add_product_line_item_to_deal(
|
||||
self, deal_id: str, product_id: str, quantity: int = 1
|
||||
) -> str:
|
||||
# Step 1: Create the line item from product mapping
|
||||
line_item_id: str = self.create_line_item_from_product(product_id, quantity)
|
||||
|
||||
# Step 2: Associate the created line item to the deal
|
||||
self.associate_line_item_to_deal(line_item_id, deal_id)
|
||||
|
||||
return line_item_id
|
||||
|
||||
def delete_line_item(self, line_item_id: str) -> bool:
|
||||
"""
|
||||
Delete (archive) a line item in HubSpot by its ID.
|
||||
"""
|
||||
try:
|
||||
self.logger.info(f"Deleting line item {line_item_id}...")
|
||||
|
||||
line_items_api: LineItemsBasicApi = self.client.crm.line_items.basic_api # type: ignore[reportUnknownMemberType]
|
||||
line_items_api.archive(line_item_id) # type: ignore[reportUnknownMemberType]
|
||||
|
||||
self.logger.info(f"Line item {line_item_id} deleted successfully.")
|
||||
return True
|
||||
|
||||
except ApiException as e:
|
||||
self.logger.error(f"Failed to delete line item {line_item_id}: {e}")
|
||||
return False
|
||||
342
etl/hubspot/hubspotDataTodB.py
Normal file
342
etl/hubspot/hubspotDataTodB.py
Normal file
|
|
@ -0,0 +1,342 @@
|
|||
from backend.app.db.connection import db_read_session
|
||||
from backend.app.db.models.organisation import Organisation, HubspotDealData
|
||||
from sqlmodel import select
|
||||
from datetime import datetime, timezone
|
||||
from typing import TypedDict
|
||||
from etl.hubspot.s3_uploader import S3Uploader
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
|
||||
class CompanyData(TypedDict):
|
||||
hs_object_id: str
|
||||
name: str
|
||||
|
||||
|
||||
class HubspotDataToDb:
|
||||
def __init__(self):
|
||||
self.s3 = S3Uploader(
|
||||
aws_access_key=os.getenv("AWS_ACCESS_KEY"),
|
||||
aws_secret_key=os.getenv("AWS_SECRET_KEY"),
|
||||
region=os.getenv("AWS_REGION"),
|
||||
)
|
||||
|
||||
def read_org_table(self, limit: int = 10):
|
||||
with db_read_session() as session:
|
||||
records = session.exec(select(Organisation).limit(limit)).all()
|
||||
return records
|
||||
|
||||
def get_org_names(self, limit: int = 10) -> list[str]:
|
||||
"""Returns a list of organisation names."""
|
||||
records = self.read_org_table(limit)
|
||||
return [org.name for org in records if org.name]
|
||||
|
||||
def upsert_company(self, company_data: CompanyData) -> Organisation:
|
||||
"""Upserts a company record. Updates if hubspot_company_id exists, otherwise creates new."""
|
||||
with db_read_session() as session:
|
||||
hubspot_id = company_data.get("hs_object_id")
|
||||
company_name = company_data.get("name")
|
||||
|
||||
# Check if company already exists
|
||||
existing = session.exec(
|
||||
select(Organisation).where(
|
||||
Organisation.hubspot_company_id == hubspot_id
|
||||
)
|
||||
).first()
|
||||
|
||||
if existing:
|
||||
# Update existing record
|
||||
existing.name = company_name
|
||||
existing.updated_at = datetime.now(timezone.utc)
|
||||
session.add(existing)
|
||||
record = existing
|
||||
else:
|
||||
# Create new record
|
||||
record = Organisation(
|
||||
hubspot_company_id=hubspot_id,
|
||||
name=company_name,
|
||||
)
|
||||
session.add(record)
|
||||
|
||||
session.commit()
|
||||
return record
|
||||
|
||||
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
|
||||
print("⚠️ Deprecated — use the new interface instead.")
|
||||
return self.upsert_deal(deal_data, company, listing, hubspot_client)
|
||||
|
||||
def find_all_deals_with_company_id(self, company_id):
|
||||
"""Returns a list of deals for a given company_id."""
|
||||
with db_read_session() as session:
|
||||
return (
|
||||
session.query(HubspotDealData)
|
||||
.filter(HubspotDealData.company_id == company_id)
|
||||
.all()
|
||||
)
|
||||
|
||||
def find_deal_with_deal_id(self, deal_id):
|
||||
with db_read_session() as session:
|
||||
return (
|
||||
session.query(HubspotDealData)
|
||||
.filter(HubspotDealData.deal_id == deal_id)
|
||||
.one_or_none()
|
||||
)
|
||||
|
||||
def _sha256(self, file_path: str) -> str:
|
||||
"""Compute SHA-256 checksum of a file."""
|
||||
sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
sha256.update(chunk)
|
||||
return sha256.hexdigest()
|
||||
|
||||
def update_deal_with_checks(self, deal_in_db, hubspot_client) -> bool:
|
||||
"""
|
||||
Checks if a deal needs updating and syncs it with HubSpot.
|
||||
Also handles major_condition_issue_photos file upload to S3 with integrity check.
|
||||
"""
|
||||
|
||||
def soft_assert(condition, message="Assertion Failed"):
|
||||
if not condition:
|
||||
print(f"⚠️ Soft Assert Failed: {message}")
|
||||
return False
|
||||
return True
|
||||
|
||||
print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})")
|
||||
|
||||
hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db(
|
||||
deal_in_db.deal_id
|
||||
)
|
||||
|
||||
# Soft compare key fields
|
||||
checks = [
|
||||
soft_assert(
|
||||
deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch"
|
||||
),
|
||||
soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"),
|
||||
soft_assert(
|
||||
deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"),
|
||||
"landlord_property_id mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch"
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch"
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch"
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.project_code == hs_deal.get("project_code"),
|
||||
"project_code mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch"
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.outcome_notes == hs_deal.get("outcome_notes"),
|
||||
"outcome_notes mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.major_condition_issue_description
|
||||
== hs_deal.get("major_condition_issue_description"),
|
||||
"major condition description mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.major_condition_issue_photos
|
||||
== hs_deal.get("major_condition_issue_photos"),
|
||||
"major condition issue photos mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.coordination_status
|
||||
== hs_deal.get("coordination_status__stage_1_"),
|
||||
"coordination stage 1 status mismatch",
|
||||
),
|
||||
soft_assert(
|
||||
deal_in_db.design_status == hs_deal.get("retrofit_design_status"),
|
||||
"retrofit design mismatch",
|
||||
),
|
||||
]
|
||||
|
||||
# If discrepancies found, update from HubSpot
|
||||
if not all(checks):
|
||||
print(
|
||||
f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot."
|
||||
)
|
||||
self.upsert_deal(hs_deal, hs_company_id, hs_listing, hubspot_client)
|
||||
return False
|
||||
|
||||
# Handle photo upload if it exists but S3 URL is missing
|
||||
if (
|
||||
deal_in_db.major_condition_issue_photos
|
||||
and not deal_in_db.major_condition_issue_evidence_s3_url
|
||||
):
|
||||
print(
|
||||
f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..."
|
||||
)
|
||||
|
||||
photo_url = hs_deal.get("major_condition_issue_photos")
|
||||
if photo_url:
|
||||
try:
|
||||
# Download from HubSpot using fresh URL from hs_deal (not stale DB URL)
|
||||
local_file = hubspot_client.download_file_from_url(photo_url)
|
||||
|
||||
# Upload to S3
|
||||
bucket = "retrofit-data-dev"
|
||||
s3_url = self.s3.upload_file(
|
||||
local_file, bucket, prefix="hubspot/awaabs_law_evidence/"
|
||||
)
|
||||
|
||||
# Download again to verify integrity
|
||||
downloaded = self.s3.download_from_url(s3_url)
|
||||
if self._sha256(local_file) == self._sha256(downloaded):
|
||||
print("✅ SHA256 match verified — upload successful.")
|
||||
else:
|
||||
print("❌ SHA256 mismatch — integrity check failed.")
|
||||
raise ValueError("File integrity check failed after S3 upload.")
|
||||
|
||||
# Update DB record with S3 URL
|
||||
with db_read_session() as session:
|
||||
db_record = session.get(HubspotDealData, deal_in_db.id)
|
||||
db_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
session.add(db_record)
|
||||
session.commit()
|
||||
print(
|
||||
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
|
||||
)
|
||||
# Continue without the file — don't crash the entire update
|
||||
else:
|
||||
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
|
||||
|
||||
else:
|
||||
print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.")
|
||||
|
||||
return True
|
||||
|
||||
def upsert_deal(self, deal_data, company, listing, hubspot_client):
|
||||
"""
|
||||
Inserts or updates a deal record.
|
||||
Also uploads photos if present and adds S3 URL.
|
||||
"""
|
||||
with db_read_session() as session:
|
||||
deal_id = deal_data.get("hs_object_id")
|
||||
|
||||
statement = select(HubspotDealData).where(
|
||||
HubspotDealData.deal_id == deal_id
|
||||
)
|
||||
existing = session.exec(statement).first()
|
||||
|
||||
if existing:
|
||||
print(f"🔄 Updating existing deal (deal_id={deal_id})")
|
||||
|
||||
for attr, value in {
|
||||
"dealname": deal_data.get("dealname"),
|
||||
"dealstage": deal_data.get("dealstage"),
|
||||
"landlord_property_id": listing.get("owner_property_id"),
|
||||
"uprn": listing.get("national_uprn"),
|
||||
"outcome": deal_data.get("outcome"),
|
||||
"outcome_notes": deal_data.get("outcome_notes"),
|
||||
"project_code": deal_data.get("project_code"),
|
||||
"company_id": company,
|
||||
"major_condition_issue_description": deal_data.get(
|
||||
"major_condition_issue_description"
|
||||
),
|
||||
"major_condition_issue_photos": deal_data.get(
|
||||
"major_condition_issue_photos"
|
||||
),
|
||||
"major_condition_issue_description": deal_data.get(
|
||||
"major_condition_issue_description"
|
||||
),
|
||||
"major_condition_issue_photos": deal_data.get(
|
||||
"major_condition_issue_photos"
|
||||
),
|
||||
"coordination_status": deal_data.get(
|
||||
"coordination_status__stage_1_"
|
||||
),
|
||||
"design_status": deal_data.get("retrofit_design_status"),
|
||||
}.items():
|
||||
setattr(existing, attr, value or getattr(existing, attr))
|
||||
|
||||
# Upload if photo exists but S3 link missing
|
||||
if (
|
||||
existing.major_condition_issue_photos
|
||||
and not existing.major_condition_issue_evidence_s3_url
|
||||
):
|
||||
# Fetch fresh URL from HubSpot instead of using potentially expired stored URL
|
||||
fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id)
|
||||
photo_url = fresh_deal.get("major_condition_issue_photos")
|
||||
|
||||
if photo_url:
|
||||
try:
|
||||
local_file = hubspot_client.download_file_from_url(
|
||||
photo_url
|
||||
)
|
||||
s3_url = self.s3.upload_file(
|
||||
local_file,
|
||||
"retrofit-data-dev",
|
||||
prefix="hubspot/awaabs_law_evidence/",
|
||||
)
|
||||
existing.major_condition_issue_evidence_s3_url = s3_url
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
|
||||
)
|
||||
# Continue without the file — don't crash the update
|
||||
else:
|
||||
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
|
||||
|
||||
session.add(existing)
|
||||
session.commit()
|
||||
session.refresh(existing)
|
||||
return existing
|
||||
|
||||
else:
|
||||
print(f"🆕 Inserting new deal (deal_id={deal_id})")
|
||||
new_record = HubspotDealData(
|
||||
deal_id=deal_id,
|
||||
dealname=deal_data.get("dealname"),
|
||||
dealstage=deal_data.get("dealstage"),
|
||||
landlord_property_id=listing.get("owner_property_id"),
|
||||
uprn=listing.get("national_uprn"),
|
||||
outcome=deal_data.get("outcome"),
|
||||
outcome_notes=deal_data.get("outcome_notes"),
|
||||
project_code=deal_data.get("project_code"),
|
||||
company_id=company,
|
||||
major_condition_issue_description=deal_data.get(
|
||||
"major_condition_issue_description"
|
||||
),
|
||||
major_condition_issue_photos=deal_data.get(
|
||||
"major_condition_issue_photos"
|
||||
),
|
||||
coordination_status=deal_data.get("coordination_status__stage_1_"),
|
||||
design_status=deal_data.get("retrofit_design_status"),
|
||||
)
|
||||
|
||||
# Handle upload at insert time
|
||||
if new_record.major_condition_issue_photos:
|
||||
try:
|
||||
local_file = hubspot_client.download_file_from_url(
|
||||
new_record.major_condition_issue_photos
|
||||
)
|
||||
s3_url = self.s3.upload_file(
|
||||
local_file,
|
||||
"retrofit-data-dev",
|
||||
prefix="hubspot/awaabs_law_evidence/",
|
||||
)
|
||||
new_record.major_condition_issue_evidence_s3_url = s3_url
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
|
||||
)
|
||||
# Continue without the file — don't crash the insert
|
||||
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
session.refresh(new_record)
|
||||
return new_record
|
||||
1
etl/hubspot/requirements.txt
Normal file
1
etl/hubspot/requirements.txt
Normal file
|
|
@ -0,0 +1 @@
|
|||
hubspot-api-client
|
||||
116
etl/hubspot/s3_uploader.py
Normal file
116
etl/hubspot/s3_uploader.py
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
import os
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from urllib.parse import urlparse
|
||||
from datetime import datetime
|
||||
import requests
|
||||
|
||||
|
||||
class S3Uploader:
|
||||
"""
|
||||
Simple helper to upload local files to S3 and return their S3 HTTPS URI.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
aws_access_key: str,
|
||||
aws_secret_key: str,
|
||||
region: str = "eu-west-2",
|
||||
):
|
||||
self.aws_access_key = aws_access_key
|
||||
self.aws_secret_key = aws_secret_key
|
||||
self.region = region
|
||||
|
||||
self.s3 = boto3.client(
|
||||
"s3",
|
||||
aws_access_key_id=self.aws_access_key,
|
||||
aws_secret_access_key=self.aws_secret_key,
|
||||
region_name=self.region,
|
||||
)
|
||||
|
||||
def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str:
|
||||
"""
|
||||
Upload a local file to an S3 bucket and return its HTTPS URI.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the local file.
|
||||
bucket (str): S3 bucket name.
|
||||
prefix (str): Folder/prefix in the bucket.
|
||||
|
||||
Returns:
|
||||
str: HTTPS-style S3 URI (not signed).
|
||||
"""
|
||||
try:
|
||||
filename = os.path.basename(file_path)
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
||||
s3_key = os.path.join(prefix, f"{timestamp}_{filename}")
|
||||
|
||||
self.s3.upload_file(file_path, bucket, s3_key)
|
||||
|
||||
s3_uri = f"https://{bucket}.s3.{self.region}.amazonaws.com/{s3_key}"
|
||||
return s3_uri
|
||||
|
||||
except ClientError as e:
|
||||
raise RuntimeError(f"❌ S3 upload failed: {e}")
|
||||
|
||||
def print_bucket(self):
|
||||
print(self.s3.head_bucket(Bucket="retrofit-data-dev"))
|
||||
|
||||
def generate_presigned_url(
|
||||
self, bucket: str, key: str, expires_in: int = 3600
|
||||
) -> str:
|
||||
"""
|
||||
Generate a temporary presigned URL for an S3 object.
|
||||
"""
|
||||
try:
|
||||
return self.s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": bucket, "Key": key},
|
||||
ExpiresIn=expires_in,
|
||||
)
|
||||
except ClientError as e:
|
||||
raise RuntimeError(f"❌ Failed to generate signed URL: {e}")
|
||||
|
||||
def download_from_url(
|
||||
self, s3_url: str, local_dir: str = ".", expires_in: int = 3600
|
||||
) -> str:
|
||||
"""
|
||||
Download a file from a public or private S3 URL.
|
||||
If private, generates a presigned URL first.
|
||||
|
||||
Args:
|
||||
s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt)
|
||||
local_dir (str): Folder to save the file in.
|
||||
expires_in (int): Presigned URL lifetime (seconds).
|
||||
|
||||
Returns:
|
||||
str: Local file path of the downloaded file.
|
||||
"""
|
||||
parsed = urlparse(s3_url)
|
||||
host_parts = parsed.netloc.split(".")
|
||||
if len(host_parts) < 3 or host_parts[1] != "s3":
|
||||
raise ValueError("❌ Not a valid S3 HTTPS URL")
|
||||
|
||||
bucket = host_parts[0]
|
||||
key = parsed.path.lstrip("/")
|
||||
|
||||
# Generate presigned URL (whether public or private)
|
||||
presigned_url = self.generate_presigned_url(bucket, key, expires_in)
|
||||
|
||||
filename = os.path.basename(key)
|
||||
local_path = os.path.join(local_dir, filename)
|
||||
|
||||
try:
|
||||
response = requests.get(presigned_url, stream=True)
|
||||
response.raise_for_status()
|
||||
|
||||
os.makedirs(local_dir, exist_ok=True)
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
print(f"✅ Downloaded: {local_path}")
|
||||
return local_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise RuntimeError(f"❌ Failed to download file: {e}")
|
||||
30
etl/hubspot/scripts/onboarding/new_organisation.py
Normal file
30
etl/hubspot/scripts/onboarding/new_organisation.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
"""
|
||||
README.md
|
||||
|
||||
This is a simple script to showcase how a new organisation can be
|
||||
added to AraDb.
|
||||
|
||||
This has been made reduntant due to doing this process when ever
|
||||
hubspot has a webhook
|
||||
"""
|
||||
|
||||
from etl.hubspot.hubspotClient import HubspotClient, Companies
|
||||
|
||||
from etl.hubspot.hubspotDataTodB import HubspotDataToDb, CompanyData
|
||||
|
||||
hubspot = HubspotClient()
|
||||
dbRead = HubspotDataToDb()
|
||||
companies_to_add_or_ensure_it_exists = [
|
||||
Companies.THE_GUINESS_PARTNERSHIP,
|
||||
Companies.SOUTHERN_HOUSING_GROUP,
|
||||
Companies.CALICO_HOMES,
|
||||
]
|
||||
|
||||
for company in companies_to_add_or_ensure_it_exists:
|
||||
company_info: CompanyData = hubspot.get_company_information(company.value)
|
||||
dbRead.upsert_company(company_info)
|
||||
|
||||
|
||||
dbRead = HubspotDataToDb()
|
||||
names = dbRead.get_org_names()
|
||||
print(f"Organisations in database: {names}")
|
||||
15
etl/hubspot/scripts/scraper/README.md
Normal file
15
etl/hubspot/scripts/scraper/README.md
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
Input:
|
||||
|
||||
<Hubspot Deal ID>
|
||||
|
||||
|
||||
Function:
|
||||
|
||||
<Add hubspot deal/update to hubspot_deal_data>
|
||||
|
||||
|
||||
Used in:
|
||||
|
||||
when changes are made in hubspot, this will trigger a workflow in make.
|
||||
|
||||
This in turn will trigger this sqs which I'm building from this directory
|
||||
0
etl/hubspot/scripts/scraper/__init__.py
Normal file
0
etl/hubspot/scripts/scraper/__init__.py
Normal file
28
etl/hubspot/scripts/scraper/handler/Dockerfile
Normal file
28
etl/hubspot/scripts/scraper/handler/Dockerfile
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
FROM public.ecr.aws/lambda/python:3.10
|
||||
# FROM python:3.11.10-bullseye
|
||||
|
||||
# Set working directory (Lambda task root)
|
||||
WORKDIR /var/task
|
||||
|
||||
# -----------------------------
|
||||
# Copy requirements FIRST (for Docker layer caching)
|
||||
# -----------------------------
|
||||
COPY etl/hubspot/scripts/scraper/handler/requirements.txt .
|
||||
|
||||
# Install dependencies into Lambda runtime
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
|
||||
# Copy necessary files for database and utility imports
|
||||
COPY backend/ backend/
|
||||
COPY utils/ utils/
|
||||
COPY datatypes/ datatypes/
|
||||
COPY etl/hubspot etl/hubspot
|
||||
|
||||
# Copy the handler
|
||||
COPY etl/hubspot/scripts/scraper/main.py .
|
||||
|
||||
# -----------------------------
|
||||
# Lambda handler
|
||||
# -----------------------------
|
||||
CMD ["main.handler"]
|
||||
12
etl/hubspot/scripts/scraper/handler/requirements.txt
Normal file
12
etl/hubspot/scripts/scraper/handler/requirements.txt
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
pandas==2.2.2
|
||||
numpy<2.0
|
||||
requests
|
||||
tqdm
|
||||
openpyxl
|
||||
epc-api-python==1.0.2
|
||||
boto3==1.35.44
|
||||
sqlmodel
|
||||
sqlalchemy==2.0.36
|
||||
psycopg2-binary==2.9.10
|
||||
pydantic-settings==2.6.0
|
||||
hubspot-api-client
|
||||
11
etl/hubspot/scripts/scraper/local_handler/docker-compose.yml
Normal file
11
etl/hubspot/scripts/scraper/local_handler/docker-compose.yml
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
version: "3.9"
|
||||
|
||||
services:
|
||||
hubspot-scraper:
|
||||
build:
|
||||
context: ../../../../../
|
||||
dockerfile: etl/hubspot/scripts/scraper/handler/Dockerfile
|
||||
ports:
|
||||
- "9000:8080"
|
||||
env_file:
|
||||
- ../../../../../.env
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
#!/usr/bin/env python3
|
||||
import json
|
||||
import requests
|
||||
|
||||
HOST = "localhost"
|
||||
PORT = "9000"
|
||||
|
||||
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
|
||||
|
||||
payload = {
|
||||
"Records": [
|
||||
{
|
||||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
|
||||
"hubspot_deal_id": "254427203793",
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
response = requests.post(LAMBDA_URL, json=payload)
|
||||
|
||||
print("Status code:", response.status_code)
|
||||
print("Response:")
|
||||
print(response.text)
|
||||
2
etl/hubspot/scripts/scraper/local_handler/run_local.sh
Normal file
2
etl/hubspot/scripts/scraper/local_handler/run_local.sh
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
docker compose build --no-cache
|
||||
docker compose up --force-recreate
|
||||
40
etl/hubspot/scripts/scraper/main.py
Normal file
40
etl/hubspot/scripts/scraper/main.py
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
"""
|
||||
1) [completed]Get hubspot deal properties from one deal
|
||||
2) Put it in some class
|
||||
3) [completed] Load the db and check if upsert it into the table
|
||||
4) [completed]Getting working on a AWS lambda
|
||||
5) [completed] subtask and tasks history
|
||||
6) [TODO]The new sexy deal properties, move it over
|
||||
"""
|
||||
|
||||
from etl.hubspot.hubspotClient import HubspotClient
|
||||
from etl.hubspot.hubspotDataTodB import HubspotDataToDb
|
||||
from typing import Any
|
||||
|
||||
|
||||
# @subtask_handler() TODO: Do this without subtask_handler but task_handler() that creates task_id and subtask_id
|
||||
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
|
||||
if local is True:
|
||||
body = {
|
||||
"hubspot_deal_id": "254427203793",
|
||||
}
|
||||
|
||||
hubspot_deal_id = body.get("hubspot_deal_id", "")
|
||||
|
||||
if hubspot_deal_id == "":
|
||||
raise RuntimeError(
|
||||
"Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'"
|
||||
)
|
||||
|
||||
hubspot: HubspotClient = HubspotClient()
|
||||
dbloader: HubspotDataToDb = HubspotDataToDb()
|
||||
|
||||
deal = dbloader.find_deal_with_deal_id(hubspot_deal_id)
|
||||
|
||||
if deal:
|
||||
dbloader.update_deal_with_checks(deal, hubspot)
|
||||
else:
|
||||
deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id)
|
||||
dbloader.upsert_deal(deal, company, listing, hubspot)
|
||||
|
||||
print("Finsihed running")
|
||||
0
etl/hubspot/tests/__init__.py
Normal file
0
etl/hubspot/tests/__init__.py
Normal file
117
etl/hubspot/tests/test_hubspot_client_integration.py
Normal file
117
etl/hubspot/tests/test_hubspot_client_integration.py
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline, DealStage
|
||||
|
||||
|
||||
class TestHubspotClientIntegration:
|
||||
"""Integration tests using real HubSpot API calls."""
|
||||
|
||||
@pytest.fixture
|
||||
def client(self):
|
||||
"""Initialize HubSpot client with env variables."""
|
||||
return HubspotClient()
|
||||
|
||||
def test_client_initialization(self, client: HubspotClient):
|
||||
"""Checks initialisation of HubspotClient and fails early if env variables is not set"""
|
||||
assert client.access_token is not None
|
||||
assert client.client is not None
|
||||
assert client.logger is not None
|
||||
|
||||
def test_get_deal_ids_from_company(self, client: HubspotClient):
|
||||
"""Test getting deal IDs from Apple company includes expected deal."""
|
||||
company_id: str = Companies.APPLE.value
|
||||
|
||||
deal_ids: list[str] = client.get_deal_ids_from_company(company_id)
|
||||
|
||||
# https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
|
||||
assert "263490768079" in deal_ids
|
||||
|
||||
def test_get_company_id_from_deal_id(self, client: HubspotClient):
|
||||
deal_id: str = "263490768079"
|
||||
|
||||
company_id: Optional[str] = client.from_deal_id_get_associated_company_id(
|
||||
deal_id
|
||||
)
|
||||
# https://app-eu1.hubspot.com/contacts/145275138/record/0-3/263490768079
|
||||
assert company_id == Companies.APPLE.value
|
||||
|
||||
def test_from_deal_id_get_associated_listing(self, client: HubspotClient):
|
||||
deal_id: str = "263490768079"
|
||||
|
||||
listing_info: Optional[dict[str, str]] = (
|
||||
client.from_deal_id_get_associated_listing(deal_id)
|
||||
)
|
||||
|
||||
assert listing_info is not None
|
||||
assert "hs_object_id" in listing_info
|
||||
assert "national_uprn" in listing_info
|
||||
assert "owner_property_id" in listing_info
|
||||
assert "domna_property_id" in listing_info
|
||||
|
||||
def test_from_deal_id_get_info(self, client: HubspotClient):
|
||||
deal_id: str = "263490768079"
|
||||
|
||||
deal_info: dict[str, str] = client.from_deal_id_get_info(deal_id)
|
||||
|
||||
assert "dealname" in deal_info
|
||||
assert "dealstage" in deal_info
|
||||
assert "pipeline" in deal_info
|
||||
assert "outcome" in deal_info # outcome
|
||||
assert "outcome_notes" in deal_info # outcome notes
|
||||
assert "project_code" in deal_info
|
||||
assert "major_condition_issue_description" in deal_info
|
||||
assert "major_condition_issue_photos" in deal_info
|
||||
assert (
|
||||
"coordination_status__stage_1_" in deal_info
|
||||
) # Coordiantion Status (Stage 1)
|
||||
assert "retrofit_design_status" in deal_info # Retrofit Design Status
|
||||
|
||||
def test_get_deal_info_for_db(self, client: HubspotClient):
|
||||
deal_id: str = "263490768079"
|
||||
|
||||
deal, company, listing = client.get_deal_info_for_db(deal_id)
|
||||
|
||||
assert "dealname" in deal
|
||||
assert "dealstage" in deal
|
||||
assert "pipeline" in deal
|
||||
|
||||
assert company == Companies.APPLE.value
|
||||
|
||||
assert listing is None or "hs_object_id" in listing
|
||||
|
||||
def test_get_company_information(self, client: HubspotClient):
|
||||
company_id: str = Companies.APPLE.value
|
||||
|
||||
company_info: dict[str, str] = client.get_company_information(company_id)
|
||||
|
||||
assert "name" in company_info
|
||||
assert company_info["name"].lower() == "Apple".lower()
|
||||
|
||||
def test_get_all_pipelines(self, client: HubspotClient):
|
||||
pipelines: list[dict[str, str]] = client.get_all_pipelines()
|
||||
|
||||
assert len(pipelines) > 0
|
||||
pipeline_ids: list[str] = [p["id"] for p in pipelines]
|
||||
assert Pipeline.OPERATIONS_SOCIAL_HOUSING.value in pipeline_ids
|
||||
|
||||
def test_get_deal_stages_from_pipeline_id(self, client: HubspotClient):
|
||||
stages: list[dict[str, str]] = client.get_deal_stages_from_pipeline_id(
|
||||
Pipeline.OPERATIONS_SOCIAL_HOUSING.value
|
||||
)
|
||||
|
||||
assert len(stages) > 0
|
||||
stage_ids: list[str] = [s["stage_id"] for s in stages]
|
||||
assert DealStage.SURVEYED_COMPLETE_NEEDS_SIGN_OFF.value in stage_ids
|
||||
|
||||
def test_download_file_from_url(
|
||||
self, client: HubspotClient, tmp_path: Optional[str]
|
||||
):
|
||||
deal_info: dict[str, str] = client.from_deal_id_get_info("254427203793")
|
||||
download_url: str = deal_info["major_condition_issue_photos"]
|
||||
|
||||
save_path: str = client.download_file_from_url(download_url, str(tmp_path))
|
||||
|
||||
assert os.path.exists(save_path)
|
||||
assert os.path.getsize(save_path) > 0
|
||||
|
|
@ -35,3 +35,4 @@ locals {
|
|||
output "resolved_image_uri" {
|
||||
value = local.image_uri
|
||||
}
|
||||
|
||||
|
|
|
|||
44
infrastructure/terraform/lambda/hubspot_deal_etl/main.tf
Normal file
44
infrastructure/terraform/lambda/hubspot_deal_etl/main.tf
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
data "terraform_remote_state" "shared" {
|
||||
backend = "s3"
|
||||
config = {
|
||||
bucket = "assessment-model-terraform-state"
|
||||
key = "env:/${var.stage}/terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
data "aws_secretsmanager_secret_version" "db_credentials" {
|
||||
secret_id = "${var.stage}/assessment_model/db_credentials"
|
||||
}
|
||||
|
||||
locals {
|
||||
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
|
||||
}
|
||||
|
||||
|
||||
module "hubspot_deal_etl" {
|
||||
source = "../../modules/lambda_with_sqs"
|
||||
|
||||
name = "hubspot_deal_etl"
|
||||
stage = var.stage
|
||||
|
||||
image_uri = local.image_uri
|
||||
|
||||
# Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000)
|
||||
maximum_concurrency = var.maximum_concurrency
|
||||
|
||||
batch_size = var.batch_size
|
||||
|
||||
environment = {
|
||||
STAGE = var.stage
|
||||
LOG_LEVEL = "info"
|
||||
DB_USERNAME = local.db_credentials.db_assessment_model_username
|
||||
DB_PASSWORD = local.db_credentials.db_assessment_model_password
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_iam_role_policy_attachment" "lambda_s3_policy" {
|
||||
role = module.lambda.role_name
|
||||
policy_arn = data.terraform_remote_state.shared.outputs.hubspot_etl_s3_read_and_write_arn
|
||||
}
|
||||
16
infrastructure/terraform/lambda/hubspot_deal_etl/provider.tf
Normal file
16
infrastructure/terraform/lambda/hubspot_deal_etl/provider.tf
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
terraform {
|
||||
required_providers {
|
||||
aws = {
|
||||
source = "hashicorp/aws"
|
||||
version = ">= 5.0"
|
||||
}
|
||||
}
|
||||
|
||||
backend "s3" {
|
||||
bucket = "hubspot-etl-bucket-terraform-state"
|
||||
key = "terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
|
||||
required_version = ">= 1.2.0"
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
variable "lambda_name" {
|
||||
type = string
|
||||
description = "Logical name of the lambda (e.g. address2uprn)"
|
||||
}
|
||||
|
||||
variable "stage" {
|
||||
description = "Deployment stage (e.g. dev, prod)"
|
||||
type = string
|
||||
}
|
||||
variable "ecr_repo_url" {
|
||||
type = string
|
||||
description = "ECR repository URL (no tag, no digest)"
|
||||
}
|
||||
|
||||
variable "image_digest" {
|
||||
type = string
|
||||
description = "Image digest (sha256:...)"
|
||||
}
|
||||
|
||||
variable "maximum_concurrency" {
|
||||
type = number
|
||||
default = null
|
||||
description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit."
|
||||
}
|
||||
|
||||
variable "batch_size" {
|
||||
type = number
|
||||
default = 1
|
||||
}
|
||||
|
||||
locals {
|
||||
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
|
||||
}
|
||||
|
||||
output "resolved_image_uri" {
|
||||
value = local.image_uri
|
||||
}
|
||||
|
||||
|
||||
variable "db_host" {
|
||||
type = string
|
||||
}
|
||||
|
||||
variable "db_name" {
|
||||
type = string
|
||||
}
|
||||
|
||||
variable "db_port" {
|
||||
type = string
|
||||
}
|
||||
|
|
@ -628,6 +628,8 @@ output "cdn_certificate_state_bucket" {
|
|||
value = module.cdn_certificate_state_bucket.bucket_name
|
||||
}
|
||||
|
||||
|
||||
|
||||
################################################
|
||||
# CDN
|
||||
################################################
|
||||
|
|
@ -639,3 +641,35 @@ module "cdn_state_bucket" {
|
|||
output "cdn_state_bucket" {
|
||||
value = module.cdn_state_bucket.bucket_name
|
||||
}
|
||||
|
||||
|
||||
################################################
|
||||
# Hubspot ETL Lambda
|
||||
################################################
|
||||
module "hubspot_etl_bucket" {
|
||||
source = "../modules/tf_state_bucket"
|
||||
bucket_name = "hubspot-etl-bucket-terraform-state"
|
||||
|
||||
}
|
||||
|
||||
module "hubspot_etl_registry" {
|
||||
source = "../modules/container_registry"
|
||||
name = "hubspot-etl"
|
||||
stage = var.stage
|
||||
|
||||
}
|
||||
|
||||
# S3 policy for postcode splitter to read from retrofit data bucket
|
||||
module "hubspot_etl_s3_read_and_write" {
|
||||
source = "../modules/s3_iam_policy"
|
||||
|
||||
policy_name = "HubspotETLReadandWriteS3"
|
||||
policy_description = "Allow hubspot_etl_lambda Lambda to read and write from retrofit-data bucket"
|
||||
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
|
||||
actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"]
|
||||
resource_paths = ["/*"]
|
||||
}
|
||||
|
||||
output "hubspot_etl_s3_read_and_write_arn" {
|
||||
value = module.hubspot_etl_s3_read_and_write.policy_arn
|
||||
}
|
||||
|
|
@ -3,6 +3,6 @@ pythonpath = .
|
|||
log_cli = true
|
||||
log_cli_level = INFO
|
||||
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
|
||||
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests
|
||||
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests
|
||||
markers =
|
||||
integration: mark a test as an integration test
|
||||
|
|
|
|||
|
|
@ -1062,8 +1062,8 @@ class HeatingRecommender:
|
|||
**hot_water_simulation_config
|
||||
}
|
||||
# This upgrade will only take the heating system to average energy efficiency
|
||||
if self.property.epc_record.mainheat_energy_eff in ["Very Poor", "Poor"] and not self.dual_heating:
|
||||
heating_simulation_config["mainheat_energy_eff_ending"] = "Average"
|
||||
if self.property.epc_record.mainheat_energy_eff in ["Very Poor", "Poor", "Average"] and not self.dual_heating:
|
||||
heating_simulation_config["mainheat_energy_eff_ending"] = "Good"
|
||||
else:
|
||||
heating_simulation_config["mainheat_energy_eff_ending"] = self.property.epc_record.mainheat_energy_eff
|
||||
|
||||
|
|
|
|||
5
scripts/init_db.py
Normal file
5
scripts/init_db.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
from sqlmodel import SQLModel
|
||||
import backend.app.db.models.organisation # noqa: F401
|
||||
from backend.app.db.connection import db_engine
|
||||
|
||||
SQLModel.metadata.create_all(db_engine)
|
||||
|
|
@ -26,15 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
|
|||
from collections import defaultdict
|
||||
from sqlalchemy import func
|
||||
|
||||
# PORTFOLIO_ID = 206
|
||||
# SCENARIOS = [389]
|
||||
PORTFOLIO_ID = 581
|
||||
SCENARIOS = [1124]
|
||||
PORTFOLIO_ID = 639
|
||||
SCENARIOS = [1157]
|
||||
scenario_names = {
|
||||
1124: "EPC C - Solar Focused",
|
||||
1157: "EPC C - no EWI solid floor",
|
||||
}
|
||||
|
||||
project_name = "WCHG EPC D rated properties"
|
||||
project_name = "Instagroup Sample"
|
||||
|
||||
|
||||
def get_data(portfolio_id, scenario_ids):
|
||||
|
|
|
|||
|
|
@ -4,4 +4,6 @@ pytest-cov
|
|||
pytest-mock
|
||||
dotenv
|
||||
psycopg[binary]
|
||||
pytest-postgresql
|
||||
pytest-postgresql
|
||||
hubspot-api-client
|
||||
fuzzywuzzy
|
||||
Loading…
Add table
Reference in a new issue