# Model/backend/ecmk_fetcher/handler/handler.py
# 2026-04-01 14:13:46 +00:00
#
# 284 lines
# 8.2 KiB
# Python

import os
from enum import Enum
import re
from typing import Any, Dict, List, Mapping, Optional
from openpyxl import load_workbook
from playwright.sync_api import (
Locator,
Page,
sync_playwright,
TimeoutError as PlaywrightTimeoutError,
)
from utils.logger import setup_logger
# Module-level logger used by the functions in this file for progress messages.
logger = setup_logger()
class file_download_button_types(Enum):
    """Report kinds offered by the download buttons on an assessment page.

    Each member's value is the number that ends up in the
    ``data-report-type`` attribute of the selector produced by
    ``build_report_selector``. Only ``ASSESSOR_HUB_SITENOTE_REPORT`` and
    ``SITENOTE_REPORT`` are requested by ``download_report`` in this file.
    """

    # NOTE(review): the numeric values presumably mirror identifiers used by
    # the site's markup; they cannot be verified from this file alone.
    ASSESSOR_HUB_SITENOTE_REPORT = 11
    CERTIFICATE = 9
    SITENOTE_REPORT = 8
    RAW_XML = 7
    SAP_WORK_SHEET = 15
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
    """Map every ID in the export workbook to its succinct address.

    Reads the "Southern RA-Lite Programme 3103" sheet, locates the "ID" and
    "Deal Name" columns in the first row (case-insensitive, surrounding
    whitespace ignored) and converts each deal name with
    ``extract_succinct_address``.

    Args:
        filepath: Path of the ``.xlsx`` workbook to read.

    Returns:
        A dict of stripped ID string -> succinct address. Rows with an empty
        ID or an empty deal name are skipped; when an ID occurs more than
        once the last row wins.

    Raises:
        Exception: If the header row has no "ID" or no "Deal Name" column.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["Southern RA-Lite Programme 3103"]

    header_row = 1
    id_col_index: Optional[int] = None
    deal_name_col_index: Optional[int] = None
    for col in range(1, ws.max_column + 1):
        cell_value = ws.cell(row=header_row, column=col).value
        if not cell_value:
            continue
        header = str(cell_value).strip().lower()
        if header == "id":
            id_col_index = col
        elif header == "deal name" and deal_name_col_index is None:
            deal_name_col_index = col
        # Stop only once BOTH columns are known. Stopping as soon as
        # "Deal Name" is seen would miss an "ID" column located to its right.
        if id_col_index is not None and deal_name_col_index is not None:
            break

    if id_col_index is None:
        raise Exception("ID column not found in spreadsheet")
    if deal_name_col_index is None:
        raise Exception("Deal Name column not found in spreadsheet")

    properties: Dict[str, str] = {}
    # values_only=True yields plain tuples of cell values, one per data row.
    for row_values in ws.iter_rows(min_row=header_row + 1, values_only=True):
        id_cell_value = row_values[id_col_index - 1]
        deal_name_cell_value = row_values[deal_name_col_index - 1]
        if id_cell_value is None or deal_name_cell_value is None:
            continue
        id_str = str(id_cell_value).strip()
        if not id_str:
            continue
        deal_name_str = str(deal_name_cell_value).strip()
        properties[id_str] = extract_succinct_address(deal_name_str)
    return properties
def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to "<first address line> <POSTCODE>".

    Only the text before the first ``|`` is considered. The postcode is
    emitted upper-cased with all internal whitespace removed.

    Example:
        '1 My Random Close, Town, AB12 3DC | Retrofit Assessment'
        -> '1 My Random Close AB123DC'

    Args:
        deal_name: Raw deal name, optionally followed by ``| <suffix>``.

    Returns:
        The first comma-separated address segment followed by the compact
        postcode, or just the first segment when no postcode is found.
    """
    left_part = deal_name.split("|")[0].strip()
    postcode_match = re.search(
        r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
        left_part,
        re.IGNORECASE,
    )
    if postcode_match is None:
        return left_part.split(",")[0].strip()

    # The pattern's \s* can match tabs or several spaces, so strip every kind
    # of whitespace rather than only the plain space character.
    postcode = re.sub(r"\s+", "", postcode_match.group(1)).upper()

    # Cut the postcode out before taking the first comma segment; otherwise
    # an address written without commas would carry the postcode twice.
    before = left_part[: postcode_match.start()].rstrip()
    after = left_part[postcode_match.end():].lstrip()
    first_part = f"{before} {after}".split(",")[0].strip()
    return f"{first_part} {postcode}".strip()
def build_property_id(address: str, postcode: str) -> str:
    """Concatenate the leading token of an address with its compact postcode.

    Example:
        '9 Random Close', 'AB1 2YZ' -> '9AB12YZ'

    Args:
        address: Address line whose first whitespace-separated token is the
            house number.
        postcode: Postcode in any spacing or letter case.

    Returns:
        ``<first address token><POSTCODE without whitespace, upper-cased>``.
        The token part is empty when ``address`` is blank.
    """
    # split() without a separator ignores leading whitespace and treats any
    # whitespace run as one delimiter, unlike split(" ").
    address_tokens = address.split(maxsplit=1)
    number = address_tokens[0] if address_tokens else ""
    postcode_clean = "".join(postcode.split()).upper()
    return f"{number}{postcode_clean}"
def login(page: Page, username: str, password: str) -> None:
    """Sign in to assessorhub.net through the login form.

    Args:
        page: Browser page used for the session.
        username: Value typed into the ``#Username`` field.
        password: Value typed into the ``#Password`` field.

    Raises:
        Exception: If the URL after submitting still contains "login".
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    # Fill both fields the same way: wait until visible, then type the value.
    form_values = (("#Username", username), ("#Password", password))
    for field_selector, field_value in form_values:
        field: Locator = page.locator(field_selector)
        field.wait_for(state="visible", timeout=10000)
        field.fill(field_value)

    # Submitting is expected to trigger a navigation away from the form.
    with page.expect_navigation(timeout=15000):
        page.click("button[type='submit']")

    still_on_login_page = "login" in page.url.lower()
    if still_on_login_page:
        raise Exception("Login failed")
    logger.info("Login successful")
def go_to_assessments(page: Page) -> None:
    """Open the company assessments list and wait until the table has a row."""
    assessments_url = "https://assessorhub.net/Companies/Assessments"
    table_row_selector = "#assessmentDatatable tbody tr"
    page.goto(assessments_url, timeout=30000)
    page.wait_for_selector(table_row_selector, timeout=20000)
def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Click the link inside ``row`` and wait for the resulting navigation."""
    with page.expect_navigation():
        row.locator("a").click()
def go_to_next_page(page: Page) -> bool:
    """Advance the assessments table to its next page, if there is one.

    Args:
        page: Page currently showing the assessments table.

    Returns:
        ``True`` after clicking "next" and pausing for the table to redraw,
        ``False`` when the "next" control is marked as disabled.
    """
    next_button: Locator = page.locator("#assessmentDatatable_next a")
    next_container: Locator = page.locator("#assessmentDatatable_next")

    button_classes: str = next_button.get_attribute("class") or ""
    container_classes: str = next_container.get_attribute("class") or ""

    # NOTE(review): the selector implies the id sits on a wrapper around the
    # link. Some pager markups put "disabled" on that wrapper rather than on
    # the <a>, so both are checked; checking only the link could loop on the
    # last page forever. Confirm against the live markup.
    if "disabled" in button_classes or "disabled" in container_classes:
        logger.info("No more pages")
        return False

    next_button.scroll_into_view_if_needed()
    next_button.click()
    # Fixed pause to let the table redraw before the caller reads rows again.
    page.wait_for_timeout(2000)
    return True
def build_report_selector(report_type: int) -> str:
    """Return the CSS selector of the download button for ``report_type``."""
    selector = f"a.download-report-btn[data-report-type='{report_type}']"
    return selector
def download_report_by_selector(
    page: Page, selector: str, download_dir: Optional[str] = None
) -> str:
    """Click a download button and save the resulting file locally.

    Args:
        page: Page showing the assessment details.
        selector: CSS selector of the download button to click.
        download_dir: Directory to save into. Defaults to the current working
            directory, which is the original behaviour.

    Returns:
        The full path the file was saved to.
    """
    page.wait_for_selector(selector, timeout=10000)
    with page.expect_download() as download_info:
        page.click(selector)
    download = download_info.value

    # The suggested name originates from the remote site; keep only its final
    # path component so it cannot redirect the save outside the target folder.
    filename: str = os.path.basename(download.suggested_filename)
    target_dir: str = download_dir if download_dir is not None else os.getcwd()
    save_path: str = os.path.join(target_dir, filename)

    download.save_as(save_path)
    logger.info(f"Downloaded: {filename}")
    return save_path
def _matching_property_id(
    row: Locator, known_ids: Mapping[str, str]
) -> Optional[str]:
    """Return the property ID of a table row that should be downloaded, else None."""
    cells: Locator = row.locator("td")
    address: str = cells.nth(5).inner_text().strip()
    postcode: str = cells.nth(7).inner_text().strip()
    first_name: str = cells.nth(1).inner_text().strip()
    last_name: str = cells.nth(2).inner_text().strip()
    status: str = cells.nth(9).inner_text().strip()

    # Rows belonging to this person are always skipped.
    if first_name == "Oliver" and last_name == "Stephens":
        return None
    if status != "Submitted (not Lodged)":
        return None

    property_id: str = build_property_id(address, postcode)
    return property_id if property_id in known_ids else None


def download_report(
    username: Optional[str] = None, password: Optional[str] = None
) -> None:
    """Download site-note reports for every spreadsheet property found online.

    Loads the property list from the export workbook one directory above this
    file's folder, logs in, walks every page of the assessments table and,
    for each row accepted by ``_matching_property_id``, opens the details
    page and downloads the two site-note report types.

    Args:
        username: Login name. When omitted, the ``ASSESSOR_HUB_USERNAME``
            environment variable is used (empty string if unset).
        password: Login password. When omitted, the ``ASSESSOR_HUB_PASSWORD``
            environment variable is used (empty string if unset).

    Raises:
        Exception: On a failed login, a missing spreadsheet column, or a
            Playwright timeout (re-raised as "Timeout occurred: ..." with the
            original error attached as the cause).
    """
    if username is None:
        username = os.environ.get("ASSESSOR_HUB_USERNAME", "")
    if password is None:
        password = os.environ.get("ASSESSOR_HUB_PASSWORD", "")

    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )
    base_dir: str = os.path.dirname(os.path.dirname(__file__))
    filepath: str = os.path.join(base_dir, property_list_file)
    property_id_to_address_map: Dict[str, str] = extract_addresses_from_spreadsheet(
        filepath
    )

    report_types: List[int] = [
        file_download_button_types.ASSESSOR_HUB_SITENOTE_REPORT.value,
        file_download_button_types.SITENOTE_REPORT.value,
    ]
    matching_properties: List[str] = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()
        try:
            login(page, username, password)
            logger.info(f"Login successful: {page.url}")
            go_to_assessments(page)

            while True:
                rows: Locator = page.locator("#assessmentDatatable tbody tr")
                row_count: int = rows.count()
                logger.info(f"Processing {row_count} rows on current page")

                # Rows are addressed by index because the page is left and
                # re-entered while iterating.
                for i in range(row_count):
                    row: Locator = rows.nth(i)
                    property_id = _matching_property_id(
                        row, property_id_to_address_map
                    )
                    if property_id is None:
                        continue

                    logger.info(f"MATCH FOUND: {property_id}")
                    matching_properties.append(property_id)

                    go_to_assessment_details(page, row)
                    for report_type in report_types:
                        selector: str = build_report_selector(report_type)
                        download_report_by_selector(page, selector)
                    # TODO: stick in sharepoint, filed under
                    # property_id_to_address_map[property_id]

                    # NOTE(review): this assumes going back restores the same
                    # table page and row order; if the table resets to its
                    # first page, later indexes would point at other rows.
                    page.go_back()
                    page.wait_for_selector(
                        "#assessmentDatatable tbody tr", timeout=15000
                    )

                if not go_to_next_page(page):
                    break
        except PlaywrightTimeoutError as e:
            # Keep the original error as the cause so the traceback shows
            # which wait timed out.
            raise Exception(f"Timeout occurred: {str(e)}") from e
        finally:
            context.close()
            browser.close()

    logger.info(
        f"Downloaded reports for {len(matching_properties)} matching properties"
    )
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Run ``download_report``; ``event`` and ``context`` are accepted but not read."""
    download_report()
    return None
# When executed directly as a script, invoke the handler once.
if __name__ == "__main__":
    # Stub event and no context; handler() does not read either argument.
    event = {"Records": [{"body": "{}"}]}
    handler(event, None)