Enqueue Pipeline Jobs
Validate API input and add stable BullMQ jobs for later pipeline execution.
The enqueue boundary is product code. It should authorize the caller, validate input, create a durable pending record, and then call queue.add(...) with a stable job id.
Job Data
import { z } from "zod";
export const ticketInputSchema = z.object({
tenantId: z.string(),
ticketId: z.string(),
summary: z.string().min(1),
});
export type TicketJobData = z.infer<typeof ticketInputSchema>;Keep job data scoped and serializable. Do not enqueue request objects, database clients, provider clients, or unbounded conversation history.
Queue Setup
import { Queue, type JobsOptions } from "bullmq";
import IORedis from "ioredis";
import type { TicketJobData } from "./schema";
export const queueName = "ticket-triage";
const producerConnection = new IORedis(process.env.REDIS_URL);
export const triageQueue = new Queue<TicketJobData>(queueName, {
connection: producerConnection,
});Use separate producer and worker connections when API and worker processes have different runtime constraints.
Job Options
export const jobOptions: JobsOptions = {
attempts: 3,
backoff: {
type: "exponential",
delay: 5_000,
},
removeOnComplete: {
age: 60 * 60 * 24,
count: 1_000,
},
removeOnFail: {
age: 60 * 60 * 24 * 7,
},
};Retries are useful for transient provider, network, and database failures. They are dangerous when the worker performs non-idempotent side effects.
Enqueue Function
import { ticketInputSchema } from "./schema";
import { jobOptions, triageQueue } from "./queue";
import { ticketJobs } from "./storage";
export async function enqueueTicketTriage(input: unknown) {
const data = ticketInputSchema.parse(input);
const jobId = `ticket-triage:${data.tenantId}:${data.ticketId}`;
await ticketJobs.createPending({
jobId,
tenantId: data.tenantId,
ticketId: data.ticketId,
});
const job = await triageQueue.add("triage", data, {
...jobOptions,
jobId,
});
return {
jobId: job.id,
status: "queued",
};
}The stable jobId prevents duplicate queue jobs for the same product operation. Keep a matching idempotency constraint in app storage because the database is the source of product truth.
API Boundary
import { enqueueTicketTriage } from "./enqueue";
export async function POST(request: Request) {
const body = await request.json();
const result = await enqueueTicketTriage(body);
return Response.json(result, {
status: 202,
});
}Do authentication, tenant scoping, quotas, and validation before queue.add(...). The worker should still trust only scoped job data, but the API is the first product boundary.
