Model/etl/access_reporting/app.py
2025-01-28 15:10:23 +00:00

440 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from msal import ConfidentialClientApplication
from datetime import datetime, timedelta
import requests
from functools import wraps
import time
import logging
from io import BytesIO
import pandas as pd
# Configure logging: one module-level logger that writes formatted records through a
# StreamHandler (stderr by default) at INFO level and above.
logger = logging.getLogger(__name__)
# Attach the handler only if none is present yet, so importing/reloading this module more than
# once does not add duplicate handlers (which would print every message several times).
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
def parse_retry_after(value, default=5):
    """
    Convert an HTTP ``Retry-After`` header value into a whole, non-negative number of seconds.

    The header may carry either a delay in seconds (``"120"``) or an HTTP-date
    (``"Wed, 21 Oct 2015 07:28:00 GMT"``). A missing or unparseable value yields ``default``;
    a negative delay or a date in the past yields ``0``, so the result is always safe to pass
    to ``time.sleep``.

    :param value: The raw header value, or None when the header is absent.
    :param default: Seconds to wait when the header is absent or unparseable.
    :return: Seconds to wait before retrying.
    """
    import math
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    if value is None:
        return default
    text = str(value).strip()
    try:
        return max(0, int(text))
    except ValueError:
        pass  # Not a plain number of seconds; try the HTTP-date form next.
    try:
        retry_at = parsedate_to_datetime(text)
    except (TypeError, ValueError, IndexError, OverflowError):
        # Older Pythons raise TypeError for unparseable dates, newer ones ValueError.
        return default
    if retry_at is None:
        return default
    if retry_at.tzinfo is None:
        # "-0000" dates come back naive; HTTP-dates are defined to be in GMT.
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
    return max(0, math.ceil(remaining))


def handle_error(response):
    """
    Log the details of a failed API response and decide whether the request should be retried.

    Throttling (429) and the server errors 500/503 sleep for the interval given by the
    ``Retry-After`` header (5 seconds when it is absent or unparseable) and return ``'retry'``.
    Every other status code is logged and raised.

    :param response: The failed ``requests.Response``.
    :return: ``'retry'`` when the caller should re-issue the same request.
    :raises ValueError: For any status code that is not retried.
    """
    try:
        payload = response.json()
    except ValueError:
        # Non-JSON body (the decode error raised by response.json() subclasses ValueError).
        payload = {}
    # Guard every level: a list body or a string-valued "error" must not mask the HTTP error.
    error_json = payload.get('error', {}) if isinstance(payload, dict) else {}
    if isinstance(error_json, str):
        # Some endpoints (e.g. OAuth-style errors) return the code as a bare string.
        error_json = {'code': error_json}
    elif not isinstance(error_json, dict):
        error_json = {}

    error_code = error_json.get('code', 'unknownError')
    error_message = error_json.get('message', 'No detailed error message provided.')
    inner_error = error_json.get('innererror', {})
    details = error_json.get('details', [])
    logger.error(f"Error Code: {error_code}")
    logger.error(f"Error Message: {error_message}")
    if inner_error:
        logger.error(f"Inner Error: {inner_error}")
    if details:
        logger.error(f"Error Details: {details}")

    status = response.status_code
    if status in (429, 500, 503):
        retry_after = parse_retry_after(response.headers.get('Retry-After'))
        if status == 429:
            logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
        else:
            logger.error(f"Server error. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return 'retry'

    if status == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status == 404:
        logger.error("Not Found. The requested resource doesnt exist.")
    raise ValueError(f"API request failed with status code {status} - {error_message}")
def api_call_decorator(func):
    """
    Turn a method that *describes* a Graph API request into one that performs it.

    The decorated method must return an ``(http_method, url, data)`` tuple. The wrapper then:

    * refreshes the client's access token whenever ``self.is_access_token_expired()`` reports
      it has expired. The check runs before every request, so a long paginated or throttled
      run cannot go stale part-way through;
    * sends the request with ``self.headers`` and ``data`` as the JSON body;
    * if the call's effective ``page_size`` argument is truthy (passed positionally, by keyword,
      or taken from the method's default), follows ``@odata.nextLink`` and returns
      ``{'value': [...items from every page...]}``. Otherwise it returns the first successful
      JSON response unchanged;
    * hands non-200 responses to ``handle_error`` (which raises for fatal errors) and re-issues
      the request while it asks for a retry. It gives up after 5 consecutive retries.

    :param func: The function to be decorated.
    :return: The wrapped function.
    :raises ValueError: From ``handle_error``, or when the retry budget is exhausted.
    """
    import inspect

    # Resolved once per decorated method; used to see page_size defaults the caller omitted.
    signature = inspect.signature(func)
    max_consecutive_retries = 5
    # Seconds requests may wait to connect / between received bytes before failing the call.
    request_timeout = 120

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Bind exactly as Python will when calling func. Then a default such as
            # list_folder_contents' page_size=100 still enables pagination when callers omit it.
            bound = signature.bind(self, *args, **kwargs)
            bound.apply_defaults()
            page_size = bound.arguments.get('page_size')

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)

            results = []
            response_data = {}
            page = 1
            retries = 0
            while url:
                if self.is_access_token_expired():
                    self.retrieve_access_token()
                    logger.info("Access token refreshed.")
                logger.info(f"Making call for page: {page}")
                response = requests.request(
                    http_method, url, headers=self.headers, json=data, timeout=request_timeout
                )
                if response.status_code != 200:
                    # handle_error raises for non-retryable statuses; returning means "try again".
                    handle_error(response)
                    retries += 1
                    if retries > max_consecutive_retries:
                        raise ValueError(
                            f"API request still failing after {max_consecutive_retries} retries "
                            f"(last status code {response.status_code})."
                        )
                    continue
                retries = 0
                response_json = response.json()
                if not page_size:
                    # Single-shot call: hand back the whole response body.
                    response_data = response_json
                    break
                results.extend(response_json.get('value', []))
                url = response_json.get('@odata.nextLink')
                logger.info(f"Next page URL: {url}")
                page += 1
            if page_size:
                response_data = {'value': results}
            return response_data
        except Exception:
            logger.exception("An error occurred during the API call.")
            raise

    return wrapper
class SharePointClient:
    """
    Small Microsoft Graph client for reading files from one SharePoint site.

    It authenticates with MSAL's client-credentials flow and keeps the bearer token in
    ``self.headers``. It offers helpers to list folder contents and download files from the
    site's default document drive.
    """

    # MSAL token response dict; must contain 'access_token' (and 'expires_in' when fetched here).
    access_token = None
    # When the current token was requested, and when this client treats it as expired.
    # Both are naive datetimes in UTC, matching the literal 'Z' in TIMESTAMP_FORMAT.
    access_token_request_timestamp = None
    access_token_expiry = None
    # Request headers carrying the bearer token; rebuilt whenever the token changes.
    headers = None
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initialise the client and resolve the site's document drive.

        Either reuse a previously obtained token, or authenticate immediately when none is
        given. A reused token is ``access_token`` plus the dict produced by
        ``get_token_expiration_details``. Construction also makes one Graph call to fetch the
        site's document drive.

        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: A previously obtained token dict containing ``'access_token'``
            (optional).
        :param access_token_expiration_details: Serialized expiry details for ``access_token``;
            required when ``access_token`` is given.
        :raises ValueError: If ``access_token`` is given without expiration details, or if
            authentication fails.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()
        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive datetime (the form TIMESTAMP_FORMAT round-trips)."""
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def get_token_expiration_details(self):
        """
        Serialize the token timestamps so they can be stored and passed back to ``__init__``.

        :return: Dict with ``access_token_request_timestamp`` and ``access_token_expiry`` as
            strings in ``TIMESTAMP_FORMAT``.
        """
        return {
            'access_token_request_timestamp': self.access_token_request_timestamp.strftime(
                self.TIMESTAMP_FORMAT
            ),
            'access_token_expiry': self.access_token_expiry.strftime(self.TIMESTAMP_FORMAT),
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Restore the token timestamps from the dict produced by ``get_token_expiration_details``.

        :param access_token_expiration_details: The serialized access token expiration details.
        :return: None
        :raises KeyError: If either expected key is missing.
        :raises ValueError: If a timestamp does not match ``TIMESTAMP_FORMAT``.
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Report whether the stored access token should be treated as expired.

        This only checks; it does not refresh. A client with no recorded expiry counts as
        expired.

        :return: True if expired (or unknown), False otherwise.
        """
        if self.access_token_expiry is None:
            return True
        return self._utcnow() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Acquire an app-only Graph token with MSAL and store the token, its expiry and the headers.

        :param refresh: If True, skip the silent (cache) lookup and request a token directly.
        :return: None
        :raises ValueError: If MSAL does not return an access token.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )
        scope = ["https://graph.microsoft.com/.default"]
        # Taken before the request, so the computed expiry errs on the early side.
        access_token_request_timestamp = self._utcnow()
        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # NOTE(review): `app` is created fresh on every call, so its cache is presumably
            # empty and this silent lookup returns None. Consider reusing one app instance.
            token = app.acquire_token_silent(scope, account=None)
            if not token:
                token = app.acquire_token_for_client(scopes=scope)
        if not token or "access_token" not in token:
            failure = token or {}
            reason = failure.get("error_description") or failure.get("error")
            logger.error(f"Authentication failed: {reason}")
            raise ValueError("Authentication failed")
        # Consider the token expired 20 seconds early so it is never used right at its expiry.
        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - 20
        )
        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }
        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Describe the request for the SharePoint site's default document drive.

        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        Describe the request listing the children of a folder in a drive.

        Leading/trailing slashes in ``folder_path`` are ignored. An empty path lists the drive
        root, because the path-addressed form ``root:/<path>:`` needs a non-empty path.

        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder, relative to the drive root.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        folder_path = folder_path.strip('/')
        if folder_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url, timeout=60):
        """
        Download a file into memory and return it as a rewound ``BytesIO``.

        No Authorization header is sent; the URL is expected to be the item's
        ``@microsoft.graph.downloadUrl``.

        :param download_url: The URL to download the file from.
        :param timeout: Seconds to wait for the connection / between received bytes.
        :return: A ``BytesIO`` holding the file content, positioned at the start.
        :raises requests.HTTPError: If the server responds with an error status.
        """
        file_content = BytesIO()
        # The context manager releases the streamed connection even if reading fails.
        with requests.get(download_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)
        file_content.seek(0)
        return file_content

    def download_sharepoint_folder(self, drive_id, folder_path, download_dir, excluded_file_types=None):
        """
        Recursively download every file in a SharePoint folder into a local directory.

        Subfolders are mirrored as local subdirectories, and the same exclusions apply at every
        level. Files whose text after the last '.' is in ``excluded_file_types`` are skipped.
        Items the listing gives no download URL for are skipped with a warning.

        :param drive_id: The ID of the SharePoint drive.
        :param folder_path: The path of the folder in SharePoint.
        :param download_dir: The local directory to save the downloaded files.
        :param excluded_file_types: Extensions (without the dot) to skip (default is None).
        """
        excluded_file_types = [] if excluded_file_types is None else excluded_file_types
        # Ensure the download directory exists
        os.makedirs(download_dir, exist_ok=True)
        folder_contents = self.list_folder_contents(drive_id, folder_path)
        for item in folder_contents.get('value', []):
            file_name = item['name']
            if item.get('folder'):
                # Recurse with the same exclusions so they apply to subfolders too.
                self.download_sharepoint_folder(
                    drive_id,
                    f"{folder_path.rstrip('/')}/{file_name}",
                    os.path.join(download_dir, file_name),
                    excluded_file_types=excluded_file_types,
                )
                continue
            if file_name.split(".")[-1] in excluded_file_types:
                continue
            download_url = item.get('@microsoft.graph.downloadUrl')
            if not download_url:
                logger.warning(f"Skipping {file_name}: no download URL in the folder listing.")
                continue
            logger.info(f"Downloading file: {file_name}")
            file_content = self.download_sharepoint_file(download_url)
            file_path = os.path.join(download_dir, file_name)
            with open(file_path, 'wb') as f:
                f.write(file_content.read())
            logger.info(f"File saved to: {file_path}")
def app():
# Customers for WC 18/11/2024
#
# ----- Eastlight location -----
# No data this week, low on data
# Housing Associations/Eastlight/Survey Outcomes/
#
# ----- Settle location -----
# No data this week, in separate files
# Housing Associations/Settle/Survey Outcomes/
#
# ----- Community Housing -----
# In separate files - will we get to a singular form?
# Housing Associations/Community Housing/Survey Outcomes/
#
# ----- ACIS location -----
# Doesn't have this week's data
# Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
#
# ----- Southern location -----
#
#
# ------ Unitas location ------
# Does have this week's data
# Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx
locations = {
"Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
"Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
"Settle": "Housing Associations/Settle/Survey Outcomes/",
"Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
"ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
"Southern": None,
}
SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID", None)
SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID", None)
WARMFRONT_SHAREPOINT_SITE_ID = os.getenv("WARMFRONT_SHAREPOINT_SITE_ID", None)
sharepoint_client = SharePointClient(
tenant_id=SHAREPOINT_TENANT_ID,
client_id=SHAREPOINT_CLIENT_ID,
client_secret=SHAREPOINT_CLIENT_SECRET,
site_id=WARMFRONT_SHAREPOINT_SITE_ID
)
results = []
for customer, location in locations.items():
if location is None:
continue
if location.endswith(".xlsx"):
# Read in the file
# List the contents of the folder
location_folder = os.path.dirname(location)
contents = sharepoint_client.list_folder_contents(
drive_id=sharepoint_client.document_drive["id"],
folder_path=location_folder
)
filepaths = contents["value"]
download_url = next(
(file['@microsoft.graph.downloadUrl'] for file in filepaths
if '@microsoft.graph.downloadUrl' in file and file['name'] == os.path.basename(location)),
None
)
if download_url is None:
raise ValueError("File not found in the SharePoint folder.")
file_content = sharepoint_client.download_sharepoint_file(download_url)
# Convert to pandas dataframe since file is an excel file
df = pd.read_excel(file_content)
df["Outcome"] = df["Outcome"].str.strip().str.lower()
# We cannot group by funding type accurately because any job that is not funded will have a NaN value
# and therefore we have a 100% acces rate for funded jobs and 0% otherwise
surveyor_outcomes = []
for (week, surveyor, funding), group in df.groupby(["Week Commencing", "DEA/REA"]):
funding_type = [x for x in group["Funding Type"].unique() if not pd.isnull(x)]
if funding_type:
funding_type = " + ".join(funding_type)
else:
funding_type = "No Funding"
surveyed = group[group["Outcome"] == "surveyed"]
no_answer = group[
group["Outcome"] == "no answer"
]
other_issue = group[~group["Outcome"].isin(["surveyed", "no answer"])]
surveyor_outcomes.append(
{
"Surveyor": surveyor,
"Week": week,
"Funding": funding_type,
"Surveyed": surveyed.shape[0],
"No Answer": no_answer.shape[0],
"Other Issue": other_issue.shape[0],
}
)
surveyor_outcomes = pd.DataFrame(surveyor_outcomes)
surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])
weekly_access = (
surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
)
# Sort by week and surveyor ascending
surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
)
weekly_access["Total"] = (
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
)
weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / (
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
)