# UK-style postcode: outward code, optional whitespace, inward code.
_POSTCODE_RE = re.compile(
    r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
    re.IGNORECASE,
)


def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
    """Map each property ID in the programme workbook to a succinct address.

    Opens the workbook at *filepath*, reads the
    "Southern RA-Lite Programme 3103" sheet and looks in row 1 for the
    "ID" and "Deal Name" headers (case-insensitive, surrounding whitespace
    ignored). Every later row that has both values contributes one entry:
    the stripped ID mapped to ``extract_succinct_address(deal name)``.

    Args:
        filepath: Path of the ``.xlsx`` workbook to read.

    Returns:
        Dict of property ID (as a stripped string) to succinct address.

    Raises:
        ValueError: If the "ID" or "Deal Name" header is not in row 1.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["Southern RA-Lite Programme 3103"]

    header_row = 1
    id_col_index: Optional[int] = None
    deal_name_col_index: Optional[int] = None

    for col in range(1, ws.max_column + 1):
        value = ws.cell(row=header_row, column=col).value
        if value is None:
            continue

        header = str(value).strip().lower()
        if header == "id":
            id_col_index = col
        elif header == "deal name" and deal_name_col_index is None:
            deal_name_col_index = col

        # Stop only once BOTH headers are known, so the result does not
        # depend on which of the two columns comes first in the sheet.
        if id_col_index is not None and deal_name_col_index is not None:
            break

    missing = [
        label
        for label, index in (
            ("ID", id_col_index),
            ("Deal Name", deal_name_col_index),
        )
        if index is None
    ]
    if missing:
        raise ValueError(f"Required columns not found: {', '.join(missing)}")

    properties: Dict[str, str] = {}

    for row in range(header_row + 1, ws.max_row + 1):
        id_val = ws.cell(row=row, column=id_col_index).value
        deal_name = ws.cell(row=row, column=deal_name_col_index).value

        # Compare against None rather than truthiness: a numeric ID of 0
        # is a real value, not an empty cell.
        if id_val is None or deal_name is None:
            continue

        id_str = str(id_val).strip()
        deal_name_str = str(deal_name).strip()
        if not id_str or not deal_name_str:
            continue

        properties[id_str] = extract_succinct_address(deal_name_str)

    return properties


def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to "<first address line> <POSTCODE>".

    Only the text before the first ``|`` is considered. The first
    comma-separated segment of that text is kept, and the first postcode
    found anywhere in it is appended in upper case. If no postcode is
    found, or the kept segment already contains it, the segment is
    returned on its own.

    Example:
        '1 My Random Close, Town, AB12 3DC | Retrofit Assessment'
        -> '1 My Random Close AB12 3DC'
    """
    left_part = deal_name.split("|")[0].strip()
    first_part = left_part.split(",")[0].strip()

    postcode_match = _POSTCODE_RE.search(left_part)
    if postcode_match is None:
        return first_part

    postcode = postcode_match.group(1).upper()

    # When there is no comma before the postcode the first segment already
    # ends with it; appending again would yield "... AB12 3DC AB12 3DC".
    if postcode in first_part.upper():
        return first_part

    return f"{first_part} {postcode}"
def attach_debug_listeners(page: Page) -> None:
    """Log responses on *page* whose URL contains "download" or "report".

    Registers a "response" listener; each matching response is logged at
    INFO level as ``[RESPONSE] <status> <url>``.
    """

    def handle_response(response: Response) -> None:
        if "download" in response.url or "report" in response.url:
            logger.info(f"[RESPONSE] {response.status} {response.url}")

    page.on("response", handle_response)


def login(page: Page, username: str, password: str) -> None:
    """Sign in to assessorhub.net with the given credentials.

    Fills the ``#Username`` / ``#Password`` fields, submits the form and
    waits for the resulting navigation.

    Raises:
        Exception: If the URL after navigation still contains "login".
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    page.locator("#Username").fill(username)
    page.locator("#Password").fill(password)

    with page.expect_navigation():
        page.click("button[type='submit']")

    # A failed sign-in is detected by the site leaving us on a login URL.
    if "login" in page.url.lower():
        raise Exception("Login failed")

    logger.info("Login successful")


def go_to_assessments(page: Page) -> None:
    """Open the assessments list and wait until a table row is present."""
    page.goto("https://assessorhub.net/Companies/Assessments")
    page.wait_for_selector("#assessmentDatatable tbody tr")


def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Follow the link in *row* and wait for the report buttons to appear."""
    row.locator("a").click()
    page.wait_for_load_state("networkidle")
    # The download buttons are what callers need next, so wait for them
    # explicitly rather than relying on network idleness alone.
    page.wait_for_selector("a.download-report-btn")


def get_first_row_signature(page: Page) -> str:
    """Return the inner text of the first row of the assessments table."""
    return page.locator("#assessmentDatatable tbody tr").first.inner_text()


def go_to_next_page(page: Page) -> bool:
    """Click the table's "next" control and report whether the page changed.

    Returns:
        True if the first row's text differs after clicking and waiting
        two seconds; False otherwise (treated by callers as end of table).
    """
    before = get_first_row_signature(page)

    page.locator("#assessmentDatatable_next a").click()
    # Fixed pause to let the table redraw before comparing signatures.
    page.wait_for_timeout(2000)

    after = get_first_row_signature(page)

    return before != after


def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
    """Click the element matching *selector* and save the download it starts.

    The file is written to the current working directory under the
    filename suggested by the download.

    Returns:
        The path of the saved file, or None if the element is not enabled
        or a Playwright timeout occurs while waiting for the element or
        the download.
    """
    # NOTE(review): saving into os.getcwd() assumes the working directory
    # is writable in the deployment environment - confirm.
    try:
        element = page.locator(selector)
        element.wait_for(state="visible", timeout=10000)

        if not element.is_enabled():
            # Surface the reason; otherwise the caller only sees None.
            logger.warning(f"Element not enabled: {selector}")
            return None

        element.scroll_into_view_if_needed()

        with page.expect_download(timeout=15000) as download_info:
            element.click()

        download = download_info.value
        filename = download.suggested_filename

        save_path = os.path.join(os.getcwd(), filename)
        download.save_as(save_path)

        return save_path

    except PlaywrightTimeoutError:
        logger.error(f"Download failed for {selector}")
        return None


def download_with_retry(page: Page, report_type: int) -> Optional[str]:
    """Try up to three times to download the report of *report_type*.

    Args:
        page: Page currently showing an assessment's details.
        report_type: Value placed in the button's ``data-report-type``
            attribute selector (see ``build_report_selector``).

    Returns:
        Path of the downloaded file, or None if every attempt failed.
    """
    selector: str = build_report_selector(report_type)
    max_attempts = 3

    for attempt in range(1, max_attempts + 1):
        file_path = download_report_by_selector(page, selector)
        if file_path:
            return file_path

        # Only pause when another attempt will follow; waiting after the
        # final failure just delays the caller.
        if attempt < max_attempts:
            logger.warning(f"Retry {attempt} for {selector}")
            page.wait_for_timeout(1500)

    return None
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Entry point taking an (event, context) pair; runs the fetch job.

    Neither argument is read: every invocation simply delegates to
    ``run_job``.
    """
    run_job()
def _process_row(
    page: "Page",
    row: "Locator",
    property_map: "Dict[str, str]",
    sharepoint_client: "DomnaSharepointClient",
    sharepoint_base_path: str,
) -> None:
    """Download and upload the reports for one assessments-table row.

    Rows are skipped (nothing is clicked) when the name cells read
    "Oliver" / "Stephens", when the status is not "Submitted (not Lodged)",
    or when the row's property ID is not a key of *property_map*.
    Otherwise the details page is opened, each report in ``REPORT_TYPES``
    is downloaded and uploaded to SharePoint, the local copy is deleted,
    and the browser navigates back to the table.
    """
    cells = row.locator("td")

    # Column positions are fixed by the assessments table layout.
    first_name = cells.nth(1).inner_text().strip()
    last_name = cells.nth(2).inner_text().strip()
    address = cells.nth(5).inner_text().strip()
    postcode = cells.nth(7).inner_text().strip()
    status = cells.nth(9).inner_text().strip()

    if first_name == "Oliver" and last_name == "Stephens":
        return

    if status != "Submitted (not Lodged)":
        return

    property_id = build_property_id(address, postcode)

    # Dict lookup instead of scanning a list of keys for every row.
    if property_id not in property_map:
        return

    sharepoint_address = property_map[property_id]

    go_to_assessment_details(page, row)

    for report_type in REPORT_TYPES:
        file_path = download_with_retry(page, report_type)

        if not file_path:
            continue

        try:
            upload_file_to_sharepoint(
                client=sharepoint_client,
                file_path=file_path,
                base_path=sharepoint_base_path,
                subpath=sharepoint_address,
            )
        finally:
            # Remove the local copy whether or not the upload succeeded.
            if os.path.exists(file_path):
                os.remove(file_path)

    page.go_back()
    page.wait_for_selector("#assessmentDatatable tbody tr", timeout=15000)


def run_job() -> None:
    """Fetch assessment reports for listed properties and upload them.

    Loads the property-ID -> address map from the programme spreadsheet
    that sits next to this module, signs in with a headless Chromium
    browser, walks every page of the assessments table and processes each
    row via ``_process_row``. The browser context and browser are always
    closed on exit.

    Raises:
        Exception: "Row processing failed: ..." wrapping any error raised
            while handling a row (original error attached as the cause).
    """
    # NOTE(review): credentials are blank here, so login() is attempted
    # with empty values - confirm how real credentials are supplied.
    username: str = ""
    password: str = ""

    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )

    BASE_DIR: str = os.path.dirname(__file__)
    filepath: str = os.path.join(BASE_DIR, property_list_file)

    property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath)

    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.PRIVATE_PAY
    )

    sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()

        attach_debug_listeners(page)

        try:
            login(page, username, password)
            go_to_assessments(page)

            while True:
                rows = page.locator("#assessmentDatatable tbody tr")
                row_count: int = rows.count()

                for i in range(row_count):
                    try:
                        _process_row(
                            page,
                            rows.nth(i),
                            property_map,
                            sharepoint_client,
                            sharepoint_base_path,
                        )
                    except Exception as e:
                        raise Exception(f"Row processing failed: {str(e)}") from e

                if not go_to_next_page(page):
                    break

        finally:
            context.close()
            browser.close()


class FileDownloadButtonType(Enum):
    """``data-report-type`` values of the download buttons on a details page."""

    ASSESSOR_HUB_SITENOTE_REPORT = 11
    CERTIFICATE = 9
    SITENOTE_REPORT = 8
    RAW_XML = 7
    SAP_WORK_SHEET = 15


# Report types fetched for every matching assessment, in download order.
REPORT_TYPES = [
    FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT.value,
    FileDownloadButtonType.SITENOTE_REPORT.value,
]


def build_report_selector(report_type: int) -> str:
    """Return the CSS selector of the download button for *report_type*."""
    return f"a.download-report-btn[data-report-type='{report_type}']"


def build_property_id(address: str, postcode: str) -> str:
    """Join the first token of *address* with the whitespace-free postcode.

    Example:
        '9 Random Close', 'AB1 2YZ' -> '9AB12YZ'

    Any run of whitespace separates address tokens, so a leading space no
    longer produces an empty house number; an empty address contributes
    nothing. All whitespace is removed from the postcode before it is
    upper-cased.
    """
    tokens = address.split()
    number = tokens[0] if tokens else ""
    postcode_clean = "".join(postcode.split()).upper()
    return f"{number}{postcode_clean}"
def upload_file_to_sharepoint(
    client: DomnaSharepointClient,
    file_path: str,
    base_path: str,
    subpath: str,
) -> None:
    """Upload a local file into a property's assessment folder.

    The destination folder is
    ``<base_path>/<subpath>/1. Retrofit Assessment/A. Assessment`` and the
    uploaded file keeps the base name of *file_path*.

    Args:
        client: SharePoint client whose ``upload_file`` performs the upload.
        file_path: Path of the local file to upload.
        base_path: Root folder under which property folders live.
        subpath: Folder name of the property within *base_path*.
    """
    destination_folder = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment"

    client.upload_file(
        file_path=file_path,
        sharepoint_path=destination_folder,
        file_name=os.path.basename(file_path),
    )