From c4f6d77845de6c4730ba4aed5ec0c052a8306642 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 21 Apr 2026 20:23:33 +0000 Subject: [PATCH 1/6] implemented onboarding --- .devcontainer/backend/Dockerfile | 25 ++- .devcontainer/backend/devcontainer.json | 8 +- .devcontainer/backend/docker-compose.yml | 20 ++- .gitignore | 3 +- CLAUDE.md | 1 + backend/app/bulk_uploads/router.py | 142 +++++++++++++++++- backend/app/bulk_uploads/schema.py | 33 ++++ backend/app/bulk_uploads/scoring.py | 14 ++ backend/app/config.py | 1 + backend/app/db/functions/tasks/Tasks.py | 2 +- backend/app/db/models/bulk_address_uploads.py | 18 +++ backend/bulk_address2uprn_combiner/main.py | 11 +- backend/run_local.sh | 3 +- backend/tests/test_bulk_combined_results.py | 21 +++ backend/tests/test_bulk_combiner_status.py | 89 +++++++++++ ...-uploads-trigger-splitter-FastAPI-route.md | 38 ----- ...with-real-schema-prevent-rogue-migrations.md | 50 ------ ...k-uploads-task_id-combine-FastAPI-route.md | 36 ----- ...-uploads-task_id-combined-results-route.md | 35 ----- ...k-uploads-task_id-confirm-matches-route.md | 37 ----- ...ombiner-when-splitter-subtasks-complete.md | 34 ----- ..._address_uploads-combined_output_s3_uri.md | 30 ---- ...ODE_SPLITTER_QUEUE_NAME-to-backend-envs.md | 35 ----- ...tter-combiner-queues-to-backend-runtime.md | 34 ----- ...bda-queue-via-terraform-to-staging-prod.md | 28 ---- 25 files changed, 375 insertions(+), 373 deletions(-) create mode 100644 backend/app/bulk_uploads/scoring.py create mode 100644 backend/tests/test_bulk_combined_results.py create mode 100644 backend/tests/test_bulk_combiner_status.py delete mode 100644 backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md delete mode 100644 backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md delete mode 100644 backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md delete mode 100644 backlog/tasks/task-3 - 
Add-GET-bulk-uploads-task_id-combined-results-route.md delete mode 100644 backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md delete mode 100644 backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md delete mode 100644 backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md delete mode 100644 backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md delete mode 100644 backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md delete mode 100644 backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 3ddb8d37..a92d37f6 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -1,15 +1,23 @@ -FROM python:3.11.10-bullseye +FROM python:3.11.10-bookworm ARG USER=vscode +ARG USER_UID=1000 +ARG USER_GID=1000 ARG DEBIAN_FRONTEND=noninteractive -# 1) Toolchain + utilities for building libpostal +# 1) Toolchain + utilities for building libpostal, plus LazyVim deps RUN apt-get update && apt-get install -y --no-install-recommends \ - sudo jq vim curl git ca-certificates \ + sudo jq vim curl git ca-certificates wget \ build-essential pkg-config automake autoconf libtool \ + ripgrep fd-find make unzip \ && rm -rf /var/lib/apt/lists/* +# Neovim latest (LazyVim needs >=0.9) +RUN curl -fsSL https://github.com/neovim/neovim/releases/latest/download/nvim-linux-x86_64.tar.gz \ + | tar -xz -C /opt \ + && ln -s /opt/nvim-linux-x86_64/bin/nvim /usr/local/bin/nvim + # # 2) Build and install libpostal from source # RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ # && cd /tmp/libpostal \ @@ -21,7 +29,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # && rm -rf /tmp/libpostal # 3) Create the 
user and grant sudo privileges -RUN useradd -m -s /usr/bin/bash ${USER} \ +RUN groupadd -g ${USER_GID} ${USER} \ + && useradd -m -u ${USER_UID} -g ${USER_GID} -s /usr/bin/bash ${USER} \ && echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \ && chmod 0440 /etc/sudoers.d/${USER} @@ -72,11 +81,15 @@ RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ && npm install -g backlog.md \ && rm -rf /var/lib/apt/lists/* -# Install Claude USER ${USER} + +# Bootstrap LazyVim starter config +RUN git clone https://github.com/LazyVim/starter /home/${USER}/.config/nvim \ + && rm -rf /home/${USER}/.config/nvim/.git +# Install Claude RUN curl -fsSL https://claude.ai/install.sh | bash \ && export PATH="/home/${USER}/.local/bin:${PATH}" \ && claude plugin marketplace add JuliusBrussee/caveman \ && claude plugin install caveman@caveman ENV PATH="/home/vscode/.local/bin:${PATH}" -USER root \ No newline at end of file +USER root diff --git a/.devcontainer/backend/devcontainer.json b/.devcontainer/backend/devcontainer.json index b104e6e1..a9b7352a 100644 --- a/.devcontainer/backend/devcontainer.json +++ b/.devcontainer/backend/devcontainer.json @@ -6,7 +6,7 @@ "workspaceFolder": "/workspaces/model", "postStartCommand": "bash .devcontainer/backend/post-install.sh", "mounts": [ - "source=${localEnv:HOME},target=/home/vscode,type=bind", + "source=${localEnv:HOME},target=/workspaces/home,type=bind", "source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached" ], "customizations": { @@ -44,11 +44,15 @@ "containerEnv": { "PYTHONFLAGS": "-Xfrozen_modules=off" }, - "forwardPorts": [6421], + "forwardPorts": [6421, 8000], "portsAttributes": { "6421": { "label": "Backlog.md", "onAutoForward": "notify" + }, + "8000": { + "label": "FastAPI", + "onAutoForward": "notify" } } } diff --git a/.devcontainer/backend/docker-compose.yml b/.devcontainer/backend/docker-compose.yml index 683b4489..757cfbe0 100644 --- a/.devcontainer/backend/docker-compose.yml +++ 
b/.devcontainer/backend/docker-compose.yml @@ -2,13 +2,23 @@ version: '3.8' services: model-backend: - user: "${UID}:${GID}" build: context: ../.. dockerfile: .devcontainer/backend/Dockerfile + args: + USER_UID: ${UID:-1000} + USER_GID: ${GID:-1000} command: sleep infinity + ports: + - "8000:8000" volumes: - ../../:/workspaces/model + - ~/.gitconfig:/home/vscode/.gitconfig:ro + environment: + - SSH_AUTH_SOCK=${SSH_AUTH_SOCK:-} + networks: + - backend-net + - shared-dev db: @@ -22,7 +32,15 @@ services: - POSTGRES_PASSWORD=makingwarmerhomes volumes: - postgres-data-two:/var/lib/postgresql/data + networks: + - backend-net +networks: + backend-net: + driver: bridge + shared-dev: + external: true + volumes: postgres-data-two: \ No newline at end of file diff --git a/.gitignore b/.gitignore index 51a32a0d..ee5aa8ab 100644 --- a/.gitignore +++ b/.gitignore @@ -290,4 +290,5 @@ local_data* pyrightconfig.json # playwright output -*/pashub_fetcher/videos/* \ No newline at end of file +*/pashub_fetcher/videos/* +backlog/* diff --git a/CLAUDE.md b/CLAUDE.md index aa0426a0..de2917f2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -27,3 +27,4 @@ You MUST read the overview resource to understand the complete workflow. 
The inf + diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index ca1e7b79..1e341790 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -1,4 +1,5 @@ import boto3 +from collections import Counter from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import select @@ -7,9 +8,43 @@ from backend.app.dependencies import validate_token from backend.app.config import get_settings from backend.app.db.connection import get_db_session from backend.app.db.models.bulk_address_uploads import BulkAddressUpload -from backend.app.bulk_uploads.schema import PostcodeSplitterTriggerRequest +from backend.app.bulk_uploads.schema import ( + CombinedResultRow, + CombinedResultsResponse, + CombinerTriggerRequest, + FlagsSummary, + PostcodeSplitterTriggerRequest, +) +from backend.app.bulk_uploads.scoring import score_bucket from utils.s3 import parse_s3_uri, read_csv_from_s3 +ADDRESS_COLS = ("Address 1", "Address 2", "Address 3", "postcode") +INTERNAL_REF_COL = "Internal Reference" +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +LEXISCORE_COL = "address2uprn_lexiscore" +MISSING_SENTINEL = "invalid postcode" + + +def _normalize(value) -> str: + if value is None: + return "" + return str(value).strip() + + +def _is_missing_uprn(uprn: str) -> bool: + return uprn == "" or uprn.lower() == MISSING_SENTINEL + + +def _parse_lexiscore(raw) -> float | None: + val = _normalize(raw) + if not val or val.lower() == MISSING_SENTINEL: + return None + try: + return float(val) + except ValueError: + return None + router = APIRouter( prefix="/bulk-uploads", @@ -37,3 +72,108 @@ async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest): "sqs_message_id": response.get("MessageId"), } + +@router.post("/trigger-combiner", status_code=202) +async def trigger_combiner(req: CombinerTriggerRequest): + settings = get_settings() + + try: + sqs = 
boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.COMBINER_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + +@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse) +async def get_combined_results( + task_id: UUID, + offset: int = Query(0, ge=0), + limit: int = Query(500, ge=1, le=5000), +): + with get_db_session() as session: + upload = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + + if upload is None: + raise HTTPException(status_code=404, detail="Upload not found for task_id") + if not upload.combined_output_s3_uri: + raise HTTPException(status_code=409, detail="Combiner not finished") + + bucket, key = parse_s3_uri(upload.combined_output_s3_uri) + try: + raw_rows = read_csv_from_s3(bucket, key) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Failed to read combined CSV: {e}") + + uprn_values = [_normalize(r.get(UPRN_COL)) for r in raw_rows] + uprn_counts = Counter(u for u in uprn_values if not _is_missing_uprn(u)) + duplicate_uprns = {u for u, count in uprn_counts.items() if count >= 2} + + missing_count = sum(1 for u in uprn_values if _is_missing_uprn(u)) + duplicate_count = sum(1 for u in uprn_values if u in duplicate_uprns) + matched_count = len(raw_rows) - missing_count + + page = raw_rows[offset : offset + limit] + rows: list[CombinedResultRow] = [] + for i, raw in enumerate(page): + absolute_index = offset + i + address_parts = [ + _normalize(raw.get(col)) for col in ADDRESS_COLS if _normalize(raw.get(col)) + ] + input_address = ", ".join(address_parts) + internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None + + uprn_raw = _normalize(raw.get(UPRN_COL)) + uprn = None if 
_is_missing_uprn(uprn_raw) else uprn_raw + + matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL)) + matched_address = ( + None + if not matched_address_raw or matched_address_raw.lower() == MISSING_SENTINEL + else matched_address_raw + ) + + lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL)) + + flags: list[str] = [] + if uprn is None: + flags.append("missing") + elif uprn in duplicate_uprns: + flags.append("duplicate") + + rows.append( + CombinedResultRow( + row_index=absolute_index, + input_address=input_address, + internal_reference=internal_ref, + uprn=uprn, + matched_address=matched_address, + lexiscore=lexiscore, + score_bucket=score_bucket(lexiscore), + flags=flags, + ) + ) + + return CombinedResultsResponse( + task_id=str(task_id), + total=len(raw_rows), + offset=offset, + limit=limit, + flags_summary=FlagsSummary( + duplicates=duplicate_count, + missing=missing_count, + matched=matched_count, + ), + rows=rows, + ) + diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index 98a80a2b..ca3b39ea 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -1,3 +1,5 @@ +from typing import List, Literal, Optional + from pydantic import BaseModel @@ -5,3 +7,34 @@ class PostcodeSplitterTriggerRequest(BaseModel): task_id: str sub_task_id: str s3_uri: str + + +class CombinerTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + + +class FlagsSummary(BaseModel): + duplicates: int + missing: int + matched: int + + +class CombinedResultRow(BaseModel): + row_index: int + input_address: str + internal_reference: Optional[str] = None + uprn: Optional[str] = None + matched_address: Optional[str] = None + lexiscore: Optional[float] = None + score_bucket: Optional[Literal["high", "med", "low"]] = None + flags: List[Literal["duplicate", "missing"]] = [] + + +class CombinedResultsResponse(BaseModel): + task_id: str + total: int + offset: int + limit: int + flags_summary: FlagsSummary + rows: 
List[CombinedResultRow] diff --git a/backend/app/bulk_uploads/scoring.py b/backend/app/bulk_uploads/scoring.py new file mode 100644 index 00000000..5ed946fb --- /dev/null +++ b/backend/app/bulk_uploads/scoring.py @@ -0,0 +1,14 @@ +from typing import Optional + +HIGH_THRESHOLD = 0.85 +MED_THRESHOLD = 0.65 + + +def score_bucket(score: Optional[float]) -> Optional[str]: + if score is None: + return None + if score >= HIGH_THRESHOLD: + return "high" + if score >= MED_THRESHOLD: + return "med" + return "low" diff --git a/backend/app/config.py b/backend/app/config.py index 2603ac72..70a6b50c 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): CATEGORISATION_SQS_URL: str = "changeme" PASHUB_TO_ARA_SQS_URL: str = "changeme" POSTCODE_SPLITTER_SQS_URL: str = "changeme" + COMBINER_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 96980e78..6c0fe89e 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -195,7 +195,7 @@ class SubTaskInterface: task.status = "failed" task.job_completed = now - elif all(s == "complete" for s in statuses): + elif all(s in ("complete", "completed") for s in statuses): task.status = "complete" task.job_completed = now diff --git a/backend/app/db/models/bulk_address_uploads.py b/backend/app/db/models/bulk_address_uploads.py index e7fae633..a136e77b 100644 --- a/backend/app/db/models/bulk_address_uploads.py +++ b/backend/app/db/models/bulk_address_uploads.py @@ -2,6 +2,8 @@ from typing import Optional from uuid import UUID, uuid4 from datetime import datetime, timezone +from sqlalchemy import Column +from sqlalchemy.dialects.postgresql import JSONB from sqlmodel import SQLModel, Field, select from backend.app.db.connection import get_db_session @@ -17,12 +19,27 @@ class BulkAddressUpload(SQLModel, table=True): s3_key: str 
= Field(nullable=False) filename: str = Field(nullable=False) status: str = Field(default="ready_for_processing", nullable=False) + column_mapping: Optional[dict] = Field(default=None, sa_column=Column(JSONB)) task_id: Optional[UUID] = Field(default=None) combined_output_s3_uri: Optional[str] = Field(default=None) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) +def set_combining_status(task_id: UUID) -> None: + now = datetime.now(timezone.utc) + with get_db_session() as session: + row = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not row: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.status = "combining" + row.updated_at = now + session.add(row) + session.commit() + + def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: now = datetime.now(timezone.utc) with get_db_session() as session: @@ -32,6 +49,7 @@ def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: if not row: raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") row.combined_output_s3_uri = s3_uri + row.status = "awaiting_review" row.updated_at = now session.add(row) session.commit() diff --git a/backend/bulk_address2uprn_combiner/main.py b/backend/bulk_address2uprn_combiner/main.py index 9b4dc6cb..85e0c5cb 100644 --- a/backend/bulk_address2uprn_combiner/main.py +++ b/backend/bulk_address2uprn_combiner/main.py @@ -8,7 +8,10 @@ from datetime import datetime, timezone from utils.logger import setup_logger from backend.utils.subtasks import subtask_handler -from backend.app.db.models.bulk_address_uploads import set_combined_csv_s3_uri +from backend.app.db.models.bulk_address_uploads import ( + set_combined_output_s3_uri, + set_combining_status, +) logger = setup_logger() @@ -38,6 +41,8 @@ def handler(body: dict[str, Any], context: Any) -> str: if not task_id_str: raise RuntimeError("Missing task_id 
in message body") + set_combining_status(UUID(task_id_str)) + bucket = S3_BUCKET_NAME if not bucket: raise RuntimeError("S3_BUCKET_NAME env var not set") @@ -68,7 +73,7 @@ def handler(body: dict[str, Any], context: Any) -> str: logger.info(f"Saved combined CSV to {s3_uri}") print(f"OUTPUT_S3_URI: {s3_uri}") - set_combined_csv_s3_uri(UUID(task_id_str), s3_uri) - logger.info(f"Persisted combined_csv_s3_uri for task {task_id_str}") + set_combined_output_s3_uri(UUID(task_id_str), s3_uri) + logger.info(f"Persisted combined_output_s3_uri + awaiting_review status for task {task_id_str}") return s3_uri diff --git a/backend/run_local.sh b/backend/run_local.sh index be45a54a..0201e508 100644 --- a/backend/run_local.sh +++ b/backend/run_local.sh @@ -2,5 +2,6 @@ set -a source ./.env set +a -uvicorn app.main:app --reload +cd .. +uvicorn backend.app.main:app --reload --host 0.0.0.0 --port 8000 diff --git a/backend/tests/test_bulk_combined_results.py b/backend/tests/test_bulk_combined_results.py new file mode 100644 index 00000000..33fae09b --- /dev/null +++ b/backend/tests/test_bulk_combined_results.py @@ -0,0 +1,21 @@ +import pytest + +from backend.app.bulk_uploads.scoring import score_bucket + + +@pytest.mark.parametrize( + "score,expected", + [ + (None, None), + (0.0, "low"), + (0.5, "low"), + (0.64, "low"), + (0.65, "med"), + (0.84, "med"), + (0.85, "high"), + (0.9, "high"), + (1.0, "high"), + ], +) +def test_score_bucket_thresholds(score, expected): + assert score_bucket(score) == expected diff --git a/backend/tests/test_bulk_combiner_status.py b/backend/tests/test_bulk_combiner_status.py new file mode 100644 index 00000000..6dd449a1 --- /dev/null +++ b/backend/tests/test_bulk_combiner_status.py @@ -0,0 +1,89 @@ +from datetime import datetime, timezone +from uuid import uuid4 + +import pytest +from sqlalchemy import create_engine +from sqlmodel import Session, SQLModel + +from backend.app.db.models import bulk_address_uploads as module +from 
backend.app.db.models.bulk_address_uploads import ( + BulkAddressUpload, + set_combined_output_s3_uri, + set_combining_status, +) + + +@pytest.fixture +def sqlite_session(monkeypatch): + engine = create_engine("sqlite:///:memory:") + SQLModel.metadata.create_all(engine) + + sessions = [] + + def factory(): + s = Session(engine) + sessions.append(s) + return s + + monkeypatch.setattr(module, "get_db_session", factory) + yield engine + for s in sessions: + s.close() + + +def _insert_row(engine, task_id, status="processing"): + with Session(engine) as session: + row = BulkAddressUpload( + id=uuid4(), + portfolio_id="p1", + user_id="u1", + s3_bucket="b", + s3_key="k", + filename="f.csv", + status=status, + task_id=task_id, + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + ) + session.add(row) + session.commit() + + +def _fetch(engine, task_id): + from sqlmodel import select + with Session(engine) as session: + return session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + + +def test_set_combining_status_updates_row(sqlite_session): + task_id = uuid4() + _insert_row(sqlite_session, task_id, status="processing") + + set_combining_status(task_id) + + row = _fetch(sqlite_session, task_id) + assert row.status == "combining" + assert row.combined_output_s3_uri is None + + +def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(sqlite_session): + task_id = uuid4() + _insert_row(sqlite_session, task_id, status="combining") + + set_combined_output_s3_uri(task_id, "s3://bucket/bulk_final_outputs/abc/combined.csv") + + row = _fetch(sqlite_session, task_id) + assert row.status == "awaiting_review" + assert row.combined_output_s3_uri == "s3://bucket/bulk_final_outputs/abc/combined.csv" + + +def test_set_combining_status_missing_row_raises(sqlite_session): + with pytest.raises(ValueError, match="No bulk_address_uploads row"): + set_combining_status(uuid4()) + + +def 
test_set_combined_output_s3_uri_missing_row_raises(sqlite_session): + with pytest.raises(ValueError, match="No bulk_address_uploads row"): + set_combined_output_s3_uri(uuid4(), "s3://x/y.csv") diff --git a/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md b/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md deleted file mode 100644 index b8a94ab0..00000000 --- a/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md +++ /dev/null @@ -1,38 +0,0 @@ ---- -id: TASK-1 -title: Add POST /bulk-uploads/trigger-splitter FastAPI route -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:31' -labels: - - backend - - bulk-upload - - api -dependencies: [] -priority: high -ordinal: 2000 ---- - -## Description - - -Expose an HTTP route that the frontend can call instead of sending SQS directly. Route: - -`POST /bulk-uploads/trigger-splitter` - -Body: `{task_id, sub_task_id, s3_uri}` — task+subtask already created by frontend `/api/tasks` call before this is invoked. - -Behaviour: validate inputs, then publish an SQS message to the postcode_splitter queue (see `backend/postcode_splitter/main.py` for expected message shape: `{task_id, sub_task_id, s3_uri}`). Use existing SubTaskInterface / TasksInterface patterns from `backend/app/tasks/router.py`. - -Place under `backend/app/` next to `tasks/router.py` — likely `backend/app/bulk_uploads/router.py`. 
- - -## Acceptance Criteria - -- [ ] #1 Route returns 202 with {task_id, sub_task_id} -- [ ] #2 SQS message enqueued with correct shape for postcode_splitter Lambda -- [ ] #3 Auth via existing `validate_token` dependency -- [ ] #4 Queue URL from config, not hardcoded -- [ ] #5 Unit test with mocked boto3 sqs client - diff --git a/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md b/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md deleted file mode 100644 index 02b0fa2c..00000000 --- a/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md +++ /dev/null @@ -1,50 +0,0 @@ ---- -id: TASK-10 -title: >- - Fix bulk_address_uploads SQLModel — align columns with real schema, prevent - rogue migrations -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:34' -labels: - - backend - - bulk-upload - - db -dependencies: [] -priority: high -ordinal: 7000 ---- - -## Description - - -`backend/app/db/models/bulk_address_uploads.py` has several bugs that cause a rogue `ALTER TABLE` and silent write failures: - -**1. Wrong column name** -Model declares `combined_csv_s3_uri` — real column (drizzle-managed) is `combined_output_s3_uri`. `set_combined_csv_s3_uri()` currently writes to a non-existent column. - -**2. Partial model declared as `table=True`** -Model only includes `id, task_id, combined_csv_s3_uri, status, updated_at`. Missing: `portfolio_id, user_id, s3_bucket, s3_key, filename, source_headers, column_mapping, created_at`. SQLModel `table=True` with incomplete columns causes Alembic autogenerate / `create_all` to try to ALTER or recreate the table. - -**3. `status` default mismatch** -Backend: `default="pending"`. Real table default: `'ready_for_processing'`. Triggers ALTER TABLE on migration runs. - -**4. 
`task_id` nullability mismatch** -Backend: `task_id: UUID` (NOT NULL). Frontend drizzle schema: nullable (set later, after onboarding starts). - -**Fix approach:** -- Declare all real columns matching drizzle schema (see `src/app/db/schema/bulk_address_uploads.ts` in assessment-model repo as source of truth). -- Rename `combined_csv_s3_uri` → `combined_output_s3_uri` throughout. -- `task_id: Optional[UUID]`, `status` default `'ready_for_processing'`. -- Ensure Alembic env excludes this table from autogenerate — drizzle owns migrations, not backend. - - -## Acceptance Criteria - -- [ ] #1 `combined_output_s3_uri` column name correct throughout model + helper -- [ ] #2 All columns present and match real table schema -- [ ] #3 No rogue ALTER TABLE when backend starts or migrations run -- [ ] #4 `task_id` nullable, `status` default `'ready_for_processing'` -- [ ] #5 Integration test: combiner runs → `combined_output_s3_uri` populated → frontend reads it correctly - diff --git a/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md b/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md deleted file mode 100644 index dc860e15..00000000 --- a/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md +++ /dev/null @@ -1,36 +0,0 @@ ---- -id: TASK-2 -title: 'Add POST /bulk-uploads/{task_id}/combine FastAPI route' -status: To Do -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 11:53' -labels: - - backend - - bulk-upload - - api -dependencies: [] -priority: high -ordinal: 10000 ---- - -## Description - - -Expose HTTP route to trigger `bulk_address2uprn_combiner`: - -`POST /bulk-uploads/{task_id}/combine` - -Creates a new sub_task under task_id, then pushes `{task_id, sub_task_id}` to the combiner SQS queue (see `backend/bulk_address2uprn_combiner/main.py` for consumer shape). 
- -Idempotency: if `bulk_address_uploads.combined_output_s3_uri` already set for this task, return 200 with `{already_combined: true}` (mirror current frontend behaviour). - - -## Acceptance Criteria - -- [ ] #1 Route returns 202 with {task_id, sub_task_id} on new trigger -- [ ] #2 Returns 200 {already_combined: true} if combined_output_s3_uri already set -- [ ] #3 SQS message enqueued with correct shape -- [ ] #4 Queue URL from config -- [ ] #5 Auth via validate_token - diff --git a/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md b/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md deleted file mode 100644 index 477a2ec3..00000000 --- a/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md +++ /dev/null @@ -1,35 +0,0 @@ ---- -id: TASK-3 -title: 'Add GET /bulk-uploads/{task_id}/combined-results route' -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:08' -labels: - - backend - - bulk-upload - - api -dependencies: [] -priority: high -ordinal: 1000 ---- - -## Description - - -`GET /bulk-uploads/{task_id}/combined-results` - -Behaviour: lookup `bulk_address_uploads` row by `task_id` → read `combined_output_s3_uri` → read combined CSV from S3 → return parsed JSON rows for the frontend review UI. Each row should include: input address fields, matched UPRN, matched OS address, match confidence/score. - -Pagination: optional query params `?offset&limit` (default limit 500). - -If `combined_output_s3_uri` not yet populated → 409 "Combiner not finished". 
- - -## Acceptance Criteria - -- [ ] #1 Returns JSON {rows: [...], total, offset, limit} -- [ ] #2 409 when combined_output_s3_uri null -- [ ] #3 Reads CSV from S3 (retrofit_sap_data bucket) with IAM already granted to backend -- [ ] #4 Row shape matches what confirm-matches frontend expects - diff --git a/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md b/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md deleted file mode 100644 index 68e164ee..00000000 --- a/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md +++ /dev/null @@ -1,37 +0,0 @@ ---- -id: TASK-4 -title: 'Add POST /bulk-uploads/{task_id}/confirm-matches route' -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:31' -labels: - - backend - - bulk-upload - - api -dependencies: - - TASK-3 -priority: high -ordinal: 3000 ---- - -## Description - - -`POST /bulk-uploads/{task_id}/confirm-matches` - -Body: `{accepted_rows: [{uprn, address_line_1, address_line_2, postcode, internal_reference}]}` — the rows the user accepted from the review table. - -Behaviour: for each accepted row, upsert into the portfolio's `addresses` / `property` table (confirm exact model during impl — see `backend/addresses/`, `backend/backend/Property.py`). Update `bulk_address_uploads.status` to terminal (e.g. `confirmed`). - -Idempotency: safe to re-call; dedupe by `task_id` + `uprn`. 
- - -## Acceptance Criteria - -- [ ] #1 Accepted rows persisted as portfolio addresses -- [ ] #2 Duplicate submits do not create duplicate address rows -- [ ] #3 bulk_address_uploads.status updated to terminal -- [ ] #4 Returns summary {inserted, skipped} -- [ ] #5 Transactional — partial failure rolls back - diff --git a/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md b/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md deleted file mode 100644 index cebefde3..00000000 --- a/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md +++ /dev/null @@ -1,34 +0,0 @@ ---- -id: TASK-5 -title: Auto-chain combiner when address2uprn subtasks complete -status: To Do -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20' -labels: - - backend - - bulk-upload - - orchestration -dependencies: - - TASK-2 -priority: medium -ordinal: 5000 ---- - -## Description - - -Today the frontend client polls task status and client-side fires `/combine`. Move that logic to backend: when the last `address2uprn` subtask for a task transitions to complete, backend auto-enqueues the combiner SQS message. - -Likely hook point: `SubTaskInterface.finalize_subtask` — after setting status, check if parent task's subtasks are all terminal and if so, enqueue combiner. Or a separate reconciler run in the subtask-complete code path. - -Removes frontend responsibility for orchestration and avoids "browser closed → combiner never fires" bug. 
- - -## Acceptance Criteria - -- [ ] #1 Combiner fires automatically when all splitter-spawned subtasks done -- [ ] #2 Only fires once per task (dedupe via task row / existing combined_output_s3_uri check) -- [ ] #3 Failed subtasks do NOT trigger combiner — requires manual retry -- [ ] #4 Frontend combine route (task-7) can be deleted or reduced to manual re-run - diff --git a/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md b/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md deleted file mode 100644 index c669233c..00000000 --- a/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md +++ /dev/null @@ -1,30 +0,0 @@ ---- -id: TASK-6 -title: Verify combiner writes to bulk_address_uploads.combined_output_s3_uri -status: To Do -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20' -labels: - - backend - - bulk-upload - - db -dependencies: [] -priority: high -ordinal: 6000 ---- - -## Description - - -Frontend drizzle schema column: `bulk_address_uploads.combined_output_s3_uri`. Backend combiner (`backend/bulk_address2uprn_combiner/main.py`) calls `set_combined_csv_s3_uri(UUID(task_id), s3_uri)` from `backend.app.db.models.bulk_address_uploads`. - -Confirm that helper actually writes to the **`combined_output_s3_uri`** column (not a legacy `combined_csv_s3_uri`). Name drift suggests risk. Fix if mismatched. 
- - -## Acceptance Criteria - -- [ ] #1 Confirmed column name matches frontend schema -- [ ] #2 Fix applied if mismatched -- [ ] #3 Integration test covers: run combiner → row updated → frontend schema reads correctly - diff --git a/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md b/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md deleted file mode 100644 index 053c5250..00000000 --- a/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md +++ /dev/null @@ -1,35 +0,0 @@ ---- -id: TASK-7 -title: >- - Add BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME + POSTCODE_SPLITTER_QUEUE_NAME to - backend envs -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:31' -labels: - - infra - - env -dependencies: - - TASK-1 - - TASK-2 -priority: high -ordinal: 4000 ---- - -## Description - - -Since backend now enqueues to both queues (via new trigger-splitter + combine routes), its service config must have the queue names on staging + prod. Values: -- `bulk-address2uprn-combiner-queue-` -- `postcode-splitter-queue-` (if not already present) - -Remove these from frontend `.env` once task-6/task-7 frontend refactor ships. 
- - -## Acceptance Criteria - -- [ ] #1 Queue names set in backend staging config -- [ ] #2 Queue names set in backend prod config -- [ ] #3 Frontend env vars removed after frontend refactor complete - diff --git a/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md b/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md deleted file mode 100644 index 54477ad0..00000000 --- a/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md +++ /dev/null @@ -1,34 +0,0 @@ ---- -id: TASK-8 -title: 'Grant sqs:SendMessage IAM on splitter + combiner queues to backend runtime' -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:31' -labels: - - infra - - iam - - terraform -dependencies: - - TASK-1 - - TASK-2 -priority: high -ordinal: 5000 ---- - -## Description - - -Backend runtime role needs `sqs:SendMessage` + `sqs:GetQueueUrl` on: -- postcode_splitter queue ARN -- bulk_address2uprn_combiner queue ARN - -Update terraform IAM policy under `infrastructure/terraform/` for backend service. Can revoke equivalent IAM from frontend runtime once refactor ships. 
- - -## Acceptance Criteria - -- [ ] #1 Terraform updated for staging + prod backend role -- [ ] #2 Verified via `aws sqs get-queue-url` using backend creds -- [ ] #3 Frontend IAM revoked after frontend refactor complete - diff --git a/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md b/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md deleted file mode 100644 index 5d805da6..00000000 --- a/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md +++ /dev/null @@ -1,28 +0,0 @@ ---- -id: TASK-9 -title: Deploy bulk_address2uprn_combiner Lambda + queue via terraform to staging/prod -status: Done -assignee: [] -created_date: '2026-04-20' -updated_date: '2026-04-20 12:31' -labels: - - infra - - terraform -dependencies: [] -priority: high -ordinal: 6000 ---- - -## Description - - -Lambda source at `backend/bulk_address2uprn_combiner/`. Use existing `lambda_with_sqs` terraform module. Lambda envs: `S3_BUCKET_NAME=retrofit_sap_data_bucket_name` + DB creds. Queue name convention: `bulk-address2uprn-combiner-queue-`. Lambda needs read on `ara_raw_outputs/` and write on `bulk_final_outputs/` in retrofit_sap_data bucket. 
- - ## Acceptance Criteria - -- [ ] #1 Lambda + queue exist in staging -- [ ] #2 Lambda + queue exist in prod -- [ ] #3 Lambda has correct S3 read/write permissions on retrofit_sap_data bucket -- [ ] #4 Lambda has DB write on bulk_address_uploads - From d7b8ca34bf7a85203b778e8fad7e386ec41155e3 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 21 Apr 2026 20:37:34 +0000 Subject: [PATCH 2/6] made everything complete not completed --- backend/address2UPRN/main.py | 6 +++--- backend/app/db/functions/tasks/Tasks.py | 2 +- backend/postcode_splitter/main.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 79c0de69..28ad344f 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -563,14 +563,14 @@ def handler(event, context, local=False): except Exception as s3_error: logger.error(f"Failed to save results to S3: {s3_error}") - # Mark subtask as completed + # Mark subtask as complete try: subtask_interface.update_subtask_status( subtask_id, - "completed", + "complete", outputs={"rows_processed": "todo -> show sensible output"}, ) - logger.info(f"Marked subtask {subtask_id} as completed") + logger.info(f"Marked subtask {subtask_id} as complete") except Exception as db_error: logger.error(f"Failed to mark subtask as completed: {db_error}") diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 6c0fe89e..96980e78 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -195,7 +195,7 @@ class SubTaskInterface: task.status = "failed" task.job_completed = now - elif all(s in ("complete", "completed") for s in statuses): + elif all(s == "complete" for s in statuses): task.status = "complete" task.job_completed = now diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 4f63ed4b..3d4b9aa0 100644 --- a/backend/postcode_splitter/main.py +++
b/backend/postcode_splitter/main.py @@ -263,10 +263,10 @@ def handler(event, context, local=False): bucket_name=bucket_name, ) - # Mark subtask as completed + # Mark subtask as complete subtask_interface.update_subtask_status( subtask_id, - "completed", + "complete", outputs={"rows_processed": "completed"}, ) From 296bbcc6267aa02eb7c3e54f405d0264b6b31581 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 22 Apr 2026 12:39:44 +0000 Subject: [PATCH 3/6] added logic to add to serverless --- asset_list/app.py | 6 +++--- infrastructure/terraform/lambda/fast-api/main.tf | 13 ++++++++++++- serverless.yml | 6 +++++- sfr/principal_pitch/2_export_data.py | 12 ++++++------ 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/asset_list/app.py b/asset_list/app.py index 25c72bda..49ec48a0 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -74,7 +74,7 @@ def app(): """ data_folder = "/workspaces/model/asset_list" - data_filename = "foom (2).xlsx" + data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx" sheet_name = "in" postcode_column = "postcode_clean" address1_column = "address2uprn_address" @@ -84,8 +84,8 @@ def app(): missing_postcodes_method = None landlord_year_built = None landlord_os_uprn = "address2uprn_uprn" - landlord_property_type = None # Good to include if landlord gave - landlord_built_form = None # Good to include if landlord gave + landlord_property_type = "Property Type" # Good to include if landlord gave + landlord_built_form = "Built Form" # Good to include if landlord gave landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None diff --git a/infrastructure/terraform/lambda/fast-api/main.tf b/infrastructure/terraform/lambda/fast-api/main.tf index 8dcbb8a3..3a2b5a5f 100644 --- a/infrastructure/terraform/lambda/fast-api/main.tf +++ b/infrastructure/terraform/lambda/fast-api/main.tf @@ -37,6 +37,15 @@ data "terraform_remote_state" "postcode_splitter" { } } +data "terraform_remote_state" 
"bulk_address2uprn_combiner" { + backend = "s3" + config = { + bucket = "bulk-address2uprn-combiner-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + ############################################ # Load Credentials ############################################ @@ -95,6 +104,7 @@ module "fastapi" { ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url + COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url } } @@ -115,7 +125,8 @@ module "fastapi_sqs_policy" { resources = [ data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, - data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn + data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn, + data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn ] conditions = null diff --git a/serverless.yml b/serverless.yml index cf369a36..43b8b4e5 100644 --- a/serverless.yml +++ b/serverless.yml @@ -33,8 +33,10 @@ provider: HEAT_BASELINE_PREDICTIONS_BUCKET: ${env:HEAT_BASELINE_PREDICTIONS_BUCKET} ENGINE_SQS_URL: Ref: EngineQueue - # hardcode the categorisation queue for now as it's created in terraform + # hardcoded queues (created in terraform) CATEGORISATION_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/categorisation-queue-dev" + POSTCODE_SPLITTER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/postcode-splitter-queue-dev" + COMBINER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/bulk-address2uprn-combiner-queue-dev" plugins: - serverless-python-requirements @@ -112,6 
+114,8 @@ resources: Resource: - Fn::GetAtt: [ EngineQueue, Arn ] - "arn:aws:sqs:eu-west-2:337213553626:categorisation-queue-dev" + - "arn:aws:sqs:eu-west-2:337213553626:postcode-splitter-queue-dev" + - "arn:aws:sqs:eu-west-2:337213553626:bulk-address2uprn-combiner-queue-dev" - Effect: Allow Action: - s3:GetObject diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index 06727f86..b275086d 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials from collections import defaultdict from sqlalchemy import func -PORTFOLIO_ID = 632 -SCENARIOS = [1144] +PORTFOLIO_ID = 711 +SCENARIOS = [1233] scenario_names = { - 1144: "EPC C", + 1233: "Reach EPC C", } -project_name = "Calico Project" +project_name = "Novus" def get_data(portfolio_id, scenario_ids): @@ -230,7 +230,7 @@ for scenario_id in SCENARIOS: # Get recs for this scenario recommended_measures_df = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "measure_type", "estimated_cost", "default"]] + ][["property_id", "measure_type", "estimated_cost", "default"]] recommended_measures_df = recommended_measures_df[ recommended_measures_df["default"] ] @@ -238,7 +238,7 @@ for scenario_id in SCENARIOS: post_install_sap = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "default", "sap_points"]] + ][["property_id", "default", "sap_points"]] post_install_sap = post_install_sap[post_install_sap["default"]] # Sum up the sap points by property id post_install_sap = ( From 8c92eee4486a9ed94bd11dd2e8a4a3464ba699a8 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 22 Apr 2026 12:45:44 +0000 Subject: [PATCH 4/6] fix deployment order --- .github/workflows/deploy_terraform.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy_terraform.yml 
b/.github/workflows/deploy_terraform.yml index 2077f686..398232c6 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -413,7 +413,7 @@ jobs: # Deploy FastAPI Lambda # ============================================================ fast_api_lambda: - needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda] + needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: ara_fast_api From f220c792e415eb0c27521aff51b9c7feedf398e2 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 22 Apr 2026 14:17:06 +0000 Subject: [PATCH 5/6] added test modification to use postgres --- backend/tests/test_bulk_combiner_status.py | 40 +++++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/backend/tests/test_bulk_combiner_status.py b/backend/tests/test_bulk_combiner_status.py index 6dd449a1..c760f734 100644 --- a/backend/tests/test_bulk_combiner_status.py +++ b/backend/tests/test_bulk_combiner_status.py @@ -13,20 +13,34 @@ from backend.app.db.models.bulk_address_uploads import ( ) -@pytest.fixture -def sqlite_session(monkeypatch): - engine = create_engine("sqlite:///:memory:") +@pytest.fixture(scope="function") +def pg_engine(postgresql): + connection_string = ( + f"postgresql+psycopg://" + f"{postgresql.info.user}:" + f"{postgresql.info.password}@" + f"{postgresql.info.host}:" + f"{postgresql.info.port}/" + f"{postgresql.info.dbname}" + ) + engine = create_engine(connection_string) SQLModel.metadata.create_all(engine) + yield engine + SQLModel.metadata.drop_all(engine) + engine.dispose() + +@pytest.fixture +def patched_session(pg_engine, monkeypatch): sessions = [] def factory(): - s = Session(engine) + s = Session(pg_engine) sessions.append(s) return s monkeypatch.setattr(module, "get_db_session", factory) - yield engine + yield pg_engine for s in
sessions: s.close() @@ -57,33 +71,33 @@ def _fetch(engine, task_id): ).first() -def test_set_combining_status_updates_row(sqlite_session): +def test_set_combining_status_updates_row(patched_session): task_id = uuid4() - _insert_row(sqlite_session, task_id, status="processing") + _insert_row(patched_session, task_id, status="processing") set_combining_status(task_id) - row = _fetch(sqlite_session, task_id) + row = _fetch(patched_session, task_id) assert row.status == "combining" assert row.combined_output_s3_uri is None -def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(sqlite_session): +def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(patched_session): task_id = uuid4() - _insert_row(sqlite_session, task_id, status="combining") + _insert_row(patched_session, task_id, status="combining") set_combined_output_s3_uri(task_id, "s3://bucket/bulk_final_outputs/abc/combined.csv") - row = _fetch(sqlite_session, task_id) + row = _fetch(patched_session, task_id) assert row.status == "awaiting_review" assert row.combined_output_s3_uri == "s3://bucket/bulk_final_outputs/abc/combined.csv" -def test_set_combining_status_missing_row_raises(sqlite_session): +def test_set_combining_status_missing_row_raises(patched_session): with pytest.raises(ValueError, match="No bulk_address_uploads row"): set_combining_status(uuid4()) -def test_set_combined_output_s3_uri_missing_row_raises(sqlite_session): +def test_set_combined_output_s3_uri_missing_row_raises(patched_session): with pytest.raises(ValueError, match="No bulk_address_uploads row"): set_combined_output_s3_uri(uuid4(), "s3://x/y.csv") From a6849b28b31724013706438bfee66b45006ccc87 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 22 Apr 2026 14:21:39 +0000 Subject: [PATCH 6/6] added backend function to functions --- .../bulk_address_uploads_functions.py | 36 +++++++++++++++++++ backend/app/db/models/bulk_address_uploads.py | 35 ++---------------- backend/bulk_address2uprn_combiner/main.py | 2 +-
backend/tests/test_bulk_combiner_status.py | 6 ++-- 4 files changed, 42 insertions(+), 37 deletions(-) create mode 100644 backend/app/db/functions/bulk_address_uploads_functions.py diff --git a/backend/app/db/functions/bulk_address_uploads_functions.py b/backend/app/db/functions/bulk_address_uploads_functions.py new file mode 100644 index 00000000..6252fe20 --- /dev/null +++ b/backend/app/db/functions/bulk_address_uploads_functions.py @@ -0,0 +1,36 @@ +from uuid import UUID +from datetime import datetime, timezone + +from sqlmodel import select + +from backend.app.db.connection import get_db_session +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload + + +def set_combining_status(task_id: UUID) -> None: + now = datetime.now(timezone.utc) + with get_db_session() as session: + row = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not row: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.status = "combining" + row.updated_at = now + session.add(row) + session.commit() + + +def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: + now = datetime.now(timezone.utc) + with get_db_session() as session: + row = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not row: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.combined_output_s3_uri = s3_uri + row.status = "awaiting_review" + row.updated_at = now + session.add(row) + session.commit() diff --git a/backend/app/db/models/bulk_address_uploads.py b/backend/app/db/models/bulk_address_uploads.py index a136e77b..0d87ff47 100644 --- a/backend/app/db/models/bulk_address_uploads.py +++ b/backend/app/db/models/bulk_address_uploads.py @@ -1,12 +1,10 @@ from typing import Optional from uuid import UUID, uuid4 -from datetime import datetime, timezone +from datetime import datetime from sqlalchemy import Column from 
sqlalchemy.dialects.postgresql import JSONB -from sqlmodel import SQLModel, Field, select - -from backend.app.db.connection import get_db_session +from sqlmodel import SQLModel, Field class BulkAddressUpload(SQLModel, table=True): @@ -24,32 +22,3 @@ class BulkAddressUpload(SQLModel, table=True): combined_output_s3_uri: Optional[str] = Field(default=None) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) - - -def set_combining_status(task_id: UUID) -> None: - now = datetime.now(timezone.utc) - with get_db_session() as session: - row = session.exec( - select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) - ).first() - if not row: - raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") - row.status = "combining" - row.updated_at = now - session.add(row) - session.commit() - - -def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: - now = datetime.now(timezone.utc) - with get_db_session() as session: - row = session.exec( - select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) - ).first() - if not row: - raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") - row.combined_output_s3_uri = s3_uri - row.status = "awaiting_review" - row.updated_at = now - session.add(row) - session.commit() diff --git a/backend/bulk_address2uprn_combiner/main.py b/backend/bulk_address2uprn_combiner/main.py index 85e0c5cb..44f0b3f9 100644 --- a/backend/bulk_address2uprn_combiner/main.py +++ b/backend/bulk_address2uprn_combiner/main.py @@ -8,7 +8,7 @@ from datetime import datetime, timezone from utils.logger import setup_logger from backend.utils.subtasks import subtask_handler -from backend.app.db.models.bulk_address_uploads import ( +from backend.app.db.functions.bulk_address_uploads_functions import ( set_combined_output_s3_uri, set_combining_status, ) diff --git a/backend/tests/test_bulk_combiner_status.py 
b/backend/tests/test_bulk_combiner_status.py index c760f734..5ac75037 100644 --- a/backend/tests/test_bulk_combiner_status.py +++ b/backend/tests/test_bulk_combiner_status.py @@ -5,12 +5,12 @@ import pytest from sqlalchemy import create_engine from sqlmodel import Session, SQLModel -from backend.app.db.models import bulk_address_uploads as module -from backend.app.db.models.bulk_address_uploads import ( - BulkAddressUpload, +from backend.app.db.functions import bulk_address_uploads_functions as module +from backend.app.db.functions.bulk_address_uploads_functions import ( set_combined_output_s3_uri, set_combining_status, ) +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload @pytest.fixture(scope="function")