Merge pull request #955 from Hestia-Homes/main

ECMK to sharepoint: local runner
This commit is contained in:
Jun-te Kim 2026-04-02 12:39:08 +01:00 committed by GitHub
commit 00a9bfc14f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 332 additions and 0 deletions

View file

@ -20,6 +20,7 @@ class FileSourceEnum(enum.Enum):
PAS_HUB = "pas hub"
SHAREPOINT = "sharepoint"
HUBSPOT = "hubspot"
ECMK = "ecmk"
class UploadedFile(Base):

View file

@ -0,0 +1,55 @@
from typing import Dict, Optional
from openpyxl import load_workbook
import re
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
    """Map each property id in the export workbook to a succinct address.

    Opens the workbook at ``filepath``, reads the sheet named
    "Southern RA-Lite Programme 3103", locates the "id" and "deal name"
    columns in the first row (compared case-insensitively after stripping
    whitespace), and for every later row that has both values stores
    ``str(id).strip() -> extract_succinct_address(str(deal_name).strip())``.

    Args:
        filepath: Path to the ``.xlsx`` file to read.

    Returns:
        A dict keyed by the stripped id cell, valued by the succinct address.
        Rows with an empty id or an empty deal name are skipped.

    Raises:
        ValueError: If either the "id" or the "deal name" header is missing.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["Southern RA-Lite Programme 3103"]

    header_row = 1
    id_col_index: Optional[int] = None
    deal_name_col_index: Optional[int] = None

    for col in range(1, ws.max_column + 1):
        value = ws.cell(row=header_row, column=col).value
        if not value:
            continue
        header = str(value).strip().lower()
        if header == "id":
            id_col_index = col
        elif header == "deal name" and deal_name_col_index is None:
            deal_name_col_index = col
        # Stop only once BOTH columns are known. Breaking as soon as
        # "deal name" is seen would miss an "id" column placed to its right.
        if id_col_index is not None and deal_name_col_index is not None:
            break

    if id_col_index is None or deal_name_col_index is None:
        raise ValueError("Required columns not found")

    properties: Dict[str, str] = {}
    for row in range(2, ws.max_row + 1):
        id_val = ws.cell(row=row, column=id_col_index).value
        deal_name = ws.cell(row=row, column=deal_name_col_index).value
        if not id_val or not deal_name:
            continue
        properties[str(id_val).strip()] = extract_succinct_address(
            str(deal_name).strip()
        )
    return properties
def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to "<first address segment> <POSTCODE>".

    Only the text before the first ``|`` is considered. The first
    comma-separated segment of that text is combined with the first
    UK-style postcode found anywhere in it (upper-cased).

    Args:
        deal_name: Deal name, e.g. ``"12 High Street, London, SW1A 1AA | X"``.

    Returns:
        The first segment followed by the upper-cased postcode, or just the
        first segment when no postcode is found.
    """
    left_part = deal_name.split("|")[0].strip()
    first_part = left_part.split(",")[0].strip()

    postcode_match = re.search(
        r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
        left_part,
        re.IGNORECASE,
    )
    if postcode_match is None:
        return first_part

    raw_postcode = postcode_match.group(1)
    postcode = raw_postcode.upper()

    # With no comma before the postcode the first segment already contains
    # it; strip it out so the result does not end in "<pc> <pc>".
    if raw_postcode in first_part:
        first_part = " ".join(first_part.replace(raw_postcode, " ").split())

    return f"{first_part} {postcode}" if first_part else postcode

View file

@ -0,0 +1,98 @@
import os
from typing import Optional
from playwright.sync_api import Page, Locator, Response
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from backend.ecmk_fetcher.reports import build_report_selector
from utils.logger import setup_logger
# from .reports import build_report_selector
logger = setup_logger()
def attach_debug_listeners(page: Page) -> None:
    """Register a response listener on ``page`` that logs report traffic.

    Every response whose URL contains "download" or "report" is logged at
    INFO level with its status code and URL.
    """

    def _log_report_response(response: Response) -> None:
        url = response.url
        if any(keyword in url for keyword in ("download", "report")):
            logger.info(f"[RESPONSE] {response.status} {response.url}")

    page.on("response", _log_report_response)
def login(page: Page, username: str, password: str) -> None:
    """Sign in to assessorhub.net with the given credentials.

    Fills the ``#Username`` and ``#Password`` fields, submits the form and
    waits for the resulting navigation.

    Raises:
        Exception: If the URL after submitting still contains "login".
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    credentials = (("#Username", username), ("#Password", password))
    for field_selector, field_value in credentials:
        page.locator(field_selector).fill(field_value)

    with page.expect_navigation():
        page.click("button[type='submit']")

    # Still being on a login URL after the form submit is treated as failure.
    still_on_login_page = "login" in page.url.lower()
    if still_on_login_page:
        raise Exception("Login failed")

    logger.info("Login successful")
def go_to_assessments(page: Page) -> None:
    """Open the company assessments list and wait for its first table row."""
    assessments_url = "https://assessorhub.net/Companies/Assessments"
    table_row_selector = "#assessmentDatatable tbody tr"

    page.goto(assessments_url)
    page.wait_for_selector(table_row_selector)
def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Open the details page for ``row`` and wait until downloads are shown.

    Clicks the link inside the row, waits for the "networkidle" load state
    and then for an ``a.download-report-btn`` element to be present.
    """
    details_link = row.locator("a")
    details_link.click()

    page.wait_for_load_state("networkidle")
    page.wait_for_selector("a.download-report-btn")
def get_first_row_signature(page: Page) -> str:
    """Return the inner text of the first row of the assessments table."""
    first_row = page.locator("#assessmentDatatable tbody tr").first
    return first_row.inner_text()
def go_to_next_page(page: Page) -> bool:
    """Click the table's "next" link and report whether the page changed.

    The first row's text is captured before the click and again after a
    fixed 2000 wait; the page is considered changed when the two differ.

    Returns:
        True if the first row differs after clicking, otherwise False.
    """
    signature_before = get_first_row_signature(page)

    next_link = page.locator("#assessmentDatatable_next a")
    next_link.click()
    page.wait_for_timeout(2000)

    return get_first_row_signature(page) != signature_before
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
    """Click the element matching ``selector`` and save the resulting download.

    The file is written to the current working directory under the name
    suggested by the download.

    Returns:
        The saved file path, or None when the element is not enabled or a
        Playwright timeout occurs (the latter is logged as an error).
    """
    try:
        button = page.locator(selector)
        button.wait_for(state="visible", timeout=10000)

        if not button.is_enabled():
            return None

        button.scroll_into_view_if_needed()
        with page.expect_download(timeout=15000) as pending_download:
            button.click()

        download = pending_download.value
        destination = os.path.join(os.getcwd(), download.suggested_filename)
        download.save_as(destination)
    except PlaywrightTimeoutError:
        logger.error(f"Download failed for {selector}")
        return None

    return destination
def download_with_retry(page: Page, report_type: int) -> Optional[str]:
    """Try up to three times to download the report of ``report_type``.

    After each unsuccessful attempt the page waits 1500 before continuing.

    Returns:
        The saved file path from the first successful attempt, or None when
        all three attempts fail.
    """
    selector: str = build_report_selector(report_type)

    attempts_left = 3
    while attempts_left > 0:
        saved_path = download_report_by_selector(page, selector)
        if saved_path:
            return saved_path
        page.wait_for_timeout(1500)
        attempts_left -= 1

    return None

View file

@ -0,0 +1,12 @@
from typing import Any, Mapping
from backend.ecmk_fetcher.processor import run_job
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Entry point that runs the ECMK fetch job.

    ``event`` and ``context`` are accepted but not read; the job takes no
    input from them.
    """
    run_job()
if __name__ == "__main__":
    # Local run: invoke the handler with a single record whose body is an
    # empty JSON object, and no context.
    records = [{"body": "{}"}]
    event = {"Records": records}
    handler(event, None)

View file

@ -0,0 +1,121 @@
import os
from typing import Dict, List
from playwright.sync_api import (
sync_playwright,
Locator,
Page,
Browser,
BrowserContext,
)
from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet
from backend.ecmk_fetcher.browser import (
attach_debug_listeners,
download_with_retry,
go_to_assessment_details,
go_to_assessments,
go_to_next_page,
login,
)
from backend.ecmk_fetcher.reports import REPORT_TYPES, build_property_id
from backend.ecmk_fetcher.sharepoint import upload_file_to_sharepoint
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
def run_job() -> None:
    """Download reports for listed properties and upload them to SharePoint.

    Steps:
      1. Load the id -> address map from the spreadsheet that sits next to
         this module.
      2. Log in, open the assessments table and walk it page by page.
      3. For each row with status "Submitted (not Lodged)" whose property id
         (built from the address and postcode cells) is in the map, open the
         details page, download every report in ``REPORT_TYPES`` and upload
         each file under the property's SharePoint address folder.
      4. Delete each downloaded file locally after its upload attempt.

    Raises:
        Exception: "Row processing failed: ..." wrapping any error raised
            while handling a row; the first such error aborts the job.
    """
    # NOTE(review): credentials are blank in this revision and are passed to
    # login() as-is. Presumably they are filled in locally before running --
    # confirm, and prefer loading them from configuration or a secret store.
    username: str = ""
    password: str = ""

    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )
    BASE_DIR: str = os.path.dirname(__file__)
    filepath: str = os.path.join(BASE_DIR, property_list_file)

    # Keys are property ids; membership is tested directly against this dict
    # (O(1)) rather than against a list copy of its keys.
    property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath)

    sharepoint_client: DomnaSharepointClient = DomnaSharepointClient(
        sharepoint_location=DomnaSites.PRIVATE_PAY
    )
    sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"

    with sync_playwright() as p:
        browser: Browser = p.chromium.launch(headless=True)
        context: BrowserContext = browser.new_context()
        page: Page = context.new_page()
        attach_debug_listeners(page)

        try:
            login(page, username, password)
            go_to_assessments(page)

            while True:
                rows: Locator = page.locator("#assessmentDatatable tbody tr")
                row_count: int = rows.count()

                for i in range(row_count):
                    row: Locator = rows.nth(i)
                    try:
                        cells: Locator = row.locator("td")
                        first_name: str = cells.nth(1).inner_text().strip()
                        last_name: str = cells.nth(2).inner_text().strip()
                        address: str = cells.nth(5).inner_text().strip()
                        postcode: str = cells.nth(7).inner_text().strip()
                        status: str = cells.nth(9).inner_text().strip()

                        # Rows for this named person are always skipped; the
                        # reason is not recorded in this file.
                        if first_name == "Oliver" and last_name == "Stephens":
                            continue
                        if status != "Submitted (not Lodged)":
                            continue

                        property_id: str = build_property_id(address, postcode)
                        if property_id not in property_map:
                            continue
                        sharepoint_address: str = property_map[property_id]

                        go_to_assessment_details(page, row)

                        for report_type in REPORT_TYPES:
                            file_path: str | None = download_with_retry(
                                page, report_type
                            )
                            if not file_path:
                                continue
                            try:
                                upload_file_to_sharepoint(
                                    client=sharepoint_client,
                                    file_path=file_path,
                                    base_path=sharepoint_base_path,
                                    subpath=sharepoint_address,
                                )
                            finally:
                                # Remove the local copy whether or not the
                                # upload succeeded.
                                if os.path.exists(file_path):
                                    os.remove(file_path)

                        # Return to the table before moving to the next row.
                        page.go_back()
                        page.wait_for_selector(
                            "#assessmentDatatable tbody tr", timeout=15000
                        )
                    except Exception as e:
                        raise Exception(f"Row processing failed: {str(e)}") from e

                if not go_to_next_page(page):
                    break
        finally:
            context.close()
            browser.close()

View file

@ -0,0 +1,25 @@
from enum import Enum
class FileDownloadButtonType(Enum):
    """Report kinds that can be requested from a download button.

    Each member's value is the number placed into the
    ``data-report-type`` attribute selector by ``build_report_selector``.
    """

    ASSESSOR_HUB_SITENOTE_REPORT = 11
    CERTIFICATE = 9
    SITENOTE_REPORT = 8
    RAW_XML = 7
    SAP_WORK_SHEET = 15
# Numeric report types to download, in this order.
REPORT_TYPES = [
    button_type.value
    for button_type in (
        FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT,
        FileDownloadButtonType.SITENOTE_REPORT,
    )
]
def build_report_selector(report_type: int) -> str:
    """Return the CSS selector for the download link of ``report_type``."""
    selector = f"a.download-report-btn[data-report-type='{report_type}']"
    return selector
def build_property_id(address: str, postcode: str) -> str:
    """Build a property id from the first address token and the postcode.

    The id is the first whitespace-delimited token of ``address`` followed
    by ``postcode`` with all whitespace removed and upper-cased, e.g.
    ``("12 High Street", "sw1a 1aa") -> "12SW1A1AA"``.

    Args:
        address: Address text; only its first token is used.
        postcode: Postcode in any spacing or letter case.

    Returns:
        The concatenated id. An empty or blank ``address`` contributes an
        empty first token.
    """
    # split() with no argument ignores leading whitespace and treats tabs,
    # newlines and runs of spaces as one separator; split(" ") returns ""
    # for a leading space and keeps tabs attached to the token.
    tokens = address.split()
    number = tokens[0] if tokens else ""

    postcode_clean = "".join(postcode.split()).upper()
    return f"{number}{postcode_clean}"

View file

@ -0,0 +1,20 @@
import os
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
def upload_file_to_sharepoint(
    client: DomnaSharepointClient,
    file_path: str,
    base_path: str,
    subpath: str,
) -> None:
    """Upload a local file into the property's assessment folder.

    The destination is
    ``<base_path>/<subpath>/1. Retrofit Assessment/A. Assessment`` and the
    uploaded file keeps the base name of ``file_path``.

    Args:
        client: SharePoint client used to perform the upload.
        file_path: Path of the local file to upload.
        base_path: Root SharePoint path for the project.
        subpath: Folder name for the property under ``base_path``.
    """
    destination_folder = (
        f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment"
    )
    client.upload_file(
        file_path=file_path,
        sharepoint_path=destination_folder,
        file_name=os.path.basename(file_path),
    )