Merge pull request #913 from Hestia-Homes/feature/pashub-to-ara

Pas Hub to Ara: log in to pas hub and get token from local storage
This commit is contained in:
Daniel Roth 2026-03-25 14:27:21 +00:00 committed by GitHub
commit 8275be14c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 937 additions and 12 deletions

View file

@ -6,12 +6,14 @@ psycopg2-binary==2.9.10
python-jose==3.3.0
cryptography==43.0.3
mangum==0.19.0
playwright==1.58.0
# AWS
boto3==1.35.44
# Data
openpyxl==3.1.5
# Basic
pytz
msal
uvicorn[standard]
sqlmodel
# Testing

View file

@ -1,11 +0,0 @@
version: "3.9"
services:
categorisation-lambda:
build:
context: ../
dockerfile: backend/categorisation/handler/Dockerfile
ports:
- "9000:8080"
env_file:
- ../.env

View file

@ -0,0 +1,13 @@
from enum import Enum
class CoreFiles(Enum):
PHOTOPACK = "Photopack"
SITENOTE = "SiteNote"
RDSAP_SITENOTE = "RdSAP_SiteNote"
PAS2023_VENTILATION = "PAS 2023 Ventilation Assessment Report"
PAS2023_CONDITION = "PAS 2023 Condition Report"
PAS_SIGNIFICANCE = "PAS Significance"
PAR_PHOTOPACK = "PAR Photo Pack"
PAS2023_PROPERTY = "PAS 2023 Property Assessment Report"
PAS2023_OCCUPANCY = "PAS 2023 Occupancy Assessment Report"

View file

@ -0,0 +1,25 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional
@dataclass
class EvidenceFileData:
file_id: str
file_name: str
created_utc: str
file_size: int
file_extension: str
evidence_category: Optional[str] = None
@classmethod
def from_api(cls, data: Dict[str, Any]) -> EvidenceFileData:
return cls(
file_id=data["fileId"],
file_name=data["fileName"],
created_utc=data["createdUtc"],
file_size=data["fileSize"],
file_extension=data["fileExtension"],
evidence_category=data.get("evidenceCategory"),
)

View file

@ -0,0 +1,16 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class EvidenceMetadata:
container_name: str
blob_uri: str
@classmethod
def from_api(cls, data: Dict[str, Any]) -> EvidenceMetadata:
return cls(
container_name=data["containerName"],
blob_uri=data["blobUri"],
)

View file

@ -0,0 +1,23 @@
FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy
# Install AWS Lambda RIE
ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/local/bin/aws-lambda-rie
RUN chmod +x /usr/local/bin/aws-lambda-rie
# Set working directory (Lambda task root)
WORKDIR /var/task
COPY .env backend/.env
COPY utils/ utils/
COPY backend/pashub_fetcher/ backend/pashub_fetcher/
RUN pip install --no-cache-dir -r requirements.txt
# Lambda entrypoint
ENTRYPOINT ["/usr/local/bin/aws-lambda-rie", "python", "-m", "awslambdaric"]
# -----------------------------
# Lambda handler
# -----------------------------
CMD ["backend.pashub_fetcher.handler.test_handler.handler"]

View file

@ -0,0 +1,116 @@
import os
import re
from typing import Any, Dict, List, Mapping
from openpyxl import load_workbook
from backend.pashub_fetcher.job import Job
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
logger = setup_logger()
def extract_jobs(filepath: str) -> List[Job]:
wb = load_workbook(filepath, data_only=True)
ws = wb["watford warm homes (wave 3) mai"]
HEADER_ROW = 3
headers: Dict[str, int] = {}
for col in range(1, ws.max_column + 1):
value = str(ws.cell(row=HEADER_ROW, column=col).value)
if value:
headers[value.strip()] = col
name_col = headers["Name"]
link_col = headers["Pashub Link"]
jobs: List[Job] = []
for row in range(HEADER_ROW + 1, ws.max_row + 1):
name = ws.cell(row=row, column=name_col).value
link = ws.cell(row=row, column=link_col).value
if not name or not link:
continue
link = str(link)
match = re.search(r"/jobs/([0-9a-fA-F\-]+)/", link)
if not match:
continue
job_id = match.group(1)
jobs.append({"id": job_id, "address": str(name)})
return jobs
def handler(event: Mapping[str, Any], context: Any) -> None:
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
filepath = os.path.join(BASE_DIR, "Watford_Warm_Homes_Wave_3_RA Downloads .xlsx")
jobs: List[Job] = extract_jobs(filepath)
logger.info("Successfully loaded jobs from spreadsheet")
pas_hub_email = "random@test.com"
pas_hub_password = "my_fake_password"
try:
token: str = get_token_from_local_storage(pas_hub_email, pas_hub_password)
logger.info(f"Token extracted successfully")
except:
logger.error("Error getting auth token from Pas Hub")
raise
pashub_client = PashubClient(token=token)
sharepoint_client = DomnaSharepointClient(
sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
)
sharepoint_client.makedir("Watford Test", "/JTK Test Folder")
saved_file_paths: List[str] = []
for job in jobs:
try:
job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(
job["id"]
)
# Upload files to sharepoint
sharepoint_client.makedir(job["address"], "/JTK Test Folder/Watford Test")
for file_path in job_files:
sharepoint_client.upload_file(
file_path,
f"/JTK Test Folder/Watford Test/{job['address']}",
file_path.split("/")[-1],
)
saved_file_paths.extend(job_files)
except UnauthorizedError:
logger.warning("Token expired - refreshing")
token = get_token_from_local_storage(pas_hub_email, pas_hub_password)
pashub_client = PashubClient(token=token)
# retry once
saved_file_paths.extend(
pashub_client.get_core_evidence_files_by_job_id(job["id"])
)
print(f"saved {len(saved_file_paths)} files")
if __name__ == "__main__":
event = {"Records": [{"body": "{}"}]}
handler(event, None)

View file

@ -0,0 +1,5 @@
awslambdaric
playwright==1.58.0
requests
msal
openpyxl

View file

@ -0,0 +1,7 @@
from typing import Any, Mapping
import json
def handler(event: Mapping[str, Any], context: Any) -> None:
print("Received event:")
print(json.dumps(event, indent=2))

View file

@ -0,0 +1,6 @@
from typing import TypedDict
class Job(TypedDict):
id: str
address: str

View file

@ -0,0 +1,11 @@
version: "3.9"
services:
pashub-fetcher-lambda:
build:
context: ../../../
dockerfile: backend/pashub_fetcher/handler/Dockerfile
ports:
- "9000:8080"
env_file:
- ../../../.env

View file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
import json
import requests
HOST = "localhost"
PORT = "9000"
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
payload = {
"Records": [
{
"body": json.dumps(
{
"uprn": 123456,
}
)
}
]
}
response = requests.post(LAMBDA_URL, json=payload)
print("Status code:", response.status_code)
print("Response:")
print(response.text)

View file

@ -0,0 +1,138 @@
from collections import defaultdict
import os
from typing import Dict, List, Optional
from datetime import datetime
import requests
from backend.pashub_fetcher.core_files import CoreFiles
from backend.pashub_fetcher.evidence_file_data import EvidenceFileData
from backend.pashub_fetcher.evidence_metadata import EvidenceMetadata
from utils.logger import setup_logger
logger = setup_logger()
class UnauthorizedError(Exception):
pass
class PashubClient:
def __init__(self, token: str):
self.token = token
self.company_id = "cb5249e2-8f31-4ef4-aefd-08ddaccb1fa2"
self.base = "https://pashub.net/api"
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {self.token}",
"Accept": "application/json",
}
)
logger.info("Finished initialising CotalityClient")
def get_core_evidence_files_by_job_id(self, job_id: str) -> List[str]:
logger.info(f"Getting Core Evidence Files for job ID {job_id}")
evidence_list: List[EvidenceFileData] = self._get_evidence_list(job_id)
logger.info(f"Found {len(evidence_list)} Evidence files to get")
if not evidence_list:
return []
saved_files: List[str] = []
core_files: Dict[CoreFiles, EvidenceFileData] = self._select_latest_core_files(
evidence_list
)
logger.info(f"Number of core files to download is {len(core_files)}")
for _, evidence in core_files.items():
evidence_id = evidence.file_id
if not evidence_id:
continue
logger.info(f"Getting metadata for file {evidence.file_name}")
metadata: EvidenceMetadata = self._get_evidence_metadata(
job_id, evidence_id
)
download_url: str = self._build_download_url(metadata, evidence.file_id)
output_dir: str = "/tmp"
file_name: str = evidence.file_name
file_path: str = os.path.join(output_dir, file_name)
self._download_file(download_url, file_path)
logger.info("Successfully downloaded file")
saved_files.append(file_path)
return saved_files
def _get_core_file_type(self, file: EvidenceFileData) -> Optional[CoreFiles]:
for core_file in CoreFiles:
if file.file_name.startswith(core_file.value):
return core_file
return None
def _select_latest_core_files(
self,
files: List[EvidenceFileData],
) -> Dict[CoreFiles, EvidenceFileData]:
grouped: Dict[CoreFiles, List[EvidenceFileData]] = defaultdict(list)
for file in files:
core_type = self._get_core_file_type(file)
if not core_type:
continue
grouped[core_type].append(file)
latest_files: Dict[CoreFiles, EvidenceFileData] = {}
for core_type, group in grouped.items():
latest = max(group, key=lambda f: datetime.fromisoformat(f.created_utc))
latest_files[core_type] = latest
return latest_files
def _get_evidence_list(self, job_id: str) -> List[EvidenceFileData]:
url = f"{self.base}/jobs/{job_id}/evidence"
r = self.session.get(url)
if r.status_code == 401:
raise UnauthorizedError("Token expired or invalid")
r.raise_for_status()
results = r.json().get("results", [])
return [EvidenceFileData.from_api(item) for item in results]
def _get_evidence_metadata(self, job_id: str, evidence_id: str) -> EvidenceMetadata:
url = f"{self.base}/jobs/{job_id}/evidenceMetadata"
params = {"evidenceIds": evidence_id}
r = self.session.get(url, params=params)
if r.status_code == 401:
raise UnauthorizedError()
r.raise_for_status()
return EvidenceMetadata.from_api(r.json())
def _build_download_url(self, metadata: EvidenceMetadata, file_id: str) -> str:
container = metadata.container_name
blob_uri = metadata.blob_uri
base, sas = blob_uri.split("?", 1)
return f"{base}{container}/{file_id}?{sas}"
def _download_file(self, url: str, file_path: str) -> None:
r = requests.get(url)
r.raise_for_status()
with open(file_path, "wb") as f:
f.write(r.content)

View file

@ -0,0 +1,56 @@
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
from utils.logger import setup_logger
logger = setup_logger()
def get_token_from_local_storage(email: str, password: str) -> str:
logger.info("Starting Playwright flow")
with sync_playwright() as p:
browser = p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
page = browser.new_page()
try:
logger.info("Navigating to site...")
page.goto("https://pashub.net/", timeout=30000)
logger.info("Filling login form...")
page.fill("#email", email)
page.fill("#password", password)
logger.info("Submitting login...")
page.wait_for_selector("#btn-login", state="visible", timeout=10000)
with page.expect_navigation(timeout=15000):
page.click("#btn-login")
page.wait_for_timeout(3000)
if "login" in page.url.lower():
raise Exception("Login failed (still on login page)")
logger.info(f"Login likely successful. URL: {page.url}")
token = page.evaluate(
"""() => {
return localStorage.getItem('token');
}"""
)
if not token:
raise Exception("Login succeeded but no token found")
return token
except PlaywrightTimeoutError as e:
raise Exception(f"Timeout during login flow: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
finally:
browser.close()

View file

@ -524,6 +524,15 @@ output "ordnance_s3_read_and_write_arn" {
value = module.ordnance_s3_read_and_write.policy_arn
}
################################################
# Pas Hub to Ara Lambda
################################################
module "pashub_to_ara_registry" {
source = "../modules/container_registry"
name = "pashub_to_ara"
stage = var.stage
}
################################################
# Engine Lambda ECR
################################################

View file

@ -1,7 +1,7 @@
import requests
import json
TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6Ik1EUTRNRU5GUTBVNU9FUXpOelk1TVRFME0wUkdOMFpFUkRoR1JVVkJNVGMxT1RFNFJERXlPQSJ9.eyJodHRwOi8vZW1haWwiOiJzZWJhc3RpYW5Ab3Ntb3Npcy1hY2QuY29tIiwiaHR0cDovL2NsdWsudG9rZW4vbGFzdFBhc3N3b3JkQ2hhbmdlIjoiMjAyNS0wOC0yNlQwOTo1NDoyNi4zMjZaIiwiaHR0cDovL2NsdWsudG9rZW4vY29ubmVjdGlvbiI6ImVUZWNoSUQiLCJodHRwOi8vY2x1ay50b2tlbi9zdHJhdGVneSI6ImF1dGgwIiwiaHR0cDovL2NsdWsudG9rZW4vc3RyYXRlZ3lUeXBlIjoiZGF0YWJhc2UiLCJpc3MiOiJodHRwczovL2V0ZWNoaWQuZXUuYXV0aDAuY29tLyIsInN1YiI6ImF1dGgwfDY4YWQ4NDUyZDI2YzI1ZmMyMzkwZmYxYSIsImF1ZCI6WyJodHRwczovL3Bhc2h1Yi5hcGkuZXRlY2gubmV0IiwiaHR0cHM6Ly9ldGVjaGlkLmV1LmF1dGgwLmNvbS91c2VyaW5mbyJdLCJpYXQiOjE3NzMyMzc4MjQsImV4cCI6MTc3MzI0NTAyNCwic2NvcGUiOiJvcGVuaWQiLCJhenAiOiJEaVp6d3VVaTVkVmozOXR3NG00bWZ6emZvRm5MdmVLZyJ9.mkkxeZiD_ByHY4TJKpLQ-trmeGs15s0ekL6u1n-ek9j-EzNyf6qalEHCyHf8gzdNhU_vay96bIOMRHp4vXFaLqSANwKZayIS3EoA_b9-u2FAZpooxEvReAMNJGoZ6WLD01AQXWv-l7ww1ZqAnQzw0moL_Oma6hVmA5oa-RJKJ3MerS7e0Wei97Db48E140-EAbQf2iPcKYYtCNRA4il6n8DFiqGeoUMGo99jkR1ceZAvMpOAj8RhKX-4qSiDfX6yXUS2G96U5m7S_GWI-DEj5TazkN10Af3TyOY3EVjmZoJcRpiAR4cFmlfcTydjrShU03DWmPZm1QItf2McxfCpNA"
TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6Ik1EUTRNRU5GUTBVNU9FUXpOelk1TVRFME0wUkdOMFpFUkRoR1JVVkJNVGMxT1RFNFJERXlPQSJ9.eyJodHRwOi8vZW1haWwiOiJzZWJhc3RpYW5Ab3Ntb3Npcy1hY2QuY29tIiwiaHR0cDovL2NsdWsudG9rZW4vbGFzdFBhc3N3b3JkQ2hhbmdlIjoiMjAyNS0wOC0yNlQwOTo1NDoyNi4zMjZaIiwiaHR0cDovL2NsdWsudG9rZW4vY29ubmVjdGlvbiI6ImVUZWNoSUQiLCJodHRwOi8vY2x1ay50b2tlbi9zdHJhdGVneSI6ImF1dGgwIiwiaHR0cDovL2NsdWsudG9rZW4vc3RyYXRlZ3lUeXBlIjoiZGF0YWJhc2UiLCJpc3MiOiJodHRwczovL2V0ZWNoaWQuZXUuYXV0aDAuY29tLyIsInN1YiI6ImF1dGgwfDY4YWQ4NDUyZDI2YzI1ZmMyMzkwZmYxYSIsImF1ZCI6WyJodHRwczovL3Bhc2h1Yi5hcGkuZXRlY2gubmV0IiwiaHR0cHM6Ly9ldGVjaGlkLmV1LmF1dGgwLmNvbS91c2VyaW5mbyJdLCJpYXQiOjE3NzQyODczOTMsImV4cCI6MTc3NDI5NDU5Mywic2NvcGUiOiJvcGVuaWQiLCJhenAiOiJEaVp6d3VVaTVkVmozOXR3NG00bWZ6emZvRm5MdmVLZyJ9.NHh21XfnRofsFkRkc-28Dz-vQAdY70lXkEmh-Mzz7Fg6gjDbZeMu7PnBwgbDP_U8r6R0mI_pDIUc1MzJe1Rf5SF2-RV36TcGzmVzb3ek9wPsy3lxST5WL-vn-qUJ7GsZiGOeQ-jDLLFn8b8tjFrD7BGv8uphrfYAbPDm0atznkdbUSQQy-rfRJWhisnDtHf99j96TuJz3dV4bfI6VGrin-jezbg6BCvUYWQtttUs7knQKEWO0sGGDxtS29sbn4MX8Jqz4-hf6N2XSlgv52aIDwTVX-lyMWzfoeuIGhvCKuDiJeVw2c0r2UZFpHqjnfhXcb0_aacukXe8z-srj8-Rdw"
base = "https://pashub.net/api"

View file

View file

@ -0,0 +1,105 @@
import os
from typing import Any, Dict, Optional
from io import BytesIO
from utils.logger import setup_logger
from utils.sharepoint.domna_sites import DomnaSites
from utils.sharepoint.sharepoint_client import SharePointClient
class DomnaSharepointClient:
"""
A simple scraper to get the contents of a sharepoint and validatate inputs so I can manually change
"""
def __init__(self, sharepoint_location: DomnaSites):
self.logger = setup_logger()
self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)
self.sharepoint_drive = sharepoint_location
assert (
self.sharepoint_client_id is not None
), "Please assign SHAREPOINT_CLIENT_ID env variable"
assert (
self.sharepoint_client_secret is not None
), "Please assign SHAREPOINT_CLIENT_SECRET env variable"
assert (
self.sharepoint_tenant_id is not None
), "Please assign SHAREPOINT_TENANT_ID env variable"
def get_folders_in_path(self, path: str) -> Dict[str, Any]:
sharepoint_client = SharePointClient(
tenant_id=self.sharepoint_tenant_id,
client_id=self.sharepoint_client_id,
client_secret=self.sharepoint_client_secret,
site_id=self.sharepoint_drive.value,
)
return sharepoint_client.list_folder_contents(path)
def does_folder_exists_at(self, file_name: str, file_path: str):
folders: Dict[str, Any] = self.get_folders_in_path(file_path)
if "value" in folders:
for folder in folders["value"]:
if file_name.upper() in folder["name"].upper():
return True
return False
def create_dir(self, dir_name: str, at_path: str = "/") -> str:
sharepoint_client = SharePointClient(
tenant_id=self.sharepoint_tenant_id,
client_id=self.sharepoint_client_id,
client_secret=self.sharepoint_client_secret,
site_id=self.sharepoint_drive.value,
)
folders: Dict[str, Any] = self.get_folders_in_path(at_path)
# Check if folder already exists (case-insensitive match)
if "value" in folders:
for folder in folders["value"]:
if "name" in folder and folder["name"].lower() == dir_name.lower():
self.logger.info(f"Folder already exists: {dir_name} at {at_path}")
return folder["webUrl"] # ✅ return existing folder
# Folder does NOT exist → create it
self.logger.info(f"Creating folder: {dir_name} at {at_path}")
created: Dict[str, Any] = sharepoint_client.create_folder(dir_name, at_path)
return created["webUrl"]
def makedir(self, dir_name: str, at_path: str = "/") -> str:
return self.create_dir(dir_name, at_path)
def upload_file(
self, file_path: str, sharepoint_path: str, file_name: str
) -> Optional[Dict[str, Any]]:
sharepoint_client = SharePointClient(
tenant_id=self.sharepoint_tenant_id,
client_id=self.sharepoint_client_id,
client_secret=self.sharepoint_client_secret,
site_id=self.sharepoint_drive.value,
)
def get_file_stream(file_path: str):
return open(file_path, "rb")
sharepoint_client.upload_file(
file_name, get_file_stream(file_path), sharepoint_path
)
def create_temp_file(self, content: BytesIO, path: str):
# Ensure the path is under /tmp/
new_path = os.path.join("/tmp/sharepoint", path)
# Ensure the parent directory exists
os.makedirs(os.path.dirname(new_path), exist_ok=True)
# Write content to the specified file
with open(new_path, "wb+") as temp_file:
temp_file.write(content.getvalue())
self.logger.debug(f"Temporary file created at: {new_path}")
return new_path

View file

@ -0,0 +1,11 @@
from enum import Enum
import os
class DomnaSites(Enum):
# https//{tenant}.sharepoint.com/sites/{site}/_api/site/id
# TODO: Add these to github secrets!!!
DOMNA = os.getenv("DOMNA_SHAREPOINT_ID")
OSMOSIS_ACD = os.getenv("OSMOSIS_ACD_SHAREPOINT_ID")
PRIVATE_PAY = os.getenv("PRIVATE_PAY_SHAREPOINT_ID")
SOCIAL_HOUSING_WAVE_3 = os.getenv("SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID")

25
utils/sharepoint/main.py Normal file
View file

@ -0,0 +1,25 @@
# This is small script to see if Domna Sharepoint Client works
# for basic functionality
# Can we import it?
from io import BytesIO
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient, DomnaSites
# can we initliase it
client = DomnaSharepointClient(sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3)
# can we get an example of root path?
client.get_folders_in_path("/")
client.get_folders_in_path("/JTK Test Folder")
# can we make a folder appear in JTK Test Folder?
client.makedir("Dan is the best", "/JTK Test Folder")
content = BytesIO(b"Hello, this is some file content!")
path = client.create_temp_file(content, "some/place/over/the/rainbow")
client.upload_file(
path, "/JTK Test Folder/Dan is the best", "junte_is_the_worst_at_python.txt"
)

View file

@ -0,0 +1,342 @@
"""
This file contains the functions which enable interaction with SharePoint via the API.
Documentation to get api_id:
https://answers.microsoft.com/en-us/msoffice/forum/all/what-is-the-best-way-to-findout-the-share-point/7b2d4183-4188-4cd5-8441-dd93207c5a01
"""
from typing import Any, BinaryIO, Dict, Optional
from msal import ConfidentialClientApplication
from datetime import datetime, timedelta
import requests
from functools import wraps
import time
from io import BytesIO
# Api Documentation: https://learn.microsoft.com/en-us/graph/api/drive-get?view=graph-rest-1.0&tabs=http
def handle_error(response):
"""
Handle errors based on HTTP status codes and log detailed information.
"""
try:
error_json = response.json().get("error", {})
except ValueError:
error_json = {}
error_code = error_json.get("code", "unknownError")
error_message = error_json.get("message", "No detailed error message provided.")
inner_error = error_json.get("innererror", {})
details = error_json.get("details", [])
logger.error(f"Error Code: {error_code}")
logger.error(f"Error Message: {error_message}")
if inner_error:
logger.error(f"Inner Error: {inner_error}")
if details:
logger.error(f"Error Details: {details}")
if response.status_code == 401:
logger.error("Unauthorized. Token might be invalid.")
elif response.status_code == 403:
logger.error("Forbidden. Access denied to the requested resource.")
elif response.status_code == 404:
logger.error("Not Found. The requested resource doesnt exist.")
elif response.status_code == 429:
retry_after = int(
response.headers.get("Retry-After", 5)
) # Default to 5 seconds if not provided
logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return "retry"
elif response.status_code in (500, 503):
retry_after = int(
response.headers.get("Retry-After", 5)
) # Default to 5 seconds if not provided
logger.error(f"Server error. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return "retry"
else:
raise ValueError(
f"API request failed with status code {response.status_code} - {error_message}"
)
raise ValueError(
f"API request failed with status code {response.status_code} - {error_message}"
)
def api_call_decorator(func):
"""
Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.
:param func: The function to be decorated.
:return: The wrapped function.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
# Check and refresh the access token if needed
if self.is_access_token_expired():
self.retrieve_access_token()
logger.debug("Access token refreshed.")
# Get the HTTP method, URL, and optionally data from the function
http_method, url, data = func(self, *args, **kwargs)
# Initialize the results list and handle pagination if page_size is provided
results = []
page_size = kwargs.get("page_size", None)
response_data = {}
while url:
response = requests.request(
http_method, url, headers=self.headers, json=data
)
# Handle the response
if response.status_code == 200 or response.status_code == 201:
response_json = response.json() # Store the response JSON
if page_size:
results.extend(response_json.get("value", []))
url = response_json.get("@odata.nextLink", None)
else:
response_data = (
response_json # Capture the full response for consistency
)
break
else:
retry = handle_error(response)
if retry == "retry":
continue
if page_size:
response_data = {"value": results}
return response_data
except Exception as e:
logger.exception("An error occurred during the API call.")
raise e
return wrapper
class SharePointClient:
access_token = None
access_token_request_timestamp = None
access_token_expiry = None
headers = None
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(
self,
tenant_id,
client_id,
client_secret,
site_id,
access_token=None,
access_token_expiration_details=None,
):
"""
Initializes the SharePointClient with necessary credentials and site information.
:param tenant_id: The tenant ID.
:param client_id: The client ID.
:param client_secret: The client secret.
:param site_id: The site ID.
:param access_token: The access token (optional)
:param access_token_expiration_details: The access token expiration details (optional)
"""
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
if access_token:
if not access_token_expiration_details:
raise ValueError("Access token expiration details must be provided.")
self.access_token = access_token
self.set_access_token_expiration_details(access_token_expiration_details)
self.headers = {
"Authorization": f"Bearer {self.access_token['access_token']}"
}
else:
self.retrieve_access_token()
# Retrieve static identifiers
self.site_id = site_id
self.document_drive = self.get_documents_drive()
self.document_drive_id = self.document_drive["id"]
def get_token_expiration_details(self):
"""
Returns the access token expiration details. Converts the datetime objects to strings for serialization.
:return:
"""
return {
"access_token_request_timestamp": datetime.strftime(
self.access_token_request_timestamp, self.TIMESTAMP_FORMAT
),
"access_token_expiry": datetime.strftime(
self.access_token_expiry, self.TIMESTAMP_FORMAT
),
}
def set_access_token_expiration_details(self, access_token_expiration_details):
"""
Sets the access token expiration details from a serialized dictionary.
:param access_token_expiration_details: The serialized access token expiration details.
:return:
"""
self.access_token_request_timestamp = datetime.strptime(
access_token_expiration_details["access_token_request_timestamp"],
self.TIMESTAMP_FORMAT,
)
self.access_token_expiry = datetime.strptime(
access_token_expiration_details["access_token_expiry"],
self.TIMESTAMP_FORMAT,
)
def is_access_token_expired(self):
"""
Checks if the access token has expired. If it has, a new access token is retrieved.
:return: True if expired, False otherwise.
"""
return datetime.now() >= self.access_token_expiry
def retrieve_access_token(self, refresh=False):
"""
Implements authentication using MSAL.
:param refresh: If True, force a refresh of the access token.
:return: None
"""
app = ConfidentialClientApplication(
self.client_id,
authority=f"https://login.microsoftonline.com/{self.tenant_id}",
client_credential=self.client_secret,
)
scope = ["https://graph.microsoft.com/.default"]
access_token_request_timestamp = datetime.now()
if refresh:
logger.debug("Forcing refresh of access token.")
token = app.acquire_token_for_client(scopes=scope)
else:
# Check if a token is already cached
token = app.acquire_token_silent(scope, account=None)
if not token:
token = app.acquire_token_for_client(scopes=scope)
if "access_token" not in token:
logger.error("Authentication failed.")
raise ValueError("Authentication failed")
access_token_expiry = access_token_request_timestamp + timedelta(
seconds=token["expires_in"] - 20
)
self.access_token = token
self.access_token_request_timestamp = access_token_request_timestamp
self.access_token_expiry = access_token_expiry
self.headers = {"Authorization": f"Bearer {self.access_token['access_token']}"}
# logger.debug("Access token retrieved successfully.")
@api_call_decorator
def get_documents_drive(self):
"""
Get the document drive of the SharePoint site.
:return: Tuple containing HTTP method, URL, and None for data.
"""
url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
# logger.debug(f"Getting document drive from URL: {url}")
return "GET", url, None
@api_call_decorator
def list_folder_contents(
self, folder_path: str, page_size: int = 100
) -> Dict[str, Any]:
"""
GET drive/root/children
This function will list the contents of a folder in SharePoint.
:param drive_id: The ID of the drive.
:param folder_path: The path of the folder.
:param page_size: The number of items per page (default is 100).
:return: Tuple containing HTTP method, URL, and None for data.
"""
url = f"https://graph.microsoft.com/v1.0/drives/{self.document_drive_id}/root:/{folder_path}:/children?$top={page_size}"
# logger.debug(f"Listing folder contents from URL: {url}")
return "GET", url, None
@api_call_decorator
def create_folder(self, file_name: str, folder_path: str) -> Dict[str, Any]:
"""
POST https://graph.microsoft.com/v1.0/me/drive/root/children
Content-Type: application/json
{
"name": "New Folder",
"folder": { },
"@microsoft.graph.conflictBehavior": "rename"
}
"""
data: Dict[str, Any] = {
"name": file_name,
"folder": {},
"@microsoft.graph.conflictBehavior": "rename",
}
url = f"https://graph.microsoft.com/v1.0/drives/{self.document_drive_id}/root:/{folder_path}:/children"
return "POST", url, data
def upload_file(
self, file_name: str, file_stream: BinaryIO, sharepoint_parent_id: str
) -> Optional[Dict[str, Any]]:
"""
Uploads a file to SharePoint using the Graph API.
PUT /drives/{drive-id}/root:/{path-to-file}:/content
:param file_name: Name of the file to upload
:param sharepoint_path: Path within the SharePoint site (folder path)
:param file_stream: File content as a binary stream (e.g., BytesIO or open(file, 'rb'))
:return: Response JSON from the API
"""
url = f"https://graph.microsoft.com/v1.0/drives/{self.document_drive_id}/root:/{sharepoint_parent_id}/{file_name}:/content"
# logger.debug(f"Uploading file to URL: {url}")
response = requests.put(url, headers=self.headers, data=file_stream)
if response.status_code in (200, 201):
# logger.info(f"File '{file_name}' uploaded successfully.")
return response.json()
else:
retry = handle_error(response)
if retry == "retry":
return self.upload_file(file_name, sharepoint_parent_id, file_stream)
@staticmethod
def download_sharepoint_file(download_url):
"""
Downloads a file from the given URL and returns its content.
:param download_url: The URL to download the file from.
:return: The content of the downloaded file.
"""
response = requests.get(download_url, stream=True)
response.raise_for_status() # Check if the request was successful
file_content = BytesIO()
# Read the file content into memory
for chunk in response.iter_content(chunk_size=8192):
file_content.write(chunk)
file_content.seek(0) # Reset the file pointer to the beginning
return file_content

0
utils/sharepoint/temp Normal file
View file