# Mirror of https://github.com/Hestia-Homes/Model.git (synced 2026-06-08 11:17:27 +00:00; 408 lines, 12 KiB, Python)
from datetime import datetime, timezone
|
|
import os
|
|
from enum import Enum
|
|
import re
|
|
from typing import Any, Dict, List, Mapping, Optional
|
|
from openpyxl import load_workbook
|
|
from playwright.sync_api import (
|
|
Locator,
|
|
Page,
|
|
Response,
|
|
sync_playwright,
|
|
TimeoutError as PlaywrightTimeoutError,
|
|
)
|
|
|
|
from backend.app.db.connection import db_session
|
|
from backend.app.db.models.uploaded_file import FileSourceEnum, UploadedFile
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import upload_file_to_s3
|
|
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
|
from utils.sharepoint.domna_sites import DomnaSites
|
|
|
|
# Module-level logger used by every helper in this file.
logger = setup_logger()
|
|
|
|
|
|
class file_download_button_types(Enum):
    """Numeric identifiers of the report download buttons.

    A member's ``value`` is what ``build_report_selector`` places into the
    ``data-report-type`` attribute selector, so the numbers must match the
    attribute values rendered on the assessment details page.
    """

    # The two report types that download_report() currently fetches.
    ASSESSOR_HUB_SITENOTE_REPORT = 11
    CERTIFICATE = 9
    SITENOTE_REPORT = 8

    # Declared for completeness; not requested anywhere in this file.
    RAW_XML = 7
    SAP_WORK_SHEET = 15
|
|
|
|
|
|
def attach_debug_listeners(page: Page) -> None:
    """Subscribe a logging hook to every network response seen by ``page``.

    Responses whose URL contains "download" or "report" are logged at INFO
    level; any response with a status of 400 or above is logged at ERROR
    level (a single response can trigger both messages).
    """

    def _log_response(response: Response) -> None:
        code: int = response.status
        target: str = response.url

        # Traffic relevant to report downloads is always worth tracing.
        if any(marker in target for marker in ("download", "report")):
            logger.info(f"[RESPONSE] {code} {target}")

        # Client/server errors are surfaced regardless of the URL.
        if not code < 400:
            logger.error(f"[ERROR RESPONSE] {code} {target}")

    page.on("response", _log_response)
|
|
|
|
|
|
def extract_addresses_from_spreadsheet(
    filepath: str,
    sheet_name: str = "Southern RA-Lite Programme 3103",
) -> Dict[str, str]:
    """Map every property ID in the spreadsheet to its succinct address.

    Row 1 is treated as the header and searched (case-insensitively, ignoring
    surrounding whitespace) for the "ID" and "Deal Name" columns, in whichever
    order they appear. Each later row that has both cells populated produces
    one entry: the stripped ID as key and ``extract_succinct_address`` of the
    deal name as value. When an ID occurs more than once, the last row wins.

    Args:
        filepath: Path of the ``.xlsx`` workbook to read.
        sheet_name: Worksheet to read. Defaults to the sheet this script was
            originally written against.

    Returns:
        Dictionary of ID string -> succinct address string.

    Raises:
        KeyError: If the workbook has no worksheet called ``sheet_name``
            (raised by openpyxl).
        Exception: If the "ID" or the "Deal Name" header is missing.
    """
    wb = load_workbook(filepath, data_only=True)
    try:
        ws = wb[sheet_name]

        header_row = 1
        id_col_index: Optional[int] = None
        deal_name_col_index: Optional[int] = None

        for col in range(1, ws.max_column + 1):
            cell_value = ws.cell(row=header_row, column=col).value
            if not cell_value:
                continue

            header = str(cell_value).strip().lower()
            if header == "id":
                id_col_index = col
            elif header == "deal name":
                deal_name_col_index = col

            # Stop only once BOTH headers are known; stopping on "Deal Name"
            # alone would miss an "ID" column located to its right.
            if id_col_index is not None and deal_name_col_index is not None:
                break

        if id_col_index is None:
            raise Exception("ID column not found in spreadsheet")

        if deal_name_col_index is None:
            raise Exception("Deal Name column not found in spreadsheet")

        properties: Dict[str, str] = {}

        for row in range(header_row + 1, ws.max_row + 1):
            id_cell_value = ws.cell(row=row, column=id_col_index).value
            deal_name_cell_value = ws.cell(
                row=row, column=deal_name_col_index
            ).value

            # Rows missing either value cannot be mapped.
            if id_cell_value is None or deal_name_cell_value is None:
                continue

            id_str = str(id_cell_value).strip()
            if not id_str:
                continue

            deal_name_str = str(deal_name_cell_value).strip()
            properties[id_str] = extract_succinct_address(deal_name_str)

        return properties
    finally:
        # Release the workbook handle even when a header is missing.
        wb.close()
|
|
|
|
|
|
def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to "<first address line> <POSTCODE>".

    Everything after the first ``|`` is discarded. The first comma-separated
    segment of what remains is taken as the address line, and the first
    UK-style postcode found anywhere before the ``|`` is appended in upper
    case. If no postcode is found, only the address line is returned.

    When the address line itself already contains that postcode (no comma
    between street and postcode), the postcode is removed from the line first
    so that it is not emitted twice.

    Example:
        '1 My Random Close, Town, AB12 3DC | Retrofit Assessment'
        -> '1 My Random Close AB12 3DC'

    Args:
        deal_name: Raw deal name, optionally suffixed with ``| <label>``.

    Returns:
        The succinct address; an empty string for an empty input.
    """
    postcode_pattern = r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b"

    left_part = deal_name.split("|")[0].strip()
    first_part = left_part.split(",")[0].strip()

    postcode_match = re.search(postcode_pattern, left_part, re.IGNORECASE)
    if postcode_match is None:
        return first_part

    postcode = postcode_match.group(1).upper()

    # first_part is a prefix of left_part, so a hit here is the same postcode
    # that is about to be appended; cut it out to avoid "... AB1 2CD AB1 2CD".
    embedded = re.search(postcode_pattern, first_part, re.IGNORECASE)
    if embedded is not None:
        remainder = (
            first_part[: embedded.start()] + " " + first_part[embedded.end():]
        )
        first_part = " ".join(remainder.split())

    # A deal name consisting only of a postcode yields just the postcode.
    return f"{first_part} {postcode}" if first_part else postcode
|
|
|
|
|
|
def build_property_id(address: str, postcode: str) -> str:
    """Build a property ID from an address and a postcode.

    The ID is the text of ``address`` up to (not including) its first space,
    followed by ``postcode`` with every space removed and upper-cased.
    The leading token is used verbatim; it is not checked to be numeric.

    Example:
        '9 Random Close', 'AB1 2YZ' -> '9AB12YZ'
    """
    leading_token, _, _ = address.partition(" ")
    compact_postcode = "".join(postcode.split(" ")).upper()
    return leading_token + compact_postcode
|
|
|
|
|
|
def login(page: Page, username: str, password: str) -> None:
    """Sign in to assessorhub.net through the login form.

    Opens the site root, fills ``#Username`` and ``#Password`` once each is
    visible, submits the form and waits for the resulting navigation.

    Raises:
        Exception: If the URL after submitting still contains "login".
    """
    page.goto("https://assessorhub.net/", timeout=30000)

    credential_fields = (
        (page.locator("#Username"), username),
        (page.locator("#Password"), password),
    )
    for field, value in credential_fields:
        field.wait_for(state="visible", timeout=10000)
        field.fill(value)

    with page.expect_navigation(timeout=15000):
        page.click("button[type='submit']")

    # Still on a login URL after submitting means the sign-in was rejected.
    still_on_login = "login" in page.url.lower()
    if still_on_login:
        raise Exception("Login failed")

    logger.info("Login successful")
|
|
|
|
|
|
def go_to_assessments(page: Page) -> None:
    """Open the company assessments listing and wait for a table row to appear."""
    listing_url = "https://assessorhub.net/Companies/Assessments"
    table_row_selector = "#assessmentDatatable tbody tr"

    page.goto(listing_url, timeout=30000)
    page.wait_for_selector(table_row_selector, timeout=20000)
|
|
|
|
|
|
def go_to_assessment_details(page: Page, row: Locator) -> None:
    """Follow the link inside ``row`` and wait for the details page to be ready.

    Ready means: the navigation finished, the network went idle, and a
    report download button (``a.download-report-btn``) is present.
    """
    with page.expect_navigation():
        row.locator("a").click()

    page.wait_for_load_state("networkidle")
    page.wait_for_selector("a.download-report-btn", timeout=10000)

    logger.info("Assessment details page fully loaded")
|
|
|
|
|
|
def go_to_next_page(page: Page) -> bool:
    """Advance the assessments table by one page.

    Returns:
        ``False`` when the "next" link carries a ``disabled`` class (nothing
        is clicked); otherwise ``True`` after clicking it and pausing 2 s.
    """
    next_link: Locator = page.locator("#assessmentDatatable_next a")

    # NOTE(review): only the class of the <a> itself is inspected; confirm the
    # site marks the link (not its parent element) as disabled.
    css_classes: Optional[str] = next_link.get_attribute("class")
    if css_classes and "disabled" in css_classes:
        logger.info("No more pages")
        return False

    next_link.scroll_into_view_if_needed()
    next_link.click()

    # Fixed pause to let the table redraw before rows are read again.
    page.wait_for_timeout(2000)
    return True
|
|
|
|
|
|
def build_report_selector(report_type: int) -> str:
    """Return the CSS selector of the download button for ``report_type``."""
    template = "a.download-report-btn[data-report-type='{}']"
    return template.format(report_type)
|
|
|
|
|
|
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
    """Click the element matching ``selector`` and save the resulting download.

    The file is written to the current working directory under the filename
    suggested by the browser.

    Returns:
        Path of the saved file, or ``None`` when the element is not enabled
        or a Playwright timeout occurs (element never visible, or no download
        started within 15 s).
    """
    try:
        button: Locator = page.locator(selector)
        button.wait_for(state="visible", timeout=10000)

        if not button.is_enabled():
            logger.warning(f"Element not enabled: {selector}")
            return None

        button.scroll_into_view_if_needed()
        # Short pause after scrolling before the click is issued.
        page.wait_for_timeout(300)

        logger.info(f"Attempting download via selector: {selector}")
        logger.info(f"Current URL: {page.url}")

        with page.expect_download(timeout=15000) as pending_download:
            button.click()

        fetched = pending_download.value
        suggested_name: str = fetched.suggested_filename
        destination: str = os.path.join(os.getcwd(), suggested_name)
        fetched.save_as(destination)

        logger.info(f"Downloaded: {suggested_name}")
        return destination

    except PlaywrightTimeoutError:
        logger.error(f"Download NOT triggered for selector: {selector}")
        logger.error(f"Current URL at failure: {page.url}")

        # Best-effort diagnostics: failing to read the page must not mask
        # the timeout that is already being handled.
        try:
            logger.error(f"Page snippet: {page.content()[:1000]}")
        except Exception:
            pass

    return None
|
|
|
|
|
|
def download_with_retry(
    page: Page,
    selector: str,
    max_attempts: int = 3,
    retry_delay_ms: int = 1500,
) -> Optional[str]:
    """Try ``download_report_by_selector`` up to ``max_attempts`` times.

    Between failed attempts a retry is logged and the page pauses for
    ``retry_delay_ms`` milliseconds. No retry is announced and no pause is
    taken after the final attempt, since nothing follows it.

    Args:
        page: Page showing the assessment details.
        selector: CSS selector of the download button.
        max_attempts: Total number of download attempts (default 3). A value
            below 1 results in no attempt at all.
        retry_delay_ms: Pause between attempts in milliseconds (default 1500).

    Returns:
        Path of the downloaded file, or ``None`` if every attempt failed.
    """
    for attempt in range(1, max_attempts + 1):
        file_path = download_report_by_selector(page, selector)
        if file_path:
            return file_path

        if attempt == max_attempts:
            break

        logger.warning(f"Retry {attempt} for {selector}")
        page.wait_for_timeout(retry_delay_ms)

    logger.error(f"Giving up on {selector} after {max_attempts} attempts")
    return None
|
|
|
|
|
|
def upload_job_to_s3_and_update_db(
    job_files: List[str],
    uprn: str,
    bucket: str = "retrofit-energy-assessments-dev",
) -> None:
    """Upload files to S3 under the UPRN prefix and record them in the database.

    Each file is stored at ``documents/uprn/<uprn>/<basename>`` in ``bucket``.
    After all uploads, one ``UploadedFile`` row per file is inserted and
    committed in a single session.

    Args:
        job_files: Local paths of the files to upload.
        uprn: Property reference; must be parseable as an integer.
        bucket: Destination S3 bucket. Defaults to the bucket this script was
            originally hard-wired to.

    Raises:
        ValueError: If ``uprn`` is not an integer string. This is checked
            before any upload so a bad UPRN cannot leave orphaned S3 objects.
    """
    uprn_number = int(uprn)

    # Nothing to upload means nothing to record; skip the DB round trip.
    if not job_files:
        return

    base_path = f"documents/uprn/{uprn}"
    uploaded_files: List[UploadedFile] = []

    for file_path in job_files:
        file_key = f"{base_path}/{os.path.basename(file_path)}"

        upload_file_to_s3(file_path, bucket, file_key)

        # Queue the matching DB row; the timestamp is taken right after the
        # upload call returns.
        uploaded_files.append(
            UploadedFile(
                s3_file_bucket=bucket,
                s3_file_key=file_key,
                s3_upload_timestamp=datetime.now(timezone.utc),
                uprn=uprn_number,
                file_source=FileSourceEnum.ECMK.value,
            )
        )

    with db_session() as session:
        session.add_all(uploaded_files)
        session.commit()
|
|
|
|
|
|
def _upload_reports_to_sharepoint(
    page: Page,
    sharepoint_client: DomnaSharepointClient,
    sharepoint_path: str,
    report_types: List[int],
) -> None:
    """Download each report type from the open details page and upload it to SharePoint."""
    for report_type in report_types:
        selector: str = build_report_selector(report_type)
        file_path: Optional[str] = download_with_retry(page, selector)

        if not file_path:
            continue

        file_name = os.path.basename(file_path)
        try:
            sharepoint_client.upload_file(
                file_path=file_path,
                sharepoint_path=sharepoint_path,
                file_name=file_name,
            )
            logger.info(f"Successfully uploaded file {file_name} to sharepoint")
            # TODO: could s3 load happen for all files at once to reduce db roundtrips?
            # (upload_job_to_s3_and_update_db is not called yet because the
            # UPRN is not read from the table.)
        finally:
            # The local copy is temporary whether or not the upload worked.
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Deleted local file: {file_path}")


def download_report() -> None:
    """Download site-note reports for listed properties and push them to SharePoint.

    Steps:
      1. Read the property-ID -> address map from the HubSpot export located
         one directory above this file's directory.
      2. Log in to assessorhub.net and walk every page of the assessments table.
      3. Skip rows assessed by "Oliver Stephens", rows whose status is not
         "Submitted (not Lodged)", and rows whose property ID (first address
         token + postcode) is not in the map.
      4. For each remaining row, open its details page, download the two
         site-note report types, upload them to the property's SharePoint
         folder, delete the local copies and go back to the table.

    Raises:
        Exception: Wrapping any Playwright timeout (the original timeout is
            chained as ``__cause__``), or raised by ``login`` on failure.
    """
    # NOTE(review): both credentials are empty strings here, so login() cannot
    # succeed as written; they need to be supplied before this is run.
    username: str = ""
    password: str = ""

    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )

    base_dir: str = os.path.dirname(os.path.dirname(__file__))
    filepath: str = os.path.join(base_dir, property_list_file)

    property_id_to_address_map: Dict[str, str] = extract_addresses_from_spreadsheet(
        filepath
    )

    matching_properties: List[str] = []

    sharepoint_client = DomnaSharepointClient(
        sharepoint_location=DomnaSites.PRIVATE_PAY
    )
    sharepoint_base_path = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"

    # Same for every row, so built once rather than per match.
    report_types: List[int] = [
        file_download_button_types.ASSESSOR_HUB_SITENOTE_REPORT.value,
        file_download_button_types.SITENOTE_REPORT.value,
    ]

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()
        attach_debug_listeners(page)

        try:
            login(page, username, password)
            logger.info(f"Logged in, current URL: {page.url}")

            go_to_assessments(page)

            while True:
                rows: Locator = page.locator("#assessmentDatatable tbody tr")
                row_count: int = rows.count()

                logger.info(f"Processing {row_count} rows on current page")

                for i in range(row_count):
                    row: Locator = rows.nth(i)

                    try:
                        cells: Locator = row.locator("td")

                        first_name: str = cells.nth(1).inner_text().strip()
                        last_name: str = cells.nth(2).inner_text().strip()
                        address: str = cells.nth(5).inner_text().strip()
                        postcode: str = cells.nth(7).inner_text().strip()
                        # Column 8 presumably holds the UPRN (it was read by a
                        # now commented-out line); it is currently unused.
                        status: str = cells.nth(9).inner_text().strip()

                        if first_name == "Oliver" and last_name == "Stephens":
                            continue

                        if status != "Submitted (not Lodged)":
                            continue

                        property_id: str = build_property_id(address, postcode)

                        # Dict membership is O(1); no separate key list needed.
                        if property_id not in property_id_to_address_map:
                            continue

                        logger.info(f"MATCH FOUND: {property_id}")
                        matching_properties.append(property_id)

                        sharepoint_address: str = property_id_to_address_map[
                            property_id
                        ]
                        go_to_assessment_details(page, row)

                        _upload_reports_to_sharepoint(
                            page,
                            sharepoint_client,
                            f"{sharepoint_base_path}/{sharepoint_address}/1. Retrofit Assessment/A. Assessment",
                            report_types,
                        )

                        # Return to the table so the next row can be resolved.
                        page.go_back()
                        page.wait_for_selector(
                            "#assessmentDatatable tbody tr", timeout=15000
                        )

                    except PlaywrightTimeoutError as e:
                        raise Exception(f"Timeout occurred: {str(e)}") from e

                if not go_to_next_page(page):
                    break

            logger.info(
                f"Finished: {len(matching_properties)} matching properties processed"
            )

        except PlaywrightTimeoutError as e:
            raise Exception(f"Timeout occurred: {str(e)}") from e

        finally:
            context.close()
            browser.close()
|
|
|
|
|
|
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Entry point that runs the full report download.

    Neither ``event`` nor ``context`` is read; the (event, context) signature
    looks like an AWS Lambda handler — presumably how this is deployed.
    """
    download_report()
|
|
|
|
|
|
if __name__ == "__main__":
    # Local run: invoke the handler with a minimal placeholder payload
    # (the handler ignores its arguments).
    sample_record = {"body": "{}"}
    event = {"Records": [sample_record]}
    handler(event, None)
|