mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Rename files in subfolders too
This commit is contained in:
parent
ff4a2e4242
commit
a135d88721
4 changed files with 235 additions and 35 deletions
30
.claude/settings.local.json
Normal file
30
.claude/settings.local.json
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
{
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(python -m pytest backend/pashub_fetcher/tests/test_pashub_client.py::test_select_latest_core_files_falls_back_to_latest_when_no_osm_candidates -v)",
|
||||
"Bash(python -m pyright backend/app/db/functions/magic_plan_functions.py)",
|
||||
"Bash(npx pyright *)",
|
||||
"Bash(python -m pytest backend/app/db/functions/tests/test_magic_plan_functions.py --no-header -q)",
|
||||
"Bash(python -m pyright backend/magic_plan/tests/test_audit_script.py)",
|
||||
"Bash(find /workspaces/model -name \"pyrightconfig.json\" 2>/dev/null | head -5 && which pyright || find /home -name \"pyright\" 2>/dev/null | head -3 && find /usr -name \"pyright\" 2>/dev/null | head -3)",
|
||||
"Read(//home/**)",
|
||||
"Read(//usr/**)",
|
||||
"Bash(/home/vscode/.npm/_npx/110e52990071af13/node_modules/.bin/pyright backend/magic_plan/tests/test_audit_script.py)",
|
||||
"Bash(python -m pytest backend/magic_plan/tests/test_audit_script.py::test_write_headers_two_rows_correct_labels_and_column_positions -x)",
|
||||
"Bash(python -m pytest backend/magic_plan/tests/test_audit_script.py -x)",
|
||||
"Bash(python -m pytest backend/magic_plan/tests/test_audit_script.py::test_apply_section_borders_sets_medium_right_border_on_boundary_columns -x)",
|
||||
"Bash(python -m pytest backend/magic_plan/tests/test_audit_script.py)",
|
||||
"Bash(/home/vscode/.npm/_npx/110e52990071af13/node_modules/.bin/pyright backend/magic_plan/audit_script.py)",
|
||||
"Bash(mkdir -p /workspaces/model/infrastructure/solar)",
|
||||
"Bash(mkdir -p /workspaces/model/tests/infrastructure/solar)",
|
||||
"Bash(python -m pytest datatypes/magicplan/domain/tests/test_mapper.py::test_kitchen_window_has_ventilation -x)",
|
||||
"Bash(python -m pytest datatypes/magicplan/domain/tests/test_mapper.py -x)",
|
||||
"Bash(git add *)",
|
||||
"Bash(git commit -m ' *)",
|
||||
"Bash(python3 -c ' *)",
|
||||
"Bash(python -m pytest datatypes/magicplan/domain/tests/test_mapper.py::test_toilet_door_has_ventilation_undercut -x)",
|
||||
"Bash(python -m pytest tests/infrastructure/postgres/test_uploaded_file_table.py -x -q)",
|
||||
"Bash(python -m pytest tests/infrastructure/postgres/test_uploaded_file_table.py tests/repositories/tasks/ tests/repositories/magic_plan/ tests/repositories/property/ -x -q)"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -16,8 +16,8 @@ from utils.logger import setup_logger
|
|||
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
|
||||
from utils.sharepoint.domna_sites import DomnaSites
|
||||
|
||||
DRY_RUN: bool = True
|
||||
CSV_PATH: str = "scripts/sero_address_list.csv"
|
||||
DRY_RUN: bool = False
|
||||
CSV_PATH: str = "scripts/sero_address_list_test.csv"
|
||||
|
||||
BASE_PATH = (
|
||||
"Osmosis-ACD Projects/Sero-Clarion Housing/"
|
||||
|
|
@ -70,6 +70,47 @@ def build_canonical_filename(
|
|||
return f"{uprn}_{street_post}{ext}"
|
||||
|
||||
|
||||
def process_folder(
|
||||
sp_client: DomnaSharepointClient,
|
||||
folder_path: str,
|
||||
uprn: str,
|
||||
address: str,
|
||||
postcode: str,
|
||||
) -> None:
|
||||
try:
|
||||
contents = sp_client.get_folders_in_path(folder_path)
|
||||
except ValueError:
|
||||
logger.warning(f"Missing folder for UPRN {uprn}: {folder_path}")
|
||||
return
|
||||
|
||||
for item in contents.get("value", []):
|
||||
if "folder" in item:
|
||||
process_folder(
|
||||
sp_client, f"{folder_path}/{item['name']}", uprn, address, postcode
|
||||
)
|
||||
elif "file" in item:
|
||||
original_name: str = item["name"]
|
||||
new_name = build_canonical_filename(uprn, address, postcode, original_name)
|
||||
|
||||
if new_name is None:
|
||||
continue
|
||||
|
||||
if DRY_RUN:
|
||||
logger.info(
|
||||
f'[DRY RUN] Renaming: "{original_name}" → "{new_name}" (UPRN: {uprn})'
|
||||
)
|
||||
else:
|
||||
try:
|
||||
sp_client.rename_file(item["id"], new_name)
|
||||
logger.info(
|
||||
f'Renamed: "{original_name}" → "{new_name}" (UPRN: {uprn})'
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f'Failed to rename "{original_name}" → "{new_name}" (UPRN: {uprn}): {e}'
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
sp_client = DomnaSharepointClient(DomnaSites.SOCIAL_HOUSING_WAVE_3)
|
||||
|
||||
|
|
@ -89,39 +130,7 @@ def main() -> None:
|
|||
f"{BASE_PATH}/{address}, {postcode}"
|
||||
f"/{SharepointSubfolders.ASSESSMENT.value}/{ASSESSMENT_SUBFOLDER}"
|
||||
)
|
||||
|
||||
try:
|
||||
contents = sp_client.get_folders_in_path(folder_path)
|
||||
except ValueError:
|
||||
logger.warning(f"Missing folder for UPRN {uprn}: {folder_path}")
|
||||
continue
|
||||
|
||||
for item in contents.get("value", []):
|
||||
if "file" not in item:
|
||||
continue
|
||||
|
||||
original_name: str = item["name"]
|
||||
new_name = build_canonical_filename(
|
||||
uprn, address, postcode, original_name
|
||||
)
|
||||
|
||||
if new_name is None:
|
||||
continue
|
||||
|
||||
if DRY_RUN:
|
||||
logger.info(
|
||||
f'[DRY RUN] Renaming: "{original_name}" → "{new_name}" (UPRN: {uprn})'
|
||||
)
|
||||
else:
|
||||
try:
|
||||
sp_client.rename_file(item["id"], new_name)
|
||||
logger.info(
|
||||
f'Renamed: "{original_name}" → "{new_name}" (UPRN: {uprn})'
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f'Failed to rename "{original_name}" → "{new_name}" (UPRN: {uprn}): {e}'
|
||||
)
|
||||
process_folder(sp_client, folder_path, uprn, address, postcode)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
0
tests/scripts/__init__.py
Normal file
0
tests/scripts/__init__.py
Normal file
161
tests/scripts/test_rename_sharepoint_files.py
Normal file
161
tests/scripts/test_rename_sharepoint_files.py
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
from typing import Any
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import scripts.rename_sharepoint_files as module
|
||||
from scripts.rename_sharepoint_files import build_canonical_filename, process_folder
|
||||
|
||||
|
||||
def _make_file(name: str, item_id: str = "id-1") -> dict[str, Any]:
|
||||
return {"name": name, "id": item_id, "file": {}}
|
||||
|
||||
|
||||
def _make_folder(name: str) -> dict[str, Any]:
|
||||
return {"name": name, "folder": {}}
|
||||
|
||||
|
||||
def _make_package(name: str) -> dict[str, Any]:
|
||||
return {"name": name, "package": {}}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_canonical_filename
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_already_canonical_returns_none() -> None:
|
||||
assert build_canonical_filename("100", "1 High St", "AB1 2CD", "100_High St AB1 2CD_Report.pdf") is None
|
||||
|
||||
|
||||
def test_strips_address_prefix_and_adds_uprn() -> None:
|
||||
result = build_canonical_filename("100", "1 High St", "AB1 2CD", "1 High St AB1 2CD - Survey.pdf")
|
||||
assert result == "100_1 High St AB1 2CD_Survey.pdf"
|
||||
|
||||
|
||||
def test_no_prefix_still_canonical() -> None:
|
||||
result = build_canonical_filename("100", "1 High St", "AB1 2CD", "Survey.pdf")
|
||||
assert result == "100_1 High St AB1 2CD_Survey.pdf"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — files only at root level
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_renames_top_level_files(caplog: pytest.LogCaptureFixture) -> None:
|
||||
sp = MagicMock()
|
||||
sp.get_folders_in_path.return_value = {
|
||||
"value": [
|
||||
_make_file("Survey.pdf", "id-1"),
|
||||
_make_file("Report.docx", "id-2"),
|
||||
]
|
||||
}
|
||||
|
||||
with patch.object(module, "DRY_RUN", False):
|
||||
process_folder(sp, "some/path", "100", "1 High St", "AB1 2CD")
|
||||
|
||||
assert sp.rename_file.call_count == 2
|
||||
sp.rename_file.assert_any_call("id-1", "100_1 High St AB1 2CD_Survey.pdf")
|
||||
sp.rename_file.assert_any_call("id-2", "100_1 High St AB1 2CD_Report.docx")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — recursive two-level hierarchy
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_recurses_into_subfolders_and_renames_all_files() -> None:
|
||||
sp = MagicMock()
|
||||
|
||||
root_contents: dict[str, Any] = {
|
||||
"value": [
|
||||
_make_file("Root.pdf", "root-file"),
|
||||
_make_folder("SubA"),
|
||||
]
|
||||
}
|
||||
suba_contents: dict[str, Any] = {
|
||||
"value": [
|
||||
_make_file("Sub.pdf", "sub-file"),
|
||||
]
|
||||
}
|
||||
|
||||
sp.get_folders_in_path.side_effect = lambda path: (
|
||||
root_contents if path == "base/path" else suba_contents
|
||||
)
|
||||
|
||||
with patch.object(module, "DRY_RUN", False):
|
||||
process_folder(sp, "base/path", "200", "2 Main Rd", "XY9 8ZW")
|
||||
|
||||
assert sp.rename_file.call_count == 2
|
||||
sp.rename_file.assert_any_call("root-file", "200_2 Main Rd XY9 8ZW_Root.pdf")
|
||||
sp.rename_file.assert_any_call("sub-file", "200_2 Main Rd XY9 8ZW_Sub.pdf")
|
||||
|
||||
sp.get_folders_in_path.assert_any_call("base/path/SubA")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — non-file, non-folder items are skipped
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_ignores_package_items() -> None:
|
||||
sp = MagicMock()
|
||||
sp.get_folders_in_path.return_value = {
|
||||
"value": [_make_package("Notebook")]
|
||||
}
|
||||
|
||||
with patch.object(module, "DRY_RUN", False):
|
||||
process_folder(sp, "some/path", "300", "3 Oak Ave", "ZZ1 1ZZ")
|
||||
|
||||
sp.rename_file.assert_not_called()
|
||||
assert sp.get_folders_in_path.call_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — missing folder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_missing_folder_logs_warning_and_returns(caplog: pytest.LogCaptureFixture) -> None:
|
||||
sp = MagicMock()
|
||||
sp.get_folders_in_path.side_effect = ValueError("not found")
|
||||
|
||||
with patch.object(module, "DRY_RUN", False):
|
||||
process_folder(sp, "missing/path", "400", "4 Elm St", "AA2 2BB")
|
||||
|
||||
sp.rename_file.assert_not_called()
|
||||
assert any("Missing folder" in r.message and "400" in r.message for r in caplog.records)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — dry run
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_dry_run_logs_without_renaming(caplog: pytest.LogCaptureFixture) -> None:
|
||||
sp = MagicMock()
|
||||
sp.get_folders_in_path.return_value = {"value": [_make_file("Doc.pdf", "id-x")]}
|
||||
|
||||
with patch.object(module, "DRY_RUN", True):
|
||||
process_folder(sp, "some/path", "500", "5 Pine Ln", "BB3 3CC")
|
||||
|
||||
sp.rename_file.assert_not_called()
|
||||
assert any("[DRY RUN]" in r.message for r in caplog.records)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# process_folder — already-canonical files are skipped
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_skips_already_canonical_files() -> None:
|
||||
sp = MagicMock()
|
||||
sp.get_folders_in_path.return_value = {
|
||||
"value": [_make_file("500_Pine Ln BB3 3CC_Doc.pdf", "id-y")]
|
||||
}
|
||||
|
||||
with patch.object(module, "DRY_RUN", False):
|
||||
process_folder(sp, "some/path", "500", "5 Pine Ln", "BB3 3CC")
|
||||
|
||||
sp.rename_file.assert_not_called()
|
||||
Loading…
Add table
Reference in a new issue