process messages rather than files

This commit is contained in:
Daniel Roth 2026-03-31 09:01:48 +00:00
parent 3085edb0e9
commit 464c0607ec

View file

@ -1,4 +1,5 @@
from datetime import datetime, timezone
import json
import os
import re
from typing import Any, Dict, List, Mapping, Optional
@ -11,8 +12,12 @@ from backend.app.db.models.uploaded_file import (
UploadedFile,
)
from backend.pashub_fetcher.core_files import infer_file_type
from backend.pashub_fetcher.job import Job
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
PashubToAraTriggerRequest,
)
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
from utils.logger import setup_logger
@ -72,21 +77,21 @@ def get_pashub_client(email: str, password: str) -> PashubClient:
def upload_job_to_sharepoint(
sharepoint_client: DomnaSharepointClient,
base_path: str,
job: Job,
# base_path: str,
sharepoint_link: str,
job_files: List[str],
) -> None:
job_path = f"{base_path}/{job['address']}"
# job_path = f"{base_path}/{job['address']}"
# Create main job folder
sharepoint_client.makedir(job["address"], base_path)
# sharepoint_client.makedir(job["address"], base_path)
# Create subfolders
for folder in SharepointSubfolders:
sharepoint_client.makedir(folder.value, job_path)
# for folder in SharepointSubfolders:
# sharepoint_client.makedir(folder.value, job_path)
# Upload into assessment folder
assessment_path = f"{job_path}/{SharepointSubfolders.ASSESSMENT.value}"
assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"
for file_path in job_files:
filename = file_path.split("/")[-1]
@ -131,14 +136,14 @@ def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None:
def process_job(
job: Job,
job: PashubToAraTriggerRequest,
pashub_client: PashubClient,
sharepoint_client: DomnaSharepointClient,
base_path: str,
) -> List[str]:
job_id = job["id"]
job_id = job.pashub_job_id
uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id)
uprn: Optional[str] = pashub_client.get_uprn_by_job_id(job_id)
if uprn:
logger.info(f"Got UPRN {uprn} for job {job_id}")
else:
@ -150,24 +155,22 @@ def process_job(
logger.info("Uploading files to s3")
upload_job_to_s3_and_update_db(job_files, uprn)
upload_job_to_sharepoint(sharepoint_client, base_path, job, job_files)
# # Comment out sharepoint loading for now:
# Seems like the sharepoint link in pas hub is inconsistent in terms
# of whether it points to a property or a project
# if job.sharepoint_link:
# upload_job_to_sharepoint(sharepoint_client, job.sharepoint_link, job_files)
return job_files
def handler(event: Mapping[str, Any], context: Any) -> None:
logger.info("Received message")
logger.info(f"Number of events: {len(event.get('Records', []))}")
settings = get_settings()
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# filepath = os.path.join(BASE_DIR, "Watford_Warm_Homes_Wave_3_RA Downloads .xlsx")
filepath = os.path.join(
BASE_DIR,
"The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx",
)
jobs: List[Job] = extract_jobs(filepath)
logger.info("Successfully loaded jobs from spreadsheet")
pas_hub_email = settings.PASHUB_EMAIL
pas_hub_password = settings.PASHUB_PASSWORD
@ -180,36 +183,46 @@ def handler(event: Mapping[str, Any], context: Any) -> None:
sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
)
BASE_PATH = "/Osmosis-ACD Projects/Watford Warm Homes/Watford Property Folders (Shared with Client)"
saved_file_paths: List[str] = []
for job in jobs:
for record in event.get("Records", []):
try:
files = process_job(
job,
pashub_client,
sharepoint_client,
BASE_PATH,
)
saved_file_paths.extend(files)
body_dict = json.loads(record["body"])
logger.debug("Validating request body")
except UnauthorizedError:
logger.warning("Token expired - refreshing")
payload = PashubToAraTriggerRequest.model_validate(body_dict)
pashub_client = get_pashub_client(
pas_hub_email,
pas_hub_password,
)
logger.debug("Successfully validated request body")
# retry once
files = process_job(
job,
pashub_client,
sharepoint_client,
BASE_PATH,
)
saved_file_paths.extend(files)
try:
files: List[str] = process_job(
payload,
pashub_client,
sharepoint_client,
)
saved_file_paths.extend(files)
except UnauthorizedError:
logger.warning("Token expired - refreshing")
pashub_client = get_pashub_client(
pas_hub_email,
pas_hub_password,
)
# retry once
files: List[str] = process_job(
payload,
pashub_client,
sharepoint_client,
)
saved_file_paths.extend(files)
except Exception as e:
logger.info("Handler exception")
logger.error(f"Failed to process record: {e}")
logger.info("Successfully loaded jobs from spreadsheet")
logger.info(f"Saved {len(saved_file_paths)} files")