Model/backend/ecmk_fetcher/upload.py
2026-04-08 07:42:00 +00:00

51 lines
1.4 KiB
Python

from datetime import datetime, timezone
import os
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
def upload_file_to_sharepoint(
    client: DomnaSharepointClient,
    file_path: str,
    base_path: str,
    subpath: str,
) -> None:
    """Upload a local file into the assessment folder of a SharePoint location.

    The destination folder is always
    ``<base_path>/<subpath>/1. Retrofit Assessment/A. Assessment`` and the
    uploaded file keeps the base name of ``file_path``.

    Args:
        client: SharePoint client whose ``upload_file`` method does the upload.
        file_path: Path of the local file to upload.
        base_path: Leading part of the SharePoint destination path.
        subpath: Segment placed between ``base_path`` and the fixed
            assessment folder.
    """
    # The segments are joined verbatim with "/"; no slash normalisation is
    # applied to ``base_path`` or ``subpath``.
    destination = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment"
    client.upload_file(
        file_path=file_path,
        sharepoint_path=destination,
        file_name=os.path.basename(file_path),
    )
def upload_file_to_s3_and_update_db(
    bucket: str,
    file_path: str,
    hubspot_listing_id: str,
    file_type: FileTypeEnum,
) -> None:
    """Upload a local file to S3 and persist a matching ``UploadedFile`` row.

    The object is stored under
    ``documents/hubspot_listing_id/<hubspot_listing_id>/<file name>``. The
    database record is created only after the upload call has returned, and
    its upload timestamp is taken at that point in UTC.

    Args:
        bucket: Name of the destination S3 bucket.
        file_path: Path of the local file to upload.
        hubspot_listing_id: Listing identifier used in the object key and
            stored on the record.
        file_type: Kind of file; its ``.value`` is stored on the record.
    """
    base_name: str = os.path.basename(file_path)
    object_key: str = f"documents/hubspot_listing_id/{hubspot_listing_id}/{base_name}"

    upload_file_to_s3(file_path, bucket, object_key)

    # Built after the upload so the timestamp reflects a completed transfer.
    record = UploadedFile(
        s3_file_bucket=bucket,
        s3_file_key=object_key,
        s3_upload_timestamp=datetime.now(timezone.utc),
        hubspot_listing_id=hubspot_listing_id,
        file_source=FileSourceEnum.ECMK.value,
        file_type=file_type.value,
    )

    # TODO: we should do multiple files at once to reduce db trips
    with db_session() as session:
        session.add(record)
        session.commit()