# Model/backend/pashub_fetcher/handler/handler.py
# (listing metadata: 245 lines, 7.1 KiB, Python)

from datetime import datetime, timezone
import json
import os
import re
from typing import Any, Dict, List, Mapping, Optional
from openpyxl import load_workbook
from backend.app.config import get_settings
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
UploadedFile,
)
from backend.pashub_fetcher.core_files import infer_file_type
from backend.pashub_fetcher.job import Job
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
PashubToAraTriggerRequest,
)
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
from backend.utils.subtasks import task_handler
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
# Module-level logger shared by the functions below.
logger = setup_logger()
def extract_jobs(filepath: str) -> List[Job]:
    """Read the job list from the ``filtered`` sheet of an Excel workbook.

    Column headers are read from row 3 and the ``Name`` and ``PasHub Link``
    columns are located by their header text. Every later row produces a job
    when both cells are populated and the link contains ``/jobs/<id>/``;
    other rows are skipped.

    Args:
        filepath: Path of the workbook to open.

    Returns:
        One ``{"id": ..., "address": ...}`` entry per usable row, in sheet
        order.

    Raises:
        KeyError: If the ``filtered`` sheet or a required header is missing.
    """
    HEADER_ROW = 3
    required_headers = ("Name", "PasHub Link")
    # Compiled once; the job id is the hex/dash segment following "/jobs/".
    job_id_pattern = re.compile(r"/jobs/([0-9a-fA-F\-]+)/")

    wb = load_workbook(filepath, data_only=True)
    # ws = wb["watford warm homes (wave 3) mai"]
    ws = wb["filtered"]

    headers: Dict[str, int] = {}
    for col in range(1, ws.max_column + 1):
        raw_header = ws.cell(row=HEADER_ROW, column=col).value
        # Test for None before converting: str(None) is the truthy string
        # "None", which would register empty columns under a bogus header.
        if raw_header is None:
            continue
        header_text = str(raw_header).strip()
        if header_text:
            headers[header_text] = col

    missing = [name for name in required_headers if name not in headers]
    if missing:
        raise KeyError(
            f"Missing required header(s) {missing} in row {HEADER_ROW} "
            f"of sheet 'filtered'"
        )

    name_col = headers["Name"]
    # link_col = headers["Pashub Link"]
    link_col = headers["PasHub Link"]

    jobs: List[Job] = []
    for row in range(HEADER_ROW + 1, ws.max_row + 1):
        name = ws.cell(row=row, column=name_col).value
        link = ws.cell(row=row, column=link_col).value
        if not name or not link:
            continue
        match = job_id_pattern.search(str(link))
        if not match:
            continue
        jobs.append(
            {
                "id": match.group(1),
                "address": str(name),
            }
        )
    return jobs
def get_pashub_client(email: str, password: str) -> PashubClient:
    """Create a ``PashubClient`` from the token obtained for these credentials.

    Args:
        email: Account email handed to ``get_token_from_local_storage``.
        password: Account password handed to ``get_token_from_local_storage``.

    Returns:
        A ``PashubClient`` constructed with the retrieved token.
    """
    auth_token = get_token_from_local_storage(email, password)
    logger.info("Token extracted successfully")
    client = PashubClient(token=auth_token)
    return client
def upload_job_to_sharepoint(
    sharepoint_client: DomnaSharepointClient,
    sharepoint_link: str,
    job_files: List[str],
) -> None:
    """Upload every job file into the assessment subfolder of a location.

    Args:
        sharepoint_client: Client whose ``upload_file`` performs the upload.
        sharepoint_link: Base location; the assessment subfolder name is
            appended to it with a ``/``.
        job_files: Local file paths to upload. The remote filename is the
            part of each path after the last ``/``.
    """
    # Earlier approach, kept for reference. It took a `base_path: str`
    # parameter and created the folder tree before uploading:
    #   job_path = f"{base_path}/{job['address']}"
    #   sharepoint_client.makedir(job["address"], base_path)
    #   for folder in SharepointSubfolders:
    #       sharepoint_client.makedir(folder.value, job_path)
    destination = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"
    for local_path in job_files:
        remote_name = local_path.rsplit("/", 1)[-1]
        sharepoint_client.upload_file(local_path, destination, remote_name)
def upload_job_to_s3_and_update_db(
    job_files: List[str],
    uprn: Optional[str],
    hubspot_deal_id: Optional[str],
    bucket: str = "retrofit-energy-assessments-dev",
) -> None:
    """Upload job files to S3 and record one ``UploadedFile`` row per file.

    Files are keyed under ``documents/uprn/<uprn>/`` when a UPRN is given,
    otherwise under ``documents/hubspot_deal_id/<hubspot_deal_id>/``. All rows
    are added and committed in a single DB session after the uploads.

    Args:
        job_files: Local paths of the files to upload.
        uprn: Property UPRN; takes precedence over ``hubspot_deal_id`` when
            choosing the key prefix. Must be numeric when provided.
        hubspot_deal_id: HubSpot deal id; used for the key prefix only when
            no UPRN is given.
        bucket: Destination S3 bucket. Defaults to the bucket that was
            previously hard-coded.

    Raises:
        ValueError: If ``uprn`` is provided but is not numeric. This is
            raised before anything is uploaded.
    """
    # Without any identifier there is nowhere to file the documents.
    if not uprn and not hubspot_deal_id:
        return
    # Nothing to upload: avoid opening a DB session for an empty commit.
    if not job_files:
        return

    # Validate the UPRN up front so a bad value cannot leave files in S3
    # with no matching DB rows.
    uprn_value: Optional[int] = None
    if uprn:
        try:
            uprn_value = int(uprn)
        except ValueError as err:
            raise ValueError(f"UPRN must be numeric, got {uprn!r}") from err
        base_path = f"documents/uprn/{uprn}"
    else:
        base_path = f"documents/hubspot_deal_id/{hubspot_deal_id}"

    uploaded_files: List[UploadedFile] = []
    for file_path in job_files:
        filename = os.path.basename(file_path)
        file_key = f"{base_path}/{filename}"
        upload_file_to_s3(file_path, bucket, file_key)
        # TODO: use same upload_file_to_s3_and_update_db method as ecmk fetcher does
        uploaded_files.append(
            UploadedFile(
                s3_file_bucket=bucket,
                s3_file_key=file_key,
                s3_upload_timestamp=datetime.now(timezone.utc),
                uprn=uprn_value,
                hubspot_deal_id=hubspot_deal_id,
                file_source=FileSourceEnum.PAS_HUB.value,
                file_type=infer_file_type(filename),
            )
        )

    with db_session() as session:
        session.add_all(uploaded_files)
        session.commit()
def process_job(
    job: PashubToAraTriggerRequest,
    pashub_client: PashubClient,
    sharepoint_client: DomnaSharepointClient,
) -> List[str]:
    """Fetch a job's core evidence files and store them when identifiable.

    Args:
        job: Trigger request carrying the PasHub job id and, optionally, a
            UPRN and a HubSpot deal id.
        pashub_client: Client used to look up the UPRN (when the request has
            none) and to fetch the core evidence files.
        sharepoint_client: Currently unused; the SharePoint upload step is
            commented out.

    Returns:
        The file paths returned by
        ``pashub_client.get_core_evidence_files_by_job_id``.
    """
    job_id = job.pashub_job_id

    # Prefer the UPRN on the request; only ask PasHub when it is absent.
    uprn: Optional[str] = job.uprn
    if not uprn:
        uprn = pashub_client.get_uprn_by_job_id(job_id)
    hubspot_deal_id: Optional[str] = job.hubspot_deal_id

    uprn_message = (
        f"Got UPRN {uprn} for job {job_id}"
        if uprn
        else f"No UPRN found for job {job_id}"
    )
    logger.info(uprn_message)

    job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id)

    has_identifier = bool(uprn) or bool(hubspot_deal_id)
    if has_identifier:
        logger.info("Uploading files to s3")
        upload_job_to_s3_and_update_db(job_files, uprn, hubspot_deal_id)

    # SharePoint loading is commented out for now: the sharepoint link in
    # pas hub seems inconsistent in terms of whether it points to a property
    # or a project.
    # if job.sharepoint_link:
    #     upload_job_to_sharepoint(sharepoint_client, job.sharepoint_link, job_files)
    return job_files
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Process every record of an incoming event as a PasHub job.

    Each entry of ``event["Records"]`` must carry a ``body`` holding a JSON
    document that validates as ``PashubToAraTriggerRequest``. A record that
    fails is logged with its traceback and skipped, so one bad record does
    not stop the rest.

    Args:
        event: Mapping with an optional ``Records`` list. A missing or empty
            list means there is nothing to process.
        context: Unused.

    Raises:
        ValueError: If the PasHub email or password is not configured.
    """
    records = event.get("Records") or []
    logger.info("Received message")
    logger.info(f"Number of events: {len(records)}")

    settings = get_settings()
    pas_hub_email = settings.PASHUB_EMAIL
    pas_hub_password = settings.PASHUB_PASSWORD
    if (not pas_hub_email) or (not pas_hub_password):
        raise ValueError("Pas Hub credentials not provided")

    pashub_client = get_pashub_client(pas_hub_email, pas_hub_password)
    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
    )

    saved_file_paths: List[str] = []
    failed_count = 0
    for record in records:
        try:
            body_dict = json.loads(record["body"])
            logger.debug("Validating request body")
            payload = PashubToAraTriggerRequest.model_validate(body_dict)
            logger.debug("Successfully validated request body")

            files: List[str]
            try:
                files = process_job(payload, pashub_client, sharepoint_client)
            except UnauthorizedError:
                logger.warning("Token expired - refreshing")
                # Keep the refreshed client for the remaining records too.
                pashub_client = get_pashub_client(
                    pas_hub_email,
                    pas_hub_password,
                )
                # Retry once; a second failure falls through to the
                # per-record handler below.
                files = process_job(payload, pashub_client, sharepoint_client)
            saved_file_paths.extend(files)
        except Exception as e:
            # NOTE(review): failures are logged but not re-raised, so the
            # invoker sees the call as successful - confirm this is intended.
            failed_count += 1
            logger.exception(f"Failed to process record: {e}")

    logger.info(
        f"Processed {len(records)} records: "
        f"{len(records) - failed_count} succeeded, {failed_count} failed"
    )
    logger.info(f"Saved {len(saved_file_paths)} files")
if __name__ == "__main__":
    # Manual run: a single record whose body is an empty JSON object.
    handler(
        event={"Records": [{"body": "{}"}]},
        context=None,
    )