Anvia
Pipelines

Pipeline Builder

Compose multi-step workflows with Anvia pipelines.

Use PipelineBuilder when a workflow should be explicit, testable, and made from named stages instead of one large prompt.

1. Start With Input and Output Types

import { PipelineBuilder } from "@anvia/core";

const pipeline = new PipelineBuilder<string>();

PipelineBuilder<Input, Output = Input> tracks two types:

TypeMeaning
InputThe value accepted by run(...) after .build().
OutputThe current value after the latest stage. It starts as Input and changes as stages return new values.

Most pipelines only set the first generic. TypeScript infers the output after every .step(...), .use(...), .parallel(...), .prompt(...), and .extract(...).

2. Build, Then Run

const normalizeTicket = new PipelineBuilder<string>()
  .step((input) => input.trim())
  .step((input) => input.replace(/\s+/g, " "))
  .build();

const result = await normalizeTicket.run("  checkout   is failing  ");

console.log(result);

The builder describes the pipeline. .build() returns the runnable Pipeline<Input, Output>. run(...) and batch(...) live on the built pipeline, not on the builder.

API at a glance

APIInputOutput
.step(fn)Current outputThe value returned by fn
.use(op)Current outputThe output of another PipelineOp
.parallel({ ... })Current output, passed to every branchAn object keyed by branch name
.prompt(agent)Current output converted with String(input)Agent text output
.extract(extractor)Current output converted with String(input)Extractor schema data
.build()Builder stateRunnable Pipeline<Input, Output>
pipeline.run(input)Original InputFinal Output
pipeline.batch(inputs, options)Iterable of original InputArray of final Output values

3. Return New Shapes

Pipelines are not limited to strings. A step can return an object, array, number, or any other TypeScript value.

type SupportTicketInput = {
  customer: string;
  subject: string;
  body: string;
};

type NormalizedTicket = {
  customer: string;
  title: string;
  words: number;
};

const normalizeTicket = new PipelineBuilder<SupportTicketInput>()
  .step((ticket): NormalizedTicket => ({
    customer: ticket.customer.trim(),
    title: ticket.subject.trim().toLowerCase(),
    words: ticket.body.trim().split(/\s+/).length,
  }))
  .build();

const ticket = await normalizeTicket.run({
  customer: " Acme Co. ",
  subject: " Checkout is failing ",
  body: "Enterprise checkout fails after payment retries.",
});

console.log(ticket.title);

The first step receives SupportTicketInput. The built pipeline returns NormalizedTicket.

4. Use Arrays and Numbers

const executiveNotes = new PipelineBuilder<string[]>()
  .step((notes) => notes.map((note) => `- ${note}`).join("\n"))
  .build();

const notes = await executiveNotes.run([
  "Checkout failures increased after payment retry changes.",
  "Enterprise customers are affected.",
]);
const formatPrice = new PipelineBuilder<number>()
  .step((dollars) => Math.round(dollars * 100))
  .step((cents) => ({
    cents,
    formatted: `$${(cents / 100).toFixed(2)}`,
  }))
  .build();

const price = await formatPrice.run(12.5);

console.log(price.formatted);

5. Branch Into Different Output Types

const ticketSignals = new PipelineBuilder<string>()
  .parallel({
    title: new PipelineBuilder<string>()
      .step((ticket) => ticket.split(".")[0] ?? ticket)
      .build(),
    wordCount: new PipelineBuilder<string>()
      .step((ticket) => ticket.trim().split(/\s+/).length)
      .build(),
    urgent: new PipelineBuilder<string>()
      .step((ticket) => ticket.toLowerCase().includes("outage"))
      .build(),
  })
  .build();

const signals = await ticketSignals.run(
  "Checkout outage affects enterprise customers. Payment retries fail.",
);

console.log(signals.title, signals.wordCount, signals.urgent);

The result is typed as:

{
  title: string;
  wordCount: number;
  urgent: boolean;
}

6. Format Before Prompting or Extracting

.prompt(agent) and .extract(extractor) call String(input) internally. For strings, that is usually fine. For objects, add a formatting step first so the model receives intentional text instead of [object Object].

const ticketPipeline = new PipelineBuilder<SupportTicketInput>()
  .step(
    (ticket) =>
      [
        `Customer: ${ticket.customer}`,
        `Subject: ${ticket.subject}`,
        `Body: ${ticket.body}`,
      ].join("\n"),
  )
  .prompt(summarizer)
  .extract(triageExtractor)
  .build();

const triage = await ticketPipeline.run({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Enterprise checkout fails after payment retries.",
});

7. Add Capabilities Gradually

Build pipelines in this order:

  1. Use Steps for ordinary transforms.
  2. Use Prompt Steps when a stage needs an agent.
  3. Use Extractor Steps when a stage should return typed data.
  4. Use Parallel Branches when independent work can run at the same time.
  5. Use Batch Runs when the same pipeline should process many inputs.
  6. Read Example: Research Pipeline for a complete fetch, summarize, and extract workflow.

Minimal End-to-End Shape

const ticketPipeline = new PipelineBuilder<string>()
  .step((ticket) => ticket.trim())
  .step((ticket) => `Summarize this ticket:\n\n${ticket}`)
  .prompt(summarizer)
  .build();

const summary = await ticketPipeline.run("Checkout fails for enterprise users.");

Keep the first version linear. Add branching and composition only after the simple flow is clear.