mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-30 13:10:56 +00:00
177 lines
7.8 KiB
Python
177 lines
7.8 KiB
Python
from pprint import pformat
|
|
from enum import Enum
|
|
import os
|
|
from etl.utils.logger import Logger
|
|
import logging
|
|
from etl.utils.sharepoint.sharepoint import SharePointClient
|
|
from functools import wraps
|
|
import re
|
|
from etl.validator.validator import DomnaSharePointValidator
|
|
import asyncio
|
|
|
|
# Name of the week folder to inspect. Read once at import time from the
# WEEK_COMMENCING environment variable; falls back to the literal below.
WEEK_COMMENCING = os.environ.get("WEEK_COMMENCING", "W.C. 24.02.2025")
|
|
|
|
class SharePointInstaller(Enum):
    """Installers whose SharePoint id is supplied through an environment variable.

    Each member's value is whatever the matching environment variable held at
    import time, or ``None`` when that variable is not set.
    """

    SOUTH_COAST_INSULATION_SERVICE = os.environ.get(
        "SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID"
    )
|
|
|
|
class SharePointScraper():
    """List the folders of a SharePoint drive and flag badly named week folders.

    The drive is read as ``/<surveyor>/<week commencing>/<housing association>/<address>``:
    each level is fetched with ``SharePointClient.list_folder_contents`` and the
    results are cached on the instance (``surveyor_names``,
    ``surveyor_to_dates_folder``, ``surveyor_to_housing_associations``,
    ``house_association_names`` and ``surveyor_work_completed``).

    The ``ensure_*`` decorators fill a cache on demand, so the public
    coroutines can be called in any order.
    """

    def __init__(self, sharepoint_location, week_commencing=None):
        """Read the SharePoint credentials from the environment and set up empty caches.

        Args:
            sharepoint_location: A ``SharePointInstaller`` member; its ``value``
                is passed to ``SharePointClient`` as ``site_id`` and its
                ``name`` is used in error messages.
            week_commencing: Name of the week folder to inspect. Defaults to
                the module-level ``WEEK_COMMENCING``.

        Raises:
            AssertionError: If ``SHAREPOINT_CLIENT_ID``, ``SHAREPOINT_CLIENT_SECRET``
                or ``SHAREPOINT_TENANT_ID`` is unset, or if ``sharepoint_location``
                carries no id.
        """
        self.logger = Logger(name="SharePointScraper", level=logging.DEBUG).get_logger()
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID", None)
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID", None)
        self.sharepoint_drive = sharepoint_location
        self.week_commencing = WEEK_COMMENCING if week_commencing is None else week_commencing

        # Raised explicitly (not via `assert`) so the checks still run under
        # `python -O`; AssertionError is kept so existing callers see the same
        # exception type and messages as before.
        required_settings = (
            (self.sharepoint_client_id, "Please assign SHAREPOINT_CLIENT_ID env variable"),
            (self.sharepoint_client_secret, "Please assign SHAREPOINT_CLIENT_SECRET env variable"),
            (self.sharepoint_tenant_id, "Please assign SHAREPOINT_TENANT_ID env variable"),
            # The enum member itself is never None; the id it wraps is None
            # when its environment variable is missing, so check the value.
            (
                getattr(sharepoint_location, "value", None),
                "Please set SharePoint driver id env variable. See SharePointInstaller for more information",
            ),
        )
        for setting, message in required_settings:
            if setting is None:
                raise AssertionError(message)

        self.surveyor_names = []
        self.surveyor_to_dates_folder = {}
        self.surveyor_to_housing_associations = {}
        self.house_association_names = []
        self.surveyor_work_completed = {}

    @staticmethod
    def _entry_names(entries):
        """Return the ``'name'`` of every entry that has one, in listing order."""
        return [entry['name'] for entry in entries if 'name' in entry]

    def ensure_surveyor_names_loaded(func):
        """Decorator to ensure surveyor names are loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_names:
                self.logger.info("Surveyor names not found, fetching from SharePoint...")
                await self.upload_names_to_memory()
            return await func(self, *args, **kwargs)
        return wrapper

    async def get_folders_in_path(self, path):
        """Return the raw listing for ``path`` on the configured drive.

        A new ``SharePointClient`` is built for every call and the result of
        its ``list_folder_contents(path)`` is returned unchanged.
        """
        sharepoint_client = SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )

        return await sharepoint_client.list_folder_contents(path)

    async def upload_names_to_memory(self):
        """Load the names of the drive's root entries into ``self.surveyor_names``.

        Duplicates are dropped while the first-seen order is kept.

        Raises:
            RuntimeError: If the root listing has no ``'value'`` key.
        """
        root_listing = await self.get_folders_in_path("/")

        if 'value' not in root_listing:
            raise RuntimeError("Failed to get information from SharePoint")

        # dict.fromkeys de-duplicates in one pass and preserves insertion order.
        self.surveyor_names = list(dict.fromkeys(self._entry_names(root_listing['value'])))

    @ensure_surveyor_names_loaded
    async def get_surveyor_names(self):
        """Return the cached surveyor names, loading them first if needed."""
        return self.surveyor_names

    @ensure_surveyor_names_loaded
    async def get_date_folder_names(self):
        """Map every surveyor to the entry names found directly under ``/<surveyor>``.

        Returns:
            ``self.surveyor_to_dates_folder`` after it has been refreshed for
            every known surveyor.

        Raises:
            RuntimeError: If a surveyor's listing has no ``'value'`` key.
        """
        for name in self.surveyor_names:
            dates_folders = await self.get_folders_in_path(f"/{name}")
            if 'value' not in dates_folders:
                raise RuntimeError(f"Failed to get dates folder from {name} in {self.sharepoint_drive.name}")

            self.surveyor_to_dates_folder[name] = self._entry_names(dates_folders['value'])

        return self.surveyor_to_dates_folder

    def ensure_dates_folder_loaded(func):
        """Decorator to ensure surveyor_to_dates_folder is loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_dates_folder:
                self.logger.info("Surveyor to dates mapping not found, fetching from SharePoint...")
                await self.get_date_folder_names()
            return await func(self, *args, **kwargs)
        return wrapper

    @ensure_dates_folder_loaded
    async def list_of_names_that_have_the_wrong_date_format(self):
        """Return the surveyors whose date folders fail ``DomnaSharePointValidator.valid_dates``.

        Each surveyor's folder list is logged at INFO level, and one warning is
        logged when at least one surveyor fails validation.
        """
        naughty_names = []

        for name, dates in self.surveyor_to_dates_folder.items():
            self.logger.info(dates)
            if not DomnaSharePointValidator.valid_dates(dates):
                naughty_names.append(name)

        if naughty_names:
            self.logger.warning(f"Dates FORMAT is wrong for the following folders {naughty_names}")
        return naughty_names

    @ensure_dates_folder_loaded
    async def get_housing_association_names(self):
        """Map every surveyor to the entries under their week-commencing folder.

        Surveyors without a folder named ``self.week_commencing`` are skipped
        with a warning. Every name seen is also added once to
        ``self.house_association_names``.

        Returns:
            ``self.surveyor_to_housing_associations``.

        Raises:
            RuntimeError: If the configured week fails validation, or a week
                listing has no ``'value'`` key.
        """
        week = self.week_commencing
        if not DomnaSharePointValidator.valid_dates([week]):
            raise RuntimeError(f"WEEK COMMENCING is in wrong format {week}")

        for name in self.surveyor_names:
            # .get keeps a surveyor missing from the dates cache on the
            # "skipped with a warning" path instead of raising KeyError.
            known_dates = self.surveyor_to_dates_folder.get(name, [])
            if week not in known_dates:
                self.logger.warning(f"Failed to get housing association folder for {name}, {known_dates}")
                continue

            housing_associations_folders = await self.get_folders_in_path(f"/{name}/{week}")
            if 'value' not in housing_associations_folders:
                raise RuntimeError("Failed to get housing association information")

            list_of_housing_association = self._entry_names(housing_associations_folders['value'])
            for house_ass_name in list_of_housing_association:
                if house_ass_name not in self.house_association_names:
                    self.house_association_names.append(house_ass_name)

            self.surveyor_to_housing_associations[name] = list_of_housing_association

        return self.surveyor_to_housing_associations

    def ensure_housing_association_is_loaded(func):
        """Decorator to ensure housing association is loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_housing_associations:
                self.logger.info("Housing association not found, fetching from SharePoint...")
                await self.get_housing_association_names()
            return await func(self, *args, **kwargs)
        return wrapper

    @ensure_housing_association_is_loaded
    async def get_number_of_surveys_completed(self):
        """Count the entries under each surveyor's housing-association folders.

        The count for a surveyor is the total number of entries across all of
        their housing associations for ``self.week_commencing``. Surveyors with
        no housing associations recorded get no entry.

        Returns:
            ``self.surveyor_work_completed`` mapping surveyor name to that total.

        Raises:
            RuntimeError: If an address listing has no ``'value'`` key.
        """
        for name in self.surveyor_names:
            associations = self.surveyor_to_housing_associations.get(name)
            if not associations:
                continue

            # Accumulate across associations; storing each listing's length
            # directly would leave only the last association's count.
            completed = 0
            for house_association in associations:
                address_folders = await self.get_folders_in_path(
                    f"/{name}/{self.week_commencing}/{house_association}"
                )
                if 'value' not in address_folders:
                    raise RuntimeError("Failed to get address folders")
                completed += len(address_folders['value'])

            self.surveyor_work_completed[name] = completed

        return self.surveyor_work_completed

    @ensure_housing_association_is_loaded
    async def download_file_for_each_address(self):
        """Placeholder: not implemented yet; only triggers the housing-association load."""
        pass
|
|
|
|
# To call async functions from sync code:
|
|
def run_sync():
    """Run the survey count for South Coast Insulation Service from synchronous code.

    Builds a ``SharePointScraper`` for
    ``SharePointInstaller.SOUTH_COAST_INSULATION_SERVICE`` and drives its
    ``get_number_of_surveys_completed`` coroutine with ``asyncio.run``.

    Returns:
        Whatever ``get_number_of_surveys_completed`` returns.
    """
    scraper = SharePointScraper(SharePointInstaller.SOUTH_COAST_INSULATION_SERVICE)
    # Return the coroutine's result so synchronous callers can actually use it.
    return asyncio.run(scraper.get_number_of_surveys_completed())
|