From 8170601efb1457ccdfbf9eb3b75b0aa9704ead62 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Wed, 6 May 2026 15:49:34 +0000 Subject: [PATCH] refactored with improve-code-archievture and grill-with-docs --- .devcontainer/Dockerfile | 5 - CLAUDE.md | 6 +- CONTEXT.md | 74 +++++ docs/adr/0001-bulk-upload-state-machine.md | 20 ++ ...ulk-upload-browser-driven-orchestration.md | 17 ++ .../0003-task-creation-inside-bulk-upload.md | 12 + ...0004-bulk-upload-explicit-stage-buttons.md | 19 ++ .../bulk-uploads/[uploadId]/combine/route.ts | 83 ++---- .../bulk-uploads/[uploadId]/finalize/route.ts | 54 ++-- .../bulk-uploads/[uploadId]/progress/route.ts | 17 ++ .../bulk-uploads/[uploadId]/route.ts | 66 +---- .../start-address-matching/route.ts | 119 ++------ .../[portfolioId]/bulk-uploads/route.ts | 13 +- src/app/api/tasks/route.ts | 54 ---- .../upload/bulk-addresses/confirm/route.ts | 6 +- src/app/api/upload/bulk-addresses/route.ts | 4 +- .../portfolio/BulkUploadComingSoonModal.tsx | 95 ++---- .../[uploadId]/OnboardingProgress.tsx | 272 ++++++----------- .../[uploadId]/StartAddressMatchingButton.tsx | 49 +--- .../map-columns/MapColumnsClient.tsx | 37 +-- .../bulk-upload/[uploadId]/page.tsx | 1 - src/app/utils/s3.ts | 14 + src/lib/bulkUpload/client.ts | 173 +++++++++++ src/lib/bulkUpload/keys.ts | 4 + src/lib/bulkUpload/server.ts | 277 ++++++++++++++++++ src/lib/bulkUpload/types.ts | 44 +++ src/lib/session.ts | 8 + 27 files changed, 924 insertions(+), 619 deletions(-) create mode 100644 CONTEXT.md create mode 100644 docs/adr/0001-bulk-upload-state-machine.md create mode 100644 docs/adr/0002-bulk-upload-browser-driven-orchestration.md create mode 100644 docs/adr/0003-task-creation-inside-bulk-upload.md create mode 100644 docs/adr/0004-bulk-upload-explicit-stage-buttons.md create mode 100644 src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/progress/route.ts create mode 100644 src/lib/bulkUpload/client.ts create mode 100644 src/lib/bulkUpload/keys.ts create mode 100644 src/lib/bulkUpload/server.ts create mode 100644 src/lib/bulkUpload/types.ts create mode 100644 src/lib/session.ts diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index f193fda..997ede6 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,13 +7,8 @@ ARG DEBIAN_FRONTEND=noninteractive # Base CLI tooling (sudo, git, ripgrep/fd for editors, etc.). RUN apt update && apt install -y --no-install-recommends \ -<<<<<<< HEAD - sudo jq vim curl bash-completion iputils-ping \ - && apt autoremove -y \ -======= sudo jq vim curl bash-completion \ ripgrep fd-find git make unzip \ ->>>>>>> main && rm -rf /var/lib/apt/lists/* # Passwordless-sudo dev user (UID/GID injected from the host via compose). diff --git a/CLAUDE.md b/CLAUDE.md index 6cbbf3f..e35dd11 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,10 +2,10 @@ ## React -- **Avoid `useEffect` and `useMemo`.** Derive values inline, use Server Components + Route Handlers, event handlers, or `useSyncExternalStore` instead. If a hook is genuinely the only option, flag it and ask before using it. +- **Avoid `useEffect` and `useMemo`.** Derive values inline, use Server Components + Route Handlers, event handlers. If a hook is genuinely the only option, flag it and ask before using it. + +- Instead of raw fetch use reactQuery to allow handling of mutations ## Next.js 15 route handlers - `params` is a `Promise` — type as `{ params: Promise<{ ... }> }` and `await params` before destructuring. - - diff --git a/CONTEXT.md b/CONTEXT.md new file mode 100644 index 0000000..652d44c --- /dev/null +++ b/CONTEXT.md @@ -0,0 +1,74 @@ +# Context + +This document captures the domain language used in this project. Terms here are the **canonical** ones — when more than one word exists for a concept, we pick one and treat the others as aliases to avoid. + +This file grows as terms are resolved during design conversations. Concepts that haven't been examined yet are not listed. + +## Language + +### Bulk upload + +**BulkUpload**: +A user-supplied spreadsheet of addresses for a Portfolio, transformed and matched to UPRNs before being inserted as Properties. Has an explicit lifecycle from upload through finalisation. +_Avoid_: import, batch, file upload, ingest + +**ColumnMapping**: +The user's declaration of which spreadsheet column means what (e.g. column "Property Address" means `address_1`). Stored as JSON on the BulkUpload row. +_Avoid_: schema, header map, field mapping + +**UPRN**: +Unique Property Reference Number — the UK national identifier for an address. Address matching attaches a UPRN to each row where possible. + +**Address matching**: +The pipeline stage that splits the source file by postcode, looks up UPRNs, and produces matched-address output. Triggered via FastAPI. +_Avoid_: postcode lookup, address resolution, address lookup + +**Combiner**: +The pipeline stage that aggregates the per-postcode address-matching outputs into a single combined CSV in S3, ready for review. +_Avoid_: aggregator, merger + +**Finalise**: +The terminal action that reads the combiner output, inserts rows as Properties on the Portfolio, and decides whether the BulkUpload needs further review. +_Avoid_: import, commit, ingest + +## Lifecycle + +A **BulkUpload** moves through these statuses: + +``` +ready_for_processing + → mapping_complete (user submits ColumnMapping; Next.js writes) + → processing (Address matching triggered; Next.js writes) + → combining (Combiner stage running; FastAPI writes directly) + → awaiting_review (Combiner output in S3; FastAPI writes directly) + → complete (Finalise succeeded, no row issues; Next.js writes) + → needs_review (Finalise succeeded, missing or duplicate UPRNs; Next.js writes) + → failed (FastAPI reports in-flight failure — schema only, not yet wired) +``` + +`complete`, `needs_review`, and `failed` are terminal. + +Re-mapping (PATCHing `columnMapping`) is legal only in `ready_for_processing` and `mapping_complete`. Any later state rejects with 409. + +**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the terminal Finalise outcomes. FastAPI owns `combining` and `awaiting_review` — writing them direct to the DB during the combiner run. The BulkUpload aggregate observes both. + +See [ADR-0001](./docs/adr/0001-bulk-upload-state-machine.md) for the deliberate "not yet" decisions baked into this lifecycle. + +## Relationships + +- A **Portfolio** has many **BulkUploads**. +- A **BulkUpload** produces zero or more **Properties** when finalised. +- A **BulkUpload** has at most one **Task** (the orchestration handle for the FastAPI pipeline run); a Task has many **SubTasks** (one per pipeline stage: address matching, combiner). + +## Example dialogue + +> **Dev:** "If the **Combiner** finishes but the user hasn't clicked Finalise, what does the user see?" +> **Domain expert:** "The BulkUpload sits in `awaiting_review`. The frontend polls and shows a 'review and confirm' button. Nothing's been written to **Properties** yet." +> +> **Dev:** "And if **Finalise** runs and 30% of rows have no **UPRN**?" +> **Domain expert:** "Those still get imported as **Properties** — just without a UPRN — and the BulkUpload moves to `needs_review`. The `complete` state is only for clean runs." + +## Flagged ambiguities + +- "Upload" is used in the codebase to mean both the file-on-S3 and the BulkUpload row. We standardise on **BulkUpload** for the row; the file is just "the source file." +- "Onboarding" appears in some route paths (`bulk_onboarding_inputs/...`) but isn't part of this glossary — we use **BulkUpload** end-to-end. diff --git a/docs/adr/0001-bulk-upload-state-machine.md b/docs/adr/0001-bulk-upload-state-machine.md new file mode 100644 index 0000000..741e4a3 --- /dev/null +++ b/docs/adr/0001-bulk-upload-state-machine.md @@ -0,0 +1,20 @@ +# BulkUpload state machine (v1) + +The BulkUpload lifecycle is: + +``` +ready_for_processing → mapping_complete → processing → combining → awaiting_review + → complete | needs_review | failed +``` + +Two writers: Next.js writes the user-driven transitions (`mapping_complete`, `processing`, `complete`, `needs_review`); the FastAPI `bulk_address2uprn_combiner` worker writes `combining` and `awaiting_review` directly to the DB during its run. Any aggregate enforcing the state machine on the Next.js side must treat those two as observed-not-owned. + +Three deliberate "not yet" decisions are baked into this version, each likely to be re-suggested without a record: + +1. **`needs_review` is terminal — no recovery flow.** A finalised upload with missing or duplicate UPRNs ends up here, and that's it. The imported Properties show up in the property table; a proper review/recovery UI is planned but out of scope. Without this note, every future architecture pass will surface "needs_review has no exit" as a bug. + +2. **Re-mapping is rejected after `mapping_complete`.** PATCHing `columnMapping` once Address matching has been triggered returns 409. We considered (B) reset-and-rerun (clear the combiner output, return to `mapping_complete`, force the user to re-trigger) but rejected it for now: it requires cleaning up abandoned Tasks/SubTasks and re-charging a FastAPI run. (A) is the smallest correct thing — the DB column never drifts from what produced the combined output. Revisit when re-run-from-review lands. + +3. **`failed` exists in the schema but is not yet written by any route.** Synchronous trigger failures (route can't reach FastAPI) are surfaced as React Query toasts on 5xx; the BulkUpload stays in its prior status and the user retries. In-flight failures (Combiner crashes silently) currently leave uploads stuck in `processing` — this is known-incomplete, waiting on FastAPI-side callback work to report failure. The status is in the schema now so the seam is ready when that work is scheduled. + +These are the only "no" decisions in v1; everything else (concurrency guards on stage triggers, persisting `failed`, etc.) is intended to be added. diff --git a/docs/adr/0002-bulk-upload-browser-driven-orchestration.md b/docs/adr/0002-bulk-upload-browser-driven-orchestration.md new file mode 100644 index 0000000..f27a5e0 --- /dev/null +++ b/docs/adr/0002-bulk-upload-browser-driven-orchestration.md @@ -0,0 +1,17 @@ +# BulkUpload pipeline stays browser-driven for now + +The BulkUpload pipeline chains three stages — address matching, combiner trigger, finalise — and the chain is currently driven by the **frontend polling** in `OnboardingProgress.tsx`. The browser fires `POST /combine` when the Task looks done, then `POST /finalize` when the upload reaches `awaiting_review`. Close the tab → upload gets stuck. + +A server-driven alternative (FastAPI callback into a Next.js webhook, or a sweeper route) would be more robust, but the entire bulk-upload flow is scheduled for redesign. This code path is being kept as **internal tooling for the Domna tech team** to onboard portfolios while the new flow is built — not for end-user use. + +So the cleanup invests only in things that survive the redesign: +- A real state machine on the BulkUpload aggregate (so the tech team can debug from the row alone). +- Deduplicated FastAPI trigger logic. +- Concurrency guards on stage triggers. + +It deliberately does **not** invest in: +- Server-side stage orchestration. +- Recovery from `failed` / stuck uploads. +- A real `awaiting_review` review UI. + +When the redesign lands, browser-driven chaining and the auto-finalise behaviour go with it. Future readers asking "why didn't they fix this" — that's why. diff --git a/docs/adr/0003-task-creation-inside-bulk-upload.md b/docs/adr/0003-task-creation-inside-bulk-upload.md new file mode 100644 index 0000000..2915346 --- /dev/null +++ b/docs/adr/0003-task-creation-inside-bulk-upload.md @@ -0,0 +1,12 @@ +# Task creation lives in BulkUpload, not behind a generic Task endpoint + +A `Task` is the orchestration handle for a FastAPI pipeline run; per `CONTEXT.md`, **a BulkUpload has at most one Task**. Today BulkUpload is the only feature that creates Tasks. + +We're collapsing the previous two-step client seam — `POST /api/tasks` to create a Task and SubTask, then `POST /bulk-uploads/[uploadId]/start-address-matching` with the resulting IDs — into a single route. Task + SubTask creation moves inside `triggerAddressMatching` in `src/lib/bulkUpload/server.ts`. The generic `POST /api/tasks` endpoint is deleted; the `GET /api/tasks` listing (admin Tasks UI) stays. + +The trade-off was between: + +- **(rejected) Keep `POST /api/tasks` for future generic use.** No other feature creates Tasks today. By the deletion test, the generic endpoint wasn't earning its keep — every line was overhead carried for hypothetical callers, while the only real caller had to perform a leaky two-call dance to satisfy a contract that didn't reflect the domain. +- **(chosen) Collapse Task creation into the BulkUpload module.** The seam matches the domain: BulkUpload owns its Task's lifecycle, including the moment of creation. One route handles the full transition into `processing`. The client surface (`useStartAddressMatching`) is one mutation, not a chain. + +A future reader will see `/api/tasks` exposing GET-only and reasonably wonder where Tasks are created. The answer is: inside whichever feature owns the Task. Today that's BulkUpload only. If a second feature ever needs to create Tasks, re-extract a generic creator at that point — extracting from two real consumers is straightforward, whereas keeping a speculative endpoint live without consumers leaves a stale-by-default seam that drifts from how Tasks are actually created. diff --git a/docs/adr/0004-bulk-upload-explicit-stage-buttons.md b/docs/adr/0004-bulk-upload-explicit-stage-buttons.md new file mode 100644 index 0000000..de7a98c --- /dev/null +++ b/docs/adr/0004-bulk-upload-explicit-stage-buttons.md @@ -0,0 +1,19 @@ +# Bulk-upload pipeline advances via explicit Run Combiner / Finalise buttons + +[ADR-0002](./0002-bulk-upload-browser-driven-orchestration.md) described the bulk-upload pipeline as **browser-driven and auto-firing**: a polling loop in `OnboardingProgress.tsx` watched the Task summary and fired `POST /combine` when the Task looked done, then `POST /finalize` when the upload reached `awaiting_review`, with mutable `combineFired` / `finalizeFired` / `refreshed` flags gating the side effects. + +This decision keeps the pipeline browser-driven (per ADR-0002) but replaces the auto-fire behaviour with **explicit buttons** the Domna tech team clicks to advance each stage. The state machine in [ADR-0001](./0001-bulk-upload-state-machine.md) is unchanged. + +Concretely: + +- The progress screen polls a single `GET /bulk-uploads/[uploadId]/progress` snapshot (Task summary + BulkUpload row in one query). +- "Run Combiner" appears when the Task reaches terminal-non-failed and the Combiner hasn't been triggered yet. +- "Finalise" appears when the BulkUpload reaches `awaiting_review`. +- Polling stops once the BulkUpload reaches a terminal status; on `useFinalize` success the caller runs `router.refresh()`. + +The trade-off was between: + +- **(rejected) Keep auto-fire and translate the orchestration to react-query with `useEffect`s.** Functional, but the auto-fire UX hides where the pipeline gets stuck — which is the failure mode the Domna tech team needs to debug. CLAUDE.md's "avoid `useEffect`" rule also gets stretched: three effects, one per side-effecting transition, each watching derived eligibility flags. +- **(chosen) Explicit buttons.** The "fire once" mutable flags collapse into react-query mutation state (`isIdle` / `isPending` / `isSuccess`) — pure derivations from the cache, no `useEffect`. Each stage is observable, retryable, and reflects the current BulkUpload status directly. Tab close still leaves uploads stuck (browser-driven progression, per ADR-0002), but "stuck" is now legible: the team sees which button is pending. + +When the bulk-upload redesign lands (per ADR-0002, "out of scope"), this entire screen and its buttons go away in favour of server-driven progression. Until then, the buttons are the right level of investment: they survive the redesign as discoverable artifacts of the prior flow, and they make the current flow tractable for the team using it. diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts index 673e2c5..afee7f0 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/combine/route.ts @@ -1,7 +1,5 @@ -import { db } from "@/app/db/db"; -import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; -import { subTasks } from "@/app/db/schema/tasks/subtask"; -import { eq } from "drizzle-orm"; +import { requestCombineRetrigger } from "@/lib/bulkUpload/server"; +import { readSessionToken } from "@/lib/session"; import { NextRequest, NextResponse } from "next/server"; import { getServerSession } from "next-auth"; import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; @@ -14,65 +12,24 @@ export async function POST( if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); const { uploadId } = await params; + const result = await requestCombineRetrigger({ + uploadId, + sessionToken: readSessionToken(request), + }); - const [upload] = await db - .select() - .from(bulkAddressUploads) - .where(eq(bulkAddressUploads.id, uploadId)) - .limit(1); - - if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 }); - if (!upload.taskId) - return NextResponse.json({ error: "Upload has no task" }, { status: 422 }); - if (upload.combinedOutputS3Uri) - return NextResponse.json({ alreadyCombined: true }, { status: 200 }); - - const fastapiUrl = process.env.FASTAPI_API_URL; - const fastapiKey = process.env.FASTAPI_API_KEY; - if (!fastapiUrl || !fastapiKey) { - console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set"); - return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 }); + switch (result.kind) { + case "triggered": + return NextResponse.json( + { taskId: result.taskId, subTaskId: result.subTaskId }, + { status: 200 } + ); + case "already_combined": + return NextResponse.json({ alreadyCombined: true }, { status: 200 }); + case "not_found": + return NextResponse.json({ error: "Not found" }, { status: 404 }); + case "missing_task": + return NextResponse.json({ error: "Upload has no task" }, { status: 422 }); + case "trigger_failed": + return NextResponse.json({ error: result.message }, { status: result.status }); } - - const [subTask] = await db - .insert(subTasks) - .values({ - taskId: upload.taskId, - status: "waiting", - }) - .returning(); - - const messageBody = { task_id: upload.taskId, sub_task_id: subTask.id }; - - const sessionToken = - request.cookies.get("__Secure-next-auth.session-token")?.value ?? - request.cookies.get("next-auth.session-token")?.value; - - try { - const triggerRes = await fetch(`${fastapiUrl}/v1/bulk-uploads/trigger-combiner`, { - method: "POST", - headers: { - "Content-Type": "application/json", - "x-api-key": fastapiKey, - Authorization: `Bearer ${sessionToken}`, - }, - body: JSON.stringify(messageBody), - }); - - if (!triggerRes.ok) { - const errText = await triggerRes.text().catch(() => ""); - console.error("Backend trigger-combiner failed:", triggerRes.status, errText); - return NextResponse.json({ error: "Failed to trigger combiner" }, { status: 502 }); - } - } catch (err) { - console.error("Failed to reach backend trigger-combiner:", err); - return NextResponse.json({ error: "Failed to trigger combiner" }, { status: 502 }); - } - - await db - .update(subTasks) - .set({ inputs: JSON.stringify(messageBody) }) - .where(eq(subTasks.id, subTask.id)); - - return NextResponse.json({ taskId: upload.taskId, subTaskId: subTask.id }, { status: 200 }); } diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts index 98fb5d1..b006e92 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts @@ -1,13 +1,13 @@ import { db } from "@/app/db/db"; -import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; import { property } from "@/app/db/schema/property"; -import { eq, sql } from "drizzle-orm"; +import { sql } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; import { revalidatePath } from "next/cache"; import { getServerSession } from "next-auth"; import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; -import S3 from "aws-sdk/clients/s3"; +import { createRetrofitDataS3Client } from "@/app/utils/s3"; import * as XLSX from "xlsx"; +import { loadForFinalize, markFinalized } from "@/lib/bulkUpload/server"; const ADDRESS_COLS = ["Address 1", "Address 2", "Address 3"] as const; const POSTCODE_COL = "postcode"; @@ -69,33 +69,31 @@ export async function POST( const { uploadId } = await params; - const [upload] = await db - .select() - .from(bulkAddressUploads) - .where(eq(bulkAddressUploads.id, uploadId)) - .limit(1); + const guarded = await loadForFinalize(uploadId); + switch (guarded.kind) { + case "not_found": + return NextResponse.json({ error: "Not found" }, { status: 404 }); + case "already_finalized": + return NextResponse.json( + { alreadyComplete: true, status: guarded.status }, + { status: 200 } + ); + case "wrong_state": + return NextResponse.json( + { error: `Upload not ready to finalize (state: ${guarded.current})` }, + { status: 409 } + ); + case "not_yet_combined": + return NextResponse.json({ error: "Combiner not finished" }, { status: 409 }); + } + const upload = guarded.upload; - if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 }); - if (upload.status === "complete" || upload.status === "needs_review") { - return NextResponse.json({ alreadyComplete: true, status: upload.status }, { status: 200 }); - } - if (upload.status !== "awaiting_review") { - return NextResponse.json({ error: "Upload not ready to finalize" }, { status: 422 }); - } - if (!upload.combinedOutputS3Uri) { - return NextResponse.json({ error: "Combiner not finished" }, { status: 409 }); - } - - const parsed = parseS3Uri(upload.combinedOutputS3Uri); + const parsed = parseS3Uri(upload.combinedOutputS3Uri!); if (!parsed) { return NextResponse.json({ error: "Invalid combined output S3 URI" }, { status: 500 }); } - const s3 = new S3({ - region: process.env.RETROFIT_DATA_DEV_REGION, - accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY, - secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY, - }); + const s3 = createRetrofitDataS3Client(); let rawRows: Record[]; try { @@ -169,13 +167,9 @@ export async function POST( } const needsReview = missingUprnCount > 0 || duplicateUprnCount > 0; + await markFinalized(uploadId, { needsReview }); const nextStatus = needsReview ? "needs_review" : "complete"; - await db - .update(bulkAddressUploads) - .set({ status: nextStatus }) - .where(eq(bulkAddressUploads.id, uploadId)); - revalidatePath("/portfolio/[slug]", "layout"); return NextResponse.json( diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/progress/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/progress/route.ts new file mode 100644 index 0000000..f5ac996 --- /dev/null +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/progress/route.ts @@ -0,0 +1,17 @@ +import { getProgressView } from "@/lib/bulkUpload/server"; +import { NextRequest, NextResponse } from "next/server"; +import { getServerSession } from "next-auth"; +import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; + +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ portfolioId: string; uploadId: string }> } +) { + const session = await getServerSession(AuthOptions); + if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + + const { uploadId } = await params; + const view = await getProgressView(uploadId); + if (!view) return NextResponse.json({ error: "Not found" }, { status: 404 }); + return NextResponse.json(view, { status: 200 }); +} diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/route.ts index f51bd73..2b45299 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/route.ts @@ -1,38 +1,11 @@ -import { db } from "@/app/db/db"; -import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; -import { eq } from "drizzle-orm"; +import { setColumnMapping } from "@/lib/bulkUpload/server"; import { NextRequest, NextResponse } from "next/server"; -import { getServerSession } from "next-auth"; -import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; import { z } from "zod"; const PatchSchema = z.object({ columnMapping: z.record(z.string(), z.string()), }); -export async function GET( - _request: NextRequest, - { params }: { params: Promise<{ portfolioId: string; uploadId: string }> } -) { - const session = await getServerSession(AuthOptions); - if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); - - const { uploadId } = await params; - - const [upload] = await db - .select({ - status: bulkAddressUploads.status, - combinedOutputS3Uri: bulkAddressUploads.combinedOutputS3Uri, - }) - .from(bulkAddressUploads) - .where(eq(bulkAddressUploads.id, uploadId)) - .limit(1); - - if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 }); - - return NextResponse.json(upload, { status: 200 }); -} - export async function PATCH( request: NextRequest, { params }: { params: Promise<{ portfolioId: string; uploadId: string }> } @@ -43,33 +16,26 @@ export async function PATCH( try { body = PatchSchema.parse(await request.json()); } catch { - return NextResponse.json({ msg: "Invalid input" }, { status: 400 }); - } - - const values = Object.values(body.columnMapping); - const hasAddress = values.includes("address_1"); - const hasPostcode = values.includes("postcode"); - if (!hasAddress || !hasPostcode) { - return NextResponse.json( - { msg: "Mapping must include address_1 and postcode." }, - { status: 422 } - ); + return NextResponse.json({ error: "Invalid input" }, { status: 400 }); } try { - const [updated] = await db - .update(bulkAddressUploads) - .set({ columnMapping: body.columnMapping, status: "mapping_complete" }) - .where(eq(bulkAddressUploads.id, uploadId)) - .returning(); - - if (!updated) { - return NextResponse.json({ msg: "Not found" }, { status: 404 }); + const result = await setColumnMapping(uploadId, body.columnMapping); + switch (result.kind) { + case "ok": + return NextResponse.json(result.upload, { status: 200 }); + case "not_found": + return NextResponse.json({ error: "Not found" }, { status: 404 }); + case "invalid_status": + return NextResponse.json( + { error: `Cannot remap upload in state '${result.current}'` }, + { status: 409 } + ); + case "invalid_mapping": + return NextResponse.json({ error: result.reason }, { status: 422 }); } - - return NextResponse.json(updated, { status: 200 }); } catch (error) { console.error("Failed to save column mapping:", error); - return NextResponse.json({ msg: "Internal server error" }, { status: 500 }); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); } } diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/start-address-matching/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/start-address-matching/route.ts index e5b77b2..5fd282f 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/start-address-matching/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/start-address-matching/route.ts @@ -1,15 +1,10 @@ -import { db } from "@/app/db/db"; -import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; -import { tasks } from "@/app/db/schema/tasks/tasks"; -import { subTasks } from "@/app/db/schema/tasks/subtask"; -import { eq } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; import { getServerSession } from "next-auth"; import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; -import { z } from "zod"; -import { createS3Client } from "@/app/utils/s3"; -import S3 from "aws-sdk/clients/s3"; +import { createS3Client, createRetrofitDataS3Client, retrofitDataS3Bucket } from "@/app/utils/s3"; import * as XLSX from "xlsx"; +import { loadForAddressMatching, triggerAddressMatching } from "@/lib/bulkUpload/server"; +import { readSessionToken } from "@/lib/session"; const FIELD_RENAME: Record = { address_1: "Address 1", @@ -19,11 +14,6 @@ const FIELD_RENAME: Record = { internal_reference: "Internal Reference", }; -const BodySchema = z.object({ - taskId: z.string().uuid(), - subTaskId: z.string().uuid(), -}); - function transformFile( buffer: Buffer, columnMapping: Record @@ -72,38 +62,28 @@ export async function POST( const { portfolioId, uploadId } = await params; - let body; - try { - body = BodySchema.parse(await request.json()); - } catch { - return NextResponse.json({ error: "Invalid input" }, { status: 400 }); + const guarded = await loadForAddressMatching(uploadId); + switch (guarded.kind) { + case "not_found": + return NextResponse.json({ error: "Not found" }, { status: 404 }); + case "wrong_state": + return NextResponse.json( + { error: `Upload not ready for onboarding (state: ${guarded.current})` }, + { status: 409 } + ); + case "missing_mapping": + return NextResponse.json({ error: "Column mapping missing" }, { status: 422 }); } - - const [upload] = await db - .select() - .from(bulkAddressUploads) - .where(eq(bulkAddressUploads.id, uploadId)) - .limit(1); - - if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 }); - if (upload.status !== "mapping_complete") - return NextResponse.json({ error: "Upload not ready for onboarding" }, { status: 422 }); - if (!upload.columnMapping) - return NextResponse.json({ error: "Column mapping missing" }, { status: 422 }); + const upload = guarded.upload; const s3 = createS3Client(); - const outputS3 = new S3({ - region: process.env.RETROFIT_DATA_DEV_REGION, - accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY, - secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY, - }); - const outputBucket = process.env.RETROFIT_DATA_DEV_S3_BUCKET_NAME!; - const bucket = upload.s3Bucket; + const outputS3 = createRetrofitDataS3Client(); + const outputBucket = retrofitDataS3Bucket(); let fileBuffer: Buffer; try { const obj = await s3 - .getObject({ Bucket: bucket, Key: upload.s3Key }) + .getObject({ Bucket: upload.s3Bucket, Key: upload.s3Key }) .promise(); fileBuffer = Buffer.from(obj.Body as Uint8Array); } catch (err) { @@ -111,8 +91,9 @@ export async function POST( return NextResponse.json({ error: "Failed to read source file" }, { status: 500 }); } - const result = transformFile(fileBuffer, upload.columnMapping); - if (result.error) return NextResponse.json({ error: result.error }, { status: 422 }); + const transformed = transformFile(fileBuffer, upload.columnMapping!); + if (transformed.error) + return NextResponse.json({ error: transformed.error }, { status: 422 }); const transformedKey = `bulk_onboarding_inputs/${portfolioId}/${uploadId}.csv`; try { @@ -120,7 +101,7 @@ export async function POST( .putObject({ Bucket: outputBucket, Key: transformedKey, - Body: result.csv, + Body: transformed.csv, ContentType: "text/csv", }) .promise(); @@ -131,53 +112,13 @@ export async function POST( const s3Uri = `s3://${outputBucket}/${transformedKey}`; - const fastapiUrl = process.env.FASTAPI_API_URL; - const fastapiKey = process.env.FASTAPI_API_KEY; - if (!fastapiUrl || !fastapiKey) { - console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set"); - return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 }); - } + const trigger = await triggerAddressMatching({ + uploadId, + s3Uri, + sessionToken: readSessionToken(request), + }); + if (trigger.kind === "trigger_failed") + return NextResponse.json({ error: trigger.message }, { status: trigger.status }); - const sessionToken = - request.cookies.get("__Secure-next-auth.session-token")?.value ?? - request.cookies.get("next-auth.session-token")?.value; - - try { - const triggerRes = await fetch(`${fastapiUrl}/v1/bulk-uploads/trigger-postcode-splitter`, { - method: "POST", - headers: { - "Content-Type": "application/json", - "x-api-key": fastapiKey, - Authorization: `Bearer ${sessionToken}`, - }, - body: JSON.stringify({ - task_id: body.taskId, - sub_task_id: body.subTaskId, - s3_uri: s3Uri, - }), - }); - - if (!triggerRes.ok) { - const errText = await triggerRes.text().catch(() => ""); - console.error("Backend trigger-postcode-splitter failed:", triggerRes.status, errText); - return NextResponse.json({ error: "Failed to trigger address matching" }, { status: 502 }); - } - } catch (err) { - console.error("Failed to reach backend trigger-postcode-splitter:", err); - return NextResponse.json({ error: "Failed to trigger address matching" }, { status: 502 }); - } - - await Promise.all([ - db.update(bulkAddressUploads) - .set({ status: "processing", taskId: body.taskId }) - .where(eq(bulkAddressUploads.id, uploadId)), - db.update(tasks) - .set({ status: "in progress" }) - .where(eq(tasks.id, body.taskId)), - db.update(subTasks) - .set({ inputs: JSON.stringify({ task_id: body.taskId, sub_task_id: body.subTaskId, s3_uri: s3Uri }) }) - .where(eq(subTasks.id, body.subTaskId)), - ]); - - return NextResponse.json({ taskId: body.taskId }, { status: 200 }); + return NextResponse.json({ taskId: trigger.taskId }, { status: 200 }); } diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/route.ts index 86bd00f..a6ebbc1 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/route.ts @@ -1,6 +1,4 @@ -import { db } from "@/app/db/db"; -import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; -import { eq, desc } from "drizzle-orm"; +import { listForPortfolio } from "@/lib/bulkUpload/server"; import { NextRequest, NextResponse } from "next/server"; export async function GET( @@ -10,15 +8,10 @@ export async function GET( const { portfolioId } = await params; try { - const uploads = await db - .select() - .from(bulkAddressUploads) - .where(eq(bulkAddressUploads.portfolioId, portfolioId)) - .orderBy(desc(bulkAddressUploads.createdAt)); - + const uploads = await listForPortfolio(portfolioId); return NextResponse.json(uploads, { status: 200 }); } catch (error) { console.error("Failed to fetch bulk uploads:", error); - return NextResponse.json({ msg: "Internal server error" }, { status: 500 }); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); } } diff --git a/src/app/api/tasks/route.ts b/src/app/api/tasks/route.ts index dc6e830..d11a239 100644 --- a/src/app/api/tasks/route.ts +++ b/src/app/api/tasks/route.ts @@ -1,61 +1,7 @@ import { db } from "@/app/db/db"; import { tasks } from "@/app/db/schema/tasks/tasks"; -import { subTasks } from "@/app/db/schema/tasks/subtask"; import { desc, count } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; -import { getServerSession } from "next-auth"; -import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; -import { z } from "zod"; - -const CreateTaskSchema = z.object({ - taskSource: z.string().min(1), - service: z.string().optional(), - source: z.literal("portfolio_id").optional(), - sourceId: z.string().optional(), - inputs: z.record(z.unknown()).optional(), -}); - -export async function POST(request: NextRequest) { - const session = await getServerSession(AuthOptions); - if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); - - let body; - try { - body = CreateTaskSchema.parse(await request.json()); - } catch { - return NextResponse.json({ error: "Invalid input" }, { status: 400 }); - } - - try { - const now = new Date(); - - const [task] = await db - .insert(tasks) - .values({ - taskSource: body.taskSource, - service: body.service, - source: body.source, - sourceId: body.sourceId, - status: "waiting", - jobStarted: now, - }) - .returning(); - - const [subTask] = await db - .insert(subTasks) - .values({ - taskId: task.id, - status: "waiting", - inputs: body.inputs ? JSON.stringify(body.inputs) : null, - }) - .returning(); - - return NextResponse.json({ taskId: task.id, subTaskId: subTask.id }, { status: 201 }); - } catch (error) { - console.error("Failed to create task:", error); - return NextResponse.json({ error: "Internal server error" }, { status: 500 }); - } -} export async function GET(request: NextRequest) { try { diff --git a/src/app/api/upload/bulk-addresses/confirm/route.ts b/src/app/api/upload/bulk-addresses/confirm/route.ts index 8edaab4..80e94e4 100644 --- a/src/app/api/upload/bulk-addresses/confirm/route.ts +++ b/src/app/api/upload/bulk-addresses/confirm/route.ts @@ -22,13 +22,13 @@ export async function POST(request: NextRequest) { body = BodySchema.parse(await request.json()); } catch (error) { console.error("Invalid input:", error); - return NextResponse.json({ msg: "Invalid input" }, { status: 400 }); + return NextResponse.json({ error: "Invalid input" }, { status: 400 }); } const bucket = process.env.RETROFIT_PLAN_INPUT_BUCKET_NAME; if (!bucket) { console.error("RETROFIT_PLAN_INPUT_BUCKET_NAME not set"); - return NextResponse.json({ msg: "Server misconfiguration" }, { status: 500 }); + return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 }); } try { @@ -50,6 +50,6 @@ export async function POST(request: NextRequest) { ); } catch (error) { console.error("Failed to record upload:", error); - return NextResponse.json({ msg: "Internal server error" }, { status: 500 }); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); } } diff --git a/src/app/api/upload/bulk-addresses/route.ts b/src/app/api/upload/bulk-addresses/route.ts index 9eb1c41..a54966f 100644 --- a/src/app/api/upload/bulk-addresses/route.ts +++ b/src/app/api/upload/bulk-addresses/route.ts @@ -15,7 +15,7 @@ export async function POST(request: NextRequest) { body = BodySchema.parse(await request.json()); } catch (error) { console.error("Invalid input:", error); - return NextResponse.json({ msg: "Invalid input" }, { status: 400 }); + return NextResponse.json({ error: "Invalid input" }, { status: 400 }); } try { @@ -31,6 +31,6 @@ export async function POST(request: NextRequest) { return NextResponse.json({ url: preSignedUrl }, { status: 200 }); } catch (error) { console.error(error); - return NextResponse.json({ msg: "Internal server error" }, { status: 500 }); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); } } diff --git a/src/app/components/portfolio/BulkUploadComingSoonModal.tsx b/src/app/components/portfolio/BulkUploadComingSoonModal.tsx index a416f00..ab19c19 100644 --- a/src/app/components/portfolio/BulkUploadComingSoonModal.tsx +++ b/src/app/components/portfolio/BulkUploadComingSoonModal.tsx @@ -22,6 +22,7 @@ import { } from "@heroicons/react/24/outline"; import { useSession } from "next-auth/react"; import { useRouter } from "next/navigation"; +import { useCreateBulkUpload } from "@/lib/bulkUpload/client"; const MAX_FILE_SIZE_MB = 50; const ALLOWED_EXTENSIONS = [".csv", ".xlsx", ".xls"]; @@ -99,22 +100,6 @@ async function validateHeaders(file: File): Promise<{ error: string | null; head return { error: null, headers }; } -async function getPresignedUrl( - userId: string, - portfolioId: string, - fileKey: string, - contentType: string -): Promise { - const res = await fetch("/api/upload/bulk-addresses", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ userId, portfolioId, fileKey, contentType }), - }); - if (!res.ok) throw new Error("Failed to generate upload URL."); - const data = await res.json(); - return data.url; -} - export default function BulkUploadComingSoonModal({ isOpen, onClose, @@ -123,18 +108,22 @@ export default function BulkUploadComingSoonModal({ const session = useSession(); const router = useRouter(); const fileInputRef = useRef(null); + const createUpload = useCreateBulkUpload(); const [isDragging, setIsDragging] = useState(false); const [selectedFile, setSelectedFile] = useState(null); const [sourceHeaders, setSourceHeaders] = useState([]); const [validationError, setValidationError] = useState(null); const [validating, setValidating] = useState(false); - const [uploading, setUploading] = useState(false); const [uploadProgress, setUploadProgress] = useState(null); - const [uploadError, setUploadError] = useState(null); + + const uploading = createUpload.isPending; + const uploadError = createUpload.error + ? "Upload failed. Please try again, or contact a Domna representative if the issue persists." + : null; async function handleFile(file: File) { - setUploadError(null); + createUpload.reset(); setSelectedFile(null); setValidationError(null); @@ -183,60 +172,38 @@ export default function BulkUploadComingSoonModal({ setSourceHeaders([]); setValidationError(null); setValidating(false); - setUploadError(null); - setUploading(false); setUploadProgress(null); + createUpload.reset(); onClose(); } - async function handleUpload() { + function handleUpload() { const userId = String(session.data?.user?.dbId ?? ""); if (!selectedFile || !userId) return; - setUploading(true); + const ext = getFileExtension(selectedFile.name); + const contentType = CONTENT_TYPES[ext] ?? "application/octet-stream"; + const fileKey = generateS3Key(userId, portfolioId, ext); setUploadProgress(0); - setUploadError(null); - try { - const ext = getFileExtension(selectedFile.name); - const contentType = CONTENT_TYPES[ext] ?? "application/octet-stream"; - const fileKey = generateS3Key(userId, portfolioId, ext); - - const presignedUrl = await getPresignedUrl(userId, portfolioId, fileKey, contentType); - - await new Promise((resolve, reject) => { - const xhr = new XMLHttpRequest(); - xhr.open("PUT", presignedUrl); - xhr.setRequestHeader("Content-Type", contentType); - xhr.upload.addEventListener("progress", (e) => { - if (e.lengthComputable) { - setUploadProgress(Math.round((e.loaded / e.total) * 100)); - } - }); - xhr.onload = () => { - if (xhr.status >= 200 && xhr.status < 300) resolve(); - else reject(new Error(`S3 upload failed: ${xhr.status}`)); - }; - xhr.onerror = () => reject(new Error("Network error during upload")); - xhr.send(selectedFile); - }); - - const confirmRes = await fetch("/api/upload/bulk-addresses/confirm", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ fileKey, filename: selectedFile.name, portfolioId, userId, sourceHeaders }), - }); - if (!confirmRes.ok) throw new Error("Failed to record upload."); - - const { id: uploadId } = await confirmRes.json(); - router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}/map-columns`); - onClose(); - } catch (err) { - setUploadError("Upload failed. Please try again, or contact a Domna representative if the issue persists."); - } finally { - setUploading(false); - setUploadProgress(null); - } + createUpload.mutate( + { + file: selectedFile, + portfolioId, + userId, + sourceHeaders, + contentType, + fileKey, + onProgress: setUploadProgress, + }, + { + onSuccess: ({ id: uploadId }) => { + router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}/map-columns`); + onClose(); + }, + onSettled: () => setUploadProgress(null), + } + ); } const canUpload = !!selectedFile && !uploading && !validating; diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx index 68160eb..3824b1b 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx @@ -1,172 +1,37 @@ "use client"; -import { useState, useSyncExternalStore } from "react"; import { useRouter } from "next/navigation"; import Link from "next/link"; - -interface TaskData { - id: string; - taskSource: string; - status: string; - totalSubtasks: number; - completedSubtasks: number; - failedSubtasks: number; -} - -interface UploadStatus { - status: string; - combinedOutputS3Uri: string | null; -} +import { ArrowRightIcon } from "@heroicons/react/24/outline"; +import { + useBulkUploadProgress, + useFinalize, + useRequestCombine, +} from "@/lib/bulkUpload/client"; interface Props { - taskId: string; portfolioSlug: string; portfolioId: string; uploadId: string; isDomnaUser: boolean; } -interface Snapshot { - data: TaskData | null; - uploadStatus: UploadStatus | null; - fetchError: boolean; - finalizeError: string | null; -} - -const TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]); -const FAILED_STATUSES = new Set(["failed", "failure", "error"]); -const FINAL_UPLOAD_STATUSES = new Set(["complete", "needs_review"]); - -function createProgressStore(args: { - taskId: string; - portfolioId: string; - uploadId: string; - onComplete: () => void; -}) { - let snapshot: Snapshot = { - data: null, - uploadStatus: null, - fetchError: false, - finalizeError: null, - }; - const listeners = new Set<() => void>(); - let intervalId: ReturnType | null = null; - let combineFired = false; - let finalizeFired = false; - let refreshed = false; - - function emit(patch: Partial) { - snapshot = { ...snapshot, ...patch }; - listeners.forEach((l) => l()); - } - - async function fireFinalize() { - try { - const res = await fetch( - `/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}/finalize`, - { method: "POST" } - ); - if (!res.ok) { - const body = await res.json().catch(() => ({})); - const msg = - body?.detail || body?.error || `Finalize failed (${res.status})`; - emit({ finalizeError: msg }); - finalizeFired = false; - } - } catch (err) { - console.error("Failed to trigger finalize:", err); - emit({ finalizeError: err instanceof Error ? err.message : "Network error" }); - finalizeFired = false; - } - } - - async function poll() { - try { - const res = await fetch(`/api/tasks/${args.taskId}/summary`); - if (!res.ok) { emit({ fetchError: true }); return; } - const json: TaskData = await res.json(); - emit({ data: json }); - const status = json.status.toLowerCase(); - - if (TERMINAL_STATUSES.has(status)) { - if (!FAILED_STATUSES.has(status) && !combineFired) { - combineFired = true; - fetch(`/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}/combine`, { - method: "POST", - }).catch((err) => console.error("Failed to trigger combiner:", err)); - } - - const uploadRes = await fetch( - `/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}` - ); - if (uploadRes.ok) { - const upload: UploadStatus = await uploadRes.json(); - emit({ uploadStatus: upload }); - - if (upload.status === "awaiting_review" && !finalizeFired) { - finalizeFired = true; - fireFinalize(); - } - - if (FINAL_UPLOAD_STATUSES.has(upload.status) && !refreshed) { - refreshed = true; - if (intervalId) clearInterval(intervalId); - intervalId = null; - args.onComplete(); - return; - } - } - } - } catch { - emit({ fetchError: true }); - } - } - - return { - subscribe(listener: () => void) { - listeners.add(listener); - if (listeners.size === 1 && intervalId === null && !refreshed) { - poll(); - intervalId = setInterval(poll, 3000); - } - return () => { - listeners.delete(listener); - if (listeners.size === 0 && intervalId !== null) { - clearInterval(intervalId); - intervalId = null; - } - }; - }, - getSnapshot: () => snapshot, - retryFinalize() { - emit({ finalizeError: null }); - finalizeFired = true; - fireFinalize(); - }, - }; -} +const TASK_TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]); +const TASK_FAILED_STATUSES = new Set(["failed", "failure", "error"]); export default function OnboardingProgress({ - taskId, portfolioSlug, portfolioId, uploadId, isDomnaUser, }: Props) { const router = useRouter(); - const [store] = useState(() => - createProgressStore({ - taskId, - portfolioId, - uploadId, - onComplete: () => router.refresh(), - }) - ); - const snap = useSyncExternalStore(store.subscribe, store.getSnapshot, store.getSnapshot); - const { data, uploadStatus, fetchError, finalizeError } = snap; + const progress = useBulkUploadProgress(portfolioId, uploadId); + const combine = useRequestCombine(portfolioId, uploadId); + const finalize = useFinalize(portfolioId, uploadId); - if (fetchError) return null; - if (!data) { + if (progress.isError) return null; + if (!progress.data) { return (
@@ -175,22 +40,26 @@ export default function OnboardingProgress({ ); } - const total = data.totalSubtasks; - const complete = data.completedSubtasks; - const failed = data.failedSubtasks; - const percent = total > 0 ? Math.round((complete / total) * 100) : 0; - const taskDone = TERMINAL_STATUSES.has(data.status.toLowerCase()); - const isFailed = FAILED_STATUSES.has(data.status.toLowerCase()); - const isCombining = - taskDone && !isFailed && uploadStatus?.status === "combining"; - const isImporting = - taskDone && !isFailed && uploadStatus?.status === "awaiting_review"; + const { task, upload } = progress.data; + const total = task?.totalSubtasks ?? 0; + const completedSubtasks = task?.completedSubtasks ?? 0; + const failedSubtasks = task?.failedSubtasks ?? 0; + const percent = total > 0 ? Math.round((completedSubtasks / total) * 100) : 0; + + const taskStatus = task?.status.toLowerCase() ?? ""; + const taskDone = TASK_TERMINAL_STATUSES.has(taskStatus); + const taskFailed = TASK_FAILED_STATUSES.has(taskStatus); + const isCombining = upload.status === "combining"; + const isImporting = upload.status === "awaiting_review"; + + const canRunCombiner = taskDone && !taskFailed && upload.status === "processing"; + const canFinalize = upload.status === "awaiting_review"; return (
0 ? `${percent}%` : "4%" }} />
@@ -198,13 +67,13 @@ export default function OnboardingProgress({
{total > 0 && ( - {complete} / {total} batches complete + {completedSubtasks} / {total} batches complete )} - {failed > 0 && ( + {failedSubtasks > 0 && ( - {failed} failed + {failedSubtasks} failed )} {!taskDone && ( @@ -222,24 +91,41 @@ export default function OnboardingProgress({ {isImporting && ( - Importing to portfolio… + Awaiting import )}
- {finalizeError && ( -
-
-

Import failed

-

{finalizeError}

-
- + {(canRunCombiner || canFinalize) && ( +
+ {canRunCombiner && ( + combine.mutate()} + /> + )} + {canFinalize && ( + + finalize.mutate(undefined, { onSuccess: () => router.refresh() }) + } + /> + )} +
+ )} + + {combine.error && ( +

{combine.error.message}

+ )} + {finalize.error && ( +
+

Import failed

+

{finalize.error.message}

)} @@ -254,3 +140,37 @@ export default function OnboardingProgress({
); } + +function StageButton({ + label, + activeLabel, + isPending, + onClick, +}: { + label: string; + activeLabel: string; + isPending: boolean; + onClick: () => void; +}) { + return ( + + ); +} diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/StartAddressMatchingButton.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/StartAddressMatchingButton.tsx index 8f12612..afd6117 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/StartAddressMatchingButton.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/StartAddressMatchingButton.tsx @@ -1,8 +1,8 @@ "use client"; import { useRouter } from "next/navigation"; -import { useMutation } from "@tanstack/react-query"; import { ArrowRightIcon } from "@heroicons/react/24/outline"; +import { useStartAddressMatching } from "@/lib/bulkUpload/client"; interface Props { portfolioId: string; @@ -10,53 +10,14 @@ interface Props { filename: string; } -export default function StartAddressMatchingButton({ portfolioId, uploadId, filename }: Props) { +export default function StartAddressMatchingButton({ portfolioId, uploadId }: Props) { const router = useRouter(); - - const { mutate, isPending, error } = useMutation({ - mutationFn: async () => { - const taskRes = await fetch("/api/tasks", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - taskSource: `Address Onboarding – ${filename}`, - service: "address2uprn", - source: "portfolio_id", - sourceId: portfolioId, - inputs: { bulk_upload_id: uploadId }, - }), - }); - - if (!taskRes.ok) { - const data = await taskRes.json().catch(() => ({})); - throw new Error(data.error ?? "Failed to create task"); - } - - const { taskId, subTaskId } = await taskRes.json(); - - const matchRes = await fetch( - `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/start-address-matching`, - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ taskId, subTaskId }), - } - ); - - if (!matchRes.ok) { - const data = await matchRes.json().catch(() => ({})); - throw new Error(data.error ?? "Failed to start address matching"); - } - }, - onSuccess: () => { - router.refresh(); - }, - }); + const { mutate, isPending, error } = useStartAddressMatching(portfolioId, uploadId); return (
{error && (

- {error instanceof Error ? error.message : "Something went wrong"} + {error.message}

)}
diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/map-columns/MapColumnsClient.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/map-columns/MapColumnsClient.tsx index 1915282..adda7d3 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/map-columns/MapColumnsClient.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/map-columns/MapColumnsClient.tsx @@ -9,6 +9,7 @@ import { TableCellsIcon, ArrowsRightLeftIcon, } from "@heroicons/react/24/outline"; +import { useSetColumnMapping } from "@/lib/bulkUpload/client"; const INTERNAL_FIELDS = [ { value: "address_1", label: "Address 1", required: true }, @@ -61,42 +62,28 @@ export default function MapColumnsClient({ const [mapping, setMapping] = useState>( buildInitialMapping(sourceHeaders, existingMapping) ); - const [submitting, setSubmitting] = useState(false); - const [error, setError] = useState(null); + const setMappingMutation = useSetColumnMapping(portfolioId, uploadId); const mappedValues = Object.values(mapping).filter((v) => v !== "skip"); const missingRequired = REQUIRED_VALUES.filter((r) => !mappedValues.includes(r)); + const submitting = setMappingMutation.isPending; + const error = setMappingMutation.error?.message ?? null; const canSubmit = missingRequired.length === 0 && !submitting; function setField(header: string, value: string) { setMapping((prev) => ({ ...prev, [header]: value })); } - async function handleSubmit() { + function handleSubmit() { if (!canSubmit) return; - setSubmitting(true); - setError(null); - - try { - const res = await fetch( - `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}`, - { - method: "PATCH", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ columnMapping: mapping }), - } - ); - - if (!res.ok) { - const data = await res.json().catch(() => ({})); - throw new Error(data.msg ?? "Failed to save mapping."); + setMappingMutation.mutate( + { columnMapping: mapping }, + { + onSuccess: () => { + router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}`); + }, } - - router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}`); - } catch (err) { - setError(err instanceof Error ? err.message : "Something went wrong."); - setSubmitting(false); - } + ); } return ( diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx index d743097..f8f1a35 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx @@ -181,7 +181,6 @@ export default async function BulkUploadDetailPage(props: { statusKey === "failed") && upload.taskId && ( { + const body = await res.json().catch(() => ({})); + return new Error(body?.error ?? fallback); +} + +export type CreateBulkUploadInput = { + file: File; + portfolioId: string; + userId: string; + sourceHeaders: string[]; + contentType: string; + fileKey: string; + onProgress?: (percent: number) => void; +}; + +export type CreateBulkUploadResult = { + id: string; + s3Key: string; + s3Bucket: string; + status: string; +}; + +export function useCreateBulkUpload() { + return useMutation({ + mutationFn: async (input) => { + const presignRes = await fetch("/api/upload/bulk-addresses", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + userId: input.userId, + portfolioId: input.portfolioId, + fileKey: input.fileKey, + contentType: input.contentType, + }), + }); + if (!presignRes.ok) throw await parseError(presignRes, "Failed to generate upload URL."); + const { url: presignedUrl } = await presignRes.json(); + + await new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.open("PUT", presignedUrl); + xhr.setRequestHeader("Content-Type", input.contentType); + if (input.onProgress) { + xhr.upload.addEventListener("progress", (e) => { + if (e.lengthComputable) { + input.onProgress!(Math.round((e.loaded / e.total) * 100)); + } + }); + } + xhr.onload = () => { + if (xhr.status >= 200 && xhr.status < 300) resolve(); + else reject(new Error(`S3 upload failed: ${xhr.status}`)); + }; + xhr.onerror = () => reject(new Error("Network error during upload")); + xhr.send(input.file); + }); + + const confirmRes = await fetch("/api/upload/bulk-addresses/confirm", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + fileKey: input.fileKey, + filename: input.file.name, + portfolioId: input.portfolioId, + userId: input.userId, + sourceHeaders: input.sourceHeaders, + }), + }); + if (!confirmRes.ok) throw await parseError(confirmRes, "Failed to record upload."); + return confirmRes.json(); + }, + }); +} + +export function useSetColumnMapping(portfolioId: string, uploadId: string) { + const queryClient = useQueryClient(); + return useMutation }>({ + mutationFn: async (input) => { + const res = await fetch(`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}`, { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(input), + }); + if (!res.ok) throw await parseError(res, "Failed to save mapping."); + return res.json(); + }, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) }); + }, + }); +} + +export function useStartAddressMatching(portfolioId: string, uploadId: string) { + const queryClient = useQueryClient(); + return useMutation<{ taskId: string }, Error, void>({ + mutationFn: async () => { + const res = await fetch( + `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/start-address-matching`, + { method: "POST" }, + ); + if (!res.ok) throw await parseError(res, "Failed to start address matching."); + return res.json(); + }, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) }); + }, + }); +} + +export function useBulkUploadProgress(portfolioId: string, uploadId: string) { + return useQuery({ + queryKey: bulkUploadKeys.progress(uploadId), + queryFn: async () => { + const res = await fetch( + `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/progress`, + ); + if (!res.ok) throw await parseError(res, "Failed to load progress."); + return res.json(); + }, + refetchInterval: (data) => { + const status = data?.upload.status; + return status && isTerminalStatus(status) ? false : 3000; + }, + }); +} + +export function useRequestCombine(portfolioId: string, uploadId: string) { + const queryClient = useQueryClient(); + return useMutation({ + mutationFn: async () => { + const res = await fetch( + `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/combine`, + { method: "POST" }, + ); + if (!res.ok) throw await parseError(res, "Failed to start combiner."); + return res.json(); + }, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) }); + }, + }); +} + +export type FinalizeResult = { + inserted?: number; + missingUprnCount?: number; + duplicateUprnCount?: number; + status?: string; + alreadyComplete?: boolean; +}; + +export function useFinalize(portfolioId: string, uploadId: string) { + const queryClient = useQueryClient(); + return useMutation({ + mutationFn: async () => { + const res = await fetch( + `/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/finalize`, + { method: "POST" }, + ); + if (!res.ok) throw await parseError(res, "Finalize failed."); + return res.json(); + }, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) }); + }, + }); +} diff --git a/src/lib/bulkUpload/keys.ts b/src/lib/bulkUpload/keys.ts new file mode 100644 index 0000000..1f77cb9 --- /dev/null +++ b/src/lib/bulkUpload/keys.ts @@ -0,0 +1,4 @@ +export const bulkUploadKeys = { + all: ["bulkUpload"] as const, + progress: (uploadId: string) => ["bulkUpload", uploadId, "progress"] as const, +}; diff --git a/src/lib/bulkUpload/server.ts b/src/lib/bulkUpload/server.ts new file mode 100644 index 0000000..a4ce2d5 --- /dev/null +++ b/src/lib/bulkUpload/server.ts @@ -0,0 +1,277 @@ +import { db } from "@/app/db/db"; +import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; +import { tasks } from "@/app/db/schema/tasks/tasks"; +import { subTasks } from "@/app/db/schema/tasks/subtask"; +import { count, desc, eq, sql } from "drizzle-orm"; +import type { BulkUpload, BulkUploadStatus, ProgressView, TaskSummary } from "./types"; + +const REMAP_ALLOWED: ReadonlySet = new Set([ + "ready_for_processing", + "mapping_complete", +]); + +type FastApiTriggerArgs = { + endpoint: string; + payload: Record; + sessionToken: string | undefined; +}; + +type FastApiTriggerResult = { ok: true } | { ok: false; status: number; message: string }; + +async function triggerFastApiPipeline(args: FastApiTriggerArgs): Promise { + const url = process.env.FASTAPI_API_URL; + const key = process.env.FASTAPI_API_KEY; + if (!url || !key) { + console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set"); + return { ok: false, status: 500, message: "Server misconfiguration" }; + } + + try { + const res = await fetch(`${url}${args.endpoint}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": key, + Authorization: `Bearer ${args.sessionToken}`, + }, + body: JSON.stringify(args.payload), + }); + if (!res.ok) { + const errText = await res.text().catch(() => ""); + console.error(`FastAPI ${args.endpoint} failed:`, res.status, errText); + return { ok: false, status: 502, message: "Pipeline trigger failed" }; + } + return { ok: true }; + } catch (err) { + console.error(`Failed to reach FastAPI ${args.endpoint}:`, err); + return { ok: false, status: 502, message: "Pipeline trigger failed" }; + } +} + +async function loadById(uploadId: string): Promise { + const [row] = await db + .select() + .from(bulkAddressUploads) + .where(eq(bulkAddressUploads.id, uploadId)) + .limit(1); + return row ?? null; +} + +export async function listForPortfolio(portfolioId: string): Promise { + return db + .select() + .from(bulkAddressUploads) + .where(eq(bulkAddressUploads.portfolioId, portfolioId)) + .orderBy(desc(bulkAddressUploads.createdAt)); +} + +async function loadTaskSummary(taskId: string): Promise { + const [row] = await db + .select({ + id: tasks.id, + taskSource: tasks.taskSource, + status: tasks.status, + service: tasks.service, + jobStarted: tasks.jobStarted, + jobCompleted: tasks.jobCompleted, + updatedAt: tasks.updatedAt, + totalSubtasks: count(subTasks.id), + completedSubtasks: sql`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`, + failedSubtasks: sql`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`, + }) + .from(tasks) + .leftJoin(subTasks, eq(subTasks.taskId, tasks.id)) + .where(eq(tasks.id, taskId)) + .groupBy(tasks.id) + .limit(1); + return row ?? null; +} + +export async function getProgressView(uploadId: string): Promise { + const upload = await loadById(uploadId); + if (!upload) return null; + const task = upload.taskId ? await loadTaskSummary(upload.taskId) : null; + return { upload, task }; +} + +function validateMapping(mapping: Record): string | null { + const values = Object.values(mapping); + if (!values.includes("address_1")) return "Mapping must include address_1."; + if (!values.includes("postcode")) return "Mapping must include postcode."; + return null; +} + +export type SetMappingOutcome = + | { kind: "ok"; upload: BulkUpload } + | { kind: "not_found" } + | { kind: "invalid_status"; current: string } + | { kind: "invalid_mapping"; reason: string }; + +export async function setColumnMapping( + uploadId: string, + mapping: Record, +): Promise { + const upload = await loadById(uploadId); + if (!upload) return { kind: "not_found" }; + if (!REMAP_ALLOWED.has(upload.status as BulkUploadStatus)) + return { kind: "invalid_status", current: upload.status }; + + const reason = validateMapping(mapping); + if (reason) return { kind: "invalid_mapping", reason }; + + const [updated] = await db + .update(bulkAddressUploads) + .set({ columnMapping: mapping, status: "mapping_complete" }) + .where(eq(bulkAddressUploads.id, uploadId)) + .returning(); + if (!updated) return { kind: "not_found" }; + return { kind: "ok", upload: updated }; +} + +export type LoadForAddressMatchingOutcome = + | { kind: "ok"; upload: BulkUpload } + | { kind: "not_found" } + | { kind: "wrong_state"; current: string } + | { kind: "missing_mapping" }; + +export async function loadForAddressMatching( + uploadId: string, +): Promise { + const upload = await loadById(uploadId); + if (!upload) return { kind: "not_found" }; + if (upload.status !== "mapping_complete") + return { kind: "wrong_state", current: upload.status }; + if (!upload.columnMapping) return { kind: "missing_mapping" }; + return { kind: "ok", upload }; +} + +export type TriggerAddressMatchingOutcome = + | { kind: "ok"; taskId: string } + | { kind: "trigger_failed"; status: number; message: string }; + +export async function triggerAddressMatching(args: { + uploadId: string; + s3Uri: string; + sessionToken: string | undefined; +}): Promise { + const upload = await loadById(args.uploadId); + if (!upload) return { kind: "trigger_failed", status: 404, message: "Upload not found" }; + + const now = new Date(); + const [task] = await db + .insert(tasks) + .values({ + taskSource: `Address Onboarding – ${upload.filename}`, + service: "address2uprn", + source: "portfolio_id", + sourceId: upload.portfolioId, + status: "waiting", + jobStarted: now, + }) + .returning(); + const [subTask] = await db + .insert(subTasks) + .values({ + taskId: task.id, + status: "waiting", + inputs: JSON.stringify({ bulk_upload_id: args.uploadId }), + }) + .returning(); + + const payload = { + task_id: task.id, + sub_task_id: subTask.id, + s3_uri: args.s3Uri, + }; + + const trigger = await triggerFastApiPipeline({ + endpoint: "/v1/bulk-uploads/trigger-postcode-splitter", + payload, + sessionToken: args.sessionToken, + }); + if (!trigger.ok) + return { kind: "trigger_failed", status: trigger.status, message: trigger.message }; + + await Promise.all([ + db + .update(bulkAddressUploads) + .set({ status: "processing", taskId: task.id }) + .where(eq(bulkAddressUploads.id, args.uploadId)), + db + .update(tasks) + .set({ status: "in progress" }) + .where(eq(tasks.id, task.id)), + db + .update(subTasks) + .set({ inputs: JSON.stringify(payload) }) + .where(eq(subTasks.id, subTask.id)), + ]); + return { kind: "ok", taskId: task.id }; +} + +export type CombineRetriggerOutcome = + | { kind: "triggered"; taskId: string; subTaskId: string } + | { kind: "already_combined" } + | { kind: "not_found" } + | { kind: "missing_task" } + | { kind: "trigger_failed"; status: number; message: string }; + +export async function requestCombineRetrigger(args: { + uploadId: string; + sessionToken: string | undefined; +}): Promise { + const upload = await loadById(args.uploadId); + if (!upload) return { kind: "not_found" }; + if (!upload.taskId) return { kind: "missing_task" }; + if (upload.combinedOutputS3Uri) return { kind: "already_combined" }; + + const [subTask] = await db + .insert(subTasks) + .values({ taskId: upload.taskId, status: "waiting" }) + .returning(); + + const payload = { task_id: upload.taskId, sub_task_id: subTask.id }; + + const trigger = await triggerFastApiPipeline({ + endpoint: "/v1/bulk-uploads/trigger-combiner", + payload, + sessionToken: args.sessionToken, + }); + if (!trigger.ok) + return { kind: "trigger_failed", status: trigger.status, message: trigger.message }; + + await db + .update(subTasks) + .set({ inputs: JSON.stringify(payload) }) + .where(eq(subTasks.id, subTask.id)); + + return { kind: "triggered", taskId: upload.taskId, subTaskId: subTask.id }; +} + +export type LoadForFinalizeOutcome = + | { kind: "ready"; upload: BulkUpload } + | { kind: "already_finalized"; status: "complete" | "needs_review" } + | { kind: "not_found" } + | { kind: "not_yet_combined" } + | { kind: "wrong_state"; current: string }; + +export async function loadForFinalize(uploadId: string): Promise { + const upload = await loadById(uploadId); + if (!upload) return { kind: "not_found" }; + if (upload.status === "complete" || upload.status === "needs_review") + return { kind: "already_finalized", status: upload.status }; + if (upload.status !== "awaiting_review") + return { kind: "wrong_state", current: upload.status }; + if (!upload.combinedOutputS3Uri) return { kind: "not_yet_combined" }; + return { kind: "ready", upload }; +} + +export async function markFinalized( + uploadId: string, + args: { needsReview: boolean }, +): Promise { + await db + .update(bulkAddressUploads) + .set({ status: args.needsReview ? "needs_review" : "complete" }) + .where(eq(bulkAddressUploads.id, uploadId)); +} diff --git a/src/lib/bulkUpload/types.ts b/src/lib/bulkUpload/types.ts new file mode 100644 index 0000000..1559f52 --- /dev/null +++ b/src/lib/bulkUpload/types.ts @@ -0,0 +1,44 @@ +import type { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; + +export const BULK_UPLOAD_STATUSES = [ + "ready_for_processing", + "mapping_complete", + "processing", + "combining", + "awaiting_review", + "complete", + "needs_review", + "failed", +] as const; + +export type BulkUploadStatus = (typeof BULK_UPLOAD_STATUSES)[number]; + +export type BulkUpload = typeof bulkAddressUploads.$inferSelect; + +export type TaskSummary = { + id: string; + taskSource: string; + status: string; + service: string | null; + jobStarted: Date | null; + jobCompleted: Date | null; + updatedAt: Date; + totalSubtasks: number; + completedSubtasks: number; + failedSubtasks: number; +}; + +export type ProgressView = { + upload: BulkUpload; + task: TaskSummary | null; +}; + +const TERMINAL_UPLOAD_STATUSES: ReadonlySet = new Set([ + "complete", + "needs_review", + "failed", +]); + +export function isTerminalStatus(status: string): boolean { + return TERMINAL_UPLOAD_STATUSES.has(status as BulkUploadStatus); +} diff --git a/src/lib/session.ts b/src/lib/session.ts new file mode 100644 index 0000000..92f04dd --- /dev/null +++ b/src/lib/session.ts @@ -0,0 +1,8 @@ +import { NextRequest } from "next/server"; + +export function readSessionToken(request: NextRequest): string | undefined { + return ( + request.cookies.get("__Secure-next-auth.session-token")?.value ?? + request.cookies.get("next-auth.session-token")?.value + ); +}