Long Process Pipelines

Status, Retries, and Testing

Persist product status and test long-running BullMQ pipeline jobs.

BullMQ owns queue mechanics. Your application owns product status, permissions, idempotency, audit records, and user-facing results.

Durable Status

export type TicketJobRecord = {
  jobId: string;
  tenantId: string;
  ticketId: string;
  status: "queued" | "running" | "completed" | "failed";
  output?: unknown;
  errorMessage?: string;
  updatedAt: Date;
};

Store status in app storage before enqueueing. Update the same record from the worker. Use BullMQ job state for operations and debugging, not as the only user-facing product record.
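The ordering described above can be sketched as follows. This is a minimal illustration, not a definitive implementation: the in-memory `Map` stands in for app storage, `JobQueue` is a hypothetical narrow interface that matches the shape of BullMQ's `queue.add(name, data, opts)`, and `stableJobId`, `enqueueTriage`, and `recordResult` are names invented for this example.

```typescript
// Mirrors the TicketJobRecord type above, repeated so the sketch is self-contained.
type TicketJobRecord = {
  jobId: string;
  tenantId: string;
  ticketId: string;
  status: "queued" | "running" | "completed" | "failed";
  output?: unknown;
  errorMessage?: string;
  updatedAt: Date;
};

// Hypothetical narrow view of a queue; a BullMQ Queue fits this shape.
interface JobQueue {
  add(
    name: string,
    data: { tenantId: string; ticketId: string },
    opts: { jobId: string },
  ): Promise<unknown>;
}

// Stand-in for app storage (a database table in a real system).
const records = new Map<string, TicketJobRecord>();

// Assumption: the separator is arbitrary. Check the custom job id rules
// of the BullMQ version in use before choosing one.
function stableJobId(tenantId: string, ticketId: string): string {
  return `triage_${tenantId}_${ticketId}`;
}

async function enqueueTriage(
  queue: JobQueue,
  tenantId: string,
  ticketId: string,
): Promise<TicketJobRecord> {
  const jobId = stableJobId(tenantId, ticketId);
  const existing = records.get(jobId);
  // App-level idempotency: reuse the record unless the last attempt failed.
  if (existing && existing.status !== "failed") return existing;

  const record: TicketJobRecord = {
    jobId, tenantId, ticketId, status: "queued", updatedAt: new Date(),
  };
  records.set(jobId, record); // 1. persist status before enqueueing
  try {
    await queue.add("triage", { tenantId, ticketId }, { jobId }); // 2. enqueue
  } catch (err) {
    // Enqueue failed: do not leave a record that looks queued forever.
    records.set(jobId, {
      ...record, status: "failed", errorMessage: String(err), updatedAt: new Date(),
    });
    throw err;
  }
  return record;
}

// Called from the worker so that it updates the same record the API created.
function recordResult(
  jobId: string,
  result: { ok: true; output: unknown } | { ok: false; error: string },
): void {
  const record = records.get(jobId);
  if (!record) throw new Error(`no status record for job ${jobId}`);
  records.set(
    jobId,
    result.ok
      ? { ...record, status: "completed", output: result.output, updatedAt: new Date() }
      : { ...record, status: "failed", errorMessage: result.error, updatedAt: new Date() },
  );
}
```

Because the record is written first, a user-facing status endpoint can read `records` alone and never needs to query BullMQ job state.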

Queue Events

import { QueueEvents } from "bullmq";
import { queueName } from "./queue";
import { workerConnection } from "./worker-connection";

export const triageQueueEvents = new QueueEvents(queueName, {
  connection: workerConnection,
});

triageQueueEvents.on("completed", ({ jobId }) => {
  console.info("ticket triage job completed", { jobId });
});

triageQueueEvents.on("failed", ({ jobId, failedReason }) => {
  console.warn("ticket triage job failed", { jobId, failedReason });
});

Queue events are useful for operational logs and metrics. Product state should still be written by the API and worker paths that understand tenant, ticket, and result data.

Retry Policy

Case                                Suggested behavior
provider timeout                    retry with backoff
temporary Redis or database error   retry with backoff
invalid job data                    fail without re-enqueueing
permission or tenant mismatch       fail and alert
non-idempotent side effect          protect with operation id before retrying
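One way to keep this policy in a single place is a small decision function that the worker consults when a stage fails. The sketch below is an assumption-laden illustration: `FailureKind`, `RetryDecision`, and `decideRetry` are hypothetical names, and the `attempts` and `delay` values are placeholders rather than recommendations.

```typescript
// Hypothetical failure categories, one per row of the table above
// (the non-idempotent side effect row is handled by operation ids instead).
type FailureKind =
  | "provider-timeout"
  | "transient-store-error"
  | "invalid-job-data"
  | "tenant-mismatch";

type RetryDecision = "retry-with-backoff" | "fail" | "fail-and-alert";

// Maps a classified failure to the suggested behavior.
function decideRetry(kind: FailureKind): RetryDecision {
  switch (kind) {
    case "provider-timeout":
    case "transient-store-error":
      return "retry-with-backoff";
    case "invalid-job-data":
      return "fail";
    case "tenant-mismatch":
      return "fail-and-alert";
  }
}

// Job options in the shape BullMQ accepts for retries with exponential backoff.
// The numbers are illustrative only.
const retryJobOptions = {
  attempts: 5,
  backoff: { type: "exponential", delay: 1000 },
};
```

In a BullMQ processor, a "retry-with-backoff" decision typically means rethrowing the error so the configured attempts apply, while a "fail" decision can be expressed by throwing BullMQ's `UnrecoverableError`, which skips the remaining attempts.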

If the pipeline writes data, sends messages, or calls external write APIs, use service-layer idempotency keys. A retried worker must not create duplicate side effects.
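A service-layer idempotency key can be sketched like this. The example assumes an in-memory `Map` in place of a durable operations table, and `operationId` and `runOnce` are hypothetical helpers, not part of BullMQ.

```typescript
// Stand-in for a durable table of completed operations, keyed by operation id.
const completedOperations = new Map<string, unknown>();

// Derive the key from the job id and the step so that a retry of the same job
// produces the same key.
function operationId(jobId: string, step: string): string {
  return `${jobId}/${step}`;
}

// Runs the side effect at most once per operation id within this store.
// On a retry, the stored result is returned and the effect is skipped.
async function runOnce<T>(opId: string, effect: () => Promise<T>): Promise<T> {
  if (completedOperations.has(opId)) {
    return completedOperations.get(opId) as T;
  }
  const result = await effect();
  completedOperations.set(opId, result);
  return result;
}
```

This check-then-write sequence is not atomic: a crash between the effect and the `set` call could still repeat the effect. A production version would more likely rely on a unique constraint in the database, or pass the operation id to the external API as its idempotency key where the provider supports one.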

Failure Modes

Failure                                             Fix
duplicate jobs for one ticket                       stable jobId plus app-level idempotency
Redis outage blocks API too long                    tune producer connection retry behavior and return a retryable API error
worker retries repeat side effects                  keep side effects idempotent and persist operation ids
user cannot see progress after completion cleanup   store status and result in app storage
worker process hides emitted errors                 attach a worker error listener
pipeline stage is hard to debug                     pass a pipeline observer and include job id in logs and traces

Test Checklist

  • Test input validation before enqueueing.
  • Test stable job ids for duplicate enqueue attempts.
  • Test that queued status records are created before queue.add(...).
  • Test worker success writes completed status and output.
  • Test worker failure writes failed status and leaves retry behavior to BullMQ.
  • Test idempotent side effects when BullMQ retries the same job.
  • Test progress updates from pipeline stage events.
  • Run the pipeline directly in unit tests before testing BullMQ wiring.
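The last two checklist items can be exercised without Redis by calling the pipeline as a plain function and collecting its stage events. The pipeline below is a hypothetical two-stage stand-in (`runTriagePipeline`, `PipelineObserver`, and the keyword rule are invented for illustration), and the assertions use Node's built-in `assert` module rather than any particular test runner.

```typescript
import { strict as assert } from "node:assert";

type StageEvent = { stage: string; status: "started" | "finished" };
type PipelineObserver = (event: StageEvent) => void;

type TriageInput = { tenantId: string; ticketId: string; text: string };
type TriageOutput = { category: string };

// Hypothetical pipeline: no queue, no Redis, only input, stages, and output.
function runTriagePipeline(
  input: TriageInput,
  observe: PipelineObserver = () => {},
): TriageOutput {
  if (input.text.trim() === "") throw new Error("ticket text is empty");

  observe({ stage: "normalize", status: "started" });
  const normalized = input.text.trim().toLowerCase();
  observe({ stage: "normalize", status: "finished" });

  observe({ stage: "classify", status: "started" });
  const category = normalized.includes("refund") ? "billing" : "general";
  observe({ stage: "classify", status: "finished" });

  return { category };
}

// Unit test: run the pipeline directly and record what the observer sees.
const events: StageEvent[] = [];
const output = runTriagePipeline(
  { tenantId: "t1", ticketId: "42", text: "  Please REFUND my order " },
  (event) => events.push(event),
);

assert.deepEqual(output, { category: "billing" });
assert.deepEqual(
  events.map((e) => `${e.stage}:${e.status}`),
  ["normalize:started", "normalize:finished", "classify:started", "classify:finished"],
);
assert.throws(() => runTriagePipeline({ tenantId: "t1", ticketId: "43", text: " " }));
```

Once this passes, the BullMQ wiring test only needs to confirm that the worker passes job data into the pipeline and forwards observer events, for example through the job's `updateProgress` method.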