mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
refactor
This commit is contained in:
parent
0009a50dc5
commit
998a732428
6 changed files with 322 additions and 405 deletions
55
backend/ecmk_fetcher/address_list.py
Normal file
55
backend/ecmk_fetcher/address_list.py
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
from typing import Dict, Optional
|
||||
from openpyxl import load_workbook
|
||||
import re
|
||||
|
||||
|
||||
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
|
||||
wb = load_workbook(filepath, data_only=True)
|
||||
ws = wb["Southern RA-Lite Programme 3103"]
|
||||
|
||||
properties: Dict[str, str] = {}
|
||||
|
||||
header_row = 1
|
||||
id_col_index = None
|
||||
deal_name_col_index = None
|
||||
|
||||
for col in range(1, ws.max_column + 1):
|
||||
value = ws.cell(row=header_row, column=col).value
|
||||
|
||||
if value and str(value).strip().lower() == "id":
|
||||
id_col_index = col
|
||||
|
||||
if value and str(value).strip().lower() == "deal name":
|
||||
deal_name_col_index = col
|
||||
break
|
||||
|
||||
if id_col_index is None or deal_name_col_index is None:
|
||||
raise Exception("Required columns not found")
|
||||
|
||||
for row in range(2, ws.max_row + 1):
|
||||
id_val = ws.cell(row=row, column=id_col_index).value
|
||||
deal_name = ws.cell(row=row, column=deal_name_col_index).value
|
||||
|
||||
if not id_val or not deal_name:
|
||||
continue
|
||||
|
||||
properties[str(id_val).strip()] = extract_succinct_address(
|
||||
str(deal_name).strip()
|
||||
)
|
||||
|
||||
return properties
|
||||
|
||||
|
||||
def extract_succinct_address(deal_name: str) -> str:
|
||||
left_part = deal_name.split("|")[0].strip()
|
||||
|
||||
postcode_match: Optional[re.Match[str]] = re.search(
|
||||
r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
|
||||
left_part,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
postcode = postcode_match.group(1).upper() if postcode_match else None
|
||||
first_part = left_part.split(",")[0].strip()
|
||||
|
||||
return f"{first_part} {postcode}" if postcode else first_part
|
||||
98
backend/ecmk_fetcher/browser.py
Normal file
98
backend/ecmk_fetcher/browser.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
import os
|
||||
from typing import Optional
|
||||
from playwright.sync_api import Page, Locator, Response
|
||||
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
|
||||
|
||||
from backend.ecmk_fetcher.reports import build_report_selector
|
||||
from utils.logger import setup_logger
|
||||
|
||||
# from .reports import build_report_selector
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def attach_debug_listeners(page: Page) -> None:
|
||||
def handle_response(response: Response) -> None:
|
||||
if "download" in response.url or "report" in response.url:
|
||||
logger.info(f"[RESPONSE] {response.status} {response.url}")
|
||||
|
||||
page.on("response", handle_response)
|
||||
|
||||
|
||||
def login(page: Page, username: str, password: str) -> None:
|
||||
page.goto("https://assessorhub.net/", timeout=30000)
|
||||
|
||||
page.locator("#Username").fill(username)
|
||||
page.locator("#Password").fill(password)
|
||||
|
||||
with page.expect_navigation():
|
||||
page.click("button[type='submit']")
|
||||
|
||||
if "login" in page.url.lower():
|
||||
raise Exception("Login failed")
|
||||
|
||||
logger.info("Login successful")
|
||||
|
||||
|
||||
def go_to_assessments(page: Page) -> None:
|
||||
page.goto("https://assessorhub.net/Companies/Assessments")
|
||||
page.wait_for_selector("#assessmentDatatable tbody tr")
|
||||
|
||||
|
||||
def go_to_assessment_details(page: Page, row: Locator) -> None:
|
||||
row.locator("a").click()
|
||||
page.wait_for_load_state("networkidle")
|
||||
page.wait_for_selector("a.download-report-btn")
|
||||
|
||||
|
||||
def get_first_row_signature(page: Page) -> str:
|
||||
return page.locator("#assessmentDatatable tbody tr").first.inner_text()
|
||||
|
||||
|
||||
def go_to_next_page(page: Page) -> bool:
|
||||
before = get_first_row_signature(page)
|
||||
|
||||
page.locator("#assessmentDatatable_next a").click()
|
||||
page.wait_for_timeout(2000)
|
||||
|
||||
after = get_first_row_signature(page)
|
||||
|
||||
return before != after
|
||||
|
||||
|
||||
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
|
||||
try:
|
||||
element = page.locator(selector)
|
||||
element.wait_for(state="visible", timeout=10000)
|
||||
|
||||
if not element.is_enabled():
|
||||
return None
|
||||
|
||||
element.scroll_into_view_if_needed()
|
||||
|
||||
with page.expect_download(timeout=15000) as download_info:
|
||||
element.click()
|
||||
|
||||
download = download_info.value
|
||||
filename = download.suggested_filename
|
||||
|
||||
save_path = os.path.join(os.getcwd(), filename)
|
||||
download.save_as(save_path)
|
||||
|
||||
return save_path
|
||||
|
||||
except PlaywrightTimeoutError:
|
||||
logger.error(f"Download failed for {selector}")
|
||||
return None
|
||||
|
||||
|
||||
def download_with_retry(page: Page, report_type: int) -> Optional[str]:
|
||||
selector: str = build_report_selector(report_type)
|
||||
|
||||
for _ in range(3):
|
||||
file_path = download_report_by_selector(page, selector)
|
||||
if file_path:
|
||||
return file_path
|
||||
page.wait_for_timeout(1500)
|
||||
|
||||
return None
|
||||
|
|
@ -1,412 +1,10 @@
|
|||
from datetime import datetime, timezone
|
||||
import os
|
||||
from enum import Enum
|
||||
import re
|
||||
from typing import Any, Dict, List, Mapping, Optional
|
||||
from openpyxl import load_workbook
|
||||
from playwright.sync_api import (
|
||||
Locator,
|
||||
Page,
|
||||
Response,
|
||||
sync_playwright,
|
||||
TimeoutError as PlaywrightTimeoutError,
|
||||
)
|
||||
from typing import Any, Mapping
|
||||
|
||||
from backend.app.db.connection import db_session
|
||||
from backend.app.db.models.uploaded_file import FileSourceEnum, UploadedFile
|
||||
from utils.logger import setup_logger
|
||||
from utils.s3 import upload_file_to_s3
|
||||
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
||||
from utils.sharepoint.domna_sites import DomnaSites
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class file_download_button_types(Enum):
|
||||
ASSESSOR_HUB_SITENOTE_REPORT = 11
|
||||
CERTIFICATE = 9
|
||||
SITENOTE_REPORT = 8
|
||||
RAW_XML = 7
|
||||
SAP_WORK_SHEET = 15
|
||||
|
||||
|
||||
def attach_debug_listeners(page: Page) -> None:
|
||||
def handle_response(response: Response) -> None:
|
||||
url: str = response.url
|
||||
status: int = response.status
|
||||
|
||||
if "download" in url or "report" in url:
|
||||
logger.info(f"[RESPONSE] {status} {url}")
|
||||
|
||||
if status >= 400:
|
||||
logger.error(f"[ERROR RESPONSE] {status} {url}")
|
||||
|
||||
page.on("response", handle_response)
|
||||
|
||||
|
||||
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
|
||||
wb = load_workbook(filepath, data_only=True)
|
||||
ws = wb["Southern RA-Lite Programme 3103"]
|
||||
|
||||
properties: Dict[str, str] = {}
|
||||
|
||||
header_row = 1
|
||||
id_col_index = None
|
||||
deal_name_col_index = None
|
||||
|
||||
for col in range(1, ws.max_column + 1):
|
||||
cell_value = ws.cell(row=header_row, column=col).value
|
||||
|
||||
if cell_value and str(cell_value).strip().lower() == "id":
|
||||
id_col_index = col
|
||||
|
||||
if cell_value and str(cell_value).strip().lower() == "deal name":
|
||||
deal_name_col_index = col
|
||||
break
|
||||
|
||||
if id_col_index is None:
|
||||
raise Exception("ID column not found in spreadsheet")
|
||||
|
||||
if deal_name_col_index is None:
|
||||
raise Exception("Deal Name column not found in spreadsheet")
|
||||
|
||||
for row in range(2, ws.max_row + 1):
|
||||
id_cell_value = ws.cell(row=row, column=id_col_index).value
|
||||
deal_name_cell_value = ws.cell(row=row, column=deal_name_col_index).value
|
||||
|
||||
if id_cell_value is None or deal_name_cell_value is None:
|
||||
continue
|
||||
|
||||
id_str = str(id_cell_value).strip()
|
||||
deal_name_str = str(deal_name_cell_value).strip()
|
||||
|
||||
if not id_str:
|
||||
continue
|
||||
|
||||
sharepoint_address = extract_succinct_address(deal_name_str)
|
||||
|
||||
properties[id_str] = sharepoint_address
|
||||
|
||||
return properties
|
||||
|
||||
|
||||
def extract_succinct_address(deal_name: str) -> str:
|
||||
"""
|
||||
Input:
|
||||
'1 My Random Close, Town, AB12 3DC | Retrofit Assessment'
|
||||
|
||||
Output:
|
||||
'1 My Random Close AB12 3DC'
|
||||
"""
|
||||
left_part = deal_name.split("|")[0].strip()
|
||||
|
||||
postcode_match: Optional[re.Match[str]] = re.search(
|
||||
r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
|
||||
left_part,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
postcode = None
|
||||
if postcode_match:
|
||||
postcode = postcode_match.group(1).upper()
|
||||
|
||||
first_part = left_part.split(",")[0].strip()
|
||||
|
||||
if postcode:
|
||||
return f"{first_part} {postcode}"
|
||||
else:
|
||||
return first_part
|
||||
|
||||
|
||||
def build_property_id(address: str, postcode: str) -> str:
|
||||
"""
|
||||
Extract number from address and concat with postcode
|
||||
Example:
|
||||
'9 Random Close', 'AB1 2YZ' → '9AB12YZ'
|
||||
"""
|
||||
number = address.split(" ")[0]
|
||||
|
||||
postcode_clean = postcode.replace(" ", "").upper()
|
||||
|
||||
return f"{number}{postcode_clean}"
|
||||
|
||||
|
||||
def login(page: Page, username: str, password: str) -> None:
|
||||
page.goto("https://assessorhub.net/", timeout=30000)
|
||||
|
||||
username_input: Locator = page.locator("#Username")
|
||||
password_input: Locator = page.locator("#Password")
|
||||
|
||||
username_input.wait_for(state="visible", timeout=10000)
|
||||
username_input.fill(username)
|
||||
|
||||
password_input.wait_for(state="visible", timeout=10000)
|
||||
password_input.fill(password)
|
||||
|
||||
with page.expect_navigation(timeout=15000):
|
||||
page.click("button[type='submit']")
|
||||
|
||||
if "login" in page.url.lower():
|
||||
raise Exception("Login failed")
|
||||
|
||||
logger.info("Login successful")
|
||||
|
||||
|
||||
def go_to_assessments(page: Page) -> None:
|
||||
page.goto("https://assessorhub.net/Companies/Assessments", timeout=30000)
|
||||
page.wait_for_selector("#assessmentDatatable tbody tr", timeout=20000)
|
||||
|
||||
|
||||
def go_to_assessment_details(page: Page, row: Locator) -> None:
|
||||
account_link: Locator = row.locator("a")
|
||||
with page.expect_navigation():
|
||||
account_link.click()
|
||||
|
||||
page.wait_for_load_state("networkidle")
|
||||
|
||||
page.wait_for_selector("a.download-report-btn", timeout=10000)
|
||||
|
||||
logger.info("Assessment details page fully loaded")
|
||||
|
||||
|
||||
def get_first_row_signature(page: Page) -> str:
|
||||
first_row = page.locator("#assessmentDatatable tbody tr").first
|
||||
return first_row.inner_text()
|
||||
|
||||
|
||||
def go_to_next_page(page: Page) -> bool:
|
||||
first_signature_before = get_first_row_signature(page)
|
||||
|
||||
next_button = page.locator("#assessmentDatatable_next a")
|
||||
next_button.click()
|
||||
|
||||
page.wait_for_timeout(2000)
|
||||
|
||||
first_signature_after = get_first_row_signature(page)
|
||||
|
||||
if first_signature_before == first_signature_after:
|
||||
logger.info("No page change detected - reached end of table")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def build_report_selector(report_type: int) -> str:
|
||||
return f"a.download-report-btn[data-report-type='{report_type}']"
|
||||
|
||||
|
||||
def download_report_by_selector(page: Page, selector: str) -> Optional[str]:
|
||||
try:
|
||||
element: Locator = page.locator(selector)
|
||||
|
||||
element.wait_for(state="visible", timeout=10000)
|
||||
|
||||
if not element.is_enabled():
|
||||
logger.warning(f"Element not enabled: {selector}")
|
||||
return None
|
||||
|
||||
element.scroll_into_view_if_needed()
|
||||
|
||||
page.wait_for_timeout(300)
|
||||
|
||||
logger.info(f"Attempting download via selector: {selector}")
|
||||
logger.info(f"Current URL: {page.url}")
|
||||
|
||||
with page.expect_download(timeout=15000) as download_info:
|
||||
element.click()
|
||||
|
||||
download = download_info.value
|
||||
filename: str = download.suggested_filename
|
||||
|
||||
save_path: str = os.path.join(os.getcwd(), filename)
|
||||
download.save_as(save_path)
|
||||
|
||||
logger.info(f"Downloaded: {filename}")
|
||||
|
||||
return save_path
|
||||
|
||||
except PlaywrightTimeoutError:
|
||||
logger.error(f"Download NOT triggered for selector: {selector}")
|
||||
logger.error(f"Current URL at failure: {page.url}")
|
||||
|
||||
try:
|
||||
content_snippet = page.content()[:1000]
|
||||
logger.error(f"Page snippet: {content_snippet}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def download_with_retry(page: Page, selector: str) -> Optional[str]:
|
||||
for attempt in range(3):
|
||||
file_path = download_report_by_selector(page, selector)
|
||||
|
||||
if file_path:
|
||||
return file_path
|
||||
|
||||
logger.warning(f"Retry {attempt + 1} for {selector}")
|
||||
page.wait_for_timeout(1500)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None:
|
||||
bucket = "retrofit-energy-assessments-dev"
|
||||
|
||||
base_path = f"documents/uprn/{uprn}"
|
||||
|
||||
uploaded_files: List[UploadedFile] = []
|
||||
|
||||
for file_path in job_files:
|
||||
filename = os.path.basename(file_path)
|
||||
file_key = f"{base_path}/{filename}"
|
||||
|
||||
upload_file_to_s3(file_path, bucket, file_key)
|
||||
|
||||
# load row to db
|
||||
uploaded_files.append(
|
||||
UploadedFile(
|
||||
s3_file_bucket=bucket,
|
||||
s3_file_key=file_key,
|
||||
s3_upload_timestamp=datetime.now(timezone.utc),
|
||||
uprn=int(uprn),
|
||||
file_source=FileSourceEnum.ECMK.value,
|
||||
)
|
||||
)
|
||||
|
||||
with db_session() as session:
|
||||
session.add_all(uploaded_files)
|
||||
session.commit()
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def download_report() -> None:
|
||||
username: str = ""
|
||||
password: str = ""
|
||||
|
||||
property_list_file: str = (
|
||||
"hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
|
||||
)
|
||||
|
||||
BASE_DIR: str = os.path.dirname(os.path.dirname(__file__))
|
||||
filepath: str = os.path.join(BASE_DIR, property_list_file)
|
||||
|
||||
property_id_to_address_map: Dict[str, str] = extract_addresses_from_spreadsheet(
|
||||
filepath
|
||||
)
|
||||
property_ids: List[str] = list(property_id_to_address_map.keys())
|
||||
|
||||
matching_properties: List[str] = []
|
||||
|
||||
sharepoint_client = DomnaSharepointClient(
|
||||
sharepoint_location=DomnaSites.PRIVATE_PAY
|
||||
)
|
||||
sharepoint_base_path = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=True)
|
||||
|
||||
context = browser.new_context()
|
||||
page = context.new_page()
|
||||
attach_debug_listeners(page)
|
||||
|
||||
try:
|
||||
login(page, username, password)
|
||||
print("Login successful:", page.url)
|
||||
|
||||
go_to_assessments(page)
|
||||
|
||||
while True:
|
||||
rows: Locator = page.locator("#assessmentDatatable tbody tr")
|
||||
row_count: int = rows.count()
|
||||
|
||||
logger.info(f"Processing {row_count} rows on current page")
|
||||
|
||||
for i in range(row_count):
|
||||
row: Locator = rows.nth(i)
|
||||
|
||||
try:
|
||||
cells: Locator = row.locator("td")
|
||||
|
||||
first_name: str = cells.nth(1).inner_text().strip()
|
||||
last_name: str = cells.nth(2).inner_text().strip()
|
||||
address: str = cells.nth(5).inner_text().strip()
|
||||
postcode: str = cells.nth(7).inner_text().strip()
|
||||
# uprn: str = cells.nth(8).inner_text().strip()
|
||||
status: str = cells.nth(9).inner_text().strip()
|
||||
|
||||
if first_name == "Oliver" and last_name == "Stephens":
|
||||
continue
|
||||
|
||||
if status != "Submitted (not Lodged)":
|
||||
continue
|
||||
|
||||
property_id: str = build_property_id(address, postcode)
|
||||
|
||||
if property_id not in property_ids:
|
||||
continue
|
||||
|
||||
logger.info(f"MATCH FOUND: {property_id}")
|
||||
matching_properties.append(property_id)
|
||||
|
||||
sharepoint_address: str = property_id_to_address_map[
|
||||
property_id
|
||||
]
|
||||
go_to_assessment_details(page, row)
|
||||
|
||||
report_types: List[int] = [
|
||||
file_download_button_types.ASSESSOR_HUB_SITENOTE_REPORT.value,
|
||||
file_download_button_types.SITENOTE_REPORT.value,
|
||||
]
|
||||
|
||||
for report_type in report_types:
|
||||
selector: str = build_report_selector(report_type)
|
||||
file_path: Optional[str] = download_with_retry(
|
||||
page, selector
|
||||
)
|
||||
|
||||
if not file_path:
|
||||
continue
|
||||
try:
|
||||
sharepoint_client.upload_file(
|
||||
file_path=file_path,
|
||||
sharepoint_path=f"{sharepoint_base_path}/{sharepoint_address}/1. Retrofit Assessment/A. Assessment",
|
||||
file_name=os.path.basename(file_path),
|
||||
)
|
||||
logger.info(
|
||||
f"Successfully uploaded file {os.path.basename(file_path)} to sharepoint"
|
||||
)
|
||||
# TODO: could s3 load happen for all files at once to reduce db roundtrips?
|
||||
# if uprn:
|
||||
# upload_job_to_s3_and_update_db([file_path], uprn)
|
||||
finally:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
logger.info(f"Deleted local file: {file_path}")
|
||||
|
||||
page.go_back()
|
||||
page.wait_for_selector(
|
||||
"#assessmentDatatable tbody tr", timeout=15000
|
||||
)
|
||||
|
||||
except PlaywrightTimeoutError as e:
|
||||
raise Exception(f"Timeout occurred: {str(e)}")
|
||||
|
||||
if not go_to_next_page(page):
|
||||
break
|
||||
|
||||
except PlaywrightTimeoutError as e:
|
||||
raise Exception(f"Timeout occurred: {str(e)}")
|
||||
|
||||
finally:
|
||||
context.close()
|
||||
browser.close()
|
||||
from backend.ecmk_fetcher.processor import run_job
|
||||
|
||||
|
||||
def handler(event: Mapping[str, Any], context: Any) -> None:
|
||||
download_report()
|
||||
run_job()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
121
backend/ecmk_fetcher/processor.py
Normal file
121
backend/ecmk_fetcher/processor.py
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
import os
|
||||
from typing import Dict, List
|
||||
|
||||
from playwright.sync_api import (
|
||||
sync_playwright,
|
||||
Locator,
|
||||
Page,
|
||||
Browser,
|
||||
BrowserContext,
|
||||
)
|
||||
|
||||
from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet
|
||||
from backend.ecmk_fetcher.browser import (
|
||||
attach_debug_listeners,
|
||||
download_with_retry,
|
||||
go_to_assessment_details,
|
||||
go_to_assessments,
|
||||
go_to_next_page,
|
||||
login,
|
||||
)
|
||||
from backend.ecmk_fetcher.reports import REPORT_TYPES, build_property_id
|
||||
from backend.ecmk_fetcher.sharepoint import upload_file_to_sharepoint
|
||||
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
||||
from utils.sharepoint.domna_sites import DomnaSites
|
||||
|
||||
|
||||
def run_job() -> None:
|
||||
username: str = ""
|
||||
password: str = ""
|
||||
|
||||
property_list_file: str = (
|
||||
"hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
|
||||
)
|
||||
|
||||
BASE_DIR: str = os.path.dirname(__file__)
|
||||
filepath: str = os.path.join(BASE_DIR, property_list_file)
|
||||
|
||||
property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath)
|
||||
property_ids: List[str] = list(property_map.keys())
|
||||
|
||||
sharepoint_client: DomnaSharepointClient = DomnaSharepointClient(
|
||||
sharepoint_location=DomnaSites.PRIVATE_PAY
|
||||
)
|
||||
|
||||
sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser: Browser = p.chromium.launch(headless=True)
|
||||
context: BrowserContext = browser.new_context()
|
||||
page: Page = context.new_page()
|
||||
|
||||
attach_debug_listeners(page)
|
||||
|
||||
try:
|
||||
login(page, username, password)
|
||||
go_to_assessments(page)
|
||||
|
||||
while True:
|
||||
rows: Locator = page.locator("#assessmentDatatable tbody tr")
|
||||
row_count: int = rows.count()
|
||||
|
||||
for i in range(row_count):
|
||||
row: Locator = rows.nth(i)
|
||||
|
||||
try:
|
||||
cells: Locator = row.locator("td")
|
||||
|
||||
first_name: str = cells.nth(1).inner_text().strip()
|
||||
last_name: str = cells.nth(2).inner_text().strip()
|
||||
address: str = cells.nth(5).inner_text().strip()
|
||||
postcode: str = cells.nth(7).inner_text().strip()
|
||||
status: str = cells.nth(9).inner_text().strip()
|
||||
|
||||
if first_name == "Oliver" and last_name == "Stephens":
|
||||
continue
|
||||
|
||||
if status != "Submitted (not Lodged)":
|
||||
continue
|
||||
|
||||
property_id: str = build_property_id(address, postcode)
|
||||
|
||||
if property_id not in property_ids:
|
||||
continue
|
||||
|
||||
sharepoint_address: str = property_map[property_id]
|
||||
|
||||
go_to_assessment_details(page, row)
|
||||
|
||||
for report_type in REPORT_TYPES:
|
||||
file_path: str | None = download_with_retry(
|
||||
page, report_type
|
||||
)
|
||||
|
||||
if not file_path:
|
||||
continue
|
||||
|
||||
try:
|
||||
upload_file_to_sharepoint(
|
||||
client=sharepoint_client,
|
||||
file_path=file_path,
|
||||
base_path=sharepoint_base_path,
|
||||
subpath=sharepoint_address,
|
||||
)
|
||||
finally:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
|
||||
page.go_back()
|
||||
page.wait_for_selector(
|
||||
"#assessmentDatatable tbody tr", timeout=15000
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Row processing failed: {str(e)}") from e
|
||||
|
||||
if not go_to_next_page(page):
|
||||
break
|
||||
|
||||
finally:
|
||||
context.close()
|
||||
browser.close()
|
||||
25
backend/ecmk_fetcher/reports.py
Normal file
25
backend/ecmk_fetcher/reports.py
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
from enum import Enum
|
||||
|
||||
|
||||
class FileDownloadButtonType(Enum):
|
||||
ASSESSOR_HUB_SITENOTE_REPORT = 11
|
||||
CERTIFICATE = 9
|
||||
SITENOTE_REPORT = 8
|
||||
RAW_XML = 7
|
||||
SAP_WORK_SHEET = 15
|
||||
|
||||
|
||||
REPORT_TYPES = [
|
||||
FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT.value,
|
||||
FileDownloadButtonType.SITENOTE_REPORT.value,
|
||||
]
|
||||
|
||||
|
||||
def build_report_selector(report_type: int) -> str:
|
||||
return f"a.download-report-btn[data-report-type='{report_type}']"
|
||||
|
||||
|
||||
def build_property_id(address: str, postcode: str) -> str:
|
||||
number = address.split(" ")[0]
|
||||
postcode_clean = postcode.replace(" ", "").upper()
|
||||
return f"{number}{postcode_clean}"
|
||||
20
backend/ecmk_fetcher/sharepoint.py
Normal file
20
backend/ecmk_fetcher/sharepoint.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
import os
|
||||
|
||||
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
||||
|
||||
|
||||
def upload_file_to_sharepoint(
|
||||
client: DomnaSharepointClient,
|
||||
file_path: str,
|
||||
base_path: str,
|
||||
subpath: str,
|
||||
) -> None:
|
||||
filename = os.path.basename(file_path)
|
||||
|
||||
full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment"
|
||||
|
||||
client.upload_file(
|
||||
file_path=file_path,
|
||||
sharepoint_path=full_path,
|
||||
file_name=filename,
|
||||
)
|
||||
Loading…
Add table
Reference in a new issue