Long Process Pipelines
Status, Retries, and Testing
Persist product status and test long-running BullMQ pipeline jobs.
BullMQ owns queue mechanics. Your application owns product status, permissions, idempotency, audit records, and user-facing results.
Durable Status
export type TicketJobRecord = {
jobId: string;
tenantId: string;
ticketId: string;
status: "queued" | "running" | "completed" | "failed";
output?: unknown;
errorMessage?: string;
updatedAt: Date;
};Store status in app storage before enqueueing. Update the same record from the worker. Use BullMQ job state for operations and debugging, not as the only user-facing product record.
Queue Events
import { QueueEvents } from "bullmq";
import { queueName } from "./queue";
import { workerConnection } from "./worker-connection";
export const triageQueueEvents = new QueueEvents(queueName, {
connection: workerConnection,
});
triageQueueEvents.on("completed", ({ jobId }) => {
console.info("ticket triage job completed", { jobId });
});
triageQueueEvents.on("failed", ({ jobId, failedReason }) => {
console.warn("ticket triage job failed", { jobId, failedReason });
});Queue events are useful for operational logs and metrics. Product state should still be written by the API and worker paths that understand tenant, ticket, and result data.
Retry Policy
| Case | Suggested behavior |
|---|---|
| provider timeout | retry with backoff |
| temporary Redis or database error | retry with backoff |
| invalid job data | fail without re-enqueueing |
| permission or tenant mismatch | fail and alert |
| non-idempotent side effect | protect with operation id before retrying |
If the pipeline writes data, sends messages, or calls external write APIs, use service-layer idempotency keys. A retried worker must not create duplicate side effects.
Failure Modes
| Failure | Fix |
|---|---|
| duplicate jobs for one ticket | stable jobId plus app-level idempotency |
| Redis outage blocks API too long | tune producer connection retry behavior and return a retryable API error |
| worker retries repeat side effects | keep side effects idempotent and persist operation ids |
| user cannot see progress after completion cleanup | store status and result in app storage |
| worker process hides emitted errors | attach a worker error listener |
| pipeline stage is hard to debug | pass a pipeline observer and include job id in logs and traces |
Test Checklist
- Test input validation before enqueueing.
- Test stable job ids for duplicate enqueue attempts.
- Test pending records are created before
queue.add(...). - Test worker success writes completed status and output.
- Test worker failure writes failed status and leaves retry behavior to BullMQ.
- Test idempotent side effects when BullMQ retries the same job.
- Test progress updates from pipeline stage events.
- Run the pipeline directly in unit tests before testing BullMQ wiring.
