push migration for khalim

This commit is contained in:
Jun-te Kim 2026-04-17 19:30:35 +00:00
parent 16136a3d3a
commit ab4fdf3000
11 changed files with 6902 additions and 77 deletions

View file

@ -1,6 +1,7 @@
import { db } from "@/app/db/db";
import { bulkAddressUploads } from "@/app/db/schema/bulk_address_uploads";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { eq } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
import { getServerSession } from "next-auth";
@ -8,6 +9,8 @@ import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import { z } from "zod";
import { createS3Client } from "@/app/utils/s3";
import { sendToQueue } from "@/app/utils/sqs";
import S3 from "aws-sdk/clients/s3";
import * as XLSX from "xlsx";
const FIELD_RENAME: Record<string, string> = {
address_1: "Address 1",
@ -22,25 +25,26 @@ const BodySchema = z.object({
subTaskId: z.string().uuid(),
});
function transformCsv(
raw: string,
function transformFile(
buffer: Buffer,
columnMapping: Record<string, string>
): { csv: string; error?: never } | { csv?: never; error: string } {
const lines = raw.split(/\r?\n/);
if (lines.length === 0) return { error: "Empty file" };
const wb = XLSX.read(buffer, { type: "buffer" });
const sheet = wb.Sheets[wb.SheetNames[0]];
const rows = XLSX.utils.sheet_to_json<Record<string, unknown>>(sheet, { defval: "" });
const sourceHeaders = lines[0].split(",").map((h) => h.trim().replace(/^"|"$/g, ""));
if (rows.length === 0) return { error: "Empty file" };
const sourceHeaders = Object.keys(rows[0]);
const outputHeaders: string[] = [];
const keepIndices: number[] = [];
const sourceToOutput: Record<string, string> = {};
for (let i = 0; i < sourceHeaders.length; i++) {
const src = sourceHeaders[i];
for (const src of sourceHeaders) {
const mapped = columnMapping[src];
if (!mapped || mapped === "skip") continue;
const renamed = FIELD_RENAME[mapped] ?? mapped;
outputHeaders.push(renamed);
keepIndices.push(i);
sourceToOutput[src] = renamed;
}
if (!outputHeaders.includes("Address 1"))
@ -48,16 +52,16 @@ function transformCsv(
if (!outputHeaders.includes("postcode"))
return { error: 'Mapping must include "postcode"' };
const outputLines: string[] = [outputHeaders.join(",")];
const outputRows = rows.map((row) => {
const out: Record<string, unknown> = {};
for (const [src, renamed] of Object.entries(sourceToOutput)) {
out[renamed] = row[src] ?? "";
}
return out;
});
for (let r = 1; r < lines.length; r++) {
const line = lines[r].trim();
if (!line) continue;
const cols = line.split(",").map((c) => c.trim().replace(/^"|"$/g, ""));
outputLines.push(keepIndices.map((i) => cols[i] ?? "").join(","));
}
return { csv: outputLines.join("\n") };
const outSheet = XLSX.utils.json_to_sheet(outputRows, { header: outputHeaders });
return { csv: XLSX.utils.sheet_to_csv(outSheet) };
}
export async function POST(
@ -89,27 +93,33 @@ export async function POST(
return NextResponse.json({ error: "Column mapping missing" }, { status: 422 });
const s3 = createS3Client();
const outputS3 = new S3({
region: process.env.RETROFIT_DATA_DEV_REGION,
accessKeyId: process.env.RETROFIT_DATA_DEV_ACCESS_KEY,
secretAccessKey: process.env.RETROFIT_DATA_DEV_SECRET_KEY,
});
const outputBucket = process.env.RETROFIT_DATA_DEV_S3_BUCKET_NAME!;
const bucket = upload.s3Bucket;
let rawCsv: string;
let fileBuffer: Buffer;
try {
const obj = await s3
.getObject({ Bucket: bucket, Key: upload.s3Key })
.promise();
rawCsv = obj.Body?.toString("utf-8") ?? "";
fileBuffer = Buffer.from(obj.Body as Uint8Array);
} catch (err) {
console.error("Failed to read source CSV from S3:", err);
console.error("Failed to read source file from S3:", err);
return NextResponse.json({ error: "Failed to read source file" }, { status: 500 });
}
const result = transformCsv(rawCsv, upload.columnMapping);
const result = transformFile(fileBuffer, upload.columnMapping);
if (result.error) return NextResponse.json({ error: result.error }, { status: 422 });
const transformedKey = `bulk_onboarding_inputs/${portfolioId}/${uploadId}.csv`;
try {
await s3
await outputS3
.putObject({
Bucket: bucket,
Bucket: outputBucket,
Key: transformedKey,
Body: result.csv,
ContentType: "text/csv",
@ -120,7 +130,7 @@ export async function POST(
return NextResponse.json({ error: "Failed to store transformed file" }, { status: 500 });
}
const s3Uri = `s3://${bucket}/${transformedKey}`;
const s3Uri = `s3://${outputBucket}/${transformedKey}`;
const queueName = process.env.POSTCODE_SPLITTER_QUEUE_NAME;
if (!queueName) {
console.error("POSTCODE_SPLITTER_QUEUE_NAME not set");
@ -137,15 +147,17 @@ export async function POST(
return NextResponse.json({ error: "Failed to queue onboarding job" }, { status: 500 });
}
await db
.update(bulkAddressUploads)
.set({ status: "processing", taskId: body.taskId })
.where(eq(bulkAddressUploads.id, uploadId));
await db
.update(tasks)
.set({ status: "in progress" })
.where(eq(tasks.id, body.taskId));
await Promise.all([
db.update(bulkAddressUploads)
.set({ status: "processing", taskId: body.taskId })
.where(eq(bulkAddressUploads.id, uploadId)),
db.update(tasks)
.set({ status: "in progress" })
.where(eq(tasks.id, body.taskId)),
db.update(subTasks)
.set({ inputs: JSON.stringify({ task_id: body.taskId, sub_task_id: body.subTaskId, s3_uri: s3Uri }) })
.where(eq(subTasks.id, body.subTaskId)),
]);
return NextResponse.json({ taskId: body.taskId }, { status: 200 });
}

View file

@ -1,40 +1,26 @@
import { db } from "@/app/db/db";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { eq, count, sql } from "drizzle-orm";
import { eq } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
export async function GET(
_request: NextRequest,
request: NextRequest,
{ params }: { params: Promise<{ taskId: string }> }
) {
const { taskId } = await params;
try {
const [row] = await db
.select({
id: tasks.id,
taskSource: tasks.taskSource,
status: tasks.status,
service: tasks.service,
jobStarted: tasks.jobStarted,
jobCompleted: tasks.jobCompleted,
updatedAt: tasks.updatedAt,
totalSubtasks: count(subTasks.id),
completedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`,
failedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`,
})
.from(tasks)
.leftJoin(subTasks, eq(subTasks.taskId, tasks.id))
.where(eq(tasks.id, taskId))
.groupBy(tasks.id)
.limit(1);
const { taskId } = await params;
const taskSubTasks = await db
.select()
.from(subTasks)
.where(eq(subTasks.taskId, taskId))
.orderBy(subTasks.updatedAt);
if (!row) return NextResponse.json({ error: "Not found" }, { status: 404 });
return NextResponse.json(row);
return NextResponse.json(taskSubTasks);
} catch (error) {
console.error("Error fetching task:", error);
return NextResponse.json({ error: "Failed to fetch task" }, { status: 500 });
console.error("Error fetching subtasks:", error);
return NextResponse.json(
{ error: "Failed to fetch subtasks" },
{ status: 500 }
);
}
}

View file

@ -0,0 +1,40 @@
import { db } from "@/app/db/db";
import { tasks } from "@/app/db/schema/tasks/tasks";
import { subTasks } from "@/app/db/schema/tasks/subtask";
import { eq, count, sql } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ taskId: string }> }
) {
const { taskId } = await params;
try {
const [row] = await db
.select({
id: tasks.id,
taskSource: tasks.taskSource,
status: tasks.status,
service: tasks.service,
jobStarted: tasks.jobStarted,
jobCompleted: tasks.jobCompleted,
updatedAt: tasks.updatedAt,
totalSubtasks: count(subTasks.id),
completedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('completed', 'complete') then 1 end)::int`,
failedSubtasks: sql<number>`count(case when lower(${subTasks.status}) in ('failed', 'failure', 'error') then 1 end)::int`,
})
.from(tasks)
.leftJoin(subTasks, eq(subTasks.taskId, tasks.id))
.where(eq(tasks.id, taskId))
.groupBy(tasks.id)
.limit(1);
if (!row) return NextResponse.json({ error: "Not found" }, { status: 404 });
return NextResponse.json(row);
} catch (error) {
console.error("Error fetching task summary:", error);
return NextResponse.json({ error: "Failed to fetch task summary" }, { status: 500 });
}
}

View file

@ -13,7 +13,7 @@ import {
import { cn } from "@/lib/utils";
import { useRouter } from "next/navigation";
import { Dispatch, SetStateAction, useState } from "react";
// import BulkUploadComingSoonModal from "@/app/components/portfolio/BulkUploadComingSoonModal";
import BulkUploadComingSoonModal from "@/app/components/portfolio/BulkUploadComingSoonModal";
interface AddNewProps {
portfolioId: string;
@ -37,11 +37,11 @@ export default function AddNew({
return (
<>
{/* <BulkUploadComingSoonModal
isOpen={isBulkUploadOpen}
onClose={() => setIsBulkUploadOpen(false)}
portfolioId={portfolioId}
/> */}
<BulkUploadComingSoonModal
isOpen={isBulkUploadOpen}
onClose={() => setIsBulkUploadOpen(false)}
portfolioId={portfolioId}
/>
<Menu as="div" className="relative inline-block text-left">
<MenuButton
className="

View file

@ -0,0 +1 @@
ALTER TABLE "bulk_address_uploads" ADD COLUMN "combined_output_s3_uri" text;

View file

@ -0,0 +1 @@
ALTER TABLE "bulk_address_uploads" ADD COLUMN "combined_output_s3_uri" text;

File diff suppressed because it is too large Load diff

View file

@ -1233,6 +1233,20 @@
"when": 1776434096854,
"tag": "0175_sweet_otto_octavius",
"breakpoints": true
},
{
"idx": 176,
"version": "7",
"when": 1776900120000,
"tag": "0176_bulk_upload_combined_output",
"breakpoints": true
},
{
"idx": 177,
"version": "7",
"when": 1776451871348,
"tag": "0177_wooden_dexter_bennett",
"breakpoints": true
}
]
}

View file

@ -12,6 +12,7 @@ export const bulkAddressUploads = pgTable("bulk_address_uploads", {
sourceHeaders: text("source_headers").array().notNull().default(sql`'{}'`),
columnMapping: jsonb("column_mapping").$type<Record<string, string>>(),
taskId: uuid("task_id"),
combinedOutputS3Uri: text("combined_output_s3_uri"),
createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp("updated_at", { withTimezone: true })
.notNull()

View file

@ -15,11 +15,12 @@ interface TaskData {
interface Props {
taskId: string;
portfolioSlug: string;
isDomnaUser: boolean;
}
const TERMINAL_STATUSES = new Set(["complete", "completed", "failed", "failure", "error"]);
export default function OnboardingProgress({ taskId, portfolioSlug }: Props) {
export default function OnboardingProgress({ taskId, portfolioSlug, isDomnaUser }: Props) {
const [data, setData] = useState<TaskData | null>(null);
const [fetchError, setFetchError] = useState(false);
const intervalRef = useRef<ReturnType<typeof setInterval> | null>(null);
@ -27,7 +28,7 @@ export default function OnboardingProgress({ taskId, portfolioSlug }: Props) {
useEffect(() => {
async function poll() {
try {
const res = await fetch(`/api/tasks/${taskId}`);
const res = await fetch(`/api/tasks/${taskId}/summary`);
if (!res.ok) { setFetchError(true); return; }
const json: TaskData = await res.json();
setData(json);
@ -98,12 +99,14 @@ export default function OnboardingProgress({ taskId, portfolioSlug }: Props) {
)}
</div>
<Link
href={`/portfolio/${portfolioSlug}/settings/logs`}
className="text-xs text-gray-400 hover:text-gray-700 underline underline-offset-2 transition-colors"
>
View detailed logs
</Link>
{isDomnaUser && (
<Link
href={`/portfolio/${portfolioSlug}/settings/logs`}
className="text-xs text-gray-400 hover:text-gray-700 underline underline-offset-2 transition-colors"
>
View detailed logs
</Link>
)}
</div>
);
}

View file

@ -77,6 +77,7 @@ export default async function BulkUploadDetailPage(props: {
const { slug, uploadId } = await props.params;
const session = await getServerSession(AuthOptions);
if (!session) redirect("/login");
const isDomnaUser = !!session.user?.email?.endsWith("@domna.homes");
const [upload] = await db
.select()
@ -151,7 +152,7 @@ export default async function BulkUploadDetailPage(props: {
{(statusKey === "processing" || statusKey === "complete" || statusKey === "failed") &&
upload.taskId && (
<OnboardingProgress taskId={upload.taskId} portfolioSlug={slug} />
<OnboardingProgress taskId={upload.taskId} portfolioSlug={slug} isDomnaUser={isDomnaUser} />
)}
</div>
</div>