import os
from typing import Dict

from playwright.sync_api import Browser, BrowserContext, Locator, Page, sync_playwright

from backend.app.db.connection import db_session
from backend.app.db.functions.uploaded_files_functions import (
    get_uploaded_file_by_listing_type_and_source,
)
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from backend.documents_parser.db_writer import save_epc_property_data
from backend.documents_parser.parser import parse_site_notes_pdf
from backend.ecmk_fetcher.address_list import (
    PropertyRow,
    extract_addresses_from_spreadsheet,
)
from backend.ecmk_fetcher.browser import (
    attach_debug_listeners,
    download_with_retry,
    go_to_assessment_details,
    go_to_assessments,
    go_to_next_page,
    login,
)
from backend.ecmk_fetcher.excel_writer import write_row
from backend.ecmk_fetcher.reports import (
    REPORT_TYPES,
    FileDownloadButtonType,
    build_property_id,
    map_report_type_to_db_file_type,
)
from backend.ecmk_fetcher.upload import (
    upload_excel_to_sharepoint,
    upload_file_to_s3_and_record,
    upload_file_to_sharepoint,
)
from backend.ecmk_fetcher.xml_processor import flatten_sap_property, parse_rdsap
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient

logger = setup_logger()

DIMENSIONS_FILENAME: str = "Dimensions.xlsx"

# CSS selector for the rows of the assessments table; used both to iterate the
# rows and to wait for the table to re-render after navigating back.
_ASSESSMENT_ROWS_SELECTOR: str = "#assessmentDatatable tbody tr"


class EcmkService:
    """Fetch ECMK assessment reports and push them to SharePoint, S3 and the DB.

    The service walks the paginated assessments table, matches each row against
    the property list loaded at construction time, downloads every report type
    that has not been recorded yet, and hands each file to the XML or PDF
    processing path.
    """

    def __init__(
        self,
        sharepoint_client: DomnaSharepointClient,
        s3_bucket: str,
        property_list_filepath: str,
        sharepoint_base_path: str,
        sharepoint_excel_path: str,
        local_dimensions_path: str,
    ) -> None:
        """Store the collaborators and load the property map.

        Args:
            sharepoint_client: Client used for SharePoint downloads and uploads.
            s3_bucket: Bucket passed to ``upload_file_to_s3_and_record``.
            property_list_filepath: Spreadsheet passed to
                ``extract_addresses_from_spreadsheet`` to build the property map.
            sharepoint_base_path: Base path for PDF uploads to SharePoint.
            sharepoint_excel_path: SharePoint folder holding the dimensions
                workbook.
            local_dimensions_path: Local path of the dimensions workbook.
        """
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket
        self._sharepoint_base_path = sharepoint_base_path
        self._sharepoint_excel_path = sharepoint_excel_path
        self._local_dimensions_path = local_dimensions_path
        # Looked up with the value returned by build_property_id(address, postcode).
        self._property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(
            property_list_filepath
        )

    def run(self) -> None:
        """Download the dimensions workbook, then run one headless browser session."""
        self._sharepoint_client.download_file(
            sharepoint_path=f"{self._sharepoint_excel_path}/{DIMENSIONS_FILENAME}",
            local_path=self._local_dimensions_path,
        )

        with sync_playwright() as p:
            browser: Browser = p.chromium.launch(headless=True)
            context: BrowserContext = browser.new_context()
            page: Page = context.new_page()

            try:
                self._run_browser_session(page)
            finally:
                # Nested so the browser is still closed if closing the context fails.
                try:
                    context.close()
                finally:
                    browser.close()

    def _run_browser_session(self, page: Page) -> None:
        """Log in and process every row on every page of the assessments table.

        Args:
            page: Playwright page used for the whole session.

        Raises:
            Exception: ``"Row processing failed: ..."`` wrapping (and chained
                from) any error raised while handling a single row.
        """
        username: str = ""  # TODO: get from github secrets
        password: str = ""

        attach_debug_listeners(page)
        login(page, username, password)
        go_to_assessments(page)

        while True:
            rows: Locator = page.locator(_ASSESSMENT_ROWS_SELECTOR)
            row_count: int = rows.count()

            for i in range(row_count):
                row: Locator = rows.nth(i)
                try:
                    self._process_row(page, row)
                except Exception as e:
                    raise Exception(f"Row processing failed: {e}") from e

            if not go_to_next_page(page):
                break

    def _process_row(self, page: Page, row: Locator) -> None:
        """Handle one table row: filter it, then fetch its reports if it matches.

        Rows are skipped when the name is "Oliver Stephens", when the status is
        not "Submitted (not Lodged)", or when the property id built from the
        address and postcode is not in the property map.
        """
        cells: Locator = row.locator("td")
        first_name: str = cells.nth(1).inner_text().strip()
        last_name: str = cells.nth(2).inner_text().strip()
        address: str = cells.nth(5).inner_text().strip()
        postcode: str = cells.nth(7).inner_text().strip()
        status: str = cells.nth(9).inner_text().strip()

        if first_name == "Oliver" and last_name == "Stephens":
            return

        if status != "Submitted (not Lodged)":
            return

        property_id: str = build_property_id(address, postcode)
        property_row: PropertyRow | None = self._property_map.get(property_id)
        if not property_row:
            return

        logger.info(f"Match found for property {address}")

        go_to_assessment_details(page, row)
        self._download_and_process_reports(page, property_row)

        # Return to the table and wait for it to render before the next row.
        page.go_back()
        page.wait_for_selector(_ASSESSMENT_ROWS_SELECTOR, timeout=15000)

    def _download_and_process_reports(
        self, page: Page, property_row: PropertyRow
    ) -> None:
        """Download and process each report type not yet recorded for a property.

        The downloaded file is always removed from disk afterwards, whether or
        not processing succeeded; processing errors propagate to the caller.
        """
        sharepoint_address: str = property_row.address
        hubspot_listing_id: str = property_row.listing_id

        for report_type in REPORT_TYPES:
            try:
                db_file_type: FileTypeEnum = map_report_type_to_db_file_type(
                    report_type
                )
            except ValueError:
                logger.error(f"Unknown report type {report_type}, skipping file")
                continue

            if get_uploaded_file_by_listing_type_and_source(
                hubspot_listing_id=int(hubspot_listing_id),
                file_type=db_file_type,
                file_source=FileSourceEnum.ECMK,
            ):
                logger.debug("File already uploaded to s3, skipping")
                continue

            file_path: str | None = download_with_retry(page, report_type)
            if not file_path:
                continue

            logger.info(
                f"Successfully downloaded file {os.path.basename(file_path)} from ECMK"
            )

            try:
                self._process_file(
                    file_path=file_path,
                    report_type=report_type,
                    db_file_type=db_file_type,
                    sharepoint_address=sharepoint_address,
                    hubspot_listing_id=hubspot_listing_id,
                )
            finally:
                if os.path.exists(file_path):
                    os.remove(file_path)

    def _process_file(
        self,
        file_path: str,
        report_type: int,
        db_file_type: FileTypeEnum,
        sharepoint_address: str,
        hubspot_listing_id: str,
    ) -> None:
        """Route a downloaded file to the XML path (raw XML) or the PDF path."""
        if report_type == FileDownloadButtonType.RAW_XML.value:
            self._process_xml_file(
                file_path=file_path,
                db_file_type=db_file_type,
                hubspot_listing_id=hubspot_listing_id,
            )
        else:
            self._process_pdf_file(
                file_path=file_path,
                file_type=db_file_type,
                sharepoint_address=sharepoint_address,
                hubspot_listing_id=hubspot_listing_id,
            )

    def _process_xml_file(
        self,
        file_path: str,
        db_file_type: FileTypeEnum,
        hubspot_listing_id: str,
    ) -> None:
        """Parse the XML, write its row to the workbook, and upload both files.

        The flattened row is written to the local dimensions workbook, the
        workbook is uploaded to SharePoint, and the XML file itself is uploaded
        to S3 and recorded.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            xml_string: str = f.read()

        details = parse_rdsap(xml_string)
        row_data = flatten_sap_property(details)

        write_row(self._local_dimensions_path, row_data)

        upload_excel_to_sharepoint(
            client=self._sharepoint_client,
            file_path=self._local_dimensions_path,
            sharepoint_path=self._sharepoint_excel_path,
        )

        upload_file_to_s3_and_record(
            bucket=self._s3_bucket,
            file_path=file_path,
            hubspot_listing_id=hubspot_listing_id,
            file_type=db_file_type,
        )

    def _process_pdf_file(
        self,
        file_path: str,
        file_type: FileTypeEnum,
        sharepoint_address: str,
        hubspot_listing_id: str,
    ) -> None:
        """Upload a PDF to SharePoint and S3, then extract EPC data from site notes.

        EPC extraction only runs for ``FileTypeEnum.ECMK_RD_SAP_SITE_NOTE`` and
        is best-effort: a failure is logged and the uploads are left in place.
        """
        upload_file_to_sharepoint(
            client=self._sharepoint_client,
            file_path=file_path,
            base_path=self._sharepoint_base_path,
            subpath=sharepoint_address,
        )

        uploaded_file_id: int = upload_file_to_s3_and_record(
            bucket=self._s3_bucket,
            file_path=file_path,
            hubspot_listing_id=hubspot_listing_id,
            file_type=file_type,
        )

        if file_type == FileTypeEnum.ECMK_RD_SAP_SITE_NOTE:
            try:
                epc_data = parse_site_notes_pdf(file_path)
                with db_session() as session:
                    save_epc_property_data(
                        session=session,
                        data=epc_data,
                        uploaded_file_id=uploaded_file_id,
                    )
            except Exception as exc:
                # Deliberately not re-raised, but the cause is logged so the
                # failure can be diagnosed.
                logger.warning(
                    f"EPC extraction failed for {os.path.basename(file_path)} — file record retained "
                    f"({type(exc).__name__}: {exc})"
                )