
Batch Runs

Run a pipeline across many inputs.

Use batch(...) when the same pipeline should process many inputs with a concurrency limit.

1. Build a Normal Pipeline

const normalizeTicket = new PipelineBuilder<string>()
  .step((ticket) => ticket.trim())
  .step((ticket) => ticket.replace(/\s+/g, " "))
  .build();

2. Run Many Inputs

const results = await normalizeTicket.batch(
  [
    " checkout   failed ",
    " cannot update card ",
    " password reset issue ",
  ],
  { concurrency: 2 },
);

Results preserve input order.

console.log(results[0]); // "checkout failed"
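
To make the concurrency limit and the ordering guarantee concrete, here is a minimal sketch of how an order-preserving, concurrency-limited map can be written in plain TypeScript. It is not Anvia's implementation; `mapWithConcurrency` and its parameters are hypothetical names used only for illustration.

```typescript
// Hypothetical helper, not part of Anvia: runs `fn` over `inputs` with at
// most `limit` calls in flight and returns results in input order.
async function mapWithConcurrency<T, R>(
  inputs: readonly T[],
  limit: number,
  fn: (input: T, index: number) => Promise<R> | R,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }

  const results = new Array<R>(inputs.length);
  let next = 0; // index of the next unclaimed input

  // Each worker claims the next index, processes it, and repeats until
  // no inputs remain. Writing to results[index] keeps input order even
  // when items finish out of order.
  async function worker(): Promise<void> {
    while (next < inputs.length) {
      const index = next++;
      results[index] = await fn(inputs[index], index);
    }
  }

  // Never start more workers than there are inputs.
  const workerCount = Math.min(limit, inputs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

In this sketch the first item that throws rejects the returned promise, which mirrors the all-or-nothing behavior described for batch(...) in step 4. Items already in flight are not cancelled.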

3. Pick a Concurrency Limit

Pipeline Work             Starting Concurrency
CPU-light transforms      4 to 16
Provider calls            2 to 5
Database or API writes    match your service limits

4. Handle Failures at the Batch Boundary

If any item throws, the batch rejects.

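async function normalizeAll(tickets: string[]) {
  try {
    // `return await` keeps a rejected batch inside this try/catch.
    return await normalizeTicket.batch(tickets, { concurrency: 3 });
  } catch (error) {
    // `logger` stands in for your application's logger.
    logger.error(error, "ticket batch failed");
    throw error;
  }
}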

For per-item recovery, catch inside a step and return an explicit success or failure object.
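
One way to do that is sketched below. The `ItemResult` shape and the `recoverable` and `requireNonEmpty` helpers are hypothetical names, not part of Anvia. The sketch only assumes that a step is an ordinary function from input to output, as in the examples above.

```typescript
// Hypothetical result shape; Anvia does not prescribe one.
type ItemResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: string };

// Wraps a synchronous step so a thrown error becomes a failure object
// instead of rejecting the whole batch.
function recoverable<In, Out>(
  step: (input: In) => Out,
): (input: In) => ItemResult<Out> {
  return (input) => {
    try {
      return { ok: true, value: step(input) };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return { ok: false, error: message };
    }
  };
}

// Example step that throws on tickets with no content.
function requireNonEmpty(ticket: string): string {
  const trimmed = ticket.trim();
  if (trimmed.length === 0) {
    throw new Error("empty ticket");
  }
  return trimmed;
}

// A step function that never throws; failures are reported per item.
const safeStep = recoverable(requireNonEmpty);
```

Passing `safeStep` to `.step(...)` would let the batch resolve with one `ItemResult` per input, so the caller can separate successes from failures by checking `ok`. A step that returns a promise would need a variant of `recoverable` that awaits the step inside the try block.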