diff --git a/asset_list/app.py b/asset_list/app.py index 02c94f10..5794eaf3 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -75,22 +75,22 @@ def app(): data_folder = "/workspaces/model/asset_list" data_filename = "Calico ARA Upload Review.xlsx" - sheet_name = "Upload to Ara - Needs Sign Off" + sheet_name = "Sheet1" postcode_column = "Postcode" - address1_column = "Address 1" + address1_column = "Units" address1_method = None - fulladdress_column = "Address 1" - address_cols_to_concat = [] + fulladdress_column = "Units" + address_cols_to_concat = ["Units"] missing_postcodes_method = None landlord_year_built = None - landlord_os_uprn = "ara_found_uprn" - landlord_property_type = "Property Type" - landlord_built_form = "Property Type" + landlord_os_uprn = None + landlord_property_type = None # Good to include if landlord gave + landlord_built_form = None # Good to include if landlord gave landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None - landlord_property_id = "Asset Reference" + landlord_property_id = "llid" landlord_sap = None outcomes_filename = None outcomes_sheetname = None diff --git a/backend/app/db/models/organisation.py b/backend/app/db/models/organisation.py index 774a05af..a3c79e3c 100644 --- a/backend/app/db/models/organisation.py +++ b/backend/app/db/models/organisation.py @@ -1,6 +1,8 @@ -from sqlmodel import SQLModel, Field +from sqlmodel import SQLModel, Field, Column, text from datetime import datetime, timezone from typing import Optional +from sqlalchemy import DateTime +from sqlalchemy.sql import func import uuid @@ -11,3 +13,46 @@ class Organisation(SQLModel, table=True): updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) hubspot_company_id: Optional[str] = None name: Optional[str] = None + + +class HubspotDealData(SQLModel, table=True): + __tablename__ = "hubspot_deal_data" + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + + # HubSpot Deal identifiers + deal_id: str = Field(index=True, nullable=False) + dealname: Optional[str] = Field(default=None) + dealstage: Optional[str] = Field(default=None) + company_id: Optional[str] = Field(default=None) + project_code: Optional[str] = Field(default=None) + + # HubSpot custom properties + landlord_property_id: Optional[str] = Field(default=None) + uprn: Optional[str] = Field(default=None) + outcome: Optional[str] = Field(default=None) + outcome_notes: Optional[str] = Field(default=None) + + major_condition_issue_description: Optional[str] = Field(default=None) + major_condition_issue_photos: Optional[str] = Field(default=None) + major_condition_issue_evidence_s3_url: Optional[str] = Field(default=None) + + coordination_status: Optional[str] = Field(default=None) + design_status: Optional[str] = Field(default=None) + + created_at: datetime = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("NOW() AT TIME ZONE 'utc'"), + nullable=False, + ) + ) + + updated_at: datetime = Field( + sa_column=Column( + DateTime(timezone=True), + server_default=text("NOW() AT TIME ZONE 'utc'"), + onupdate=func.now(), + nullable=False, + ) + ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index 8fe61a3e..4ed579e9 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -1,8 +1,10 @@ from backend.app.db.connection import db_read_session -from backend.app.db.models.organisation import Organisation +from backend.app.db.models.organisation import Organisation, HubspotDealData from sqlmodel import select from datetime import datetime, timezone from typing import TypedDict +from etl.hubspot.s3_uploader import S3Uploader +import hashlib class CompanyData(TypedDict): @@ -12,7 +14,7 @@ class CompanyData(TypedDict): class HubspotDataToDb: def __init__(self): - pass + self.s3 = S3Uploader() def read_org_table(self, limit: int = 10): with db_read_session() as session: @@ -53,3 +55,287 @@ class HubspotDataToDb: session.commit() return record + + ### + # Check from here + ### + + def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client): + print("⚠️ Deprecated — use the new interface instead.") + return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client) + + def find_all_deals_with_company_id(self, company_id): + """Returns a list of deals for a given company_id.""" + with db_read_session() as session: + return ( + session.query(HubspotDealData) + .filter(HubspotDealData.company_id == company_id) + .all() + ) + + def find_deal_with_deal_id(self, deal_id): + with db_read_session() as session: + return ( + session.query(HubspotDealData) + .filter(HubspotDealData.deal_id == deal_id) + .one_or_none() + ) + + def _sha256(self, file_path: str) -> str: + """Compute SHA-256 checksum of a file.""" + sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256.update(chunk) + return sha256.hexdigest() + + def update_deal(self, deal_in_db, hubspot_client): + """ + Checks if a deal needs updating and syncs it with HubSpot. + Also handles major_condition_issue_photos file upload to S3 with integrity check. + """ + + def soft_assert(condition, message="Assertion Failed"): + if not condition: + print(f"⚠️ Soft Assert Failed: {message}") + return False + return True + + print(f"🔍 Checking if deal needs updating (deal_id={deal_in_db.deal_id})") + + hs_deal, hs_company_id, hs_listing = hubspot_client.get_deal_info_for_db( + deal_in_db.deal_id + ) + + # Soft compare key fields + checks = [ + soft_assert( + deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch" + ), + soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"), + soft_assert( + deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), + "landlord_property_id mismatch", + ), + soft_assert( + deal_in_db.outcome == hs_deal.get("outcome"), "outcome mismatch" + ), + soft_assert( + deal_in_db.dealstage == hs_deal.get("dealstage"), "dealstage mismatch" + ), + soft_assert( + deal_in_db.dealname == hs_deal.get("dealname"), "dealname mismatch" + ), + soft_assert( + deal_in_db.project_code == hs_deal.get("project_code"), + "project_code mismatch", + ), + soft_assert( + deal_in_db.uprn == hs_listing.get("national_uprn"), "uprn mismatch" + ), + soft_assert( + deal_in_db.outcome_notes == hs_deal.get("outcome_notes"), + "outcome_notes mismatch", + ), + soft_assert( + deal_in_db.major_condition_issue_description + == hs_deal.get("major_condition_issue_description"), + "major condition description mismatch", + ), + soft_assert( + deal_in_db.major_condition_issue_photos + == hs_deal.get("major_condition_issue_photos"), + "major condition issue photos mismatch", + ), + soft_assert( + deal_in_db.coordination_status + == hs_deal.get("coordination_status__stage_1_"), + "coordination stage 1 status mismatch", + ), + soft_assert( + deal_in_db.design_status == hs_deal.get("retrofit_design_status"), + "retrofit design mismatch", + ), + ] + + # If discrepancies found, update from HubSpot + if not all(checks): + print( + f"❗ Discrepancies found for deal_id {deal_in_db.deal_id} — syncing with HubSpot." + ) + self.upsert_hubspot_deal(hs_deal, hs_company_id, hs_listing, hubspot_client) + return False + + # Handle photo upload if it exists but S3 URL is missing + if ( + deal_in_db.major_condition_issue_photos + and not deal_in_db.major_condition_issue_evidence_s3_url + ): + print( + f"🖼️ Found photo for deal_id {deal_in_db.deal_id} — uploading to S3..." + ) + + photo_url = hs_deal.get("major_condition_issue_photos") + if photo_url: + try: + # Download from HubSpot using fresh URL from hs_deal (not stale DB URL) + local_file = hubspot_client.download_file_from_url(photo_url) + + # Upload to S3 + bucket = "retrofit-data-dev" + s3_url = self.s3.upload_file( + local_file, bucket, prefix="hubspot/awaabs_law_evidence/" + ) + + # Download again to verify integrity + downloaded = self.s3.download_from_url(s3_url) + if self._sha256(local_file) == self._sha256(downloaded): + print("✅ SHA256 match verified — upload successful.") + else: + print("❌ SHA256 mismatch — integrity check failed.") + raise ValueError("File integrity check failed after S3 upload.") + + # Update DB record with S3 URL + with db_read_session() as session: + db_record = session.get(HubspotDealData, deal_in_db.id) + db_record.major_condition_issue_evidence_s3_url = s3_url + session.add(db_record) + session.commit() + print( + f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}" + ) + return False + except Exception as e: + print( + f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}" + ) + # Continue without the file — don't crash the entire update + else: + print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") + + else: + print(f"✅ No update or upload required for deal_id {deal_in_db.deal_id}.") + + return True + + def upsert_hubspot_deal(self, deal_data, company, listing, hubspot_client): + """ + Inserts or updates a deal record. + Also uploads photos if present and adds S3 URL. + """ + with db_read_session() as session: + deal_id = deal_data.get("hs_object_id") + + statement = select(HubspotDealData).where( + HubspotDealData.deal_id == deal_id + ) + existing = session.exec(statement).first() + + if existing: + print(f"🔄 Updating existing deal (deal_id={deal_id})") + + for attr, value in { + "dealname": deal_data.get("dealname"), + "dealstage": deal_data.get("dealstage"), + "landlord_property_id": listing.get("owner_property_id"), + "uprn": listing.get("national_uprn"), + "outcome": deal_data.get("outcome"), + "outcome_notes": deal_data.get("outcome_notes"), + "project_code": deal_data.get("project_code"), + "company_id": company, + "major_condition_issue_description": deal_data.get( + "major_condition_issue_description" + ), + "major_condition_issue_photos": deal_data.get( + "major_condition_issue_photos" + ), + "major_condition_issue_description": deal_data.get( + "major_condition_issue_description" + ), + "major_condition_issue_photos": deal_data.get( + "major_condition_issue_photos" + ), + "coordination_status": deal_data.get( + "coordination_status__stage_1_" + ), + "design_status": deal_data.get("retrofit_design_status"), + }.items(): + setattr(existing, attr, value or getattr(existing, attr)) + + # Upload if photo exists but S3 link missing + if ( + existing.major_condition_issue_photos + and not existing.major_condition_issue_evidence_s3_url + ): + # Fetch fresh URL from HubSpot instead of using potentially expired stored URL + fresh_deal = hubspot_client.from_deal_id_get_info(existing.deal_id) + photo_url = fresh_deal.get("major_condition_issue_photos") + + if photo_url: + try: + local_file = hubspot_client.download_file_from_url( + photo_url + ) + s3_url = self.s3.upload_file( + local_file, + "retrofit-data-dev", + prefix="hubspot/awaabs_law_evidence/", + ) + existing.major_condition_issue_evidence_s3_url = s3_url + except Exception as e: + print( + f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}" + ) + # Continue without the file — don't crash the update + else: + print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}") + + session.add(existing) + session.commit() + session.refresh(existing) + return existing + + else: + print(f"🆕 Inserting new deal (deal_id={deal_id})") + new_record = HubspotDealData( + deal_id=deal_id, + dealname=deal_data.get("dealname"), + dealstage=deal_data.get("dealstage"), + landlord_property_id=listing.get("owner_property_id"), + uprn=listing.get("national_uprn"), + outcome=deal_data.get("outcome"), + outcome_notes=deal_data.get("outcome_notes"), + project_code=deal_data.get("project_code"), + company_id=company, + major_condition_issue_description=deal_data.get( + "major_condition_issue_description" + ), + major_condition_issue_photos=deal_data.get( + "major_condition_issue_photos" + ), + coordination_status=deal_data.get("coordination_status__stage_1_"), + design_status=deal_data.get("retrofit_design_status"), + ) + + # Handle upload at insert time + if new_record.major_condition_issue_photos: + try: + local_file = hubspot_client.download_file_from_url( + new_record.major_condition_issue_photos + ) + s3_url = self.s3.upload_file( + local_file, + "retrofit-data-dev", + prefix="hubspot/awaabs_law_evidence/", + ) + new_record.major_condition_issue_evidence_s3_url = s3_url + except Exception as e: + print( + f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}" + ) + # Continue without the file — don't crash the insert + + session.add(new_record) + session.commit() + session.refresh(new_record) + return new_record diff --git a/etl/hubspot/requirements.txt b/etl/hubspot/requirements.txt index ef8e3ebc..44a58f77 100644 --- a/etl/hubspot/requirements.txt +++ b/etl/hubspot/requirements.txt @@ -1 +1 @@ -hubspot-api-client \ No newline at end of file +hubspot-api-client diff --git a/etl/hubspot/s3_uploader.py b/etl/hubspot/s3_uploader.py new file mode 100644 index 00000000..0d217bd2 --- /dev/null +++ b/etl/hubspot/s3_uploader.py @@ -0,0 +1,116 @@ +import os +import boto3 +from botocore.exceptions import ClientError +from urllib.parse import urlparse +from datetime import datetime +import requests + + +class S3Uploader: + """ + Simple helper to upload local files to S3 and return their S3 HTTPS URI. + """ + + def __init__( + self, + aws_access_key: str = "AKIAU5A36PPNK7RXX52V", + aws_secret_key: str = "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", + region: str = "eu-west-2", + ): + self.aws_access_key = aws_access_key + self.aws_secret_key = aws_secret_key + self.region = region + + self.s3 = boto3.client( + "s3", + aws_access_key_id=self.aws_access_key, + aws_secret_access_key=self.aws_secret_key, + region_name=self.region, + ) + + def upload_file(self, file_path: str, bucket: str, prefix: str = "uploads/") -> str: + """ + Upload a local file to an S3 bucket and return its HTTPS URI. + + Args: + file_path (str): Path to the local file. + bucket (str): S3 bucket name. + prefix (str): Folder/prefix in the bucket. + + Returns: + str: HTTPS-style S3 URI (not signed). + """ + try: + filename = os.path.basename(file_path) + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + s3_key = os.path.join(prefix, f"{timestamp}_{filename}") + + self.s3.upload_file(file_path, bucket, s3_key) + + s3_uri = f"https://{bucket}.s3.{self.region}.amazonaws.com/{s3_key}" + return s3_uri + + except ClientError as e: + raise RuntimeError(f"❌ S3 upload failed: {e}") + + def print_bucket(self): + print(self.s3.head_bucket(Bucket="retrofit-data-dev")) + + def generate_presigned_url( + self, bucket: str, key: str, expires_in: int = 3600 + ) -> str: + """ + Generate a temporary presigned URL for an S3 object. + """ + try: + return self.s3.generate_presigned_url( + "get_object", + Params={"Bucket": bucket, "Key": key}, + ExpiresIn=expires_in, + ) + except ClientError as e: + raise RuntimeError(f"❌ Failed to generate signed URL: {e}") + + def download_from_url( + self, s3_url: str, local_dir: str = ".", expires_in: int = 3600 + ) -> str: + """ + Download a file from a public or private S3 URL. + If private, generates a presigned URL first. + + Args: + s3_url (str): Full S3 HTTPS URL (e.g., https://bucket.s3.region.amazonaws.com/path/file.txt) + local_dir (str): Folder to save the file in. + expires_in (int): Presigned URL lifetime (seconds). + + Returns: + str: Local file path of the downloaded file. + """ + parsed = urlparse(s3_url) + host_parts = parsed.netloc.split(".") + if len(host_parts) < 3 or host_parts[1] != "s3": + raise ValueError("❌ Not a valid S3 HTTPS URL") + + bucket = host_parts[0] + key = parsed.path.lstrip("/") + + # Generate presigned URL (whether public or private) + presigned_url = self.generate_presigned_url(bucket, key, expires_in) + + filename = os.path.basename(key) + local_path = os.path.join(local_dir, filename) + + try: + response = requests.get(presigned_url, stream=True) + response.raise_for_status() + + os.makedirs(local_dir, exist_ok=True) + with open(local_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + print(f"✅ Downloaded: {local_path}") + return local_path + + except requests.exceptions.RequestException as e: + raise RuntimeError(f"❌ Failed to download file: {e}") diff --git a/etl/hubspot/scripts/scraper/README.md b/etl/hubspot/scripts/scraper/README.md new file mode 100644 index 00000000..2d7fe975 --- /dev/null +++ b/etl/hubspot/scripts/scraper/README.md @@ -0,0 +1,15 @@ +Input: + + + + +Function: + + + + +Used in: + +when changes are made in hubspot, this will trigger a workflow in make. + +This in turn will trigger this sqs which I'm building from this directory \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/__init__.py b/etl/hubspot/scripts/scraper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/etl/hubspot/scripts/scraper/handler/Dockerfile b/etl/hubspot/scripts/scraper/handler/Dockerfile new file mode 100644 index 00000000..bbcc3e22 --- /dev/null +++ b/etl/hubspot/scripts/scraper/handler/Dockerfile @@ -0,0 +1,38 @@ +FROM public.ecr.aws/lambda/python:3.10 +# FROM python:3.11.10-bullseye + + +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} + + +# Set working directory (Lambda task root) +WORKDIR /var/task + +# ----------------------------- +# Copy requirements FIRST (for Docker layer caching) +# ----------------------------- +COPY etl/hubspot/scripts/scraper/handler/requirements.txt . + +# Install dependencies into Lambda runtime +RUN pip install --no-cache-dir -r requirements.txt + + +# Copy necessary files for database and utility imports +COPY backend/ backend/ +COPY utils/ utils/ +COPY datatypes/ datatypes/ +COPY etl/hubspot etl/hubspot + +# Copy the handler +COPY etl/hubspot/scripts/scraper/main.py . + +# ----------------------------- +# Lambda handler +# ----------------------------- +CMD ["main.handler"] \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/handler/requirements.txt b/etl/hubspot/scripts/scraper/handler/requirements.txt new file mode 100644 index 00000000..230b460e --- /dev/null +++ b/etl/hubspot/scripts/scraper/handler/requirements.txt @@ -0,0 +1,12 @@ +pandas==2.2.2 +numpy<2.0 +requests +tqdm +openpyxl +epc-api-python==1.0.2 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 +hubspot-api-client \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml b/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml new file mode 100644 index 00000000..77679650 --- /dev/null +++ b/etl/hubspot/scripts/scraper/local_handler/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + hubspot-scraper: + build: + context: ../../../../../ + dockerfile: etl/hubspot/scripts/scraper/handler/Dockerfile + ports: + - "9000:8080" + env_file: + - ../../../../../.env \ No newline at end of file diff --git a/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py b/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py new file mode 100644 index 00000000..69580a93 --- /dev/null +++ b/etl/hubspot/scripts/scraper/local_handler/invoke_local_lambda.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +import json +import requests + +HOST = "localhost" +PORT = "9000" + +LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations" + +payload = { + "Records": [ + { + "body": json.dumps( + { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", + "hubspot_deal_id": "254427203793", + } + ) + } + ] +} + +response = requests.post(LAMBDA_URL, json=payload) + +print("Status code:", response.status_code) +print("Response:") +print(response.text) diff --git a/etl/hubspot/scripts/scraper/local_handler/run_local.sh b/etl/hubspot/scripts/scraper/local_handler/run_local.sh new file mode 100644 index 00000000..17474bdb --- /dev/null +++ b/etl/hubspot/scripts/scraper/local_handler/run_local.sh @@ -0,0 +1,2 @@ +docker compose build --no-cache +docker compose up --force-recreate diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py new file mode 100644 index 00000000..a51cd4a4 --- /dev/null +++ b/etl/hubspot/scripts/scraper/main.py @@ -0,0 +1,45 @@ +""" +TODO: + +1) [completed]Get hubspot deal properties from one deal +2) Put it in some class +3) [completed] Load the db and check if upsert it into the table +4) Getting working on a AWS lambda +5) [completed] subtask and tasks history +6) The new sexy deal properties, move it over +""" + +from backend.utils.subtasks import subtask_handler +from etl.hubspot.hubspotClient import HubspotClient +from etl.hubspot.hubspotDataTodB import HubspotDataToDb +from typing import Any + + +@subtask_handler() +def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: + if local is True: + body = { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", + "hubspot_deal_id": "254427203793", + } + + hubspot_deal_id = body.get("hubspot_deal_id", "") + + if hubspot_deal_id == "": + raise RuntimeError( + "Missing Hubspot Deal ID in SQS body request, 'hubspot_deal_id'" + ) + + hubspot = HubspotClient() + dbloader = HubspotDataToDb() + + deal = dbloader.find_deal_with_deal_id(hubspot_deal_id) + + if deal: + dbloader.update_deal(deal, hubspot) + else: + deal, company, listing = hubspot.get_deal_info_for_db(hubspot_deal_id) + dbloader.upsert_hubspot_deal(deal, company, listing, hubspot) + + print("Finsihed running")