mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-08 11:17:29 +00:00
moved validator
This commit is contained in:
parent
77e4f0f1ff
commit
70bc5d8b5a
6 changed files with 135 additions and 96 deletions
|
|
@ -1,40 +0,0 @@
|
|||
import os
|
||||
import logging
|
||||
from utils.logger import Logger
|
||||
|
||||
class RetroHomeFileStructureValidator():
    """Check the directory layout found directly under a source path.

    Validation runs as part of construction. Afterwards ``innocent`` holds
    the directory names that passed the name check and ``guilty`` holds the
    directory names that failed it.
    """

    def __init__(self, source_loc_path):
        """Store the path, create the logger and run :meth:`validate`.

        Args:
            source_loc_path: Directory whose immediate entries are inspected.
        """
        self.source_path = source_loc_path
        self.logger = Logger(name='RetroHomeFileStructureValidator', level=logging.DEBUG).get_logger()
        self.innocent = []
        self.guilty = []
        self.validate()

    def validate(self):
        """Collect the sub-directories of the source path, then run the checks.

        Plain files are not an error: they are logged and ignored.
        """
        self.logger.debug(f"Starting File Structure Validation on '{self.source_path}'")

        for filepath in os.listdir(self.source_path):
            if os.path.isdir(os.path.join(self.source_path, filepath)):
                self.innocent.append(filepath)
            else:
                self.logger.warning(f"Found a file when expecting directory. Ignoring file {filepath}")

        self.logger.debug(self.innocent)
        self.valid_name()
        self.valid_file_structure()

    def valid_name(self):
        """Move names made of more than two space-separated parts to ``guilty``.

        The survivors are gathered in a separate list instead of calling
        ``remove()`` on ``innocent`` while looping over it; removing during
        iteration shifts the remaining items and makes the loop skip the name
        that follows every rejected one.
        """
        still_innocent = []
        for names in self.innocent:
            if len(names.split(" ")) > 2:
                self.logger.warning(f"The name '{names}' is not in the correct format")
                self.guilty.append(names)
            else:
                still_innocent.append(names)

        # Slice assignment keeps the same list object for anyone holding a reference.
        self.innocent[:] = still_innocent

    def valid_file_structure(self):
        """Placeholder: builds each directory path but performs no check yet."""
        for names in self.innocent:
            # Not used yet; the per-directory structure check is still to be written.
            path_to_check = os.path.join(self.source_path, names)

    def date_checker_extractor(self):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Please contact Jun-te Kim to make this feature")
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
import os
|
||||
from filePathValidator.retrohome import RetroHomeFileStructureValidator
|
||||
from pdfReader.pdfReaderToText import pdfReaderToText
|
||||
from utils.sharepoint.sharepoint import SharePointClient
|
||||
from scraper.scraper import SharePointScraper, SharePointInstaller
|
||||
from pprint import pprint, pformat
|
||||
import logging
|
||||
import tempfile
|
||||
|
|
@ -22,35 +21,27 @@ def main():
|
|||
|
||||
# POC Scraper from sharepoint and get useful data
|
||||
|
||||
sharepoint_client = SharePointClient(
|
||||
tenant_id=SHAREPOINT_TENANT_ID,
|
||||
client_id=SHAREPOINT_CLIENT_ID,
|
||||
client_secret=SHAREPOINT_CLIENT_SECRET,
|
||||
site_id=SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID,
|
||||
)
|
||||
south_coast_scraper = SharePointScraper(SharePointInstaller.SOUTH_COAST_INSULATION_SERVICE)
|
||||
list_of_names = south_coast_scraper.get_date_folder_names()
|
||||
logger.info(pformat(list_of_names))
|
||||
|
||||
installer_folders = sharepoint_client.list_folder_contents("/")
|
||||
# # Get each name
|
||||
# for survey_folder in installer_folders['value']:
|
||||
# if survey_folder["name"] not in surverys_work_completed:
|
||||
# surverys_work_completed.update({survey_folder["name"]: 0})
|
||||
|
||||
surverys_work_completed = {}
|
||||
first_layer_folder_structure = {}
|
||||
# for name, value in surverys_work_completed.items():
|
||||
# housing_assosciation = sharepoint_client.list_folder_contents(f"/{name}")
|
||||
|
||||
# Get each name
|
||||
for survey_folder in installer_folders['value']:
|
||||
if survey_folder["name"] not in surverys_work_completed:
|
||||
surverys_work_completed.update({survey_folder["name"]: 0})
|
||||
|
||||
for name, value in surverys_work_completed.items():
|
||||
housing_assosciation = sharepoint_client.list_folder_contents(f"/{name}")
|
||||
|
||||
if 'value' in housing_assosciation:
|
||||
if len(housing_assosciation['value']) > 0:
|
||||
first_layer_folder_structure.update({name: housing_assosciation['value'][0]['name']})
|
||||
else:
|
||||
logger.info(f"Skipping 1 {name}")
|
||||
else:
|
||||
logger.info(f"Skipping 2 {name}")
|
||||
# if 'value' in housing_assosciation:
|
||||
# if len(housing_assosciation['value']) > 0:
|
||||
# first_layer_folder_structure.update({name: housing_assosciation['value'][0]['name']})
|
||||
# else:
|
||||
# logger.info(f"Skipping 1 {name}")
|
||||
# else:
|
||||
# logger.info(f"Skipping 2 {name}")
|
||||
|
||||
logger.info(pformat(first_layer_folder_structure))
|
||||
# logger.info(pformat(first_layer_folder_structure))
|
||||
|
||||
|
||||
# logger.info(pformat(surverys_work_completed))
|
||||
|
|
|
|||
|
|
@ -1,25 +0,0 @@
|
|||
from enum import Enum
|
||||
import os
|
||||
|
||||
# Khalim's awesome SharePoint client; I will probably need to learn how it works in the future
|
||||
from utils.sharepoint.sharepoint import SharePointClient
|
||||
|
||||
class SharePointInstaller(Enum):
    """Installers known to the scraper.

    The member value is whatever the named environment variable holds when
    this class is defined, or ``None`` if the variable is not set.
    """

    SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID = os.environ.get("SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID")
|
||||
|
||||
class SharePointScraper():
    """
    A simple scraper to get the contents of a SharePoint and validate inputs so they can be changed manually
    """

    def __init__(self, sharepoint_location):
        """Read the SharePoint credentials from the environment.

        Args:
            sharepoint_location: Object exposing ``.value`` (a
                ``SharePointInstaller`` member is expected).
        """
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)

        # os.getenv(None) raises TypeError, so an unset location value must not
        # be used as a variable name.
        drive_key = sharepoint_location.value
        self.sharepoint_drive_id = os.getenv(drive_key) if drive_key is not None else None
        # Kept so get_folders_in_path can pass the location value as the site id.
        self.sharepoint_drive = sharepoint_location

    def get_folders_in_path(self, path="/"):
        """List the contents of ``path`` on the configured SharePoint site.

        Args:
            path: Folder path to list; defaults to the root ``"/"``.

        Returns:
            Whatever ``SharePointClient.list_folder_contents`` returns for
            ``path``.
        """
        sharepoint_client = SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )
        return sharepoint_client.list_folder_contents(path)
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,11 +1,16 @@
|
|||
from pprint import pformat
|
||||
from enum import Enum
|
||||
import os
|
||||
|
||||
from utils.logger import Logger
|
||||
import logging
|
||||
# Khalim's awesome SharePoint client; I will probably need to learn how it works in the future
|
||||
from utils.sharepoint.sharepoint import SharePointClient
|
||||
from functools import wraps
|
||||
import re
|
||||
from validator.validator import DomnaSharePointValidator
|
||||
|
||||
class SharePointInstaller(Enum):
    """Installers known to the scraper.

    Both members read the same environment variable when the class is
    defined (``None`` if it is unset). Because their values are equal,
    ``Enum`` makes the second name an alias of the first rather than a
    separate member.
    """

    SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID = os.environ.get("SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID")
    SOUTH_COAST_INSULATION_SERVICE = os.environ.get("SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID")
|
||||
|
||||
class SharePointScraper():
|
||||
"""
|
||||
|
|
@ -13,13 +18,98 @@ class SharePointScraper():
|
|||
"""
|
||||
|
||||
def __init__(self, sharepoint_location):
    """Read credentials from the environment and reset the cached listings.

    Args:
        sharepoint_location: Object exposing ``.value`` and ``.name`` (a
            ``SharePointInstaller`` member is expected); its value is later
            passed to ``SharePointClient`` as the site id.

    Raises:
        AssertionError: If a required environment variable is missing or
            the location has no value.
    """
    self.logger = Logger(name="SharePointScraper", level=logging.DEBUG).get_logger()
    self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
    self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
    self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)

    # os.getenv(None) raises TypeError, which would hide the clearer
    # assertion message below when the location value is unset.
    drive_value = sharepoint_location.value
    self.sharepoint_drive_id = os.getenv(drive_value) if drive_value is not None else None
    self.sharepoint_drive = sharepoint_location

    # NOTE(review): assert statements are skipped under ``python -O``; kept
    # so callers still see AssertionError.
    assert self.sharepoint_client_id is not None, "Please assign SHAREPOINT_CLIENT_ID env variable"
    assert self.sharepoint_client_secret is not None, "Please assign SHAREPOINT_CLIENT_SECRET env variable"
    assert self.sharepoint_tenant_id is not None, "Please assign SHAREPOINT_TENANT_ID env variable"
    # Check the value: the location object itself is never None here.
    assert drive_value is not None, "Please set sharepoint driver id env variable. See SharePointInstaller for more information"

    # Caches filled on demand by the loader methods.
    self.surveyor_names = []
    self.surveyor_to_dates_folder = {}
|
||||
|
||||
def ensure_surveyor_names_loaded(func):
    """Wrap a method so ``surveyor_names`` is populated before it runs.

    When the instance's ``surveyor_names`` is empty the wrapper logs a
    message and calls ``upload_names_to_memory()`` before delegating.
    """
    @wraps(func)
    def names_guard(self, *args, **kwargs):
        names_missing = not self.surveyor_names
        if names_missing:
            self.logger.info("Surveyor names not found, fetching from SharePoint...")
            self.upload_names_to_memory()
        result = func(self, *args, **kwargs)
        return result

    return names_guard
|
||||
|
||||
def get_folders_in_path(self, path):
    """List the contents of ``path`` on the configured SharePoint site.

    A new ``SharePointClient`` is built on every call from the credentials
    stored on the instance.

    Args:
        path: Folder path to list.

    Returns:
        Whatever ``SharePointClient.list_folder_contents`` returns.
    """
    credentials = {
        "tenant_id": self.sharepoint_tenant_id,
        "client_id": self.sharepoint_client_id,
        "client_secret": self.sharepoint_client_secret,
        "site_id": self.sharepoint_drive.value,
    }
    return SharePointClient(**credentials).list_folder_contents(path)
|
||||
|
||||
def upload_names_to_memory(self):
|
||||
housing_assosiaction_folders = self.get_folders_in_path("/")
|
||||
|
||||
if 'value' not in housing_assosiaction_folders:
|
||||
raise RuntimeError("Failed to get information from sharepoint")
|
||||
|
||||
new_list = []
|
||||
for surveyor_folder in housing_assosiaction_folders['value']:
|
||||
if 'name' in surveyor_folder:
|
||||
name = surveyor_folder['name']
|
||||
if name not in new_list:
|
||||
new_list.append(name)
|
||||
|
||||
self.surveyor_names = new_list
|
||||
|
||||
@ensure_surveyor_names_loaded
|
||||
def get_surveryor_names(self):
|
||||
return self.surveyor_names
|
||||
|
||||
@ensure_surveyor_names_loaded
|
||||
def get_date_folder_names(self):
|
||||
for name in self.surveyor_names:
|
||||
dates_folders = self.get_folders_in_path(f"/{name}")
|
||||
if 'value' not in dates_folders:
|
||||
raise RuntimeError(f"Failed to get dates folder from {name} in {self.sharepoint_drive.name}")
|
||||
|
||||
list_of_dates = []
|
||||
|
||||
for dates in dates_folders['value']:
|
||||
self.logger.info(pformat(dates))
|
||||
if 'name' in dates:
|
||||
list_of_dates.append(dates['name'])
|
||||
|
||||
self.surveyor_to_dates_folder.update({name: list_of_dates})
|
||||
|
||||
|
||||
return self.surveyor_to_dates_folder
|
||||
|
||||
def ensure_dates_folder_loaded(func):
    """Wrap a method so ``surveyor_to_dates_folder`` is populated before it runs.

    When the instance's mapping is empty the wrapper logs a message and calls
    ``get_date_folder_names()`` before delegating.
    """
    @wraps(func)
    def dates_guard(self, *args, **kwargs):
        mapping_missing = not self.surveyor_to_dates_folder
        if mapping_missing:
            self.logger.info("Surveyor to dates mapping not found, fetching from SharePoint...")
            self.get_date_folder_names()
        result = func(self, *args, **kwargs)
        return result

    return dates_guard
|
||||
|
||||
@ensure_dates_folder_loaded
|
||||
def list_of_names_that_has_the_wrong_date_format(self):
|
||||
naughty_names = []
|
||||
|
||||
# Patten Nic wants: W.C. DD.MM.YYYY
|
||||
pattern = r"^W\.C\. (0[1-9]|[12][0-9]|3[01])\.(0[1-9]|1[0-2])\.(\d{4})$"
|
||||
|
||||
for names, dates in self.surveyor_to_dates_folder:
|
||||
if
|
||||
for s in test_strings:
|
||||
match = re.match(pattern, s)
|
||||
print(f"{s}: {'Valid' if match else 'Invalid'}")
|
||||
|
|
|
|||
23
etl/src/etl/validator/validator
Normal file
23
etl/src/etl/validator/validator
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
import os
|
||||
import logging
|
||||
from utils.logger import Logger
|
||||
import re
|
||||
|
||||
class DomnaSharePointValidator():
    """
    A simple class to check that certain things are in a certain format in the Domna SharePoint with surveyors
    """

    def __init__(self):
        """Create the logger used by this validator."""
        # Named after this class so its log lines are not attributed to another validator.
        self.logger = Logger(name='DomnaSharePointValidator', level=logging.DEBUG).get_logger()

    def valid_dates(self, list_of_dates_to_check):
        """Tell whether every given name follows ``W.C. DD.MM.YYYY``.

        Args:
            list_of_dates_to_check: Iterable of strings to check.

        Returns:
            bool: ``True`` when all names match (also for an empty input),
            ``False`` as soon as one does not.
        """
        # Pattern Nic wants: W.C. DD.MM.YYYY
        pattern = re.compile(r"^W\.C\. (0[1-9]|[12][0-9]|3[01])\.(0[1-9]|1[0-2])\.(\d{4})$")

        # re.match signals "no match" with None, never False, so compare with None.
        return all(pattern.match(date) is not None for date in list_of_dates_to_check)
|
||||
|
||||
|
||||
Loading…
Add table
Reference in a new issue