Model/backend/pashub_fetcher/pashub_client.py

194 lines
6.4 KiB
Python

from collections import defaultdict
import os
from typing import Dict, List, NamedTuple, Optional
from datetime import datetime
import requests
from backend.pashub_fetcher.core_files import CoreFiles, get_core_file_type
from backend.pashub_fetcher.evidence_file_data import EvidenceFileData
from backend.pashub_fetcher.evidence_metadata import EvidenceMetadata
from utils.logger import setup_logger
# Module-level logger used by every class in this file; configured by
# utils.logger.setup_logger().
logger = setup_logger()
class DownloadedFile(NamedTuple):
    """One evidence file that has been written to local disk."""

    # Local filesystem path the file was saved to.
    file_path: str
    # Evidence category copied from the evidence record; may be missing.
    evidence_category: Optional[str]
    # Creation timestamp parsed from the evidence record.
    created_utc: datetime
class _EvidenceFileGroups(NamedTuple):
    """Evidence files partitioned into core and non-core groups."""

    # At most one evidence file per core file type (dict keyed by type).
    core: Dict[CoreFiles, EvidenceFileData]
    # Every evidence file that was not classified as a core file.
    other: List[EvidenceFileData]
class DownloadedFiles(NamedTuple):
    """Downloaded evidence for one job, split into core and other files."""

    # Files downloaded from the core evidence group.
    core: List[DownloadedFile]
    # Files downloaded from the non-core evidence group.
    other: List[DownloadedFile]
class UnauthorizedError(Exception):
    """Raised when the PasHub API answers HTTP 401 (token expired or invalid)."""
class PashubClient:
    """Client for the PasHub REST API.

    API calls go through a ``requests.Session`` that sends the bearer token on
    every request. Evidence files for a job are downloaded into a local
    directory.

    The client keeps an HTTP session open. Call :meth:`close`, or use the
    client as a context manager, when finished with it.
    """

    def __init__(
        self,
        token: str,
        *,
        company_id: str = "cb5249e2-8f31-4ef4-aefd-08ddaccb1fa2",
        base_url: str = "https://pashub.net/api",
        timeout: float = 60.0,
        download_dir: str = "/tmp",
    ):
        """Create a client authenticated with ``token``.

        Args:
            token: Bearer token sent in the ``Authorization`` header of every
                PasHub API request.
            company_id: PasHub company identifier. It defaults to the value that
                used to be hard-coded here. No request in this class uses it;
                it is kept as an attribute for callers.
            base_url: Root URL of the PasHub API. A trailing slash is stripped.
            timeout: Seconds ``requests`` waits to connect or for the next
                bytes from the server on each request. This is not a cap on
                total transfer time. Without it, a stalled server blocks forever.
            download_dir: Directory that evidence files are written into.
        """
        self.token = token
        self.company_id = company_id
        self.base = base_url.rstrip("/")
        self.timeout = timeout
        self.download_dir = download_dir
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/json",
            }
        )
        logger.info("Finished initialising PashubClient")

    def close(self) -> None:
        """Close the underlying HTTP session and its pooled connections."""
        self.session.close()

    def __enter__(self) -> "PashubClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def get_evidence_files_by_job_id(
        self, job_id: str, include_other: bool = False
    ) -> DownloadedFiles:
        """Download the evidence files attached to a PasHub job.

        For each core file type only the most recent file is downloaded (see
        ``_group_into_core_and_other_files``). Non-core files are downloaded
        only when ``include_other`` is true.

        Args:
            job_id: PasHub job identifier.
            include_other: Also download evidence that is not a core file.

        Returns:
            ``DownloadedFiles`` holding the local path, category and creation
            time of every downloaded file. Entries without a ``file_id`` or
            without a usable file name are skipped.

        Raises:
            UnauthorizedError: If the API rejects the token (HTTP 401).
            requests.HTTPError: For any other unsuccessful HTTP response.
            ValueError: If evidence metadata carries a malformed blob URI.
        """
        logger.info(f"Getting evidence files for job ID {job_id}")
        evidence_list: List[EvidenceFileData] = self._get_evidence_list(job_id)
        logger.info(f"Found {len(evidence_list)} evidence files")
        if not evidence_list:
            return DownloadedFiles(core=[], other=[])

        grouped: _EvidenceFileGroups = self._group_into_core_and_other_files(
            evidence_list
        )
        core_files = self._download_evidence_files(
            job_id, list(grouped.core.values()), "Successfully downloaded file"
        )
        other_files: List[DownloadedFile] = []
        if include_other:
            other_files = self._download_evidence_files(
                job_id, grouped.other, "Successfully downloaded other file"
            )
        return DownloadedFiles(core=core_files, other=other_files)

    def get_uprn_by_job_id(self, job_id: str) -> Optional[str]:
        """Return the ``uprn`` field of a PasHub job, or ``None`` if unavailable.

        ``None`` is returned, with a warning logged, when the response body is
        not JSON, is not a JSON object, or has no ``uprn`` key.

        Raises:
            UnauthorizedError: If the API rejects the token (HTTP 401).
            requests.HTTPError: For any other unsuccessful HTTP response.
        """
        logger.info(f"Getting UPRN for job ID {job_id}")
        url = f"{self.base}/jobs/{job_id}"
        # Never log the raw headers: they contain the bearer token.
        logger.debug(
            f"About to make API request with session headers: {self._redacted_headers()}"
        )
        r = self._get(url)
        try:
            return r.json()["uprn"]
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: body is not valid JSON. KeyError: no "uprn" field.
            # TypeError: the JSON body is not an object (list, string, null).
            logger.warning(
                f"Failed to get UPRN for Job ID {job_id} with exception: {e}"
            )
            return None

    def _redacted_headers(self) -> Dict[str, str]:
        """Return the session headers with ``Authorization`` masked, safe to log."""
        return {
            name: ("<redacted>" if name.lower() == "authorization" else value)
            for name, value in self.session.headers.items()
        }

    def _get(
        self, url: str, params: Optional[Dict[str, str]] = None
    ) -> requests.Response:
        """GET ``url`` on the authenticated session and check the status.

        Raises:
            UnauthorizedError: If the API answers 401.
            requests.HTTPError: For any other 4xx/5xx response.
        """
        r = self.session.get(url, params=params, timeout=self.timeout)
        if r.status_code == 401:
            raise UnauthorizedError("Token expired or invalid")
        r.raise_for_status()
        return r

    def _download_evidence_files(
        self,
        job_id: str,
        evidence_files: List[EvidenceFileData],
        success_message: str,
    ) -> List[DownloadedFile]:
        """Download each evidence file into ``download_dir`` and describe it.

        Entries without a ``file_id`` are skipped silently. Entries whose
        name has no usable final path component are skipped with a warning.
        """
        downloaded: List[DownloadedFile] = []
        for evidence in evidence_files:
            if not evidence.file_id:
                continue
            # The name comes from the API. Keep only its last path component
            # so that "../x" or an absolute name cannot escape download_dir
            # (os.path.join discards the directory for absolute names).
            safe_name = os.path.basename(evidence.file_name or "")
            if not safe_name:
                logger.warning(
                    f"Skipping evidence {evidence.file_id}: no usable file name"
                )
                continue
            metadata: EvidenceMetadata = self._get_evidence_metadata(
                job_id, evidence.file_id
            )
            download_url = self._build_download_url(metadata, evidence.file_id)
            file_path = os.path.join(self.download_dir, safe_name)
            self._download_file(download_url, file_path)
            logger.info(success_message)
            downloaded.append(
                DownloadedFile(
                    file_path=file_path,
                    evidence_category=evidence.evidence_category,
                    created_utc=datetime.fromisoformat(evidence.created_utc),
                )
            )
        return downloaded

    def _group_into_core_and_other_files(
        self,
        files: List[EvidenceFileData],
    ) -> _EvidenceFileGroups:
        """Split evidence into the latest file per core type and everything else.

        Files that ``get_core_file_type`` does not classify (returns ``None``)
        go to ``other`` in their original order. For each core type, the file
        with the latest ``created_utc`` is kept. For ``RETROFIT_DESIGN_DOC``
        with several candidates, files whose name contains ``-OSM-`` are
        preferred when any exist.
        """
        grouped: Dict[CoreFiles, List[EvidenceFileData]] = defaultdict(list)
        other: List[EvidenceFileData] = []
        for evidence in files:
            core_type: Optional[CoreFiles] = get_core_file_type(
                evidence.file_name, evidence.evidence_category
            )
            # Compare with None explicitly: an enum member can be falsy
            # (e.g. an IntEnum with value 0).
            if core_type is None:
                other.append(evidence)
                continue
            grouped[core_type].append(evidence)

        latest_core_files: Dict[CoreFiles, EvidenceFileData] = {}
        for core_type, group in grouped.items():
            if core_type == CoreFiles.RETROFIT_DESIGN_DOC and len(group) > 1:
                osm_candidates = [f for f in group if "-OSM-" in f.file_name]
                group = osm_candidates if osm_candidates else group
            latest_core_files[core_type] = max(
                group, key=lambda f: datetime.fromisoformat(f.created_utc)
            )
        return _EvidenceFileGroups(core=latest_core_files, other=other)

    def _get_evidence_list(self, job_id: str) -> List[EvidenceFileData]:
        """Fetch the evidence entries of ``job_id`` from the ``results`` field."""
        url = f"{self.base}/jobs/{job_id}/evidence"
        # "or []" also covers an explicit null "results" value.
        results = self._get(url).json().get("results") or []
        return [EvidenceFileData.from_api(item) for item in results]

    def _get_evidence_metadata(self, job_id: str, evidence_id: str) -> EvidenceMetadata:
        """Fetch the storage metadata for one evidence file of ``job_id``."""
        url = f"{self.base}/jobs/{job_id}/evidenceMetadata"
        r = self._get(url, params={"evidenceIds": evidence_id})
        return EvidenceMetadata.from_api(r.json())

    def _build_download_url(self, metadata: EvidenceMetadata, file_id: str) -> str:
        """Build the blob download URL for ``file_id``.

        ``metadata.blob_uri`` is split at its first ``?``. The part before it
        is the storage root. The query string after it is re-appended after
        ``<container>/<file_id>``.

        Raises:
            ValueError: If ``blob_uri`` contains no query string.
        """
        base, sep, sas = metadata.blob_uri.partition("?")
        if not sep:
            # Deliberately does not include the URI itself in the message.
            raise ValueError(f"blob_uri for file {file_id} has no query string")
        # Ensure exactly one slash between the storage root and the container.
        base = base.rstrip("/") + "/"
        return f"{base}{metadata.container_name}/{file_id}?{sas}"

    def _download_file(self, url: str, file_path: str) -> None:
        """Stream ``url`` to ``file_path``, overwriting any existing file.

        Uses plain ``requests.get`` rather than ``self.session``, so the PasHub
        ``Authorization`` header is not sent to the download host. This is
        presumably deliberate, since the URL carries its own query-string
        credentials.

        Raises:
            requests.HTTPError: If the download response is not successful.
        """
        with requests.get(url, stream=True, timeout=self.timeout) as r:
            r.raise_for_status()
            with open(file_path, "wb") as f:
                # Write in chunks instead of holding the whole file in memory.
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)