mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
182 lines
5 KiB
Python
182 lines
5 KiB
Python
import os
|
|
import re
|
|
from typing import Any, Dict, List, Mapping, Optional
|
|
from openpyxl import load_workbook
|
|
|
|
from backend.pashub_fetcher.job import Job
|
|
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
|
|
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
|
|
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import upload_file_to_s3
|
|
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
|
from utils.sharepoint.domna_sites import DomnaSites
|
|
|
|
|
|
# Module-level logger used by the functions in this file.
logger = setup_logger()
|
|
|
|
|
|
def extract_jobs(filepath: str) -> List[Job]:
    """Read the job list out of the Watford Warm Homes spreadsheet.

    Opens the workbook at ``filepath``, reads the column headers from row 3
    of the ``"watford warm homes (wave 3) mai"`` sheet, and builds one job
    for every following row that has both a ``Name`` value and a
    ``Pashub Link`` value containing ``/jobs/<id>/``.

    Args:
        filepath: Path to the ``.xlsx`` workbook.

    Returns:
        A list of dicts with the keys ``"id"`` (the id captured from the
        link) and ``"address"`` (the ``Name`` cell converted to ``str``).
        Rows with an empty name, an empty link, or a link that does not
        match the pattern are skipped.

    Raises:
        KeyError: If the ``Name`` or ``Pashub Link`` header is not found in
            the header row.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["watford warm homes (wave 3) mai"]

    HEADER_ROW = 3

    # Map header text -> 1-based column index.
    headers: Dict[str, int] = {}
    for col in range(1, ws.max_column + 1):
        raw_header = ws.cell(row=HEADER_ROW, column=col).value
        # Empty cells come back as None; str(None) is the truthy string
        # "None", so test the raw value before converting it.
        if raw_header is None:
            continue
        header = str(raw_header).strip()
        if header:
            headers[header] = col

    missing = [name for name in ("Name", "Pashub Link") if name not in headers]
    if missing:
        raise KeyError(
            f"Header(s) {missing} not found in row {HEADER_ROW} of {filepath!r}"
        )

    name_col = headers["Name"]
    link_col = headers["Pashub Link"]

    # Compiled once; the same pattern is applied to every row.
    job_id_pattern = re.compile(r"/jobs/([0-9a-fA-F\-]+)/")

    jobs: List[Job] = []

    for row in range(HEADER_ROW + 1, ws.max_row + 1):
        name = ws.cell(row=row, column=name_col).value
        link = ws.cell(row=row, column=link_col).value

        if not name or not link:
            continue

        match = job_id_pattern.search(str(link))
        if not match:
            continue

        jobs.append(
            {
                "id": match.group(1),
                "address": str(name),
            }
        )

    return jobs
|
|
|
|
|
|
def get_pashub_client(email: str, password: str) -> PashubClient:
    """Return a ``PashubClient`` built from a freshly obtained token.

    The token is obtained with ``get_token_from_local_storage(email, password)``
    and a confirmation message is logged before the client is constructed.
    """
    access_token = get_token_from_local_storage(email, password)
    logger.info("Token extracted successfully")

    client = PashubClient(token=access_token)
    return client
|
|
|
|
|
|
def upload_job_to_sharepoint(
    sharepoint_client: DomnaSharepointClient,
    base_path: str,
    job: Job,
    job_files: List[str],
) -> None:
    """Create the SharePoint folder tree for a job and upload its files.

    A folder named after ``job["address"]`` is created under ``base_path``,
    then one subfolder per ``SharepointSubfolders`` member is created inside
    it. Every path in ``job_files`` is uploaded into the ``ASSESSMENT``
    subfolder under its own file name.

    Args:
        sharepoint_client: Client used for the ``makedir`` / ``upload_file``
            calls.
        base_path: SharePoint path under which the job folder is created.
        job: Job whose ``"address"`` value names the job folder.
        job_files: Local paths of the files to upload.
    """
    job_path = f"{base_path}/{job['address']}"

    # Create main job folder
    sharepoint_client.makedir(job["address"], base_path)

    # Create subfolders
    for folder in SharepointSubfolders:
        sharepoint_client.makedir(folder.value, job_path)

    # Upload into assessment folder
    assessment_path = f"{job_path}/{SharepointSubfolders.ASSESSMENT.value}"

    for file_path in job_files:
        # os.path.basename handles the platform's path separator and is the
        # same call upload_job_to_s3 uses to derive the file name.
        filename = os.path.basename(file_path)

        sharepoint_client.upload_file(
            file_path,
            assessment_path,
            filename,
        )
|
|
|
|
|
|
def upload_job_to_s3(
    job_files: List[str],
    uprn: str,
    bucket: str = "retrofit-energy-assessments-dev",  # TODO: create new bucket
) -> None:
    """Upload every file in ``job_files`` to S3 under the given UPRN.

    Each file is stored with the key ``documents/uprn/<uprn>/<file name>``.

    Args:
        job_files: Local paths of the files to upload.
        uprn: Identifier used as the last folder component of the key prefix.
        bucket: Destination bucket name. Defaults to the bucket this function
            previously had hard-coded, so existing callers are unaffected.
    """
    base_path = f"documents/uprn/{uprn}"

    for file_path in job_files:
        filename = os.path.basename(file_path)
        file_key = f"{base_path}/{filename}"

        upload_file_to_s3(file_path, bucket, file_key)
|
|
|
|
|
|
def process_job(
    job: Job,
    pashub_client: PashubClient,
    sharepoint_client: DomnaSharepointClient,
    base_path: str,
) -> List[str]:
    """Fetch one job's core evidence files and push them to storage.

    Steps, in order:
      1. Look up the UPRN for ``job["id"]`` and log whether one was found.
      2. Fetch the job's core evidence files from the Pashub client.
      3. If a UPRN was found, upload the files to S3.
      4. Upload the files to SharePoint under ``base_path``.

    Returns:
        The list returned by ``get_core_evidence_files_by_job_id``.
    """
    job_id = job["id"]

    uprn: Optional[str] = pashub_client.get_uprn_by_job_id(job_id)
    if not uprn:
        logger.info(f"No UPRN found for job {job_id}")
    else:
        logger.info(f"Got UPRN {uprn} for job {job_id}")

    evidence_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id)

    # The S3 upload is keyed by UPRN, so it only runs when one is available.
    if uprn:
        logger.info("Uploading files to s3")
        upload_job_to_s3(evidence_files, uprn)

    upload_job_to_sharepoint(sharepoint_client, base_path, job, evidence_files)

    return evidence_files
|
|
|
|
|
|
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Process every job listed in the Watford Warm Homes spreadsheet.

    Loads the jobs from the workbook located two directory levels above this
    file, creates the Pashub and SharePoint clients, then runs
    ``process_job`` for each job. If a job raises ``UnauthorizedError`` the
    Pashub token is refreshed and that job is retried once; an error from the
    retry propagates to the caller. The total number of files is logged at
    the end.

    ``event`` and ``context`` are accepted but not read.
    """
    project_root = os.path.dirname(os.path.dirname(__file__))
    filepath = os.path.join(project_root, "Watford_Warm_Homes_Wave_3_RA Downloads .xlsx")

    jobs: List[Job] = extract_jobs(filepath)
    logger.info("Successfully loaded jobs from spreadsheet")

    # NOTE(review): credentials are hard-coded in this function.
    pas_hub_email = "random@test.com"
    pas_hub_password = "my_fake_password"

    pashub_client = get_pashub_client(pas_hub_email, pas_hub_password)

    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
    )

    BASE_PATH = "/Osmosis-ACD Projects/Watford Warm Homes/Watford Property Folders (Shared with Client)"

    saved_file_paths: List[str] = []

    for job in jobs:
        try:
            files = process_job(job, pashub_client, sharepoint_client, BASE_PATH)
        except UnauthorizedError:
            logger.warning("Token expired - refreshing")

            # The refreshed client is kept for the remaining jobs as well.
            pashub_client = get_pashub_client(pas_hub_email, pas_hub_password)

            # retry once
            files = process_job(job, pashub_client, sharepoint_client, BASE_PATH)

        saved_file_paths.extend(files)

    logger.info(f"Saved {len(saved_file_paths)} files")
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual run: call the handler with a minimal event containing one
    # record with an empty JSON body, and no context object.
    event = {"Records": [{"body": "{}"}]}
    handler(event, context=None)
|