refactored with improve-code-archievture and grill-with-docs

This commit is contained in:
Jun-te Kim 2026-05-06 15:49:34 +00:00
parent f98a740ee7
commit 8170601efb
27 changed files with 924 additions and 619 deletions

View file

@ -7,13 +7,8 @@ ARG DEBIAN_FRONTEND=noninteractive
# Base CLI tooling (sudo, git, ripgrep/fd for editors, etc.).
RUN apt update && apt install -y --no-install-recommends \
<<<<<<< HEAD
sudo jq vim curl bash-completion iputils-ping \
&& apt autoremove -y \
=======
sudo jq vim curl bash-completion \
ripgrep fd-find git make unzip \
>>>>>>> main
&& rm -rf /var/lib/apt/lists/*
# Passwordless-sudo dev user (UID/GID injected from the host via compose).

View file

@ -2,10 +2,10 @@
## React
- **Avoid `useEffect` and `useMemo`.** Derive values inline, use Server Components + Route Handlers, event handlers, or `useSyncExternalStore` instead. If a hook is genuinely the only option, flag it and ask before using it.
- **Avoid `useEffect` and `useMemo`.** Derive values inline, use Server Components + Route Handlers, event handlers. If a hook is genuinely the only option, flag it and ask before using it.
- Instead of raw fetch use reactQuery to allow handling of mutations
## Next.js 15 route handlers
- `params` is a `Promise` — type as `{ params: Promise<{ ... }> }` and `await params` before destructuring.

74
CONTEXT.md Normal file
View file

@ -0,0 +1,74 @@
# Context
This document captures the domain language used in this project. Terms here are the **canonical** ones — when more than one word exists for a concept, we pick one and treat the others as aliases to avoid.
This file grows as terms are resolved during design conversations. Concepts that haven't been examined yet are not listed.
## Language
### Bulk upload
**BulkUpload**:
A user-supplied spreadsheet of addresses for a Portfolio, transformed and matched to UPRNs before being inserted as Properties. Has an explicit lifecycle from upload through finalisation.
_Avoid_: import, batch, file upload, ingest
**ColumnMapping**:
The user's declaration of which spreadsheet column means what (e.g. column "Property Address" means `address_1`). Stored as JSON on the BulkUpload row.
_Avoid_: schema, header map, field mapping
**UPRN**:
Unique Property Reference Number — the UK national identifier for an address. Address matching attaches a UPRN to each row where possible.
**Address matching**:
The pipeline stage that splits the source file by postcode, looks up UPRNs, and produces matched-address output. Triggered via FastAPI.
_Avoid_: postcode lookup, address resolution, address lookup
**Combiner**:
The pipeline stage that aggregates the per-postcode address-matching outputs into a single combined CSV in S3, ready for review.
_Avoid_: aggregator, merger
**Finalise**:
The terminal action that reads the combiner output, inserts rows as Properties on the Portfolio, and decides whether the BulkUpload needs further review.
_Avoid_: import, commit, ingest
## Lifecycle
A **BulkUpload** moves through these statuses:
```
ready_for_processing
→ mapping_complete (user submits ColumnMapping; Next.js writes)
→ processing (Address matching triggered; Next.js writes)
→ combining (Combiner stage running; FastAPI writes directly)
→ awaiting_review (Combiner output in S3; FastAPI writes directly)
→ complete (Finalise succeeded, no row issues; Next.js writes)
→ needs_review (Finalise succeeded, missing or duplicate UPRNs; Next.js writes)
→ failed (FastAPI reports in-flight failure — schema only, not yet wired)
```
`complete`, `needs_review`, and `failed` are terminal.
Re-mapping (PATCHing `columnMapping`) is legal only in `ready_for_processing` and `mapping_complete`. Any later state rejects with 409.
**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the terminal Finalise outcomes. FastAPI owns `combining` and `awaiting_review` — writing them direct to the DB during the combiner run. The BulkUpload aggregate observes both.
See [ADR-0001](./docs/adr/0001-bulk-upload-state-machine.md) for the deliberate "not yet" decisions baked into this lifecycle.
## Relationships
- A **Portfolio** has many **BulkUploads**.
- A **BulkUpload** produces zero or more **Properties** when finalised.
- A **BulkUpload** has at most one **Task** (the orchestration handle for the FastAPI pipeline run); a Task has many **SubTasks** (one per pipeline stage: address matching, combiner).
## Example dialogue
> **Dev:** "If the **Combiner** finishes but the user hasn't clicked Finalise, what does the user see?"
> **Domain expert:** "The BulkUpload sits in `awaiting_review`. The frontend polls and shows a 'review and confirm' button. Nothing's been written to **Properties** yet."
>
> **Dev:** "And if **Finalise** runs and 30% of rows have no **UPRN**?"
> **Domain expert:** "Those still get imported as **Properties** — just without a UPRN — and the BulkUpload moves to `needs_review`. The `complete` state is only for clean runs."
## Flagged ambiguities
- "Upload" is used in the codebase to mean both the file-on-S3 and the BulkUpload row. We standardise on **BulkUpload** for the row; the file is just "the source file."
- "Onboarding" appears in some route paths (`bulk_onboarding_inputs/...`) but isn't part of this glossary — we use **BulkUpload** end-to-end.

View file

@ -0,0 +1,20 @@
# BulkUpload state machine (v1)
The BulkUpload lifecycle is:
```
ready_for_processing → mapping_complete → processing → combining → awaiting_review
→ complete | needs_review | failed
```
Two writers: Next.js writes the user-driven transitions (`mapping_complete`, `processing`, `complete`, `needs_review`); the FastAPI `bulk_address2uprn_combiner` worker writes `combining` and `awaiting_review` directly to the DB during its run. Any aggregate enforcing the state machine on the Next.js side must treat those two as observed-not-owned.
Three deliberate "not yet" decisions are baked into this version, each likely to be re-suggested without a record:
1. **`needs_review` is terminal — no recovery flow.** A finalised upload with missing or duplicate UPRNs ends up here, and that's it. The imported Properties show up in the property table; a proper review/recovery UI is planned but out of scope. Without this note, every future architecture pass will surface "needs_review has no exit" as a bug.
2. **Re-mapping is rejected after `mapping_complete`.** PATCHing `columnMapping` once Address matching has been triggered returns 409. We considered (B) reset-and-rerun (clear the combiner output, return to `mapping_complete`, force the user to re-trigger) but rejected it for now: it requires cleaning up abandoned Tasks/SubTasks and re-charging a FastAPI run. (A) is the smallest correct thing — the DB column never drifts from what produced the combined output. Revisit when re-run-from-review lands.
3. **`failed` exists in the schema but is not yet written by any route.** Synchronous trigger failures (route can't reach FastAPI) are surfaced as React Query toasts on 5xx; the BulkUpload stays in its prior status and the user retries. In-flight failures (Combiner crashes silently) currently leave uploads stuck in `processing` — this is known-incomplete, waiting on FastAPI-side callback work to report failure. The status is in the schema now so the seam is ready when that work is scheduled.
These are the only "no" decisions in v1; everything else (concurrency guards on stage triggers, persisting `failed`, etc.) is intended to be added.

View file

@ -0,0 +1,17 @@
# BulkUpload pipeline stays browser-driven for now
The BulkUpload pipeline chains three stages — address matching, combiner trigger, finalise — and the chain is currently driven by the **frontend polling** in `OnboardingProgress.tsx`. The browser fires `POST /combine` when the Task looks done, then `POST /finalize` when the upload reaches `awaiting_review`. Close the tab → upload gets stuck.
A server-driven alternative (FastAPI callback into a Next.js webhook, or a sweeper route) would be more robust, but the entire bulk-upload flow is scheduled for redesign. This code path is being kept as **internal tooling for the Domna tech team** to onboard portfolios while the new flow is built — not for end-user use.
So the cleanup invests only in things that survive the redesign:
- A real state machine on the BulkUpload aggregate (so the tech team can debug from the row alone).
- Deduplicated FastAPI trigger logic.
- Concurrency guards on stage triggers.
It deliberately does **not** invest in:
- Server-side stage orchestration.
- Recovery from `failed` / stuck uploads.
- A real `awaiting_review` review UI.
When the redesign lands, browser-driven chaining and the auto-finalise behaviour go with it. Future readers asking "why didn't they fix this" — that's why.

View file

@ -0,0 +1,12 @@
# Task creation lives in BulkUpload, not behind a generic Task endpoint
A `Task` is the orchestration handle for a FastAPI pipeline run; per `CONTEXT.md`, **a BulkUpload has at most one Task**. Today BulkUpload is the only feature that creates Tasks.
We're collapsing the previous two-step client seam — `POST /api/tasks` to create a Task and SubTask, then `POST /bulk-uploads/[uploadId]/start-address-matching` with the resulting IDs — into a single route. Task + SubTask creation moves inside `triggerAddressMatching` in `src/lib/bulkUpload/server.ts`. The generic `POST /api/tasks` endpoint is deleted; the `GET /api/tasks` listing (admin Tasks UI) stays.
The trade-off was between:
- **(rejected) Keep `POST /api/tasks` for future generic use.** No other feature creates Tasks today. By the deletion test, the generic endpoint wasn't earning its keep — every line was overhead carried for hypothetical callers, while the only real caller had to perform a leaky two-call dance to satisfy a contract that didn't reflect the domain.
- **(chosen) Collapse Task creation into the BulkUpload module.** The seam matches the domain: BulkUpload owns its Task's lifecycle, including the moment of creation. One route handles the full transition into `processing`. The client surface (`useStartAddressMatching`) is one mutation, not a chain.
A future reader will see `/api/tasks` exposing GET-only and reasonably wonder where Tasks are created. The answer is: inside whichever feature owns the Task. Today that's BulkUpload only. If a second feature ever needs to create Tasks, re-extract a generic creator at that point — extracting from two real consumers is straightforward, whereas keeping a speculative endpoint live without consumers leaves a stale-by-default seam that drifts from how Tasks are actually created.

View file

@ -0,0 +1,19 @@
# Bulk-upload pipeline advances via explicit Run Combiner / Finalise buttons
[ADR-0002](./0002-bulk-upload-browser-driven-orchestration.md) described the bulk-upload pipeline as **browser-driven and auto-firing**: a polling loop in `OnboardingProgress.tsx` watched the Task summary and fired `POST /combine` when the Task looked done, then `POST /finalize` when the upload reached `awaiting_review`, with mutable `combineFired` / `finalizeFired` / `refreshed` flags gating the side effects.
This decision keeps the pipeline browser-driven (per ADR-0002) but replaces the auto-fire behaviour with **explicit buttons** the Domna tech team clicks to advance each stage. The state machine in [ADR-0001](./0001-bulk-upload-state-machine.md) is unchanged.
Concretely:
- The progress screen polls a single `GET /bulk-uploads/[uploadId]/progress` snapshot (Task summary + BulkUpload row in one query).
- "Run Combiner" appears when the Task reaches terminal-non-failed and the Combiner hasn't been triggered yet.
- "Finalise" appears when the BulkUpload reaches `awaiting_review`.
- Polling stops once the BulkUpload reaches a terminal status; on `useFinalize` success the caller runs `router.refresh()`.
The trade-off was between:
- **(rejected) Keep auto-fire and translate the orchestration to react-query with `useEffect`s.** Functional, but the auto-fire UX hides where the pipeline gets stuck — which is the failure mode the Domna tech team needs to debug. CLAUDE.md's "avoid `useEffect`" rule also gets stretched: three effects, one per side-effecting transition, each watching derived eligibility flags.
- **(chosen) Explicit buttons.** The "fire once" mutable flags collapse into react-query mutation state (`isIdle` / `isPending` / `isSuccess`) — pure derivations from the cache, no `useEffect`. Each stage is observable, retryable, and reflects the current BulkUpload status directly. Tab close still leaves uploads stuck (browser-driven progression, per ADR-0002), but "stuck" is now legible: the team sees which button is pending.
When the bulk-upload redesign lands (per ADR-0002, "out of scope"), this entire screen and its buttons go away in favour of server-driven progression. Until then, the buttons are the right level of investment: they survive the redesign as discoverable artifacts of the prior flow, and they make the current flow tractable for the team using it.

View file

@ -1,7 +1,5 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { eq } from "drizzle-orm";
import { requestCombineRetrigger } from "@/lib/bulkUpload/server";
import { readSessionToken } from "@/lib/session";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
@ -14,65 +12,24 @@ export async function POST(
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
const { uploadId } = await params;
const result = await requestCombineRetrigger({
uploadId,
sessionToken: readSessionToken(request),
});
const [upload] = await db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.id, uploadId))
.limit(1);
if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 });
if (!upload.taskId)
return NextResponse.json({ error: "Upload has no task" }, { status: 422 });
if (upload.combinedOutputS3Uri)
return NextResponse.json({ alreadyCombined: true }, { status: 200 });
const fastapiUrl = process.env.FASTAPI_API_URL;
const fastapiKey = process.env.FASTAPI_API_KEY;
if (!fastapiUrl || !fastapiKey) {
console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set");
return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 });
switch (result.kind) {
case "triggered":
return NextResponse.json(
{ taskId: result.taskId, subTaskId: result.subTaskId },
{ status: 200 }
);
case "already_combined":
return NextResponse.json({ alreadyCombined: true }, { status: 200 });
case "not_found":
return NextResponse.json({ error: "Not found" }, { status: 404 });
case "missing_task":
return NextResponse.json({ error: "Upload has no task" }, { status: 422 });
case "trigger_failed":
return NextResponse.json({ error: result.message }, { status: result.status });
}
const [subTask] = await db
.insert(subTasks)
.values({
taskId: upload.taskId,
status: "waiting",
})
.returning();
const messageBody = { task_id: upload.taskId, sub_task_id: subTask.id };
const sessionToken =
request.cookies.get("__Secure-next-auth.session-token")?.value ??
request.cookies.get("next-auth.session-token")?.value;
try {
const triggerRes = await fetch(`${fastapiUrl}/v1/bulk-uploads/trigger-combiner`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": fastapiKey,
Authorization: `Bearer ${sessionToken}`,
},
body: JSON.stringify(messageBody),
});
if (!triggerRes.ok) {
const errText = await triggerRes.text().catch(() => "");
console.error("Backend trigger-combiner failed:", triggerRes.status, errText);
return NextResponse.json({ error: "Failed to trigger combiner" }, { status: 502 });
}
} catch (err) {
console.error("Failed to reach backend trigger-combiner:", err);
return NextResponse.json({ error: "Failed to trigger combiner" }, { status: 502 });
}
await db
.update(subTasks)
.set({ inputs: JSON.stringify(messageBody) })
.where(eq(subTasks.id, subTask.id));
return NextResponse.json({ taskId: upload.taskId, subTaskId: subTask.id }, { status: 200 });
}

View file

@ -1,13 +1,13 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { property } from "@/app/db/schema/property";
import { eq, sql } from "drizzle-orm";
import { sql } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
import { revalidatePath } from "next/cache";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import S3 from "aws-sdk/clients/s3";
import { createRetrofitDataS3Client } from "@/app/utils/s3";
import * as XLSX from "xlsx";
import { loadForFinalize, markFinalized } from "@/lib/bulkUpload/server";
const ADDRESS_COLS = ["Address 1", "Address 2", "Address 3"] as const;
const POSTCODE_COL = "postcode";
@ -69,33 +69,31 @@ export async function POST(
const { uploadId } = await params;
const [upload] = await db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.id, uploadId))
.limit(1);
const guarded = await loadForFinalize(uploadId);
switch (guarded.kind) {
case "not_found":
return NextResponse.json({ error: "Not found" }, { status: 404 });
case "already_finalized":
return NextResponse.json(
{ alreadyComplete: true, status: guarded.status },
{ status: 200 }
);
case "wrong_state":
return NextResponse.json(
{ error: `Upload not ready to finalize (state: ${guarded.current})` },
{ status: 409 }
);
case "not_yet_combined":
return NextResponse.json({ error: "Combiner not finished" }, { status: 409 });
}
const upload = guarded.upload;
if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 });
if (upload.status === "complete" || upload.status === "needs_review") {
return NextResponse.json({ alreadyComplete: true, status: upload.status }, { status: 200 });
}
if (upload.status !== "awaiting_review") {
return NextResponse.json({ error: "Upload not ready to finalize" }, { status: 422 });
}
if (!upload.combinedOutputS3Uri) {
return NextResponse.json({ error: "Combiner not finished" }, { status: 409 });
}
const parsed = parseS3Uri(upload.combinedOutputS3Uri);
const parsed = parseS3Uri(upload.combinedOutputS3Uri!);
if (!parsed) {
return NextResponse.json({ error: "Invalid combined output S3 URI" }, { status: 500 });
}
const s3 = new S3({
region: process.env.RETROFIT_DATA_DEV_REGION,
accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY,
secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY,
});
const s3 = createRetrofitDataS3Client();
let rawRows: Record<string, unknown>[];
try {
@ -169,13 +167,9 @@ export async function POST(
}
const needsReview = missingUprnCount > 0 || duplicateUprnCount > 0;
await markFinalized(uploadId, { needsReview });
const nextStatus = needsReview ? "needs_review" : "complete";
await db
.update(bulkAddressUploads)
.set({ status: nextStatus })
.where(eq(bulkAddressUploads.id, uploadId));
revalidatePath("/portfolio/[slug]", "layout");
return NextResponse.json(

View file

@ -0,0 +1,17 @@
import { getProgressView } from "@/lib/bulkUpload/server";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ portfolioId: string; uploadId: string }> }
) {
const session = await getServerSession(AuthOptions);
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
const { uploadId } = await params;
const view = await getProgressView(uploadId);
if (!view) return NextResponse.json({ error: "Not found" }, { status: 404 });
return NextResponse.json(view, { status: 200 });
}

View file

@ -1,38 +1,11 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { eq } from "drizzle-orm";
import { setColumnMapping } from "@/lib/bulkUpload/server";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import { z } from "zod";
const PatchSchema = z.object({
columnMapping: z.record(z.string(), z.string()),
});
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ portfolioId: string; uploadId: string }> }
) {
const session = await getServerSession(AuthOptions);
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
const { uploadId } = await params;
const [upload] = await db
.select({
status: bulkAddressUploads.status,
combinedOutputS3Uri: bulkAddressUploads.combinedOutputS3Uri,
})
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.id, uploadId))
.limit(1);
if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 });
return NextResponse.json(upload, { status: 200 });
}
export async function PATCH(
request: NextRequest,
{ params }: { params: Promise<{ portfolioId: string; uploadId: string }> }
@ -43,33 +16,26 @@ export async function PATCH(
try {
body = PatchSchema.parse(await request.json());
} catch {
return NextResponse.json({ msg: "Invalid input" }, { status: 400 });
}
const values = Object.values(body.columnMapping);
const hasAddress = values.includes("address_1");
const hasPostcode = values.includes("postcode");
if (!hasAddress || !hasPostcode) {
return NextResponse.json(
{ msg: "Mapping must include address_1 and postcode." },
{ status: 422 }
);
return NextResponse.json({ error: "Invalid input" }, { status: 400 });
}
try {
const [updated] = await db
.update(bulkAddressUploads)
.set({ columnMapping: body.columnMapping, status: "mapping_complete" })
.where(eq(bulkAddressUploads.id, uploadId))
.returning();
if (!updated) {
return NextResponse.json({ msg: "Not found" }, { status: 404 });
const result = await setColumnMapping(uploadId, body.columnMapping);
switch (result.kind) {
case "ok":
return NextResponse.json(result.upload, { status: 200 });
case "not_found":
return NextResponse.json({ error: "Not found" }, { status: 404 });
case "invalid_status":
return NextResponse.json(
{ error: `Cannot remap upload in state '${result.current}'` },
{ status: 409 }
);
case "invalid_mapping":
return NextResponse.json({ error: result.reason }, { status: 422 });
}
return NextResponse.json(updated, { status: 200 });
} catch (error) {
console.error("Failed to save column mapping:", error);
return NextResponse.json({ msg: "Internal server error" }, { status: 500 });
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}

View file

@ -1,15 +1,10 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { eq } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import { z } from "zod";
import { createS3Client } from "@/app/utils/s3";
import S3 from "aws-sdk/clients/s3";
import { createS3Client, createRetrofitDataS3Client, retrofitDataS3Bucket } from "@/app/utils/s3";
import * as XLSX from "xlsx";
import { loadForAddressMatching, triggerAddressMatching } from "@/lib/bulkUpload/server";
import { readSessionToken } from "@/lib/session";
const FIELD_RENAME: Record<string, string> = {
address_1: "Address 1",
@ -19,11 +14,6 @@ const FIELD_RENAME: Record<string, string> = {
internal_reference: "Internal Reference",
};
const BodySchema = z.object({
taskId: z.string().uuid(),
subTaskId: z.string().uuid(),
});
function transformFile(
buffer: Buffer,
columnMapping: Record<string, string>
@ -72,38 +62,28 @@ export async function POST(
const { portfolioId, uploadId } = await params;
let body;
try {
body = BodySchema.parse(await request.json());
} catch {
return NextResponse.json({ error: "Invalid input" }, { status: 400 });
const guarded = await loadForAddressMatching(uploadId);
switch (guarded.kind) {
case "not_found":
return NextResponse.json({ error: "Not found" }, { status: 404 });
case "wrong_state":
return NextResponse.json(
{ error: `Upload not ready for onboarding (state: ${guarded.current})` },
{ status: 409 }
);
case "missing_mapping":
return NextResponse.json({ error: "Column mapping missing" }, { status: 422 });
}
const [upload] = await db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.id, uploadId))
.limit(1);
if (!upload) return NextResponse.json({ error: "Not found" }, { status: 404 });
if (upload.status !== "mapping_complete")
return NextResponse.json({ error: "Upload not ready for onboarding" }, { status: 422 });
if (!upload.columnMapping)
return NextResponse.json({ error: "Column mapping missing" }, { status: 422 });
const upload = guarded.upload;
const s3 = createS3Client();
const outputS3 = new S3({
region: process.env.RETROFIT_DATA_DEV_REGION,
accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY,
secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY,
});
const outputBucket = process.env.RETROFIT_DATA_DEV_S3_BUCKET_NAME!;
const bucket = upload.s3Bucket;
const outputS3 = createRetrofitDataS3Client();
const outputBucket = retrofitDataS3Bucket();
let fileBuffer: Buffer;
try {
const obj = await s3
.getObject({ Bucket: bucket, Key: upload.s3Key })
.getObject({ Bucket: upload.s3Bucket, Key: upload.s3Key })
.promise();
fileBuffer = Buffer.from(obj.Body as Uint8Array);
} catch (err) {
@ -111,8 +91,9 @@ export async function POST(
return NextResponse.json({ error: "Failed to read source file" }, { status: 500 });
}
const result = transformFile(fileBuffer, upload.columnMapping);
if (result.error) return NextResponse.json({ error: result.error }, { status: 422 });
const transformed = transformFile(fileBuffer, upload.columnMapping!);
if (transformed.error)
return NextResponse.json({ error: transformed.error }, { status: 422 });
const transformedKey = `bulk_onboarding_inputs/${portfolioId}/${uploadId}.csv`;
try {
@ -120,7 +101,7 @@ export async function POST(
.putObject({
Bucket: outputBucket,
Key: transformedKey,
Body: result.csv,
Body: transformed.csv,
ContentType: "text/csv",
})
.promise();
@ -131,53 +112,13 @@ export async function POST(
const s3Uri = `s3://${outputBucket}/${transformedKey}`;
const fastapiUrl = process.env.FASTAPI_API_URL;
const fastapiKey = process.env.FASTAPI_API_KEY;
if (!fastapiUrl || !fastapiKey) {
console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set");
return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 });
}
const trigger = await triggerAddressMatching({
uploadId,
s3Uri,
sessionToken: readSessionToken(request),
});
if (trigger.kind === "trigger_failed")
return NextResponse.json({ error: trigger.message }, { status: trigger.status });
const sessionToken =
request.cookies.get("__Secure-next-auth.session-token")?.value ??
request.cookies.get("next-auth.session-token")?.value;
try {
const triggerRes = await fetch(`${fastapiUrl}/v1/bulk-uploads/trigger-postcode-splitter`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": fastapiKey,
Authorization: `Bearer ${sessionToken}`,
},
body: JSON.stringify({
task_id: body.taskId,
sub_task_id: body.subTaskId,
s3_uri: s3Uri,
}),
});
if (!triggerRes.ok) {
const errText = await triggerRes.text().catch(() => "");
console.error("Backend trigger-postcode-splitter failed:", triggerRes.status, errText);
return NextResponse.json({ error: "Failed to trigger address matching" }, { status: 502 });
}
} catch (err) {
console.error("Failed to reach backend trigger-postcode-splitter:", err);
return NextResponse.json({ error: "Failed to trigger address matching" }, { status: 502 });
}
await Promise.all([
db.update(bulkAddressUploads)
.set({ status: "processing", taskId: body.taskId })
.where(eq(bulkAddressUploads.id, uploadId)),
db.update(tasks)
.set({ status: "in progress" })
.where(eq(tasks.id, body.taskId)),
db.update(subTasks)
.set({ inputs: JSON.stringify({ task_id: body.taskId, sub_task_id: body.subTaskId, s3_uri: s3Uri }) })
.where(eq(subTasks.id, body.subTaskId)),
]);
return NextResponse.json({ taskId: body.taskId }, { status: 200 });
return NextResponse.json({ taskId: trigger.taskId }, { status: 200 });
}

View file

@ -1,6 +1,4 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { eq, desc } from "drizzle-orm";
import { listForPortfolio } from "@/lib/bulkUpload/server";
import { NextRequest, NextResponse } from "next/server";
export async function GET(
@ -10,15 +8,10 @@ export async function GET(
const { portfolioId } = await params;
try {
const uploads = await db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.portfolioId, portfolioId))
.orderBy(desc(bulkAddressUploads.createdAt));
const uploads = await listForPortfolio(portfolioId);
return NextResponse.json(uploads, { status: 200 });
} catch (error) {
console.error("Failed to fetch bulk uploads:", error);
return NextResponse.json({ msg: "Internal server error" }, { status: 500 });
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}

View file

@ -1,61 +1,7 @@
import { db } from "@/app/db/db";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { desc, count } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import { z } from "zod";
const CreateTaskSchema = z.object({
taskSource: z.string().min(1),
service: z.string().optional(),
source: z.literal("portfolio_id").optional(),
sourceId: z.string().optional(),
inputs: z.record(z.unknown()).optional(),
});
export async function POST(request: NextRequest) {
const session = await getServerSession(AuthOptions);
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
let body;
try {
body = CreateTaskSchema.parse(await request.json());
} catch {
return NextResponse.json({ error: "Invalid input" }, { status: 400 });
}
try {
const now = new Date();
const [task] = await db
.insert(tasks)
.values({
taskSource: body.taskSource,
service: body.service,
source: body.source,
sourceId: body.sourceId,
status: "waiting",
jobStarted: now,
})
.returning();
const [subTask] = await db
.insert(subTasks)
.values({
taskId: task.id,
status: "waiting",
inputs: body.inputs ? JSON.stringify(body.inputs) : null,
})
.returning();
return NextResponse.json({ taskId: task.id, subTaskId: subTask.id }, { status: 201 });
} catch (error) {
console.error("Failed to create task:", error);
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}
export async function GET(request: NextRequest) {
try {

View file

@ -22,13 +22,13 @@ export async function POST(request: NextRequest) {
body = BodySchema.parse(await request.json());
} catch (error) {
console.error("Invalid input:", error);
return NextResponse.json({ msg: "Invalid input" }, { status: 400 });
return NextResponse.json({ error: "Invalid input" }, { status: 400 });
}
const bucket = process.env.RETROFIT_PLAN_INPUT_BUCKET_NAME;
if (!bucket) {
console.error("RETROFIT_PLAN_INPUT_BUCKET_NAME not set");
return NextResponse.json({ msg: "Server misconfiguration" }, { status: 500 });
return NextResponse.json({ error: "Server misconfiguration" }, { status: 500 });
}
try {
@ -50,6 +50,6 @@ export async function POST(request: NextRequest) {
);
} catch (error) {
console.error("Failed to record upload:", error);
return NextResponse.json({ msg: "Internal server error" }, { status: 500 });
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}

View file

@ -15,7 +15,7 @@ export async function POST(request: NextRequest) {
body = BodySchema.parse(await request.json());
} catch (error) {
console.error("Invalid input:", error);
return NextResponse.json({ msg: "Invalid input" }, { status: 400 });
return NextResponse.json({ error: "Invalid input" }, { status: 400 });
}
try {
@ -31,6 +31,6 @@ export async function POST(request: NextRequest) {
return NextResponse.json({ url: preSignedUrl }, { status: 200 });
} catch (error) {
console.error(error);
return NextResponse.json({ msg: "Internal server error" }, { status: 500 });
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}

View file

@ -22,6 +22,7 @@ import {
} from "@heroicons/react/24/outline";
import { useSession } from "next-auth/react";
import { useRouter } from "next/navigation";
import { useCreateBulkUpload } from "@/lib/bulkUpload/client";
const MAX_FILE_SIZE_MB = 50;
const ALLOWED_EXTENSIONS = [".csv", ".xlsx", ".xls"];
@ -99,22 +100,6 @@ async function validateHeaders(file: File): Promise<{ error: string | null; head
return { error: null, headers };
}
async function getPresignedUrl(
userId: string,
portfolioId: string,
fileKey: string,
contentType: string
): Promise<string> {
const res = await fetch("/api/upload/bulk-addresses", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ userId, portfolioId, fileKey, contentType }),
});
if (!res.ok) throw new Error("Failed to generate upload URL.");
const data = await res.json();
return data.url;
}
export default function BulkUploadComingSoonModal({
isOpen,
onClose,
@ -123,18 +108,22 @@ export default function BulkUploadComingSoonModal({
const session = useSession();
const router = useRouter();
const fileInputRef = useRef<HTMLInputElement>(null);
const createUpload = useCreateBulkUpload();
const [isDragging, setIsDragging] = useState(false);
const [selectedFile, setSelectedFile] = useState<File | null>(null);
const [sourceHeaders, setSourceHeaders] = useState<string[]>([]);
const [validationError, setValidationError] = useState<string | null>(null);
const [validating, setValidating] = useState(false);
const [uploading, setUploading] = useState(false);
const [uploadProgress, setUploadProgress] = useState<number | null>(null);
const [uploadError, setUploadError] = useState<string | null>(null);
const uploading = createUpload.isPending;
const uploadError = createUpload.error
? "Upload failed. Please try again, or contact a Domna representative if the issue persists."
: null;
async function handleFile(file: File) {
setUploadError(null);
createUpload.reset();
setSelectedFile(null);
setValidationError(null);
@ -183,60 +172,38 @@ export default function BulkUploadComingSoonModal({
setSourceHeaders([]);
setValidationError(null);
setValidating(false);
setUploadError(null);
setUploading(false);
setUploadProgress(null);
createUpload.reset();
onClose();
}
async function handleUpload() {
function handleUpload() {
const userId = String(session.data?.user?.dbId ?? "");
if (!selectedFile || !userId) return;
setUploading(true);
const ext = getFileExtension(selectedFile.name);
const contentType = CONTENT_TYPES[ext] ?? "application/octet-stream";
const fileKey = generateS3Key(userId, portfolioId, ext);
setUploadProgress(0);
setUploadError(null);
try {
const ext = getFileExtension(selectedFile.name);
const contentType = CONTENT_TYPES[ext] ?? "application/octet-stream";
const fileKey = generateS3Key(userId, portfolioId, ext);
const presignedUrl = await getPresignedUrl(userId, portfolioId, fileKey, contentType);
await new Promise<void>((resolve, reject) => {
const xhr = new XMLHttpRequest();
xhr.open("PUT", presignedUrl);
xhr.setRequestHeader("Content-Type", contentType);
xhr.upload.addEventListener("progress", (e) => {
if (e.lengthComputable) {
setUploadProgress(Math.round((e.loaded / e.total) * 100));
}
});
xhr.onload = () => {
if (xhr.status >= 200 && xhr.status < 300) resolve();
else reject(new Error(`S3 upload failed: ${xhr.status}`));
};
xhr.onerror = () => reject(new Error("Network error during upload"));
xhr.send(selectedFile);
});
const confirmRes = await fetch("/api/upload/bulk-addresses/confirm", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ fileKey, filename: selectedFile.name, portfolioId, userId, sourceHeaders }),
});
if (!confirmRes.ok) throw new Error("Failed to record upload.");
const { id: uploadId } = await confirmRes.json();
router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}/map-columns`);
onClose();
} catch (err) {
setUploadError("Upload failed. Please try again, or contact a Domna representative if the issue persists.");
} finally {
setUploading(false);
setUploadProgress(null);
}
createUpload.mutate(
{
file: selectedFile,
portfolioId,
userId,
sourceHeaders,
contentType,
fileKey,
onProgress: setUploadProgress,
},
{
onSuccess: ({ id: uploadId }) => {
router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}/map-columns`);
onClose();
},
onSettled: () => setUploadProgress(null),
}
);
}
const canUpload = !!selectedFile && !uploading && !validating;

View file

@ -1,172 +1,37 @@
"use client";
import { useState, useSyncExternalStore } from "react";
import { useRouter } from "next/navigation";
import Link from "next/link";
interface TaskData {
id: string;
taskSource: string;
status: string;
totalSubtasks: number;
completedSubtasks: number;
failedSubtasks: number;
}
interface UploadStatus {
status: string;
combinedOutputS3Uri: string | null;
}
import { ArrowRightIcon } from "@heroicons/react/24/outline";
import {
useBulkUploadProgress,
useFinalize,
useRequestCombine,
} from "@/lib/bulkUpload/client";
interface Props {
taskId: string;
portfolioSlug: string;
portfolioId: string;
uploadId: string;
isDomnaUser: boolean;
}
interface Snapshot {
data: TaskData | null;
uploadStatus: UploadStatus | null;
fetchError: boolean;
finalizeError: string | null;
}
const TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]);
const FAILED_STATUSES = new Set(["failed", "failure", "error"]);
const FINAL_UPLOAD_STATUSES = new Set(["complete", "needs_review"]);
function createProgressStore(args: {
taskId: string;
portfolioId: string;
uploadId: string;
onComplete: () => void;
}) {
let snapshot: Snapshot = {
data: null,
uploadStatus: null,
fetchError: false,
finalizeError: null,
};
const listeners = new Set<() => void>();
let intervalId: ReturnType<typeof setInterval> | null = null;
let combineFired = false;
let finalizeFired = false;
let refreshed = false;
function emit(patch: Partial<Snapshot>) {
snapshot = { ...snapshot, ...patch };
listeners.forEach((l) => l());
}
async function fireFinalize() {
try {
const res = await fetch(
`/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}/finalize`,
{ method: "POST" }
);
if (!res.ok) {
const body = await res.json().catch(() => ({}));
const msg =
body?.detail || body?.error || `Finalize failed (${res.status})`;
emit({ finalizeError: msg });
finalizeFired = false;
}
} catch (err) {
console.error("Failed to trigger finalize:", err);
emit({ finalizeError: err instanceof Error ? err.message : "Network error" });
finalizeFired = false;
}
}
async function poll() {
try {
const res = await fetch(`/api/tasks/${args.taskId}/summary`);
if (!res.ok) { emit({ fetchError: true }); return; }
const json: TaskData = await res.json();
emit({ data: json });
const status = json.status.toLowerCase();
if (TERMINAL_STATUSES.has(status)) {
if (!FAILED_STATUSES.has(status) && !combineFired) {
combineFired = true;
fetch(`/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}/combine`, {
method: "POST",
}).catch((err) => console.error("Failed to trigger combiner:", err));
}
const uploadRes = await fetch(
`/api/portfolio/${args.portfolioId}/bulk-uploads/${args.uploadId}`
);
if (uploadRes.ok) {
const upload: UploadStatus = await uploadRes.json();
emit({ uploadStatus: upload });
if (upload.status === "awaiting_review" && !finalizeFired) {
finalizeFired = true;
fireFinalize();
}
if (FINAL_UPLOAD_STATUSES.has(upload.status) && !refreshed) {
refreshed = true;
if (intervalId) clearInterval(intervalId);
intervalId = null;
args.onComplete();
return;
}
}
}
} catch {
emit({ fetchError: true });
}
}
return {
subscribe(listener: () => void) {
listeners.add(listener);
if (listeners.size === 1 && intervalId === null && !refreshed) {
poll();
intervalId = setInterval(poll, 3000);
}
return () => {
listeners.delete(listener);
if (listeners.size === 0 && intervalId !== null) {
clearInterval(intervalId);
intervalId = null;
}
};
},
getSnapshot: () => snapshot,
retryFinalize() {
emit({ finalizeError: null });
finalizeFired = true;
fireFinalize();
},
};
}
const TASK_TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]);
const TASK_FAILED_STATUSES = new Set(["failed", "failure", "error"]);
export default function OnboardingProgress({
taskId,
portfolioSlug,
portfolioId,
uploadId,
isDomnaUser,
}: Props) {
const router = useRouter();
const [store] = useState(() =>
createProgressStore({
taskId,
portfolioId,
uploadId,
onComplete: () => router.refresh(),
})
);
const snap = useSyncExternalStore(store.subscribe, store.getSnapshot, store.getSnapshot);
const { data, uploadStatus, fetchError, finalizeError } = snap;
const progress = useBulkUploadProgress(portfolioId, uploadId);
const combine = useRequestCombine(portfolioId, uploadId);
const finalize = useFinalize(portfolioId, uploadId);
if (fetchError) return null;
if (!data) {
if (progress.isError) return null;
if (!progress.data) {
return (
<div className="mt-4 flex items-center gap-2 text-sm text-gray-400">
<span className="w-4 h-4 rounded-full border-2 border-gray-300 border-t-transparent animate-spin" />
@ -175,22 +40,26 @@ export default function OnboardingProgress({
);
}
const total = data.totalSubtasks;
const complete = data.completedSubtasks;
const failed = data.failedSubtasks;
const percent = total > 0 ? Math.round((complete / total) * 100) : 0;
const taskDone = TERMINAL_STATUSES.has(data.status.toLowerCase());
const isFailed = FAILED_STATUSES.has(data.status.toLowerCase());
const isCombining =
taskDone && !isFailed && uploadStatus?.status === "combining";
const isImporting =
taskDone && !isFailed && uploadStatus?.status === "awaiting_review";
const { task, upload } = progress.data;
const total = task?.totalSubtasks ?? 0;
const completedSubtasks = task?.completedSubtasks ?? 0;
const failedSubtasks = task?.failedSubtasks ?? 0;
const percent = total > 0 ? Math.round((completedSubtasks / total) * 100) : 0;
const taskStatus = task?.status.toLowerCase() ?? "";
const taskDone = TASK_TERMINAL_STATUSES.has(taskStatus);
const taskFailed = TASK_FAILED_STATUSES.has(taskStatus);
const isCombining = upload.status === "combining";
const isImporting = upload.status === "awaiting_review";
const canRunCombiner = taskDone && !taskFailed && upload.status === "processing";
const canFinalize = upload.status === "awaiting_review";
return (
<div className="mt-6 space-y-3">
<div className="w-full bg-gray-100 rounded-full h-2 overflow-hidden">
<div
className={`h-2 rounded-full transition-all duration-500 ${isFailed ? "bg-red-400" : "bg-[#14163d]"}`}
className={`h-2 rounded-full transition-all duration-500 ${taskFailed ? "bg-red-400" : "bg-[#14163d]"}`}
style={{ width: total > 0 ? `${percent}%` : "4%" }}
/>
</div>
@ -198,13 +67,13 @@ export default function OnboardingProgress({
<div className="flex items-center gap-4 text-xs text-gray-500">
{total > 0 && (
<span>
<span className="font-semibold text-gray-700">{complete}</span> / {total} batches complete
<span className="font-semibold text-gray-700">{completedSubtasks}</span> / {total} batches complete
</span>
)}
{failed > 0 && (
{failedSubtasks > 0 && (
<span className="flex items-center gap-1 text-red-500 font-semibold">
<span className="w-1.5 h-1.5 rounded-full bg-red-400" />
{failed} failed
{failedSubtasks} failed
</span>
)}
{!taskDone && (
@ -222,24 +91,41 @@ export default function OnboardingProgress({
{isImporting && (
<span className="flex items-center gap-1 text-blue-500">
<span className="w-1.5 h-1.5 rounded-full bg-blue-400 animate-pulse" />
Importing to portfolio
Awaiting import
</span>
)}
</div>
{finalizeError && (
<div className="rounded-lg border border-red-200 bg-red-50 px-3 py-2 text-xs text-red-700 flex items-start gap-3">
<div className="flex-1">
<p className="font-semibold">Import failed</p>
<p className="text-red-600 mt-0.5 break-words">{finalizeError}</p>
</div>
<button
type="button"
onClick={() => store.retryFinalize()}
className="shrink-0 px-2 py-1 rounded-md bg-red-600 text-white font-semibold hover:bg-red-700 transition-colors"
>
Retry
</button>
{(canRunCombiner || canFinalize) && (
<div className="flex flex-col gap-2 pt-2">
{canRunCombiner && (
<StageButton
label="Run Combiner"
activeLabel="Starting combiner…"
isPending={combine.isPending}
onClick={() => combine.mutate()}
/>
)}
{canFinalize && (
<StageButton
label="Finalise"
activeLabel="Finalising…"
isPending={finalize.isPending}
onClick={() =>
finalize.mutate(undefined, { onSuccess: () => router.refresh() })
}
/>
)}
</div>
)}
{combine.error && (
<p className="text-xs text-red-500">{combine.error.message}</p>
)}
{finalize.error && (
<div className="rounded-lg border border-red-200 bg-red-50 px-3 py-2 text-xs text-red-700">
<p className="font-semibold">Import failed</p>
<p className="text-red-600 mt-0.5 break-words">{finalize.error.message}</p>
</div>
)}
@ -254,3 +140,37 @@ export default function OnboardingProgress({
</div>
);
}
function StageButton({
label,
activeLabel,
isPending,
onClick,
}: {
label: string;
activeLabel: string;
isPending: boolean;
onClick: () => void;
}) {
return (
<button
onClick={onClick}
disabled={isPending}
className={`inline-flex items-center gap-2 self-start px-5 py-2 rounded-xl bg-gradient-to-br from-[#14163d] to-[#15173e] text-white text-sm font-bold transition-opacity ${
isPending ? "opacity-50 cursor-not-allowed" : "hover:opacity-90"
}`}
>
{isPending ? (
<>
<span className="w-4 h-4 rounded-full border-2 border-white border-t-transparent animate-spin" />
{activeLabel}
</>
) : (
<>
{label}
<ArrowRightIcon className="h-4 w-4" />
</>
)}
</button>
);
}

View file

@ -1,8 +1,8 @@
"use client";
import { useRouter } from "next/navigation";
import { useMutation } from "@tanstack/react-query";
import { ArrowRightIcon } from "@heroicons/react/24/outline";
import { useStartAddressMatching } from "@/lib/bulkUpload/client";
interface Props {
portfolioId: string;
@ -10,53 +10,14 @@ interface Props {
filename: string;
}
export default function StartAddressMatchingButton({ portfolioId, uploadId, filename }: Props) {
export default function StartAddressMatchingButton({ portfolioId, uploadId }: Props) {
const router = useRouter();
const { mutate, isPending, error } = useMutation({
mutationFn: async () => {
const taskRes = await fetch("/api/tasks", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
taskSource: `Address Onboarding ${filename}`,
service: "address2uprn",
source: "portfolio_id",
sourceId: portfolioId,
inputs: { bulk_upload_id: uploadId },
}),
});
if (!taskRes.ok) {
const data = await taskRes.json().catch(() => ({}));
throw new Error(data.error ?? "Failed to create task");
}
const { taskId, subTaskId } = await taskRes.json();
const matchRes = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/start-address-matching`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ taskId, subTaskId }),
}
);
if (!matchRes.ok) {
const data = await matchRes.json().catch(() => ({}));
throw new Error(data.error ?? "Failed to start address matching");
}
},
onSuccess: () => {
router.refresh();
},
});
const { mutate, isPending, error } = useStartAddressMatching(portfolioId, uploadId);
return (
<div className="mt-4">
<button
onClick={() => mutate()}
onClick={() => mutate(undefined, { onSuccess: () => router.refresh() })}
disabled={isPending}
className={`inline-flex items-center gap-2 px-5 py-2 rounded-xl bg-gradient-to-br from-[#14163d] to-[#15173e] text-white text-sm font-bold transition-opacity ${
isPending ? "opacity-50 cursor-not-allowed" : "hover:opacity-90"
@ -76,7 +37,7 @@ export default function StartAddressMatchingButton({ portfolioId, uploadId, file
</button>
{error && (
<p className="mt-2 text-xs text-red-500">
{error instanceof Error ? error.message : "Something went wrong"}
{error.message}
</p>
)}
</div>

View file

@ -9,6 +9,7 @@ import {
TableCellsIcon,
ArrowsRightLeftIcon,
} from "@heroicons/react/24/outline";
import { useSetColumnMapping } from "@/lib/bulkUpload/client";
const INTERNAL_FIELDS = [
{ value: "address_1", label: "Address 1", required: true },
@ -61,42 +62,28 @@ export default function MapColumnsClient({
const [mapping, setMapping] = useState<Record<string, string>>(
buildInitialMapping(sourceHeaders, existingMapping)
);
const [submitting, setSubmitting] = useState(false);
const [error, setError] = useState<string | null>(null);
const setMappingMutation = useSetColumnMapping(portfolioId, uploadId);
const mappedValues = Object.values(mapping).filter((v) => v !== "skip");
const missingRequired = REQUIRED_VALUES.filter((r) => !mappedValues.includes(r));
const submitting = setMappingMutation.isPending;
const error = setMappingMutation.error?.message ?? null;
const canSubmit = missingRequired.length === 0 && !submitting;
function setField(header: string, value: string) {
setMapping((prev) => ({ ...prev, [header]: value }));
}
async function handleSubmit() {
function handleSubmit() {
if (!canSubmit) return;
setSubmitting(true);
setError(null);
try {
const res = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}`,
{
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ columnMapping: mapping }),
}
);
if (!res.ok) {
const data = await res.json().catch(() => ({}));
throw new Error(data.msg ?? "Failed to save mapping.");
setMappingMutation.mutate(
{ columnMapping: mapping },
{
onSuccess: () => {
router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}`);
},
}
router.push(`/portfolio/${portfolioId}/bulk-upload/${uploadId}`);
} catch (err) {
setError(err instanceof Error ? err.message : "Something went wrong.");
setSubmitting(false);
}
);
}
return (

View file

@ -181,7 +181,6 @@ export default async function BulkUploadDetailPage(props: {
statusKey === "failed") &&
upload.taskId && (
<OnboardingProgress
taskId={upload.taskId}
portfolioSlug={slug}
portfolioId={upload.portfolioId}
uploadId={uploadId}

View file

@ -20,6 +20,20 @@ export function createS3Client(config?: S3Config) {
});
}
export function createRetrofitDataS3Client() {
return new S3({
region: process.env.RETROFIT_DATA_DEV_REGION,
accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY,
secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY,
});
}
export function retrofitDataS3Bucket(): string {
const bucket = process.env.RETROFIT_DATA_DEV_S3_BUCKET_NAME;
if (!bucket) throw new Error("RETROFIT_DATA_DEV_S3_BUCKET_NAME not set");
return bucket;
}
// Get presigned url from s3
export type PresignGetOptions = {

View file

@ -0,0 +1,173 @@
"use client";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { bulkUploadKeys } from "./keys";
import { isTerminalStatus, type BulkUpload, type ProgressView } from "./types";
async function parseError(res: Response, fallback: string): Promise<Error> {
const body = await res.json().catch(() => ({}));
return new Error(body?.error ?? fallback);
}
export type CreateBulkUploadInput = {
file: File;
portfolioId: string;
userId: string;
sourceHeaders: string[];
contentType: string;
fileKey: string;
onProgress?: (percent: number) => void;
};
export type CreateBulkUploadResult = {
id: string;
s3Key: string;
s3Bucket: string;
status: string;
};
export function useCreateBulkUpload() {
return useMutation<CreateBulkUploadResult, Error, CreateBulkUploadInput>({
mutationFn: async (input) => {
const presignRes = await fetch("/api/upload/bulk-addresses", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
userId: input.userId,
portfolioId: input.portfolioId,
fileKey: input.fileKey,
contentType: input.contentType,
}),
});
if (!presignRes.ok) throw await parseError(presignRes, "Failed to generate upload URL.");
const { url: presignedUrl } = await presignRes.json();
await new Promise<void>((resolve, reject) => {
const xhr = new XMLHttpRequest();
xhr.open("PUT", presignedUrl);
xhr.setRequestHeader("Content-Type", input.contentType);
if (input.onProgress) {
xhr.upload.addEventListener("progress", (e) => {
if (e.lengthComputable) {
input.onProgress!(Math.round((e.loaded / e.total) * 100));
}
});
}
xhr.onload = () => {
if (xhr.status >= 200 && xhr.status < 300) resolve();
else reject(new Error(`S3 upload failed: ${xhr.status}`));
};
xhr.onerror = () => reject(new Error("Network error during upload"));
xhr.send(input.file);
});
const confirmRes = await fetch("/api/upload/bulk-addresses/confirm", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
fileKey: input.fileKey,
filename: input.file.name,
portfolioId: input.portfolioId,
userId: input.userId,
sourceHeaders: input.sourceHeaders,
}),
});
if (!confirmRes.ok) throw await parseError(confirmRes, "Failed to record upload.");
return confirmRes.json();
},
});
}
export function useSetColumnMapping(portfolioId: string, uploadId: string) {
const queryClient = useQueryClient();
return useMutation<BulkUpload, Error, { columnMapping: Record<string, string> }>({
mutationFn: async (input) => {
const res = await fetch(`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}`, {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(input),
});
if (!res.ok) throw await parseError(res, "Failed to save mapping.");
return res.json();
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) });
},
});
}
export function useStartAddressMatching(portfolioId: string, uploadId: string) {
const queryClient = useQueryClient();
return useMutation<{ taskId: string }, Error, void>({
mutationFn: async () => {
const res = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/start-address-matching`,
{ method: "POST" },
);
if (!res.ok) throw await parseError(res, "Failed to start address matching.");
return res.json();
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) });
},
});
}
export function useBulkUploadProgress(portfolioId: string, uploadId: string) {
return useQuery<ProgressView, Error>({
queryKey: bulkUploadKeys.progress(uploadId),
queryFn: async () => {
const res = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/progress`,
);
if (!res.ok) throw await parseError(res, "Failed to load progress.");
return res.json();
},
refetchInterval: (data) => {
const status = data?.upload.status;
return status && isTerminalStatus(status) ? false : 3000;
},
});
}
export function useRequestCombine(portfolioId: string, uploadId: string) {
const queryClient = useQueryClient();
return useMutation<unknown, Error, void>({
mutationFn: async () => {
const res = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/combine`,
{ method: "POST" },
);
if (!res.ok) throw await parseError(res, "Failed to start combiner.");
return res.json();
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) });
},
});
}
export type FinalizeResult = {
inserted?: number;
missingUprnCount?: number;
duplicateUprnCount?: number;
status?: string;
alreadyComplete?: boolean;
};
export function useFinalize(portfolioId: string, uploadId: string) {
const queryClient = useQueryClient();
return useMutation<FinalizeResult, Error, void>({
mutationFn: async () => {
const res = await fetch(
`/api/portfolio/${portfolioId}/bulk-uploads/${uploadId}/finalize`,
{ method: "POST" },
);
if (!res.ok) throw await parseError(res, "Finalize failed.");
return res.json();
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: bulkUploadKeys.progress(uploadId) });
},
});
}

View file

@ -0,0 +1,4 @@
export const bulkUploadKeys = {
all: ["bulkUpload"] as const,
progress: (uploadId: string) => ["bulkUpload", uploadId, "progress"] as const,
};

View file

@ -0,0 +1,277 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { count, desc, eq, sql } from "drizzle-orm";
import type { BulkUpload, BulkUploadStatus, ProgressView, TaskSummary } from "./types";
const REMAP_ALLOWED: ReadonlySet<BulkUploadStatus> = new Set([
"ready_for_processing",
"mapping_complete",
]);
type FastApiTriggerArgs = {
endpoint: string;
payload: Record<string, unknown>;
sessionToken: string | undefined;
};
type FastApiTriggerResult = { ok: true } | { ok: false; status: number; message: string };
async function triggerFastApiPipeline(args: FastApiTriggerArgs): Promise<FastApiTriggerResult> {
const url = process.env.FASTAPI_API_URL;
const key = process.env.FASTAPI_API_KEY;
if (!url || !key) {
console.error("FASTAPI_API_URL or FASTAPI_API_KEY not set");
return { ok: false, status: 500, message: "Server misconfiguration" };
}
try {
const res = await fetch(`${url}${args.endpoint}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": key,
Authorization: `Bearer ${args.sessionToken}`,
},
body: JSON.stringify(args.payload),
});
if (!res.ok) {
const errText = await res.text().catch(() => "");
console.error(`FastAPI ${args.endpoint} failed:`, res.status, errText);
return { ok: false, status: 502, message: "Pipeline trigger failed" };
}
return { ok: true };
} catch (err) {
console.error(`Failed to reach FastAPI ${args.endpoint}:`, err);
return { ok: false, status: 502, message: "Pipeline trigger failed" };
}
}
async function loadById(uploadId: string): Promise<BulkUpload | null> {
const [row] = await db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.id, uploadId))
.limit(1);
return row ?? null;
}
export async function listForPortfolio(portfolioId: string): Promise<BulkUpload[]> {
return db
.select()
.from(bulkAddressUploads)
.where(eq(bulkAddressUploads.portfolioId, portfolioId))
.orderBy(desc(bulkAddressUploads.createdAt));
}
async function loadTaskSummary(taskId: string): Promise<TaskSummary | null> {
const [row] = await db
.select({
id: tasks.id,
taskSource: tasks.taskSource,
status: tasks.status,
service: tasks.service,
jobStarted: tasks.jobStarted,
jobCompleted: tasks.jobCompleted,
updatedAt: tasks.updatedAt,
totalSubtasks: count(subTasks.id),
completedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`,
failedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`,
})
.from(tasks)
.leftJoin(subTasks, eq(subTasks.taskId, tasks.id))
.where(eq(tasks.id, taskId))
.groupBy(tasks.id)
.limit(1);
return row ?? null;
}
export async function getProgressView(uploadId: string): Promise<ProgressView | null> {
const upload = await loadById(uploadId);
if (!upload) return null;
const task = upload.taskId ? await loadTaskSummary(upload.taskId) : null;
return { upload, task };
}
function validateMapping(mapping: Record<string, string>): string | null {
const values = Object.values(mapping);
if (!values.includes("address_1")) return "Mapping must include address_1.";
if (!values.includes("postcode")) return "Mapping must include postcode.";
return null;
}
export type SetMappingOutcome =
| { kind: "ok"; upload: BulkUpload }
| { kind: "not_found" }
| { kind: "invalid_status"; current: string }
| { kind: "invalid_mapping"; reason: string };
export async function setColumnMapping(
uploadId: string,
mapping: Record<string, string>,
): Promise<SetMappingOutcome> {
const upload = await loadById(uploadId);
if (!upload) return { kind: "not_found" };
if (!REMAP_ALLOWED.has(upload.status as BulkUploadStatus))
return { kind: "invalid_status", current: upload.status };
const reason = validateMapping(mapping);
if (reason) return { kind: "invalid_mapping", reason };
const [updated] = await db
.update(bulkAddressUploads)
.set({ columnMapping: mapping, status: "mapping_complete" })
.where(eq(bulkAddressUploads.id, uploadId))
.returning();
if (!updated) return { kind: "not_found" };
return { kind: "ok", upload: updated };
}
export type LoadForAddressMatchingOutcome =
| { kind: "ok"; upload: BulkUpload }
| { kind: "not_found" }
| { kind: "wrong_state"; current: string }
| { kind: "missing_mapping" };
export async function loadForAddressMatching(
uploadId: string,
): Promise<LoadForAddressMatchingOutcome> {
const upload = await loadById(uploadId);
if (!upload) return { kind: "not_found" };
if (upload.status !== "mapping_complete")
return { kind: "wrong_state", current: upload.status };
if (!upload.columnMapping) return { kind: "missing_mapping" };
return { kind: "ok", upload };
}
export type TriggerAddressMatchingOutcome =
| { kind: "ok"; taskId: string }
| { kind: "trigger_failed"; status: number; message: string };
export async function triggerAddressMatching(args: {
uploadId: string;
s3Uri: string;
sessionToken: string | undefined;
}): Promise<TriggerAddressMatchingOutcome> {
const upload = await loadById(args.uploadId);
if (!upload) return { kind: "trigger_failed", status: 404, message: "Upload not found" };
const now = new Date();
const [task] = await db
.insert(tasks)
.values({
taskSource: `Address Onboarding ${upload.filename}`,
service: "address2uprn",
source: "portfolio_id",
sourceId: upload.portfolioId,
status: "waiting",
jobStarted: now,
})
.returning();
const [subTask] = await db
.insert(subTasks)
.values({
taskId: task.id,
status: "waiting",
inputs: JSON.stringify({ bulk_upload_id: args.uploadId }),
})
.returning();
const payload = {
task_id: task.id,
sub_task_id: subTask.id,
s3_uri: args.s3Uri,
};
const trigger = await triggerFastApiPipeline({
endpoint: "/v1/bulk-uploads/trigger-postcode-splitter",
payload,
sessionToken: args.sessionToken,
});
if (!trigger.ok)
return { kind: "trigger_failed", status: trigger.status, message: trigger.message };
await Promise.all([
db
.update(bulkAddressUploads)
.set({ status: "processing", taskId: task.id })
.where(eq(bulkAddressUploads.id, args.uploadId)),
db
.update(tasks)
.set({ status: "in progress" })
.where(eq(tasks.id, task.id)),
db
.update(subTasks)
.set({ inputs: JSON.stringify(payload) })
.where(eq(subTasks.id, subTask.id)),
]);
return { kind: "ok", taskId: task.id };
}
export type CombineRetriggerOutcome =
| { kind: "triggered"; taskId: string; subTaskId: string }
| { kind: "already_combined" }
| { kind: "not_found" }
| { kind: "missing_task" }
| { kind: "trigger_failed"; status: number; message: string };
export async function requestCombineRetrigger(args: {
uploadId: string;
sessionToken: string | undefined;
}): Promise<CombineRetriggerOutcome> {
const upload = await loadById(args.uploadId);
if (!upload) return { kind: "not_found" };
if (!upload.taskId) return { kind: "missing_task" };
if (upload.combinedOutputS3Uri) return { kind: "already_combined" };
const [subTask] = await db
.insert(subTasks)
.values({ taskId: upload.taskId, status: "waiting" })
.returning();
const payload = { task_id: upload.taskId, sub_task_id: subTask.id };
const trigger = await triggerFastApiPipeline({
endpoint: "/v1/bulk-uploads/trigger-combiner",
payload,
sessionToken: args.sessionToken,
});
if (!trigger.ok)
return { kind: "trigger_failed", status: trigger.status, message: trigger.message };
await db
.update(subTasks)
.set({ inputs: JSON.stringify(payload) })
.where(eq(subTasks.id, subTask.id));
return { kind: "triggered", taskId: upload.taskId, subTaskId: subTask.id };
}
export type LoadForFinalizeOutcome =
| { kind: "ready"; upload: BulkUpload }
| { kind: "already_finalized"; status: "complete" | "needs_review" }
| { kind: "not_found" }
| { kind: "not_yet_combined" }
| { kind: "wrong_state"; current: string };
export async function loadForFinalize(uploadId: string): Promise<LoadForFinalizeOutcome> {
const upload = await loadById(uploadId);
if (!upload) return { kind: "not_found" };
if (upload.status === "complete" || upload.status === "needs_review")
return { kind: "already_finalized", status: upload.status };
if (upload.status !== "awaiting_review")
return { kind: "wrong_state", current: upload.status };
if (!upload.combinedOutputS3Uri) return { kind: "not_yet_combined" };
return { kind: "ready", upload };
}
export async function markFinalized(
uploadId: string,
args: { needsReview: boolean },
): Promise<void> {
await db
.update(bulkAddressUploads)
.set({ status: args.needsReview ? "needs_review" : "complete" })
.where(eq(bulkAddressUploads.id, uploadId));
}

View file

@ -0,0 +1,44 @@
import type { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
export const BULK_UPLOAD_STATUSES = [
"ready_for_processing",
"mapping_complete",
"processing",
"combining",
"awaiting_review",
"complete",
"needs_review",
"failed",
] as const;
export type BulkUploadStatus = (typeof BULK_UPLOAD_STATUSES)[number];
export type BulkUpload = typeof bulkAddressUploads.$inferSelect;
export type TaskSummary = {
id: string;
taskSource: string;
status: string;
service: string | null;
jobStarted: Date | null;
jobCompleted: Date | null;
updatedAt: Date;
totalSubtasks: number;
completedSubtasks: number;
failedSubtasks: number;
};
export type ProgressView = {
upload: BulkUpload;
task: TaskSummary | null;
};
const TERMINAL_UPLOAD_STATUSES: ReadonlySet<BulkUploadStatus> = new Set([
"complete",
"needs_review",
"failed",
]);
export function isTerminalStatus(status: string): boolean {
return TERMINAL_UPLOAD_STATUSES.has(status as BulkUploadStatus);
}

8
src/lib/session.ts Normal file
View file

@ -0,0 +1,8 @@
import { NextRequest } from "next/server";
export function readSessionToken(request: NextRequest): string | undefined {
return (
request.cookies.get("__Secure-next-auth.session-token")?.value ??
request.cookies.get("next-auth.session-token")?.value
);
}