survey-extraction/etl/scraper/scraper.py
2025-03-08 07:42:11 +00:00

177 lines
7.8 KiB
Python

from pprint import pformat
from enum import Enum
import os
from etl.utils.logger import Logger
import logging
from etl.utils.sharepoint.sharepoint import SharePointClient
from functools import wraps
import re
from etl.validator.validator import DomnaSharePointValidator
import asyncio
# Name of the weekly folder looked up under each surveyor's folder; can be
# overridden through the WEEK_COMMENCING environment variable.
WEEK_COMMENCING = os.environ.get("WEEK_COMMENCING", "W.C. 24.02.2025")
class SharePointInstaller(Enum):
    """
    Installers whose SharePoint can be scraped.

    Each member's value is the SharePoint id read from the environment when
    the module is imported; it is ``None`` if the variable is not set.
    """

    SOUTH_COAST_INSULATION_SERVICE = os.environ.get(
        "SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID"
    )
class SharePointScraper:
    """
    A simple scraper that reads the folder tree of a SharePoint and validates
    its layout so that problems can be corrected manually.

    The tree is walked as
    ``/<surveyor>/<week commencing>/<housing association>/<address>``.
    Every level is cached on the instance once fetched; the ``ensure_*``
    decorators defined in the class body load a missing level before the
    decorated coroutine runs.
    """

    def __init__(self, sharepoint_location):
        """
        Read the SharePoint credentials from the environment and set up the
        empty caches.

        Args:
            sharepoint_location: Enum-like member (see ``SharePointInstaller``).
                Its ``value`` is passed to ``SharePointClient`` as ``site_id``
                and its ``name`` is used in error messages.

        Raises:
            AssertionError: If one of the credential environment variables is
                unset, or if ``sharepoint_location`` (or the id it carries)
                is ``None``.
        """
        self.sharepoint_client_id = os.getenv("SHAREPOINT_CLIENT_ID")
        self.sharepoint_client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET")
        self.sharepoint_tenant_id = os.getenv("SHAREPOINT_TENANT_ID")
        self.sharepoint_drive = sharepoint_location

        # Raised explicitly instead of through ``assert`` so the checks still
        # run under ``python -O``; the exception type and messages are kept
        # for existing callers.
        required_settings = (
            ("SHAREPOINT_CLIENT_ID", self.sharepoint_client_id),
            ("SHAREPOINT_CLIENT_SECRET", self.sharepoint_client_secret),
            ("SHAREPOINT_TENANT_ID", self.sharepoint_tenant_id),
        )
        for env_name, setting in required_settings:
            if setting is None:
                raise AssertionError(f"Please assign {env_name} env variable")

        # An enum member is never ``None`` itself, so the id it carries must
        # be checked too: it is ``None`` when its env variable is unset.
        drive_id = getattr(sharepoint_location, "value", sharepoint_location)
        if drive_id is None:
            raise AssertionError("Please set SharePoint driver id env variable. See SharePointInstaller for more information")

        self.logger = Logger(name="SharePointScraper", level=logging.DEBUG).get_logger()

        # Caches, filled lazily by the coroutines below.
        self.surveyor_names = []
        self.surveyor_to_dates_folder = {}
        self.surveyor_to_housing_associations = {}
        self.house_association_names = []
        self.surveyor_work_completed = {}

    def ensure_surveyor_names_loaded(func):
        """Decorator to ensure surveyor names are loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_names:
                self.logger.info("Surveyor names not found, fetching from SharePoint...")
                await self.upload_names_to_memory()
            return await func(self, *args, **kwargs)
        return wrapper

    async def get_folders_in_path(self, path):
        """
        List the contents of ``path`` on the configured SharePoint site.

        A new ``SharePointClient`` is built for every call.

        Args:
            path: Folder path inside the SharePoint, e.g. ``"/"``.

        Returns:
            Whatever ``SharePointClient.list_folder_contents`` returns. The
            callers in this class expect a mapping with a ``'value'`` list.
        """
        sharepoint_client = SharePointClient(
            tenant_id=self.sharepoint_tenant_id,
            client_id=self.sharepoint_client_id,
            client_secret=self.sharepoint_client_secret,
            site_id=self.sharepoint_drive.value,
        )
        return await sharepoint_client.list_folder_contents(path)

    async def upload_names_to_memory(self):
        """
        Load the names of the root-level folders into ``self.surveyor_names``.

        Raises:
            RuntimeError: If the root listing has no ``'value'`` entry.
        """
        root_listing = await self.get_folders_in_path("/")
        if 'value' not in root_listing:
            raise RuntimeError("Failed to get information from SharePoint")
        # dict.fromkeys drops duplicates while keeping first-seen order.
        self.surveyor_names = list(dict.fromkeys(
            entry['name'] for entry in root_listing['value'] if 'name' in entry
        ))

    @ensure_surveyor_names_loaded
    async def get_surveyor_names(self):
        """Return the cached surveyor names, loading them first if needed."""
        return self.surveyor_names

    @ensure_surveyor_names_loaded
    async def get_date_folder_names(self):
        """
        Load the folder names found directly under every surveyor folder.

        Returns:
            ``self.surveyor_to_dates_folder``: surveyor name -> list of folder
            names.

        Raises:
            RuntimeError: If a surveyor listing has no ``'value'`` entry. The
                cache is left unchanged in that case.
        """
        loaded = {}
        for name in self.surveyor_names:
            listing = await self.get_folders_in_path(f"/{name}")
            if 'value' not in listing:
                raise RuntimeError(f"Failed to get dates folder from {name} in {self.sharepoint_drive.name}")
            loaded[name] = [entry['name'] for entry in listing['value'] if 'name' in entry]
        # Publish only once every surveyor has loaded: a partially filled
        # cache would look complete to ensure_dates_folder_loaded.
        self.surveyor_to_dates_folder.update(loaded)
        return self.surveyor_to_dates_folder

    def ensure_dates_folder_loaded(func):
        """Decorator to ensure surveyor_to_dates_folder is loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_dates_folder:
                self.logger.info("Surveyor to dates mapping not found, fetching from SharePoint...")
                await self.get_date_folder_names()
            return await func(self, *args, **kwargs)
        return wrapper

    @ensure_dates_folder_loaded
    async def list_of_names_that_have_the_wrong_date_format(self):
        """
        Return the surveyors whose date folders fail
        ``DomnaSharePointValidator.valid_dates``.

        A warning listing them is logged when there is at least one.
        """
        naughty_names = []
        for name, dates in self.surveyor_to_dates_folder.items():
            self.logger.info(dates)
            if not DomnaSharePointValidator.valid_dates(dates):
                naughty_names.append(name)
        if naughty_names:
            self.logger.warning(f"Dates FORMAT is wrong for the following folders {naughty_names}")
        return naughty_names

    @ensure_dates_folder_loaded
    async def get_housing_association_names(self):
        """
        Load the housing-association folders inside each surveyor's
        ``WEEK_COMMENCING`` folder.

        Surveyors without a ``WEEK_COMMENCING`` folder are skipped with a
        warning. Every name seen is also collected, without duplicates, in
        ``self.house_association_names``.

        Returns:
            ``self.surveyor_to_housing_associations``: surveyor name -> list of
            housing-association folder names.

        Raises:
            RuntimeError: If ``WEEK_COMMENCING`` fails validation or a listing
                has no ``'value'`` entry.
        """
        if not DomnaSharePointValidator.valid_dates([WEEK_COMMENCING]):
            raise RuntimeError(f"WEEK COMMENCING is in wrong format {WEEK_COMMENCING}")
        for name in self.surveyor_names:
            # .get() so a surveyor missing from the dates cache is reported
            # like any other surveyor without the week folder.
            date_folders = self.surveyor_to_dates_folder.get(name, [])
            if WEEK_COMMENCING not in date_folders:
                self.logger.warning(f"Failed to get housing association folder for {name}, {date_folders}")
                continue
            listing = await self.get_folders_in_path(f"/{name}/{WEEK_COMMENCING}")
            if 'value' not in listing:
                raise RuntimeError("Failed to get housing association information")
            associations = [entry['name'] for entry in listing['value'] if 'name' in entry]
            for association in associations:
                if association not in self.house_association_names:
                    self.house_association_names.append(association)
            self.surveyor_to_housing_associations[name] = associations
        return self.surveyor_to_housing_associations

    def ensure_housing_association_is_loaded(func):
        """Decorator to ensure housing association is loaded before calling the function."""
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self.surveyor_to_housing_associations:
                self.logger.info("Housing association not found, fetching from SharePoint...")
                await self.get_housing_association_names()
            return await func(self, *args, **kwargs)
        return wrapper

    @ensure_housing_association_is_loaded
    async def get_number_of_surveys_completed(self):
        """
        Count the address folders each surveyor has for ``WEEK_COMMENCING``.

        The count is the total over all of the surveyor's housing
        associations. Surveyors with no housing-association folders get no
        entry.

        Returns:
            ``self.surveyor_work_completed``: surveyor name -> number of
            address folders.

        Raises:
            RuntimeError: If an address listing has no ``'value'`` entry.
        """
        for name in self.surveyor_names:
            associations = self.surveyor_to_housing_associations.get(name)
            if not associations:
                continue
            # Sum over every housing association; storing each count directly
            # would keep only the last association's number.
            total = 0
            for house_association in associations:
                address_folders = await self.get_folders_in_path(f"/{name}/{WEEK_COMMENCING}/{house_association}")
                if 'value' not in address_folders:
                    raise RuntimeError("Failed to get address folders")
                total += len(address_folders['value'])
            self.surveyor_work_completed[name] = total
        return self.surveyor_work_completed

    @ensure_housing_association_is_loaded
    async def download_file_for_each_address(self):
        """Placeholder: not implemented yet; only the decorator's loading runs."""
        pass
# To call async functions from sync code:
def run_sync():
    """
    Run the survey count for South Coast Insulation Service from sync code.

    Returns:
        The mapping produced by
        ``SharePointScraper.get_number_of_surveys_completed``.
    """
    scraper = SharePointScraper(SharePointInstaller.SOUTH_COAST_INSULATION_SERVICE)
    # Hand the result back to the caller instead of discarding it.
    return asyncio.run(scraper.get_number_of_surveys_completed())