Long Process Pipelines

Enqueue Pipeline Jobs

Validate API input and enqueue BullMQ jobs with stable ids for later pipeline execution.

The enqueue boundary is product code. It should authorize the caller, validate input, create a durable pending record, and then call queue.add(...) with a stable job id.

Job Data

import { z } from "zod";

export const ticketInputSchema = z.object({
  tenantId: z.string(),
  ticketId: z.string(),
  summary: z.string().min(1),
});

export type TicketJobData = z.infer<typeof ticketInputSchema>;

Keep job data scoped and serializable. Do not enqueue request objects, database clients, provider clients, or unbounded conversation history.
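One way to enforce the "scoped and serializable" rule is a small guard that runs just before enqueueing. The sketch below is an assumption-laden illustration: the helper name `assertEnqueueable`, the 16 KiB limit, and the error messages are hypothetical and not part of the pipeline code above.

```typescript
// Hypothetical guard: the name, the 16 KiB limit, and the messages are assumptions.
const MAX_JOB_BYTES = 16 * 1024;

export function assertEnqueueable(
  data: unknown,
  maxBytes: number = MAX_JOB_BYTES,
): void {
  // JSON.stringify throws on circular references and BigInt values, and
  // returns undefined for a top-level function, symbol, or undefined.
  let json: string | undefined;
  try {
    json = JSON.stringify(data);
  } catch (error) {
    throw new Error(
      `Job data is not JSON-serializable: ${(error as Error).message}`,
    );
  }
  if (json === undefined) {
    throw new Error("Job data is not JSON-serializable");
  }

  // Measure encoded bytes rather than string length so multi-byte
  // characters count at their full size.
  const bytes = new TextEncoder().encode(json).length;
  if (bytes > maxBytes) {
    throw new Error(`Job data is ${bytes} bytes; limit is ${maxBytes}`);
  }
}
```

This guard catches oversized payloads and circular structures such as request objects. It does not detect values that JSON.stringify silently drops (for example, function properties), so schema validation is still the primary defense.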

Queue Setup

import { Queue, type JobsOptions } from "bullmq";
import IORedis from "ioredis";
import type { TicketJobData } from "./schema";

export const queueName = "ticket-triage";

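// Fail at startup if REDIS_URL is missing; ioredis would otherwise fall back to localhost:6379.
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) {
  throw new Error("REDIS_URL is not set");
}

const producerConnection = new IORedis(redisUrl);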

export const triageQueue = new Queue<TicketJobData>(queueName, {
  connection: producerConnection,
});

Use separate producer and worker connections when API and worker processes have different runtime constraints.
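As a rough sketch of what "different runtime constraints" can mean, the helper below picks connection options per role. The option names `maxRetriesPerRequest` and `enableOfflineQueue` are ioredis options; the helper `connectionOptionsFor`, the local types, and the chosen values are illustrative assumptions.

```typescript
// Hypothetical helper: only the two option names come from ioredis.
type Role = "producer" | "worker";

interface RoleConnectionOptions {
  maxRetriesPerRequest: number | null;
  enableOfflineQueue: boolean;
}

export function connectionOptionsFor(role: Role): RoleConnectionOptions {
  if (role === "worker") {
    // BullMQ workers use blocking commands and expect maxRetriesPerRequest
    // to be null, so a worker waits out a Redis outage instead of erroring.
    return { maxRetriesPerRequest: null, enableOfflineQueue: true };
  }
  // An API process usually prefers to fail fast so the HTTP request can
  // return an error; 20 is the ioredis default for maxRetriesPerRequest.
  return { maxRetriesPerRequest: 20, enableOfflineQueue: false };
}
```

Under these assumptions, the API process would create its connection with `new IORedis(redisUrl, connectionOptionsFor("producer"))` and the worker process with `connectionOptionsFor("worker")`.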

Job Options

export const jobOptions: JobsOptions = {
  attempts: 3,
  backoff: {
    type: "exponential",
    delay: 5_000,
  },
  removeOnComplete: {
    age: 60 * 60 * 24,
    count: 1_000,
  },
  removeOnFail: {
    age: 60 * 60 * 24 * 7,
  },
};
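To make the retry settings concrete, the following sketch computes the wait before each retry. It assumes the exponential formula described in the BullMQ documentation, delay * 2^(attemptsMade - 1), with no jitter; the function name `retryDelaysMs` is hypothetical.

```typescript
// Assumes BullMQ's documented exponential backoff without jitter.
export function retryDelaysMs(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // `attempts` includes the first run, so there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * 2 ** (attemptsMade - 1));
  }
  return delays;
}

console.log(retryDelaysMs(3, 5_000)); // [ 5000, 10000 ]
```

With attempts set to 3 and a 5-second base delay, a job that keeps failing is retried after roughly 5 seconds and then 10 seconds before it is marked failed.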

Retries are useful for transient provider, network, and database failures. They are dangerous when the worker performs non-idempotent side effects.
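One common way to make a side effect safe to retry is to claim an idempotency key before performing it. The sketch below uses an in-memory ledger purely for illustration; `EffectLedger` and `runOnce` are hypothetical names, and a real worker would back the ledger with a durable store and a unique key.

```typescript
// Hypothetical in-memory ledger; a durable table with a unique key would
// replace the Map in a real worker.
type EffectState = "in_progress" | "done";

export class EffectLedger {
  private readonly states = new Map<string, EffectState>();

  // Returns true only for the caller that should perform the effect.
  tryBegin(key: string): boolean {
    if (this.states.has(key)) return false;
    this.states.set(key, "in_progress");
    return true;
  }

  markDone(key: string): void {
    this.states.set(key, "done");
  }

  // Drop an unfinished claim so a later retry can run the effect again.
  release(key: string): void {
    if (this.states.get(key) === "in_progress") this.states.delete(key);
  }
}

export async function runOnce(
  ledger: EffectLedger,
  key: string,
  effect: () => Promise<void>,
): Promise<boolean> {
  // Skip when the effect already ran or another attempt holds the claim.
  if (!ledger.tryBegin(key)) return false;
  try {
    await effect();
    ledger.markDone(key);
    return true;
  } catch (error) {
    ledger.release(key);
    throw error;
  }
}
```

A worker could call `runOnce(ledger, jobId + "/notify", sendNotification)`, where `sendNotification` is a placeholder for the side effect. A crash between the effect and `markDone` would still leave the claim unfinished in a durable store, so this pattern is usually paired with a claim timeout or a provider-side idempotency key.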

Enqueue Function

import { ticketInputSchema } from "./schema";
import { jobOptions, triageQueue } from "./queue";
import { ticketJobs } from "./storage";

export async function enqueueTicketTriage(input: unknown) {
  const data = ticketInputSchema.parse(input);
  // BullMQ's docs advise against ":" in custom job ids because ":" is its Redis key separator.
  const jobId = `ticket-triage_${data.tenantId}_${data.ticketId}`;

  await ticketJobs.createPending({
    jobId,
    tenantId: data.tenantId,
    ticketId: data.ticketId,
  });

  const job = await triageQueue.add("triage", data, {
    ...jobOptions,
    jobId,
  });

  return {
    jobId: job.id,
    status: "queued",
  };
}

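The stable jobId prevents duplicate queue jobs for the same product operation, but only while the earlier job still exists in the queue: BullMQ ignores an add(...) call whose id matches an existing job, and once removeOnComplete or removeOnFail deletes that job, the same id can be enqueued again. Keep a matching idempotency constraint in app storage because the database is the source of product truth.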
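The storage-side idempotency constraint might look like the sketch below. It is an in-memory stand-in for the `./storage` module, whose real implementation is not shown in this guide; the class name, the `created` flag, and the `count()` helper are assumptions. A database version would typically rely on a unique index over the tenant and ticket columns.

```typescript
// Hypothetical stand-in for ./storage; a real store would enforce the same
// rule with a unique index instead of a Map.
interface PendingJob {
  jobId: string;
  tenantId: string;
  ticketId: string;
  status: "pending";
}

export class InMemoryTicketJobs {
  private readonly byOperation = new Map<string, PendingJob>();

  async createPending(input: {
    jobId: string;
    tenantId: string;
    ticketId: string;
  }): Promise<{ created: boolean; record: PendingJob }> {
    // Serializing the pair as a JSON array gives an unambiguous composite key.
    const key = JSON.stringify([input.tenantId, input.ticketId]);
    const existing = this.byOperation.get(key);
    if (existing) {
      // Same product operation: return the earlier record instead of duplicating it.
      return { created: false, record: existing };
    }
    const record: PendingJob = { ...input, status: "pending" };
    this.byOperation.set(key, record);
    return { created: true, record };
  }

  count(): number {
    return this.byOperation.size;
  }
}
```

Returning the existing record rather than throwing lets the enqueue function treat a repeated request as a no-op and report the original job to the caller.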

API Boundary

import { enqueueTicketTriage } from "./enqueue";

export async function POST(request: Request) {
  const body = await request.json();
  const result = await enqueueTicketTriage(body);

  return Response.json(result, {
    status: 202,
  });
}

Do authentication, tenant scoping, quotas, and validation before queue.add(...). The worker should still trust only the scoped job data it receives, but the API is the first product boundary.
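Tenant scoping at the API boundary can be sketched as a pure function that runs after authentication and before validation. Everything here is hypothetical: `AuthContext`, `scopeToTenant`, and the result shape are illustrative names, and how the caller is authenticated is left out.

```typescript
// Hypothetical helper: AuthContext and the result shape are assumptions.
interface AuthContext {
  tenantId: string;
}

type ScopeResult =
  | { ok: true; input: Record<string, unknown> }
  | { ok: false; status: 400 | 403; message: string };

export function scopeToTenant(auth: AuthContext, body: unknown): ScopeResult {
  if (typeof body !== "object" || body === null || Array.isArray(body)) {
    return { ok: false, status: 400, message: "Body must be a JSON object" };
  }
  const fields = body as Record<string, unknown>;

  // Reject a body that names another tenant instead of silently rewriting it.
  if (fields.tenantId !== undefined && fields.tenantId !== auth.tenantId) {
    return { ok: false, status: 403, message: "tenantId does not match caller" };
  }

  // The authenticated tenant is always the one that reaches the queue.
  return { ok: true, input: { ...fields, tenantId: auth.tenantId } };
}
```

In the POST handler, a failed result would map to `Response.json({ error: result.message }, { status: result.status })`, and a successful one would pass `result.input` to `enqueueTicketTriage`, which still runs the schema validation.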