mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
99 lines
2.8 KiB
Python
99 lines
2.8 KiB
Python
import os
|
|
from typing import Optional
|
|
from playwright.sync_api import Page, Locator, Response
|
|
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
|
|
|
|
from backend.ecmk_fetcher.reports import build_report_selector
|
|
from utils.logger import setup_logger
|
|
|
|
# from .reports import build_report_selector
|
|
|
|
# Module-level logger used by the helper functions in this file.
logger = setup_logger()
|
|
|
|
|
|
def attach_debug_listeners(page: Page) -> None:
    """Log responses whose URL mentions a download or a report.

    Registers a ``response`` listener on ``page``. Each matching response is
    written to the module logger with its status code and URL.
    """
    keywords = ("download", "report")

    def handle_response(response: Response) -> None:
        # Anything that is not download/report traffic is ignored.
        if not any(keyword in response.url for keyword in keywords):
            return
        logger.info(f"[RESPONSE] {response.status} {response.url}")

    page.on("response", handle_response)
|
|
|
|
|
|
def login(page: Page, username: str, password: str) -> None:
    """Sign in to assessorhub.net with the given credentials.

    Opens the site root, fills the username and password fields, submits
    the form and waits for the navigation that the submit triggers.

    Args:
        page: Playwright page used to drive the browser.
        username: Value typed into the ``#Username`` field.
        password: Value typed into the ``#Password`` field.

    Raises:
        RuntimeError: If the page URL still contains ``login`` after the
            form has been submitted.
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    page.locator("#Username").fill(username)
    page.locator("#Password").fill(password)

    # Start waiting for the navigation before clicking so it cannot be missed.
    with page.expect_navigation():
        page.click("button[type='submit']")

    # Still being on a login URL after submitting is treated as a failure.
    # RuntimeError (an Exception subclass) replaces the bare Exception so
    # callers can handle it more precisely; ``except Exception`` still works.
    if "login" in page.url.lower():
        raise RuntimeError("Login failed")

    logger.info("Login successful")
|
|
|
|
|
|
def go_to_assessments(page: Page) -> None:
    """Open the company assessments list and wait for a table row to appear."""
    assessments_url = "https://assessorhub.net/Companies/Assessments"
    table_row_selector = "#assessmentDatatable tbody tr"

    page.goto(assessments_url)
    # Block until at least one row of the assessments table matches.
    page.wait_for_selector(table_row_selector)
|
|
|
|
|
|
def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Click the link inside ``row`` and wait for the details page to load.

    After the click, waits for the network to go idle and then for a
    ``a.download-report-btn`` element to be present on ``page``.
    """
    details_link = row.locator("a")
    details_link.click()

    # Let network activity settle before looking for the download button.
    page.wait_for_load_state("networkidle")
    page.wait_for_selector("a.download-report-btn")
|
|
|
|
|
|
def get_first_row_signature(page: Page) -> str:
    """Return the inner text of the first row of the assessments table."""
    table_rows = page.locator("#assessmentDatatable tbody tr")
    first_row = table_rows.first
    return first_row.inner_text()
|
|
|
|
|
|
def go_to_next_page(page: Page) -> bool:
    """Click the table's "next" control and report whether the rows changed.

    The first row's text is captured before and after the click; the two
    values are compared to decide whether a different page is now shown.

    Returns:
        ``True`` if the first row's text differs after the click,
        ``False`` otherwise.
    """
    logger.info("Going to next page")
    signature_before = get_first_row_signature(page)

    next_link = page.locator("#assessmentDatatable_next a")
    next_link.click()
    # Fixed pause before the table is read again.
    page.wait_for_timeout(2000)

    signature_after = get_first_row_signature(page)
    page_changed = signature_before != signature_after
    return page_changed
|
|
|
|
|
|
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
    """Click the element matched by ``selector`` and save the resulting download.

    The element must become visible within 10 seconds and be enabled. The
    download must start within 15 seconds of the click. The file is saved
    in the current working directory under the name suggested for the
    download, reduced to its final path component.

    Args:
        page: Playwright page that contains the download control.
        selector: Selector of the element that triggers the download.

    Returns:
        The path the file was saved to, or ``None`` if the element is
        disabled or a Playwright timeout occurred.
    """
    try:
        element = page.locator(selector)
        element.wait_for(state="visible", timeout=10000)

        if not element.is_enabled():
            # Make the skipped download visible in the logs instead of
            # silently returning nothing.
            logger.info(f"Download control is disabled for {selector}")
            return None

        element.scroll_into_view_if_needed()

        # Start listening for the download before the click that triggers it.
        with page.expect_download(timeout=15000) as download_info:
            element.click()

        download = download_info.value

        # The suggested name comes from the remote side. Keep only its final
        # component so it can never point outside the target directory, and
        # fall back to a fixed name if nothing usable is left.
        filename = os.path.basename(download.suggested_filename)
        if filename in ("", ".", ".."):
            filename = "download"

        save_path = os.path.join(os.getcwd(), filename)
        download.save_as(save_path)

        return save_path

    except PlaywrightTimeoutError:
        logger.error(f"Download failed for {selector}")
        return None
|
|
|
|
|
|
def download_with_retry(
    page: Page,
    report_type: int,
    *,
    attempts: int = 3,
    delay_ms: int = 1500,
) -> Optional[str]:
    """Try to download a report several times before giving up.

    Args:
        page: Playwright page showing the report download controls.
        report_type: Report identifier passed to ``build_report_selector``
            to obtain the selector of the download control.
        attempts: Maximum number of download attempts (default 3).
        delay_ms: Pause between two attempts, in milliseconds (default 1500).

    Returns:
        The saved file path from the first successful attempt, or ``None``
        if every attempt failed.
    """
    selector: str = build_report_selector(report_type)

    for attempt in range(1, attempts + 1):
        file_path = download_report_by_selector(page, selector)
        if file_path:
            return file_path

        # Pause only between attempts: waiting after the final failure would
        # just delay returning the result.
        if attempt < attempts:
            page.wait_for_timeout(delay_ms)

    return None
|