update spreadsheet with properties that have already been processed

This commit is contained in:
Daniel Roth 2026-04-02 15:42:56 +00:00
parent eda990285b
commit cd7b59a62f
2 changed files with 108 additions and 28 deletions

View file

@ -1,45 +1,107 @@
from typing import Dict, Optional
from openpyxl import load_workbook
import re
from dataclasses import dataclass
from typing import Any, Dict, Optional, cast
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]:
wb = load_workbook(filepath, data_only=True)
ws = wb["Southern RA-Lite Programme 3103"]
@dataclass
class PropertyRow:
row_index: int
address: str
processed: bool
properties: Dict[str, str] = {}
header_row = 1
id_col_index = None
deal_name_col_index = None
def extract_addresses_from_spreadsheet(
filepath: str,
) -> Dict[str, PropertyRow]:
wb: Workbook = load_workbook(filepath, data_only=True)
ws: Worksheet = wb["Southern RA-Lite Programme 3103"]
header_row: int = 1
id_col: Optional[int] = None
deal_name_col: Optional[int] = None
processed_col: Optional[int] = None
# find columns
for col in range(1, ws.max_column + 1):
value = ws.cell(row=header_row, column=col).value
raw_value: Any = ws.cell(row=header_row, column=col).value
value: str = str(raw_value).strip().lower() if raw_value else ""
if value and str(value).strip().lower() == "id":
id_col_index = col
if value == "id":
id_col = col
elif value == "deal name":
deal_name_col = col
elif value == "processed":
processed_col = col
if value and str(value).strip().lower() == "deal name":
deal_name_col_index = col
break
if id_col is None or deal_name_col is None:
raise Exception("Missing required columns")
if id_col_index is None or deal_name_col_index is None:
raise Exception("Required columns not found")
# create processed column if missing
if processed_col is None:
processed_col = ws.max_column + 1
cast(Cell, ws.cell(row=header_row, column=processed_col)).value = "processed"
properties: Dict[str, PropertyRow] = {}
for row in range(2, ws.max_row + 1):
id_val = ws.cell(row=row, column=id_col_index).value
deal_name = ws.cell(row=row, column=deal_name_col_index).value
id_val: Any = ws.cell(row=row, column=id_col).value
deal_name: Any = ws.cell(row=row, column=deal_name_col).value
if not id_val or not deal_name:
continue
properties[str(id_val).strip()] = extract_succinct_address(
str(deal_name).strip()
processed_val: Any = ws.cell(row=row, column=processed_col).value
processed: bool = str(processed_val).lower() == "true"
property_id: str = str(id_val).strip()
properties[property_id] = PropertyRow(
row_index=row,
address=extract_succinct_address(str(deal_name)),
processed=processed,
)
return properties
def mark_properties_as_processed(
filepath: str,
property_map: Dict[str, PropertyRow],
) -> None:
wb: Workbook = load_workbook(filepath)
ws: Worksheet = wb["Southern RA-Lite Programme 3103"]
header_row: int = 1
# find processed column
processed_col: int | None = None
for col in range(1, ws.max_column + 1):
value = ws.cell(row=header_row, column=col).value
if value and str(value).strip().lower() == "processed":
processed_col = col
break
if processed_col is None:
raise Exception("Processed column not found")
# update rows
for property_row in property_map.values():
if property_row.processed:
cast(
Cell,
ws.cell(
row=property_row.row_index,
column=processed_col,
),
).value = True
wb.save(filepath)
def extract_succinct_address(deal_name: str) -> str:
left_part = deal_name.split("|")[0].strip()

View file

@ -1,6 +1,5 @@
import os
from typing import Dict, List
from typing import Dict
from playwright.sync_api import (
sync_playwright,
Locator,
@ -9,7 +8,11 @@ from playwright.sync_api import (
BrowserContext,
)
from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet
from backend.ecmk_fetcher.address_list import (
PropertyRow,
extract_addresses_from_spreadsheet,
mark_properties_as_processed,
)
from backend.ecmk_fetcher.browser import (
attach_debug_listeners,
download_with_retry,
@ -35,8 +38,7 @@ def run_job() -> None:
BASE_DIR: str = os.path.dirname(__file__)
filepath: str = os.path.join(BASE_DIR, property_list_file)
property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath)
property_ids: List[str] = list(property_map.keys())
property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(filepath)
sharepoint_client: DomnaSharepointClient = DomnaSharepointClient(
sharepoint_location=DomnaSites.PRIVATE_PAY
@ -79,19 +81,27 @@ def run_job() -> None:
property_id: str = build_property_id(address, postcode)
if property_id not in property_ids:
property_row: PropertyRow | None = property_map.get(property_id)
if not property_row:
continue
sharepoint_address: str = property_map[property_id]
if property_row.processed:
continue
sharepoint_address: str = property_row.address
go_to_assessment_details(page, row)
all_uploaded: bool = True
for report_type in REPORT_TYPES:
file_path: str | None = download_with_retry(
page, report_type
)
if not file_path:
all_uploaded = False
continue
try:
@ -101,10 +111,16 @@ def run_job() -> None:
base_path=sharepoint_base_path,
subpath=sharepoint_address,
)
except Exception:
all_uploaded = False
raise
finally:
if os.path.exists(file_path):
os.remove(file_path)
if all_uploaded:
property_row.processed = True
page.go_back()
page.wait_for_selector(
"#assessmentDatatable tbody tr", timeout=15000
@ -119,3 +135,5 @@ def run_job() -> None:
finally:
context.close()
browser.close()
mark_properties_as_processed(filepath, property_map)