mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
82 lines
2.9 KiB
Python
82 lines
2.9 KiB
Python
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
from backend.app.config import get_settings
|
|
from backend.pashub_fetcher.pashub_client import PashubClient
|
|
from backend.pashub_fetcher.pashub_service import PashubService
|
|
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
|
|
PashubToAraTriggerRequest,
|
|
)
|
|
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
|
|
from backend.app.db.models.tasks import SourceEnum
|
|
from backend.utils.subtasks import task_handler
|
|
from utils.logger import setup_logger
|
|
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
|
from utils.sharepoint.domna_sites import DomnaSites
|
|
|
|
# Module-level logger used by every function in this file.
logger = setup_logger()
|
|
|
|
# Bucket name passed to PashubService as ``s3_bucket``.
# NOTE(review): the "-dev" suffix is hard-coded here — confirm whether other
# environments are expected to override this value.
S3_BUCKET = "retrofit-energy-assessments-dev"
|
|
|
|
|
|
def get_pashub_client(email: str, password: str) -> PashubClient:
    """Return a ``PashubClient`` built from the token obtained for the credentials.

    Args:
        email: Account email forwarded to ``get_token_from_local_storage``.
        password: Account password forwarded to ``get_token_from_local_storage``.

    Returns:
        A ``PashubClient`` constructed with the retrieved token.
    """
    access_token = get_token_from_local_storage(email, password)
    # Logged before the client is constructed, so the message confirms only
    # that the token lookup returned.
    logger.info("Token extracted successfully")
    client = PashubClient(token=access_token)
    return client
|
|
|
|
|
|
@task_handler(task_source="pashub_fetcher", source=SourceEnum.HUBSPOT_DEAL)
def handler(body: Dict[str, Any], context: Any) -> List[str]:
    """Validate a trigger request, run the Pashub service and return its files.

    The main Pas Hub credentials are mandatory. The coordination-hub
    credentials are optional: when both are set, the service is given a
    zero-argument factory that builds the coordination client on demand;
    otherwise it is given ``None``.

    NOTE(review): this function is wrapped by ``task_handler``; what outside
    callers pass in and get back depends on that decorator — confirm there.

    Args:
        body: Raw request payload, validated with
            ``PashubToAraTriggerRequest.model_validate``.
        context: Not used in this function body.

    Returns:
        The list returned by ``PashubService.run`` for the validated payload.

    Raises:
        ValueError: If the Pas Hub email or password setting is empty.
        Any exception raised by ``model_validate`` for an invalid body
        propagates unchanged.
    """
    # Function-scope import keeps this block self-contained.
    from functools import partial

    logger.info("Received message")

    settings = get_settings()

    pashub_email = settings.PASHUB_EMAIL
    pashub_password = settings.PASHUB_PASSWORD

    coordination_hub_email = settings.PASHUB_COORDINATION_EMAIL
    coordination_hub_password = settings.PASHUB_COORDINATION_PASSWORD
    coordination_client_factory: Optional[Callable[[], PashubClient]] = None

    if not (pashub_email and pashub_password):
        raise ValueError("Pas Hub credentials not provided")

    if coordination_hub_email and coordination_hub_password:
        # partial() binds the credentials now but defers the token lookup and
        # client construction until the factory is actually called.
        coordination_client_factory = partial(
            get_pashub_client,
            coordination_hub_email,
            coordination_hub_password,
        )

    logger.debug("Validating request body")
    payload = PashubToAraTriggerRequest.model_validate(body)
    logger.debug("Successfully validated request body")

    sharepoint_client: Optional[DomnaSharepointClient] = None
    if payload.sharepoint_site is not None:
        try:
            resolved_site = DomnaSites[payload.sharepoint_site]
            sharepoint_client = DomnaSharepointClient(sharepoint_location=resolved_site)
        except KeyError:
            # Best effort: a KeyError raised anywhere in the try body leaves
            # sharepoint_client as None and the run continues without it.
            logger.warning(
                f"Unrecognised sharepoint_site '{payload.sharepoint_site}'; skipping SharePoint upload"
            )

    service = PashubService(
        # The main client is built eagerly, unlike the coordination client.
        pashub_client=get_pashub_client(pashub_email, pashub_password),
        sharepoint_client=sharepoint_client,
        s3_bucket=S3_BUCKET,
        coordination_client_factory=coordination_client_factory,
    )

    files: List[str] = service.run(payload)

    logger.info(f"Saved {len(files)} files")

    return files
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct-run entry point: invoke the handler with a stub event holding a
    # single record whose body is an empty JSON object, and no context.
    stub_record = {"body": "{}"}
    handler({"Records": [stub_record]}, None)
|