mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
download files
This commit is contained in:
parent
b121413b22
commit
33c4572f48
1 changed files with 113 additions and 37 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import os
|
||||
from enum import Enum
|
||||
from typing import Any, List, Mapping
|
||||
import re
|
||||
from typing import Any, Dict, List, Mapping, Optional
|
||||
from openpyxl import load_workbook
|
||||
from playwright.sync_api import (
|
||||
Locator,
|
||||
|
|
@ -21,36 +22,78 @@ class file_download_button_types(Enum):
|
|||
SAP_WORK_SHEET = 15
|
||||
|
||||
|
||||
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
    """Map each property ID in the spreadsheet to its succinct address.

    Opens the workbook at *filepath*, reads the sheet named
    "Southern RA-Lite Programme 3103", and locates the "ID" and
    "Deal Name" columns by their (case-insensitive, whitespace-trimmed)
    header text in row 1. Every following row that has both cells
    populated contributes one entry: the stripped ID as the key and the
    deal name passed through ``extract_succinct_address`` as the value.

    Rows where either cell is empty, or where the ID is blank after
    stripping, are skipped. If the same ID appears on several rows the
    last row wins.

    Args:
        filepath: Path to the ``.xlsx`` workbook.

    Returns:
        A dict of property ID -> succinct address string.

    Raises:
        Exception: If the "ID" or the "Deal Name" header is not found in
            the first row.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["Southern RA-Lite Programme 3103"]

    properties: Dict[str, str] = {}

    header_row = 1
    id_col_index = None
    deal_name_col_index = None

    for col in range(1, ws.max_column + 1):
        cell_value = ws.cell(row=header_row, column=col).value
        if not cell_value:
            continue

        header = str(cell_value).strip().lower()

        if header == "id" and id_col_index is None:
            id_col_index = col
        elif header == "deal name" and deal_name_col_index is None:
            deal_name_col_index = col

        # Stop only once BOTH headers are known. Breaking as soon as
        # "deal name" is seen would miss an "ID" column located to its right.
        if id_col_index is not None and deal_name_col_index is not None:
            break

    if id_col_index is None:
        raise Exception("ID column not found in spreadsheet")

    if deal_name_col_index is None:
        raise Exception("Deal Name column not found in spreadsheet")

    for row in range(2, ws.max_row + 1):
        id_cell_value = ws.cell(row=row, column=id_col_index).value
        deal_name_cell_value = ws.cell(row=row, column=deal_name_col_index).value

        if id_cell_value is None or deal_name_cell_value is None:
            continue

        id_str = str(id_cell_value).strip()
        deal_name_str = str(deal_name_cell_value).strip()

        if not id_str:
            continue

        sharepoint_address = extract_succinct_address(deal_name_str)

        properties[id_str] = sharepoint_address

    return properties
|
||||
|
||||
|
||||
def extract_succinct_address(deal_name: str) -> str:
    """Reduce a deal name to '<first address line> <POSTCODE>'.

    Everything after the first ``|`` is discarded. The first UK-style
    postcode found in the remaining text is upper-cased and has its
    spaces removed; the text before the first comma is used as the
    address line.

    Input:
        '1 My Random Close, Town, AB12 3DC | Retrofit Assessment'

    Output:
        '1 My Random Close AB123DC'

    If no postcode is found, only the first address line is returned.
    If the postcode already sits inside the first address line (no comma
    separates them), it is removed from that line so it is not emitted
    twice.
    """
    left_part = deal_name.split("|")[0].strip()

    postcode_match = re.search(
        r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b",
        left_part,
        re.IGNORECASE,
    )

    # Unstripped on purpose: it is an exact prefix of left_part, so the
    # match offsets below can be applied to it directly.
    first_segment = left_part.split(",")[0]

    if postcode_match is None:
        return first_segment.strip()

    postcode = postcode_match.group(1).replace(" ", "").upper()

    if postcode_match.start() < len(first_segment):
        # The pattern cannot span a comma, so a match that starts inside the
        # first segment lies wholly within it. Cut it out and tidy the
        # whitespace left behind.
        remainder = (
            f"{first_segment[:postcode_match.start()]} "
            f"{first_segment[postcode_match.end():]}"
        )
        first_part = " ".join(remainder.split())
    else:
        first_part = first_segment.strip()

    # strip() covers the case where the address line is empty.
    return f"{first_part} {postcode}".strip()
|
||||
|
||||
|
||||
def build_property_id(address: str, postcode: str) -> str:
|
||||
|
|
@ -78,7 +121,10 @@ def download_report():
|
|||
BASE_DIR,
|
||||
property_list_file,
|
||||
)
|
||||
property_ids: List[str] = extract_ids_from_spreadsheet(filepath)
|
||||
property_id_to_address_map: Dict[str, str] = extract_addresses_from_spreadsheet(
|
||||
filepath
|
||||
)
|
||||
property_ids: List[str] = list(property_id_to_address_map.keys())
|
||||
|
||||
matching_properties: List[str] = []
|
||||
|
||||
|
|
@ -140,10 +186,65 @@ def download_report():
|
|||
|
||||
if property_id not in property_ids:
|
||||
continue
|
||||
|
||||
logger.info(f"MATCH FOUND: {property_id}")
|
||||
matching_properties.append(property_id)
|
||||
|
||||
try:
|
||||
sharepoint_address: str = property_id_to_address_map[
|
||||
property_id
|
||||
]
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"Unable to find sharepoint address for property ID {property_id}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Go to assessment details page and download files
|
||||
account_link = cells.nth(0).locator("a")
|
||||
with page.expect_navigation():
|
||||
account_link.click()
|
||||
|
||||
assessment_hub_sitenote_selector = f"a.download-report-btn[data-report-type='{file_download_button_types.ASSESSOR_HUB_SITENOTE_REPORT.value}']"
|
||||
|
||||
page.wait_for_selector(
|
||||
assessment_hub_sitenote_selector, timeout=10000
|
||||
)
|
||||
|
||||
with page.expect_download() as download_info:
|
||||
page.click(assessment_hub_sitenote_selector)
|
||||
|
||||
download = download_info.value
|
||||
|
||||
filename = download.suggested_filename
|
||||
save_path = os.path.join(os.getcwd(), filename)
|
||||
|
||||
download.save_as(save_path)
|
||||
|
||||
logger.info(f"Downloaded: {filename}")
|
||||
|
||||
sitenote_report_selector = f"a.download-report-btn[data-report-type='{file_download_button_types.SITENOTE_REPORT.value}']"
|
||||
|
||||
page.wait_for_selector(sitenote_report_selector, timeout=10000)
|
||||
|
||||
with page.expect_download() as download_info:
|
||||
page.click(sitenote_report_selector)
|
||||
|
||||
download = download_info.value
|
||||
|
||||
filename = download.suggested_filename
|
||||
save_path = os.path.join(os.getcwd(), filename)
|
||||
|
||||
download.save_as(save_path)
|
||||
|
||||
logger.info(f"Downloaded: {filename}")
|
||||
|
||||
# stick in sharepoint
|
||||
|
||||
page.go_back()
|
||||
page.wait_for_selector(
|
||||
"#assessmentDatatable tbody tr", timeout=15000
|
||||
)
|
||||
|
||||
except PlaywrightTimeoutError as e:
|
||||
raise Exception(f"Timeout occurred: {str(e)}")
|
||||
|
||||
|
|
@ -161,31 +262,6 @@ def download_report():
|
|||
|
||||
page.wait_for_timeout(2000)
|
||||
|
||||
# 5. Navigate to the assessment detail page
|
||||
# page.goto(
|
||||
# "https://assessorhub.net/Assessments/Assessments/Detail/1bd9fd74-08f6-4fc1-b2f7-3a13a8f9084d?returnUrl=/Companies/Assessments",
|
||||
# timeout=30000,
|
||||
# )
|
||||
|
||||
# # 6. Locate the correct download button
|
||||
# button = page.locator("a.download-report-btn[data-report-type='11']")
|
||||
|
||||
# button.wait_for(state="visible", timeout=10000)
|
||||
|
||||
# # 7. Click and capture the download
|
||||
# with page.expect_download(timeout=30000) as download_info:
|
||||
# button.click()
|
||||
|
||||
# download = download_info.value
|
||||
|
||||
# # 8. Save file locally
|
||||
# filename = download.suggested_filename
|
||||
# save_path = os.path.join(os.getcwd(), filename)
|
||||
|
||||
# download.save_as(save_path)
|
||||
|
||||
# print(f"Downloaded file saved to: {save_path}")
|
||||
|
||||
except PlaywrightTimeoutError as e:
|
||||
raise Exception(f"Timeout occurred: {str(e)}")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue