# Mirror of https://github.com/Hestia-Homes/Model.git
# Synced 2026-06-08 11:17:27 +00:00 · 213 lines · 6.5 KiB · Python
from datetime import datetime, timezone
|
|
import os
|
|
from typing import Any, Dict, List, Optional, Tuple, cast
|
|
|
|
from backend.app.config import get_settings
|
|
from backend.app.db.connection import db_session
|
|
from backend.app.db.models.uploaded_file import (
|
|
FileSourceEnum,
|
|
FileTypeEnum,
|
|
UploadedFile,
|
|
)
|
|
from backend.documents_parser.db_writer import save_epc_property_data
|
|
from backend.documents_parser.parser import parse_pashub_site_notes
|
|
from backend.pashub_fetcher.core_files import infer_file_type
|
|
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
|
|
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
|
|
PashubToAraTriggerRequest,
|
|
)
|
|
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
|
|
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
|
|
from backend.utils.subtasks import task_handler
|
|
from datatypes.epc.domain.epc_property_data import EpcPropertyData
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import upload_file_to_s3
|
|
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
|
from utils.sharepoint.domna_sites import DomnaSites
|
|
|
|
|
|
# Module-level logger used by the functions defined below.
logger = setup_logger()
|
|
|
|
|
|
|
|
def get_pashub_client(email: str, password: str) -> PashubClient:
    """Return a ``PashubClient`` built from the token for the given credentials.

    Args:
        email: Pas Hub account e-mail, forwarded to
            ``get_token_from_local_storage``.
        password: Pas Hub account password, forwarded to the same helper.

    Returns:
        A ``PashubClient`` constructed with the token that
        ``get_token_from_local_storage`` returned.
    """
    access_token = get_token_from_local_storage(email, password)
    logger.info("Token extracted successfully")

    client = PashubClient(token=access_token)
    return client
|
|
|
|
|
|
def upload_job_to_sharepoint(
    sharepoint_client: DomnaSharepointClient,
    sharepoint_link: str,
    job_files: List[str],
) -> None:
    """Upload every job file into the assessment subfolder of ``sharepoint_link``.

    This function does not create any folders. Earlier logic that created a
    per-job folder (from a ``base_path`` and the job address) and one
    subfolder per ``SharepointSubfolders`` member is disabled; files are sent
    straight to ``<sharepoint_link>/<SharepointSubfolders.ASSESSMENT.value>``.

    Args:
        sharepoint_client: Client whose ``upload_file`` performs the upload.
        sharepoint_link: SharePoint path that the assessment subfolder name
            is appended to.
        job_files: Local paths of the files to upload.
    """
    assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"

    for file_path in job_files:
        # os.path.basename is what upload_job_to_s3_and_update_db uses for the
        # same purpose; it also honours the platform's path separator, which
        # splitting on "/" does not.
        filename = os.path.basename(file_path)

        sharepoint_client.upload_file(
            file_path,
            assessment_path,
            filename,
        )
|
|
|
|
|
|
def upload_job_to_s3_and_update_db(
    job_files: List[str], uprn: Optional[str], hubspot_deal_id: Optional[str]
) -> None:
    """Upload job files to S3 and record them in the database.

    Each file is uploaded under ``documents/uprn/<uprn>/`` when a UPRN is
    given, otherwise under ``documents/hubspot_deal_id/<hubspot_deal_id>/``,
    and an ``UploadedFile`` row is created for it. Files whose inferred type
    is ``FileTypeEnum.RD_SAP_SITE_NOTE`` are additionally parsed with
    ``parse_pashub_site_notes``; a parsing failure is logged and that file is
    still recorded as uploaded, just without EPC property data.

    Does nothing when neither ``uprn`` nor ``hubspot_deal_id`` is provided.

    Args:
        job_files: Local paths of the files to upload.
        uprn: Property reference as a string of digits, or ``None``/empty.
        hubspot_deal_id: HubSpot deal identifier, or ``None``/empty.

    Raises:
        ValueError: If ``uprn`` is non-empty but cannot be converted to
            ``int``. This is raised before anything is uploaded.
    """
    bucket = "retrofit-energy-assessments-dev"

    if not uprn and not hubspot_deal_id:
        return

    # Convert once, before any upload: a malformed UPRN must not leave files
    # in S3 that never get a matching database row.
    uprn_value: Optional[int] = int(uprn) if uprn else None

    base_path = (
        f"documents/uprn/{uprn}"
        if uprn
        else f"documents/hubspot_deal_id/{hubspot_deal_id}"
    )

    uploaded_files: List[UploadedFile] = []
    site_notes_pairs: List[Tuple[UploadedFile, EpcPropertyData]] = []

    for file_path in job_files:
        filename = os.path.basename(file_path)
        file_key = f"{base_path}/{filename}"

        upload_file_to_s3(file_path, bucket, file_key)

        # TODO: use same upload_file_to_s3_and_update_db method as ecmk fetcher does
        uploaded_file = UploadedFile(
            s3_file_bucket=bucket,
            s3_file_key=file_key,
            s3_upload_timestamp=datetime.now(timezone.utc),
            uprn=uprn_value,
            hubspot_deal_id=hubspot_deal_id,
            file_source=FileSourceEnum.PAS_HUB.value,
            file_type=infer_file_type(filename),
        )
        uploaded_files.append(uploaded_file)

        file_type: Optional[str] = cast(Optional[str], uploaded_file.file_type)
        if (
            file_type is not None
            and FileTypeEnum(file_type) == FileTypeEnum.RD_SAP_SITE_NOTE
        ):
            # Parsing is best-effort: the upload record is kept either way.
            try:
                site_notes_pairs.append(
                    (uploaded_file, parse_pashub_site_notes(file_path))
                )
            except Exception:
                logger.warning(f"Failed to parse site notes {file_path}", exc_info=True)

    with db_session() as session:
        session.add_all(uploaded_files)
        # Flush before reading uploaded_file.id below (presumably so the
        # database-generated ids are populated - confirm against the model).
        session.flush()

        for uploaded_file, epc_data in site_notes_pairs:
            save_epc_property_data(
                session, epc_data, uploaded_file_id=cast(int, uploaded_file.id)
            )

        session.commit()
|
|
|
|
|
|
def process_job(
    job: PashubToAraTriggerRequest,
    pashub_client: PashubClient,
    sharepoint_client: DomnaSharepointClient,
) -> List[str]:
    """Fetch a Pas Hub job's core evidence files, store them, then delete them locally.

    The UPRN comes from the request when present, otherwise it is looked up
    through ``pashub_client.get_uprn_by_job_id``. Files are uploaded to S3 and
    recorded in the database only when a UPRN or a HubSpot deal id is known.
    The local files are removed afterwards whether or not the upload
    succeeded.

    Args:
        job: Trigger request carrying ``pashub_job_id``, ``uprn`` and
            ``hubspot_deal_id``.
        pashub_client: Client used for the UPRN lookup and the file download.
        sharepoint_client: Currently unused because the SharePoint upload is
            disabled; kept so the signature stays stable for callers.

    Returns:
        The paths returned by ``get_core_evidence_files_by_job_id``. Deletion
        of these files has already been attempted when this returns.
    """
    job_id = job.pashub_job_id

    uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id)
    hubspot_deal_id: Optional[str] = job.hubspot_deal_id

    if uprn:
        logger.info(f"Got UPRN {uprn} for job {job_id}")
    else:
        logger.info(f"No UPRN found for job {job_id}")

    job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id)

    try:
        if uprn or hubspot_deal_id:
            logger.info("Uploading files to s3")
            upload_job_to_s3_and_update_db(job_files, uprn, hubspot_deal_id)

        # SharePoint upload is disabled for now: the sharepoint link in pas hub
        # is inconsistent in terms of whether it points to a property or a
        # project.
        # if job.sharepoint_link:
        #     upload_job_to_sharepoint(sharepoint_client, job.sharepoint_link, job_files)
    finally:
        # Clean up in `finally` so a failed upload or database write does not
        # leave the downloaded files behind.
        for file_path in job_files:
            try:
                os.remove(file_path)
            except OSError:
                logger.warning(
                    f"Failed to delete temp file {file_path}", exc_info=True
                )

    return job_files
|
|
|
|
|
|
@task_handler()
def handler(body: Dict[str, Any], context: Any) -> List[str]:
    """Entry point: validate the trigger request and process the Pas Hub job.

    Reads the Pas Hub credentials from settings, builds the Pas Hub and
    SharePoint clients, validates ``body`` as a ``PashubToAraTriggerRequest``
    and runs ``process_job``. If ``process_job`` raises ``UnauthorizedError``
    a new Pas Hub client is obtained and the job is retried exactly once; a
    failure on the retry propagates.

    Args:
        body: Request payload to validate. NOTE(review): the ``task_handler``
            decorator presumably unwraps the incoming event before this
            function sees it - confirm.
        context: Not used by this function.

    Returns:
        The file paths returned by ``process_job``.

    Raises:
        ValueError: If either Pas Hub credential is missing from settings.
    """
    logger.info("Received message")

    settings = get_settings()
    email = settings.PASHUB_EMAIL
    password = settings.PASHUB_PASSWORD
    if not (email and password):
        raise ValueError("Pas Hub credentials not provided")

    pashub_client = get_pashub_client(email, password)
    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
    )

    logger.debug("Validating request body")
    job = PashubToAraTriggerRequest.model_validate(body)
    logger.debug("Successfully validated request body")

    try:
        saved_files: List[str] = process_job(job, pashub_client, sharepoint_client)
    except UnauthorizedError:
        logger.warning("Token expired - refreshing")
        # Single retry with a newly built client; a second failure is not caught.
        saved_files = process_job(
            job,
            get_pashub_client(email, password),
            sharepoint_client,
        )

    logger.info(f"Saved {len(saved_files)} files")
    return saved_files
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual run: invoke the handler with a single record whose body is an
    # empty JSON object and no context.
    handler({"Records": [{"body": "{}"}]}, None)
|