# Model/backend/pashub_fetcher/cotality_client.py
#
# 137 lines
# 4.3 KiB
# Python

from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional

import requests

from backend.pashub_fetcher.core_files import CoreFiles
from backend.pashub_fetcher.evidence_file_data import EvidenceFileData
from backend.pashub_fetcher.evidence_metadata import EvidenceMetadata
from utils.logger import setup_logger
# Module-level logger, created once when this module is imported and used by
# the client class below for progress messages.
logger = setup_logger()
class UnauthorizedError(Exception):
    """Raised when a request is answered with HTTP status 401.

    The exception can be raised without arguments; it then carries
    ``DEFAULT_MESSAGE`` instead of an empty string, so logs and tracebacks
    always say what went wrong.

    Args:
        message: Human-readable description of the failure.
        *args: Extra positional values, forwarded unchanged to ``Exception``.
    """

    DEFAULT_MESSAGE = "Unauthorized (HTTP 401)"

    def __init__(self, message: str = DEFAULT_MESSAGE, *args: object) -> None:
        super().__init__(message, *args)
class CotalityClient:
    """HTTP client for the job-evidence endpoints of the pashub.net API.

    The client lists the evidence files attached to a job, keeps the most
    recently created file for each ``CoreFiles`` type, looks up the storage
    metadata of each kept file and downloads it to a local file named after
    the evidence file.

    All API calls go through one ``requests.Session`` that carries the Bearer
    token. A 401 response is reported as ``UnauthorizedError``; any other HTTP
    error status surfaces as the exception raised by
    ``Response.raise_for_status()``.
    """

    DEFAULT_BASE_URL = "https://pashub.net/api"
    # Seconds. requests applies this to connecting and to each wait for data,
    # not to the total duration of a transfer.
    DEFAULT_TIMEOUT = 30.0
    DOWNLOAD_CHUNK_SIZE = 64 * 1024

    def __init__(
        self,
        token: str,
        *,
        base_url: str = DEFAULT_BASE_URL,
        timeout: float = DEFAULT_TIMEOUT,
    ):
        """Create a client that authenticates with *token*.

        Args:
            token: Bearer token sent in the ``Authorization`` header of every
                request made through the session.
            base_url: Root URL of the API; a trailing ``/`` is ignored.
            timeout: Timeout in seconds passed to every HTTP request, so a
                stalled connection cannot block the caller indefinitely.
        """
        self.token = token
        # Kept as an attribute for callers; no method visible in this class
        # reads it.
        self.company_id = "cb5249e2-8f31-4ef4-aefd-08ddaccb1fa2"
        self.base = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/json",
            }
        )
        logger.info("Finished initialising CotalityClient")

    def get_core_evidence_files_by_job_id(self, job_id: str) -> List[str]:
        """Download the newest core evidence file of each type for a job.

        Args:
            job_id: Identifier of the job whose evidence is fetched.

        Returns:
            The file names that were written, in download order. Empty when
            the job has no evidence files or none of them is a core file.

        Raises:
            UnauthorizedError: If any request is answered with HTTP 401.
            requests.HTTPError: For any other HTTP error status.
        """
        logger.info(f"Getting Core Evidence Files for job ID {job_id}")
        evidence_list: List[EvidenceFileData] = self._get_evidence_list(job_id)
        logger.info(f"Found {len(evidence_list)} Evidence files to get")
        if not evidence_list:
            return []

        core_files: Dict[CoreFiles, EvidenceFileData] = self._select_latest_core_files(
            evidence_list
        )
        logger.info(f"Number of core files to download is {len(core_files)}")

        saved_files: List[str] = []
        for evidence in core_files.values():
            evidence_id = evidence.file_id
            if not evidence_id:
                # Without an id neither the metadata lookup nor the download
                # URL can be built, so the file is skipped.
                logger.warning(f"Skipping file {evidence.file_name}: no file id")
                continue
            logger.info(f"Getting metadata for file {evidence.file_name}")
            metadata: EvidenceMetadata = self._get_evidence_metadata(
                job_id, evidence_id
            )
            download_url: str = self._build_download_url(metadata, evidence_id)
            file_name = evidence.file_name
            self._download_file(download_url, file_name)
            logger.info("Successfully downloaded file")
            saved_files.append(file_name)
        return saved_files

    def get_core_envidence_files_by_job_id(self, job_id: str) -> List[str]:
        """Alias of ``get_core_evidence_files_by_job_id``.

        This is the original (misspelled) method name, kept so that existing
        callers continue to work.
        """
        return self.get_core_evidence_files_by_job_id(job_id)

    def _get_core_file_type(self, file: EvidenceFileData) -> Optional[CoreFiles]:
        """Return the first ``CoreFiles`` member whose value prefixes the file name.

        Returns ``None`` when no member matches or the file has no name.
        """
        file_name = file.file_name or ""
        for core_file in CoreFiles:
            if file_name.startswith(core_file.value):
                return core_file
        return None

    @staticmethod
    def _parse_created_utc(value: str) -> datetime:
        """Parse an ISO-8601 creation timestamp into a timezone-aware datetime.

        A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts it from Python 3.11 on. A
        timestamp without an offset is taken to be UTC (as the field name
        ``created_utc`` suggests), so every result is timezone-aware and the
        values can be compared with each other.

        Raises:
            ValueError: If *value* is not a valid ISO-8601 timestamp.
        """
        text = value
        if isinstance(text, str) and text.endswith(("Z", "z")):
            text = f"{text[:-1]}+00:00"
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _select_latest_core_files(
        self,
        files: List[EvidenceFileData],
    ) -> Dict[CoreFiles, EvidenceFileData]:
        """Pick, for every core file type present, the most recently created file.

        Files that do not match any ``CoreFiles`` member are ignored.
        """
        grouped: Dict[CoreFiles, List[EvidenceFileData]] = defaultdict(list)
        for file in files:
            core_type = self._get_core_file_type(file)
            if core_type is None:
                continue
            grouped[core_type].append(file)

        return {
            core_type: max(
                group, key=lambda f: self._parse_created_utc(f.created_utc)
            )
            for core_type, group in grouped.items()
        }

    @staticmethod
    def _raise_for_status(
        response: requests.Response,
        unauthorized_message: str = "Token expired or invalid",
    ) -> None:
        """Raise ``UnauthorizedError`` on 401, else defer to ``raise_for_status``."""
        if response.status_code == 401:
            raise UnauthorizedError(unauthorized_message)
        response.raise_for_status()

    def _get_evidence_list(self, job_id: str) -> List[EvidenceFileData]:
        """Fetch the evidence entries of a job from ``/jobs/{job_id}/evidence``."""
        url = f"{self.base}/jobs/{job_id}/evidence"
        response = self.session.get(url, timeout=self.timeout)
        self._raise_for_status(response)
        # "or []" also covers a payload whose "results" key is null.
        results = response.json().get("results") or []
        return [EvidenceFileData.from_api(item) for item in results]

    def _get_evidence_metadata(self, job_id: str, evidence_id: str) -> EvidenceMetadata:
        """Fetch the storage metadata for one evidence file of a job."""
        url = f"{self.base}/jobs/{job_id}/evidenceMetadata"
        params = {"evidenceIds": evidence_id}
        response = self.session.get(url, params=params, timeout=self.timeout)
        self._raise_for_status(response)
        return EvidenceMetadata.from_api(response.json())

    def _build_download_url(self, metadata: EvidenceMetadata, file_id: str) -> str:
        """Combine the metadata's blob URI with the container and file id.

        The part of ``blob_uri`` before the first ``?`` is used as a prefix,
        ``<container_name>/<file_id>`` is appended to it, and the part after
        the ``?`` is re-attached as the query string.

        Raises:
            ValueError: If ``blob_uri`` contains no ``?``.
        """
        base, separator, query = metadata.blob_uri.partition("?")
        if not separator:
            raise ValueError("blob_uri has no query string; cannot build download URL")
        # NOTE(review): no "/" is inserted between base and container, so the
        # prefix is presumably delivered with a trailing slash; confirm.
        return f"{base}{metadata.container_name}/{file_id}?{query}"

    def _download_file(self, url: str, file_name: str) -> None:
        """Stream the content at *url* into the local file *file_name*.

        The body is written chunk by chunk instead of being held in memory as
        a whole, and the connection is released when the transfer ends.

        Raises:
            UnauthorizedError: If the download request is answered with 401.
            requests.HTTPError: For any other HTTP error status.
        """
        # NOTE(review): as in the original, the download bypasses self.session,
        # so the Bearer header is not sent with it; presumably intentional
        # because the URL carries its own query-string credentials. Confirm.
        # NOTE(review): file_name originates from the API response and is used
        # as a filesystem path as-is.
        with requests.get(url, stream=True, timeout=self.timeout) as response:
            self._raise_for_status(response, "Download request was unauthorized")
            with open(file_name, "wb") as target:
                for chunk in response.iter_content(chunk_size=self.DOWNLOAD_CHUNK_SIZE):
                    if chunk:
                        target.write(chunk)