diff --git a/backend/pashub_fetcher/handler/handler.py b/backend/pashub_fetcher/handler/handler.py index 7874d686..0d12b6bf 100644 --- a/backend/pashub_fetcher/handler/handler.py +++ b/backend/pashub_fetcher/handler/handler.py @@ -1,33 +1,18 @@ -from datetime import datetime, timezone -import os -from typing import Any, Dict, List, Optional, Tuple, cast +from typing import Any, Dict, List from backend.app.config import get_settings -from backend.app.db.connection import db_session -from backend.app.db.models.uploaded_file import ( - FileSourceEnum, - FileTypeEnum, - UploadedFile, -) -from backend.documents_parser.db_writer import save_epc_property_data -from backend.documents_parser.parser import parse_site_notes_pdf -from backend.pashub_fetcher.core_files import infer_file_type from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError -from backend.pashub_fetcher.pashub_to_ara_trigger_request import ( - PashubToAraTriggerRequest, -) -from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders +from backend.pashub_fetcher.pashub_service import PashubService +from backend.pashub_fetcher.pashub_to_ara_trigger_request import PashubToAraTriggerRequest from backend.pashub_fetcher.token_getter import get_token_from_local_storage from backend.utils.subtasks import task_handler -from datatypes.epc.domain.epc_property_data import EpcPropertyData from utils.logger import setup_logger -from utils.s3 import upload_file_to_s3 from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient from utils.sharepoint.domna_sites import DomnaSites - logger = setup_logger() +S3_BUCKET = "retrofit-energy-assessments-dev" def get_pashub_client(email: str, password: str) -> PashubClient: @@ -36,130 +21,6 @@ def get_pashub_client(email: str, password: str) -> PashubClient: return PashubClient(token=token) -def upload_job_to_sharepoint( - sharepoint_client: DomnaSharepointClient, - # base_path: str, - sharepoint_link: str, - job_files: 
List[str], -) -> None: - # job_path = f"{base_path}/{job['address']}" - - # Create main job folder - # sharepoint_client.makedir(job["address"], base_path) - - # Create subfolders - # for folder in SharepointSubfolders: - # sharepoint_client.makedir(folder.value, job_path) - - # Upload into assessment folder - assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}" - - for file_path in job_files: - filename = file_path.split("/")[-1] - - sharepoint_client.upload_file( - file_path, - assessment_path, - filename, - ) - - -def upload_job_to_s3_and_update_db( - job_files: List[str], uprn: Optional[str], hubspot_deal_id: Optional[str] -) -> None: - bucket = "retrofit-energy-assessments-dev" - - if not uprn and not hubspot_deal_id: - return - - base_path = ( - f"documents/uprn/{uprn}" - if uprn - else f"documents/hubspot_deal_id/{hubspot_deal_id}" - ) - - uploaded_files: List[UploadedFile] = [] - site_notes_pairs: List[Tuple[UploadedFile, EpcPropertyData]] = [] - - for file_path in job_files: - filename = os.path.basename(file_path) - file_key = f"{base_path}/{filename}" - - upload_file_to_s3(file_path, bucket, file_key) - - # TODO: use same upload_file_to_s3_and_update_db method as ecmk fetcher does - uploaded_file = UploadedFile( - s3_file_bucket=bucket, - s3_file_key=file_key, - s3_upload_timestamp=datetime.now(timezone.utc), - uprn=int(uprn) if uprn else None, - hubspot_deal_id=hubspot_deal_id, - file_source=FileSourceEnum.PAS_HUB.value, - file_type=infer_file_type(filename), - ) - uploaded_files.append(uploaded_file) - - file_type: Optional[str] = cast(Optional[str], uploaded_file.file_type) - if ( - file_type is not None - and FileTypeEnum(file_type) == FileTypeEnum.RD_SAP_SITE_NOTE - ): - try: - site_notes_pairs.append( - (uploaded_file, parse_site_notes_pdf(file_path)) - ) - except Exception: - logger.warning(f"Failed to parse site notes {file_path}", exc_info=True) - - with db_session() as session: - session.add_all(uploaded_files) - 
session.flush() - - for uploaded_file, epc_data in site_notes_pairs: - save_epc_property_data( - session, epc_data, uploaded_file_id=cast(int, uploaded_file.id) - ) - - session.commit() - - -def process_job( - job: PashubToAraTriggerRequest, - pashub_client: PashubClient, - sharepoint_client: DomnaSharepointClient, -) -> List[str]: - job_id = job.pashub_job_id - - uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id) - hubspot_deal_id: Optional[str] = job.hubspot_deal_id - - if uprn: - logger.info(f"Got UPRN {uprn} for job {job_id}") - else: - logger.info(f"No UPRN found for job {job_id}") - - job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id) - - if uprn or hubspot_deal_id: - logger.info("Uploading files to s3") - upload_job_to_s3_and_update_db(job_files, uprn, hubspot_deal_id) - - # # Comment out sharepoint loading for now: - # Seems like the sharepoint link in pas hub is inconsistent in terms - # of whether it points to a property or a project - - # if job.sharepoint_link: - # upload_job_to_sharepoint(sharepoint_client, job.sharepoint_link, job_files) - - for file_path in job_files: - try: - os.remove(file_path) - except OSError: - logger.warning(f"Failed to delete temp file {file_path}") - - return job_files - - @task_handler() def handler(body: Dict[str, Any], context: Any) -> List[str]: logger.info("Received message") @@ -172,8 +33,6 @@ def handler(body: Dict[str, Any], context: Any) -> List[str]: if (not pas_hub_email) or (not pas_hub_password): raise ValueError("Pas Hub credentials not provided") - pashub_client = get_pashub_client(pas_hub_email, pas_hub_password) - sharepoint_client = DomnaSharepointClient( sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3 ) @@ -182,26 +41,24 @@ def handler(body: Dict[str, Any], context: Any) -> List[str]: payload = PashubToAraTriggerRequest.model_validate(body) logger.debug("Successfully validated request body") + service = PashubService( + 
class PashubService:
    """Download a PAS Hub job's core evidence files and persist them.

    For a single trigger request the service resolves the UPRN, downloads the
    job's core evidence files through the PAS Hub client, uploads them to S3,
    records them in the database and finally deletes the local copies.
    """

    def __init__(
        self,
        pashub_client: PashubClient,
        sharepoint_client: DomnaSharepointClient,
        s3_bucket: str,
    ) -> None:
        """Store the collaborators used by :meth:`run`.

        Args:
            pashub_client: Client used to look up the UPRN and to download the
                core evidence files of a job.
            sharepoint_client: Client used by the (currently disabled)
                SharePoint upload.
            s3_bucket: Name of the bucket every file is uploaded to.
        """
        self._pashub_client = pashub_client
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket

    def run(self, request: PashubToAraTriggerRequest) -> List[str]:
        """Process one PAS Hub job end to end.

        The UPRN on the request wins; the PAS Hub client is only asked for it
        when the request carries none. Files are uploaded only when a UPRN or
        a HubSpot deal id is available, because one of them is needed to build
        the S3 key.

        Args:
            request: Validated trigger request identifying the PAS Hub job.

        Returns:
            The local paths of the downloaded files. Deletion of these files
            has already been attempted by the time this method returns.

        Raises:
            Exception: Errors raised by the PAS Hub client, the S3 upload or
                the database session are not caught here and propagate to the
                caller. The downloaded files are cleaned up first.
        """
        job_id = request.pashub_job_id

        uprn: Optional[str] = request.uprn or self._pashub_client.get_uprn_by_job_id(job_id)
        hubspot_deal_id: Optional[str] = request.hubspot_deal_id

        if uprn:
            logger.info(f"Got UPRN {uprn} for job {job_id}")
        else:
            logger.info(f"No UPRN found for job {job_id}")

        job_files: List[str] = self._pashub_client.get_core_evidence_files_by_job_id(job_id)

        try:
            if uprn or hubspot_deal_id:
                logger.info("Uploading files to s3")
                self._upload_to_s3_and_update_db(job_files, uprn, hubspot_deal_id)

            # SharePoint upload disabled: pashub sharepoint_link is inconsistent
            # (points to property or project unpredictably)
            # if request.sharepoint_link:
            #     self._upload_to_sharepoint(request.sharepoint_link, job_files)
        finally:
            # Clean up even when the upload or DB write fails; otherwise every
            # failed attempt (and every retry by the caller) leaks temp files.
            self._remove_temp_files(job_files)

        return job_files

    def _remove_temp_files(self, job_files: List[str]) -> None:
        """Best-effort deletion of the downloaded files; failures are only logged."""
        for file_path in job_files:
            try:
                os.remove(file_path)
            except OSError:
                logger.warning(f"Failed to delete temp file {file_path}")

    def _upload_to_s3_and_update_db(
        self,
        job_files: List[str],
        uprn: Optional[str],
        hubspot_deal_id: Optional[str],
    ) -> None:
        """Upload every file to S3 and record it as an ``UploadedFile`` row.

        Files are stored under ``documents/uprn/<uprn>/`` when a UPRN is known,
        otherwise under ``documents/hubspot_deal_id/<deal id>/``. Files whose
        inferred type is ``RD_SAP_SITE_NOTE`` are additionally parsed, and the
        parsed EPC data is saved against the uploaded-file row.

        Args:
            job_files: Local paths of the files to upload.
            uprn: UPRN of the property, if known.
            hubspot_deal_id: HubSpot deal id, used for the key when no UPRN.

        Raises:
            ValueError: If ``uprn`` is set but is not an integer string.
        """
        # Without either identifier there is no key prefix to store under.
        if not uprn and not hubspot_deal_id:
            return

        base_path = (
            f"documents/uprn/{uprn}"
            if uprn
            else f"documents/hubspot_deal_id/{hubspot_deal_id}"
        )

        uploaded_files: List[UploadedFile] = []
        site_notes_pairs: List[Tuple[UploadedFile, EpcPropertyData]] = []

        for file_path in job_files:
            filename = os.path.basename(file_path)
            file_key = f"{base_path}/{filename}"

            upload_file_to_s3(file_path, self._s3_bucket, file_key)

            uploaded_file = UploadedFile(
                s3_file_bucket=self._s3_bucket,
                s3_file_key=file_key,
                s3_upload_timestamp=datetime.now(timezone.utc),
                uprn=int(uprn) if uprn else None,
                hubspot_deal_id=hubspot_deal_id,
                file_source=FileSourceEnum.PAS_HUB.value,
                file_type=infer_file_type(filename),
            )
            uploaded_files.append(uploaded_file)

            file_type: Optional[str] = cast(Optional[str], uploaded_file.file_type)
            if file_type is not None and FileTypeEnum(file_type) == FileTypeEnum.RD_SAP_SITE_NOTE:
                # A site note that cannot be parsed must not block the upload:
                # the file is still recorded, only its EPC data is skipped.
                try:
                    site_notes_pairs.append(
                        (uploaded_file, parse_site_notes_pdf(file_path))
                    )
                except Exception:
                    logger.warning(f"Failed to parse site notes {file_path}", exc_info=True)

        with db_session() as session:
            session.add_all(uploaded_files)
            # Flush before saving EPC data because uploaded_file.id is read
            # below (presumably assigned by the database on flush).
            session.flush()

            for uploaded_file, epc_data in site_notes_pairs:
                save_epc_property_data(
                    session, epc_data, uploaded_file_id=cast(int, uploaded_file.id)
                )

            session.commit()

    def _upload_to_sharepoint(
        self,
        sharepoint_link: str,
        job_files: List[str],
    ) -> None:
        """Upload every file into the assessment subfolder of ``sharepoint_link``.

        Not called at the moment; see the note in :meth:`run`.

        Args:
            sharepoint_link: Base SharePoint location of the job.
            job_files: Local paths of the files to upload.
        """
        assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"

        for file_path in job_files:
            # Same filename derivation as the S3 upload.
            filename = os.path.basename(file_path)
            self._sharepoint_client.upload_file(file_path, assessment_path, filename)
def make_request(
    pashub_link: str = FAKE_JOB_LINK,
    uprn: Optional[str] = None,
    hubspot_deal_id: Optional[str] = None,
    sharepoint_link: Optional[str] = None,
) -> PashubToAraTriggerRequest:
    """Build a trigger request; only the job link has a non-None default."""
    request = PashubToAraTriggerRequest(
        pashub_link=pashub_link,
        uprn=uprn,
        hubspot_deal_id=hubspot_deal_id,
        sharepoint_link=sharepoint_link,
    )
    return request


def make_service(
    pashub_client: Optional[PashubClient] = None,
    sharepoint_client: Optional[DomnaSharepointClient] = None,
    s3_bucket: str = "test-bucket",
) -> PashubService:
    """Build the service under test, mocking any collaborator not supplied."""
    client = pashub_client or MagicMock(spec=PashubClient)
    sharepoint = sharepoint_client or MagicMock(spec=DomnaSharepointClient)
    return PashubService(
        pashub_client=client,
        sharepoint_client=sharepoint,
        s3_bucket=s3_bucket,
    )


def _client_with_files(*paths: str) -> MagicMock:
    """Return a mocked PAS Hub client that knows no UPRN and yields ``paths``."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = list(paths)
    return client


def test_run_returns_file_paths() -> None:
    """run() hands back the paths reported by the PAS Hub client."""
    service = make_service(
        pashub_client=_client_with_files("/tmp/a.pdf", "/tmp/b.pdf")
    )

    with patch("backend.pashub_fetcher.pashub_service.os.remove"):
        returned = service.run(make_request())

    assert returned == ["/tmp/a.pdf", "/tmp/b.pdf"]


def test_run_skips_upload_when_no_uprn_and_no_deal_id() -> None:
    """Without a UPRN or a deal id nothing is uploaded to S3."""
    service = make_service(pashub_client=_client_with_files("/tmp/a.pdf"))
    request = make_request(uprn=None, hubspot_deal_id=None)

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_upload,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        service.run(request)

    s3_upload.assert_not_called()


def test_run_uploads_files_to_s3_using_uprn_path() -> None:
    """With a UPRN each file is uploaded, in order, under the UPRN prefix."""
    pashub = _client_with_files("/tmp/SiteNote_001.pdf", "/tmp/Photopack_002.pdf")
    service = make_service(pashub_client=pashub, s3_bucket="my-bucket")

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_upload,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        service.run(make_request(uprn="12345"))

    expected_calls = [
        call(
            "/tmp/SiteNote_001.pdf",
            "my-bucket",
            "documents/uprn/12345/SiteNote_001.pdf",
        ),
        call(
            "/tmp/Photopack_002.pdf",
            "my-bucket",
            "documents/uprn/12345/Photopack_002.pdf",
        ),
    ]
    s3_upload.assert_has_calls(expected_calls)


def test_run_persists_uploaded_file_records_to_db() -> None:
    """With a UPRN one UploadedFile record per file is added to the session."""
    session = MagicMock()
    service = make_service(
        pashub_client=_client_with_files("/tmp/SiteNote_001.pdf")
    )

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch("backend.pashub_fetcher.pashub_service.db_session") as db,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        db.return_value.__enter__.return_value = session
        service.run(make_request(uprn="12345"))

    session.add_all.assert_called_once()
    records: list = session.add_all.call_args[0][0]
    assert len(records) == 1
    record = records[0]
    assert record.s3_file_bucket == "test-bucket"
    assert record.uprn == 12345


def test_run_uses_hubspot_deal_id_path_when_no_uprn() -> None:
    """With only a deal id the S3 key uses the hubspot_deal_id prefix."""
    pashub = _client_with_files("/tmp/SiteNote_001.pdf")
    service = make_service(pashub_client=pashub, s3_bucket="my-bucket")
    request = make_request(uprn=None, hubspot_deal_id="deal-abc")

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_upload,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        service.run(request)

    s3_upload.assert_called_once_with(
        "/tmp/SiteNote_001.pdf",
        "my-bucket",
        "documents/hubspot_deal_id/deal-abc/SiteNote_001.pdf",
    )


def test_run_parses_and_saves_site_notes_for_rd_sap_site_note_file() -> None:
    """An RdSAP site note is parsed and its EPC data saved against the file id."""
    epc_data = MagicMock()
    session = MagicMock()
    uploaded_file_id = 99

    def assign_id(files: list) -> None:
        # Stand in for the database assigning a primary key to the record.
        files[0].id = uploaded_file_id

    session.add_all = MagicMock(side_effect=assign_id)

    service = make_service(
        pashub_client=_client_with_files("/tmp/RdSAP_SiteNote_001.pdf")
    )

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch(
            "backend.pashub_fetcher.pashub_service.parse_site_notes_pdf",
            return_value=epc_data,
        ) as parse,
        patch(
            "backend.pashub_fetcher.pashub_service.save_epc_property_data"
        ) as save,
        patch("backend.pashub_fetcher.pashub_service.db_session") as db,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        db.return_value.__enter__.return_value = session
        service.run(make_request(uprn="12345"))

    parse.assert_called_once_with("/tmp/RdSAP_SiteNote_001.pdf")
    save.assert_called_once_with(
        session, epc_data, uploaded_file_id=uploaded_file_id
    )


def test_run_warns_and_continues_when_site_notes_parsing_fails() -> None:
    """A parse failure is logged as a warning and does not abort the run."""
    service = make_service(
        pashub_client=_client_with_files("/tmp/RdSAP_SiteNote_001.pdf")
    )

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch(
            "backend.pashub_fetcher.pashub_service.parse_site_notes_pdf",
            side_effect=ValueError("corrupt pdf"),
        ),
        patch(
            "backend.pashub_fetcher.pashub_service.save_epc_property_data"
        ) as save,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.logger") as log,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        returned = service.run(make_request(uprn="12345"))

    assert returned == ["/tmp/RdSAP_SiteNote_001.pdf"]
    log.warning.assert_called()
    save.assert_not_called()
log_cli = true log_cli_level = INFO addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial -testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests +testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/pashub_fetcher/tests backend/documents_parser/tests markers = integration: mark a test as an integration test