import os
from datetime import datetime, timezone
from typing import Callable, List, NamedTuple, Optional, cast

from backend.app.db.connection import db_session
from infrastructure.postgres.uploaded_file_table import (
    FileSourceEnum,
    FileTypeEnum,
    UploadedFile,
)
from backend.documents_parser.db_writer import save_epc_property_data
from backend.documents_parser.parser import parse_site_notes_pdf
from backend.pashub_fetcher.core_files import get_file_type_string
from backend.pashub_fetcher.pashub_client import (
    DownloadedFile,
    DownloadedFiles,
    PashubClient,
    UnauthorizedError,
)
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
    PashubToAraTriggerRequest,
)
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient

logger = setup_logger()


class _FileUploadRecord(NamedTuple):
    """Links a downloaded local file to the ``UploadedFile`` row created for it."""

    # Local path of the downloaded file (``DownloadedFile.file_path``).
    file_path: str
    # File type stored on the row; ``None`` when no type could be determined.
    file_type: Optional[str]
    # ``UploadedFile.id`` as read after the session flush.
    uploaded_file_id: int


class PashubService:
    """Download evidence files for a job and push them to S3, the DB and SharePoint."""

    def __init__(
        self,
        pashub_client: PashubClient,
        sharepoint_client: Optional[DomnaSharepointClient],
        s3_bucket: str,
        coordination_client_factory: Optional[Callable[[], PashubClient]] = None,
    ) -> None:
        """Store the collaborators used by :meth:`run`.

        Args:
            pashub_client: Client tried first for every job.
            sharepoint_client: Optional client; when ``None`` the SharePoint
                upload step is skipped.
            s3_bucket: Name of the bucket files are uploaded to.
            coordination_client_factory: Optional zero-argument callable that
                builds the fallback client used after an ``UnauthorizedError``.
        """
        self._pashub_client = pashub_client
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket
        self._coordination_client_factory = coordination_client_factory
        # Built on first use by ``_get_coordination_client`` and then reused.
        self._coordination_client: Optional[PashubClient] = None

    def _get_coordination_client(self) -> PashubClient:
        """Return the fallback client, creating it on first use.

        Raises:
            UnauthorizedError: If no factory was configured.
        """
        if self._coordination_client_factory is None:
            raise UnauthorizedError("No coordination client factory configured")
        if self._coordination_client is None:
            self._coordination_client = self._coordination_client_factory()
        return self._coordination_client

    def run(self, request: PashubToAraTriggerRequest) -> List[str]:
        """Fetch the evidence files for ``request.pashub_job_id`` and distribute them.

        The UPRN is taken from the request when present, otherwise looked up by
        job id. Both the lookup and the download start with the primary client
        and retry once with the coordination client on ``UnauthorizedError``.
        Files are uploaded to S3 and recorded in the database when a UPRN or a
        HubSpot deal id is available, and to SharePoint when a SharePoint
        client, link and address are all available.

        The downloaded temp files are removed before this method exits,
        whether it returns normally or raises.

        Args:
            request: Trigger request describing the job and its targets.

        Returns:
            The local paths of all downloaded files (core then other). These
            files have already been deleted when the list is returned.

        Raises:
            UnauthorizedError: If the fallback client is also unauthorized, or
                if a fallback is needed but no factory was configured.
        """
        job_id = request.pashub_job_id
        active_client = self._pashub_client

        if request.uprn:
            uprn: Optional[str] = request.uprn
        else:
            try:
                uprn = active_client.get_uprn_by_job_id(job_id)
            except UnauthorizedError:
                logger.info(
                    f"PasHub credentials unauthorized for job {job_id}; retrying with CoordinationHub credentials"
                )
                active_client = self._get_coordination_client()
                uprn = active_client.get_uprn_by_job_id(job_id)

        hubspot_deal_id: Optional[str] = request.hubspot_deal_id

        if uprn:
            logger.info(f"Got UPRN {uprn} for job {job_id}")
        else:
            logger.info(f"No UPRN found for job {job_id}")

        try:
            downloaded: DownloadedFiles = active_client.get_evidence_files_by_job_id(
                job_id, include_other=request.get_other_files
            )
        except UnauthorizedError:
            # Already on the fallback client: there is nothing left to try.
            if active_client is not self._pashub_client:
                raise
            active_client = self._get_coordination_client()
            downloaded = active_client.get_evidence_files_by_job_id(
                job_id, include_other=request.get_other_files
            )

        all_files: List[DownloadedFile] = downloaded.core + downloaded.other

        try:
            if uprn or hubspot_deal_id:
                logger.info("Uploading files to s3")
                file_source = (
                    FileSourceEnum.PAS_HUB
                    if active_client is self._pashub_client
                    else FileSourceEnum.COORDINATION_HUB
                )
                upload_records = self._upload_to_s3_and_update_db(
                    downloaded.core, uprn, hubspot_deal_id, file_source
                )
                self._save_site_notes(upload_records)
                if downloaded.other:
                    self._upload_to_s3_and_update_db(
                        downloaded.other,
                        uprn,
                        hubspot_deal_id,
                        file_source,
                        default_file_type=FileTypeEnum.OTHER.value,
                    )

            if self._sharepoint_client and request.sharepoint_link and request.address:
                self._upload_to_matching_sharepoint_folder(
                    request.sharepoint_link, request.address, all_files
                )
        finally:
            # Clean up even when an upload step raised, so failed runs do not
            # leave downloaded files behind.
            self._remove_temp_files(all_files)

        return [df.file_path for df in all_files]

    def _upload_to_matching_sharepoint_folder(
        self,
        sharepoint_link: str,
        address: str,
        files: List[DownloadedFile],
    ) -> None:
        """Upload ``files`` to the folder under ``sharepoint_link`` named after the address.

        The folder name is the part of ``address`` before the first ``|``,
        stripped, and is matched case-insensitively. A warning is logged and
        nothing is uploaded when no folder matches.
        """
        assert self._sharepoint_client is not None  # guaranteed by the caller's check
        folder_name = address.split("|")[0].strip()
        wanted = folder_name.lower()
        folders = self._sharepoint_client.get_folders_in_path(sharepoint_link)
        match = next(
            (f["name"] for f in folders.get("value", []) if f["name"].lower() == wanted),
            None,
        )
        if match is None:
            logger.warning(f"SharePoint folder not found for '{folder_name}' in {sharepoint_link}")
            return
        property_folder_path = f"{sharepoint_link}/{match}"
        self._upload_to_sharepoint(property_folder_path, files)

    @staticmethod
    def _remove_temp_files(files: List[DownloadedFile]) -> None:
        """Delete each file's local copy, logging a warning for any that cannot be removed."""
        for df in files:
            try:
                os.remove(df.file_path)
            except OSError:
                logger.warning(f"Failed to delete temp file {df.file_path}")

    def _upload_to_s3_and_update_db(
        self,
        job_files: List[DownloadedFile],
        uprn: Optional[str],
        hubspot_deal_id: Optional[str],
        file_source: FileSourceEnum,
        default_file_type: Optional[str] = None,
    ) -> List[_FileUploadRecord]:
        """Upload files to S3 and add one ``UploadedFile`` row per file.

        Args:
            job_files: Files to upload.
            uprn: Property identifier; when set it determines the S3 prefix
                and is stored on the row as an ``int``.
            hubspot_deal_id: Used for the S3 prefix when ``uprn`` is empty.
            file_source: Origin recorded on each row.
            default_file_type: File type used when ``get_file_type_string``
                returns a falsy value.

        Returns:
            One record per file, in input order; an empty list when neither
            ``uprn`` nor ``hubspot_deal_id`` is set.
        """
        if not uprn and not hubspot_deal_id:
            return []

        base_path = (
            f"documents/uprn/{uprn}"
            if uprn
            else f"documents/hubspot_deal_id/{hubspot_deal_id}"
        )

        file_paths: List[str] = []
        uploaded_files: List[UploadedFile] = []
        for df in job_files:
            filename = os.path.basename(df.file_path)
            file_key = f"{base_path}/{filename}"
            upload_file_to_s3(df.file_path, self._s3_bucket, file_key)
            uploaded_file = UploadedFile(
                s3_file_bucket=self._s3_bucket,
                s3_file_key=file_key,
                s3_upload_timestamp=datetime.now(timezone.utc),
                uprn=int(uprn) if uprn else None,
                hubspot_deal_id=hubspot_deal_id,
                file_source=file_source.value,
                file_type=get_file_type_string(filename, df.evidence_category)
                or default_file_type,
            )
            file_paths.append(df.file_path)
            uploaded_files.append(uploaded_file)

        with db_session() as session:
            session.add_all(uploaded_files)
            # Flush before reading ``uf.id`` below.
            session.flush()
            upload_records = [
                _FileUploadRecord(
                    file_path=fp,
                    file_type=cast(Optional[str], uf.file_type),
                    uploaded_file_id=cast(int, uf.id),
                )
                for fp, uf in zip(file_paths, uploaded_files)
            ]
        return upload_records

    @staticmethod
    def _is_site_note(file_type: Optional[str]) -> bool:
        """Return whether ``file_type`` maps to ``FileTypeEnum.RD_SAP_SITE_NOTE``.

        ``None`` and strings that are not a ``FileTypeEnum`` value are treated
        as "not a site note" rather than raising.
        """
        if file_type is None:
            return False
        try:
            return FileTypeEnum(file_type) == FileTypeEnum.RD_SAP_SITE_NOTE
        except ValueError:
            # NOTE(review): whether get_file_type_string can yield a value
            # outside FileTypeEnum is not visible here; guard so one
            # unexpected type cannot abort the run.
            return False

    def _save_site_notes(self, upload_records: List[_FileUploadRecord]) -> None:
        """Parse each site-note PDF and save the resulting EPC property data.

        Best-effort: a failure to parse or save one file is logged as a
        warning and the remaining records are still processed.
        """
        for record in upload_records:
            if not self._is_site_note(record.file_type):
                continue
            try:
                epc_data: EpcPropertyData = parse_site_notes_pdf(record.file_path)
                with db_session() as session:
                    save_epc_property_data(
                        session, epc_data, uploaded_file_id=record.uploaded_file_id
                    )
            except Exception:
                logger.warning(
                    f"Failed to parse site notes {record.file_path}", exc_info=True
                )

    def _upload_to_sharepoint(
        self,
        property_folder_path: str,
        files: List[DownloadedFile],
    ) -> None:
        """Upload each file to ``property_folder_path`` under its base name.

        Best-effort: a failed upload is logged as a warning and the remaining
        files are still attempted.
        """
        assert self._sharepoint_client is not None
        for df in files:
            filename = os.path.basename(df.file_path)
            try:
                self._sharepoint_client.upload_file(df.file_path, property_folder_path, filename)
            except Exception:
                logger.warning(f"Failed to upload {filename} to SharePoint", exc_info=True)