Model/backend/ecmk_fetcher/browser.py

99 lines
2.8 KiB
Python

import os
from typing import Optional
from playwright.sync_api import Page, Locator, Response
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from backend.ecmk_fetcher.reports import build_report_selector
from utils.logger import setup_logger
# from .reports import build_report_selector
# Module-level logger used by the browser helpers in this file.
logger = setup_logger()
def attach_debug_listeners(page: Page) -> None:
    """Register a response listener on *page* that logs report traffic.

    Every response whose URL contains "download" or "report" is written to
    the module logger together with its HTTP status.
    """

    def _log_matching_response(response: Response) -> None:
        # Only responses that look download/report related are logged.
        url = response.url
        if any(keyword in url for keyword in ("download", "report")):
            logger.info(f"[RESPONSE] {response.status} {url}")

    page.on("response", _log_matching_response)
def login(page: Page, username: str, password: str) -> None:
    """Sign in through the login form at https://assessorhub.net/.

    Opens the site root, fills the ``#Username`` and ``#Password`` fields,
    submits the form and waits for the navigation that follows.

    Raises:
        Exception: If the URL after the navigation still contains "login"
            (compared case-insensitively).
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    credentials = (("#Username", username), ("#Password", password))
    for field_selector, value in credentials:
        page.locator(field_selector).fill(value)

    # The navigation wait is set up before the click so the page load
    # triggered by the submit is the one being awaited.
    with page.expect_navigation():
        page.click("button[type='submit']")

    landed_url = page.url.lower()
    if "login" in landed_url:
        raise Exception("Login failed")

    logger.info("Login successful")
def go_to_assessments(page: Page) -> None:
    """Open the company assessments page and wait for a table row.

    Returns once an element matching ``#assessmentDatatable tbody tr`` has
    been found by ``wait_for_selector``.
    """
    assessments_url = "https://assessorhub.net/Companies/Assessments"
    table_row_selector = "#assessmentDatatable tbody tr"

    page.goto(assessments_url)
    page.wait_for_selector(table_row_selector)
def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Click the link inside *row* and wait for the page that follows.

    Args:
        page: Page on which the waits are performed.
        row: Locator whose ``a`` descendant is clicked.
    """
    details_link = row.locator("a")
    details_link.click()

    # Wait for the network to go idle first, then for the report download
    # button to be found.
    page.wait_for_load_state("networkidle")
    page.wait_for_selector("a.download-report-btn")
def get_first_row_signature(page: Page) -> str:
    """Return the inner text of the first ``#assessmentDatatable`` body row."""
    table_rows = page.locator("#assessmentDatatable tbody tr")
    first_row = table_rows.first
    return first_row.inner_text()
def go_to_next_page(page: Page) -> bool:
    """Click the table's "next" link and report whether the first row changed.

    Returns:
        ``True`` if the first row's text after the click differs from its
        text before the click, ``False`` otherwise.
    """
    logger.info("Going to next page")

    previous_signature = get_first_row_signature(page)

    next_link = page.locator("#assessmentDatatable_next a")
    next_link.click()

    # Fixed pause before re-reading the table.
    # NOTE(review): presumably this gives the table time to redraw - confirm.
    page.wait_for_timeout(2000)

    page_changed = previous_signature != get_first_row_signature(page)
    return page_changed
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
    """Click the element matching *selector* and save the download it starts.

    The file is written to the current working directory under the final
    path component of the filename suggested for the download.

    Args:
        page: Page that contains the download control.
        selector: Selector of the control to click.

    Returns:
        The path of the saved file, or ``None`` when the control is not
        enabled or a Playwright timeout occurs while waiting for the control
        or for the download.
    """
    try:
        element = page.locator(selector)
        element.wait_for(state="visible", timeout=10000)
        if not element.is_enabled():
            # Leave a trace so a skipped report can be told apart from a
            # timeout when reading the logs.
            logger.info(f"Download control disabled for {selector}")
            return None

        element.scroll_into_view_if_needed()
        # The download wait is set up before the click that triggers it.
        with page.expect_download(timeout=15000) as download_info:
            element.click()
        download = download_info.value

        # Keep only the last path component so a suggested name containing
        # directory separators cannot place the file outside the working
        # directory.
        filename = os.path.basename(download.suggested_filename)
        save_path = os.path.join(os.getcwd(), filename)
        download.save_as(save_path)
        return save_path
    except PlaywrightTimeoutError as exc:
        # Include the underlying error so the log shows which wait expired.
        logger.error(f"Download failed for {selector}: {exc}")
        return None
def download_with_retry(
    page: Page,
    report_type: int,
    *,
    attempts: int = 3,
    delay_ms: int = 1500,
) -> Optional[str]:
    """Try to download a report, retrying when an attempt yields no file.

    Args:
        page: Page that contains the report download control.
        report_type: Value passed to ``build_report_selector`` to obtain the
            selector of the control to click.
        attempts: Maximum number of download attempts (default 3).
        delay_ms: Pause between attempts in milliseconds (default 1500).

    Returns:
        The path returned by the first successful attempt, or ``None`` if
        every attempt failed.
    """
    selector: str = build_report_selector(report_type)
    for attempt in range(1, attempts + 1):
        file_path = download_report_by_selector(page, selector)
        if file_path:
            return file_path
        # Pause only between attempts: waiting after the final failure would
        # just delay returning None to the caller.
        if attempt < attempts:
            page.wait_for_timeout(delay_ms)
    return None