# Model/backend/ecmk_fetcher/handler/handler.py
#
# 203 lines
# 6 KiB
# Python

import os
from enum import Enum
from typing import Any, List, Mapping
from openpyxl import load_workbook
from playwright.sync_api import (
Locator,
sync_playwright,
TimeoutError as PlaywrightTimeoutError,
)
from utils.logger import setup_logger
# Module-level logger built by utils.logger.setup_logger(); used by the
# functions in this file for progress messages.
logger = setup_logger()
class file_download_button_types(Enum):
    """Numeric codes identifying the kinds of downloadable report.

    NOTE(review): the values presumably match the ``data-report-type``
    attribute of the report download buttons on the assessment detail page;
    confirm against the page markup before relying on them.
    """

    # Site-note report as produced by Assessor Hub.
    ASSESSOR_HUB_SITENOTE_REPORT = 11
    # Certificate document.
    CERTIFICATE = 9
    # Site-note report.
    SITENOTE_REPORT = 8
    # Raw XML export.
    RAW_XML = 7
    # SAP worksheet.
    SAP_WORK_SHEET = 15
def extract_ids_from_spreadsheet(
    filepath: str,
    sheet_name: str = "Southern RA-Lite Programme 3103",
) -> List[str]:
    """Return the non-empty values of the ``id`` column of a worksheet.

    The first row of the sheet is treated as the header. The first header
    cell whose text equals ``"id"`` (case-insensitive, surrounding whitespace
    ignored) selects the column to read. Every following row contributes its
    value in that column, converted to ``str`` and stripped; empty cells and
    cells that are blank after stripping are skipped.

    Args:
        filepath: Path of the ``.xlsx`` workbook to read.
        sheet_name: Name of the worksheet holding the ids. Defaults to the
            sheet of the export this module was written for.

    Returns:
        The ids in sheet order (duplicates are kept).

    Raises:
        KeyError: Raised by openpyxl when the workbook has no worksheet
            called ``sheet_name``.
        ValueError: If the header row contains no ``id`` column.
    """
    # data_only=True makes openpyxl return cached formula results rather
    # than the formula text.
    wb = load_workbook(filepath, data_only=True)
    ws = wb[sheet_name]

    # Read the header row in one pass instead of one ws.cell() call per
    # column; next(..., ()) covers a sheet that yields no rows at all.
    header = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), ())
    id_col_index = None
    for col, cell_value in enumerate(header, start=1):
        if cell_value and str(cell_value).strip().lower() == "id":
            id_col_index = col
            break

    if id_col_index is None:
        # ValueError is still caught by callers that catch Exception.
        raise ValueError("ID column not found in spreadsheet")

    ids: List[str] = []
    # Restricting min_col/max_col yields one-element tuples holding only the
    # id cell of each data row.
    data_rows = ws.iter_rows(
        min_row=2,
        min_col=id_col_index,
        max_col=id_col_index,
        values_only=True,
    )
    for (cell_value,) in data_rows:
        if cell_value is None:
            continue
        id_str = str(cell_value).strip()
        if id_str:
            ids.append(id_str)
    return ids
def build_property_id(address: str, postcode: str) -> str:
    """Build a property id from an address and a postcode.

    The id is the first whitespace-delimited token of ``address`` (expected
    to be the house number) followed by ``postcode`` with all whitespace
    removed and upper-cased.

    Example:
        '9 Random Close', 'AB1 2YZ' -> '9AB12YZ'

    Args:
        address: Address line; only its first token is used.
        postcode: Postcode in any casing, with or without spaces.

    Returns:
        The concatenated id. An empty or whitespace-only ``address``
        contributes nothing, so only the cleaned postcode is returned.
    """
    # split() without a separator skips leading and repeated whitespace and
    # also splits on tabs, newlines and non-breaking spaces, so the token is
    # never an empty string caused by stray whitespace in scraped text.
    tokens = address.split()
    number = tokens[0] if tokens else ""
    # Remove every kind of whitespace from the postcode, not just U+0020.
    postcode_clean = "".join(postcode.split()).upper()
    return f"{number}{postcode_clean}"
# Spreadsheet read by download_report() when no other file is given. It is
# resolved relative to the parent of the directory containing this module.
_DEFAULT_PROPERTY_LIST_FILE = (
    "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
)


def _collect_matches_on_page(page: Any, wanted_ids: set) -> List[str]:
    """Return the wanted property ids found on the table page now shown.

    ``page`` is the Playwright page displaying ``#assessmentDatatable``.
    A row is kept when it is not assigned to "Oliver Stephens", its status is
    "Submitted (not Lodged)" and the id built from its address and postcode
    is in ``wanted_ids``.
    """
    matches: List[str] = []
    rows = page.locator("#assessmentDatatable tbody tr")
    row_count = rows.count()
    logger.info(f"Processing {row_count} rows on current page")

    for i in range(row_count):
        cells = rows.nth(i).locator("td")

        # Cheap filters first so rejected rows cost fewer browser round
        # trips. Column positions: 1 first name, 2 last name, 5 address,
        # 7 postcode, 9 status.
        first_name = cells.nth(1).inner_text().strip()
        last_name = cells.nth(2).inner_text().strip()
        if first_name == "Oliver" and last_name == "Stephens":
            continue

        status = cells.nth(9).inner_text().strip()
        if status != "Submitted (not Lodged)":
            continue

        address = cells.nth(5).inner_text().strip()
        postcode = cells.nth(7).inner_text().strip()
        property_id = build_property_id(address, postcode)
        if property_id not in wanted_ids:
            continue

        logger.info(f"MATCH FOUND: {property_id}")
        matches.append(property_id)

    return matches


def _next_page_disabled(page: Any, next_button: Any) -> bool:
    """Return True when the table's "next" control is marked disabled.

    ``next_button`` is the locator for ``#assessmentDatatable_next a``.
    The ``disabled`` class is looked for on that link and also on its
    ``#assessmentDatatable_next`` wrapper.
    NOTE(review): which element carries the class depends on the pager
    markup the site uses; checking both avoids looping forever on the last
    page when the class sits on the wrapper - confirm against the site.
    """
    link_classes = next_button.get_attribute("class") or ""
    if "disabled" in link_classes:
        return True
    wrapper = page.locator("#assessmentDatatable_next")
    wrapper_classes = wrapper.get_attribute("class") or ""
    return "disabled" in wrapper_classes


def download_report(
    username: str = "",
    password: str = "",
    property_list_file: str = _DEFAULT_PROPERTY_LIST_FILE,
) -> List[str]:
    """Log into Assessor Hub and collect assessments matching the spreadsheet.

    The property ids are read from ``property_list_file``. The function then
    signs in at https://assessorhub.net/, opens the company assessments
    table and walks through every page of it, collecting the id of each row
    that passes the filters in ``_collect_matches_on_page``.

    Despite its name, the function does not download anything yet; see the
    TODO after the paging loop.

    Args:
        username: Login name. When empty, the ``ECMK_USERNAME`` environment
            variable is used (empty string if unset).
        password: Login password. When empty, the ``ECMK_PASSWORD``
            environment variable is used (empty string if unset).
        property_list_file: Spreadsheet file name, joined onto the parent of
            this module's directory. An absolute path is used as given.

    Returns:
        The matching property ids in the order they were encountered.

    Raises:
        Exception: "Login failed" when the browser is still on a login URL
            after submitting, or "Timeout occurred: ..." when any Playwright
            wait times out (the original timeout error is chained).
    """
    # Credentials must never live in source; fall back to the environment
    # when the caller passes none.
    username = username or os.environ.get("ECMK_USERNAME", "")
    password = password or os.environ.get("ECMK_PASSWORD", "")

    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    filepath = os.path.join(BASE_DIR, property_list_file)

    # A set gives constant-time membership tests for every table row.
    wanted_ids = set(extract_ids_from_spreadsheet(filepath))
    matching_properties: List[str] = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()

        try:
            # Log into ECMK with playwright
            page.goto("https://assessorhub.net/", timeout=30000)

            username_input = page.locator("#Username")
            password_input = page.locator("#Password")
            username_input.wait_for(state="visible", timeout=10000)
            username_input.fill(username)
            password_input.wait_for(state="visible", timeout=10000)
            password_input.fill(password)

            with page.expect_navigation(timeout=15000):
                page.click("button[type='submit']")

            # Still being on a login URL after submitting means the
            # credentials were rejected.
            if "login" in page.url.lower():
                raise Exception("Login failed")
            logger.info(f"Login successful: {page.url}")

            page.goto(
                "https://assessorhub.net/Companies/Assessments", timeout=30000
            )
            page.wait_for_selector(
                "#assessmentDatatable tbody tr", timeout=20000
            )

            while True:
                matching_properties.extend(
                    _collect_matches_on_page(page, wanted_ids)
                )

                next_button = page.locator("#assessmentDatatable_next a")
                if _next_page_disabled(page, next_button):
                    logger.info("No more pages")
                    break

                next_button.scroll_into_view_if_needed()
                next_button.click()
                # Fixed pause to let the table redraw.
                # NOTE(review): waiting for the first row to change would be
                # more reliable than a fixed 2 s sleep.
                page.wait_for_timeout(2000)

            # TODO: download the reports of the matched properties. Sketch of
            # the intended flow, not wired up yet:
            #   page.goto(
            #       "https://assessorhub.net/Assessments/Assessments/Detail/1bd9fd74-08f6-4fc1-b2f7-3a13a8f9084d?returnUrl=/Companies/Assessments",
            #       timeout=30000,
            #   )
            #   button = page.locator("a.download-report-btn[data-report-type='11']")
            #   button.wait_for(state="visible", timeout=10000)
            #   with page.expect_download(timeout=30000) as download_info:
            #       button.click()
            #   download = download_info.value
            #   filename = download.suggested_filename
            #   save_path = os.path.join(os.getcwd(), filename)
            #   download.save_as(save_path)
            #   print(f"Downloaded file saved to: {save_path}")

        except PlaywrightTimeoutError as e:
            # One handler covers login, navigation and row scraping; chain
            # the cause so the Playwright traceback is not lost.
            raise Exception(f"Timeout occurred: {str(e)}") from e

        finally:
            context.close()
            browser.close()

    return matching_properties
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Entry point taking an ``(event, context)`` pair.

    Neither argument is read; the call simply runs ``download_report()`` and
    discards whatever it returns.
    """
    download_report()
    return None
if __name__ == "__main__":
    # Run directly as a script: invoke the handler with a minimal
    # single-record event and no context object.
    handler({"Records": [{"body": "{}"}]}, None)