mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
149 lines
5.6 KiB
Python
149 lines
5.6 KiB
Python
import os
|
|
from typing import Any, Dict, Optional
|
|
from io import BytesIO
|
|
|
|
from utils.logger import setup_logger
|
|
from utils.sharepoint.domna_sites import DomnaSites
|
|
from utils.sharepoint.sharepoint_client import SharePointClient
|
|
|
|
|
|
class DomnaSharepointClient:
    """
    Convenience wrapper around ``SharePointClient`` bound to a single site.

    Credentials are read from the ``SHAREPOINT_CLIENT_ID``,
    ``SHAREPOINT_CLIENT_SECRET`` and ``SHAREPOINT_TENANT_ID`` environment
    variables when the object is constructed. Every operation builds a fresh
    ``SharePointClient`` for the site given to the constructor.
    """

    # Directory under which ``create_temp_file`` writes its files.
    _TEMP_ROOT = "/tmp/sharepoint"

    def __init__(self, sharepoint_location: "DomnaSites"):
        """
        Read the SharePoint credentials from the environment.

        Args:
            sharepoint_location: Site to operate on; its ``.value`` is passed
                to ``SharePointClient`` as ``site_id``.

        Raises:
            AssertionError: If any of the three credential environment
                variables is not set.
        """
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID")
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET")
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID")
        self.sharepoint_drive = sharepoint_location

        # Raised explicitly rather than via ``assert`` so the validation still
        # runs under ``python -O``; the exception type and messages are kept
        # for callers that already handle AssertionError.
        required = (
            ("SHAREPOINT_CLIENT_ID", self.sharepoint_client_id),
            ("SHAREPOINT_CLIENT_SECRET", self.sharepoint_client_secret),
            ("SHAREPOINT_TENANT_ID", self.sharepoint_tenant_id),
        )
        for env_name, env_value in required:
            if env_value is None:
                raise AssertionError(f"Please assign {env_name} env variable")

        self.logger = setup_logger()

    def _new_client(self) -> "SharePointClient":
        """Build a ``SharePointClient`` for the configured site and credentials."""
        return SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )

    def get_folders_in_path(self, path: str) -> Dict[str, Any]:
        """
        List the contents of ``path`` on the site.

        Returns whatever ``SharePointClient.list_folder_contents`` returns for
        a page size of 500; the other methods of this class read the entries
        from its ``"value"`` key.
        """
        return self._new_client().list_folder_contents(path, page_size=500)

    def does_folder_exists_at(self, file_name: str, file_path: str) -> bool:
        """
        Report whether any entry under ``file_path`` has a name containing ``file_name``.

        The comparison is a case-insensitive *substring* match, so ``"report"``
        matches an entry called ``"Reports 2024"``. Entries without a
        ``"name"`` key are ignored.
        """
        folders: Dict[str, Any] = self.get_folders_in_path(file_path)
        if "value" not in folders:
            return False

        needle = file_name.upper()
        return any(
            needle in folder.get("name", "").upper() for folder in folders["value"]
        )

    def create_dir(self, dir_name: str, at_path: str = "/") -> str:
        """
        Return the ``webUrl`` of ``dir_name`` under ``at_path``, creating it if absent.

        An existing entry is detected with a case-insensitive exact name
        match. A client for the create call is only built when the folder
        actually has to be created.
        """
        folders: Dict[str, Any] = self.get_folders_in_path(at_path)

        # Check if folder already exists (case-insensitive match)
        if "value" in folders:
            wanted = dir_name.lower()
            for folder in folders["value"]:
                if "name" in folder and folder["name"].lower() == wanted:
                    self.logger.info(f"Folder already exists: {dir_name} at {at_path}")
                    return folder["webUrl"]

        # Folder does NOT exist -> create it
        self.logger.info(f"Creating folder: {dir_name} at {at_path}")
        created: Dict[str, Any] = self._new_client().create_folder(dir_name, at_path)
        return created["webUrl"]

    def makedir(self, dir_name: str, at_path: str = "/") -> str:
        """Alias for :meth:`create_dir`."""
        return self.create_dir(dir_name, at_path)

    def upload_file(
        self, file_path: str, sharepoint_path: str, file_name: str
    ) -> Optional[Dict[str, Any]]:
        """
        Upload the local file at ``file_path`` to ``sharepoint_path`` as ``file_name``.

        The local file is opened in binary mode and closed again once the
        upload call returns.

        Returns:
            The value returned by ``SharePointClient.upload_file``.
            NOTE(review): presumably the uploaded item's metadata; confirm
            against ``SharePointClient``.
        """
        with open(file_path, "rb") as file_stream:
            return self._new_client().upload_file(
                file_name, file_stream, sharepoint_path
            )

    def download_file(self, sharepoint_path: str, local_path: str) -> bool:
        """
        Download a file from SharePoint to a local path.

        Returns True once the file has been written to ``local_path``.
        Returns False when the metadata lookup raises ``ValueError`` or when
        the metadata carries no ``@microsoft.graph.downloadUrl``.
        Any other error propagates to the caller.
        """
        try:
            metadata: Dict[str, Any] = self._new_client().get_file_metadata(
                sharepoint_path
            )
        except ValueError:
            return False

        download_url: Optional[str] = metadata.get("@microsoft.graph.downloadUrl")
        if not download_url:
            return False

        content: BytesIO = SharePointClient.download_sharepoint_file(download_url)

        # A bare file name has no parent directory to create.
        parent_dir = os.path.dirname(local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(local_path, "wb") as f:
            f.write(content.getvalue())

        self.logger.debug(f"Downloaded SharePoint file to: {local_path}")
        return True

    def rename_file(self, item_id: str, new_name: str) -> None:
        """Rename the item identified by ``item_id`` to ``new_name``."""
        self._new_client().rename_file(item_id, new_name)

    def create_temp_file(self, content: BytesIO, path: str) -> str:
        """
        Write ``content`` to ``path`` beneath ``/tmp/sharepoint`` and return the full path.

        Args:
            content: Buffer whose entire value is written to the file.
            path: Location relative to the temp root. Leading slashes are
                ignored, so ``"/a/b.txt"`` and ``"a/b.txt"`` name the same file.

        Raises:
            ValueError: If ``path`` does not resolve to a location inside the
                temp root (for example it is empty or climbs out with ``..``).
        """
        # os.path.join discards everything before an absolute component, so a
        # leading slash would otherwise place the file outside the temp root.
        relative_path = path.lstrip("/" + os.sep)
        new_path = os.path.join(self._TEMP_ROOT, relative_path)

        # Ensure the path is under the temp root even when it contains "..".
        resolved = os.path.normpath(new_path)
        if not resolved.startswith(self._TEMP_ROOT + os.sep):
            raise ValueError(f"Path escapes {self._TEMP_ROOT}: {path}")

        # Ensure the parent directory exists
        os.makedirs(os.path.dirname(new_path), exist_ok=True)

        # Write content to the specified file
        with open(new_path, "wb+") as temp_file:
            temp_file.write(content.getvalue())

        self.logger.debug(f"Temporary file created at: {new_path}")
        return new_path
|