Merge branch 'main' into feature/read-site-notes-pdf

This commit is contained in:
Daniel Roth 2026-04-23 10:11:14 +00:00
commit e406b11be8
35 changed files with 463 additions and 409 deletions

View file

@ -1,15 +1,23 @@
FROM python:3.11.10-bullseye FROM python:3.11.10-bookworm
ARG USER=vscode ARG USER=vscode
ARG USER_UID=1000
ARG USER_GID=1000
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
# 1) Toolchain + utilities for building libpostal # 1) Toolchain + utilities for building libpostal, plus LazyVim deps
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
sudo jq vim curl git ca-certificates \ sudo jq vim curl git ca-certificates wget \
build-essential pkg-config automake autoconf libtool \ build-essential pkg-config automake autoconf libtool \
ripgrep fd-find make unzip \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Neovim latest (LazyVim needs >=0.9)
RUN curl -fsSL https://github.com/neovim/neovim/releases/latest/download/nvim-linux-x86_64.tar.gz \
| tar -xz -C /opt \
&& ln -s /opt/nvim-linux-x86_64/bin/nvim /usr/local/bin/nvim
# # 2) Build and install libpostal from source # # 2) Build and install libpostal from source
# RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ # RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \
# && cd /tmp/libpostal \ # && cd /tmp/libpostal \
@ -21,7 +29,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# && rm -rf /tmp/libpostal # && rm -rf /tmp/libpostal
# 3) Create the user and grant sudo privileges # 3) Create the user and grant sudo privileges
RUN useradd -m -s /usr/bin/bash ${USER} \ RUN groupadd -g ${USER_GID} ${USER} \
&& useradd -m -u ${USER_UID} -g ${USER_GID} -s /usr/bin/bash ${USER} \
&& echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \ && echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \
&& chmod 0440 /etc/sudoers.d/${USER} && chmod 0440 /etc/sudoers.d/${USER}
@ -72,8 +81,12 @@ RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
&& npm install -g backlog.md \ && npm install -g backlog.md \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install Claude
USER ${USER} USER ${USER}
# Bootstrap LazyVim starter config
RUN git clone https://github.com/LazyVim/starter /home/${USER}/.config/nvim \
&& rm -rf /home/${USER}/.config/nvim/.git
# Install Claude
RUN curl -fsSL https://claude.ai/install.sh | bash \ RUN curl -fsSL https://claude.ai/install.sh | bash \
&& export PATH="/home/${USER}/.local/bin:${PATH}" \ && export PATH="/home/${USER}/.local/bin:${PATH}" \
&& claude plugin marketplace add JuliusBrussee/caveman \ && claude plugin marketplace add JuliusBrussee/caveman \

View file

@ -6,7 +6,7 @@
"workspaceFolder": "/workspaces/model", "workspaceFolder": "/workspaces/model",
"postStartCommand": "bash .devcontainer/backend/post-install.sh", "postStartCommand": "bash .devcontainer/backend/post-install.sh",
"mounts": [ "mounts": [
"source=${localEnv:HOME},target=/home/vscode,type=bind", "source=${localEnv:HOME},target=/workspaces/home,type=bind",
"source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached" "source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached"
], ],
"customizations": { "customizations": {
@ -44,11 +44,15 @@
"containerEnv": { "containerEnv": {
"PYTHONFLAGS": "-Xfrozen_modules=off" "PYTHONFLAGS": "-Xfrozen_modules=off"
}, },
"forwardPorts": [6421], "forwardPorts": [6421, 8000],
"portsAttributes": { "portsAttributes": {
"6421": { "6421": {
"label": "Backlog.md", "label": "Backlog.md",
"onAutoForward": "notify" "onAutoForward": "notify"
},
"8000": {
"label": "FastAPI",
"onAutoForward": "notify"
} }
} }
} }

View file

@ -2,13 +2,23 @@ version: '3.8'
services: services:
model-backend: model-backend:
user: "${UID}:${GID}"
build: build:
context: ../.. context: ../..
dockerfile: .devcontainer/backend/Dockerfile dockerfile: .devcontainer/backend/Dockerfile
args:
USER_UID: ${UID:-1000}
USER_GID: ${GID:-1000}
command: sleep infinity command: sleep infinity
ports:
- "8000:8000"
volumes: volumes:
- ../../:/workspaces/model - ../../:/workspaces/model
- ~/.gitconfig:/home/vscode/.gitconfig:ro
environment:
- SSH_AUTH_SOCK=${SSH_AUTH_SOCK:-}
networks:
- backend-net
- shared-dev
db: db:
@ -22,7 +32,15 @@ services:
- POSTGRES_PASSWORD=makingwarmerhomes - POSTGRES_PASSWORD=makingwarmerhomes
volumes: volumes:
- postgres-data-two:/var/lib/postgresql/data - postgres-data-two:/var/lib/postgresql/data
networks:
- backend-net
networks:
backend-net:
driver: bridge
shared-dev:
external: true
volumes: volumes:
postgres-data-two: postgres-data-two:

View file

@ -413,7 +413,7 @@ jobs:
# Deploy FastAPI Lambda # Deploy FastAPI Lambda
# ============================================================ # ============================================================
fast_api_lambda: fast_api_lambda:
needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda] needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda]
uses: ./.github/workflows/_deploy_lambda.yml uses: ./.github/workflows/_deploy_lambda.yml
with: with:
lambda_name: ara_fast_api lambda_name: ara_fast_api

1
.gitignore vendored
View file

@ -291,3 +291,4 @@ pyrightconfig.json
# playwright output # playwright output
*/pashub_fetcher/videos/* */pashub_fetcher/videos/*
backlog/*

View file

@ -27,3 +27,4 @@ You MUST read the overview resource to understand the complete workflow. The inf
</CRITICAL_INSTRUCTION> </CRITICAL_INSTRUCTION>
<!-- BACKLOG.MD MCP GUIDELINES END --> <!-- BACKLOG.MD MCP GUIDELINES END -->

View file

@ -74,7 +74,7 @@ def app():
""" """
data_folder = "/workspaces/model/asset_list" data_folder = "/workspaces/model/asset_list"
data_filename = "foom (2).xlsx" data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx"
sheet_name = "in" sheet_name = "in"
postcode_column = "postcode_clean" postcode_column = "postcode_clean"
address1_column = "address2uprn_address" address1_column = "address2uprn_address"
@ -84,8 +84,8 @@ def app():
missing_postcodes_method = None missing_postcodes_method = None
landlord_year_built = None landlord_year_built = None
landlord_os_uprn = "address2uprn_uprn" landlord_os_uprn = "address2uprn_uprn"
landlord_property_type = None # Good to include if landlord gave landlord_property_type = "Property Type" # Good to include if landlord gave
landlord_built_form = None # Good to include if landlord gave landlord_built_form = "Built Form" # Good to include if landlord gave
landlord_wall_construction = None landlord_wall_construction = None
landlord_roof_construction = None landlord_roof_construction = None
landlord_heating_system = None landlord_heating_system = None

View file

@ -563,14 +563,14 @@ def handler(event, context, local=False):
except Exception as s3_error: except Exception as s3_error:
logger.error(f"Failed to save results to S3: {s3_error}") logger.error(f"Failed to save results to S3: {s3_error}")
# Mark subtask as completed # Mark subtask as complete
try: try:
subtask_interface.update_subtask_status( subtask_interface.update_subtask_status(
subtask_id, subtask_id,
"completed", "complete",
outputs={"rows_processed": "todo -> show sensible output"}, outputs={"rows_processed": "todo -> show sensible output"},
) )
logger.info(f"Marked subtask {subtask_id} as completed") logger.info(f"Marked subtask {subtask_id} as complete")
except Exception as db_error: except Exception as db_error:
logger.error(f"Failed to mark subtask as completed: {db_error}") logger.error(f"Failed to mark subtask as completed: {db_error}")

View file

@ -1,4 +1,5 @@
import boto3 import boto3
from collections import Counter
from uuid import UUID from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select from sqlmodel import select
@ -7,7 +8,41 @@ from backend.app.dependencies import validate_token
from backend.app.config import get_settings from backend.app.config import get_settings
from backend.app.db.connection import get_db_session from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
from backend.app.bulk_uploads.schema import PostcodeSplitterTriggerRequest from backend.app.bulk_uploads.schema import (
CombinedResultRow,
CombinedResultsResponse,
CombinerTriggerRequest,
FlagsSummary,
PostcodeSplitterTriggerRequest,
)
from backend.app.bulk_uploads.scoring import score_bucket
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3", "postcode")
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
MISSING_SENTINEL = "invalid postcode"
def _normalize(value) -> str:
if value is None:
return ""
return str(value).strip()
def _is_missing_uprn(uprn: str) -> bool:
    """Tell whether a UPRN cell carries no usable value.

    A cell counts as missing when it is the empty string or when it equals
    ``MISSING_SENTINEL`` ignoring case. No trimming is done here; the callers
    in this module pass values that already went through ``_normalize``.
    """
    if uprn == "":
        return True
    return uprn.lower() == MISSING_SENTINEL
def _parse_lexiscore(raw) -> float | None:
    """Parse a raw lexiscore cell into a finite float.

    Args:
        raw: The cell value as read from the combined CSV (any type; it is
            normalised to a trimmed string first).

    Returns:
        The score as a ``float``, or ``None`` when the cell is empty, holds
        ``MISSING_SENTINEL`` (case-insensitive), is not numeric, or parses to
        a non-finite value (NaN / +-infinity).
    """
    # Function-scope import keeps this block self-contained.
    import math

    val = _normalize(raw)
    if not val or val.lower() == MISSING_SENTINEL:
        return None
    try:
        score = float(val)
    except ValueError:
        return None
    # float() happily parses "nan" and "inf". A NaN fails every ">=" threshold
    # comparison, so it would otherwise be bucketed as a genuine "low" score;
    # treat non-finite values as "no score" instead.
    return score if math.isfinite(score) else None
router = APIRouter( router = APIRouter(
@ -36,3 +71,108 @@ async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest):
"sqs_message_id": response.get("MessageId"), "sqs_message_id": response.get("MessageId"),
} }
@router.post("/trigger-combiner", status_code=202)
async def trigger_combiner(req: CombinerTriggerRequest):
    """Enqueue a combiner job by sending the request body to SQS.

    The request model is serialised to JSON and sent to the queue configured
    as ``COMBINER_SQS_URL``. Any exception raised while creating the client
    or sending the message is reported as HTTP 500.

    Returns:
        A dict echoing ``task_id`` and ``sub_task_id`` together with the
        ``MessageId`` taken from the SQS response.
    """
    settings = get_settings()

    try:
        sqs_client = boto3.client("sqs", settings.AWS_DEFAULT_REGION)
        send_result = sqs_client.send_message(
            QueueUrl=settings.COMBINER_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"SQS error: {exc}")

    message_id = send_result.get("MessageId")
    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": message_id,
    }
@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
    task_id: UUID,
    offset: int = Query(0, ge=0),
    limit: int = Query(500, ge=1, le=5000),
):
    """Return one page of the combined address-matching CSV for a task.

    Looks up the ``BulkAddressUpload`` row for ``task_id``, reads the CSV its
    ``combined_output_s3_uri`` points at, computes summary counts over the
    whole file, and converts the requested slice into ``CombinedResultRow``
    objects.

    Raises:
        HTTPException 404: no upload row exists for ``task_id``.
        HTTPException 409: the row has no ``combined_output_s3_uri`` yet.
        HTTPException 502: reading the CSV from S3 failed.
    """
    with get_db_session() as session:
        lookup = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        upload = session.exec(lookup).first()

        if upload is None:
            raise HTTPException(status_code=404, detail="Upload not found for task_id")
        if not upload.combined_output_s3_uri:
            raise HTTPException(status_code=409, detail="Combiner not finished")
        # Copy the URI while the session is open so nothing below touches the
        # ORM object once the session has been released.
        combined_uri = upload.combined_output_s3_uri

    bucket, key = parse_s3_uri(combined_uri)
    try:
        # NOTE(review): rows are used as mappings (``.get``) — presumably one
        # dict per CSV line; confirm against read_csv_from_s3.
        raw_rows = read_csv_from_s3(bucket, key)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Failed to read combined CSV: {exc}")

    # Summary figures are computed over the entire file, not just the page.
    uprn_values = [_normalize(record.get(UPRN_COL)) for record in raw_rows]
    present_uprns = [value for value in uprn_values if not _is_missing_uprn(value)]
    uprn_counts = Counter(present_uprns)
    duplicate_uprns = {value for value, seen in uprn_counts.items() if seen > 1}

    missing_count = len(uprn_values) - len(present_uprns)
    # Every row that shares a UPRN with another row counts as a duplicate.
    duplicate_count = sum(uprn_counts[value] for value in duplicate_uprns)
    matched_count = len(raw_rows) - missing_count

    rows: list[CombinedResultRow] = []
    page = raw_rows[offset : offset + limit]
    for absolute_index, raw in enumerate(page, start=offset):
        # Join the non-empty input address columns in their declared order.
        normalized_parts = (_normalize(raw.get(col)) for col in ADDRESS_COLS)
        input_address = ", ".join(part for part in normalized_parts if part)

        internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None

        uprn_raw = _normalize(raw.get(UPRN_COL))
        uprn = None if _is_missing_uprn(uprn_raw) else uprn_raw

        matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
        if matched_address_raw and matched_address_raw.lower() != MISSING_SENTINEL:
            matched_address = matched_address_raw
        else:
            matched_address = None

        lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))

        # A row is flagged as either missing or duplicate, never both.
        flags: list[str] = []
        if uprn is None:
            flags.append("missing")
        elif uprn in duplicate_uprns:
            flags.append("duplicate")

        rows.append(
            CombinedResultRow(
                row_index=absolute_index,
                input_address=input_address,
                internal_reference=internal_ref,
                uprn=uprn,
                matched_address=matched_address,
                lexiscore=lexiscore,
                score_bucket=score_bucket(lexiscore),
                flags=flags,
            )
        )

    summary = FlagsSummary(
        duplicates=duplicate_count,
        missing=missing_count,
        matched=matched_count,
    )
    return CombinedResultsResponse(
        task_id=str(task_id),
        total=len(raw_rows),
        offset=offset,
        limit=limit,
        flags_summary=summary,
        rows=rows,
    )

View file

@ -1,3 +1,5 @@
from typing import List, Literal, Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -5,3 +7,34 @@ class PostcodeSplitterTriggerRequest(BaseModel):
task_id: str task_id: str
sub_task_id: str sub_task_id: str
s3_uri: str s3_uri: str
class CombinerTriggerRequest(BaseModel):
    """Request body identifying the task and sub-task for a combiner run."""

    task_id: str
    sub_task_id: str
class FlagsSummary(BaseModel):
    """Integer counts summarising duplicate, missing and matched rows."""

    duplicates: int
    missing: int
    matched: int
class CombinedResultRow(BaseModel):
    """One row of combined address-matching output.

    Only ``row_index`` and ``input_address`` are required; every other field
    defaults to ``None`` (or an empty list for ``flags``).
    """

    row_index: int
    input_address: str
    internal_reference: Optional[str] = None
    uprn: Optional[str] = None
    matched_address: Optional[str] = None
    lexiscore: Optional[float] = None
    # Restricted to the three bucket labels, or None.
    score_bucket: Optional[Literal["high", "med", "low"]] = None
    # Each entry must be one of the two allowed flag labels.
    flags: List[Literal["duplicate", "missing"]] = []
class CombinedResultsResponse(BaseModel):
    """Paginated response: a list of result rows plus paging and summary fields."""

    task_id: str
    total: int
    offset: int
    limit: int
    flags_summary: FlagsSummary
    rows: List[CombinedResultRow]

View file

@ -0,0 +1,14 @@
from typing import Optional
# Inclusive lower bounds for the "high" and "med" buckets.
HIGH_THRESHOLD = 0.85
MED_THRESHOLD = 0.65


def score_bucket(score: Optional[float]) -> Optional[str]:
    """Classify a score as ``"high"``, ``"med"`` or ``"low"``.

    Args:
        score: The score to classify, or ``None`` when there is none.

    Returns:
        ``None`` for a ``None`` score; otherwise the label of the first
        threshold the score reaches, falling back to ``"low"``.
    """
    if score is None:
        return None
    # Thresholds are checked from highest to lowest; bounds are inclusive.
    for floor, label in ((HIGH_THRESHOLD, "high"), (MED_THRESHOLD, "med")):
        if score >= floor:
            return label
    return "low"

View file

@ -40,6 +40,7 @@ class Settings(BaseSettings):
CATEGORISATION_SQS_URL: str = "changeme" CATEGORISATION_SQS_URL: str = "changeme"
PASHUB_TO_ARA_SQS_URL: str = "changeme" PASHUB_TO_ARA_SQS_URL: str = "changeme"
POSTCODE_SPLITTER_SQS_URL: str = "changeme" POSTCODE_SPLITTER_SQS_URL: str = "changeme"
COMBINER_SQS_URL: str = "changeme"
# Third parties # Third parties
EPC_AUTH_TOKEN: str = "changeme" EPC_AUTH_TOKEN: str = "changeme"

View file

@ -0,0 +1,36 @@
from uuid import UUID
from datetime import datetime, timezone
from sqlmodel import select
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
def set_combining_status(task_id: UUID) -> None:
    """Mark the upload row for ``task_id`` as ``"combining"``.

    Also refreshes ``updated_at`` with the current UTC time and commits.

    Raises:
        ValueError: no ``BulkAddressUpload`` row exists for ``task_id``.
    """
    timestamp = datetime.now(timezone.utc)
    with get_db_session() as session:
        lookup = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        upload = session.exec(lookup).first()
        if not upload:
            raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")

        upload.status = "combining"
        upload.updated_at = timestamp
        session.add(upload)
        session.commit()
def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None:
    """Store the combined-output URI and move the upload to ``"awaiting_review"``.

    Writes ``s3_uri`` to ``combined_output_s3_uri``, sets the status,
    refreshes ``updated_at`` with the current UTC time and commits.

    Raises:
        ValueError: no ``BulkAddressUpload`` row exists for ``task_id``.
    """
    timestamp = datetime.now(timezone.utc)
    with get_db_session() as session:
        lookup = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
        upload = session.exec(lookup).first()
        if not upload:
            raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")

        upload.combined_output_s3_uri = s3_uri
        upload.status = "awaiting_review"
        upload.updated_at = timestamp
        session.add(upload)
        session.commit()

View file

@ -1,10 +1,10 @@
from typing import Optional from typing import Optional
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from datetime import datetime, timezone from datetime import datetime
from sqlmodel import SQLModel, Field, select from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from backend.app.db.connection import get_db_session from sqlmodel import SQLModel, Field
class BulkAddressUpload(SQLModel, table=True): class BulkAddressUpload(SQLModel, table=True):
@ -17,21 +17,8 @@ class BulkAddressUpload(SQLModel, table=True):
s3_key: str = Field(nullable=False) s3_key: str = Field(nullable=False)
filename: str = Field(nullable=False) filename: str = Field(nullable=False)
status: str = Field(default="ready_for_processing", nullable=False) status: str = Field(default="ready_for_processing", nullable=False)
column_mapping: Optional[dict] = Field(default=None, sa_column=Column(JSONB))
task_id: Optional[UUID] = Field(default=None) task_id: Optional[UUID] = Field(default=None)
combined_output_s3_uri: Optional[str] = Field(default=None) combined_output_s3_uri: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow) created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None:
now = datetime.now(timezone.utc)
with get_db_session() as session:
row = session.exec(
select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
).first()
if not row:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.combined_output_s3_uri = s3_uri
row.updated_at = now
session.add(row)
session.commit()

View file

@ -8,7 +8,10 @@ from datetime import datetime, timezone
from utils.logger import setup_logger from utils.logger import setup_logger
from backend.utils.subtasks import subtask_handler from backend.utils.subtasks import subtask_handler
from backend.app.db.models.bulk_address_uploads import set_combined_csv_s3_uri from backend.app.db.functions.bulk_address_uploads_functions import (
set_combined_output_s3_uri,
set_combining_status,
)
logger = setup_logger() logger = setup_logger()
@ -38,6 +41,8 @@ def handler(body: dict[str, Any], context: Any) -> str:
if not task_id_str: if not task_id_str:
raise RuntimeError("Missing task_id in message body") raise RuntimeError("Missing task_id in message body")
set_combining_status(UUID(task_id_str))
bucket = S3_BUCKET_NAME bucket = S3_BUCKET_NAME
if not bucket: if not bucket:
raise RuntimeError("S3_BUCKET_NAME env var not set") raise RuntimeError("S3_BUCKET_NAME env var not set")
@ -68,7 +73,7 @@ def handler(body: dict[str, Any], context: Any) -> str:
logger.info(f"Saved combined CSV to {s3_uri}") logger.info(f"Saved combined CSV to {s3_uri}")
print(f"OUTPUT_S3_URI: {s3_uri}") print(f"OUTPUT_S3_URI: {s3_uri}")
set_combined_csv_s3_uri(UUID(task_id_str), s3_uri) set_combined_output_s3_uri(UUID(task_id_str), s3_uri)
logger.info(f"Persisted combined_csv_s3_uri for task {task_id_str}") logger.info(f"Persisted combined_output_s3_uri + awaiting_review status for task {task_id_str}")
return s3_uri return s3_uri

View file

@ -7,14 +7,25 @@ from utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
def get_token_from_local_storage(email: str, password: str, record_video: bool = False) -> str: def get_token_from_local_storage(
email: str, password: str, record_video: bool = False
) -> str:
logger.info("Starting Playwright flow") logger.info("Starting Playwright flow")
with sync_playwright() as p: with sync_playwright() as p:
logger.info("Playwright server started")
browser = p.chromium.launch( browser = p.chromium.launch(
headless=True, headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"], args=[
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--single-process",
"--no-zygote",
],
) )
logger.info("Chromium launched successfully")
video_dir = None video_dir = None
if record_video: if record_video:
@ -29,6 +40,7 @@ def get_token_from_local_storage(email: str, password: str, record_video: bool =
else: else:
context = browser.new_context() context = browser.new_context()
page = context.new_page() page = context.new_page()
logger.info("Page created")
try: try:
logger.info("Navigating to site...") logger.info("Navigating to site...")

View file

@ -263,10 +263,10 @@ def handler(event, context, local=False):
bucket_name=bucket_name, bucket_name=bucket_name,
) )
# Mark subtask as completed # Mark subtask as complete
subtask_interface.update_subtask_status( subtask_interface.update_subtask_status(
subtask_id, subtask_id,
"completed", "complete",
outputs={"rows_processed": "completed"}, outputs={"rows_processed": "completed"},
) )

View file

@ -2,5 +2,6 @@ set -a
source ./.env source ./.env
set +a set +a
uvicorn app.main:app --reload cd ..
uvicorn backend.app.main:app --reload --host 0.0.0.0 --port 8000

View file

@ -0,0 +1,21 @@
import pytest
from backend.app.bulk_uploads.scoring import score_bucket
@pytest.mark.parametrize(
    "score,expected",
    [
        # No score: no bucket.
        (None, None),
        # Below the medium threshold.
        (0.0, "low"),
        (0.5, "low"),
        (0.64, "low"),
        # Medium band, lower bound inclusive.
        (0.65, "med"),
        (0.84, "med"),
        # High band, lower bound inclusive.
        (0.85, "high"),
        (0.9, "high"),
        (1.0, "high"),
    ],
)
def test_score_bucket_thresholds(score, expected):
    """Each score maps to the expected bucket, including exact boundaries."""
    actual = score_bucket(score)
    assert actual == expected

View file

@ -0,0 +1,103 @@
from datetime import datetime, timezone
from uuid import uuid4
import pytest
from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel
from backend.app.db.functions import bulk_address_uploads_functions as module
from backend.app.db.functions.bulk_address_uploads_functions import (
set_combined_output_s3_uri,
set_combining_status,
)
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
@pytest.fixture(scope="function")
def pg_engine(postgresql):
    """Yield a SQLAlchemy engine bound to the database behind ``postgresql``.

    All tables registered on ``SQLModel.metadata`` are created before the
    test and dropped afterwards, then the engine is disposed.
    """
    # NOTE(review): ``postgresql`` is presumably the pytest-postgresql
    # connection fixture — confirm in conftest / plugin configuration.
    connection_string = (
        f"postgresql+psycopg://"
        f"{postgresql.info.user}:"
        f"{postgresql.info.password}@"
        f"{postgresql.info.host}:"
        f"{postgresql.info.port}/"
        f"{postgresql.info.dbname}"
    )
    engine = create_engine(connection_string)
    SQLModel.metadata.create_all(engine)
    yield engine
    # Teardown: runs after the test body finishes.
    SQLModel.metadata.drop_all(engine)
    engine.dispose()
@pytest.fixture
def patched_session(pg_engine, monkeypatch):
    """Point the module under test at the test database.

    Replaces ``get_db_session`` on the functions module with a factory that
    returns a ``Session`` bound to ``pg_engine``. Yields the engine (not a
    session) and closes every session the factory handed out on teardown.
    """
    sessions = []

    def factory():
        # Track each session so it can be closed during teardown.
        s = Session(pg_engine)
        sessions.append(s)
        return s

    monkeypatch.setattr(module, "get_db_session", factory)
    yield pg_engine
    for s in sessions:
        s.close()
def _insert_row(engine, task_id, status="processing"):
    """Insert one ``BulkAddressUpload`` row with placeholder values.

    Only ``task_id`` and ``status`` vary between calls; the remaining columns
    get fixed dummy values and both timestamps are set to the current UTC time.
    """
    fields = {
        "id": uuid4(),
        "portfolio_id": "p1",
        "user_id": "u1",
        "s3_bucket": "b",
        "s3_key": "k",
        "filename": "f.csv",
        "status": status,
        "task_id": task_id,
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    with Session(engine) as session:
        session.add(BulkAddressUpload(**fields))
        session.commit()
def _fetch(engine, task_id):
    """Return the first ``BulkAddressUpload`` row for ``task_id``, or ``None``."""
    from sqlmodel import select

    lookup = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
    with Session(engine) as session:
        return session.exec(lookup).first()
def test_set_combining_status_updates_row(patched_session):
    """The status becomes ``"combining"`` and the output URI is left unset."""
    engine = patched_session
    task_id = uuid4()
    _insert_row(engine, task_id, status="processing")

    set_combining_status(task_id)

    stored = _fetch(engine, task_id)
    assert stored.status == "combining"
    assert stored.combined_output_s3_uri is None
def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(patched_session):
    """The URI is stored and the status becomes ``"awaiting_review"``."""
    engine = patched_session
    task_id = uuid4()
    _insert_row(engine, task_id, status="combining")

    set_combined_output_s3_uri(task_id, "s3://bucket/bulk_final_outputs/abc/combined.csv")

    stored = _fetch(engine, task_id)
    assert stored.status == "awaiting_review"
    assert stored.combined_output_s3_uri == "s3://bucket/bulk_final_outputs/abc/combined.csv"
def test_set_combining_status_missing_row_raises(patched_session):
    """A task id with no matching row raises ``ValueError``."""
    unknown_task_id = uuid4()
    with pytest.raises(ValueError, match="No bulk_address_uploads row"):
        set_combining_status(unknown_task_id)
def test_set_combined_output_s3_uri_missing_row_raises(patched_session):
    """A task id with no matching row raises ``ValueError``."""
    unknown_task_id = uuid4()
    with pytest.raises(ValueError, match="No bulk_address_uploads row"):
        set_combined_output_s3_uri(unknown_task_id, "s3://x/y.csv")

View file

@ -1,38 +0,0 @@
---
id: TASK-1
title: Add POST /bulk-uploads/trigger-splitter FastAPI route
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:31'
labels:
- backend
- bulk-upload
- api
dependencies: []
priority: high
ordinal: 2000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Expose an HTTP route that the frontend can call instead of sending SQS directly. Route:
`POST /bulk-uploads/trigger-splitter`
Body: `{task_id, sub_task_id, s3_uri}` — task+subtask already created by frontend `/api/tasks` call before this is invoked.
Behaviour: validate inputs, then publish an SQS message to the postcode_splitter queue (see `backend/postcode_splitter/main.py` for expected message shape: `{task_id, sub_task_id, s3_uri}`). Use existing SubTaskInterface / TasksInterface patterns from `backend/app/tasks/router.py`.
Place under `backend/app/` next to `tasks/router.py` — likely `backend/app/bulk_uploads/router.py`.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Route returns 202 with {task_id, sub_task_id}
- [ ] #2 SQS message enqueued with correct shape for postcode_splitter Lambda
- [ ] #3 Auth via existing `validate_token` dependency
- [ ] #4 Queue URL from config, not hardcoded
- [ ] #5 Unit test with mocked boto3 sqs client
<!-- AC:END -->

View file

@ -1,50 +0,0 @@
---
id: TASK-10
title: >-
Fix bulk_address_uploads SQLModel — align columns with real schema, prevent
rogue migrations
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:34'
labels:
- backend
- bulk-upload
- db
dependencies: []
priority: high
ordinal: 7000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
`backend/app/db/models/bulk_address_uploads.py` has several bugs that cause a rogue `ALTER TABLE` and silent write failures:
**1. Wrong column name**
Model declares `combined_csv_s3_uri` — real column (drizzle-managed) is `combined_output_s3_uri`. `set_combined_csv_s3_uri()` currently writes to a non-existent column.
**2. Partial model declared as `table=True`**
Model only includes `id, task_id, combined_csv_s3_uri, status, updated_at`. Missing: `portfolio_id, user_id, s3_bucket, s3_key, filename, source_headers, column_mapping, created_at`. SQLModel `table=True` with incomplete columns causes Alembic autogenerate / `create_all` to try to ALTER or recreate the table.
**3. `status` default mismatch**
Backend: `default="pending"`. Real table default: `'ready_for_processing'`. Triggers ALTER TABLE on migration runs.
**4. `task_id` nullability mismatch**
Backend: `task_id: UUID` (NOT NULL). Frontend drizzle schema: nullable (set later, after onboarding starts).
**Fix approach:**
- Declare all real columns matching drizzle schema (see `src/app/db/schema/bulk_address_uploads.ts` in assessment-model repo as source of truth).
- Rename `combined_csv_s3_uri` → `combined_output_s3_uri` throughout.
- `task_id: Optional[UUID]`, `status` default `'ready_for_processing'`.
- Ensure Alembic env excludes this table from autogenerate — drizzle owns migrations, not backend.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 `combined_output_s3_uri` column name correct throughout model + helper
- [ ] #2 All columns present and match real table schema
- [ ] #3 No rogue ALTER TABLE when backend starts or migrations run
- [ ] #4 `task_id` nullable, `status` default `'ready_for_processing'`
- [ ] #5 Integration test: combiner runs → `combined_output_s3_uri` populated → frontend reads it correctly
<!-- AC:END -->

View file

@ -1,36 +0,0 @@
---
id: TASK-2
title: 'Add POST /bulk-uploads/{task_id}/combine FastAPI route'
status: To Do
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 11:53'
labels:
- backend
- bulk-upload
- api
dependencies: []
priority: high
ordinal: 10000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Expose HTTP route to trigger `bulk_address2uprn_combiner`:
`POST /bulk-uploads/{task_id}/combine`
Creates a new sub_task under task_id, then pushes `{task_id, sub_task_id}` to the combiner SQS queue (see `backend/bulk_address2uprn_combiner/main.py` for consumer shape).
Idempotency: if `bulk_address_uploads.combined_output_s3_uri` already set for this task, return 200 with `{already_combined: true}` (mirror current frontend behaviour).
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Route returns 202 with {task_id, sub_task_id} on new trigger
- [ ] #2 Returns 200 {already_combined: true} if combined_output_s3_uri already set
- [ ] #3 SQS message enqueued with correct shape
- [ ] #4 Queue URL from config
- [ ] #5 Auth via validate_token
<!-- AC:END -->

View file

@ -1,35 +0,0 @@
---
id: TASK-3
title: 'Add GET /bulk-uploads/{task_id}/combined-results route'
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:08'
labels:
- backend
- bulk-upload
- api
dependencies: []
priority: high
ordinal: 1000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
`GET /bulk-uploads/{task_id}/combined-results`
Behaviour: lookup `bulk_address_uploads` row by `task_id` → read `combined_output_s3_uri` → read combined CSV from S3 → return parsed JSON rows for the frontend review UI. Each row should include: input address fields, matched UPRN, matched OS address, match confidence/score.
Pagination: optional query params `?offset&limit` (default limit 500).
If `combined_output_s3_uri` not yet populated → 409 "Combiner not finished".
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Returns JSON {rows: [...], total, offset, limit}
- [ ] #2 409 when combined_output_s3_uri null
- [ ] #3 Reads CSV from S3 (retrofit_sap_data bucket) with IAM already granted to backend
- [ ] #4 Row shape matches what confirm-matches frontend expects
<!-- AC:END -->

View file

@ -1,37 +0,0 @@
---
id: TASK-4
title: 'Add POST /bulk-uploads/{task_id}/confirm-matches route'
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:31'
labels:
- backend
- bulk-upload
- api
dependencies:
- TASK-3
priority: high
ordinal: 3000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
`POST /bulk-uploads/{task_id}/confirm-matches`
Body: `{accepted_rows: [{uprn, address_line_1, address_line_2, postcode, internal_reference}]}` — the rows the user accepted from the review table.
Behaviour: for each accepted row, upsert into the portfolio's `addresses` / `property` table (confirm exact model during impl — see `backend/addresses/`, `backend/backend/Property.py`). Update `bulk_address_uploads.status` to terminal (e.g. `confirmed`).
Idempotency: safe to re-call; dedupe by `task_id` + `uprn`.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Accepted rows persisted as portfolio addresses
- [ ] #2 Duplicate submits do not create duplicate address rows
- [ ] #3 bulk_address_uploads.status updated to terminal
- [ ] #4 Returns summary {inserted, skipped}
- [ ] #5 Transactional — partial failure rolls back
<!-- AC:END -->

View file

@ -1,34 +0,0 @@
---
id: TASK-5
title: Auto-chain combiner when address2uprn subtasks complete
status: To Do
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20'
labels:
- backend
- bulk-upload
- orchestration
dependencies:
- TASK-2
priority: medium
ordinal: 5000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Today the frontend client polls task status and client-side fires `/combine`. Move that logic to backend: when the last `address2uprn` subtask for a task transitions to complete, backend auto-enqueues the combiner SQS message.
Likely hook point: `SubTaskInterface.finalize_subtask` — after setting status, check if parent task's subtasks are all terminal and if so, enqueue combiner. Or a separate reconciler run in the subtask-complete code path.
Removes frontend responsibility for orchestration and avoids "browser closed → combiner never fires" bug.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Combiner fires automatically when all splitter-spawned subtasks done
- [ ] #2 Only fires once per task (dedupe via task row / existing combined_output_s3_uri check)
- [ ] #3 Failed subtasks do NOT trigger combiner — requires manual retry
- [ ] #4 Frontend combine route (task-7) can be deleted or reduced to manual re-run
<!-- AC:END -->

View file

@ -1,30 +0,0 @@
---
id: TASK-6
title: Verify combiner writes to bulk_address_uploads.combined_output_s3_uri
status: To Do
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20'
labels:
- backend
- bulk-upload
- db
dependencies: []
priority: high
ordinal: 6000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Frontend drizzle schema column: `bulk_address_uploads.combined_output_s3_uri`. Backend combiner (`backend/bulk_address2uprn_combiner/main.py`) calls `set_combined_csv_s3_uri(UUID(task_id), s3_uri)` from `backend.app.db.models.bulk_address_uploads`.
Confirm that helper actually writes to the **`combined_output_s3_uri`** column (not a legacy `combined_csv_s3_uri`). Name drift suggests risk. Fix if mismatched.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Confirmed column name matches frontend schema
- [ ] #2 Fix applied if mismatched
- [ ] #3 Integration test covers: run combiner → row updated → frontend schema reads correctly
<!-- AC:END -->

View file

@ -1,35 +0,0 @@
---
id: TASK-7
title: >-
Add BULK_ADDRESS2UPRN_COMBINER_QUEUE_NAME + POSTCODE_SPLITTER_QUEUE_NAME to
backend envs
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:31'
labels:
- infra
- env
dependencies:
- TASK-1
- TASK-2
priority: high
ordinal: 4000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Since the backend now enqueues to both queues (via the new trigger-splitter + combine routes), its service config must have the queue names on staging + prod. Values:
- `bulk-address2uprn-combiner-queue-<stage>`
- `postcode-splitter-queue-<stage>` (if not already present)
Remove these from frontend `.env` once task-6/task-7 frontend refactor ships.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Queue names set in backend staging config
- [ ] #2 Queue names set in backend prod config
- [ ] #3 Frontend env vars removed after frontend refactor complete
<!-- AC:END -->

View file

@ -1,34 +0,0 @@
---
id: TASK-8
title: 'Grant sqs:SendMessage IAM on splitter + combiner queues to backend runtime'
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:31'
labels:
- infra
- iam
- terraform
dependencies:
- TASK-1
- TASK-2
priority: high
ordinal: 5000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Backend runtime role needs `sqs:SendMessage` + `sqs:GetQueueUrl` on:
- postcode_splitter queue ARN
- bulk_address2uprn_combiner queue ARN
Update the Terraform IAM policy under `infrastructure/terraform/` for the backend service. The equivalent IAM permissions can be revoked from the frontend runtime once the refactor ships.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Terraform updated for staging + prod backend role
- [ ] #2 Verified via `aws sqs get-queue-url` using backend creds
- [ ] #3 Frontend IAM revoked after frontend refactor complete
<!-- AC:END -->

View file

@ -1,28 +0,0 @@
---
id: TASK-9
title: Deploy bulk_address2uprn_combiner Lambda + queue via terraform to staging/prod
status: Done
assignee: []
created_date: '2026-04-20'
updated_date: '2026-04-20 12:31'
labels:
- infra
- terraform
dependencies: []
priority: high
ordinal: 6000
---
## Description
<!-- SECTION:DESCRIPTION:BEGIN -->
Lambda source at `backend/bulk_address2uprn_combiner/`. Use existing `lambda_with_sqs` terraform module. Lambda envs: `S3_BUCKET_NAME=retrofit_sap_data_bucket_name` + DB creds. Queue name convention: `bulk-address2uprn-combiner-queue-<stage>`. Lambda needs read on `ara_raw_outputs/` and write on `bulk_final_outputs/` in retrofit_sap_data bucket.
<!-- SECTION:DESCRIPTION:END -->
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 Lambda + queue exist in staging
- [ ] #2 Lambda + queue exist in prod
- [ ] #3 Lambda has correct S3 read/write permissions on retrofit_sap_data bucket
- [ ] #4 Lambda has DB write on bulk_address_uploads
<!-- AC:END -->

View file

@ -37,6 +37,15 @@ data "terraform_remote_state" "postcode_splitter" {
} }
} }
# Remote state of the bulk_address2uprn_combiner stack, read from its S3
# backend for the current stage (workspace-style key under "env:/<stage>/").
data "terraform_remote_state" "bulk_address2uprn_combiner" {
  backend = "s3"

  config = {
    bucket = "bulk-address2uprn-combiner-terraform-state"
    key    = "env:/${var.stage}/terraform.tfstate"
    region = "eu-west-2"
  }
}
############################################ ############################################
# Load Credentials # Load Credentials
############################################ ############################################
@ -95,6 +104,7 @@ module "fastapi" {
ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url
CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url
POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url
COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url
} }
} }
@ -115,7 +125,8 @@ module "fastapi_sqs_policy" {
resources = [ resources = [
data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, data.terraform_remote_state.engine.outputs.ara_engine_queue_arn,
data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn,
data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn,
data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn
] ]
conditions = null conditions = null

View file

@ -42,6 +42,7 @@ module "lambda" {
SHAREPOINT_CLIENT_ID = var.sharepoint_client_id SHAREPOINT_CLIENT_ID = var.sharepoint_client_id
SHAREPOINT_CLIENT_SECRET = var.sharepoint_client_secret SHAREPOINT_CLIENT_SECRET = var.sharepoint_client_secret
SHAREPOINT_TENANT_ID = var.sharepoint_tenant_id
DOMNA_SHAREPOINT_ID = var.domna_sharepoint_id DOMNA_SHAREPOINT_ID = var.domna_sharepoint_id
OSMOSIS_ACD_SHAREPOINT_ID = var.osmosis_acd_sharepoint_id OSMOSIS_ACD_SHAREPOINT_ID = var.osmosis_acd_sharepoint_id
PRIVATE_PAY_SHAREPOINT_ID = var.private_pay_sharepoint_id PRIVATE_PAY_SHAREPOINT_ID = var.private_pay_sharepoint_id

View file

@ -67,6 +67,11 @@ variable "sharepoint_client_secret" {
sensitive = true sensitive = true
} }
# SharePoint tenant ID input; marked sensitive so Terraform redacts the value
# in plan/apply output.
variable "sharepoint_tenant_id" {
  type      = string
  sensitive = true
}
variable "domna_sharepoint_id" { variable "domna_sharepoint_id" {
type = string type = string
sensitive = true sensitive = true

View file

@ -33,8 +33,10 @@ provider:
HEAT_BASELINE_PREDICTIONS_BUCKET: ${env:HEAT_BASELINE_PREDICTIONS_BUCKET} HEAT_BASELINE_PREDICTIONS_BUCKET: ${env:HEAT_BASELINE_PREDICTIONS_BUCKET}
ENGINE_SQS_URL: ENGINE_SQS_URL:
Ref: EngineQueue Ref: EngineQueue
# hardcode the categorisation queue for now as it's created in terraform # hardcoded queues (created in terraform)
CATEGORISATION_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/categorisation-queue-dev" CATEGORISATION_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/categorisation-queue-dev"
POSTCODE_SPLITTER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/postcode-splitter-queue-dev"
COMBINER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/bulk-address2uprn-combiner-queue-dev"
plugins: plugins:
- serverless-python-requirements - serverless-python-requirements
@ -112,6 +114,8 @@ resources:
Resource: Resource:
- Fn::GetAtt: [ EngineQueue, Arn ] - Fn::GetAtt: [ EngineQueue, Arn ]
- "arn:aws:sqs:eu-west-2:337213553626:categorisation-queue-dev" - "arn:aws:sqs:eu-west-2:337213553626:categorisation-queue-dev"
- "arn:aws:sqs:eu-west-2:337213553626:postcode-splitter-queue-dev"
- "arn:aws:sqs:eu-west-2:337213553626:bulk-address2uprn-combiner-queue-dev"
- Effect: Allow - Effect: Allow
Action: Action:
- s3:GetObject - s3:GetObject

View file

@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict from collections import defaultdict
from sqlalchemy import func from sqlalchemy import func
PORTFOLIO_ID = 632 PORTFOLIO_ID = 711
SCENARIOS = [1144] SCENARIOS = [1233]
scenario_names = { scenario_names = {
1144: "EPC C", 1233: "Reach EPC C",
} }
project_name = "Calico Project" project_name = "Novus"
def get_data(portfolio_id, scenario_ids): def get_data(portfolio_id, scenario_ids):