diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 920c9ab0..73660bb5 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -284,6 +284,46 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + # ============================================================ + # Build Bulk Upload Finaliser image and Push + # ============================================================ + bulkUploadFinaliser_image: + needs: [determine_stage, shared_terraform] + uses: ./.github/workflows/_build_image.yml + with: + ecr_repo: bulk_upload_finaliser-${{ needs.determine_stage.outputs.stage }} + dockerfile_path: applications/bulk_upload_finaliser/Dockerfile + build_context: . + build_args: | + DEV_DB_HOST=$DEV_DB_HOST + DEV_DB_PORT=$DEV_DB_PORT + DEV_DB_NAME=$DEV_DB_NAME + secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} + DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} + DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} + + # ============================================================ + # Deploy Bulk Upload Finaliser Lambda + # ============================================================ + bulkUploadFinaliser_lambda: + needs: [bulkUploadFinaliser_image, determine_stage, shared_terraform] + uses: ./.github/workflows/_deploy_lambda.yml + with: + lambda_name: bulkUploadFinaliser + lambda_path: deployment/terraform/lambda/bulkUploadFinaliser + stage: ${{ needs.determine_stage.outputs.stage }} + ecr_repo: bulk_upload_finaliser-${{ needs.determine_stage.outputs.stage }} + image_digest: ${{ needs.bulkUploadFinaliser_image.outputs.image_digest }} + terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }} + secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + 
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + # ============================================================ # Condition ETL image and Push # ============================================================ @@ -459,7 +499,7 @@ jobs: # Deploy FastAPI Lambda # ============================================================ fast_api_lambda: - needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda] + needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda, bulkUploadFinaliser_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: ara_fast_api diff --git a/.github/workflows/lambda_smoke_tests.yml b/.github/workflows/lambda_smoke_tests.yml index 44288821..6fe947ce 100644 --- a/.github/workflows/lambda_smoke_tests.yml +++ b/.github/workflows/lambda_smoke_tests.yml @@ -63,6 +63,16 @@ jobs: build_context: . service_name: bulk-address2uprn-combiner + # ============================================================ + # Bulk Upload Finaliser + # ============================================================ + bulk_upload_finaliser_smoke_test: + uses: ./.github/workflows/_smoke_test_lambda.yml + with: + dockerfile_path: applications/bulk_upload_finaliser/Dockerfile + build_context: . 
+ service_name: bulk-upload-finaliser + # ============================================================ # Condition ETL # ============================================================ diff --git a/applications/bulk_upload_finaliser/Dockerfile b/applications/bulk_upload_finaliser/Dockerfile new file mode 100644 index 00000000..0c8f792f --- /dev/null +++ b/applications/bulk_upload_finaliser/Dockerfile @@ -0,0 +1,38 @@ +FROM public.ecr.aws/lambda/python:3.11 + +# Postgres host/port/database are baked into the image at build time from the +# deploy workflow's --build-arg values (GitHub Actions DEV_DB_* secrets), +# mirroring the landlord_description_overrides Dockerfile. They map onto the +# POSTGRES_* names PostgresConfig.from_env reads. Username/password are NOT baked +# in -- Terraform injects those as Lambda env vars from Secrets Manager. +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV POSTGRES_HOST=${DEV_DB_HOST} +ENV POSTGRES_PORT=${DEV_DB_PORT} +ENV POSTGRES_DATABASE=${DEV_DB_NAME} + +WORKDIR /var/task + +COPY applications/bulk_upload_finaliser/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# DDD-shaped packages only -- no pandas, no legacy backend/. The finaliser writes +# both `property` and the terminal `bulk_address_uploads` status through DDD repos +# on its own PostgresConfig session (ADR-0013). `datatypes/` comes in via the +# Property aggregate's import closure (domain/property/property.py -> site_notes -> +# datatypes/epc/...), enforced by tests/test_lambda_packaging.py. +COPY datatypes/ datatypes/ +COPY domain/ domain/ +COPY infrastructure/ infrastructure/ +COPY orchestration/ orchestration/ +COPY repositories/ repositories/ +COPY utilities/ utilities/ +COPY applications/ applications/ + +# Place the handler at the Lambda task root so the runtime resolves +# ``main.handler`` without an extra package prefix. 
+COPY applications/bulk_upload_finaliser/handler.py /var/task/main.py + +CMD ["main.handler"] diff --git a/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py new file mode 100644 index 00000000..086ca291 --- /dev/null +++ b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py @@ -0,0 +1,22 @@ +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class BulkUploadFinaliserTriggerBody(BaseModel): + """Trigger body for the bulk_upload_finaliser Lambda (ADR-0013). + + Dispatched by the Next.js Finalise action via + ``POST /v1/bulk-uploads/trigger-finaliser``. ``s3_uri`` is the combiner output + (``combined_output_s3_uri``) — the same address/UPRN CSV the old synchronous + ``/finalize`` route read. + """ + + model_config = ConfigDict(extra="allow") + + task_id: UUID + sub_task_id: UUID + s3_uri: str + # bigint in the FE schema; Python int is unbounded so Pydantic stays simple. + portfolio_id: int + bulk_upload_id: UUID diff --git a/applications/bulk_upload_finaliser/handler.py b/applications/bulk_upload_finaliser/handler.py new file mode 100644 index 00000000..6017cbb5 --- /dev/null +++ b/applications/bulk_upload_finaliser/handler.py @@ -0,0 +1,104 @@ +"""bulk_upload_finaliser Lambda (ADR-0013). + +Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse +the trigger, read the combiner output CSV from S3, hand the rows to the +``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then +write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership to +the backend). ``complete`` is written in the *same* transaction as the property +insert (atomic finalise); ``failed`` is written on a fresh session on error. + +PostgresConfig-only, like the landlord classifier Lambda — no legacy ``backend/`` +connection — so a single DB config (POSTGRES_*) drives the whole run. 
+""" + +import logging +import os +from typing import Any +from uuid import UUID + +import boto3 +from sqlalchemy.engine import Engine + +from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import ( + BulkUploadFinaliserTriggerBody, +) +from infrastructure.postgres.config import PostgresConfig +from infrastructure.postgres.engine import commit_scope, make_engine, make_session +from infrastructure.s3.csv_s3_client import CsvS3Client +from infrastructure.s3.s3_uri import parse_s3_uri +from orchestration.bulk_upload_finaliser_orchestrator import ( + BulkUploadFinaliserOrchestrator, +) +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.bulk_upload.bulk_upload_status_writer_postgres import ( + BulkUploadStatusWriterPostgresRepository, +) +from repositories.property.property_postgres_repository import ( + PropertyPostgresRepository, +) +from utilities.aws_lambda.subtask_handler import subtask_handler + +logger = logging.getLogger(__name__) + + +def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int: + bucket, _key = parse_s3_uri(trigger.s3_uri) + + boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + boto_s3: Any = boto3_client("s3") + rows = CsvS3Client(boto_s3, bucket).read_rows(trigger.s3_uri) + + session = make_session(engine) + try: + orchestrator = BulkUploadFinaliserOrchestrator( + # Write-only path: no EpcRepository needed for inserts. + property_repo=PropertyPostgresRepository(session), + status_writer=BulkUploadStatusWriterPostgresRepository(session), + ) + # Atomic finalise: the orchestrator inserts properties and marks `complete` + # via its injected writers; the transaction here makes them land together — + # a failure in either rolls back both, leaving the row for the failure path. 
+ with commit_scope(session): + inserted = orchestrator.finalise( + rows, trigger.portfolio_id, trigger.task_id + ) + finally: + session.close() + + logger.info( + "Finalised bulk upload %s: %d rows read, %d properties inserted.", + trigger.bulk_upload_id, + len(rows), + inserted, + ) + return inserted + + +def _mark_failed(engine: Engine, task_id: UUID) -> None: + session = make_session(engine) + try: + with commit_scope(session): + BulkUploadStatusWriterPostgresRepository(session).set_status( + task_id, "failed" + ) + finally: + session.close() + + +@subtask_handler() +def handler( + body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator +) -> dict[str, int]: + trigger = BulkUploadFinaliserTriggerBody.model_validate(body) + engine = make_engine(PostgresConfig.from_env(os.environ)) + + try: + inserted = _run(engine, trigger) + except Exception: + # Hand the BulkUpload to the terminal `failed` state so the UI leaves + # `finalising`; the @subtask_handler also marks the SubTask FAILED on the + # re-raise below. 
+ _mark_failed(engine, trigger.task_id) + raise + + return {"inserted": inserted} diff --git a/applications/bulk_upload_finaliser/requirements.txt b/applications/bulk_upload_finaliser/requirements.txt new file mode 100644 index 00000000..6a85a255 --- /dev/null +++ b/applications/bulk_upload_finaliser/requirements.txt @@ -0,0 +1,4 @@ +boto3 +pydantic +sqlmodel +psycopg2-binary diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index c050b18c..ea02fd80 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -12,6 +12,7 @@ from backend.app.bulk_uploads.schema import ( CombinedResultRow, CombinedResultsResponse, CombinerTriggerRequest, + FinaliserTriggerRequest, FlagsSummary, LandlordOverridesTriggerRequest, PostcodeSplitterTriggerRequest, @@ -113,6 +114,26 @@ async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest): } +@router.post("/trigger-finaliser", status_code=202) +async def trigger_finaliser(req: FinaliserTriggerRequest): + settings = get_settings() + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.FINALISER_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + @router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse) async def get_combined_results( task_id: UUID, diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index af797cac..759f76ca 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -23,6 +23,14 @@ class LandlordOverridesTriggerRequest(BaseModel): column_mapping: dict[str, str] +class FinaliserTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + s3_uri: str # combiner output 
(combined_output_s3_uri) + portfolio_id: int + bulk_upload_id: str + + class FlagsSummary(BaseModel): duplicates: int missing: int diff --git a/backend/app/config.py b/backend/app/config.py index 1dc3daaf..8939e6ff 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -43,6 +43,7 @@ class Settings(BaseSettings): POSTCODE_SPLITTER_SQS_URL: str = "changeme" COMBINER_SQS_URL: str = "changeme" LANDLORD_OVERRIDES_SQS_URL: str = "changeme" + FINALISER_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/ordnanceSurvey/helpers.py b/backend/ordnanceSurvey/helpers.py index c0d6583b..52fbe9c5 100644 --- a/backend/ordnanceSurvey/helpers.py +++ b/backend/ordnanceSurvey/helpers.py @@ -1,4 +1,5 @@ import urllib.parse +from typing import Any from pydantic import ValidationError import requests import pandas as pd @@ -7,13 +8,13 @@ from utils.logger import setup_logger logger = setup_logger() -def os_places_results_to_dataframe(data: dict) -> pd.DataFrame: +def os_places_results_to_dataframe(data: dict[str, Any]) -> pd.DataFrame: """ Flatten the OS Places API response results into a DataFrame. Each result contains either a DPA or LPI record. """ - results = data.get("results", []) - rows = [] + results: list[dict[str, Any]] = data.get("results", []) + rows: list[dict[str, Any]] = [] for r in results: if "DPA" in r: rows.append(r["DPA"]) @@ -22,7 +23,7 @@ def os_places_results_to_dataframe(data: dict) -> pd.DataFrame: return pd.DataFrame(rows) -def lookup_os_places(postcode: str, api_key: str) -> dict: +def lookup_os_places(postcode: str, api_key: str) -> dict[str, Any]: """ Lookup a postcode using the OS Places API. Returns the full API response data or an error dict. 
diff --git a/backend/ordnanceSurvey/local_handler/.env.local.example b/backend/ordnanceSurvey/local_handler/.env.local.example new file mode 100644 index 00000000..b1f0330e --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/.env.local.example @@ -0,0 +1,21 @@ +# Runtime env for the Ordnance Survey handler when run locally via docker compose. +# Variable names must match backend/app/config.py Settings (DB_*, ORDNANCE_SURVEY_API_KEY) +# plus the AWS creds boto3 needs for S3 access. + +ENVIRONMENT=local + +# Database (OS Places postcode cache) +DB_HOST= +DB_PORT=5432 +DB_NAME= +DB_USERNAME= +DB_PASSWORD= + +# Ordnance Survey Places API +ORDNANCE_SURVEY_API_KEY= + +# AWS — read input CSV from / write results to S3 +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_DEFAULT_REGION=eu-west-2 +S3_BUCKET_NAME=retrofit-data-dev diff --git a/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py b/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py index e5272732..dd3c9742 100644 --- a/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py +++ b/backend/ordnanceSurvey/local_handler/invoke_local_lambda.py @@ -12,9 +12,11 @@ payload = { { "body": json.dumps( { - "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", - "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", - "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/missinguprn.csv", + "task_id": "ea1a20b2-a21f-464c-9999-25f684405b69", + "sub_task_id": "747c99df-a1ff-41a0-98c2-5265d7797f90", + "s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/hyde/output1(Sheet2).csv", + "lexiscore_column": "address2uprn_lexiscore", + "lexiscore_threshold": 0.2, } ) } diff --git a/backend/ordnanceSurvey/local_handler/run_local.sh b/backend/ordnanceSurvey/local_handler/run_local.sh new file mode 100755 index 00000000..345b60ee --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/run_local.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail +cd "$(dirname "$0")" + +if [ ! 
-f .env.local ]; then + cp .env.local.example .env.local + echo "Created .env.local from the template — fill it in, then re-run." >&2 + exit 1 +fi + +docker compose build --no-cache +docker compose up --force-recreate diff --git a/backend/ordnanceSurvey/main.py b/backend/ordnanceSurvey/main.py index 6e82b468..03020307 100644 --- a/backend/ordnanceSurvey/main.py +++ b/backend/ordnanceSurvey/main.py @@ -22,6 +22,7 @@ import uuid import os import pandas as pd +from tqdm import tqdm logger: logging.Logger = setup_logger() @@ -152,8 +153,10 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: df["ordnance_survey_lexiscore"] = None # Process each postcode group at a time - for postcode, group in grouped: - print(f"Processing postcode: {postcode} ({len(group)} rows)") + for postcode, group in tqdm( + grouped, total=grouped.ngroups, desc="OS postcodes", unit="postcode" + ): + tqdm.write(f"Processing postcode: {postcode} ({len(group)} rows)") valid_group = AddressMatch.is_valid_postcode(postcode) if not valid_group: logger.warning(f"Postcode {postcode} is invalid, skipping") diff --git a/backend/ordnanceSurvey/tests/test_data.csv b/backend/ordnanceSurvey/tests/test_data.csv new file mode 100644 index 00000000..f5f886d7 --- /dev/null +++ b/backend/ordnanceSurvey/tests/test_data.csv @@ -0,0 +1,2 @@ +User Input,Postcode,Expected UPRN +"115 NORTHWALK, CROYDON, SURREY",CR0 9ES, diff --git a/backend/ordnanceSurvey/tests/test_os_match.py b/backend/ordnanceSurvey/tests/test_os_match.py new file mode 100644 index 00000000..b2d63689 --- /dev/null +++ b/backend/ordnanceSurvey/tests/test_os_match.py @@ -0,0 +1,153 @@ +# backend/ordnanceSurvey/tests/test_os_match.py +""" +Debug harness for Ordnance Survey address matching. 
+ +Mirrors backend/address2UPRN/tests/test_csv.py, but for the OS Places flow: +for each (User Input, Postcode) case it hits the live OS Places API, scores every +candidate address with AddressMatch.score (exactly as backend/ordnanceSurvey/main.py +does), and prints the full ranked breakdown so you can see *why* a match was or +wasn't found. + +Run with -s to see the ranking: + + pytest backend/ordnanceSurvey/tests/test_os_match.py -s + +Requires ORDNANCE_SURVEY_API_KEY to be set (config Settings / env); skipped otherwise. +""" + +import csv +import os +import time +from pathlib import Path +from typing import Any, Optional + +import pytest + +from backend.ordnanceSurvey.helpers import ( + lookup_os_places, + os_places_results_to_dataframe, +) +from backend.utils.addressMatch import AddressMatch + +FIXTURE_PATH = Path(__file__).parent / "test_data.csv" + +# Be polite to the live OS Places API between cases. +OS_THROTTLE_SECONDS = 1.0 + +# Handler treats best_score <= 0 as "no match" (see ordnanceSurvey/main.py). +MATCH_THRESHOLD = 0.0 + + +@pytest.fixture(autouse=True) +def _throttle_os_requests(): + yield + time.sleep(OS_THROTTLE_SECONDS) + + +def _api_key() -> Optional[str]: + # Read straight from the environment so the debug harness doesn't depend on + # the full Settings model loading cleanly. Falls back to Settings if unset. 
+ key = os.getenv("ORDNANCE_SURVEY_API_KEY") + if not key: + try: + from backend.app.config import get_settings + + key = get_settings().ORDNANCE_SURVEY_API_KEY + except Exception: + key = None + if not key or key == "changeme": + return None + return key + + +def load_test_cases(): + with open(FIXTURE_PATH, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + return [ + pytest.param( + row["User Input"], + row["Postcode"], + (row.get("Expected UPRN") or "").strip(), + id=f'{row["User Input"]} [{row["Postcode"]}]', + ) + for row in reader + ] + + +def _scored_candidates( + user_input: str, postcode: str, api_key: str +) -> list[dict[str, Any]]: + """ + Fetch OS Places candidates for a postcode (bypassing the DB cache) and score + every candidate ADDRESS exactly as ordnanceSurvey/main.py does. Returned + ranked best-first. + """ + response: dict[str, Any] = lookup_os_places(postcode, api_key) + assert response.get("status") == 200, f"OS Places API failed: {response}" + assert "data" in response, f"No data in OS Places response: {response}" + + candidates = os_places_results_to_dataframe(response["data"]) + records: list[dict[str, Any]] = candidates.to_dict("records") # type: ignore[assignment] + + scored: list[dict[str, Any]] = [] + for rec in records: + address = str(rec.get("ADDRESS", "")) + scored.append( + { + "uprn": rec.get("UPRN", "?"), + "address": address, + "normalised": AddressMatch.normalise_address(address), + "score": AddressMatch.score(user_input, address), + } + ) + scored.sort(key=lambda r: r["score"], reverse=True) + return scored + + +def _print_debug( + user_input: str, postcode: str, scored: list[dict[str, Any]] +) -> None: + print(f"\n{'=' * 80}") + print(f"User input : {user_input!r}") + print(f"Normalised : {AddressMatch.normalise_address(user_input)!r}") + print(f"Postcode : {postcode!r}") + print(f"Candidates : {len(scored)}") + print(f"{'-' * 80}") + if not scored: + print("(no OS Places candidates returned for this 
postcode)") + return + for row in scored[:15]: + print(f" score={row['score']:.4f} uprn={row['uprn']}") + print(f" ADDRESS : {row['address']}") + print(f" normalised : {row['normalised']}") + print(f"{'=' * 80}") + + +@pytest.mark.integration +@pytest.mark.parametrize("user_input,postcode,expected_uprn", load_test_cases()) +def test_os_match_finds_candidate( + user_input: str, + postcode: str, + expected_uprn: str, +): + api_key = _api_key() + if api_key is None: + pytest.skip("ORDNANCE_SURVEY_API_KEY not set") + + scored = _scored_candidates(user_input, postcode, api_key) + _print_debug(user_input, postcode, scored) + + best = scored[0] if scored else None + best_score = float(best["score"]) if best is not None else 0.0 + + # The handler records a match only when best_score > 0. This assertion is + # the debug signal: when it fails, the printed ranking above shows why. + assert best is not None and best_score > MATCH_THRESHOLD, ( + f"No OS match for {user_input!r} @ {postcode!r} " + f"(best_score={best_score:.4f}). See ranking above." 
+ ) + + if expected_uprn: + assert str(best["uprn"]) == expected_uprn, ( + f"Best match UPRN {best['uprn']!r} != expected {expected_uprn!r}" + ) diff --git a/backend/pashub_fetcher/tests/test_token_getter.py b/backend/pashub_fetcher/tests/test_token_getter.py new file mode 100644 index 00000000..2c847d16 --- /dev/null +++ b/backend/pashub_fetcher/tests/test_token_getter.py @@ -0,0 +1,50 @@ +from unittest.mock import MagicMock, patch + +from backend.pashub_fetcher.token_getter import get_token_from_local_storage + + +def _configure_playwright_mock(mock_sync_playwright: MagicMock) -> None: + mock_page = MagicMock() + mock_page.url = "https://pashub.net/dashboard" + mock_page.evaluate.return_value = "fake-token" + + mock_context = MagicMock() + mock_context.new_page.return_value = mock_page + + mock_browser = MagicMock() + mock_browser.new_context.return_value = mock_context + + mock_p = MagicMock() + mock_p.chromium.launch.return_value = mock_browser + + mock_sync_playwright.return_value.__enter__.return_value = mock_p + + +@patch("backend.pashub_fetcher.token_getter.shutil.rmtree") +@patch("backend.pashub_fetcher.token_getter.glob.glob") +@patch("backend.pashub_fetcher.token_getter.sync_playwright") +def test_playwright_tmp_dirs_are_cleaned_up_after_browser_close( + mock_sync_playwright: MagicMock, + mock_glob: MagicMock, + mock_rmtree: MagicMock, +) -> None: + # Arrange + fake_artifacts = ["/tmp/playwright-artifacts-abc12"] + fake_profiles = ["/tmp/playwright_chromiumdev_profile-xyz99"] + + def glob_side_effect(pattern: str) -> list[str]: + if "playwright-artifacts-*" in pattern: + return fake_artifacts + if "playwright_chromiumdev_profile-*" in pattern: + return fake_profiles + return [] + + mock_glob.side_effect = glob_side_effect + _configure_playwright_mock(mock_sync_playwright) + + # Act + get_token_from_local_storage("user@example.com", "secret") + + # Assert + mock_rmtree.assert_any_call("/tmp/playwright-artifacts-abc12", ignore_errors=True) + 
mock_rmtree.assert_any_call("/tmp/playwright_chromiumdev_profile-xyz99", ignore_errors=True) diff --git a/backend/pashub_fetcher/token_getter.py b/backend/pashub_fetcher/token_getter.py index 2e2d1440..57338a6d 100644 --- a/backend/pashub_fetcher/token_getter.py +++ b/backend/pashub_fetcher/token_getter.py @@ -1,4 +1,6 @@ +import glob import os +import shutil from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError @@ -90,5 +92,12 @@ def get_token_from_local_storage( context.close() browser.close() + for pattern in ( + "/tmp/playwright-artifacts-*", + "/tmp/playwright_chromiumdev_profile-*", + ): + for path in glob.glob(pattern): + shutil.rmtree(path, ignore_errors=True) + if record_video and video_dir: logger.info(f"Video(s) saved in: {video_dir}") diff --git a/backend/pashub_fetcher/trigger_pashub_sqs_from_file.py b/backend/pashub_fetcher/trigger_pashub_sqs_from_file.py index d6736eda..b647c6a7 100644 --- a/backend/pashub_fetcher/trigger_pashub_sqs_from_file.py +++ b/backend/pashub_fetcher/trigger_pashub_sqs_from_file.py @@ -61,7 +61,7 @@ SHAREPOINT_SITE: str = "ECO" def _build_requests(excel_path: str) -> list[PashubToAraTriggerRequest]: wb = load_workbook(excel_path, data_only=True) - ws = wb.worksheets[1] + ws = wb.worksheets[0] headers: dict[str, int] = {} for col in range(1, ws.max_column + 1): @@ -107,6 +107,7 @@ def _build_requests(excel_path: str) -> list[PashubToAraTriggerRequest]: deal_stage=deal_stage, sharepoint_link=SHAREPOINT_PROPERTIES_FOLDER or None, sharepoint_site=SHAREPOINT_SITE, + get_other_files=True, ) ) diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/main.tf b/deployment/terraform/lambda/bulkUploadFinaliser/main.tf new file mode 100644 index 00000000..0d6685d0 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/main.tf @@ -0,0 +1,49 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = 
"env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + +module "lambda" { + source = "../../modules/lambda_with_sqs" + + name = "bulk-upload-finaliser" + stage = var.stage + + image_uri = local.image_uri + + # The finaliser reads the combiner CSV and does one bulk INSERT — IO-light, but + # a property list can be ~40,000 rows, so 300s leaves ample headroom under the + # queue visibility timeout. batch_size = 1 keeps one upload per invocation so a + # bad record can't redrive its siblings; maximum_concurrency caps DB write + # fan-out. + timeout = 300 + batch_size = 1 + maximum_concurrency = 2 + + environment = merge( + { + STAGE = var.stage + LOG_LEVEL = "info" + POSTGRES_USERNAME = local.db_credentials.db_assessment_model_username + POSTGRES_PASSWORD = local.db_credentials.db_assessment_model_password + }, + ) +} + +# Attach S3 read policy so the handler can read the combiner output CSV. 
+resource "aws_iam_role_policy_attachment" "bulk_upload_finaliser_s3_read" { + role = module.lambda.role_name + policy_arn = data.terraform_remote_state.shared.outputs.bulk_upload_finaliser_s3_read_arn +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf b/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf new file mode 100644 index 00000000..505d902a --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf @@ -0,0 +1,9 @@ +output "bulk_upload_finaliser_queue_url" { + value = module.lambda.queue_url + description = "URL of the Bulk Upload Finaliser SQS queue (wire into the FastAPI FINALISER_SQS_URL)" +} + +output "bulk_upload_finaliser_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the Bulk Upload Finaliser SQS queue" +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf b/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf new file mode 100644 index 00000000..98a588f1 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf @@ -0,0 +1,16 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + } + + backend "s3" { + bucket = "bulk-upload-finaliser-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } + + required_version = ">= 1.2.0" +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf b/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf new file mode 100644 index 00000000..5d45b312 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf @@ -0,0 +1,27 @@ +variable "lambda_name" { + type = string + description = "Logical name of the lambda (e.g. bulkUploadFinaliser)" +} + +variable "stage" { + description = "Deployment stage (e.g. 
dev, prod)" + type = string +} + +variable "ecr_repo_url" { + type = string + description = "ECR repository URL (no tag, no digest)" +} + +variable "image_digest" { + type = string + description = "Image digest (sha256:...)" +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} diff --git a/deployment/terraform/lambda/fast-api/main.tf b/deployment/terraform/lambda/fast-api/main.tf index 3a2b5a5f..dea9b7d9 100644 --- a/deployment/terraform/lambda/fast-api/main.tf +++ b/deployment/terraform/lambda/fast-api/main.tf @@ -46,6 +46,15 @@ data "terraform_remote_state" "bulk_address2uprn_combiner" { } } +data "terraform_remote_state" "bulk_upload_finaliser" { + backend = "s3" + config = { + bucket = "bulk-upload-finaliser-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + ############################################ # Load Credentials ############################################ @@ -105,6 +114,7 @@ module "fastapi" { CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url + FINALISER_SQS_URL = data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_url } } @@ -126,7 +136,8 @@ module "fastapi_sqs_policy" { data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn, - data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn + data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn, + 
data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_arn ] conditions = null diff --git a/deployment/terraform/lambda/pashub_to_ara/main.tf b/deployment/terraform/lambda/pashub_to_ara/main.tf index b5714055..85a2a893 100644 --- a/deployment/terraform/lambda/pashub_to_ara/main.tf +++ b/deployment/terraform/lambda/pashub_to_ara/main.tf @@ -22,6 +22,7 @@ module "lambda" { stage = var.stage image_uri = local.image_uri + timeout = var.timeout # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) maximum_concurrency = var.maximum_concurrency diff --git a/deployment/terraform/lambda/pashub_to_ara/variables.tf b/deployment/terraform/lambda/pashub_to_ara/variables.tf index 29b7af70..6673908b 100644 --- a/deployment/terraform/lambda/pashub_to_ara/variables.tf +++ b/deployment/terraform/lambda/pashub_to_ara/variables.tf @@ -17,6 +17,12 @@ variable "image_digest" { description = "Image digest (sha256:...)" } +variable "timeout" { + type = number + default = 300 + description = "Lambda timeout in seconds." 
+} + variable "maximum_concurrency" { type = number default = null diff --git a/deployment/terraform/shared/main.tf b/deployment/terraform/shared/main.tf index 82a3820a..804082fc 100644 --- a/deployment/terraform/shared/main.tf +++ b/deployment/terraform/shared/main.tf @@ -558,6 +558,36 @@ output "bulk_address2uprn_combiner_s3_arn" { value = module.bulk_address2uprn_combiner_s3.policy_arn } +################################################ +# Bulk Upload Finaliser – Lambda ECR (ADR-0013) +################################################ +module "bulk_upload_finaliser_state_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "bulk-upload-finaliser-terraform-state" +} + +module "bulk_upload_finaliser_registry" { + source = "../modules/container_registry" + name = "bulk_upload_finaliser" + stage = var.stage +} + +# The finaliser only reads the combiner output (bulk_final_outputs) to insert +# property rows; it writes to Postgres, not S3. +module "bulk_upload_finaliser_s3_read" { + source = "../modules/s3_iam_policy" + + policy_name = "BulkUploadFinaliserReadS3" + policy_description = "Allow bulk_upload_finaliser Lambda to read combiner output from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket"] + resource_paths = ["/bulk_final_outputs/*"] +} + +output "bulk_upload_finaliser_s3_read_arn" { + value = module.bulk_upload_finaliser_s3_read.policy_arn +} + ################################################ # Categorisation – Lambda ECR ################################################ diff --git a/docs/adr/0013-bulk-upload-finaliser-writes-properties.md b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md new file mode 100644 index 00000000..f9a61d20 --- /dev/null +++ b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md @@ -0,0 +1,125 @@ +# ADR-0013: The `bulk_upload_finaliser` Lambda writes properties and the terminal status + +**Status:** Accepted +**Date:** 2026-06-04 + 
+> Companion to the `assessment-model` (frontend) repo's +> [ADR-0005](https://github.com/Hestia-Homes/assessment-model/blob/main/docs/adr/0005-async-bulk-upload-finaliser.md), +> which owns the state-machine change and the Drizzle schema. This ADR owns the +> **backend write path**. It applies the direct-write pattern, transaction-boundary +> rule, and port/adapter/row-mirror file layout established by +> [ADR-0003](0003-python-writes-landlord-overrides-directly.md). + +## Context + +Finalising a BulkUpload inserts one `property` row per source row — up to ~40,000. +Today a synchronous Next.js route does it; frontend ADR-0005 moves it to a dispatched +Lambda for the same reasons ADR-0003 moved the classifier write into Python: the work +is internal (a Lambda computes the rows, the same Lambda persists them), the Lambda +already sits next to a Postgres connection, and routing 40k inserts back through an +HTTP hop buys nothing. + +`PropertyRow` in this repo +([`infrastructure/postgres/property_table.py`](../../infrastructure/postgres/property_table.py)) +is currently a **defensive read-only view** — its docstring states the backend never +inserts properties. Finalise changes that. + +## Decision + +1. **New application `applications/bulk_upload_finaliser/`** wraps a handler in + `@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the + subtask start/complete/fail and the Task cascade), exactly like + `applications/landlord_description_overrides/handler.py`. The trigger body: + + ```python + class BulkUploadFinaliserTriggerBody(SubtaskTriggerBody): # task_id, sub_task_id + s3_uri: str # combiner output (combined_output_s3_uri) + portfolio_id: int + bulk_upload_id: UUID + ``` + + Dispatched via a new `POST /v1/bulk-uploads/trigger-finaliser` FastAPI endpoint + (auth `validate_token`) that enqueues to a new SQS queue — mirroring + `trigger-postcode-splitter` / `trigger-landlord-overrides`. + +2. 
**`PropertyRow` drops its "backend never inserts" invariant** and gains the + insertable columns finalise needs — the **exact nine** the frontend route writes + today: `portfolio_id`, `creation_status` (`'READY'`), `uprn`, + `landlord_property_id` (← `Internal Reference`), `address` (matched ?? user + input), `postcode`, `user_inputted_address`, `user_inputted_postcode`, + `lexiscore`. Drizzle remains the schema source of truth (ADR-0003); this is a + mirror change only. + +3. **Insert policy lives in SQL, idempotent:** + + ```sql + INSERT INTO property (portfolio_id, creation_status, uprn, landlord_property_id, + address, postcode, user_inputted_address, + user_inputted_postcode, lexiscore) + VALUES … + ON CONFLICT (portfolio_id, uprn) WHERE uprn IS NOT NULL + DO NOTHING; + ``` + + This reproduces the frontend's `onConflictDoNothing` exactly — existing properties + are not churned on a re-run. + +4. **The Lambda writes the terminal BulkUpload status directly — DDD, on its own + session.** On success it sets `status='complete'`; on failure `status='failed'`. + Rather than the combiner's `backend/app/db/functions` (which open the legacy + `backend/` connection on the separate `DB_*` config), the finaliser stays + **PostgresConfig-only like the landlord classifier Lambda**: it writes status + through a DDD `BulkUploadStatusWriter` port (`repositories/bulk_upload/`) backed + by a minimal `BulkAddressUploadRow` mirror (`infrastructure/postgres/`), on the + *same* session as the property insert. So a single DB config (`POSTGRES_*`) drives the run and the + image needs no `backend/`. The `complete` flip happens **in the same transaction + as the property insert** (atomic finalise — either both land or neither); + `failed` is written on a fresh session in the error path. Next.js no longer + writes `complete`; it owns only the `awaiting_review → finalising` + compare-and-swap at dispatch (frontend ADR-0005). + +5. 
**DDD layering (ADR-0003 + the landlord precedent).** The + `orchestration/bulk_upload_finaliser_orchestrator.py` owns the whole Finalise + domain flow via a single `finalise(rows, portfolio_id, task_id)` that depends + **only on the two repository ports** — it resolves the combiner rows, inserts + the properties, and marks the upload `complete`, with no engine/session/DB of + its own. So it is unit-testable with two in-memory fakes + (`tests/orchestration/test_bulk_upload_finaliser_orchestrator.py`). The Lambda + `applications/bulk_upload_finaliser/handler.py` is the composition root: parse + trigger, read S3, build the engine/session and the concrete adapters, open the + transaction around `finalise`, and handle the failure path. New seams: + - Property insert: `insert_all` (+ the `PropertyIdentityInsert` DTO) is added to + the existing `PropertyRepository` port and its `PropertyPostgresRepository` + adapter — **one repository per aggregate**, reads and writes together, writing + the amended `infrastructure/postgres/property_table.py` mirror. `epc_repo` is + optional: the write path creates identity rows and never hydrates, so the + finaliser constructs the repo with a session alone. + - Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter + `…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py` + mirror (a separate table → a separate repository). + - Transaction boundary stays in the `infrastructure/postgres/engine` helper + (`commit_scope`); the handler opens the context (once, around insert+complete), + never calls `.commit()`; orchestrator and repositories never commit. + +6. **`property_overrides` is out of scope (v2).** The table exists (frontend + migration 0221) but this Lambda does not populate it in v1. When v2 lands it adds + a `PropertyOverrideRow` mirror + `infrastructure/property/property_overrides_postgres_repository.py` + using `onConflictDoUpdate` per frontend ADR-0005. 
+ +## Consequences + +**Positive.** +- 40k inserts run in a Lambda with a single `sub_task` row to observe, not behind a + synchronous HTTP request. +- Reuses the `@subtask_handler` / SQS / direct-Postgres machinery — reviewers have + the classifier handler and the combiner status-writers as precedents. + +**Negative.** +- **`PropertyRow` is no longer read-only.** The defensive invariant that made + accidental backend property writes impossible is gone; correctness now rests on the + finaliser being the only inserting caller. +- **Terminal-status ownership crosses the repo boundary.** The backend now writes a + BulkUpload status (`complete`/`failed`) that Next.js used to own — coordinated only + through the status column (frontend ADR-0005's "two writers, extended"). +- The Drizzle/SQLModel lockstep risk from ADR-0003 now extends to `property`'s + insertable columns. diff --git a/etl/hubspot/__init__.py b/etl/hubspot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/infrastructure/postgres/bulk_address_upload_table.py b/infrastructure/postgres/bulk_address_upload_table.py new file mode 100644 index 00000000..e43d7e84 --- /dev/null +++ b/infrastructure/postgres/bulk_address_upload_table.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import ClassVar, Optional +from uuid import UUID + +from sqlmodel import Field, SQLModel + + +class BulkAddressUploadRow(SQLModel, table=True): + """Minimal mirror of the FE-owned ``bulk_address_uploads`` table. + + The schema source of truth is the Next.js Drizzle repo + (``src/app/db/schema/bulk_address_uploads.ts``); the backend's fuller mirror + lives at ``backend/app/db/models/bulk_address_uploads.py``. This DDD-side + mirror declares only the columns the finaliser writes — the terminal status — + so the finaliser can flip status on its own ``PostgresConfig`` session without + pulling in the legacy ``backend/`` connection (ADR-0013). 
Keep the column names + in step with the Drizzle source. + """ + + __tablename__: ClassVar[str] = "bulk_address_uploads" # pyright: ignore[reportIncompatibleVariableOverride] + + id: UUID = Field(primary_key=True) + task_id: Optional[UUID] = Field(default=None, index=True) + status: str + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) diff --git a/infrastructure/postgres/property_table.py b/infrastructure/postgres/property_table.py index 6bd2d644..c333cad4 100644 --- a/infrastructure/postgres/property_table.py +++ b/infrastructure/postgres/property_table.py @@ -2,24 +2,49 @@ from __future__ import annotations from typing import ClassVar, Optional +from sqlalchemy import Column +from sqlalchemy import Enum as SAEnum from sqlmodel import Field, SQLModel +# Mirror of the FE-owned `creation_status` pgEnum (property.ts: +# propertyCreationStatusEnum = {LOADING, READY, ERROR}). A single SAEnum instance +# so test-schema create_all emits one CREATE TYPE; prod owns the type via Drizzle. +property_creation_status_sa_enum = SAEnum( + "LOADING", "READY", "ERROR", name="creation_status" +) + class PropertyRow(SQLModel, table=True): - """Defensive view of the FE-owned ``property`` table. + """Mirror of the FE-owned ``property`` table. - The schema and migrations for ``property`` are owned by the front-end - Next.js repo; this declares only the identity columns the modelling backend - reads/writes, so FE-owned migrations to other columns don't ripple into us. + The schema and migrations for ``property`` are owned by the front-end Next.js + repo (``src/app/db/schema/property.ts``); this declares the identity columns + the modelling backend reads, plus the subset the ``bulk_upload_finaliser`` + Lambda **inserts** at Finalise (ADR-0013). It is no longer read-only — the + finaliser is the one backend caller that inserts. Columns not declared here + are still owned by FE migrations and don't ripple into us. 
""" __tablename__: ClassVar[str] = "property" # pyright: ignore[reportIncompatibleVariableOverride] - # Non-Optional: this is a read-only defensive view of the FE-owned ``property`` - # table — the backend never inserts rows, so every row read carries an id. - id: int = Field(primary_key=True) + # bigserial in the FE schema — DB-assigned on insert, so Optional/None on the + # way in and always populated on the way out. + id: Optional[int] = Field(default=None, primary_key=True) portfolio_id: int - postcode: str - address: str + # Nullable in the FE schema. The finaliser writes `matched ?? user-inputted`, + # which is absent for fully-unmatched rows. + postcode: Optional[str] = Field(default=None) + address: Optional[str] = Field(default=None) uprn: Optional[int] = Field(default=None) landlord_property_id: Optional[str] = Field(default=None) + + # Insertable columns the finaliser writes (ADR-0013). All nullable in the FE + # schema except `creation_status` (NOT NULL); the finaliser always sets it to + # 'READY', so a nullable mirror is safe — the real column enforces NOT NULL. + creation_status: Optional[str] = Field( + default=None, + sa_column=Column(property_creation_status_sa_enum, nullable=True), + ) + user_inputted_address: Optional[str] = Field(default=None) + user_inputted_postcode: Optional[str] = Field(default=None) + lexiscore: Optional[float] = Field(default=None) diff --git a/orchestration/bulk_upload_finaliser_orchestrator.py b/orchestration/bulk_upload_finaliser_orchestrator.py new file mode 100644 index 00000000..46ad5b12 --- /dev/null +++ b/orchestration/bulk_upload_finaliser_orchestrator.py @@ -0,0 +1,146 @@ +"""Finalises a BulkUpload into ``property`` rows (ADR-0013). + +The domain logic of Finalise: turn the combiner output rows into property identity +rows (the same resolution the old Next.js ``/finalize`` route did) and persist them +through the injected writer. 
Like every orchestrator it never commits — the caller +owns the transaction boundary (see the Lambda handler). +""" + +from __future__ import annotations + +import re +from typing import Any, Optional + +from uuid import UUID + +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) + +# Combiner-output columns — identical to the old frontend /finalize route and the +# backend combined-results reader (router.py). +ADDRESS_COLS = ("Address 1", "Address 2", "Address 3") +POSTCODE_COL = "postcode" +INTERNAL_REF_COL = "Internal Reference" +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +LEXISCORE_COL = "address2uprn_lexiscore" +MISSING_SENTINEL = "invalid postcode" +UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE) + + +def _normalize(value: Any) -> str: + if value is None: + return "" + return str(value).strip() + + +def _is_missing(value: str) -> bool: + return value == "" or value.lower() == MISSING_SENTINEL + + +def _parse_uprn(raw: Any) -> Optional[int]: + val = _normalize(raw) + if _is_missing(val): + return None + try: + return int(val) + except ValueError: + return None + + +def _parse_lexiscore(raw: Any) -> Optional[float]: + val = _normalize(raw) + if _is_missing(val): + return None + try: + return float(val) + except ValueError: + return None + + +def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]: + if matched: + m = UK_POSTCODE_RE.search(matched) + if m: + return m.group(0).upper() + return fallback or None + + +class BulkUploadFinaliserOrchestrator: + """Owns the domain flow of Finalise, depending only on repository ports. + + Both collaborators are ports (``PropertyRepository``, + ``BulkUploadStatusWriter``); the concrete Postgres adapters are wired by the + Lambda handler (the composition root). 
So a unit test constructs this with two + fakes and exercises ``finalise`` end-to-end — no engine, session, or DB. The + orchestrator never commits: the caller opens the transaction around + ``finalise`` so the insert and the ``complete`` flip land atomically. + """ + + def __init__( + self, + property_repo: PropertyRepository, + status_writer: BulkUploadStatusWriter, + ) -> None: + self._property_repo = property_repo + self._status_writer = status_writer + + def finalise( + self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID + ) -> int: + """Resolve the combiner rows, insert the properties, and mark the upload + ``complete`` — all via the injected repositories, no DB connection of its + own. Returns the number of properties inserted. Does not commit.""" + inserts = self.to_property_rows(rows, portfolio_id) + inserted = self._property_repo.insert_all(inserts) + self._status_writer.set_status(task_id, "complete") + return inserted + + def to_property_rows( + self, rows: list[dict[str, str]], portfolio_id: int + ) -> list[PropertyIdentityInsert]: + """Resolve combiner rows into property identity inserts. + + Pure (no DB / IO) and independently testable. Reproduces the old + ``/finalize`` route's resolution exactly: matched address falls back to the + user-inputted one; postcode is extracted from the matched address or falls + back to the user-inputted postcode. 
+ """ + return [self._row_to_insert(raw, portfolio_id) for raw in rows] + + @staticmethod + def _row_to_insert( + raw: dict[str, str], portfolio_id: int + ) -> PropertyIdentityInsert: + user_inputted_address = ( + ", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p) + or None + ) + user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None + + uprn = _parse_uprn(raw.get(UPRN_COL)) + + matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL)) + matched_address = ( + None if _is_missing(matched_address_raw) else matched_address_raw + ) + + address = matched_address or user_inputted_address + postcode = _extract_postcode(matched_address, user_inputted_postcode or "") + internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None + lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL)) + + return PropertyIdentityInsert( + portfolio_id=portfolio_id, + uprn=uprn, + landlord_property_id=internal_ref, + address=address, + postcode=postcode, + user_inputted_address=user_inputted_address, + user_inputted_postcode=user_inputted_postcode, + lexiscore=lexiscore, + creation_status="READY", + ) diff --git a/pytest.ini b/pytest.ini index 47b40c17..1303dcb4 100644 --- a/pytest.ini +++ b/pytest.ini @@ -7,6 +7,7 @@ testpaths = recommendations/tests backend/tests backend/address2UPRN/tests + backend/ordnanceSurvey/tests backend/app/db/functions/tests backend/categorisation/tests backend/condition/tests @@ -25,7 +26,5 @@ testpaths = etl/epc_clean/tests etl/hubspot/tests etl/spatial/tests - domain/sap10_ml/tests - tests/ markers = integration: mark a test as an integration test diff --git a/repositories/bulk_upload/bulk_upload_status_writer.py b/repositories/bulk_upload/bulk_upload_status_writer.py new file mode 100644 index 00000000..70fdb1b2 --- /dev/null +++ b/repositories/bulk_upload/bulk_upload_status_writer.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from uuid import UUID + + +class 
BulkUploadStatusWriter(ABC): + """Port: writes the terminal BulkUpload status at Finalise (ADR-0013). + + The finaliser owns the terminal write — Next.js no longer writes ``complete``; + it only flips ``awaiting_review → finalising`` at dispatch (ADR-0005). This is + the backend half of that "two writers" split, alongside the combiner's + ``combining``/``awaiting_review`` writes. + """ + + @abstractmethod + def set_status(self, task_id: UUID, status: str) -> None: + """Set the status of the bulk_address_uploads row for ``task_id``. + + Does not commit — the caller owns the transaction boundary, so the status + flip can share the same transaction as the property insert (atomic + finalise) or run in its own session for the failure path. + """ + ... diff --git a/repositories/bulk_upload/bulk_upload_status_writer_postgres.py b/repositories/bulk_upload/bulk_upload_status_writer_postgres.py new file mode 100644 index 00000000..ba83c4d7 --- /dev/null +++ b/repositories/bulk_upload/bulk_upload_status_writer_postgres.py @@ -0,0 +1,33 @@ +"""Postgres adapter for ``BulkUploadStatusWriter`` (ADR-0013). + +Flips the ``bulk_address_uploads`` status on the caller's session — so the +finaliser can mark ``complete`` in the *same* transaction as the property insert +(atomic finalise), and mark ``failed`` in a fresh session on the error path. 
+""" + +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import UUID + +from sqlmodel import Session, select + +from infrastructure.postgres.bulk_address_upload_table import BulkAddressUploadRow +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter + + +class BulkUploadStatusWriterPostgresRepository(BulkUploadStatusWriter): + def __init__(self, session: Session) -> None: + self._session = session + + def set_status(self, task_id: UUID, status: str) -> None: + row = self._session.exec( + select(BulkAddressUploadRow).where( + BulkAddressUploadRow.task_id == task_id + ) + ).first() + if row is None: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.status = status + row.updated_at = datetime.now(timezone.utc) + self._session.add(row) diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index 55a32ed3..407fe1e5 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -1,39 +1,65 @@ from __future__ import annotations +from typing import Any, Optional, cast + +from sqlalchemy import Table +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlmodel import Session, col, select from domain.property.properties import Properties from domain.property.property import Property, PropertyIdentity from infrastructure.postgres.property_table import PropertyRow from repositories.epc.epc_repository import EpcRepository -from repositories.property.property_repository import PropertyRepository +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) class PropertyPostgresRepository(PropertyRepository): - """Hydrates the Property aggregate from the FE-owned ``property`` row plus the - EPC slice (via an injected `EpcRepository`). 
Reads only from repos — no - external IO — so a hydrated Property is a pure function of repository state - (ADR-0003). + """Postgres adapter for the ``property`` table — reads and writes (ADR-0003). + + Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice + (via an injected `EpcRepository`), so a hydrated Property is a pure function of + repository state. ``epc_repo`` is optional: the Finalise write path + (``insert_all``) creates new identity rows and never hydrates, so callers that + only insert construct this with a session alone. """ - def __init__(self, session: Session, epc_repo: EpcRepository) -> None: + def __init__( + self, session: Session, epc_repo: Optional[EpcRepository] = None + ) -> None: self._session = session self._epc_repo = epc_repo + # ``__table__`` is injected at runtime on table=True classes but the stubs + # don't expose it; pin to ``Table`` so the dialect insert is typed. + self._table: Table = cast(Table, getattr(PropertyRow, "__table__")) + + def _epc(self) -> EpcRepository: + if self._epc_repo is None: + raise ValueError( + "PropertyPostgresRepository needs an EpcRepository to read; it was " + "constructed for the write-only Finalise path." + ) + return self._epc_repo def get(self, property_id: int) -> Property: row = self._session.get(PropertyRow, property_id) if row is None: raise ValueError(f"property {property_id} not found") identity = PropertyIdentity( + # `postcode`/`address` are nullable in the FE schema (the finaliser may + # insert a row with neither); coerce the degenerate null to "" so the + # identity type stays a plain str. 
portfolio_id=row.portfolio_id, - postcode=row.postcode, - address=row.address, + postcode=row.postcode or "", + address=row.address or "", uprn=row.uprn, landlord_property_id=row.landlord_property_id, ) return Property( identity=identity, - epc=self._epc_repo.get_for_property(property_id), + epc=self._epc().get_for_property(property_id), ) def get_many(self, property_ids: list[int]) -> Properties: @@ -43,7 +69,7 @@ class PropertyPostgresRepository(PropertyRepository): select(PropertyRow).where(col(PropertyRow.id).in_(property_ids)) ).all() row_by_id = {row.id: row for row in rows} - epcs = self._epc_repo.get_for_properties(property_ids) + epcs = self._epc().get_for_properties(property_ids) items: list[Property] = [] for property_id in property_ids: row = row_by_id.get(property_id) @@ -53,8 +79,8 @@ class PropertyPostgresRepository(PropertyRepository): Property( identity=PropertyIdentity( portfolio_id=row.portfolio_id, - postcode=row.postcode, - address=row.address, + postcode=row.postcode or "", + address=row.address or "", uprn=row.uprn, landlord_property_id=row.landlord_property_id, ), @@ -62,3 +88,36 @@ class PropertyPostgresRepository(PropertyRepository): ) ) return Properties(items) + + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + if not rows: + return 0 + + values = [ + { + "portfolio_id": r.portfolio_id, + "creation_status": r.creation_status, + "uprn": r.uprn, + "landlord_property_id": r.landlord_property_id, + "address": r.address, + "postcode": r.postcode, + "user_inputted_address": r.user_inputted_address, + "user_inputted_postcode": r.user_inputted_postcode, + "lexiscore": r.lexiscore, + } + for r in rows + ] + + stmt = pg_insert(self._table).values(values) + # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL), + # reproducing today's Next.js onConflictDoNothing — a re-run leaves existing + # properties untouched (contrast property_overrides, which recalculates). 
+ stmt = stmt.on_conflict_do_nothing( + index_elements=["portfolio_id", "uprn"], + index_where=self._table.c.uprn.isnot(None), + ) + + # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked + # deprecated in the stubs but the INSERT path is supported. + result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] + return cast(int, result.rowcount) diff --git a/repositories/property/property_repository.py b/repositories/property/property_repository.py index 1f3df1da..1773b530 100644 --- a/repositories/property/property_repository.py +++ b/repositories/property/property_repository.py @@ -1,17 +1,40 @@ from __future__ import annotations from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional from domain.property.properties import Properties from domain.property.property import Property -class PropertyRepository(ABC): - """Loads and saves the Property aggregate. +@dataclass(frozen=True) +class PropertyIdentityInsert: + """One row inserted into the FE-owned ``property`` table at Finalise (ADR-0013). - Composes the aggregate whole from the FE-owned ``property`` identity row plus - its source-data slices (EPC today; Site Notes / enrichments as later slices - land). Aggregates load whole — never half a Property (ADR-0002). + Mirrors the exact column set today's Next.js ``/finalize`` writes: eight fields + plus ``creation_status='READY'`` (nine columns in all). ``address``/``postcode`` are the resolved + (matched ?? user-inputted) values and may be ``None``. + """ + + portfolio_id: int + uprn: Optional[int] + landlord_property_id: Optional[str] + address: Optional[str] + postcode: Optional[str] + user_inputted_address: Optional[str] + user_inputted_postcode: Optional[str] + lexiscore: Optional[float] + creation_status: str = "READY" + + +class PropertyRepository(ABC): + """Reads and writes the FE-owned ``property`` table. 
+ + Reads hydrate the Property aggregate whole — never half a Property — from the + identity row plus its source-data slices (EPC today; Site Notes / enrichments + as later slices land) (ADR-0002, ADR-0012). Writes bulk-insert identity rows at + Finalise (ADR-0013). One repository per aggregate. """ @abstractmethod @@ -23,3 +46,11 @@ class PropertyRepository(ABC): rather than one round-trip per property (ADR-0012). Order follows the input ids.""" ... + + @abstractmethod + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + """Bulk-insert identity rows, skipping any whose ``(portfolio_id, uprn)`` + already exists (the FE partial unique index, ``WHERE uprn IS NOT NULL``). + Rows with no UPRN are always inserted. Returns the number actually + inserted; an empty list is a no-op returning 0 (ADR-0013).""" + ... diff --git a/tests/orchestration/fakes.py b/tests/orchestration/fakes.py index 3e2feef0..c9dcf891 100644 --- a/tests/orchestration/fakes.py +++ b/tests/orchestration/fakes.py @@ -15,7 +15,10 @@ from domain.property.properties import Properties from domain.property.property import Property from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository from repositories.epc.epc_repository import EpcRepository -from repositories.property.property_repository import PropertyRepository +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) from repositories.solar.solar_repository import SolarRepository from repositories.unit_of_work import UnitOfWork @@ -30,6 +33,10 @@ class FakePropertyRepo(PropertyRepository): def get_many(self, property_ids: list[int]) -> Properties: return Properties([self._by_id[property_id] for property_id in property_ids]) + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + self.inserted: list[PropertyIdentityInsert] = list(rows) + return len(rows) + class FakeEpcRepo(EpcRepository): def __init__(self, by_property: 
Optional[dict[int, EpcPropertyData]] = None) -> None: diff --git a/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py new file mode 100644 index 00000000..da6de277 --- /dev/null +++ b/tests/orchestration/test_bulk_upload_finaliser_orchestrator.py @@ -0,0 +1,105 @@ +"""The finaliser orchestrator runs end-to-end against fake writers — no DB. + +This is the payoff of injecting the repository ports (ADR-0013): the whole +Finalise domain flow (resolve combiner rows -> insert properties -> mark complete) +is unit-testable with two in-memory fakes, no engine/session/Postgres. +""" + +from __future__ import annotations + +from uuid import UUID, uuid4 + +from domain.property.properties import Properties +from domain.property.property import Property +from orchestration.bulk_upload_finaliser_orchestrator import ( + BulkUploadFinaliserOrchestrator, +) +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter +from repositories.property.property_repository import ( + PropertyIdentityInsert, + PropertyRepository, +) + + +class FakePropertyRepository(PropertyRepository): + """Records inserts; the read half is unused on the Finalise path.""" + + def __init__(self) -> None: + self.inserted: list[PropertyIdentityInsert] = [] + + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + self.inserted = list(rows) + return len(rows) + + def get(self, property_id: int) -> Property: # pragma: no cover + raise NotImplementedError + + def get_many(self, property_ids: list[int]) -> Properties: # pragma: no cover + raise NotImplementedError + + +class FakeStatusWriter(BulkUploadStatusWriter): + def __init__(self) -> None: + self.calls: list[tuple[UUID, str]] = [] + + def set_status(self, task_id: UUID, status: str) -> None: + self.calls.append((task_id, status)) + + +def _orchestrator() -> tuple[ + BulkUploadFinaliserOrchestrator, FakePropertyRepository, FakeStatusWriter +]: + 
prop = FakePropertyRepository() + status = FakeStatusWriter() + return BulkUploadFinaliserOrchestrator(prop, status), prop, status + + +def test_finalise_inserts_resolved_rows_and_marks_complete() -> None: + orchestrator, prop, status = _orchestrator() + task_id = uuid4() + rows = [ + { + "Address 1": "1 Some Street", + "postcode": "A0 0AA", + "Internal Reference": "REF-1", + "address2uprn_uprn": "100023", + "address2uprn_address": "1 SOME STREET, TOWN, SW1A 1AA", + "address2uprn_lexiscore": "0.95", + }, + ] + + inserted = orchestrator.finalise(rows, portfolio_id=7, task_id=task_id) + + assert inserted == 1 + (row,) = prop.inserted + assert row.portfolio_id == 7 + assert row.uprn == 100023 + assert row.landlord_property_id == "REF-1" + assert row.address == "1 SOME STREET, TOWN, SW1A 1AA" # matched wins + assert row.postcode == "SW1A 1AA" # extracted from the matched address + assert row.user_inputted_address == "1 Some Street" + assert row.lexiscore == 0.95 + assert row.creation_status == "READY" + # The upload is marked complete via the injected status writer. 
+ assert status.calls == [(task_id, "complete")] + + +def test_finalise_falls_back_to_user_input_when_unmatched() -> None: + orchestrator, prop, _status = _orchestrator() + rows = [ + { + "Address 1": "2 Other Road", + "postcode": "B1 1BB", + "address2uprn_uprn": "invalid postcode", + "address2uprn_address": "invalid postcode", + "address2uprn_lexiscore": "", + }, + ] + + orchestrator.finalise(rows, portfolio_id=9, task_id=uuid4()) + + (row,) = prop.inserted + assert row.uprn is None # missing sentinel -> None + assert row.address == "2 Other Road" # falls back to user input + assert row.postcode == "B1 1BB" # falls back to user-inputted postcode + assert row.lexiscore is None diff --git a/utils/sharepoint/domna_sharepoint_client.py b/utils/sharepoint/domna_sharepoint_client.py index 3e9168ba..c36cb418 100644 --- a/utils/sharepoint/domna_sharepoint_client.py +++ b/utils/sharepoint/domna_sharepoint_client.py @@ -37,7 +37,7 @@ class DomnaSharepointClient: site_id=self.sharepoint_drive.value, ) - return sharepoint_client.list_folder_contents(path) + return sharepoint_client.list_folder_contents(path, page_size=500) def does_folder_exists_at(self, file_name: str, file_path: str): folders: Dict[str, Any] = self.get_folders_in_path(file_path)