Run Pipeline Workers
Consume BullMQ jobs and execute Anvia pipelines in a separate worker process.
The worker owns execution. It should mark jobs running, call pipeline.run(...), update progress from pipeline stage events, persist the final result, and let BullMQ retry failures that should be retried.
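One way to decide which failures "should be retried" is to classify errors before rethrowing them from the handler. The sketch below is illustrative only: `PermanentJobError`, `isRetryable`, and the status-code rule are hypothetical names and policies, not part of Anvia or BullMQ.

```typescript
// Hypothetical error type for failures that retrying cannot fix,
// such as a payload that fails validation.
class PermanentJobError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "PermanentJobError";
    // Keeps `instanceof` working when compiling to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical shape for provider errors that carry an HTTP status.
interface HttpLikeError {
  status: number;
}

function hasStatus(error: unknown): error is HttpLikeError {
  return (
    typeof error === "object" &&
    error !== null &&
    typeof (error as { status?: unknown }).status === "number"
  );
}

// Returns true when another attempt has a reasonable chance of succeeding.
function isRetryable(error: unknown): boolean {
  if (error instanceof PermanentJobError) return false;
  if (hasStatus(error)) {
    // Assumed policy: rate limits and server errors are transient,
    // other client errors are not.
    return error.status === 429 || error.status >= 500;
  }
  // Unknown errors (network resets, timeouts) default to retryable.
  return true;
}
```

In a real worker, a failure classified as non-retryable could be rethrown as BullMQ's `UnrecoverableError`, which fails the job without using its remaining attempts. Any other thrown error leaves the retry decision to the `attempts` and `backoff` options the job was enqueued with.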
Build the Pipeline
import { PipelineBuilder } from "@anvia/core";
import { ticketAgent, ticketExtractor } from "./ai";
import type { TicketJobData } from "./schema";

export const ticketPipeline = new PipelineBuilder<TicketJobData>({
  id: "ticket-triage-pipeline",
  name: "Ticket triage pipeline",
})
  .step((input) => ({
    ...input,
    summary: input.summary.trim(),
  }))
  .prompt(ticketAgent, {
    name: "Draft triage",
  })
  .extract(ticketExtractor, {
    name: "Extract ticket result",
  })
  .build();

Build provider clients, agents, extractors, and reusable pipelines at worker startup. Keep request-local user and tenant data inside the job payload.
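To illustrate keeping request-local data in the payload, the sketch below assumes a shape for the job data. The real `TicketJobData` type lives in `./schema` and is not shown in this guide, so `TicketJobDataSketch`, `RequestContext`, `toJobData`, and the `tenantId` and `requestedBy` fields are all hypothetical names.

```typescript
// Assumed shape: `ticketId` and `summary` appear elsewhere in this guide;
// `tenantId` and `requestedBy` are hypothetical request-local fields.
interface TicketJobDataSketch {
  ticketId: string;
  summary: string;
  tenantId: string;
  requestedBy: string;
}

// Hypothetical request context available in the HTTP handler.
interface RequestContext {
  tenantId: string;
  userId: string;
}

// Copies per-request values into the payload so the worker never relies on
// process-level state to know which tenant or user a job belongs to.
function toJobData(
  ctx: RequestContext,
  ticket: { id: string; summary: string },
): TicketJobDataSketch {
  return {
    ticketId: ticket.id,
    summary: ticket.summary,
    tenantId: ctx.tenantId,
    requestedBy: ctx.userId,
  };
}
```

Because the payload is stored in Redis as JSON, it should contain only plain serializable values. Long-lived objects such as clients and pipelines stay in module scope on the worker side.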
Worker Connection
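import IORedis from "ioredis";

// Fail fast at startup if the Redis URL is missing.
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) throw new Error("REDIS_URL is not set");

export const workerConnection = new IORedis(redisUrl, {
  // BullMQ requires this to be null on worker connections.
  maxRetriesPerRequest: null,
});

BullMQ workers need a Redis connection suitable for blocking commands. Keep this connection separate from any connection used for short-lived HTTP request handling.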
Worker Handler
import { type PipelineRunEvent } from "@anvia/core";
import { Worker } from "bullmq";
import { queueName } from "./queue";
import { ticketPipeline } from "./pipeline";
import type { TicketJobData } from "./schema";
import { ticketJobs } from "./storage";
import { workerConnection } from "./worker-connection";

export const triageWorker = new Worker<TicketJobData>(
  queueName,
  async (job) => {
    await ticketJobs.markRunning(job.id!, {
      startedAt: new Date(),
      attempt: job.attemptsMade + 1,
    });

    const output = await ticketPipeline.run(job.data, {
      observer: {
        async onEvent(event: PipelineRunEvent) {
          await job.updateProgress({
            type: event.type,
            stage: event.node.label,
          });
        },
      },
    });

    await ticketJobs.markCompleted(job.id!, {
      completedAt: new Date(),
      output,
    });

    return {
      ticketId: job.data.ticketId,
      status: "completed",
    };
  },
  {
    connection: workerConnection,
    concurrency: 4,
  },
);

Use the pipeline observer for metadata-only progress. Do not put sensitive model output, retrieved documents, or tool results into queue progress if those values should only live in app storage or traces.
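A metadata-only progress payload can be enforced by mapping each event through an explicit allowlist. The event shape below is a hypothetical stand-in, not Anvia's actual `PipelineRunEvent` type. Only `type` and `node.label` are taken from the handler in this guide; the `output` field and the names `SketchEvent`, `ProgressMetadata`, and `toProgress` are assumed for illustration.

```typescript
// Hypothetical stand-in for a pipeline event. `output` represents any
// sensitive field an event might carry.
interface SketchEvent {
  type: string;
  node: { label: string };
  output?: unknown;
}

// The only fields that are allowed to reach queue progress.
interface ProgressMetadata {
  type: string;
  stage: string;
}

// Builds the progress payload from named fields instead of spreading the
// event, so fields added to the event later are not leaked by accident.
function toProgress(event: SketchEvent): ProgressMetadata {
  return { type: event.type, stage: event.node.label };
}
```

Under these assumptions, the observer would pass `toProgress(event)` to `job.updateProgress(...)`. Picking fields by name means any new event field stays out of the queue until someone adds it deliberately.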
Worker Events
triageWorker.on("failed", async (job, error) => {
  if (!job) return;

  await ticketJobs.markFailed(job.id!, {
    failedAt: new Date(),
    message: error.message,
    attempt: job.attemptsMade,
  });
});

triageWorker.on("error", (error) => {
  console.error("ticket triage worker error", error);
});

The failed event is job-level state. The error event is worker-level runtime state. Handle both so job status and worker health are visible.
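Because a job can fail on one attempt and still be retried, the stored status may need to distinguish a retrying job from one that has exhausted its attempts. The helper below is a sketch under assumptions: `failureStatus` and its status strings are hypothetical, `attemptsMade` is assumed to already count the attempt that just failed, and `maxAttempts` stands for the `attempts` option the job was enqueued with (treated as 1 when unset).

```typescript
type FailureStatus = "retrying" | "failed";

// Decides whether a failed attempt was the last one allowed for the job.
function failureStatus(
  attemptsMade: number,
  maxAttempts?: number,
): FailureStatus {
  // A job with no configured attempts is assumed to run exactly once.
  const limit = Math.max(1, maxAttempts ?? 1);
  return attemptsMade >= limit ? "failed" : "retrying";
}
```

Inside the failed listener this could be called as `failureStatus(job.attemptsMade, job.opts.attempts)`. A job failed with an unrecoverable error would not be retried regardless of this count, so treat the result as a hint rather than the source of truth.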
