from collections import defaultdict
from datetime import datetime
import os
from typing import Dict, List, NamedTuple, Optional

import requests

from backend.pashub_fetcher.core_files import CoreFiles, get_core_file_type
from backend.pashub_fetcher.evidence_file_data import EvidenceFileData
from backend.pashub_fetcher.evidence_metadata import EvidenceMetadata
from utils.logger import setup_logger

logger = setup_logger()


class DownloadedFile(NamedTuple):
    """An evidence file that has been written to local disk."""

    # Local path the file content was written to.
    file_path: str
    # Category copied from the evidence listing entry.
    evidence_category: Optional[str]
    # Parsed from the listing entry's ``created_utc`` ISO string.
    created_utc: datetime


class _EvidenceFileGroups(NamedTuple):
    """Evidence listing split into core files and everything else."""

    # The most recently created file for each recognised core file type.
    core: Dict[CoreFiles, EvidenceFileData]
    # Files for which no core file type was recognised.
    other: List[EvidenceFileData]


class DownloadedFiles(NamedTuple):
    """Result of a download run, split the same way as the listing."""

    core: List[DownloadedFile]
    other: List[DownloadedFile]


class UnauthorizedError(Exception):
    """Raised when the API answers a request with HTTP 401."""


class PashubClient:
    """Client for the evidence and job endpoints under https://pashub.net/api.

    All API calls go through one ``requests.Session`` that carries the
    bearer token; blob downloads are made without that session.
    """

    # Directory downloaded evidence files are written to.
    _DOWNLOAD_DIR = "/tmp"

    def __init__(self, token: str):
        """Create a session authenticated with the bearer ``token``."""
        self.token = token
        self.company_id = "cb5249e2-8f31-4ef4-aefd-08ddaccb1fa2"
        self.base = "https://pashub.net/api"
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/json",
            }
        )
        logger.info("Finished initialising PashubClient")

    def get_evidence_files_by_job_id(
        self, job_id: str, include_other: bool = False
    ) -> DownloadedFiles:
        """Download the evidence files attached to a job.

        Args:
            job_id: Identifier of the job whose evidence is fetched.
            include_other: When true, files that are not recognised as core
                files are downloaded as well; otherwise ``other`` is empty.

        Returns:
            The downloaded core files (latest one per core type) and,
            optionally, the remaining files. Entries without a file id are
            skipped.

        Raises:
            UnauthorizedError: If the API answers with HTTP 401.
            requests.HTTPError: For any other unsuccessful HTTP status.
        """
        logger.info(f"Getting evidence files for job ID {job_id}")
        evidence_list: List[EvidenceFileData] = self._get_evidence_list(job_id)
        logger.info(f"Found {len(evidence_list)} evidence files")
        if not evidence_list:
            return DownloadedFiles(core=[], other=[])

        grouped: _EvidenceFileGroups = self._group_into_core_and_other_files(
            evidence_list
        )

        core_files = self._download_all(
            job_id, list(grouped.core.values()), "Successfully downloaded file"
        )
        other_files: List[DownloadedFile] = []
        if include_other:
            other_files = self._download_all(
                job_id, grouped.other, "Successfully downloaded other file"
            )

        return DownloadedFiles(core=core_files, other=other_files)

    def get_uprn_by_job_id(self, job_id: str) -> Optional[str]:
        """Return the ``uprn`` field of a job, or ``None`` if it is unavailable.

        Raises:
            UnauthorizedError: If the API answers with HTTP 401.
            requests.HTTPError: For any other unsuccessful HTTP status.
        """
        logger.info(f"Getting UPRN for job ID {job_id}")
        url = f"{self.base}/jobs/{job_id}"

        # Never write the bearer token to the logs.
        safe_headers = {
            name: "<redacted>" if name.lower() == "authorization" else value
            for name, value in self.session.headers.items()
        }
        logger.debug(
            f"About to make API request with session headers: {safe_headers}"
        )

        r = self.session.get(url)
        self._raise_for_status(r)

        try:
            return r.json()["uprn"]
        except (KeyError, TypeError, ValueError) as e:
            # Missing key, non-dict body, or a body that is not valid JSON.
            logger.warning(
                f"Failed to get UPRN for Job ID {job_id} with exception: {e}"
            )
            return None

    def _download_all(
        self,
        job_id: str,
        files: List[EvidenceFileData],
        success_message: str,
    ) -> List[DownloadedFile]:
        """Download every entry of ``files`` that has a file id."""
        downloaded: List[DownloadedFile] = []
        for evidence in files:
            if not evidence.file_id:
                continue
            downloaded.append(
                self._download_evidence(job_id, evidence, success_message)
            )
        return downloaded

    def _download_evidence(
        self, job_id: str, evidence: EvidenceFileData, success_message: str
    ) -> DownloadedFile:
        """Fetch one evidence file to the download directory and describe it."""
        metadata: EvidenceMetadata = self._get_evidence_metadata(
            job_id, evidence.file_id
        )
        download_url = self._build_download_url(metadata, evidence.file_id)

        # The file name comes from the API response: keep only its final
        # component so an absolute path or "../" cannot escape the download
        # directory. Fall back to the file id if nothing usable is left.
        safe_name = os.path.basename(evidence.file_name) or evidence.file_id
        file_path = os.path.join(self._DOWNLOAD_DIR, safe_name)

        self._download_file(download_url, file_path)
        logger.info(success_message)
        return DownloadedFile(
            file_path=file_path,
            evidence_category=evidence.evidence_category,
            created_utc=datetime.fromisoformat(evidence.created_utc),
        )

    def _group_into_core_and_other_files(
        self,
        files: List[EvidenceFileData],
    ) -> _EvidenceFileGroups:
        """Split ``files`` into the latest file per core type and the rest.

        A file is "other" when ``get_core_file_type`` returns a falsy value
        for its name and category. For retrofit design documents with
        several candidates, those whose name contains ``-OSM-`` take
        precedence; within the remaining candidates the one with the
        greatest ``created_utc`` wins.
        """
        grouped: Dict[CoreFiles, List[EvidenceFileData]] = defaultdict(list)
        other: List[EvidenceFileData] = []

        for file in files:
            core_type: Optional[CoreFiles] = get_core_file_type(
                file.file_name, file.evidence_category
            )
            if not core_type:
                other.append(file)
                continue
            grouped[core_type].append(file)

        latest_core_files: Dict[CoreFiles, EvidenceFileData] = {}
        for core_type, group in grouped.items():
            if core_type == CoreFiles.RETROFIT_DESIGN_DOC and len(group) > 1:
                osm_candidates = [f for f in group if "-OSM-" in f.file_name]
                group = osm_candidates if osm_candidates else group

            latest_core_files[core_type] = max(
                group, key=lambda f: datetime.fromisoformat(f.created_utc)
            )

        return _EvidenceFileGroups(core=latest_core_files, other=other)

    def _get_evidence_list(self, job_id: str) -> List[EvidenceFileData]:
        """Return the evidence entries listed under ``results`` for a job."""
        url = f"{self.base}/jobs/{job_id}/evidence"
        r = self.session.get(url)
        self._raise_for_status(r)

        results = r.json().get("results", [])
        return [EvidenceFileData.from_api(item) for item in results]

    def _get_evidence_metadata(self, job_id: str, evidence_id: str) -> EvidenceMetadata:
        """Fetch the metadata needed to download one evidence file."""
        url = f"{self.base}/jobs/{job_id}/evidenceMetadata"
        params = {"evidenceIds": evidence_id}
        r = self.session.get(url, params=params)
        self._raise_for_status(r)

        return EvidenceMetadata.from_api(r.json())

    def _build_download_url(self, metadata: EvidenceMetadata, file_id: str) -> str:
        """Insert the container name and file id before the URI's query string.

        Raises:
            ValueError: If ``metadata.blob_uri`` has no ``?`` query part.
        """
        base, separator, sas = metadata.blob_uri.partition("?")
        if not separator:
            raise ValueError("Evidence blob URI does not contain a query string")

        # NOTE(review): nothing is inserted between ``base`` and the container
        # name, so the URI path presumably already ends with "/" - confirm.
        return f"{base}{metadata.container_name}/{file_id}?{sas}"

    def _download_file(self, url: str, file_path: str) -> None:
        """Write the body of ``url`` to ``file_path``.

        The request is made without ``self.session``, so the API bearer token
        held in the session headers is not sent to the download URL.

        Raises:
            requests.HTTPError: If the download responds with an error status.
        """
        r = requests.get(url)
        r.raise_for_status()

        with open(file_path, "wb") as f:
            f.write(r.content)

    @staticmethod
    def _raise_for_status(response: requests.Response) -> None:
        """Raise ``UnauthorizedError`` on 401, ``HTTPError`` on other failures."""
        if response.status_code == 401:
            raise UnauthorizedError("Token expired or invalid")
        response.raise_for_status()