from datetime import datetime, timezone
from enum import Enum
import os
import re
from typing import Any, Dict, List, Mapping, Optional

from openpyxl import load_workbook
from playwright.sync_api import (
    Locator,
    Page,
    sync_playwright,
    TimeoutError as PlaywrightTimeoutError,
)

from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import FileSourceEnum, UploadedFile
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites

logger = setup_logger()

# Selector for the rows of the assessments table; shared by every function
# that needs to wait for, or iterate over, the table.
_ASSESSMENT_ROWS_SELECTOR = "#assessmentDatatable tbody tr"


class file_download_button_types(Enum):
    """Report-type codes used in the ``data-report-type`` attribute.

    The values are interpolated by ``build_report_selector`` to locate the
    matching download button. The class name is kept lowercase so existing
    references keep working.
    """

    ASSESSOR_HUB_SITENOTE_REPORT = 11
    CERTIFICATE = 9
    SITENOTE_REPORT = 8
    RAW_XML = 7
    SAP_WORK_SHEET = 15


def extract_addresses_from_spreadsheet(
    filepath: str,
    sheet_name: str = "Southern RA-Lite Programme 3103",
) -> Dict[str, str]:
    """Map each spreadsheet "ID" to the succinct address of its "Deal Name".

    Args:
        filepath: Path of the workbook to read.
        sheet_name: Worksheet holding the data. Defaults to the sheet name
            that was previously hard-coded.

    Returns:
        ``{id: succinct_address}`` for every data row in which both cells are
        populated and the ID is non-blank after stripping.

    Raises:
        KeyError: If ``sheet_name`` is not a sheet of the workbook.
        Exception: If the header row lacks an "id" or "deal name" column
            (compared case-insensitively, ignoring surrounding whitespace).
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb[sheet_name]

    header_row = 1
    id_col_index: Optional[int] = None
    deal_name_col_index: Optional[int] = None

    for col in range(1, ws.max_column + 1):
        cell_value = ws.cell(row=header_row, column=col).value
        if not cell_value:
            continue
        header = str(cell_value).strip().lower()
        if header == "id":
            id_col_index = col
        elif header == "deal name":
            deal_name_col_index = col
        # Stop only once BOTH columns are known, so the column order in the
        # sheet does not matter ("Deal Name" may come before "ID").
        if id_col_index is not None and deal_name_col_index is not None:
            break

    if id_col_index is None:
        raise Exception("ID column not found in spreadsheet")
    if deal_name_col_index is None:
        raise Exception("Deal Name column not found in spreadsheet")

    properties: Dict[str, str] = {}
    for row in range(header_row + 1, ws.max_row + 1):
        id_cell_value = ws.cell(row=row, column=id_col_index).value
        deal_name_cell_value = ws.cell(row=row, column=deal_name_col_index).value
        if id_cell_value is None or deal_name_cell_value is None:
            continue

        id_str = str(id_cell_value).strip()
        if not id_str:
            continue

        deal_name_str = str(deal_name_cell_value).strip()
        properties[id_str] = extract_succinct_address(deal_name_str)

    return properties


def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to "<first address part> <POSTCODE>".

    Input: '1 My Random Close, Town, AB12 3DC | Retrofit Assessment'
    Output: '1 My Random Close AB12 3DC'

    Only the text before the first ``|`` is considered. The first address
    part is everything before the first comma. When no postcode is found,
    the first address part is returned on its own.
    """
    left_part = deal_name.split("|")[0].strip()

    postcode_match: Optional[re.Match[str]] = re.search(
        r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
        left_part,
        re.IGNORECASE,
    )
    first_part = left_part.split(",")[0].strip()

    if postcode_match:
        postcode = postcode_match.group(1).upper()
        return f"{first_part} {postcode}"
    return first_part


def build_property_id(address: str, postcode: str) -> str:
    """Concatenate the leading token of the address with the postcode.

    The postcode has its spaces removed and is upper-cased.

    Example:
        '9 Random Close', 'AB1 2YZ' → '9AB12YZ'
    """
    number = address.split(" ")[0]
    postcode_clean = postcode.replace(" ", "").upper()
    return f"{number}{postcode_clean}"


def login(page: Page, username: str, password: str) -> None:
    """Log in to assessorhub.net using the given credentials.

    Raises:
        Exception: If the URL still contains "login" after the form has been
            submitted and the navigation has completed.
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    username_input: Locator = page.locator("#Username")
    password_input: Locator = page.locator("#Password")

    username_input.wait_for(state="visible", timeout=10000)
    username_input.fill(username)
    password_input.wait_for(state="visible", timeout=10000)
    password_input.fill(password)

    with page.expect_navigation(timeout=15000):
        page.click("button[type='submit']")

    if "login" in page.url.lower():
        raise Exception("Login failed")

    logger.info("Login successful")


def go_to_assessments(page: Page) -> None:
    """Open the assessments page and wait until the table has a row."""
    page.goto("https://assessorhub.net/Companies/Assessments", timeout=30000)
    page.wait_for_selector(_ASSESSMENT_ROWS_SELECTOR, timeout=20000)


def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Click the link inside ``row`` and wait for the resulting navigation."""
    account_link: Locator = row.locator("a")
    with page.expect_navigation():
        account_link.click()


def go_to_next_page(page: Page) -> bool:
    """Advance the assessments table to its next page.

    Returns:
        ``False`` when the "next" link carries a ``disabled`` class (nothing
        is clicked), otherwise ``True`` after clicking it.
    """
    next_button: Locator = page.locator("#assessmentDatatable_next a")
    # A missing class attribute is treated as "no classes".
    class_attr: str = next_button.get_attribute("class") or ""

    if "disabled" in class_attr:
        logger.info("No more pages")
        return False

    next_button.scroll_into_view_if_needed()
    next_button.click()
    # Fixed pause after the click; there is no explicit wait on table content.
    page.wait_for_timeout(2000)
    return True


def build_report_selector(report_type: int) -> str:
    """Return the CSS selector of the download button for ``report_type``."""
    return f"a.download-report-btn[data-report-type='{report_type}']"


def download_report_by_selector(page: Page, selector: str) -> str:
    """Click ``selector``, save the resulting download and return its path.

    The file is stored in the current working directory under the filename
    suggested by the download.
    NOTE(review): confirm the working directory is writable wherever this
    runs.
    """
    page.wait_for_selector(selector, timeout=10000)

    with page.expect_download() as download_info:
        page.click(selector)
    download = download_info.value

    filename: str = download.suggested_filename
    save_path: str = os.path.join(os.getcwd(), filename)
    download.save_as(save_path)

    logger.info(f"Downloaded: {filename}")
    return save_path


def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None:
    """Upload files to S3 and record one ``UploadedFile`` row per file.

    Args:
        job_files: Local paths of the files to upload.
        uprn: UPRN used in the S3 key prefix and stored (as an int) on each
            database row.

    Raises:
        ValueError: If ``uprn`` is not an integer string. This is checked
            before anything is uploaded so a bad UPRN cannot leave S3
            objects without matching database rows.
    """
    if not job_files:
        return

    uprn_value = int(uprn)
    bucket = "retrofit-energy-assessments-dev"
    base_path = f"documents/uprn/{uprn}"

    uploaded_files: List[UploadedFile] = []
    for file_path in job_files:
        filename = os.path.basename(file_path)
        file_key = f"{base_path}/{filename}"
        upload_file_to_s3(file_path, bucket, file_key)

        # Row describing the object that was just uploaded.
        uploaded_files.append(
            UploadedFile(
                s3_file_bucket=bucket,
                s3_file_key=file_key,
                s3_upload_timestamp=datetime.now(timezone.utc),
                uprn=uprn_value,
                file_source=FileSourceEnum.ECMK.value,
            )
        )

    with db_session() as session:
        session.add_all(uploaded_files)
        session.commit()


def _process_assessment_row(
    page: Page,
    row: Locator,
    property_id_to_address_map: Mapping[str, str],
    sharepoint_client: DomnaSharepointClient,
    sharepoint_base_path: str,
) -> Optional[str]:
    """Download and upload the reports for one table row if it qualifies.

    Returns the property id when the row matched and was processed, or
    ``None`` when the row was skipped.
    """
    cells: Locator = row.locator("td")
    first_name: str = cells.nth(1).inner_text().strip()
    last_name: str = cells.nth(2).inner_text().strip()
    address: str = cells.nth(5).inner_text().strip()
    postcode: str = cells.nth(7).inner_text().strip()
    uprn: str = cells.nth(8).inner_text().strip()
    status: str = cells.nth(9).inner_text().strip()

    if first_name == "Oliver" and last_name == "Stephens":
        return None
    if status != "Submitted (not Lodged)":
        return None

    property_id: str = build_property_id(address, postcode)
    if property_id not in property_id_to_address_map:
        return None

    logger.info(f"MATCH FOUND: {property_id}")
    sharepoint_address: str = property_id_to_address_map[property_id]

    go_to_assessment_details(page, row)

    report_types: List[int] = [
        file_download_button_types.ASSESSOR_HUB_SITENOTE_REPORT.value,
        file_download_button_types.SITENOTE_REPORT.value,
    ]
    for report_type in report_types:
        selector: str = build_report_selector(report_type)
        file_path: str = download_report_by_selector(page, selector)
        try:
            sharepoint_client.upload_file(
                file_path=file_path,
                sharepoint_path=f"{sharepoint_base_path}/{sharepoint_address}/1. Retrofit Assessment/A. Assessment",
                file_name=os.path.basename(file_path),
            )
            # TODO: could s3 load happen for all files at once to reduce db roundtrips?
            if uprn:
                upload_job_to_s3_and_update_db([file_path], uprn)
        finally:
            # Always remove the local copy, even when an upload failed.
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Deleted local file: {file_path}")

    # Return to the table so the caller can continue with the next row.
    page.go_back()
    page.wait_for_selector(_ASSESSMENT_ROWS_SELECTOR, timeout=15000)
    return property_id


def download_report() -> None:
    """Download site-note reports for the properties listed in the spreadsheet.

    Logs in, pages through the assessments table, and for every row that
    matches a spreadsheet ID downloads two reports, uploads each to
    SharePoint and (when the row has a UPRN) to S3 plus the database.

    Raises:
        Exception: "Timeout occurred: ..." when a Playwright timeout is hit;
            the original timeout is attached as ``__cause__``.
    """
    # NOTE(review): credentials are blank in this file; they need to be
    # supplied before the login can succeed.
    username: str = ""
    password: str = ""

    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )
    BASE_DIR: str = os.path.dirname(os.path.dirname(__file__))
    filepath: str = os.path.join(BASE_DIR, property_list_file)

    property_id_to_address_map: Dict[str, str] = extract_addresses_from_spreadsheet(
        filepath
    )
    matching_properties: List[str] = []

    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.PRIVATE_PAY
    )
    sharepoint_base_path = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()

        try:
            login(page, username, password)
            logger.info(f"Login successful: {page.url}")

            go_to_assessments(page)

            while True:
                rows: Locator = page.locator(_ASSESSMENT_ROWS_SELECTOR)
                row_count: int = rows.count()
                logger.info(f"Processing {row_count} rows on current page")

                for i in range(row_count):
                    matched = _process_assessment_row(
                        page,
                        rows.nth(i),
                        property_id_to_address_map,
                        sharepoint_client,
                        sharepoint_base_path,
                    )
                    if matched is not None:
                        matching_properties.append(matched)

                if not go_to_next_page(page):
                    break

            logger.info(f"Processed {len(matching_properties)} matching properties")
        except PlaywrightTimeoutError as e:
            # Chain the cause so the original Playwright traceback is kept.
            raise Exception(f"Timeout occurred: {str(e)}") from e
        finally:
            context.close()
            browser.close()


def handler(event: Mapping[str, Any], context: Any) -> None:
    """Entry point that runs ``download_report``; both arguments are unused."""
    download_report()


if __name__ == "__main__":
    event = {"Records": [{"body": "{}"}]}
    handler(event, None)