From 7caa7c476a7341fb768241865c2480262eec3158 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 13:06:31 +0000 Subject: [PATCH 01/10] added bulk address uprn route --- .devcontainer/backend/Dockerfile | 6 + .devcontainer/backend/devcontainer.json | 9 +- AGENTS.md | 29 ++++ CLAUDE.md | 29 ++++ backend/app/bulk_uploads/__init__.py | 0 backend/app/bulk_uploads/router.py | 127 ++++++++++++++++++ backend/app/bulk_uploads/schema.py | 20 +++ backend/app/config.py | 1 + backend/app/db/models/bulk_address_uploads.py | 15 ++- backend/app/main.py | 4 + backend/tests/test_bulk_uploads.py | 77 +++++++++++ backlog/config.yml | 17 +++ ...-uploads-trigger-splitter-FastAPI-route.md | 38 ++++++ ...with-real-schema-prevent-rogue-migrations.md | 50 +++++++ ...k-uploads-task_id-combine-FastAPI-route.md | 36 +++++ ...-uploads-task_id-combined-results-route.md | 35 +++++ ...k-uploads-task_id-confirm-matches-route.md | 37 +++++ ...ombiner-when-splitter-subtasks-complete.md | 34 +++++ ..._address_uploads-combined_output_s3_uri.md | 30 +++++ ...ODE_SPLITTER_QUEUE_NAME-to-backend-envs.md | 35 +++++ ...tter-combiner-queues-to-backend-runtime.md | 34 +++++ ...bda-queue-via-terraform-to-staging-prod.md | 28 ++++ .../terraform/lambda/fast-api/main.tf | 17 ++- .../lambda/postcodeSplitter/outputs.tf | 9 ++ run_backlog.sh | 2 + 25 files changed, 711 insertions(+), 8 deletions(-) create mode 100644 AGENTS.md create mode 100644 CLAUDE.md create mode 100644 backend/app/bulk_uploads/__init__.py create mode 100644 backend/app/bulk_uploads/router.py create mode 100644 backend/app/bulk_uploads/schema.py create mode 100644 backend/tests/test_bulk_uploads.py create mode 100644 backlog/config.yml create mode 100644 backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md create mode 100644 backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md create mode 100644 backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md create mode 100644 backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md create mode 100644 backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md create mode 100644 backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md create mode 100644 backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md create mode 100644 backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md create mode 100644 backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md create mode 100644 backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md create mode 100644 infrastructure/terraform/lambda/postcodeSplitter/outputs.tf create mode 100644 run_backlog.sh diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 59aa0cb6..3ddb8d37 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -66,6 +66,12 @@ RUN wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key RUN apt update RUN apt install -y postgresql-14 +# Install Node.js + backlog.md +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ + && apt-get install -y nodejs \ + && npm install -g backlog.md \ + && rm -rf /var/lib/apt/lists/* + # Install Claude USER ${USER} RUN curl -fsSL https://claude.ai/install.sh | bash \ diff --git a/.devcontainer/backend/devcontainer.json b/.devcontainer/backend/devcontainer.json index 48a58bd6..b104e6e1 100644 --- a/.devcontainer/backend/devcontainer.json +++ b/.devcontainer/backend/devcontainer.json @@ -6,7 +6,7 @@ "workspaceFolder": "/workspaces/model", "postStartCommand": "bash .devcontainer/backend/post-install.sh", "mounts": [ - // "source=${localEnv:HOME},target=/home/vscode,type=bind", + "source=${localEnv:HOME},target=/home/vscode,type=bind", "source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached" ], "customizations": { @@ -43,6 +43,13 @@ }, "containerEnv": { "PYTHONFLAGS": "-Xfrozen_modules=off" + }, + "forwardPorts": [6421], + "portsAttributes": { + "6421": { + "label": "Backlog.md", + "onAutoForward": "notify" + } } } \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..aa0426a0 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,29 @@ + + + + + +## BACKLOG WORKFLOW INSTRUCTIONS + +This project uses Backlog.md MCP for all task and project management activities. + +**CRITICAL GUIDANCE** + +- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. +- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. + +- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow +- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") +- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work + +These guides cover: +- Decision framework for when to create tasks +- Search-first workflow to avoid duplicates +- Links to detailed guides for task creation, execution, and finalization +- MCP tools reference + +You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. + + + + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..aa0426a0 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,29 @@ + + + + + +## BACKLOG WORKFLOW INSTRUCTIONS + +This project uses Backlog.md MCP for all task and project management activities. + +**CRITICAL GUIDANCE** + +- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. +- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. + +- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow +- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") +- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work + +These guides cover: +- Decision framework for when to create tasks +- Search-first workflow to avoid duplicates +- Links to detailed guides for task creation, execution, and finalization +- MCP tools reference + +You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. + + + + diff --git a/backend/app/bulk_uploads/__init__.py b/backend/app/bulk_uploads/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py new file mode 100644 index 00000000..10fbc4ce --- /dev/null +++ b/backend/app/bulk_uploads/router.py @@ -0,0 +1,127 @@ +import boto3 +import json +from uuid import UUID +from datetime import datetime, timezone +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlmodel import select +from sqlalchemy.dialects.postgresql import insert + +from backend.app.dependencies import validate_token +from backend.app.config import get_settings +from backend.app.db.connection import get_db_session +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload +from backend.app.db.models.portfolio import PropertyModel, PropertyCreationStatus, PortfolioStatus +from backend.app.bulk_uploads.schema import TriggerSplitterRequest, ConfirmMatchesRequest +from utils.s3 import parse_s3_uri, read_csv_from_s3 + + +router = APIRouter( + prefix="/bulk-uploads", + tags=["bulk-uploads"], + dependencies=[Depends(validate_token)], +) + + +@router.post("/trigger-splitter", status_code=202) +async def trigger_splitter(req: TriggerSplitterRequest): + settings = get_settings() + + sqs_payload = { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "s3_uri": req.s3_uri, + } + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.POSTCODE_SPLITTER_SQS_URL, + MessageBody=json.dumps(sqs_payload), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + +@router.get("/{task_id}/combined-results") +async def get_combined_results( + task_id: UUID, + offset: int = Query(default=0, ge=0), + limit: int = Query(default=500, ge=1, le=5000), +): + with get_db_session() as session: + upload = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + + if not upload: + raise HTTPException(status_code=404, detail="Upload not found") + + if not upload.combined_output_s3_uri: + raise HTTPException(status_code=409, detail="Combiner not finished") + + bucket, key = parse_s3_uri(upload.combined_output_s3_uri) + rows = read_csv_from_s3(bucket, key) + + total = len(rows) + return { + "rows": rows[offset : offset + limit], + "total": total, + "offset": offset, + "limit": limit, + } + + +@router.post("/{task_id}/confirm-matches") +async def confirm_matches(task_id: UUID, req: ConfirmMatchesRequest): + with get_db_session() as session: + upload = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not upload: + raise HTTPException(status_code=404, detail="Upload not found") + + rows = [] + for row in req.accepted_rows: + address = row.address_line_1 + if row.address_line_2: + address = f"{row.address_line_1}, {row.address_line_2}" + rows.append( + { + "uprn": row.uprn, + "address": address, + "postcode": row.postcode, + "portfolio_id": int(upload.portfolio_id), + "landlord_property_id": row.internal_reference, + "creation_status": PropertyCreationStatus.LOADING, + "status": PortfolioStatus.ASSESSMENT.value, + "has_pre_condition_report": False, + "has_recommendations": False, + } + ) + + stmt = ( + insert(PropertyModel) + .values(rows) + .on_conflict_do_nothing( + index_elements=["portfolio_id", "uprn"], + index_where=PropertyModel.uprn.isnot(None), + ) + .returning(PropertyModel.id) + ) + result = session.execute(stmt) + session.flush() + inserted = len(result.fetchall()) + skipped = len(rows) - inserted + + upload.status = "confirmed" + upload.updated_at = datetime.now(timezone.utc) + session.add(upload) + session.commit() + + return {"inserted": inserted, "skipped": skipped} diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py new file mode 100644 index 00000000..070bdd2d --- /dev/null +++ b/backend/app/bulk_uploads/schema.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel +from typing import Optional + + +class TriggerSplitterRequest(BaseModel): + task_id: str + sub_task_id: str + s3_uri: str + + +class AcceptedRow(BaseModel): + uprn: int + address_line_1: str + address_line_2: Optional[str] = None + postcode: str + internal_reference: Optional[str] = None + + +class ConfirmMatchesRequest(BaseModel): + accepted_rows: list[AcceptedRow] diff --git a/backend/app/config.py b/backend/app/config.py index 9532ddd6..2603ac72 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -39,6 +39,7 @@ class Settings(BaseSettings): ENGINE_SQS_URL: str = "changeme" CATEGORISATION_SQS_URL: str = "changeme" PASHUB_TO_ARA_SQS_URL: str = "changeme" + POSTCODE_SPLITTER_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/app/db/models/bulk_address_uploads.py b/backend/app/db/models/bulk_address_uploads.py index 335a4c45..e7fae633 100644 --- a/backend/app/db/models/bulk_address_uploads.py +++ b/backend/app/db/models/bulk_address_uploads.py @@ -11,12 +11,19 @@ class BulkAddressUpload(SQLModel, table=True): __tablename__ = "bulk_address_uploads" id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) - task_id: UUID = Field(foreign_key="tasks.id", index=True) - combined_csv_s3_uri: Optional[str] = Field(default=None) + portfolio_id: str = Field(nullable=False) + user_id: str = Field(nullable=False) + s3_bucket: str = Field(nullable=False) + s3_key: str = Field(nullable=False) + filename: str = Field(nullable=False) + status: str = Field(default="ready_for_processing", nullable=False) + task_id: Optional[UUID] = Field(default=None) + combined_output_s3_uri: Optional[str] = Field(default=None) + created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) -def set_combined_csv_s3_uri(task_id: UUID, s3_uri: str) -> None: +def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: now = datetime.now(timezone.utc) with get_db_session() as session: row = session.exec( @@ -24,7 +31,7 @@ def set_combined_csv_s3_uri(task_id: UUID, s3_uri: str) -> None: ).first() if not row: raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") - row.combined_csv_s3_uri = s3_uri + row.combined_output_s3_uri = s3_uri row.updated_at = now session.add(row) session.commit() diff --git a/backend/app/main.py b/backend/app/main.py index f0ab4d86..c9733c18 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,6 +9,7 @@ from backend.app.portfolio import router as portfolio_router from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router from backend.app.tasks import router as tasks_router +from backend.app.bulk_uploads import router as bulk_uploads_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -59,6 +60,7 @@ app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") app.include_router(whlg_router.router, prefix="/v1") app.include_router(tasks_router.router, prefix="/v1") +app.include_router(bulk_uploads_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router @@ -75,6 +77,7 @@ from mangum import Mangum from backend.app.portfolio import router as portfolio_router from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router +from backend.app.bulk_uploads import router as bulk_uploads_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -124,6 +127,7 @@ async def log_requests(request: Request, call_next): app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") app.include_router(whlg_router.router, prefix="/v1") +app.include_router(bulk_uploads_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router diff --git a/backend/tests/test_bulk_uploads.py b/backend/tests/test_bulk_uploads.py new file mode 100644 index 00000000..18bd8ed5 --- /dev/null +++ b/backend/tests/test_bulk_uploads.py @@ -0,0 +1,77 @@ +from unittest.mock import MagicMock, patch +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def client(): + with patch("backend.app.config.get_settings") as mock_settings: + mock_settings.return_value = MagicMock( + ENVIRONMENT="local", + AWS_DEFAULT_REGION="eu-west-1", + POSTCODE_SPLITTER_SQS_URL="https://sqs.eu-west-1.amazonaws.com/123456789/postcode-splitter", + ) + from backend.app.main import app + yield TestClient(app) + + +@patch("backend.app.bulk_uploads.router.boto3") +@patch("backend.app.bulk_uploads.router.TasksInterface") +def test_trigger_splitter_creates_task_and_enqueues(mock_tasks_cls, mock_boto3, client): + mock_tasks = MagicMock() + mock_tasks.create_task.return_value = ("task-123", "subtask-456") + mock_tasks_cls.return_value = mock_tasks + + mock_sqs = MagicMock() + mock_sqs.send_message.return_value = {"MessageId": "msg-789"} + mock_boto3.client.return_value = mock_sqs + + response = client.post( + "/v1/bulk-uploads/trigger-splitter", + json={ + "upload_id": "upload-abc", + "s3_uri": "s3://bucket/file.csv", + "portfolio_id": "portfolio-xyz", + }, + headers={"Authorization": "Bearer test-token"}, + ) + + assert response.status_code == 202 + body = response.json() + assert body["task_id"] == "task-123" + assert body["sub_task_id"] == "subtask-456" + assert body["sqs_message_id"] == "msg-789" + + mock_sqs.send_message.assert_called_once() + call_kwargs = mock_sqs.send_message.call_args[1] + import json + payload = json.loads(call_kwargs["MessageBody"]) + assert payload["task_id"] == "task-123" + assert payload["sub_task_id"] == "subtask-456" + assert payload["s3_uri"] == "s3://bucket/file.csv" + + +@patch("backend.app.bulk_uploads.router.boto3") +@patch("backend.app.bulk_uploads.router.TasksInterface") +def test_trigger_splitter_uses_provided_task_ids(mock_tasks_cls, mock_boto3, client): + mock_sqs = MagicMock() + mock_sqs.send_message.return_value = {"MessageId": "msg-999"} + mock_boto3.client.return_value = mock_sqs + + response = client.post( + "/v1/bulk-uploads/trigger-splitter", + json={ + "upload_id": "upload-abc", + "s3_uri": "s3://bucket/file.csv", + "portfolio_id": "portfolio-xyz", + "task_id": "existing-task", + "sub_task_id": "existing-subtask", + }, + headers={"Authorization": "Bearer test-token"}, + ) + + assert response.status_code == 202 + mock_tasks_cls.assert_not_called() + body = response.json() + assert body["task_id"] == "existing-task" + assert body["sub_task_id"] == "existing-subtask" diff --git a/backlog/config.yml b/backlog/config.yml new file mode 100644 index 00000000..edf9b80b --- /dev/null +++ b/backlog/config.yml @@ -0,0 +1,17 @@ +project_name: "model-backend" +default_status: "To Do" +statuses: ["To Do", "In Progress", "Done"] +labels: [] +definition_of_done: [] +date_format: yyyy-mm-dd +max_column_width: 20 +default_editor: "vim" +auto_open_browser: true +default_port: 6420 +remote_operations: false +auto_commit: false +zero_padded_ids: 3 +bypass_git_hooks: false +check_active_branches: false +active_branch_days: 30 +task_prefix: "task" diff --git a/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md b/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md new file mode 100644 index 00000000..b8a94ab0 --- /dev/null +++ b/backlog/tasks/task-1 - Add-POST-bulk-uploads-trigger-splitter-FastAPI-route.md @@ -0,0 +1,38 @@ +--- +id: TASK-1 +title: Add POST /bulk-uploads/trigger-splitter FastAPI route +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:31' +labels: + - backend + - bulk-upload + - api +dependencies: [] +priority: high +ordinal: 2000 +--- + +## Description + + +Expose an HTTP route that the frontend can call instead of sending SQS directly. Route: + +`POST /bulk-uploads/trigger-splitter` + +Body: `{task_id, sub_task_id, s3_uri}` — task+subtask already created by frontend `/api/tasks` call before this is invoked. + +Behaviour: validate inputs, then publish an SQS message to the postcode_splitter queue (see `backend/postcode_splitter/main.py` for expected message shape: `{task_id, sub_task_id, s3_uri}`). Use existing SubTaskInterface / TasksInterface patterns from `backend/app/tasks/router.py`. + +Place under `backend/app/` next to `tasks/router.py` — likely `backend/app/bulk_uploads/router.py`. + + +## Acceptance Criteria + +- [ ] #1 Route returns 202 with {task_id, sub_task_id} +- [ ] #2 SQS message enqueued with correct shape for postcode_splitter Lambda +- [ ] #3 Auth via existing `validate_token` dependency +- [ ] #4 Queue URL from config, not hardcoded +- [ ] #5 Unit test with mocked boto3 sqs client + diff --git a/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md b/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md new file mode 100644 index 00000000..02b0fa2c --- /dev/null +++ b/backlog/tasks/task-10 - Fix-bulk_address_uploads-SQLModel-—-align-columns-with-real-schema-prevent-rogue-migrations.md @@ -0,0 +1,50 @@ +--- +id: TASK-10 +title: >- + Fix bulk_address_uploads SQLModel — align columns with real schema, prevent + rogue migrations +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:34' +labels: + - backend + - bulk-upload + - db +dependencies: [] +priority: high +ordinal: 7000 +--- + +## Description + + +`backend/app/db/models/bulk_address_uploads.py` has several bugs that cause a rogue `ALTER TABLE` and silent write failures: + +**1. Wrong column name** +Model declares `combined_csv_s3_uri` — real column (drizzle-managed) is `combined_output_s3_uri`. `set_combined_csv_s3_uri()` currently writes to a non-existent column. + +**2. Partial model declared as `table=True`** +Model only includes `id, task_id, combined_csv_s3_uri, status, updated_at`. Missing: `portfolio_id, user_id, s3_bucket, s3_key, filename, source_headers, column_mapping, created_at`. SQLModel `table=True` with incomplete columns causes Alembic autogenerate / `create_all` to try to ALTER or recreate the table. + +**3. `status` default mismatch** +Backend: `default="pending"`. Real table default: `'ready_for_processing'`. Triggers ALTER TABLE on migration runs. + +**4. `task_id` nullability mismatch** +Backend: `task_id: UUID` (NOT NULL). Frontend drizzle schema: nullable (set later, after onboarding starts). + +**Fix approach:** +- Declare all real columns matching drizzle schema (see `src/app/db/schema/bulk_address_uploads.ts` in assessment-model repo as source of truth). +- Rename `combined_csv_s3_uri` → `combined_output_s3_uri` throughout. +- `task_id: Optional[UUID]`, `status` default `'ready_for_processing'`. +- Ensure Alembic env excludes this table from autogenerate — drizzle owns migrations, not backend. + + +## Acceptance Criteria + +- [ ] #1 `combined_output_s3_uri` column name correct throughout model + helper +- [ ] #2 All columns present and match real table schema +- [ ] #3 No rogue ALTER TABLE when backend starts or migrations run +- [ ] #4 `task_id` nullable, `status` default `'ready_for_processing'` +- [ ] #5 Integration test: combiner runs → `combined_output_s3_uri` populated → frontend reads it correctly + diff --git a/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md b/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md new file mode 100644 index 00000000..dc860e15 --- /dev/null +++ b/backlog/tasks/task-2 - Add-POST-bulk-uploads-task_id-combine-FastAPI-route.md @@ -0,0 +1,36 @@ +--- +id: TASK-2 +title: 'Add POST /bulk-uploads/{task_id}/combine FastAPI route' +status: To Do +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 11:53' +labels: + - backend + - bulk-upload + - api +dependencies: [] +priority: high +ordinal: 10000 +--- + +## Description + + +Expose HTTP route to trigger `bulk_address2uprn_combiner`: + +`POST /bulk-uploads/{task_id}/combine` + +Creates a new sub_task under task_id, then pushes `{task_id, sub_task_id}` to the combiner SQS queue (see `backend/bulk_address2uprn_combiner/main.py` for consumer shape). + +Idempotency: if `bulk_address_uploads.combined_output_s3_uri` already set for this task, return 200 with `{already_combined: true}` (mirror current frontend behaviour). + + +## Acceptance Criteria + +- [ ] #1 Route returns 202 with {task_id, sub_task_id} on new trigger +- [ ] #2 Returns 200 {already_combined: true} if combined_output_s3_uri already set +- [ ] #3 SQS message enqueued with correct shape +- [ ] #4 Queue URL from config +- [ ] #5 Auth via validate_token + diff --git a/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md b/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md new file mode 100644 index 00000000..477a2ec3 --- /dev/null +++ b/backlog/tasks/task-3 - Add-GET-bulk-uploads-task_id-combined-results-route.md @@ -0,0 +1,35 @@ +--- +id: TASK-3 +title: 'Add GET /bulk-uploads/{task_id}/combined-results route' +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:08' +labels: + - backend + - bulk-upload + - api +dependencies: [] +priority: high +ordinal: 1000 +--- + +## Description + + +`GET /bulk-uploads/{task_id}/combined-results` + +Behaviour: lookup `bulk_address_uploads` row by `task_id` → read `combined_output_s3_uri` → read combined CSV from S3 → return parsed JSON rows for the frontend review UI. Each row should include: input address fields, matched UPRN, matched OS address, match confidence/score. + +Pagination: optional query params `?offset&limit` (default limit 500). + +If `combined_output_s3_uri` not yet populated → 409 "Combiner not finished". + + +## Acceptance Criteria + +- [ ] #1 Returns JSON {rows: [...], total, offset, limit} +- [ ] #2 409 when combined_output_s3_uri null +- [ ] #3 Reads CSV from S3 (retrofit_sap_data bucket) with IAM already granted to backend +- [ ] #4 Row shape matches what confirm-matches frontend expects + diff --git a/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md b/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md new file mode 100644 index 00000000..68e164ee --- /dev/null +++ b/backlog/tasks/task-4 - Add-POST-bulk-uploads-task_id-confirm-matches-route.md @@ -0,0 +1,37 @@ +--- +id: TASK-4 +title: 'Add POST /bulk-uploads/{task_id}/confirm-matches route' +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:31' +labels: + - backend + - bulk-upload + - api +dependencies: + - TASK-3 +priority: high +ordinal: 3000 +--- + +## Description + + +`POST /bulk-uploads/{task_id}/confirm-matches` + +Body: `{accepted_rows: [{uprn, address_line_1, address_line_2, postcode, internal_reference}]}` — the rows the user accepted from the review table. + +Behaviour: for each accepted row, upsert into the portfolio's `addresses` / `property` table (confirm exact model during impl — see `backend/addresses/`, `backend/backend/Property.py`). Update `bulk_address_uploads.status` to terminal (e.g. `confirmed`). + +Idempotency: safe to re-call; dedupe by `task_id` + `uprn`. + + +## Acceptance Criteria + +- [ ] #1 Accepted rows persisted as portfolio addresses +- [ ] #2 Duplicate submits do not create duplicate address rows +- [ ] #3 bulk_address_uploads.status updated to terminal +- [ ] #4 Returns summary {inserted, skipped} +- [ ] #5 Transactional — partial failure rolls back + diff --git a/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md b/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md new file mode 100644 index 00000000..cebefde3 --- /dev/null +++ b/backlog/tasks/task-5 - Auto-chain-combiner-when-splitter-subtasks-complete.md @@ -0,0 +1,34 @@ +--- +id: TASK-5 +title: Auto-chain combiner when address2uprn subtasks complete +status: To Do +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20' +labels: + - backend + - bulk-upload + - orchestration +dependencies: + - TASK-2 +priority: medium +ordinal: 5000 +--- + +## Description + + +Today the frontend client polls task status and client-side fires `/combine`. Move that logic to backend: when the last `address2uprn` subtask for a task transitions to complete, backend auto-enqueues the combiner SQS message. + +Likely hook point: `SubTaskInterface.finalize_subtask` — after setting status, check if parent task's subtasks are all terminal and if so, enqueue combiner. Or a separate reconciler run in the subtask-complete code path. + +Removes frontend responsibility for orchestration and avoids "browser closed → combiner never fires" bug. + + +## Acceptance Criteria + +- [ ] #1 Combiner fires automatically when all splitter-spawned subtasks done +- [ ] #2 Only fires once per task (dedupe via task row / existing combined_output_s3_uri check) +- [ ] #3 Failed subtasks do NOT trigger combiner — requires manual retry +- [ ] #4 Frontend combine route (task-7) can be deleted or reduced to manual re-run + diff --git a/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md b/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md new file mode 100644 index 00000000..c669233c --- /dev/null +++ b/backlog/tasks/task-6 - Verify-combiner-writes-to-bulk_address_uploads-combined_output_s3_uri.md @@ -0,0 +1,30 @@ +--- +id: TASK-6 +title: Verify combiner writes to bulk_address_uploads.combined_output_s3_uri +status: To Do +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20' +labels: + - backend + - bulk-upload + - db +dependencies: [] +priority: high +ordinal: 6000 +--- + +## Description + + +Frontend drizzle schema column: `bulk_address_uploads.combined_output_s3_uri`. Backend combiner (`backend/bulk_address2uprn_combiner/main.py`) calls `set_combined_csv_s3_uri(UUID(task_id), s3_uri)` from `backend.app.db.models.bulk_address_uploads`. + +Confirm that helper actually writes to the **`combined_output_s3_uri`** column (not a legacy `combined_csv_s3_uri`). Name drift suggests risk. Fix if mismatched. + + +## Acceptance Criteria + +- [ ] #1 Confirmed column name matches frontend schema +- [ ] #2 Fix applied if mismatched +- [ ] #3 Integration test covers: run combiner → row updated → frontend schema reads correctly + diff --git a/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md b/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md new file mode 100644 index 00000000..053c5250 --- /dev/null +++ b/backlog/tasks/task-7 - Add-BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME-POSTCODE_SPLITTER_QUEUE_NAME-to-backend-envs.md @@ -0,0 +1,35 @@ +--- +id: TASK-7 +title: >- + Add BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME + POSTCODE_SPLITTER_QUEUE_NAME to + backend envs +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:31' +labels: + - infra + - env +dependencies: + - TASK-1 + - TASK-2 +priority: high +ordinal: 4000 +--- + +## Description + + +Since backend now enqueues to both queues (via new trigger-splitter + combine routes), its service config must have the queue names on staging + prod. Values: +- `bulk-address2uprn-combiner-queue-` +- `postcode-splitter-queue-` (if not already present) + +Remove these from frontend `.env` once task-6/task-7 frontend refactor ships. + + +## Acceptance Criteria + +- [ ] #1 Queue names set in backend staging config +- [ ] #2 Queue names set in backend prod config +- [ ] #3 Frontend env vars removed after frontend refactor complete + diff --git a/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md b/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md new file mode 100644 index 00000000..54477ad0 --- /dev/null +++ b/backlog/tasks/task-8 - Grant-sqs-SendMessage-IAM-on-splitter-combiner-queues-to-backend-runtime.md @@ -0,0 +1,34 @@ +--- +id: TASK-8 +title: 'Grant sqs:SendMessage IAM on splitter + combiner queues to backend runtime' +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:31' +labels: + - infra + - iam + - terraform +dependencies: + - TASK-1 + - TASK-2 +priority: high +ordinal: 5000 +--- + +## Description + + +Backend runtime role needs `sqs:SendMessage` + `sqs:GetQueueUrl` on: +- postcode_splitter queue ARN +- bulk_address2uprn_combiner queue ARN + +Update terraform IAM policy under `infrastructure/terraform/` for backend service. Can revoke equivalent IAM from frontend runtime once refactor ships. + + +## Acceptance Criteria + +- [ ] #1 Terraform updated for staging + prod backend role +- [ ] #2 Verified via `aws sqs get-queue-url` using backend creds +- [ ] #3 Frontend IAM revoked after frontend refactor complete + diff --git a/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md b/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md new file mode 100644 index 00000000..5d805da6 --- /dev/null +++ b/backlog/tasks/task-9 - Deploy-bulk_address2uprn_combiner-Lambda-queue-via-terraform-to-staging-prod.md @@ -0,0 +1,28 @@ +--- +id: TASK-9 +title: Deploy bulk_address2uprn_combiner Lambda + queue via terraform to staging/prod +status: Done +assignee: [] +created_date: '2026-04-20' +updated_date: '2026-04-20 12:31' +labels: + - infra + - terraform +dependencies: [] +priority: high +ordinal: 6000 +--- + +## Description + + +Lambda source at `backend/bulk_address2uprn_combiner/`. Use existing `lambda_with_sqs` terraform module. Lambda envs: `S3_BUCKET_NAME=retrofit_sap_data_bucket_name` + DB creds. Queue name convention: `bulk-address2uprn-combiner-queue-`. Lambda needs read on `ara_raw_outputs/` and write on `bulk_final_outputs/` in retrofit_sap_data bucket. + + +## Acceptance Criteria + +- [ ] #1 Lambda + queue exist in staging +- [ ] #2 Lambda + queue exist in prod +- [ ] #3 Lambda has correct S3 read/write permissions on retrofit_sap_data bucket +- [ ] #4 Lambda has DB write on bulk_address_uploads + diff --git a/infrastructure/terraform/lambda/fast-api/main.tf b/infrastructure/terraform/lambda/fast-api/main.tf index 05447657..f1a9494b 100644 --- a/infrastructure/terraform/lambda/fast-api/main.tf +++ b/infrastructure/terraform/lambda/fast-api/main.tf @@ -28,6 +28,15 @@ data "terraform_remote_state" "categorisation" { } } +data "terraform_remote_state" "postcode_splitter" { + backend = "s3" + config = { + bucket = "postcode-splitter-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } +} + ############################################ # Load Credentials ############################################ @@ -83,8 +92,9 @@ module "fastapi" { CARBON_BASELINE_PREDICTIONS_BUCKET = data.terraform_remote_state.shared.outputs.retrofit_carbon_baseline_predictions_bucket_name HEAT_BASELINE_PREDICTIONS_BUCKET = data.terraform_remote_state.shared.outputs.retrofit_heat_baseline_predictions_bucket_name - ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url - CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url + ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url + CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url + POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url } } @@ -104,7 +114,8 @@ module "fastapi_sqs_policy" { resources = [ data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, - data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn + data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, + data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn ] conditions = null diff --git a/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf b/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf new file mode 100644 index 00000000..847cefa4 --- /dev/null +++ b/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf @@ -0,0 +1,9 @@ +output "postcode_splitter_queue_url" { + value = module.lambda.queue_url + description = "URL of the Postcode Splitter SQS queue" +} + +output "postcode_splitter_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the Postcode Splitter SQS queue" +} diff --git a/run_backlog.sh b/run_backlog.sh new file mode 100644 index 00000000..398e921c --- /dev/null +++ b/run_backlog.sh @@ -0,0 +1,2 @@ +#!/bin/bash +backlog browser --port 6421 From 85cfa153356460622fb5ebe01fdaad50f3ea737b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 13:11:35 +0000 Subject: [PATCH 02/10] got rid of confirm matches --- backend/app/bulk_uploads/router.py | 50 ------------------------------ 1 file changed, 50 deletions(-) diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index 10fbc4ce..b79fe6d3 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -75,53 +75,3 @@ async def get_combined_results( "offset": offset, "limit": limit, } - - -@router.post("/{task_id}/confirm-matches") -async def confirm_matches(task_id: UUID, req: ConfirmMatchesRequest): - with get_db_session() as session: - upload = session.exec( - select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) - ).first() - if not upload: - raise HTTPException(status_code=404, detail="Upload not found") - - rows = [] - for row in req.accepted_rows: - address = row.address_line_1 - if row.address_line_2: - address = f"{row.address_line_1}, {row.address_line_2}" - rows.append( - { - "uprn": row.uprn, - "address": address, - "postcode": row.postcode, - "portfolio_id": int(upload.portfolio_id), - "landlord_property_id": row.internal_reference, - "creation_status": PropertyCreationStatus.LOADING, - "status": PortfolioStatus.ASSESSMENT.value, - "has_pre_condition_report": False, - "has_recommendations": False, - } - ) - - stmt = ( - insert(PropertyModel) - .values(rows) - .on_conflict_do_nothing( - index_elements=["portfolio_id", "uprn"], - index_where=PropertyModel.uprn.isnot(None), - ) - .returning(PropertyModel.id) - ) - result = session.execute(stmt) - session.flush() - inserted = len(result.fetchall()) - skipped = len(rows) - inserted - - upload.status = "confirmed" - upload.updated_at = datetime.now(timezone.utc) - session.add(upload) - session.commit() - - return {"inserted": inserted, "skipped": skipped} From 414bdc6431bbe9681d2c498a77edd0106153ee60 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 13:17:35 +0000 Subject: [PATCH 03/10] keep it minimal first --- backend/app/bulk_uploads/router.py | 5 +- backend/app/bulk_uploads/schema.py | 13 ----- backend/tests/test_bulk_uploads.py | 77 ------------------------------ 3 files changed, 1 insertion(+), 94 deletions(-) delete mode 100644 backend/tests/test_bulk_uploads.py diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index b79fe6d3..470ef36e 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -1,17 +1,14 @@ import boto3 import json from uuid import UUID -from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import select -from sqlalchemy.dialects.postgresql import insert from backend.app.dependencies import validate_token from backend.app.config import get_settings from backend.app.db.connection import get_db_session from backend.app.db.models.bulk_address_uploads import BulkAddressUpload -from backend.app.db.models.portfolio import PropertyModel, PropertyCreationStatus, PortfolioStatus -from backend.app.bulk_uploads.schema import TriggerSplitterRequest, ConfirmMatchesRequest +from backend.app.bulk_uploads.schema import TriggerSplitterRequest from utils.s3 import parse_s3_uri, read_csv_from_s3 diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index 070bdd2d..1efd02a3 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -1,20 +1,7 @@ from pydantic import BaseModel -from typing import Optional class TriggerSplitterRequest(BaseModel): task_id: str sub_task_id: str s3_uri: str - - -class AcceptedRow(BaseModel): - uprn: int - address_line_1: str - address_line_2: Optional[str] = None - postcode: str - internal_reference: Optional[str] = None - - -class ConfirmMatchesRequest(BaseModel): - accepted_rows: list[AcceptedRow] diff --git a/backend/tests/test_bulk_uploads.py b/backend/tests/test_bulk_uploads.py deleted file mode 100644 index 18bd8ed5..00000000 --- a/backend/tests/test_bulk_uploads.py +++ /dev/null @@ -1,77 +0,0 @@ -from unittest.mock import MagicMock, patch -import pytest -from fastapi.testclient import TestClient - - -@pytest.fixture -def client(): - with patch("backend.app.config.get_settings") as mock_settings: - mock_settings.return_value = MagicMock( - ENVIRONMENT="local", - AWS_DEFAULT_REGION="eu-west-1", - POSTCODE_SPLITTER_SQS_URL="https://sqs.eu-west-1.amazonaws.com/123456789/postcode-splitter", - ) - from backend.app.main import app - yield TestClient(app) - - -@patch("backend.app.bulk_uploads.router.boto3") -@patch("backend.app.bulk_uploads.router.TasksInterface") -def test_trigger_splitter_creates_task_and_enqueues(mock_tasks_cls, mock_boto3, client): - mock_tasks = MagicMock() - mock_tasks.create_task.return_value = ("task-123", "subtask-456") - mock_tasks_cls.return_value = mock_tasks - - mock_sqs = MagicMock() - mock_sqs.send_message.return_value = {"MessageId": "msg-789"} - mock_boto3.client.return_value = mock_sqs - - response = client.post( - "/v1/bulk-uploads/trigger-splitter", - json={ - "upload_id": "upload-abc", - "s3_uri": "s3://bucket/file.csv", - "portfolio_id": "portfolio-xyz", - }, - headers={"Authorization": "Bearer test-token"}, - ) - - assert response.status_code == 202 - body = response.json() - assert body["task_id"] == "task-123" - assert body["sub_task_id"] == "subtask-456" - assert body["sqs_message_id"] == "msg-789" - - mock_sqs.send_message.assert_called_once() - call_kwargs = mock_sqs.send_message.call_args[1] - import json - payload = json.loads(call_kwargs["MessageBody"]) - assert payload["task_id"] == "task-123" - assert payload["sub_task_id"] == "subtask-456" - assert payload["s3_uri"] == "s3://bucket/file.csv" - - -@patch("backend.app.bulk_uploads.router.boto3") -@patch("backend.app.bulk_uploads.router.TasksInterface") -def test_trigger_splitter_uses_provided_task_ids(mock_tasks_cls, mock_boto3, client): - mock_sqs = MagicMock() - mock_sqs.send_message.return_value = {"MessageId": "msg-999"} - mock_boto3.client.return_value = mock_sqs - - response = client.post( - "/v1/bulk-uploads/trigger-splitter", - json={ - "upload_id": "upload-abc", - "s3_uri": "s3://bucket/file.csv", - "portfolio_id": "portfolio-xyz", - "task_id": "existing-task", - "sub_task_id": "existing-subtask", - }, - headers={"Authorization": "Bearer test-token"}, - ) - - assert response.status_code == 202 - mock_tasks_cls.assert_not_called() - body = response.json() - assert body["task_id"] == "existing-task" - assert body["sub_task_id"] == "existing-subtask" From 42ea70e60fcf57053d492f8d63fbfae2f761328d Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 15:23:38 +0000 Subject: [PATCH 04/10] added additional deal properties --- backend/app/db/models/hubspot_deal_data.py | 4 ++++ etl/hubspot/hubspotClient.py | 8 ++++++-- etl/hubspot/hubspotDataTodB.py | 20 ++++++++++++++++---- etl/hubspot/hubspot_deal_differ.py | 8 ++++++-- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/backend/app/db/models/hubspot_deal_data.py b/backend/app/db/models/hubspot_deal_data.py index 758f688d..27649042 100644 --- a/backend/app/db/models/hubspot_deal_data.py +++ b/backend/app/db/models/hubspot_deal_data.py @@ -38,6 +38,10 @@ class HubspotDealData(SQLModel, table=True): dampmould_growth: Optional[str] = Field(default=None) damp_mould_and_repairs_comments: Optional[str] = Field(default=None) pre_sap: Optional[str] = Field(default=None) + batch: Optional[str] = Field(default=None) + block_reference: Optional[str] = Field(default=None) + epc_prn: Optional[str] = Field(default=None) + potential_post_sap_score_dropdown: Optional[str] = Field(default=None) coordinator: Optional[str] = Field(default=None) mtp_completion_date: Optional[datetime] = Field(default=None) mtp_re_model_completion_date: Optional[datetime] = Field(default=None) diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index 6bdf71ed..3a562700 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -254,12 +254,12 @@ class HubspotClient: "sharepoint_link", "dampmould_growth", "damp_mould_and_repairs_comments", - "pre_sap", + "pre_sap_score_dropdown", "coordinator", "mtp_completion_date", "mtp_re_model_completion_date", "ioe_v3_completion_date", - "proposed_measures", + "proposed_measures_dropdown", "approved_package", "designer", "design_completion_date", @@ -275,6 +275,10 @@ class HubspotClient: "confirmed_survey_time", "surveyed_date", "design_type", + "batch", + "block_reference", + "epc_prn", + "potential_post_sap_score_dropdown", ], ) ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index 9756833b..eb8e2d14 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -159,7 +159,13 @@ class HubspotDataToDb: "damp_mould_and_repairs_comments": deal_data.get( "damp_mould_and_repairs_comments" ), - "pre_sap": deal_data.get("pre_sap"), + "pre_sap": deal_data.get("pre_sap_score_dropdown"), + "batch": deal_data.get("batch"), + "block_reference": deal_data.get("block_reference"), + "epc_prn": deal_data.get("epc_prn"), + "potential_post_sap_score_dropdown": deal_data.get( + "potential_post_sap_score_dropdown" + ), "coordinator": deal_data.get("coordinator"), "mtp_completion_date": parse_hs_date(deal_data.get("mtp_completion_date")), "mtp_re_model_completion_date": parse_hs_date( @@ -168,7 +174,7 @@ class HubspotDataToDb: "ioe_v3_completion_date": parse_hs_date( deal_data.get("ioe_v3_completion_date") ), - "proposed_measures": deal_data.get("proposed_measures"), + "proposed_measures": deal_data.get("proposed_measures_dropdown"), "approved_package": deal_data.get("approved_package"), "designer": deal_data.get("designer"), "design_completion_date": parse_hs_date( @@ -228,7 +234,13 @@ class HubspotDataToDb: damp_mould_and_repairs_comments=deal_data.get( "damp_mould_and_repairs_comments" ), - pre_sap=deal_data.get("pre_sap"), + pre_sap=deal_data.get("pre_sap_score_dropdown"), + batch=deal_data.get("batch"), + block_reference=deal_data.get("block_reference"), + epc_prn=deal_data.get("epc_prn"), + potential_post_sap_score_dropdown=deal_data.get( + "potential_post_sap_score_dropdown" + ), coordinator=deal_data.get("coordinator"), mtp_completion_date=parse_hs_date(deal_data.get("mtp_completion_date")), mtp_re_model_completion_date=parse_hs_date( @@ -237,7 +249,7 @@ class HubspotDataToDb: ioe_v3_completion_date=parse_hs_date( deal_data.get("ioe_v3_completion_date") ), - proposed_measures=deal_data.get("proposed_measures"), + proposed_measures=deal_data.get("proposed_measures_dropdown"), approved_package=deal_data.get("approved_package"), designer=deal_data.get("designer"), design_completion_date=parse_hs_date( diff --git a/etl/hubspot/hubspot_deal_differ.py b/etl/hubspot/hubspot_deal_differ.py index 74c8264d..cf9ad1ee 100644 --- a/etl/hubspot/hubspot_deal_differ.py +++ b/etl/hubspot/hubspot_deal_differ.py @@ -62,9 +62,13 @@ class HubspotDealDiffer: "sharepoint_link": "sharepoint_link", "dampmould_growth": "dampmould_growth", "damp_mould_and_repairs_comments": "damp_mould_and_repairs_comments", - "pre_sap": "pre_sap", + "pre_sap_score_dropdown": "pre_sap", + "batch": "batch", + "block_reference": "block_reference", + "epc_prn": "epc_prn", + "potential_post_sap_score_dropdown": "potential_post_sap_score_dropdown", "coordinator": "coordinator", - "proposed_measures": "proposed_measures", + "proposed_measures_dropdown": "proposed_measures", "approved_package": "approved_package", "designer": "designer", "actual_measures_installed": "actual_measures_installed", From 902e5cd8dfb0696b80190eb66d7a96f44f8f46fc Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 15:44:54 +0000 Subject: [PATCH 05/10] added more deal properties --- backend/app/db/models/hubspot_deal_data.py | 4 ++++ etl/hubspot/hubspotClient.py | 4 ++++ etl/hubspot/hubspotDataTodB.py | 8 ++++++++ etl/hubspot/hubspot_deal_differ.py | 4 ++++ etl/hubspot/scripts/scraper/bulk_load.py | 4 ++-- 5 files changed, 22 insertions(+), 2 deletions(-) diff --git a/backend/app/db/models/hubspot_deal_data.py b/backend/app/db/models/hubspot_deal_data.py index 27649042..fa508fbe 100644 --- a/backend/app/db/models/hubspot_deal_data.py +++ b/backend/app/db/models/hubspot_deal_data.py @@ -42,6 +42,10 @@ class HubspotDealData(SQLModel, table=True): block_reference: Optional[str] = Field(default=None) epc_prn: Optional[str] = Field(default=None) potential_post_sap_score_dropdown: Optional[str] = Field(default=None) + ei_score: Optional[str] = Field(default=None) + ei_score__potential_: Optional[str] = Field(default=None) + epc_sap_score: Optional[str] = Field(default=None) + epc_sap_score__potential_: Optional[str] = Field(default=None) coordinator: Optional[str] = Field(default=None) mtp_completion_date: Optional[datetime] = Field(default=None) mtp_re_model_completion_date: Optional[datetime] = Field(default=None) diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index 3a562700..df28e4d6 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -279,6 +279,10 @@ class HubspotClient: "block_reference", "epc_prn", "potential_post_sap_score_dropdown", + "ei_score", + "ei_score__potential_", + "epc_sap_score", + "epc_sap_score__potential_", ], ) ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index eb8e2d14..a2eb24c2 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -166,6 +166,10 @@ class HubspotDataToDb: "potential_post_sap_score_dropdown": deal_data.get( "potential_post_sap_score_dropdown" ), + "ei_score": deal_data.get("ei_score"), + "ei_score__potential_": deal_data.get("ei_score__potential_"), + "epc_sap_score": deal_data.get("epc_sap_score"), + "epc_sap_score__potential_": deal_data.get("epc_sap_score__potential_"), "coordinator": deal_data.get("coordinator"), "mtp_completion_date": parse_hs_date(deal_data.get("mtp_completion_date")), "mtp_re_model_completion_date": parse_hs_date( @@ -241,6 +245,10 @@ class HubspotDataToDb: potential_post_sap_score_dropdown=deal_data.get( "potential_post_sap_score_dropdown" ), + ei_score=deal_data.get("ei_score"), + ei_score__potential_=deal_data.get("ei_score__potential_"), + epc_sap_score=deal_data.get("epc_sap_score"), + epc_sap_score__potential_=deal_data.get("epc_sap_score__potential_"), coordinator=deal_data.get("coordinator"), mtp_completion_date=parse_hs_date(deal_data.get("mtp_completion_date")), mtp_re_model_completion_date=parse_hs_date( diff --git a/etl/hubspot/hubspot_deal_differ.py b/etl/hubspot/hubspot_deal_differ.py index cf9ad1ee..80c1fe04 100644 --- a/etl/hubspot/hubspot_deal_differ.py +++ b/etl/hubspot/hubspot_deal_differ.py @@ -67,6 +67,10 @@ class HubspotDealDiffer: "block_reference": "block_reference", "epc_prn": "epc_prn", "potential_post_sap_score_dropdown": "potential_post_sap_score_dropdown", + "ei_score": "ei_score", + "ei_score__potential_": "ei_score__potential_", + "epc_sap_score": "epc_sap_score", + "epc_sap_score__potential_": "epc_sap_score__potential_", "coordinator": "coordinator", "proposed_measures_dropdown": "proposed_measures", "approved_package": "approved_package", diff --git a/etl/hubspot/scripts/scraper/bulk_load.py b/etl/hubspot/scripts/scraper/bulk_load.py index f0529905..91aa89e2 100644 --- a/etl/hubspot/scripts/scraper/bulk_load.py +++ b/etl/hubspot/scripts/scraper/bulk_load.py @@ -9,8 +9,8 @@ PIPELINE_ID = Pipeline.OPERATIONS_SOCIAL_HOUSING.value companies = list( [ # Companies.THE_GUINESS_PARTNERSHIP, - Companies.SOUTHERN_HOUSING_GROUP, - # Companies.CALICO_HOMES, + # Companies.SOUTHERN_HOUSING_GROUP, + Companies.CALICO_HOMES, ] ) From 33d355e2c3731b8aff3100358efe0a36947579b5 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 16:34:03 +0000 Subject: [PATCH 06/10] added dan's changes --- .github/workflows/deploy_terraform.yml | 2 +- backend/app/bulk_uploads/router.py | 15 ++++----------- backend/app/bulk_uploads/schema.py | 2 +- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 7e1281b7..5ac4cee4 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -417,7 +417,7 @@ jobs: # Deploy FastAPI Lambda # ============================================================ fast_api_lambda: - needs: [determine_stage, ara_engine_lambda, categorisation_lambda] + needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: ara_fast_api diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index 470ef36e..d9c38250 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -1,5 +1,4 @@ import boto3 -import json from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import select @@ -8,7 +7,7 @@ from backend.app.dependencies import validate_token from backend.app.config import get_settings from backend.app.db.connection import get_db_session from backend.app.db.models.bulk_address_uploads import BulkAddressUpload -from backend.app.bulk_uploads.schema import TriggerSplitterRequest +from backend.app.bulk_uploads.schema import PostcodeSplitterTriggerRequest from utils.s3 import parse_s3_uri, read_csv_from_s3 @@ -19,21 +18,15 @@ router = APIRouter( ) -@router.post("/trigger-splitter", status_code=202) -async def trigger_splitter(req: TriggerSplitterRequest): +@router.post("/trigger-postcode-splitter", status_code=202) +async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest): settings = get_settings() - sqs_payload = { - "task_id": req.task_id, - "sub_task_id": req.sub_task_id, - "s3_uri": req.s3_uri, - } - try: sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) response = sqs.send_message( QueueUrl=settings.POSTCODE_SPLITTER_SQS_URL, - MessageBody=json.dumps(sqs_payload), + MessageBody=req.model_dump_json(), ) except Exception as e: raise HTTPException(status_code=500, detail=f"SQS error: {e}") diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index 1efd02a3..98a80a2b 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -1,7 +1,7 @@ from pydantic import BaseModel -class TriggerSplitterRequest(BaseModel): +class PostcodeSplitterTriggerRequest(BaseModel): task_id: str sub_task_id: str s3_uri: str From 574156b403919f2a81309d6bf1966003f9a1a174 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 16:38:20 +0000 Subject: [PATCH 07/10] remove combined as i'm doing that later --- backend/app/bulk_uploads/router.py | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index d9c38250..ca1e7b79 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -37,31 +37,3 @@ async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest): "sqs_message_id": response.get("MessageId"), } - -@router.get("/{task_id}/combined-results") -async def get_combined_results( - task_id: UUID, - offset: int = Query(default=0, ge=0), - limit: int = Query(default=500, ge=1, le=5000), -): - with get_db_session() as session: - upload = session.exec( - select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) - ).first() - - if not upload: - raise HTTPException(status_code=404, detail="Upload not found") - - if not upload.combined_output_s3_uri: - raise HTTPException(status_code=409, detail="Combiner not finished") - - bucket, key = parse_s3_uri(upload.combined_output_s3_uri) - rows = read_csv_from_s3(bucket, key) - - total = len(rows) - return { - "rows": rows[offset : offset + limit], - "total": total, - "offset": offset, - "limit": limit, - } From 735368c3121f2c0198af51f641de90c304cee387 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Mon, 20 Apr 2026 17:00:27 +0000 Subject: [PATCH 08/10] forgot to add stage --- infrastructure/terraform/lambda/fast-api/main.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infrastructure/terraform/lambda/fast-api/main.tf b/infrastructure/terraform/lambda/fast-api/main.tf index f1a9494b..8dcbb8a3 100644 --- a/infrastructure/terraform/lambda/fast-api/main.tf +++ b/infrastructure/terraform/lambda/fast-api/main.tf @@ -31,8 +31,8 @@ data "terraform_remote_state" "categorisation" { data "terraform_remote_state" "postcode_splitter" { backend = "s3" config = { - bucket = "postcode-splitter-terraform-state" - key = "terraform.tfstate" + bucket = "postcode-splitter-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" region = "eu-west-2" } } From 30ac251705ddb1dc5337a388743d2dd2f9c3a49c Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Tue, 21 Apr 2026 08:35:52 +0000 Subject: [PATCH 09/10] limit pashub lambda concurrency to 1 --- infrastructure/terraform/lambda/pashub_to_ara/main.tf | 2 ++ infrastructure/terraform/lambda/pashub_to_ara/variables.tf | 6 ++++++ infrastructure/terraform/modules/lambda_service/main.tf | 2 ++ .../terraform/modules/lambda_service/variables.tf | 6 ++++++ infrastructure/terraform/modules/lambda_with_sqs/main.tf | 3 ++- .../terraform/modules/lambda_with_sqs/variables.tf | 6 ++++++ 6 files changed, 24 insertions(+), 1 deletion(-) diff --git a/infrastructure/terraform/lambda/pashub_to_ara/main.tf b/infrastructure/terraform/lambda/pashub_to_ara/main.tf index 0c652dc4..ae719a99 100644 --- a/infrastructure/terraform/lambda/pashub_to_ara/main.tf +++ b/infrastructure/terraform/lambda/pashub_to_ara/main.tf @@ -26,6 +26,8 @@ module "lambda" { # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) maximum_concurrency = var.maximum_concurrency + reserved_concurrent_executions = var.reserved_concurrent_executions + batch_size = var.batch_size environment = { diff --git a/infrastructure/terraform/lambda/pashub_to_ara/variables.tf b/infrastructure/terraform/lambda/pashub_to_ara/variables.tf index f16b41ac..e68a26b6 100644 --- a/infrastructure/terraform/lambda/pashub_to_ara/variables.tf +++ b/infrastructure/terraform/lambda/pashub_to_ara/variables.tf @@ -23,6 +23,12 @@ variable "maximum_concurrency" { description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." } +variable "reserved_concurrent_executions" { + type = number + default = 1 + description = "Reserved concurrency. Defaults to 1 to prevent concurrent Playwright browser collisions." +} + variable "batch_size" { type = number default = 1 diff --git a/infrastructure/terraform/modules/lambda_service/main.tf b/infrastructure/terraform/modules/lambda_service/main.tf index 8a159db1..3250110b 100644 --- a/infrastructure/terraform/modules/lambda_service/main.tf +++ b/infrastructure/terraform/modules/lambda_service/main.tf @@ -9,6 +9,8 @@ resource "aws_lambda_function" "this" { memory_size = var.memory_size publish = true + reserved_concurrent_executions = var.reserved_concurrent_executions + environment { variables = var.environment } diff --git a/infrastructure/terraform/modules/lambda_service/variables.tf b/infrastructure/terraform/modules/lambda_service/variables.tf index 43def6ad..46241f30 100644 --- a/infrastructure/terraform/modules/lambda_service/variables.tf +++ b/infrastructure/terraform/modules/lambda_service/variables.tf @@ -16,3 +16,9 @@ variable "environment" { type = map(string) default = {} } + +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved (default). 0 = throttle all. 1+ = hard limit." +} diff --git a/infrastructure/terraform/modules/lambda_with_sqs/main.tf b/infrastructure/terraform/modules/lambda_with_sqs/main.tf index 35626487..97f86793 100644 --- a/infrastructure/terraform/modules/lambda_with_sqs/main.tf +++ b/infrastructure/terraform/modules/lambda_with_sqs/main.tf @@ -31,7 +31,8 @@ module "lambda" { timeout = var.timeout memory_size = var.memory_size - environment = var.environment + environment = var.environment + reserved_concurrent_executions = var.reserved_concurrent_executions } ############################################ diff --git a/infrastructure/terraform/modules/lambda_with_sqs/variables.tf b/infrastructure/terraform/modules/lambda_with_sqs/variables.tf index 7c2832d2..90585e92 100644 --- a/infrastructure/terraform/modules/lambda_with_sqs/variables.tf +++ b/infrastructure/terraform/modules/lambda_with_sqs/variables.tf @@ -40,3 +40,9 @@ variable "maximum_concurrency" { default = null description = "Maximum number of concurrent Lambda invocations from SQS. null = no limit." } + +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved. 1 = single-threaded." +} From 5966757051699b7f6f5e07a9b6652a34bb060aa9 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Tue, 21 Apr 2026 08:58:51 +0000 Subject: [PATCH 10/10] add reserved_concurrent_executions to template lambda terraform --- infrastructure/terraform/lambda/_template/main.tf | 2 ++ infrastructure/terraform/lambda/_template/variables.tf | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/infrastructure/terraform/lambda/_template/main.tf b/infrastructure/terraform/lambda/_template/main.tf index 81b1c7f1..1cd63ffe 100644 --- a/infrastructure/terraform/lambda/_template/main.tf +++ b/infrastructure/terraform/lambda/_template/main.tf @@ -36,6 +36,8 @@ module "lambda" { # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) maximum_concurrency = var.maximum_concurrency + reserved_concurrent_executions = var.reserved_concurrent_executions + batch_size = var.batch_size environment = { diff --git a/infrastructure/terraform/lambda/_template/variables.tf b/infrastructure/terraform/lambda/_template/variables.tf index 0a3092ee..daaa0b7c 100644 --- a/infrastructure/terraform/lambda/_template/variables.tf +++ b/infrastructure/terraform/lambda/_template/variables.tf @@ -23,6 +23,12 @@ variable "maximum_concurrency" { description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." } +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved. 1+ = hard limit across all triggers." +} + variable "batch_size" { type = number default = 1