tests files

This commit is contained in:
Jun-te Kim 2026-06-04 11:47:42 +00:00
parent c614ff6388
commit dfd05ba28b
28 changed files with 1012 additions and 22 deletions

View file

@ -0,0 +1,35 @@
# Container image for the bulk_upload_finaliser Lambda, built on the AWS Lambda
# Python 3.11 base image.
FROM public.ecr.aws/lambda/python:3.11
# Postgres host/port/database are baked into the image at build time from the
# deploy workflow's --build-arg values (GitHub Actions DEV_DB_* secrets),
# mirroring the landlord_description_overrides Dockerfile. They map onto the
# POSTGRES_* names PostgresConfig.from_env reads. Username/password are NOT baked
# in -- Terraform injects those as Lambda env vars from Secrets Manager.
# NOTE(review): these ARGs declare no default; presumably the POSTGRES_* values
# end up empty if a --build-arg is omitted -- confirm the workflow always passes all three.
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV POSTGRES_HOST=${DEV_DB_HOST}
ENV POSTGRES_PORT=${DEV_DB_PORT}
ENV POSTGRES_DATABASE=${DEV_DB_NAME}
WORKDIR /var/task
# Dependencies are installed from requirements.txt before any source is copied.
COPY applications/bulk_upload_finaliser/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# DDD-shaped packages only -- no pandas, no legacy backend/. The finaliser writes
# both `property` and the terminal `bulk_address_uploads` status through DDD repos
# on its own PostgresConfig session (ADR-0013).
COPY domain/ domain/
COPY infrastructure/ infrastructure/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY utilities/ utilities/
# NOTE(review): this copies the whole applications/ tree, not only
# applications/bulk_upload_finaliser/ -- confirm that is intended.
COPY applications/ applications/
# Place the handler at the Lambda task root so the runtime resolves
# ``main.handler`` without an extra package prefix.
COPY applications/bulk_upload_finaliser/handler.py /var/task/main.py
CMD ["main.handler"]

View file

@ -0,0 +1,22 @@
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class BulkUploadFinaliserTriggerBody(BaseModel):
    """Payload that triggers the bulk_upload_finaliser Lambda (ADR-0013).

    The Next.js Finalise action dispatches it through
    ``POST /v1/bulk-uploads/trigger-finaliser``. ``s3_uri`` is the combiner
    output (``combined_output_s3_uri``) — the same address/UPRN CSV the old
    synchronous ``/finalize`` route read.
    """

    # Keys not declared below are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")

    task_id: UUID
    sub_task_id: UUID
    s3_uri: str
    # bigint in the FE schema; Python int is unbounded so Pydantic stays simple.
    portfolio_id: int
    bulk_upload_id: UUID

View file

@ -0,0 +1,103 @@
"""bulk_upload_finaliser Lambda (ADR-0013).
Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse
the trigger, read the combiner output CSV from S3, hand the rows to the
``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then
write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership to
the backend). ``complete`` is written in the *same* transaction as the property
insert (atomic finalise); ``failed`` is written on a fresh session on error.
PostgresConfig-only, like the landlord classifier Lambda — no legacy ``backend/``
connection — so a single DB config (POSTGRES_*) drives the whole run.
"""
import logging
import os
from typing import Any
from uuid import UUID
import boto3
from sqlalchemy.engine import Engine
from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import (
BulkUploadFinaliserTriggerBody,
)
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import commit_scope, make_engine, make_session
from infrastructure.s3.csv_s3_client import CsvS3Client
from infrastructure.s3.s3_uri import parse_s3_uri
from orchestration.bulk_upload_finaliser_orchestrator import (
BulkUploadFinaliserOrchestrator,
)
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.bulk_upload.bulk_upload_status_writer_postgres import (
BulkUploadStatusWriterPostgresRepository,
)
from repositories.property.property_identity_writer_postgres import (
PropertyIdentityWriterPostgresRepository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
logger = logging.getLogger(__name__)
def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int:
    """Read the combiner CSV, insert its properties and mark the upload complete.

    Args:
        engine: SQLAlchemy engine the working session is opened on.
        trigger: Validated trigger body naming the CSV and the target portfolio.

    Returns:
        The insert count reported by the orchestrator's ``persist`` call.

    Any exception propagates to the caller once the session has been closed.
    """
    source_bucket, _ = parse_s3_uri(trigger.s3_uri)
    make_client: Any = boto3.client  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
    s3_client: Any = make_client("s3")
    csv_reader = CsvS3Client(s3_client, source_bucket)
    rows = csv_reader.read_rows(trigger.s3_uri)

    db_session = make_session(engine)
    try:
        finaliser = BulkUploadFinaliserOrchestrator(
            property_writer=PropertyIdentityWriterPostgresRepository(db_session)
        )
        upload_status = BulkUploadStatusWriterPostgresRepository(db_session)

        # Row resolution is pure, so do it before the transaction is opened.
        pending = finaliser.to_property_rows(rows, trigger.portfolio_id)

        # One transaction covers the property insert and the `complete` flip:
        # if either fails both roll back and the failure path takes over.
        with commit_scope(db_session):
            inserted = finaliser.persist(pending)
            upload_status.set_status(trigger.task_id, "complete")
    finally:
        db_session.close()

    logger.info(
        "Finalised bulk upload %s: %d rows read, %d properties inserted.",
        trigger.bulk_upload_id,
        len(rows),
        inserted,
    )
    return inserted
def _mark_failed(engine: Engine, task_id: UUID) -> None:
    """Write the terminal ``failed`` status for ``task_id`` on a fresh session.

    The write is committed in its own ``commit_scope``; the session is closed
    whether or not the write succeeds.
    """
    failure_session = make_session(engine)
    try:
        status_writer = BulkUploadStatusWriterPostgresRepository(failure_session)
        with commit_scope(failure_session):
            status_writer.set_status(task_id, "failed")
    finally:
        failure_session.close()
@subtask_handler()
def handler(
    body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, int]:
    """Lambda entry point: finalise one bulk upload.

    Args:
        body: Raw trigger payload; validated into a
            ``BulkUploadFinaliserTriggerBody``.
        context: Lambda context object (unused here).
        task_orchestrator: Injected by ``@subtask_handler`` (unused here).

    Returns:
        ``{"inserted": <count>}`` with the number of properties inserted.

    Raises:
        Exception: Whatever ``_run`` raised, re-raised unchanged after the
            upload has been handed to the ``failed`` state on a best-effort
            basis.
    """
    trigger = BulkUploadFinaliserTriggerBody.model_validate(body)
    engine = make_engine(PostgresConfig.from_env(os.environ))
    try:
        inserted = _run(engine, trigger)
    except Exception:
        # Hand the BulkUpload to the terminal `failed` state so the UI leaves
        # `finalising`; the @subtask_handler also marks the SubTask FAILED on the
        # re-raise below.
        try:
            _mark_failed(engine, trigger.task_id)
        except Exception:
            # The status write is best-effort: if it fails too (e.g. the DB is
            # the thing that is down), log it and still surface the root cause
            # rather than letting this secondary error replace it.
            logger.exception(
                "Could not mark bulk upload for task %s as failed.",
                trigger.task_id,
            )
        raise
    finally:
        # The engine is created per invocation, so release its pooled
        # connections instead of leaving them open in a warm container.
        engine.dispose()
    return {"inserted": inserted}

View file

@ -0,0 +1,4 @@
boto3
pydantic
sqlmodel
psycopg2-binary

View file

@ -12,6 +12,7 @@ from backend.app.bulk_uploads.schema import (
CombinedResultRow,
CombinedResultsResponse,
CombinerTriggerRequest,
FinaliserTriggerRequest,
FlagsSummary,
LandlordOverridesTriggerRequest,
PostcodeSplitterTriggerRequest,
@ -113,6 +114,26 @@ async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
}
@router.post("/trigger-finaliser", status_code=202)
async def trigger_finaliser(req: FinaliserTriggerRequest):
    """Enqueue a finaliser run by sending the request body to the finaliser queue.

    The request is serialised to JSON and sent as a single SQS message to
    ``settings.FINALISER_SQS_URL``.

    Returns:
        The task and sub-task ids from the request plus the SQS message id.

    Raises:
        HTTPException: 500 if creating the client or sending the message fails.
    """
    settings = get_settings()
    try:
        sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.FINALISER_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain explicitly so the traceback shows the SQS failure as the cause
        # instead of "during handling ... another exception occurred".
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e
    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
task_id: UUID,

View file

@ -23,6 +23,14 @@ class LandlordOverridesTriggerRequest(BaseModel):
column_mapping: dict[str, str]
class FinaliserTriggerRequest(BaseModel):
    """Request body accepted by the ``/trigger-finaliser`` endpoint."""

    task_id: str
    sub_task_id: str
    # Combiner output (combined_output_s3_uri).
    s3_uri: str
    portfolio_id: int
    bulk_upload_id: str
class FlagsSummary(BaseModel):
duplicates: int
missing: int

View file

@ -43,6 +43,7 @@ class Settings(BaseSettings):
POSTCODE_SPLITTER_SQS_URL: str = "changeme"
COMBINER_SQS_URL: str = "changeme"
LANDLORD_OVERRIDES_SQS_URL: str = "changeme"
FINALISER_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str = "changeme"

View file

@ -1,4 +1,5 @@
import urllib.parse
from typing import Any
from pydantic import ValidationError
import requests
import pandas as pd
@ -7,13 +8,13 @@ from utils.logger import setup_logger
logger = setup_logger()
def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
def os_places_results_to_dataframe(data: dict[str, Any]) -> pd.DataFrame:
"""
Flatten the OS Places API response results into a DataFrame.
Each result contains either a DPA or LPI record.
"""
results = data.get("results", [])
rows = []
results: list[dict[str, Any]] = data.get("results", [])
rows: list[dict[str, Any]] = []
for r in results:
if "DPA" in r:
rows.append(r["DPA"])
@ -22,7 +23,7 @@ def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
return pd.DataFrame(rows)
def lookup_os_places(postcode: str, api_key: str) -> dict:
def lookup_os_places(postcode: str, api_key: str) -> dict[str, Any]:
"""
Lookup a postcode using the OS Places API.
Returns the full API response data or an error dict.

View file

@ -0,0 +1,21 @@
# Runtime env for the Ordnance Survey handler when run locally via docker compose.
# Variable names must match backend/app/config.py Settings (DB_*, ORDNANCE_SURVEY_API_KEY)
# plus the AWS creds boto3 needs for S3 access.
ENVIRONMENT=local
# Database (OS Places postcode cache)
DB_HOST=
DB_PORT=5432
DB_NAME=
DB_USERNAME=
DB_PASSWORD=
# Ordnance Survey Places API
ORDNANCE_SURVEY_API_KEY=
# AWS — read input CSV from / write results to S3
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=eu-west-2
S3_BUCKET_NAME=retrofit-data-dev

View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Build and start the local docker compose stack from this script's directory.
# On first run it seeds .env.local from the template and stops so it can be filled in.
set -euo pipefail

cd "$(dirname "$0")"

if [[ ! -f .env.local ]]; then
  cp .env.local.example .env.local
  echo "Created .env.local from the template — fill it in, then re-run." >&2
  exit 1
fi

# Always rebuild from scratch and recreate containers.
docker compose build --no-cache
docker compose up --force-recreate

View file

@ -22,6 +22,7 @@ import uuid
import os
import pandas as pd
from tqdm import tqdm
logger: logging.Logger = setup_logger()
@ -152,8 +153,10 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
df["ordnance_survey_lexiscore"] = None
# Process each postcode group at a time
for postcode, group in grouped:
print(f"Processing postcode: {postcode} ({len(group)} rows)")
for postcode, group in tqdm(
grouped, total=grouped.ngroups, desc="OS postcodes", unit="postcode"
):
tqdm.write(f"Processing postcode: {postcode} ({len(group)} rows)")
valid_group = AddressMatch.is_valid_postcode(postcode)
if not valid_group:
logger.warning(f"Postcode {postcode} is invalid, skipping")

View file

@ -0,0 +1,153 @@
# backend/ordnanceSurvey/tests/test_os_match.py
"""
Debug harness for Ordnance Survey address matching.
Mirrors backend/address2UPRN/tests/test_csv.py, but for the OS Places flow:
for each (User Input, Postcode) case it hits the live OS Places API, scores every
candidate address with AddressMatch.score (exactly as backend/ordnanceSurvey/main.py
does), and prints the full ranked breakdown so you can see *why* a match was or
wasn't found.
Run with -s to see the ranking:
pytest backend/ordnanceSurvey/tests/test_os_match.py -s
Requires ORDNANCE_SURVEY_API_KEY to be set (config Settings / env); skipped otherwise.
"""
import csv
import os
import time
from pathlib import Path
from typing import Any, Optional
import pytest
from backend.ordnanceSurvey.helpers import (
lookup_os_places,
os_places_results_to_dataframe,
)
from backend.utils.addressMatch import AddressMatch
FIXTURE_PATH = Path(__file__).parent / "test_data.csv"
# Be polite to the live OS Places API between cases.
OS_THROTTLE_SECONDS = 1.0
# Handler treats best_score <= 0 as "no match" (see ordnanceSurvey/main.py).
MATCH_THRESHOLD = 0.0
@pytest.fixture(autouse=True)
def _throttle_os_requests():
    """Pause after each test so consecutive cases don't hammer the live API.

    The pause is skipped when no API key is configured: those cases are
    skipped without making a request, so sleeping would only add
    ``OS_THROTTLE_SECONDS`` per case to runs that never touch the API.
    """
    yield
    if _api_key() is not None:
        time.sleep(OS_THROTTLE_SECONDS)
def _api_key() -> Optional[str]:
# Read straight from the environment so the debug harness doesn't depend on
# the full Settings model loading cleanly. Falls back to Settings if unset.
key = os.getenv("ORDNANCE_SURVEY_API_KEY")
if not key:
try:
from backend.app.config import get_settings
key = get_settings().ORDNANCE_SURVEY_API_KEY
except Exception:
key = None
if not key or key == "changeme":
return None
return key
def load_test_cases():
    """Build one ``pytest.param`` per row of the CSV fixture.

    Each param carries (user input, postcode, expected UPRN); the expected UPRN
    is stripped and is ``""`` when the column is absent or empty.
    """
    cases = []
    with open(FIXTURE_PATH, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            expected_uprn = (row.get("Expected UPRN") or "").strip()
            cases.append(
                pytest.param(
                    row["User Input"],
                    row["Postcode"],
                    expected_uprn,
                    id=f'{row["User Input"]} [{row["Postcode"]}]',
                )
            )
    return cases
def _scored_candidates(
    user_input: str, postcode: str, api_key: str
) -> list[dict[str, Any]]:
    """
    Look up the postcode on OS Places (bypassing the DB cache) and score each
    candidate ADDRESS against ``user_input`` exactly as ordnanceSurvey/main.py
    does. The result is ordered best score first.
    """
    response: dict[str, Any] = lookup_os_places(postcode, api_key)
    assert response.get("status") == 200, f"OS Places API failed: {response}"
    assert "data" in response, f"No data in OS Places response: {response}"

    frame = os_places_results_to_dataframe(response["data"])
    records: list[dict[str, Any]] = frame.to_dict("records")  # type: ignore[assignment]

    def _score_one(record: dict[str, Any]) -> dict[str, Any]:
        candidate_address = str(record.get("ADDRESS", ""))
        return {
            "uprn": record.get("UPRN", "?"),
            "address": candidate_address,
            "normalised": AddressMatch.normalise_address(candidate_address),
            "score": AddressMatch.score(user_input, candidate_address),
        }

    ranked: list[dict[str, Any]] = [_score_one(record) for record in records]
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    return ranked
def _print_debug(
    user_input: str, postcode: str, scored: list[dict[str, Any]]
) -> None:
    """Print the input, its normalised form and the top 15 ranked candidates."""
    normalised_input = AddressMatch.normalise_address(user_input)
    print(f"\n{'=' * 80}")
    print(f"User input : {user_input!r}")
    print(f"Normalised : {normalised_input!r}")
    print(f"Postcode : {postcode!r}")
    print(f"Candidates : {len(scored)}")
    print(f"{'-' * 80}")
    if scored:
        for candidate in scored[:15]:
            print(f" score={candidate['score']:.4f} uprn={candidate['uprn']}")
            print(f" ADDRESS : {candidate['address']}")
            print(f" normalised : {candidate['normalised']}")
        print(f"{'=' * 80}")
    else:
        print("(no OS Places candidates returned for this postcode)")
@pytest.mark.integration
@pytest.mark.parametrize("user_input,postcode,expected_uprn", load_test_cases())
def test_os_match_finds_candidate(
    user_input: str,
    postcode: str,
    expected_uprn: str,
):
    """Check the top-ranked OS Places candidate scores above the match threshold.

    When the fixture row supplies an expected UPRN, the top candidate must also
    carry it. Skipped when no API key is configured.
    """
    api_key = _api_key()
    if api_key is None:
        pytest.skip("ORDNANCE_SURVEY_API_KEY not set")

    ranking = _scored_candidates(user_input, postcode, api_key)
    _print_debug(user_input, postcode, ranking)

    if ranking:
        best = ranking[0]
        best_score = float(best["score"])
    else:
        best = None
        best_score = 0.0

    # The handler only records a match when best_score > 0, so a failure here
    # is the debug signal; the ranking printed above explains it.
    assert best is not None and best_score > MATCH_THRESHOLD, (
        f"No OS match for {user_input!r} @ {postcode!r} "
        f"(best_score={best_score:.4f}). See ranking above."
    )
    if expected_uprn:
        assert str(best["uprn"]) == expected_uprn, (
            f"Best match UPRN {best['uprn']!r} != expected {expected_uprn!r}"
        )

View file

@ -0,0 +1,49 @@
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
module "lambda" {
source = "../../modules/lambda_with_sqs"
name = "bulk-upload-finaliser"
stage = var.stage
image_uri = local.image_uri
# The finaliser reads the combiner CSV and does one bulk INSERT — IO-light, but
# a property list can be ~40,000 rows, so 300s leaves ample headroom under the
# queue visibility timeout. batch_size = 1 keeps one upload per invocation so a
# bad record can't redrive its siblings; maximum_concurrency caps DB write
# fan-out.
timeout = 300
batch_size = 1
maximum_concurrency = 2
environment = merge(
{
STAGE = var.stage
LOG_LEVEL = "info"
POSTGRES_USERNAME = local.db_credentials.db_assessment_model_username
POSTGRES_PASSWORD = local.db_credentials.db_assessment_model_password
},
)
}
# Attach S3 read policy so the handler can read the combiner output CSV.
resource "aws_iam_role_policy_attachment" "bulk_upload_finaliser_s3_read" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.bulk_upload_finaliser_s3_read_arn
}

View file

@ -0,0 +1,9 @@
output "bulk_upload_finaliser_queue_url" {
value = module.lambda.queue_url
description = "URL of the Bulk Upload Finaliser SQS queue (wire into the FastAPI FINALISER_SQS_URL)"
}
output "bulk_upload_finaliser_queue_arn" {
value = module.lambda.queue_arn
description = "ARN of the Bulk Upload Finaliser SQS queue"
}

View file

@ -0,0 +1,16 @@
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
backend "s3" {
bucket = "bulk-upload-finaliser-terraform-state"
key = "terraform.tfstate"
region = "eu-west-2"
}
required_version = ">= 1.2.0"
}

View file

@ -0,0 +1,27 @@
variable "lambda_name" {
type = string
description = "Logical name of the lambda (e.g. bulkUploadFinaliser)"
}
variable "stage" {
description = "Deployment stage (e.g. dev, prod)"
type = string
}
variable "ecr_repo_url" {
type = string
description = "ECR repository URL (no tag, no digest)"
}
variable "image_digest" {
type = string
description = "Image digest (sha256:...)"
}
locals {
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
}
output "resolved_image_uri" {
value = local.image_uri
}

View file

@ -46,6 +46,15 @@ data "terraform_remote_state" "bulk_address2uprn_combiner" {
}
}
data "terraform_remote_state" "bulk_upload_finaliser" {
backend = "s3"
config = {
bucket = "bulk-upload-finaliser-terraform-state",
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
############################################
# Load Credentials
############################################
@ -105,6 +114,7 @@ module "fastapi" {
CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url
POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url
COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url
FINALISER_SQS_URL = data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_url
}
}
@ -126,7 +136,8 @@ module "fastapi_sqs_policy" {
data.terraform_remote_state.engine.outputs.ara_engine_queue_arn,
data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn,
data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn,
data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn
data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn,
data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_arn
]
conditions = null

View file

@ -558,6 +558,36 @@ output "bulk_address2uprn_combiner_s3_arn" {
value = module.bulk_address2uprn_combiner_s3.policy_arn
}
################################################
# Bulk Upload Finaliser Lambda ECR (ADR-0013)
################################################
module "bulk_upload_finaliser_state_bucket" {
source = "../modules/tf_state_bucket"
bucket_name = "bulk-upload-finaliser-terraform-state"
}
module "bulk_upload_finaliser_registry" {
source = "../modules/container_registry"
name = "bulk_upload_finaliser"
stage = var.stage
}
# The finaliser only reads the combiner output (bulk_final_outputs) to insert
# property rows; it writes to Postgres, not S3.
module "bulk_upload_finaliser_s3_read" {
source = "../modules/s3_iam_policy"
policy_name = "BulkUploadFinaliserReadS3"
policy_description = "Allow bulk_upload_finaliser Lambda to read combiner output from retrofit-data bucket"
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
actions = ["s3:GetObject", "s3:ListBucket"]
resource_paths = ["/bulk_final_outputs/*"]
}
output "bulk_upload_finaliser_s3_read_arn" {
value = module.bulk_upload_finaliser_s3_read.policy_arn
}
################################################
# Categorisation Lambda ECR
################################################

View file

@ -0,0 +1,120 @@
# ADR-0013: The `bulk_upload_finaliser` Lambda writes properties and the terminal status
**Status:** Accepted
**Date:** 2026-06-04
> Companion to the `assessment-model` (frontend) repo's
> [ADR-0005](https://github.com/Hestia-Homes/assessment-model/blob/main/docs/adr/0005-async-bulk-upload-finaliser.md),
> which owns the state-machine change and the Drizzle schema. This ADR owns the
> **backend write path**. It applies the direct-write pattern, transaction-boundary
> rule, and port/adapter/row-mirror file layout established by
> [ADR-0003](0003-python-writes-landlord-overrides-directly.md).
## Context
Finalising a BulkUpload inserts one `property` row per source row — up to ~40,000.
Today a synchronous Next.js route does it; frontend ADR-0005 moves it to a dispatched
Lambda for the same reasons ADR-0003 moved the classifier write into Python: the work
is internal (a Lambda computes the rows, the same Lambda persists them), the Lambda
already sits next to a Postgres connection, and routing 40k inserts back through an
HTTP hop buys nothing.
`PropertyRow` in this repo
([`infrastructure/postgres/property_table.py`](../../infrastructure/postgres/property_table.py))
is currently a **defensive read-only view** — its docstring states the backend never
inserts properties. Finalise changes that.
## Decision
1. **New application `applications/bulk_upload_finaliser/`** wraps a handler in
`@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the
subtask start/complete/fail and the Task cascade), exactly like
`applications/landlord_description_overrides/handler.py`. The trigger body:
```python
class BulkUploadFinaliserTriggerBody(SubtaskTriggerBody): # task_id, sub_task_id
s3_uri: str # combiner output (combined_output_s3_uri)
portfolio_id: int
bulk_upload_id: UUID
```
Dispatched via a new `POST /v1/bulk-uploads/trigger-finaliser` FastAPI endpoint
(auth `validate_token`) that enqueues to a new SQS queue — mirroring
`trigger-postcode-splitter` / `trigger-landlord-overrides`.
2. **`PropertyRow` drops its "backend never inserts" invariant** and gains the
insertable columns finalise needs — the **exact nine** the frontend route writes
today: `portfolio_id`, `creation_status` (`'READY'`), `uprn`,
`landlord_property_id` (← `Internal Reference`), `address` (matched ?? user
input), `postcode`, `user_inputted_address`, `user_inputted_postcode`,
`lexiscore`. Drizzle remains the schema source of truth (ADR-0003); this is a
mirror change only.
3. **Insert policy lives in SQL, idempotent:**
```sql
INSERT INTO property (portfolio_id, creation_status, uprn, landlord_property_id,
address, postcode, user_inputted_address,
user_inputted_postcode, lexiscore)
VALUES …
ON CONFLICT (portfolio_id, uprn) WHERE uprn IS NOT NULL
DO NOTHING;
```
This reproduces the frontend's `onConflictDoNothing` exactly — existing properties
are not churned on a re-run.
4. **The Lambda writes the terminal BulkUpload status directly — DDD, on its own
session.** On success it sets `status='complete'`; on failure `status='failed'`.
Rather than the combiner's `backend/app/db/functions` (which open the legacy
`backend/` connection on the separate `DB_*` config), the finaliser stays
**PostgresConfig-only like the landlord classifier Lambda**: it writes status
through a DDD `BulkUploadStatusWriter` port (`repositories/bulk_upload/`) backed
by a minimal `BulkAddressUploadRow` mirror (`infrastructure/postgres/`), on the
*same* session. So a single DB config (`POSTGRES_*`) drives the run and the
image needs no `backend/`. The `complete` flip happens **in the same transaction
as the property insert** (atomic finalise — either both land or neither);
`failed` is written on a fresh session in the error path. Next.js no longer
writes `complete`; it owns only the `awaiting_review → finalising`
compare-and-swap at dispatch (frontend ADR-0005).
5. **DDD layering (ADR-0003 + the landlord precedent).** Domain logic — resolving
combiner rows into property identity rows — lives in
`orchestration/bulk_upload_finaliser_orchestrator.py`; the Lambda
`applications/bulk_upload_finaliser/handler.py` stays thin (parse trigger, wire
S3 + engine + repos, delegate, report status). New seams:
- Property insert: port `repositories/property/property_identity_writer.py`
(+ `PropertyIdentityInsert` DTO), adapter
`repositories/property/property_identity_writer_postgres.py`, writing the
amended `infrastructure/postgres/property_table.py` mirror. A dedicated *writer*
distinct from the aggregate-hydrating `PropertyRepository`, which needs an EPC
slice the finaliser doesn't.
- Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter
`…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py`
mirror.
- Transaction boundary stays in the `infrastructure/postgres/engine` helper
(`commit_scope`); the handler opens the context (once, around insert+complete),
never calls `.commit()`; orchestrator and repositories never commit.
6. **`property_overrides` is out of scope (v2).** The table exists (frontend
migration 0221) but this Lambda does not populate it in v1. When v2 lands it adds
a `PropertyOverrideRow` mirror + `infrastructure/property/property_overrides_postgres_repository.py`
using `onConflictDoUpdate` per frontend ADR-0005.
## Consequences
**Positive.**
- 40k inserts run in a Lambda with a single `sub_task` row to observe, not behind a
synchronous HTTP request.
- Reuses the `@subtask_handler` / SQS / direct-Postgres machinery — reviewers have
the classifier handler and the combiner status-writers as precedents.
**Negative.**
- **`PropertyRow` is no longer read-only.** The defensive invariant that made
accidental backend property writes impossible is gone; correctness now rests on the
finaliser being the only inserting caller.
- **Terminal-status ownership crosses the repo boundary.** The backend now writes a
BulkUpload status (`complete`/`failed`) that Next.js used to own — coordinated only
through the status column (frontend ADR-0005's "two writers, extended").
- The Drizzle/SQLModel lockstep risk from ADR-0003 now extends to `property`'s
insertable columns.

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import ClassVar, Optional
from uuid import UUID
from sqlmodel import Field, SQLModel
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class BulkAddressUploadRow(SQLModel, table=True):
    """Minimal mirror of the FE-owned ``bulk_address_uploads`` table.

    The Next.js Drizzle repo (``src/app/db/schema/bulk_address_uploads.ts``) is
    the schema source of truth, and the backend's fuller mirror lives at
    ``backend/app/db/models/bulk_address_uploads.py``. This DDD-side mirror
    declares only the columns the finaliser writes — the terminal status — so
    the finaliser can flip status on its own ``PostgresConfig`` session without
    pulling in the legacy ``backend/`` connection (ADR-0013). Keep the column
    names in step with the Drizzle source.
    """

    __tablename__: ClassVar[str] = "bulk_address_uploads"  # pyright: ignore[reportIncompatibleVariableOverride]

    id: UUID = Field(primary_key=True)
    task_id: Optional[UUID] = Field(default=None, index=True)
    status: str
    # Defaults to "now" in UTC when the row object is constructed.
    updated_at: datetime = Field(default_factory=_utc_now)

View file

@ -2,24 +2,49 @@ from __future__ import annotations
from typing import ClassVar, Optional
from sqlalchemy import Column
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
# Mirror of the FE-owned `creation_status` pgEnum (property.ts:
# propertyCreationStatusEnum = {LOADING, READY, ERROR}). A single SAEnum instance
# so test-schema create_all emits one CREATE TYPE; prod owns the type via Drizzle.
property_creation_status_sa_enum = SAEnum(
"LOADING", "READY", "ERROR", name="creation_status"
)
class PropertyRow(SQLModel, table=True):
"""Defensive view of the FE-owned ``property`` table.
"""Mirror of the FE-owned ``property`` table.
The schema and migrations for ``property`` are owned by the front-end
Next.js repo; this declares only the identity columns the modelling backend
reads/writes, so FE-owned migrations to other columns don't ripple into us.
The schema and migrations for ``property`` are owned by the front-end Next.js
repo (``src/app/db/schema/property.ts``); this declares the identity columns
the modelling backend reads, plus the subset the ``bulk_upload_finaliser``
Lambda **inserts** at Finalise (ADR-0013). It is no longer read-only — the
finaliser is the one backend caller that inserts. Columns not declared here
are still owned by FE migrations and don't ripple into us.
"""
__tablename__: ClassVar[str] = "property" # pyright: ignore[reportIncompatibleVariableOverride]
# Non-Optional: this is a read-only defensive view of the FE-owned ``property``
# table — the backend never inserts rows, so every row read carries an id.
id: int = Field(primary_key=True)
# bigserial in the FE schema — DB-assigned on insert, so Optional/None on the
# way in and always populated on the way out.
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int
postcode: str
address: str
# Nullable in the FE schema. The finaliser writes `matched ?? user-inputted`,
# which is absent for fully-unmatched rows.
postcode: Optional[str] = Field(default=None)
address: Optional[str] = Field(default=None)
uprn: Optional[int] = Field(default=None)
landlord_property_id: Optional[str] = Field(default=None)
# Insertable columns the finaliser writes (ADR-0013). All nullable in the FE
# schema except `creation_status` (NOT NULL); the finaliser always sets it to
# 'READY', so a nullable mirror is safe — the real column enforces NOT NULL.
creation_status: Optional[str] = Field(
default=None,
sa_column=Column(property_creation_status_sa_enum, nullable=True),
)
user_inputted_address: Optional[str] = Field(default=None)
user_inputted_postcode: Optional[str] = Field(default=None)
lexiscore: Optional[float] = Field(default=None)

View file

@ -0,0 +1,125 @@
"""Finalises a BulkUpload into ``property`` rows (ADR-0013).
The domain logic of Finalise: turn the combiner output rows into property identity
rows (the same resolution the old Next.js ``/finalize`` route did) and persist them
through the injected writer. Like every orchestrator it never commits — the caller
owns the transaction boundary (see the Lambda handler).
"""
from __future__ import annotations

import math
import re
from typing import Any, Optional

from repositories.property.property_identity_writer import (
    PropertyIdentityInsert,
    PropertyIdentityWriter,
)
# Combiner-output columns — identical to the old frontend /finalize route and the
# backend combined-results reader (router.py).
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3")
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
MISSING_SENTINEL = "invalid postcode"
UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE)
def _normalize(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _is_missing(value: str) -> bool:
return value == "" or value.lower() == MISSING_SENTINEL
def _parse_uprn(raw: Any) -> Optional[int]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return int(val)
except ValueError:
return None
def _parse_lexiscore(raw: Any) -> Optional[float]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return float(val)
except ValueError:
return None
def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
if matched:
m = UK_POSTCODE_RE.search(matched)
if m:
return m.group(0).upper()
return fallback or None
class BulkUploadFinaliserOrchestrator:
    """Resolves combiner rows into property inserts and persists them."""

    def __init__(self, property_writer: PropertyIdentityWriter) -> None:
        self._property_writer = property_writer

    def to_property_rows(
        self, rows: list[dict[str, str]], portfolio_id: int
    ) -> list[PropertyIdentityInsert]:
        """Map every combiner row to a property identity insert.

        No DB access or IO happens here, so callers may run it before opening
        a transaction. The resolution mirrors the old ``/finalize`` route: the
        address is the matched one or else the user-inputted one, and the
        postcode comes from the matched address or else the user-inputted
        postcode.
        """
        resolved: list[PropertyIdentityInsert] = []
        for combiner_row in rows:
            resolved.append(self._row_to_insert(combiner_row, portfolio_id))
        return resolved

    def persist(self, inserts: list[PropertyIdentityInsert]) -> int:
        """Hand the resolved rows to the writer (idempotent — see the adapter).

        Never commits; the caller wraps this call in its own transaction.
        Returns the number of properties actually inserted.
        """
        inserted_count = self._property_writer.insert_all(inserts)
        return inserted_count

    @staticmethod
    def _row_to_insert(
        raw: dict[str, str], portfolio_id: int
    ) -> PropertyIdentityInsert:
        """Resolve a single combiner row into a ``PropertyIdentityInsert``."""
        # User-inputted identity: non-empty address parts joined with ", ".
        address_parts = [_normalize(raw.get(column)) for column in ADDRESS_COLS]
        joined_address = ", ".join(part for part in address_parts if part)
        user_inputted_address = joined_address or None
        user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None

        # Matched identity: a missing matched address counts as no match.
        matched_text = _normalize(raw.get(MATCHED_ADDRESS_COL))
        matched_address = None if _is_missing(matched_text) else matched_text

        return PropertyIdentityInsert(
            portfolio_id=portfolio_id,
            uprn=_parse_uprn(raw.get(UPRN_COL)),
            landlord_property_id=_normalize(raw.get(INTERNAL_REF_COL)) or None,
            address=matched_address or user_inputted_address,
            postcode=_extract_postcode(
                matched_address, user_inputted_postcode or ""
            ),
            user_inputted_address=user_inputted_address,
            user_inputted_postcode=user_inputted_postcode,
            lexiscore=_parse_lexiscore(raw.get(LEXISCORE_COL)),
            creation_status="READY",
        )

View file

@ -7,6 +7,7 @@ testpaths =
recommendations/tests
backend/tests
backend/address2UPRN/tests
backend/ordnanceSurvey/tests
backend/app/db/functions/tests
backend/categorisation/tests
backend/condition/tests
@ -25,7 +26,5 @@ testpaths =
etl/epc_clean/tests
etl/hubspot/tests
etl/spatial/tests
domain/sap10_ml/tests
tests/
markers =
integration: mark a test as an integration test

View file

@ -0,0 +1,24 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from uuid import UUID
class BulkUploadStatusWriter(ABC):
    """Port for writing the terminal BulkUpload status at Finalise (ADR-0013).

    The finaliser owns the terminal write: Next.js no longer writes
    ``complete``, it only flips ``awaiting_review`` → ``finalising`` at dispatch
    (ADR-0005). This is the backend half of that "two writers" split, alongside
    the combiner's ``combining``/``awaiting_review`` writes.
    """

    @abstractmethod
    def set_status(self, task_id: UUID, status: str) -> None:
        """Set the status of the ``bulk_address_uploads`` row for ``task_id``.

        Implementations do not commit — the caller owns the transaction
        boundary, so the status flip can share the property insert's
        transaction (atomic finalise) or run in its own session on the failure
        path.
        """

View file

@ -0,0 +1,33 @@
"""Postgres adapter for ``BulkUploadStatusWriter`` (ADR-0013).
Flips the ``bulk_address_uploads`` status on the caller's session — so the
finaliser can mark ``complete`` in the *same* transaction as the property insert
(atomic finalise), and mark ``failed`` in a fresh session on the error path.
"""
from __future__ import annotations
from datetime import datetime, timezone
from uuid import UUID
from sqlmodel import Session, select
from infrastructure.postgres.bulk_address_upload_table import BulkAddressUploadRow
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
class BulkUploadStatusWriterPostgresRepository(BulkUploadStatusWriter):
    """Writes the upload status through the session supplied by the caller."""

    def __init__(self, session: Session) -> None:
        self._session = session

    def set_status(self, task_id: UUID, status: str) -> None:
        """Update ``status`` and ``updated_at`` on the row for ``task_id``.

        The change is only added to the session; no commit is issued here.

        Raises:
            ValueError: If no ``bulk_address_uploads`` row has that ``task_id``.
        """
        lookup = select(BulkAddressUploadRow).where(
            BulkAddressUploadRow.task_id == task_id
        )
        upload = self._session.exec(lookup).first()
        if upload is None:
            raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")

        upload.status = status
        upload.updated_at = datetime.now(timezone.utc)
        self._session.add(upload)

View file

@ -0,0 +1,43 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class PropertyIdentityInsert:
    """A single row the finaliser inserts into the FE-owned ``property`` table.

    Carries the column set the Next.js ``/finalize`` route writes (ADR-0013):
    eight identity fields plus ``creation_status``, which defaults to
    ``"READY"``.
    """

    portfolio_id: int
    uprn: Optional[int]
    landlord_property_id: Optional[str]
    # Resolved values (matched ?? user-inputted); either may be None.
    address: Optional[str]
    postcode: Optional[str]
    # Exactly what the user supplied, kept alongside the resolved values.
    user_inputted_address: Optional[str]
    user_inputted_postcode: Optional[str]
    lexiscore: Optional[float]
    creation_status: str = "READY"
class PropertyIdentityWriter(ABC):
    """Port for bulk-inserting ``property`` identity rows at Finalise (ADR-0013).

    Separate from ``PropertyRepository``, which hydrates the Property
    *aggregate* for reads and needs an EPC slice; this writer only persists
    identity rows and has no read/EPC dependency. Re-runs are idempotent:
    existing properties are not churned — see the adapter's conflict policy.
    """

    @abstractmethod
    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Insert ``rows`` and return how many were actually inserted.

        A row whose ``(portfolio_id, uprn)`` already exists is skipped (the FE
        partial unique index, ``WHERE uprn IS NOT NULL``); rows without a UPRN
        are always inserted. An empty list is a no-op that returns 0.
        """

View file

@ -0,0 +1,63 @@
"""Postgres adapter for ``PropertyIdentityWriter`` (ADR-0013).
Bulk-inserts ``property`` identity rows through the ``PropertyRow`` SQLModel
mirror. The conflict policy lives in SQL and reproduces today's Next.js
``onConflictDoNothing`` exactly: skip a row whose ``(portfolio_id, uprn)`` already
exists under the FE partial unique index ``uq_property_portfolio_uprn``
(``WHERE uprn IS NOT NULL``); rows with no UPRN are always inserted. This makes a
re-run leave existing properties untouched — contrast ``property_overrides``,
which recalculates (ADR-0005).
"""
from __future__ import annotations
from typing import Any, cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from infrastructure.postgres.property_table import PropertyRow
from repositories.property.property_identity_writer import (
PropertyIdentityInsert,
PropertyIdentityWriter,
)
class PropertyIdentityWriterPostgresRepository(PropertyIdentityWriter):
    """Inserts identity rows with one ``INSERT ... ON CONFLICT DO NOTHING``."""

    def __init__(self, session: Session) -> None:
        self._session = session
        # ``__table__`` is attached by SQLModel at runtime for table=True
        # classes but is absent from the stubs, hence getattr + cast to keep
        # the dialect insert typed.
        self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))

    def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
        """Insert ``rows`` in a single statement and return the driver's row count.

        Returns 0 without touching the session when ``rows`` is empty. Nothing
        is committed here.
        """
        if not rows:
            return 0

        payload = [self._as_values(row) for row in rows]
        # Conflict target mirrors `uq_property_portfolio_uprn`
        # (partial index: WHERE uprn IS NOT NULL).
        statement = (
            pg_insert(self._table)
            .values(payload)
            .on_conflict_do_nothing(
                index_elements=["portfolio_id", "uprn"],
                index_where=self._table.c.uprn.isnot(None),
            )
        )
        # Session.execute is re-exported from SQLAlchemy; the stubs flag one
        # overload as deprecated, but executing an INSERT this way is supported.
        outcome = self._session.execute(statement)  # pyright: ignore[reportDeprecated]
        return cast(int, outcome.rowcount)

    @staticmethod
    def _as_values(row: PropertyIdentityInsert) -> dict[str, Any]:
        """Translate one insert into the column -> value mapping for the statement."""
        return {
            "portfolio_id": row.portfolio_id,
            "creation_status": row.creation_status,
            "uprn": row.uprn,
            "landlord_property_id": row.landlord_property_id,
            "address": row.address,
            "postcode": row.postcode,
            "user_inputted_address": row.user_inputted_address,
            "user_inputted_postcode": row.user_inputted_postcode,
            "lexiscore": row.lexiscore,
        }

View file

@ -25,9 +25,12 @@ class PropertyPostgresRepository(PropertyRepository):
if row is None:
raise ValueError(f"property {property_id} not found")
identity = PropertyIdentity(
# `postcode`/`address` are nullable in the FE schema (the finaliser may
# insert a row with neither); coerce the degenerate null to "" so the
# identity type stays a plain str.
portfolio_id=row.portfolio_id,
postcode=row.postcode,
address=row.address,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
)
@ -53,8 +56,8 @@ class PropertyPostgresRepository(PropertyRepository):
Property(
identity=PropertyIdentity(
portfolio_id=row.portfolio_id,
postcode=row.postcode,
address=row.address,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
),