mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-08 11:17:29 +00:00
326 lines
No EOL
15 KiB
Python
326 lines
No EOL
15 KiB
Python
from pprint import pformat
|
|
from enum import Enum
|
|
import os
|
|
from etl.utils.logger import Logger
|
|
import logging
|
|
# Awesome SharePoint code by Khalim; I will probably need to learn how it works in the future
|
|
from etl.utils.sharepoint.sharepoint import SharePointClient
|
|
from functools import wraps
|
|
import re
|
|
from etl.validator.validator import DomnaSharePointValidator
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
def previous_monday(today=None):
    """Return the "week commencing" label for the week that contains *today*.

    Despite the name, this is the Monday of the *current* week: when *today*
    is itself a Monday, that same day is used.

    Args:
        today: Reference date (anything with ``weekday()``/``strftime()`` that
            supports subtracting a ``timedelta``, e.g. ``datetime`` or
            ``date``). Defaults to ``datetime.today()``.

    Returns:
        A string of the form ``"W.C. DD.MM.YYYY"``.
    """
    if today is None:
        today = datetime.today()
    print(f"Todays date is {today}")
    # weekday() is 0 for Monday, so subtracting it lands on this week's Monday.
    monday = today - timedelta(days=today.weekday())
    return f"W.C. {monday.strftime('%d.%m.%Y')}"
|
|
|
|
# Week folder to process, e.g. "W.C. 31.03.2025". An explicit WEEK_COMMENCING
# environment variable wins; the current week's Monday is only computed (and
# its debug line printed) when no override is supplied.
WEEK_COMMENCING = os.getenv("WEEK_COMMENCING")
if WEEK_COMMENCING is None:
    WEEK_COMMENCING = previous_monday()
|
|
|
|
class SharePointInstaller(Enum):
    """SharePoint site ids, one member per installer site.

    Each value is read from an environment variable when the module is
    imported, falling back to the hard-coded id where one is given.

    NOTE(review): ``Enum`` treats members with equal values as aliases of the
    first one. While both are unset, ``JJC`` is therefore an alias of
    ``SOUTH_COAST_INSULATION`` (both ``None``); the same happens to the two
    Osmosis members if only the shared ``OSMOSIS_SHAREPOINT_ID`` is set.
    """

    # https://{tenant}.sharepoint.com/sites/{site}/_api/site/id
    SOUTH_COAST_INSULATION = os.getenv("SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID", None)
    JJC = os.getenv("JJC_SERVICE_SHAREPOINT_ID", None)
    SGEC = os.getenv("SGEC_SERVICE_SHAREPOINT_ID", "52018e5c-3215-4fe4-a4e3-bbf0d0aa7cd9")
    BAXTER_KELLY = os.getenv("BAXTER_KELLY_SERVICE_SHAREPOINT_ID", "6f930bf3-572d-4f91-b1ae-ec536fa319e2")
    DOMNA = os.getenv("DOMNA_SHAREPOINT_ID", "8ab64924-ccde-4b56-b0dc-4e11596446e4")

    # Both waves used to read only OSMOSIS_SHAREPOINT_ID, so an override made
    # them identical. A wave-specific variable now takes precedence; the
    # shared variable is still honoured for existing deployments.
    OSMOSIS_WAVE_3 = os.getenv(
        "OSMOSIS_WAVE_3_SHAREPOINT_ID",
        os.getenv("OSMOSIS_SHAREPOINT_ID", "350a3b48-8311-4506-8abb-69bafc280d6f"),
    )
    OSMOSIS_WAVE_2 = os.getenv(
        "OSMOSIS_WAVE_2_SHAREPOINT_ID",
        os.getenv("OSMOSIS_SHAREPOINT_ID", "bc925a9a-ad0b-4de9-9a3c-e61014cc7489"),
    )

    # The original variable name is misspelled ("SHARPOINT"); accept the
    # correct spelling first and keep the legacy name as a fallback.
    WARMFRONT = os.getenv(
        "WARMFRONT_SHAREPOINT_ID",
        os.getenv("WARMFRONT_SHARPOINT_ID", "bea71c30-d366-454c-a484-ae4d6fd95bc4"),
    )
|
|
|
|
class SharePointScraper():
    """
    A simple scraper that reads the contents of a SharePoint site and validates
    what it finds so that problems can be corrected manually.

    The methods walk a folder tree of the form
    ``/<surveyor>/<week commencing>/<housing association>/<address>/<files>``.
    Each level is cached on the instance the first time it is fetched, and the
    ``ensure_*`` decorators defined in this class fill a cache on demand before
    a method that depends on it runs.

    Folder listings are expected to be mappings with a ``'value'`` list whose
    entries carry ``'name'`` and, for files, a ``'file'`` key and an
    ``'@microsoft.graph.downloadUrl'``; a listing without ``'value'`` is
    treated as a failed request.
    """

    # File-name suffixes, compared case-insensitively.
    _PRESITE_EXTENSIONS = (".pdf",)
    _SKIPPED_EXTENSIONS = (".jpg", ".jpeg", ".png", ".heic", ".mov", ".mp4")

    def __init__(self, sharepoint_location, development=False):
        """Read credentials from the environment and initialise empty caches.

        Args:
            sharepoint_location: Object whose ``.value`` is the SharePoint site
                id and whose ``.name`` is used in error messages (see
                ``SharePointInstaller``).
            development: When true, pre-populate the surveyor caches with a
                fixed sample entry so no SharePoint lookup is needed for them.

        Raises:
            AssertionError: If SHAREPOINT_CLIENT_ID, SHAREPOINT_CLIENT_SECRET,
                SHAREPOINT_TENANT_ID or the site id is not set.
        """
        self.logger = Logger(name="SharePointScraper", level=logging.DEBUG).get_logger()
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)
        self.sharepoint_drive = sharepoint_location

        self._require(self.sharepoint_client_id, "Please assign SHAREPOINT_CLIENT_ID env variable")
        self._require(self.sharepoint_client_secret, "Please assign SHAREPOINT_CLIENT_SECRET env variable")
        self._require(self.sharepoint_tenant_id, "Please assign SHAREPOINT_TENANT_ID env variable")
        self._require(
            self.sharepoint_drive.value,
            "Please set sharepoint driver id env variable. See SharePointInstaller for more information",
        )

        # Names of the entries found at the site root.
        self.surveyor_names = []
        # surveyor name -> names of the entries inside that surveyor's folder.
        self.surveyor_to_dates_folder = {}
        # surveyor name -> housing association folder names for WEEK_COMMENCING.
        self.surveyor_to_housing_assosications = {}
        # De-duplicated housing association names across all surveyors.
        self.house_association_names = []
        # surveyor name -> number of addresses counted as completed.
        self.surveyor_work_completed = {}

        # Delete me for production
        if development:
            self.surveyor_names = ["Jun-te Kim (SCIS)"]
            self.surveyor_to_dates_folder = {
                'Jun-te Kim (SCIS)': 'W.C. 31.03.2025',
            }

    @staticmethod
    def _require(value, message):
        """Raise ``AssertionError(message)`` when *value* is ``None``.

        Raised explicitly (instead of via ``assert``) so the check still runs
        under ``python -O`` while callers keep seeing the same exception type.
        """
        if value is None:
            raise AssertionError(message)

    def ensure_surveyor_names_loaded(func):
        """Decorator to ensure surveyor names are loaded before calling the function."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if not self.surveyor_names:
                self.logger.info("Surveyor names not found, fetching from SharePoint...")
                self.upload_names_to_memory()
            return func(self, *args, **kwargs)
        return wrapper

    def _client(self):
        """Build a new ``SharePointClient`` for the configured site."""
        return SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )

    def _list_items(self, path, error_message):
        """Return the ``'value'`` entries of the listing at *path*.

        Raises:
            RuntimeError: With *error_message* when the listing has no
                ``'value'`` key.
        """
        listing = self.get_folders_in_path(path)
        if 'value' not in listing:
            raise RuntimeError(error_message)
        return listing['value']

    def _find_folder(self, file_name, at_path):
        """Return the first entry under *at_path* whose name contains *file_name*, else ``None``.

        NOTE(review): this is a case-insensitive *substring* match, so "Site"
        also matches an existing "Site 2" — confirm that is intended.
        """
        listing = self.get_folders_in_path(at_path)
        if 'value' not in listing:
            return None
        wanted = file_name.upper()
        for entry in listing['value']:
            if wanted in entry.get("name", "").upper():
                return entry
        return None

    def get_folders_in_path(self, path):
        """Return the client's raw folder listing for *path*."""
        return self._client().list_folder_contents(path)

    def get_file_content(self, url):
        """Download the file at *url* through the SharePoint client and return its result."""
        return self._client().download_sharepoint_file(url)

    def upload_names_to_memory(self):
        """Cache the names of the entries at the site root as ``surveyor_names``.

        Raises:
            RuntimeError: If the root listing has no ``'value'`` key.
        """
        root_entries = self._list_items("/", "Failed to get information from sharepoint")
        # dict.fromkeys drops duplicates while keeping first-seen order.
        self.surveyor_names = list(dict.fromkeys(
            entry['name'] for entry in root_entries if 'name' in entry
        ))

    def does_folder_exists_at(self, file_name, file_path):
        """Return True when an entry under *file_path* has a name containing *file_name* (case-insensitive)."""
        return self._find_folder(file_name, file_path) is not None

    def create_dir(self, file_name, at_path="/"):
        """Return the ``webUrl`` of folder *file_name* under *at_path*, creating it if no match exists.

        The folder is listed once; an existing match is reused, otherwise the
        folder is created through the SharePoint client.
        """
        existing = self._find_folder(file_name, at_path)
        if existing is not None:
            return existing["webUrl"]
        return self._client().create_folder(file_name, at_path)['webUrl']

    def upload_file(self, file_path, sharepoint_path, file_name):
        """Upload the local file *file_path* to *sharepoint_path* as *file_name*.

        The local file is opened in binary mode and closed again once the
        client's ``upload_file`` call returns (or raises).
        """
        client = self._client()
        with open(file_path, 'rb') as stream:
            client.upload_file(file_name, stream, sharepoint_path)

    @ensure_surveyor_names_loaded
    def get_surveryor_names(self):
        """Return the cached surveyor names, loading them first if necessary."""
        return self.surveyor_names

    @ensure_surveyor_names_loaded
    def get_date_folder_names(self):
        """Cache and return, per surveyor, the names of the entries in that surveyor's folder.

        Raises:
            RuntimeError: If a surveyor's folder listing has no ``'value'`` key.
        """
        for name in self.surveyor_names:
            entries = self._list_items(
                f"/{name}",
                f"Failed to get dates folder from {name} in {self.sharepoint_drive.name}",
            )
            self.surveyor_to_dates_folder[name] = [
                entry['name'] for entry in entries if 'name' in entry
            ]
        return self.surveyor_to_dates_folder

    def ensure_dates_folder_loaded(func):
        """Decorator to ensure surveyor_to_dates_folder is loaded before calling the function."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_dates_folder:
                self.logger.info("Surveyor to dates mapping not found, fetching from SharePoint...")
                self.get_date_folder_names()
            return func(self, *args, **kwargs)
        return wrapper

    @ensure_dates_folder_loaded
    def list_of_names_that_has_the_wrong_date_format(self):
        """Return the surveyors whose date folders fail ``DomnaSharePointValidator.valid_dates``.

        A warning listing them is logged when there is at least one.
        """
        naughty_names = []
        for name, dates in self.surveyor_to_dates_folder.items():
            self.logger.info(dates)
            if not DomnaSharePointValidator.valid_dates(dates):
                naughty_names.append(name)

        if naughty_names:
            self.logger.warning(f"Dates FORMAT is wrong for the following folders {naughty_names}")
        return naughty_names

    @ensure_dates_folder_loaded
    def get_housing_association_names(self):
        """Cache and return, per surveyor, the folder names inside their WEEK_COMMENCING folder.

        Surveyors without a WEEK_COMMENCING folder are skipped with a warning.
        Every name seen is also added once to ``house_association_names``.

        Raises:
            RuntimeError: If WEEK_COMMENCING fails validation, or a listing has
                no ``'value'`` key.
        """
        if DomnaSharePointValidator.valid_dates([WEEK_COMMENCING]) is False:
            raise RuntimeError(f"WEEK COMMENCING is in wrong format {WEEK_COMMENCING}")

        for name in self.surveyor_names:
            dates = self.surveyor_to_dates_folder.get(name, [])
            if WEEK_COMMENCING not in dates:
                self.logger.warning(f"Failed to get housing association folder for {name}, {dates}")
                continue

            entries = self._list_items(
                f"/{name}/{WEEK_COMMENCING}",
                "Failed to get hosing association information",
            )
            associations = [entry['name'] for entry in entries if 'name' in entry]
            for association in associations:
                if association not in self.house_association_names:
                    self.house_association_names.append(association)
            self.surveyor_to_housing_assosications[name] = associations

        return self.surveyor_to_housing_assosications

    def ensure_housing_assosiation_is_loaded(func):
        """Decorator to ensure the surveyor-to-housing-association mapping is loaded before calling the function."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_housing_assosications:
                self.logger.info("Housing association not found, fetching from SharePoint...")
                self.get_housing_association_names()
            return func(self, *args, **kwargs)
        return wrapper

    def _address_folder_names(self, name, house_ass, error_message):
        """Return the names of the non-file entries under a surveyor's housing association folder."""
        entries = self._list_items(f"/{name}/{WEEK_COMMENCING}/{house_ass}", error_message)
        return [entry['name'] for entry in entries if 'file' not in entry]

    def _download_urls(self, path, wanted):
        """Map file name -> download URL for the files under *path* accepted by ``wanted(file_name)``."""
        entries = self._list_items(path, "Failed to get files to download")
        return {
            entry["name"]: entry['@microsoft.graph.downloadUrl']
            for entry in entries
            if 'file' in entry and wanted(entry["name"])
        }

    @classmethod
    def _is_presite_candidate(cls, file_name):
        """True for PDF files, whatever the case of the extension."""
        return file_name.lower().endswith(cls._PRESITE_EXTENSIONS)

    @classmethod
    def _is_downloadable(cls, file_name):
        """True unless the file is a photo or video, whatever the case of the extension."""
        return not file_name.lower().endswith(cls._SKIPPED_EXTENSIONS)

    @ensure_housing_assosiation_is_loaded
    def get_number_of_surverys_completed(self):
        """Count, per surveyor, the WEEK_COMMENCING address folders holding a presite PDF.

        Every PDF in an address folder is downloaded to a temp file and passed
        to ``DomnaSharePointValidator.is_quidos_presite`` until one is accepted.
        The tally is rebuilt from scratch on each call, so repeated calls do
        not double count.

        Returns:
            ``surveyor_work_completed``: surveyor name -> number of addresses.

        Raises:
            RuntimeError: If any folder listing has no ``'value'`` key.
        """
        completed = {}
        for name in self.surveyor_names:
            for house_ass in self.surveyor_to_housing_assosications.get(name, []):
                addresses = self._address_folder_names(name, house_ass, "Failed to get address folders")
                for address in addresses:
                    folder = f"{name}/{WEEK_COMMENCING}/{house_ass}/{address}"
                    pdf_urls = self._download_urls(f"/{folder}", self._is_presite_candidate)
                    if pdf_urls:
                        # NOTE(review): download URLs may embed short-lived
                        # access tokens; consider logging file names only.
                        self.logger.info(pformat(pdf_urls))

                    for file_name, url in pdf_urls.items():
                        content = self.get_file_content(url)
                        local_path = self.create_temp_file(content, f"{folder}/{file_name}")
                        if DomnaSharePointValidator.is_quidos_presite(local_path):
                            completed[name] = completed.get(name, 0) + 1
                            # NOTE(review): assumes one count per address (stop
                            # at the first presite PDF) — confirm.
                            break

        self.surveyor_work_completed = completed
        return self.surveyor_work_completed

    @ensure_housing_assosiation_is_loaded
    def download_file_for_each_address(self):
        """Download every non-media file of every WEEK_COMMENCING address folder.

        Returns:
            A list with one ``{address: [local file paths]}`` dict per address
            folder, in the order the folders were listed.

        Raises:
            RuntimeError: If any folder listing has no ``'value'`` key.
        """
        paths = []
        for name in self.surveyor_names:
            if WEEK_COMMENCING not in self.surveyor_to_dates_folder.get(name, []):
                continue

            for house_ass in self.surveyor_to_housing_assosications.get(name, []):
                addresses = self._address_folder_names(name, house_ass, "Failed to get files to download")
                for address in addresses:
                    folder = f"{name}/{WEEK_COMMENCING}/{house_ass}/{address}"
                    urls = self._download_urls(f"/{folder}", self._is_downloadable)

                    local_files = []
                    for file_name, url in urls.items():
                        self.logger.info(f"Downloading {file_name} from {url}")
                        content = self.get_file_content(url)
                        local_files.append(self.create_temp_file(content, f"{folder}/{file_name}"))
                    paths.append({address: local_files})
        return paths

    def create_temp_file(self, content, path):
        """Write *content* to ``/tmp/sharepoint/<path>`` and return the full path.

        Args:
            content: Object exposing ``getvalue()`` that returns the bytes to
                write (presumably an ``io.BytesIO`` — verify against the client).
            path: Relative path below ``/tmp/sharepoint/``. An absolute path
                would make ``os.path.join`` discard the prefix.
        """
        path = os.path.join("/tmp/sharepoint/", path)

        # Ensure the parent directory exists
        os.makedirs(os.path.dirname(path), exist_ok=True)

        # Write content to the specified file
        with open(path, 'wb') as temp_file:
            temp_file.write(content.getvalue())

        self.logger.info(f"Temporary file created at: {path}")
        return path
|
|
|