From 3f277f13c25f75b82bbcbe4b9b6056de497a354b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 5 Jun 2026 19:02:35 +0000 Subject: [PATCH] route of the address2uprn problem of float value --- .claude/settings.json | 3 +- .../bulk-uploads/[uploadId]/combine/route.ts | 4 +++ src/lib/bulkUpload/server.ts | 36 ++++++++++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/.claude/settings.json b/.claude/settings.json index c425aeb..e1550d2 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -56,7 +56,8 @@ "Bash(python -m py_compile applications/bulk_upload_finaliser/handler.py orchestration/bulk_upload_finaliser_orchestrator.py)", "Bash(python -m py_compile repositories/property/property_repository.py repositories/property/property_postgres_repository.py orchestration/bulk_upload_finaliser_orchestrator.py applications/bulk_upload_finaliser/handler.py tests/orchestration/test_bulk_upload_finaliser_orchestrator.py)", "Bash(python -m py_compile tests/orchestration/fakes.py)", - "Bash(curl -s -o /dev/null -w \"%{http_code}\" --max-time 30 http://localhost:3000/home)" + "Bash(curl -s -o /dev/null -w \"%{http_code}\" --max-time 30 http://localhost:3000/home)", + "Bash(python -m py_compile orchestration/bulk_upload_finaliser_orchestrator.py tests/orchestration/test_bulk_upload_finaliser_orchestrator.py)" ], "deny": [ "Bash(npx drizzle-kit generate)", diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts index afee7f0..bc2f843 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts @@ -25,6 +25,10 @@ export async function POST( ); case "already_combined": return NextResponse.json({ alreadyCombined: true }, { status: 200 }); + case "already_dispatched": + // Lost the double-dispatch CAS (or the combiner is already running) — a + // benign no-op; the client just keeps polling and sees `combining`. + return NextResponse.json({ alreadyDispatched: true }, { status: 200 }); case "not_found": return NextResponse.json({ error: "Not found" }, { status: 404 }); case "missing_task": diff --git a/src/lib/bulkUpload/server.ts b/src/lib/bulkUpload/server.ts index 95df4df..ca303f9 100644 --- a/src/lib/bulkUpload/server.ts +++ b/src/lib/bulkUpload/server.ts @@ -583,6 +583,7 @@ export async function triggerClassifier(args: { export type CombineRetriggerOutcome = | { kind: "triggered"; taskId: string; subTaskId: string } | { kind: "already_combined" } + | { kind: "already_dispatched" } | { kind: "not_found" } | { kind: "missing_task" } | { kind: "trigger_failed"; status: number; message: string }; @@ -596,6 +597,24 @@ export async function requestCombineRetrigger(args: { if (!upload.taskId) return { kind: "missing_task" }; if (upload.combinedOutputS3Uri) return { kind: "already_combined" }; + // CAS: atomically claim `processing → combining` — the double-dispatch guard + // (mirrors dispatchFinaliser's awaiting_review → finalising, ADR-0005). Of two + // rapid "Run Combiner" clicks exactly one flips the row; the loser updates 0 + // rows and bails, so only one combiner subtask is ever dispatched. It also + // closes the window where status is still `processing` because the backend + // hasn't written `combining` yet. + const claimed = await db + .update(bulkAddressUploads) + .set({ status: "combining" }) + .where( + and( + eq(bulkAddressUploads.id, args.uploadId), + eq(bulkAddressUploads.status, "processing"), + ), + ) + .returning(); + if (claimed.length === 0) return { kind: "already_dispatched" }; + const [subTask] = await db .insert(subTasks) .values({ taskId: upload.taskId, status: "waiting" }) @@ -608,8 +627,23 @@ export async function requestCombineRetrigger(args: { payload, sessionToken: args.sessionToken, }); - if (!trigger.ok) + if (!trigger.ok) { + // Roll the claim back so the user can retry, and fail the subtask. + await Promise.all([ + db + .update(bulkAddressUploads) + .set({ status: "processing" }) + .where(eq(bulkAddressUploads.id, args.uploadId)), + db + .update(subTasks) + .set({ + status: "failed", + outputs: JSON.stringify({ error: trigger.message }), + }) + .where(eq(subTasks.id, subTask.id)), + ]); return { kind: "trigger_failed", status: trigger.status, message: trigger.message }; + } await db .update(subTasks)