Merge pull request #1 from Hestia-Homes/feature/read_local_files_and_do_interesting_stuff

Feature/read local files and do interesting stuff
This commit is contained in:
Jun-te Kim 2025-03-04 12:39:00 +00:00 committed by GitHub
commit be208a9f88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 444 additions and 5 deletions

View file

@ -10,5 +10,9 @@
"ms-azuretools.vscode-docker"
]
}
}
},
// Temporary: bind-mounts a data folder from Jun-te Kim's local machine. DELETE this block if you are not Jun-te Kim.
"runArgs": [
"--mount", "type=bind,source=/home/kimjunte/data,target=/workspaces/survey-extraction/data"
]
}

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
data/
.env

View file

@ -11,7 +11,39 @@ Definition of multiple places:
Definition of one place:
- into a CSV...today (03/03/2025)
- [] Read a file from what khalim has shared
- [x] Added the SharePoint client that Khalim made - still need to prove it works
- [x] Read a file from what khalim has shared
Add a local file:
- [x] Mount a local folder containing the files Khalim has shared from SharePoint
- [x] Read files and file paths
Once I have sharepoint api working:
- [] Make validator for retro team
- [] once validated, produce a csv file
- [] show some cool productivity metric
Currently working on:
- [] Validator
- [x] check names
- [in progress, blocked until SharePoint works; easy to add] check it has dates
- [] Useful file reader:
- [] Khalim showed me a useful pdf, that I should try to extract and get some information
- With Khalim:
- [] Check if I have access to sharepoint
- [] Try and get his client API working and see if I can read files
MVP:
Script we can run that will
Go to share point fetch all the data
provide some form of output
that shows the number of surveys done
Flat table
<Survey name> <Customer name> <Details> <Installer>
Billing:
Billing table, left join

20
etl/poetry.lock generated
View file

@ -1,7 +1,23 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
package = []
[[package]]
name = "pymupdf"
version = "1.25.3"
description = "A high performance Python library for data extraction, analysis, conversion & manipulation of PDF (and other) documents."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "pymupdf-1.25.3-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:96878e1b748f9c2011aecb2028c5f96b5a347a9a91169130ad0133053d97915e"},
{file = "pymupdf-1.25.3-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:6ef753005b72ebfd23470f72f7e30f61e21b0b5e748045ec5b8f89e6e3068d62"},
{file = "pymupdf-1.25.3-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:46d90c4f9e62d1856e8db4b9f04a202ff4a7f086a816af73abdc86adb7f5e25a"},
{file = "pymupdf-1.25.3-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a5de51efdbe4d486b6c1111c84e8a231cbfb426f3d6ff31ab530ad70e6f39756"},
{file = "pymupdf-1.25.3-cp39-abi3-win32.whl", hash = "sha256:bca72e6089f985d800596e22973f79cc08af6cbff1d93e5bda9248326a03857c"},
{file = "pymupdf-1.25.3-cp39-abi3-win_amd64.whl", hash = "sha256:4fb357438c9129fbf939b5af85323434df64e36759c399c376b62ad6da95498c"},
{file = "pymupdf-1.25.3.tar.gz", hash = "sha256:b640187c64c5ac5d97505a92e836da299da79c2f689f3f94a67a37a493492193"},
]
[metadata]
lock-version = "2.1"
python-versions = ">=3.12"
content-hash = "75265641fd1a3f2a4d608312a3879427b7141ac2a51d0873da5711cbc8ead28e"
content-hash = "0ff0789ceee91157e5f804e4e3248e78513ae898a14b1973e46da2e50c332ef6"

View file

@ -8,6 +8,7 @@ authors = [
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pymupdf (>=1.25.3,<2.0.0)"
]
[tool.poetry]

View file

@ -0,0 +1,40 @@
import os
import logging
from utils.logger import Logger
class RetroHomeFileStructureValidator():
    """Validate the folder layout of a RetroHome data directory.

    Validation runs as part of construction. Afterwards ``innocent`` holds the
    directory names that passed every implemented check and ``guilty`` holds
    the names that were rejected.
    """

    def __init__(self, source_loc_path):
        """Store the source path, set up logging and run the validation.

        :param source_loc_path: Directory whose immediate children are checked.
        """
        self.source_path = source_loc_path
        self.logger = Logger(name='RetroHomeFileStructureValidator', level=logging.DEBUG).get_logger()
        self.innocent = []
        self.guilty = []
        self.validate()

    def validate(self):
        """Collect the sub-directories of the source path and run every check.

        Plain files found directly under the source path are logged and
        ignored; only directories become candidates.
        """
        self.logger.debug(f"Starting File Structure Validation on '{self.source_path}'")
        for filepath in os.listdir(self.source_path):
            if os.path.isdir(os.path.join(self.source_path, filepath)):
                self.innocent.append(filepath)
            else:
                self.logger.warning(f"Found a file when expecting directory. Ignoring file {filepath}")
        self.logger.debug(self.innocent)
        self.valid_name()
        self.valid_file_structure()

    def valid_name(self):
        """Move names made of more than two space-separated parts to ``guilty``.

        The surviving names are collected in a separate list instead of being
        removed from ``innocent`` during iteration: removing while iterating
        shifts the remaining items and silently skips the entry that follows
        each rejected name.
        """
        accepted = []
        for name in self.innocent:
            if len(name.split(" ")) > 2:
                self.logger.warning(f"The name '{name}' is not in the correct format")
                self.guilty.append(name)
            else:
                accepted.append(name)
        # Slice-assign so that existing references to the list stay valid.
        self.innocent[:] = accepted

    def valid_file_structure(self):
        """Walk the accepted directories; no structural check is implemented yet."""
        for names in self.innocent:
            # NOTE: the path is built but not inspected yet; this is the hook
            # where per-directory structure checks are expected to go.
            path_to_check = os.path.join(self.source_path, names)

    def date_checker_extractor(self):
        """Placeholder for the date check.

        :raises NotImplementedError: Always; the feature does not exist yet.
        """
        raise NotImplementedError("Please contact Jun-te Kim to make this feature")

View file

@ -1 +1,23 @@
print("Hello world")
import os
from filePathValidator.retrohome import RetroHomeFileStructureValidator
from pdfReader.pdfReaderToText import pdfReaderToText
from pprint import pprint
DATA_LOC = "/workspaces/survey-extraction/data/"
INTERESTING_FILE_LOC = "/workspaces/survey-extraction/data/first last/Submission 03.03.25/customer/10 Sandbeck Lane DN21 3LZ/PRE SITE NOTES.pdf"
def main():
    """Extract the text of the sample PDF and pretty-print it line by line."""
    # The folder-structure validation is switched off for now:
    # RetroHomeFileStructureValidator(DATA_LOC)
    reader = pdfReaderToText(INTERESTING_FILE_LOC)
    extracted_lines = reader.get_list_of_test()
    pprint(extracted_lines)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Read file from local file path directory
# proof of concept of some validator
# proof of concept of something I do with a particular file
# the important file at the moment is "Pre site notes"
# Ask khalim how sharepoint is going

View file

View file

@ -0,0 +1,25 @@
from utils.logger import Logger
import logging
import pymupdf
class pdfReaderToText():
    """Read a PDF with PyMuPDF and expose its text as a list of lines.

    The file is read during construction. ``all_text`` holds the text of all
    pages concatenated in page order and ``text_list`` holds that text split
    on newline characters.
    """

    def __init__(self, file_path):
        """Remember the file path, set up logging and extract the text.

        :param file_path: Path of the PDF file to read.
        """
        self.source_path = file_path
        self.logger = Logger(name='pdfReader', level=logging.DEBUG).get_logger()
        self.all_text = ""
        self.text_list = []
        self.get_text_from_pdf_file()

    def get_text_from_pdf_file(self):
        """Extract the text of every page into ``all_text`` and ``text_list``.

        The document is opened in a ``with`` block so the underlying file is
        released even when a page fails to parse. The text is rebuilt from
        scratch, so calling this method again does not duplicate content.
        """
        self.logger.debug(f"Extrating text from {self.source_path}")
        with pymupdf.open(self.source_path) as pdf:
            # Join once instead of growing a string page by page.
            self.all_text = "".join(page.get_text() for page in pdf)
        self.text_list = self.all_text.split('\n')

    def get_list_of_test(self):
        """Return the extracted text as a list of lines."""
        return self.text_list

View file

@ -0,0 +1,22 @@
import logging
import os
class Logger:
    """Configure a named ``logging`` logger that writes to the console.

    ``logging.getLogger(name)`` returns the same logger object for the same
    name, so this class installs its console handler only once per name.
    Creating several ``Logger`` objects with one name therefore does not
    make every message appear several times.
    """

    # Attribute set on the console handler created here, so it can be found
    # again on later instantiations for the same logger name.
    _HANDLER_MARKER = "_installed_by_logger_wrapper"

    def __init__(self, name, level=logging.INFO):
        """Create or reuse the named logger and apply ``level`` to it.

        :param name: Name passed to ``logging.getLogger``.
        :param level: Level applied to both the logger and its console handler.
        """
        # Create a custom logger
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Reuse the console handler from an earlier instantiation, if any
        console = next(
            (h for h in self.logger.handlers if getattr(h, self._HANDLER_MARKER, False)),
            None,
        )
        if console is None:
            console = logging.StreamHandler()
            setattr(console, self._HANDLER_MARKER, True)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console.setFormatter(formatter)
            self.logger.addHandler(console)

        # Always apply the requested level, so a later, more verbose request
        # is honoured by the reused handler too.
        console.setLevel(level)

    def get_logger(self):
        """Return the configured ``logging.Logger`` instance."""
        return self.logger

View file

View file

@ -0,0 +1,272 @@
"""
This file contains the functions which enable interaction with SharePoint via the API.
"""
from msal import ConfidentialClientApplication
from datetime import datetime, timedelta
import requests
from functools import wraps
import time
import logging
from io import BytesIO
# Configure logging
# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
# Only attach a console handler when none exists yet, so importing this
# module more than once does not duplicate log output.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
def _retry_after_seconds(response, default=5):
    """Return the wait time requested by the ``Retry-After`` header, in seconds.

    Falls back to ``default`` when the header is missing or is not a plain
    integer (the header may also carry an HTTP date, which is not parsed here).
    """
    raw_value = response.headers.get('Retry-After')
    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        return default


def handle_error(response):
    """
    Handle errors based on HTTP status codes and log detailed information.

    :param response: The non-200 response returned by the API.
    :return: The string 'retry' for 429, 500 and 503, after sleeping for the
        ``Retry-After`` interval (5 seconds when absent or unparsable).
    :raises ValueError: For every other status code.
    """
    try:
        payload = response.json()
    except ValueError:
        payload = {}

    # The body is not guaranteed to be a JSON object, nor is its 'error'
    # member guaranteed to be one; normalise both so the lookups cannot fail.
    error_json = payload.get('error', {}) if isinstance(payload, dict) else {}
    if not isinstance(error_json, dict):
        error_json = {'message': str(error_json)}

    error_code = error_json.get('code', 'unknownError')
    error_message = error_json.get('message', 'No detailed error message provided.')
    inner_error = error_json.get('innererror', {})
    details = error_json.get('details', [])

    logger.error(f"Error Code: {error_code}")
    logger.error(f"Error Message: {error_message}")
    if inner_error:
        logger.error(f"Inner Error: {inner_error}")
    if details:
        logger.error(f"Error Details: {details}")

    status = response.status_code
    if status == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status == 404:
        logger.error("Not Found. The requested resource doesnt exist.")
    elif status == 429:
        retry_after = _retry_after_seconds(response)
        logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return 'retry'
    elif status in (500, 503):
        retry_after = _retry_after_seconds(response)
        logger.error(f"Server error. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return 'retry'

    # Every status that is not retried ends up here.
    raise ValueError(f"API request failed with status code {response.status_code} - {error_message}")
def api_call_decorator(func):
    """
    Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.

    The decorated method must return a ``(http_method, url, data)`` tuple; the
    wrapper performs the request and returns the decoded JSON. When the caller
    passes ``page_size`` as a keyword argument, every page is followed via
    ``@odata.nextLink`` and the result is ``{'value': [...all items...]}``.
    NOTE(review): ``page_size`` is only detected in ``kwargs``; a positional
    value or the decorated method's own default does not enable pagination.

    :param func: The function to be decorated.
    :return: The wrapped function.
    :raises ValueError: When the API reports a non-retryable error, or keeps
        reporting a retryable one more than ``max_retries`` times in a row.
    """
    # Cap on consecutive retries of one URL, so a service that keeps
    # answering 429/500/503 cannot block the caller forever.
    max_retries = 5

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Check and refresh the access token if needed
            if self.is_access_token_expired():
                self.retrieve_access_token()
                logger.info("Access token refreshed.")

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)

            # Initialize the results list and handle pagination if page_size is provided
            results = []
            page_size = kwargs.get('page_size', None)
            response_data = {}
            consecutive_retries = 0

            while url:
                response = requests.request(http_method, url, headers=self.headers, json=data)

                # Handle the response
                if response.status_code == 200:
                    consecutive_retries = 0
                    response_json = response.json()  # Store the response JSON
                    if page_size:
                        results.extend(response_json.get('value', []))
                        url = response_json.get('@odata.nextLink', None)
                    else:
                        response_data = response_json  # Capture the full response for consistency
                        break
                else:
                    # handle_error raises for non-retryable statuses and
                    # sleeps before returning 'retry' for retryable ones.
                    retry = handle_error(response)
                    if retry == 'retry':
                        consecutive_retries += 1
                        if consecutive_retries > max_retries:
                            raise ValueError(
                                f"API request failed with status code {response.status_code} "
                                f"after {max_retries} retries"
                            )
                        continue

            if page_size:
                response_data = {'value': results}
            return response_data
        except Exception:
            logger.exception("An error occurred during the API call.")
            # Bare raise keeps the original traceback intact.
            raise

    return wrapper
class SharePointClient:
    """Client for reading a SharePoint site's document drive via Microsoft Graph.

    Authentication uses the MSAL client-credentials flow. The token response,
    the time it was requested and its computed expiry are kept on the instance
    so that API calls can refresh the token when it has expired.
    """

    access_token = None
    access_token_request_timestamp = None
    access_token_expiry = None
    headers = None
    # Format used to serialise the timestamps in the expiration details.
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initializes the SharePointClient with necessary credentials and site information.

        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: The access token (optional). Must be a mapping with
            an 'access_token' key, i.e. the shape stored by ``retrieve_access_token``.
        :param access_token_expiration_details: The access token expiration details (optional)
        :raises ValueError: If ``access_token`` is given without expiration
            details, or if authentication fails.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()

        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    def get_token_expiration_details(self):
        """
        Returns the access token expiration details. Converts the datetime objects to strings for serialization.

        :return: Dict with 'access_token_request_timestamp' and
            'access_token_expiry' formatted with ``TIMESTAMP_FORMAT``.
        """
        return {
            'access_token_request_timestamp': datetime.strftime(
                self.access_token_request_timestamp, self.TIMESTAMP_FORMAT
            ),
            'access_token_expiry': datetime.strftime(self.access_token_expiry, self.TIMESTAMP_FORMAT)
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Sets the access token expiration details from a serialized dictionary.

        :param access_token_expiration_details: The serialized access token expiration details.
        :return: None
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Checks if the access token has expired. This method only reports the
        state; refreshing the token is left to the caller.

        :return: True if expired (or if no expiry has been recorded yet), False otherwise.
        """
        # Without a recorded expiry there is no usable token; report it as
        # expired instead of failing on a None comparison.
        if self.access_token_expiry is None:
            return True
        return datetime.now() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Implements authentication using MSAL.

        :param refresh: If True, force a refresh of the access token.
        :return: None
        :raises ValueError: If no access token could be obtained.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )
        scope = ["https://graph.microsoft.com/.default"]
        access_token_request_timestamp = datetime.now()

        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # Check if a token is already cached
            token = app.acquire_token_silent(scope, account=None)
            if not token:
                token = app.acquire_token_for_client(scopes=scope)

        if not token or "access_token" not in token:
            logger.error("Authentication failed.")
            raise ValueError("Authentication failed")

        # Treat the token as expired 20 seconds early, so a request never
        # starts with a token that is about to lapse.
        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - 20
        )
        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }
        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Get the document drive of the SharePoint site.

        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        This function will list the contents of a folder in SharePoint.

        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url):
        """
        Downloads a file from the given URL and returns its content.

        :param download_url: The URL to download the file from.
        :return: A ``BytesIO`` holding the file content, positioned at the start.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        # The context manager releases the streamed connection even when the
        # status check or the transfer fails.
        with requests.get(download_url, stream=True) as response:
            response.raise_for_status()  # Check if the request was successful
            file_content = BytesIO()
            # Read the file content into memory
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)
        file_content.seek(0)  # Reset the file pointer to the beginning
        return file_content