mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
216 lines
8 KiB
Python
216 lines
8 KiB
Python
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Callable, List, NamedTuple, Optional, cast
|
|
|
|
from backend.app.db.connection import db_session
|
|
from backend.app.db.models.uploaded_file import (
|
|
FileSourceEnum,
|
|
FileTypeEnum,
|
|
UploadedFile,
|
|
)
|
|
from backend.documents_parser.db_writer import save_epc_property_data
|
|
from backend.documents_parser.parser import parse_site_notes_pdf
|
|
from backend.pashub_fetcher.core_files import get_file_type_string
|
|
from backend.pashub_fetcher.pashub_client import (
|
|
DownloadedFile,
|
|
DownloadedFiles,
|
|
PashubClient,
|
|
UnauthorizedError,
|
|
)
|
|
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
|
|
PashubToAraTriggerRequest,
|
|
)
|
|
from datatypes.epc.domain.epc_property_data import EpcPropertyData
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import upload_file_to_s3
|
|
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
|
|
|
# Module-wide logger used by PashubService and its helpers; built by the
# project's utils.logger.setup_logger() helper.
logger = setup_logger()
|
|
|
|
|
|
# Outcome of uploading one downloaded file to S3 and inserting its
# UploadedFile row. Fields:
#   file_path        - local path of the downloaded file
#   file_type        - file type string stored on the row, or None when the
#                      file could not be classified
#   uploaded_file_id - id of the inserted UploadedFile row
_FileUploadRecord = NamedTuple(
    "_FileUploadRecord",
    [
        ("file_path", str),
        ("file_type", Optional[str]),
        ("uploaded_file_id", int),
    ],
)
|
|
|
|
|
|
class PashubService:
    """Pull one PasHub job's evidence files and distribute them.

    For a job the service resolves the property's UPRN, downloads the
    evidence files, uploads them to S3 with one ``UploadedFile`` row per file,
    parses RdSAP site-note PDFs into ``EpcPropertyData``, optionally copies
    the files into the property's SharePoint folder, and deletes the local
    copies.

    PasHub calls go through ``pashub_client`` first. When a call raises
    ``UnauthorizedError``, it is retried with a CoordinationHub client that is
    built lazily by ``coordination_client_factory``.
    """

    def __init__(
        self,
        pashub_client: PashubClient,
        sharepoint_client: Optional[DomnaSharepointClient],
        s3_bucket: str,
        coordination_client_factory: Optional[Callable[[], PashubClient]] = None,
    ) -> None:
        """Store collaborators; no network or database work happens here.

        Args:
            pashub_client: Primary client used for PasHub calls.
            sharepoint_client: Optional client. When ``None``, the SharePoint
                copy step is skipped.
            s3_bucket: Bucket that receives every uploaded evidence file.
            coordination_client_factory: Optional zero-argument callable that
                builds the fallback CoordinationHub client. When ``None``, an
                unauthorized PasHub call cannot fall back and
                ``UnauthorizedError`` is raised.
        """
        self._pashub_client = pashub_client
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket
        self._coordination_client_factory = coordination_client_factory
        # Built on first use, so runs that never need the fallback never create it.
        self._coordination_client: Optional[PashubClient] = None

    def _get_coordination_client(self) -> PashubClient:
        """Return the fallback client, building and caching it on first use.

        Raises:
            UnauthorizedError: If no ``coordination_client_factory`` was given.
        """
        if self._coordination_client_factory is None:
            raise UnauthorizedError("No coordination client factory configured")
        if self._coordination_client is None:
            self._coordination_client = self._coordination_client_factory()
        return self._coordination_client

    def run(self, request: PashubToAraTriggerRequest) -> List[str]:
        """Process one PasHub job end to end.

        Steps:
        1. Use ``request.uprn`` or look the UPRN up by job id.
        2. Download the job's evidence files.
        3. When a UPRN or HubSpot deal id is known, upload them to S3, record
           them in the database and parse RdSAP site notes.
        4. Optionally copy the files to SharePoint.
        5. Delete the local temp files. This happens even if an upload step
           raised.

        Args:
            request: Trigger carrying the job id and optional UPRN, HubSpot
                deal id, SharePoint link, address and the "other files" flag.

        Returns:
            Local paths of every downloaded file (core files first, then
            "other" files). Deletion of these files has already been
            attempted when this method returns.

        Raises:
            UnauthorizedError: If PasHub rejects the credentials and the
                CoordinationHub fallback is unavailable or also rejects them.
            ValueError: If the resolved UPRN is not an integer string.
        """
        job_id = request.pashub_job_id
        active_client = self._pashub_client

        uprn: Optional[str] = request.uprn
        if not uprn:
            try:
                uprn = active_client.get_uprn_by_job_id(job_id)
            except UnauthorizedError:
                logger.info(
                    f"PasHub credentials unauthorized for job {job_id}; retrying with CoordinationHub credentials"
                )
                active_client = self._get_coordination_client()
                uprn = active_client.get_uprn_by_job_id(job_id)

        hubspot_deal_id: Optional[str] = request.hubspot_deal_id

        if uprn:
            logger.info(f"Got UPRN {uprn} for job {job_id}")
        else:
            logger.info(f"No UPRN found for job {job_id}")

        downloaded: DownloadedFiles
        try:
            downloaded = active_client.get_evidence_files_by_job_id(
                job_id, include_other=request.get_other_files
            )
        except UnauthorizedError:
            # Already on the fallback client: there is nothing left to try.
            if active_client is not self._pashub_client:
                raise
            logger.info(
                f"PasHub credentials unauthorized for evidence files of job {job_id}; "
                "retrying with CoordinationHub credentials"
            )
            active_client = self._get_coordination_client()
            downloaded = active_client.get_evidence_files_by_job_id(
                job_id, include_other=request.get_other_files
            )

        all_files: List[DownloadedFile] = downloaded.core + downloaded.other
        try:
            if uprn or hubspot_deal_id:
                logger.info("Uploading files to s3")
                file_source = (
                    FileSourceEnum.PAS_HUB
                    if active_client is self._pashub_client
                    else FileSourceEnum.COORDINATION_HUB
                )
                upload_records = self._upload_to_s3_and_update_db(
                    downloaded.core, uprn, hubspot_deal_id, file_source
                )
                self._save_site_notes(upload_records)

                if downloaded.other:
                    self._upload_to_s3_and_update_db(
                        downloaded.other,
                        uprn,
                        hubspot_deal_id,
                        file_source,
                        default_file_type=FileTypeEnum.OTHER.value,
                    )

            self._upload_to_property_sharepoint_folder(request, all_files)
        finally:
            # Always clean up, so a failed upload step does not leak temp files.
            self._delete_temp_files(all_files)

        return [df.file_path for df in all_files]

    def _upload_to_s3_and_update_db(
        self,
        job_files: List[DownloadedFile],
        uprn: Optional[str],
        hubspot_deal_id: Optional[str],
        file_source: FileSourceEnum,
        default_file_type: Optional[str] = None,
    ) -> List[_FileUploadRecord]:
        """Upload files to S3 and insert one ``UploadedFile`` row per file.

        Keys are ``documents/uprn/<uprn>/<filename>`` when a UPRN is known,
        otherwise ``documents/hubspot_deal_id/<deal id>/<filename>``.

        Args:
            job_files: Downloaded files to upload.
            uprn: Property UPRN. When given it must be an integer string,
                because it is stored as an int.
            hubspot_deal_id: Key used when no UPRN is available. It is also
                stored on each row.
            file_source: Hub the files were fetched from.
            default_file_type: Type stored when ``get_file_type_string``
                cannot classify a file.

        Returns:
            One record per file, in input order, carrying the new row id.
            The list is empty when there are no files or neither key is
            available.

        Raises:
            ValueError: If ``uprn`` is not an integer string. This is raised
                before any S3 upload, so no orphaned objects are left behind.
        """
        if not job_files or (not uprn and not hubspot_deal_id):
            return []

        # Convert once, up front, so a bad UPRN fails before anything is uploaded.
        uprn_value = int(uprn) if uprn else None
        base_path = (
            f"documents/uprn/{uprn}"
            if uprn
            else f"documents/hubspot_deal_id/{hubspot_deal_id}"
        )

        file_paths: List[str] = []
        uploaded_files: List[UploadedFile] = []
        for df in job_files:
            filename = os.path.basename(df.file_path)
            file_key = f"{base_path}/{filename}"

            upload_file_to_s3(df.file_path, self._s3_bucket, file_key)

            uploaded_files.append(
                UploadedFile(
                    s3_file_bucket=self._s3_bucket,
                    s3_file_key=file_key,
                    s3_upload_timestamp=datetime.now(timezone.utc),
                    uprn=uprn_value,
                    hubspot_deal_id=hubspot_deal_id,
                    file_source=file_source.value,
                    file_type=get_file_type_string(filename, df.evidence_category)
                    or default_file_type,
                )
            )
            file_paths.append(df.file_path)

        with db_session() as session:
            session.add_all(uploaded_files)
            # Flush so the new rows get their ids before we read ``uf.id``.
            session.flush()
            # Read ids and types while the session is still open.
            return [
                _FileUploadRecord(
                    file_path=fp,
                    file_type=cast(Optional[str], uf.file_type),
                    uploaded_file_id=cast(int, uf.id),
                )
                for fp, uf in zip(file_paths, uploaded_files)
            ]

    @staticmethod
    def _is_site_note(file_type: Optional[str]) -> bool:
        """Return True when ``file_type`` maps to ``FileTypeEnum.RD_SAP_SITE_NOTE``.

        An unrecognised type string counts as "not a site note" instead of
        raising ``ValueError``, so one odd classification cannot abort a run.
        """
        if file_type is None:
            return False
        try:
            return FileTypeEnum(file_type) == FileTypeEnum.RD_SAP_SITE_NOTE
        except ValueError:
            return False

    def _save_site_notes(self, upload_records: List[_FileUploadRecord]) -> None:
        """Parse RdSAP site-note PDFs and save their EPC property data.

        This is best effort: a failure on one file is logged with its
        traceback and the remaining files are still processed. Records of any
        other type are skipped.
        """
        for record in upload_records:
            if not self._is_site_note(record.file_type):
                continue
            try:
                epc_data: EpcPropertyData = parse_site_notes_pdf(record.file_path)
                with db_session() as session:
                    save_epc_property_data(
                        session, epc_data, uploaded_file_id=record.uploaded_file_id
                    )
            except Exception:
                logger.warning(
                    f"Failed to parse site notes {record.file_path}", exc_info=True
                )

    def _upload_to_property_sharepoint_folder(
        self,
        request: PashubToAraTriggerRequest,
        files: List[DownloadedFile],
    ) -> None:
        """Copy ``files`` into the SharePoint folder named after the property.

        This does nothing unless a SharePoint client is configured and the
        request carries both ``sharepoint_link`` and ``address``. The folder
        name is the part of ``address`` before the first ``|``. It is matched
        case-insensitively against the folders listed under
        ``sharepoint_link``. When there is no match, a warning is logged and
        nothing is uploaded.
        """
        if not (self._sharepoint_client and request.sharepoint_link and request.address):
            return

        folder_name = request.address.split("|")[0].strip()
        folders = self._sharepoint_client.get_folders_in_path(request.sharepoint_link)
        match = next(
            (
                f["name"]
                for f in folders.get("value", [])
                if f["name"].lower() == folder_name.lower()
            ),
            None,
        )
        if match is None:
            logger.warning(f"SharePoint folder not found for '{folder_name}' in {request.sharepoint_link}")
            return

        self._upload_to_sharepoint(f"{request.sharepoint_link}/{match}", files)

    def _upload_to_sharepoint(
        self,
        property_folder_path: str,
        files: List[DownloadedFile],
    ) -> None:
        """Upload each file into ``property_folder_path`` on SharePoint.

        Per-file failures are logged with their traceback and do not stop
        the remaining uploads.
        """
        # Internal invariant: only called once a SharePoint client is known to exist.
        assert self._sharepoint_client is not None
        for df in files:
            filename = os.path.basename(df.file_path)
            try:
                self._sharepoint_client.upload_file(df.file_path, property_folder_path, filename)
            except Exception:
                logger.warning(f"Failed to upload {filename} to SharePoint", exc_info=True)

    @staticmethod
    def _delete_temp_files(files: List[DownloadedFile]) -> None:
        """Remove downloaded temp files. Failures are logged, never raised."""
        for df in files:
            try:
                os.remove(df.file_path)
            except OSError:
                logger.warning(f"Failed to delete temp file {df.file_path}")
|