Model/utils/sharepoint/domna_sharepoint_client.py

172 lines
6.2 KiB
Python

from pprint import pformat
from enum import Enum
import os
from utils.logger import setup_logger
from utils.sharepoint.sharepoint_client import SharePointClient
from functools import wraps
import re
from datetime import datetime, timedelta
from io import BytesIO
class DomnaSites(Enum):
    """SharePoint site identifiers for the Domna sites.

    Each member's value is whatever ``os.getenv`` returns for the matching
    environment variable at the moment this module is imported, i.e. ``None``
    when the variable is not set.
    """

    # Presumably the endpoint used to look up a site's id:
    # https://{tenant}.sharepoint.com/sites/{site}/_api/site/id
    # TODO: Add these to github secrets!!!
    # NOTE(review): Enum treats members with equal values as aliases of the
    # first one, so if two of these variables are unset (both None) the later
    # member silently becomes an alias of the earlier -- confirm every
    # variable is set in each environment.
    DOMNA = os.getenv("DOMNA_SHAREPOINT_ID")
    OSMOSIS_ACD = os.getenv("OSMOSIS_ACD_SHAREPOINT_ID")
    PRIVATE_PAY = os.getenv("PRIVATE_PAY_SHAREPOINT_ID")
    SOCIAL_HOUSING_WAVE_3 = os.getenv("SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID")
class DomnaSharepointClient:
    """Convenience wrapper around ``SharePointClient`` bound to one site.

    Credentials are read from the ``SHAREPOINT_CLIENT_ID``,
    ``SHAREPOINT_CLIENT_SECRET`` and ``SHAREPOINT_TENANT_ID`` environment
    variables. The site is selected by the object passed to the constructor,
    whose ``.value`` attribute is used as the site id. A new
    ``SharePointClient`` is built for every remote operation.
    """

    # Local directory under which downloaded files are written.
    _TEMP_ROOT = "/tmp/sharepoint"

    # Media extensions skipped by default when downloading (compared
    # case-insensitively, so ".MOV" / ".Jpg" are skipped as well).
    _DEFAULT_AVOID = (".jpg", ".mov", ".heic", ".png", ".jpeg", ".mp4")

    def __init__(self, sharepoint_location, development=False):
        """Read credentials from the environment and validate them.

        Args:
            sharepoint_location: Object exposing the site id as ``.value``
                (e.g. a ``DomnaSites`` member).
            development: Not used by this class; kept so existing callers
                that pass it keep working.

        Raises:
            AssertionError: If any credential variable is unset or the site
                id is ``None``.
        """
        self.logger = setup_logger()
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)
        self.sharepoint_drive = sharepoint_location

        # Explicit raises instead of ``assert`` statements: asserts are
        # removed under ``python -O``, which would let a half-configured
        # client through. AssertionError is kept so callers see the same type.
        self._require(
            self.sharepoint_client_id,
            "Please assign SHAREPOINT_CLIENT_ID env variable",
        )
        self._require(
            self.sharepoint_client_secret,
            "Please assign SHAREPOINT_CLIENT_SECRET env variable",
        )
        self._require(
            self.sharepoint_tenant_id,
            "Please assign SHAREPOINT_TENANT_ID env variable",
        )
        self._require(
            self.sharepoint_drive.value,
            "Please set sharepoint driver id env variable. See SharePointInstaller for more information",
        )

    @staticmethod
    def _require(value, message):
        """Raise ``AssertionError(message)`` when *value* is ``None``."""
        if value is None:
            raise AssertionError(message)

    def _build_client(self):
        """Return a new ``SharePointClient`` for the configured site."""
        return SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )

    def get_folders_in_path(self, path):
        """Return the client's folder listing for *path*.

        The result is passed through unchanged from
        ``SharePointClient.list_folder_contents``; the other methods of this
        class expect a mapping with the entries under the ``"value"`` key.
        """
        return self._build_client().list_folder_contents(path)

    def get_file_content(self, url):
        """Download *url* via ``SharePointClient.download_sharepoint_file``."""
        return self._build_client().download_sharepoint_file(url)

    def does_folder_exists_at(self, file_name, file_path):
        """Return True if an entry in *file_path* has a name containing *file_name*.

        The comparison is case-insensitive and is a substring match, so
        ``"foo"`` also matches an entry called ``"Foobar"``. Entries of any
        kind are considered, not only folders. Entries without a ``"name"``
        key are ignored.
        """
        listing = self.get_folders_in_path(file_path)
        if "value" not in listing:
            return False
        return any(
            "name" in entry and file_name.upper() in entry["name"].upper()
            for entry in listing["value"]
        )

    def create_dir(self, file_name, at_path="/"):
        """Create folder *file_name* under *at_path* unless it already exists.

        An existing entry is detected by a case-insensitive exact name match.

        Returns:
            The ``webUrl`` of the existing or newly created folder.
        """
        listing = self.get_folders_in_path(at_path)
        entries = listing["value"] if "value" in listing else []
        for entry in entries:
            if "name" in entry and entry["name"].lower() == file_name.lower():
                self.logger.info(f"Folder already exists: {file_name} at {at_path}")
                return entry["webUrl"]

        # Only build a client when something actually has to be created.
        self.logger.info(f"Creating folder: {file_name} at {at_path}")
        created = self._build_client().create_folder(file_name, at_path)
        return created["webUrl"]

    def makedir(self, dir_name, at_path="/"):
        """Alias of :meth:`create_dir`."""
        return self.create_dir(dir_name, at_path)

    def upload_file(self, file_path, sharepoint_path, file_name):
        """Upload the local file *file_path* to *sharepoint_path* as *file_name*.

        The local file is opened in binary mode and closed again once the
        upload call returns (or raises).
        """
        client = self._build_client()
        # NOTE(review): assumes SharePointClient.upload_file consumes the
        # stream before returning -- confirm against its implementation.
        with open(file_path, "rb") as stream:
            client.upload_file(file_name, stream, sharepoint_path)

    def download_files_from_path(self, path, avoid=None):
        """Download every file directly inside *path*, skipping unwanted extensions.

        Args:
            path: SharePoint folder path to list.
            avoid: Iterable of file-name suffixes to skip. Defaults to common
                image/video extensions. Matching is case-insensitive.

        Returns:
            List of local paths (one per downloaded file) as returned by
            :meth:`create_temp_file`.

        Raises:
            RuntimeError: If the folder listing has no ``"value"`` key.
        """
        raw_suffixes = self._DEFAULT_AVOID if avoid is None else avoid
        skip_suffixes = tuple(suffix.lower() for suffix in raw_suffixes)

        listing = self.get_folders_in_path(path)
        if "value" not in listing:
            raise RuntimeError(f"Failed to get files from {path}")

        # Only entries carrying a "file" key are files; everything else
        # (e.g. sub-folders) is ignored.
        urls_by_name = {
            entry["name"]: entry["@microsoft.graph.downloadUrl"]
            for entry in listing["value"]
            if "file" in entry
            and not entry["name"].lower().endswith(skip_suffixes)
        }

        downloaded_files = []
        for file_name, url in urls_by_name.items():
            # NOTE(review): Graph download URLs are pre-authenticated;
            # consider dropping the URL from this log line.
            self.logger.info(f"Downloading {file_name} from {url}")
            content = self.get_file_content(url)
            downloaded_files.append(
                self.create_temp_file(content, f"{path}/{file_name}")
            )
        return downloaded_files

    def create_temp_file(self, content: BytesIO, path: str):
        """Write *content* to ``/tmp/sharepoint/<path>`` and return that path.

        Args:
            content: In-memory buffer whose full value is written.
            path: Path relative to the temp root. Leading slashes are
                stripped, so SharePoint-style paths such as ``"/Docs/a.pdf"``
                are accepted.

        Returns:
            The normalised local path of the written file.

        Raises:
            ValueError: If *path* would resolve outside the temp root.
        """
        base = self._TEMP_ROOT
        # os.path.join discards everything before an absolute component, so a
        # leading "/" must be removed or the file lands outside the temp root.
        relative = path.lstrip("/")
        new_path = os.path.normpath(os.path.join(base, relative))
        # Guard against ".." segments climbing out of the temp root.
        if os.path.commonpath([base, new_path]) != base:
            raise ValueError(f"Refusing to write outside {base}: {path!r}")

        # Ensure the parent directory exists before writing.
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        with open(new_path, "wb") as temp_file:
            temp_file.write(content.getvalue())
        self.logger.debug(f"Temporary file created at: {new_path}")
        return new_path