diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/onboard/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/onboard/route.ts index 15fe2124..5a6d4b01 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/onboard/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/onboard/route.ts @@ -1,6 +1,7 @@ import { db } from "@/app/db/db"; import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads"; import { tasks } from "@/app/db/schema/tasks/tasks"; +import { subTasks } from "@/app/db/schema/tasks/subtask"; import { eq } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; import { getServerSession } from "next-auth"; @@ -8,6 +9,8 @@ import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; import { z } from "zod"; import { createS3Client } from "@/app/utils/s3"; import { sendToQueue } from "@/app/utils/sqs"; +import S3 from "aws-sdk/clients/s3"; +import * as XLSX from "xlsx"; const FIELD_RENAME: Record = { address_1: "Address 1", @@ -22,25 +25,26 @@ const BodySchema = z.object({ subTaskId: z.string().uuid(), }); -function transformCsv( - raw: string, +function transformFile( + buffer: Buffer, columnMapping: Record ): { csv: string; error?: never } | { csv?: never; error: string } { - const lines = raw.split(/\r?\n/); - if (lines.length === 0) return { error: "Empty file" }; + const wb = XLSX.read(buffer, { type: "buffer" }); + const sheet = wb.Sheets[wb.SheetNames[0]]; + const rows = XLSX.utils.sheet_to_json>(sheet, { defval: "" }); - const sourceHeaders = lines[0].split(",").map((h) => h.trim().replace(/^"|"$/g, "")); + if (rows.length === 0) return { error: "Empty file" }; + const sourceHeaders = Object.keys(rows[0]); const outputHeaders: string[] = []; - const keepIndices: number[] = []; + const sourceToOutput: Record = {}; - for (let i = 0; i < sourceHeaders.length; i++) { - const src = sourceHeaders[i]; + for (const src of sourceHeaders) { const mapped = columnMapping[src]; if (!mapped || mapped === "skip") continue; const renamed = FIELD_RENAME[mapped] ?? mapped; outputHeaders.push(renamed); - keepIndices.push(i); + sourceToOutput[src] = renamed; } if (!outputHeaders.includes("Address 1")) @@ -48,16 +52,16 @@ function transformCsv( if (!outputHeaders.includes("postcode")) return { error: 'Mapping must include "postcode"' }; - const outputLines: string[] = [outputHeaders.join(",")]; + const outputRows = rows.map((row) => { + const out: Record = {}; + for (const [src, renamed] of Object.entries(sourceToOutput)) { + out[renamed] = row[src] ?? ""; + } + return out; + }); - for (let r = 1; r < lines.length; r++) { - const line = lines[r].trim(); - if (!line) continue; - const cols = line.split(",").map((c) => c.trim().replace(/^"|"$/g, "")); - outputLines.push(keepIndices.map((i) => cols[i] ?? "").join(",")); - } - - return { csv: outputLines.join("\n") }; + const outSheet = XLSX.utils.json_to_sheet(outputRows, { header: outputHeaders }); + return { csv: XLSX.utils.sheet_to_csv(outSheet) }; } export async function POST( @@ -89,27 +93,33 @@ export async function POST( return NextResponse.json({ error: "Column mapping missing" }, { status: 422 }); const s3 = createS3Client(); + const outputS3 = new S3({ + region: process.env.RETROFIT_DATA_DEV_REGION, + accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY, + secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY, + }); + const outputBucket = process.env.RETROFIT_DATA_DEV_S3_BUCKET_NAME!; const bucket = upload.s3Bucket; - let rawCsv: string; + let fileBuffer: Buffer; try { const obj = await s3 .getObject({ Bucket: bucket, Key: upload.s3Key }) .promise(); - rawCsv = obj.Body?.toString("utf-8") ?? ""; + fileBuffer = Buffer.from(obj.Body as Uint8Array); } catch (err) { - console.error("Failed to read source CSV from S3:", err); + console.error("Failed to read source file from S3:", err); return NextResponse.json({ error: "Failed to read source file" }, { status: 500 }); } - const result = transformCsv(rawCsv, upload.columnMapping); + const result = transformFile(fileBuffer, upload.columnMapping); if (result.error) return NextResponse.json({ error: result.error }, { status: 422 }); const transformedKey = `bulk_onboarding_inputs/${portfolioId}/${uploadId}.csv`; try { - await s3 + await outputS3 .putObject({ - Bucket: bucket, + Bucket: outputBucket, Key: transformedKey, Body: result.csv, ContentType: "text/csv", @@ -120,7 +130,7 @@ export async function POST( return NextResponse.json({ error: "Failed to store transformed file" }, { status: 500 }); } - const s3Uri = `s3://${bucket}/${transformedKey}`; + const s3Uri = `s3://${outputBucket}/${transformedKey}`; const queueName = process.env.POSTCODE_SPLITTER_QUEUE_NAME; if (!queueName) { console.error("POSTCODE_SPLITTER_QUEUE_NAME not set"); @@ -137,15 +147,17 @@ export async function POST( return NextResponse.json({ error: "Failed to queue onboarding job" }, { status: 500 }); } - await db - .update(bulkAddressUploads) - .set({ status: "processing", taskId: body.taskId }) - .where(eq(bulkAddressUploads.id, uploadId)); - - await db - .update(tasks) - .set({ status: "in progress" }) - .where(eq(tasks.id, body.taskId)); + await Promise.all([ + db.update(bulkAddressUploads) + .set({ status: "processing", taskId: body.taskId }) + .where(eq(bulkAddressUploads.id, uploadId)), + db.update(tasks) + .set({ status: "in progress" }) + .where(eq(tasks.id, body.taskId)), + db.update(subTasks) + .set({ inputs: JSON.stringify({ task_id: body.taskId, sub_task_id: body.subTaskId, s3_uri: s3Uri }) }) + .where(eq(subTasks.id, body.subTaskId)), + ]); return NextResponse.json({ taskId: body.taskId }, { status: 200 }); } diff --git a/src/app/api/tasks/[taskId]/route.ts b/src/app/api/tasks/[taskId]/route.ts index 9eaa1ed6..8c43f4f4 100644 --- a/src/app/api/tasks/[taskId]/route.ts +++ b/src/app/api/tasks/[taskId]/route.ts @@ -1,40 +1,26 @@ import { db } from "@/app/db/db"; -import { tasks } from "@/app/db/schema/tasks/tasks"; import { subTasks } from "@/app/db/schema/tasks/subtask"; -import { eq, count, sql } from "drizzle-orm"; +import { eq } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; export async function GET( - _request: NextRequest, + request: NextRequest, { params }: { params: Promise<{ taskId: string }> } ) { - const { taskId } = await params; - try { - const [row] = await db - .select({ - id: tasks.id, - taskSource: tasks.taskSource, - status: tasks.status, - service: tasks.service, - jobStarted: tasks.jobStarted, - jobCompleted: tasks.jobCompleted, - updatedAt: tasks.updatedAt, - totalSubtasks: count(subTasks.id), - completedSubtasks: sql`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`, - failedSubtasks: sql`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`, - }) - .from(tasks) - .leftJoin(subTasks, eq(subTasks.taskId, tasks.id)) - .where(eq(tasks.id, taskId)) - .groupBy(tasks.id) - .limit(1); + const { taskId } = await params; + const taskSubTasks = await db + .select() + .from(subTasks) + .where(eq(subTasks.taskId, taskId)) + .orderBy(subTasks.updatedAt); - if (!row) return NextResponse.json({ error: "Not found" }, { status: 404 }); - - return NextResponse.json(row); + return NextResponse.json(taskSubTasks); } catch (error) { - console.error("Error fetching task:", error); - return NextResponse.json({ error: "Failed to fetch task" }, { status: 500 }); + console.error("Error fetching subtasks:", error); + return NextResponse.json( + { error: "Failed to fetch subtasks" }, + { status: 500 } + ); } } diff --git a/src/app/api/tasks/[taskId]/summary/route.ts b/src/app/api/tasks/[taskId]/summary/route.ts new file mode 100644 index 00000000..6e5cb5c2 --- /dev/null +++ b/src/app/api/tasks/[taskId]/summary/route.ts @@ -0,0 +1,40 @@ +import { db } from "@/app/db/db"; +import { tasks } from "@/app/db/schema/tasks/tasks"; +import { subTasks } from "@/app/db/schema/tasks/subtask"; +import { eq, count, sql } from "drizzle-orm"; +import { NextRequest, NextResponse } from "next/server"; + +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ taskId: string }> } +) { + const { taskId } = await params; + + try { + const [row] = await db + .select({ + id: tasks.id, + taskSource: tasks.taskSource, + status: tasks.status, + service: tasks.service, + jobStarted: tasks.jobStarted, + jobCompleted: tasks.jobCompleted, + updatedAt: tasks.updatedAt, + totalSubtasks: count(subTasks.id), + completedSubtasks: sql`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`, + failedSubtasks: sql`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`, + }) + .from(tasks) + .leftJoin(subTasks, eq(subTasks.taskId, tasks.id)) + .where(eq(tasks.id, taskId)) + .groupBy(tasks.id) + .limit(1); + + if (!row) return NextResponse.json({ error: "Not found" }, { status: 404 }); + + return NextResponse.json(row); + } catch (error) { + console.error("Error fetching task summary:", error); + return NextResponse.json({ error: "Failed to fetch task summary" }, { status: 500 }); + } +} diff --git a/src/app/components/portfolio/AddNew.tsx b/src/app/components/portfolio/AddNew.tsx index 28bcec7b..2dfe23bd 100644 --- a/src/app/components/portfolio/AddNew.tsx +++ b/src/app/components/portfolio/AddNew.tsx @@ -13,7 +13,7 @@ import { import { cn } from "@/lib/utils"; import { useRouter } from "next/navigation"; import { Dispatch, SetStateAction, useState } from "react"; -// import BulkUploadComingSoonModal from "@/app/components/portfolio/BulkUploadComingSoonModal"; +import BulkUploadComingSoonModal from "@/app/components/portfolio/BulkUploadComingSoonModal"; interface AddNewProps { portfolioId: string; @@ -37,11 +37,11 @@ export default function AddNew({ return ( <> - {/* setIsBulkUploadOpen(false)} - portfolioId={portfolioId} - /> */} + setIsBulkUploadOpen(false)} + portfolioId={portfolioId} + /> >(), taskId: uuid("task_id"), + combinedOutputS3Uri: text("combined_output_s3_uri"), createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), updatedAt: timestamp("updated_at", { withTimezone: true }) .notNull() diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx index 24999b56..552d4a40 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx @@ -15,11 +15,12 @@ interface TaskData { interface Props { taskId: string; portfolioSlug: string; + isDomnaUser: boolean; } const TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]); -export default function OnboardingProgress({ taskId, portfolioSlug }: Props) { +export default function OnboardingProgress({ taskId, portfolioSlug, isDomnaUser }: Props) { const [data, setData] = useState(null); const [fetchError, setFetchError] = useState(false); const intervalRef = useRef | null>(null); @@ -27,7 +28,7 @@ export default function OnboardingProgress({ taskId, portfolioSlug }: Props) { useEffect(() => { async function poll() { try { - const res = await fetch(`/api/tasks/${taskId}`); + const res = await fetch(`/api/tasks/${taskId}/summary`); if (!res.ok) { setFetchError(true); return; } const json: TaskData = await res.json(); setData(json); @@ -98,12 +99,14 @@ export default function OnboardingProgress({ taskId, portfolioSlug }: Props) { )} - - View detailed logs - + {isDomnaUser && ( + + View detailed logs + + )} ); } diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx index 0c35f762..a85440f3 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/page.tsx @@ -77,6 +77,7 @@ export default async function BulkUploadDetailPage(props: { const { slug, uploadId } = await props.params; const session = await getServerSession(AuthOptions); if (!session) redirect("/login"); + const isDomnaUser = !!session.user?.email?.endsWith("@domna.homes"); const [upload] = await db .select() @@ -151,7 +152,7 @@ export default async function BulkUploadDetailPage(props: { {(statusKey === "processing" || statusKey === "complete" || statusKey === "failed") && upload.taskId && ( - + )}