# Mirror of https://github.com/Hestia-Homes/Model.git
# Synced 2026-06-08 11:17:27 +00:00
# 440 lines, 17 KiB, Python
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from functools import wraps
from io import BytesIO

import pandas as pd
import requests
from msal import ConfidentialClientApplication
|
||
|
||
# Configure logging
# A stream handler is attached only when this module's logger has none yet, so the
# handler is not added twice if the logger was already configured.
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
|
||
|
||
|
||
def _retry_after_seconds(response, default=5):
    """
    Return the delay, in seconds, requested by the response's ``Retry-After`` header.

    The header may hold an integer number of seconds or an HTTP-date. A missing or
    unparseable header yields ``default``; a negative value or a date in the past
    yields 0.

    :param response: The HTTP response whose headers are inspected.
    :param default: Delay to use when the header is missing or unparseable.
    :return: Non-negative number of seconds to wait.
    """
    raw_value = response.headers.get('Retry-After')
    if raw_value is None:
        return default

    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        pass  # Not a plain integer; try the HTTP-date form next.

    try:
        retry_at = parsedate_to_datetime(str(raw_value))
    except (TypeError, ValueError):
        return default
    if retry_at is None:
        return default
    if retry_at.tzinfo is None:
        # A date without zone information is interpreted as UTC.
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    return max(0, int((retry_at - datetime.now(timezone.utc)).total_seconds()))


def handle_error(response):
    """
    Handle errors based on HTTP status codes and log detailed information.

    For 429, 500 and 503 the function sleeps for the delay given by the
    ``Retry-After`` header (5 seconds when absent or unparseable) and returns
    ``'retry'``. Every other status raises ``ValueError``.

    :param response: The failed HTTP response (needs ``status_code``, ``headers`` and ``json()``).
    :return: ``'retry'`` when the caller should repeat the request.
    :raises ValueError: For every status code that is not retried.
    """
    try:
        payload = response.json()
    except ValueError:
        payload = {}

    # Tolerate bodies that are not a JSON object, or whose "error" member is not one.
    error_json = payload.get('error', {}) if isinstance(payload, dict) else {}
    if not isinstance(error_json, dict):
        error_json = {'message': str(error_json)}

    error_code = error_json.get('code', 'unknownError')
    error_message = error_json.get('message', 'No detailed error message provided.')
    inner_error = error_json.get('innererror', {})
    details = error_json.get('details', [])

    logger.error("Error Code: %s", error_code)
    logger.error("Error Message: %s", error_message)
    if inner_error:
        logger.error("Inner Error: %s", inner_error)
    if details:
        logger.error("Error Details: %s", details)

    status_code = response.status_code

    if status_code in (429, 500, 503):
        retry_after = _retry_after_seconds(response)
        if status_code == 429:
            logger.warning("Too Many Requests. Retrying after %s seconds...", retry_after)
        else:
            logger.error("Server error. Retrying after %s seconds...", retry_after)
        time.sleep(retry_after)
        return 'retry'

    if status_code == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status_code == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status_code == 404:
        logger.error("Not Found. The requested resource doesn’t exist.")

    raise ValueError(f"API request failed with status code {status_code} - {error_message}")
|
||
|
||
|
||
def api_call_decorator(func):
    """
    Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.

    The decorated method must return a ``(http_method, url, data)`` tuple. The wrapper
    then sends the request with ``self.headers``, follows ``@odata.nextLink`` links
    until there are none left, and repeats a request when ``handle_error`` asks for a
    retry (at most ``max_retries`` consecutive times per URL).

    :param func: The function to be decorated.
    :return: The wrapped function. Its result is ``{'value': [...]}`` when it is called
        with a truthy ``page_size`` keyword argument. Otherwise it is the response JSON;
        if the response spanned several pages, ``value`` holds the items of all pages
        and ``@odata.nextLink`` is removed.
    """
    # Upper bound on consecutive retryable failures for one URL, so a persistently
    # failing endpoint cannot keep the loop running forever.
    max_retries = 5

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Check and refresh the access token if needed
            if self.is_access_token_expired():
                self.retrieve_access_token()
                logger.info("Access token refreshed.")

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)

            page_size = kwargs.get('page_size', None)
            first_page = None
            items = []
            pages_fetched = 0
            failures = 0

            while url:
                logger.info("Making call for page: " + str(pages_fetched + 1))
                response = requests.request(http_method, url, headers=self.headers, json=data)

                if response.status_code != 200:
                    # handle_error raises for non-retryable statuses; if it returns,
                    # it has already waited and the same URL is requested again.
                    handle_error(response)
                    failures += 1
                    if failures > max_retries:
                        raise ValueError(
                            f"API request failed after {max_retries} retries "
                            f"with status code {response.status_code}"
                        )
                    continue

                failures = 0
                pages_fetched += 1
                payload = response.json()  # Store the response JSON
                if first_page is None:
                    first_page = payload

                next_url = payload.get('@odata.nextLink', None) if isinstance(payload, dict) else None

                # Items are only collected for paginated results; a single,
                # non-paginated response is returned untouched below.
                if page_size or next_url or pages_fetched > 1:
                    items.extend(payload.get('value', []))

                if next_url:
                    logger.info("Next page URL: %s", next_url)
                url = next_url

            if page_size:
                return {'value': items}

            if pages_fetched > 1:
                # Several pages were followed without an explicit page_size: keep the
                # first page's metadata but expose the items of every page.
                merged = dict(first_page)
                merged.pop('@odata.nextLink', None)
                merged['value'] = items
                return merged

            return first_page if first_page is not None else {}

        except Exception:
            logger.exception("An error occurred during the API call.")
            raise  # bare raise keeps the original traceback

    return wrapper
|
||
|
||
|
||
class SharePointClient:
    """
    Client for the document drive of one SharePoint site, accessed through Microsoft Graph.

    Authenticates with MSAL client credentials, tracks when the access token expires,
    and offers helpers to list folders and download files.
    """

    # Token response dict as returned by MSAL; must contain an 'access_token' key.
    access_token = None
    # datetime.now() value taken just before the current token was requested.
    access_token_request_timestamp = None
    # Moment after which the token is treated as expired (includes a safety margin).
    access_token_expiry = None
    # Headers sent with every Graph request; carries the bearer token.
    headers = None

    # Format used to (de)serialise the two timestamps above.
    # NOTE(review): the trailing "Z" is a literal character here; the timestamps come
    # from datetime.now() (naive local time) - confirm whether UTC was intended.
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    # Seconds subtracted from the token lifetime so it is refreshed slightly early.
    TOKEN_EXPIRY_MARGIN_SECONDS = 20

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initializes the SharePointClient with necessary credentials and site information.

        When no access token is supplied a new one is requested immediately. In both
        cases the site's document drive is fetched before the constructor returns.

        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: An existing token dict with an 'access_token' key (optional)
        :param access_token_expiration_details: The serialized expiration details, as
            produced by get_token_expiration_details (required with access_token)
        :raises ValueError: If access_token is given without expiration details.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()

        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    def get_token_expiration_details(self):
        """
        Returns the access token expiration details. Converts the datetime objects to strings for serialization.

        :return: Dict with 'access_token_request_timestamp' and 'access_token_expiry'
            formatted with TIMESTAMP_FORMAT.
        """
        return {
            'access_token_request_timestamp': self.access_token_request_timestamp.strftime(
                self.TIMESTAMP_FORMAT
            ),
            'access_token_expiry': self.access_token_expiry.strftime(self.TIMESTAMP_FORMAT),
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Sets the access token expiration details from a serialized dictionary.

        :param access_token_expiration_details: The serialized access token expiration details.
        :return: None
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Checks if the access token has expired. This method only reports the state;
        it does not request a new token.

        :return: True if expired (or no expiry is known), False otherwise.
        """
        if self.access_token_expiry is None:
            return True
        return datetime.now() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Implements authentication using MSAL.

        Stores the token, its request timestamp, its expiry and the matching
        Authorization header on the instance.

        :param refresh: If True, force a refresh of the access token.
        :return: None
        :raises ValueError: If the response contains no access token.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )

        scope = ["https://graph.microsoft.com/.default"]

        # Taken before the request so the computed expiry errs on the early side.
        access_token_request_timestamp = datetime.now()

        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # Check if a token is already cached
            token = app.acquire_token_silent(scope, account=None)

            if not token:
                token = app.acquire_token_for_client(scopes=scope)

        if "access_token" not in token:
            logger.error("Authentication failed.")
            if token.get("error") or token.get("error_description"):
                logger.error(
                    "Authentication error: %s - %s", token.get("error"), token.get("error_description")
                )
            raise ValueError("Authentication failed")

        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - self.TOKEN_EXPIRY_MARGIN_SECONDS
        )

        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }

        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Get the document drive of the SharePoint site.

        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        This function will list the contents of a folder in SharePoint.

        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder; leading/trailing slashes are
            ignored and an empty path addresses the drive root.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        relative_path = (folder_path or "").strip("/")
        if relative_path:
            url = (
                f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
                f"/root:/{relative_path}:/children?$top={page_size}"
            )
        else:
            # The "root:/<path>:" form needs a non-empty path; list the root itself.
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url):
        """
        Downloads a file from the given URL and returns its content.

        :param download_url: The URL to download the file from.
        :return: A BytesIO holding the file content, positioned at the start.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        file_content = BytesIO()

        # The context manager releases the streamed connection even on errors.
        with requests.get(download_url, stream=True) as response:
            response.raise_for_status()  # Check if the request was successful

            # Read the file content into memory
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)

        file_content.seek(0)  # Reset the file pointer to the beginning

        return file_content

    def download_sharepoint_folder(self, drive_id, folder_path, download_dir, excluded_file_types=None):
        """
        Downloads all files in a SharePoint folder to the specified local directory.

        Subfolders are downloaded recursively into matching local subdirectories,
        with the same exclusions applied.

        :param drive_id: The ID of the SharePoint drive.
        :param folder_path: The path of the folder in SharePoint.
        :param download_dir: The local directory to save the downloaded files.
        :param excluded_file_types: A list of file extensions to exclude from download
            (default is None); compared case-insensitively, a leading dot is ignored.
        """
        excluded_file_types = [] if excluded_file_types is None else excluded_file_types
        excluded_extensions = {str(ext).lower().lstrip(".") for ext in excluded_file_types}

        # Ensure the download directory exists
        os.makedirs(download_dir, exist_ok=True)

        # List folder contents. page_size is passed by keyword so the request wrapper
        # collects every page of the listing into 'value'.
        folder_contents = self.list_folder_contents(drive_id, folder_path, page_size=100)
        files = folder_contents.get('value', [])

        for item in files:
            item_name = item['name']

            if item.get('folder'):  # Check if it's a folder
                # Recursively handle subfolders, keeping the same exclusions
                subfolder_path = f"{folder_path}/{item_name}"
                subfolder_dir = os.path.join(download_dir, item_name)
                self.download_sharepoint_folder(
                    drive_id, subfolder_path, subfolder_dir, excluded_file_types
                )
                continue

            # It's a file. A name without a dot has no extension to match against.
            _, dot, extension = item_name.rpartition(".")
            if dot and extension.lower() in excluded_extensions:
                continue

            download_url = item.get('@microsoft.graph.downloadUrl')
            if not download_url:
                logger.warning(f"Skipping item without a download URL: {item_name}")
                continue

            logger.info(f"Downloading file: {item_name}")
            file_content = self.download_sharepoint_file(download_url)

            # Save the file locally
            file_path = os.path.join(download_dir, item_name)
            with open(file_path, 'wb') as f:
                f.write(file_content.getvalue())

            logger.info(f"File saved to: {file_path}")
|
||
|
||
|
||
def app():
|
||
# Customers for WC 18/11/2024
|
||
#
|
||
# ----- Eastlight location -----
|
||
# No data this week, low on data
|
||
# Housing Associations/Eastlight/Survey Outcomes/
|
||
#
|
||
# ----- Settle location -----
|
||
# No data this week, in separate files
|
||
# Housing Associations/Settle/Survey Outcomes/
|
||
#
|
||
# ----- Community Housing -----
|
||
# In separate files - will we get to a singular form?
|
||
# Housing Associations/Community Housing/Survey Outcomes/
|
||
#
|
||
# ----- ACIS location -----
|
||
# Doesn't have this week's data
|
||
# Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
|
||
#
|
||
# ----- Southern location -----
|
||
#
|
||
#
|
||
# ------ Unitas location ------
|
||
# Does have this week's data
|
||
# Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx
|
||
|
||
locations = {
|
||
"Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
|
||
"Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
|
||
"Settle": "Housing Associations/Settle/Survey Outcomes/",
|
||
"Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
|
||
"ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
|
||
"Southern": None,
|
||
}
|
||
|
||
SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID", None)
|
||
SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
|
||
SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID", None)
|
||
WARMFRONT_SHAREPOINT_SITE_ID = os.getenv("WARMFRONT_SHAREPOINT_SITE_ID", None)
|
||
|
||
sharepoint_client = SharePointClient(
|
||
tenant_id=SHAREPOINT_TENANT_ID,
|
||
client_id=SHAREPOINT_CLIENT_ID,
|
||
client_secret=SHAREPOINT_CLIENT_SECRET,
|
||
site_id=WARMFRONT_SHAREPOINT_SITE_ID
|
||
)
|
||
|
||
results = []
|
||
for customer, location in locations.items():
|
||
if location is None:
|
||
continue
|
||
|
||
if location.endswith(".xlsx"):
|
||
# Read in the file
|
||
# List the contents of the folder
|
||
location_folder = os.path.dirname(location)
|
||
contents = sharepoint_client.list_folder_contents(
|
||
drive_id=sharepoint_client.document_drive["id"],
|
||
folder_path=location_folder
|
||
)
|
||
filepaths = contents["value"]
|
||
|
||
download_url = next(
|
||
(file['@microsoft.graph.downloadUrl'] for file in filepaths
|
||
if '@microsoft.graph.downloadUrl' in file and file['name'] == os.path.basename(location)),
|
||
None
|
||
)
|
||
|
||
if download_url is None:
|
||
raise ValueError("File not found in the SharePoint folder.")
|
||
|
||
file_content = sharepoint_client.download_sharepoint_file(download_url)
|
||
|
||
# Convert to pandas dataframe since file is an excel file
|
||
df = pd.read_excel(file_content)
|
||
df["Outcome"] = df["Outcome"].str.strip().str.lower()
|
||
|
||
# We cannot group by funding type accurately because any job that is not funded will have a NaN value
|
||
# and therefore we have a 100% access rate for funded jobs and 0% otherwise
|
||
surveyor_outcomes = []
|
||
for (week, surveyor, funding), group in df.groupby(["Week Commencing", "DEA/REA"]):
|
||
funding_type = [x for x in group["Funding Type"].unique() if not pd.isnull(x)]
|
||
if funding_type:
|
||
funding_type = " + ".join(funding_type)
|
||
else:
|
||
funding_type = "No Funding"
|
||
surveyed = group[group["Outcome"] == "surveyed"]
|
||
no_answer = group[
|
||
group["Outcome"] == "no answer"
|
||
]
|
||
other_issue = group[~group["Outcome"].isin(["surveyed", "no answer"])]
|
||
|
||
surveyor_outcomes.append(
|
||
{
|
||
"Surveyor": surveyor,
|
||
"Week": week,
|
||
"Funding": funding_type,
|
||
"Surveyed": surveyed.shape[0],
|
||
"No Answer": no_answer.shape[0],
|
||
"Other Issue": other_issue.shape[0],
|
||
}
|
||
)
|
||
|
||
surveyor_outcomes = pd.DataFrame(surveyor_outcomes)
|
||
surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])
|
||
|
||
weekly_access = (
|
||
surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
|
||
)
|
||
# Sort by week and surveyor ascending
|
||
surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
|
||
surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
|
||
surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
|
||
)
|
||
|
||
weekly_access["Total"] = (
|
||
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
|
||
)
|
||
weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / (
|
||
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
|
||
)
|