
Pipeline Builder

Compose multi-step workflows with Anvia pipelines.

Use PipelineBuilder when a workflow should be explicit, testable, and made from named stages instead of one large prompt.

1. Start With an Input Schema

import { z } from "zod";
import { PipelineBuilder } from "@anvia/core/pipeline";

const pipeline = new PipelineBuilder(z.string());

Pass a Zod schema to PipelineBuilder. The schema defines the input accepted by run(...), validates every run at runtime, and gives TypeScript the parsed value type for the first stage.

PipelineBuilder tracks two types from that schema:

Type | Meaning
Input | The value accepted by run(...). This is the schema's input type.
Output | The current value after the latest stage. It starts as the parsed schema output and changes as stages return new values.

TypeScript infers the output after every .step(...), .use(...), .parallel(...), .prompt(...), and .extract(...).

2. Build, Then Run

const normalizeTicket = new PipelineBuilder(z.string())
  .step((input) => input.trim())
  .step((input) => input.replace(/\s+/g, " "))
  .build();

const result = await normalizeTicket.run("  checkout   is failing  ");

console.log(result); // "checkout is failing"

The builder describes the pipeline. .build() returns the runnable Pipeline<Input, Output>. run(...) and batch(...) live on the built pipeline, not on the builder.
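The split between describing a pipeline and running it can be sketched in plain TypeScript. The hypothetical `SketchBuilder` below is not Anvia's implementation: a plain `parse` function stands in for the Zod schema, and only `.step(...)` is supported. It illustrates two points from this section: every `.step(...)` returns a new builder whose Output type is the step's return type, and only the object returned by `.build()` has `run(...)`, which validates its input on each call before running the stages in order.

```typescript
// Hypothetical sketch, not Anvia's PipelineBuilder.
// A plain function stands in for a schema: validate unknown input or throw.
type Parse<Out> = (value: unknown) => Out;

// Stages are stored untyped at runtime; the class type parameters track types.
type Stage = (value: any) => unknown;

interface SketchPipeline<Input, Output> {
  run(input: Input): Promise<Output>;
}

class SketchBuilder<Input, Output> {
  private constructor(
    private readonly parse: Parse<unknown>,
    private readonly stages: readonly Stage[],
  ) {}

  // Input and the first Output both come from the parser's result type.
  static create<T>(parse: Parse<T>): SketchBuilder<T, T> {
    return new SketchBuilder<T, T>(parse, []);
  }

  // Returns a new builder whose Output is the step's (awaited) return type.
  step<Next>(
    fn: (input: Output) => Next | Promise<Next>,
  ): SketchBuilder<Input, Next> {
    return new SketchBuilder<Input, Next>(this.parse, [...this.stages, fn]);
  }

  // Freezes the stage list into a runnable object; only it exposes run(...).
  build(): SketchPipeline<Input, Output> {
    const parse = this.parse;
    const stages = this.stages;
    return {
      async run(input: Input): Promise<Output> {
        let value: unknown = parse(input); // validate on every run
        for (const stage of stages) value = await stage(value);
        return value as Output;
      },
    };
  }
}

const parseString: Parse<string> = (value) => {
  if (typeof value !== "string") throw new TypeError("expected a string");
  return value;
};

// Output stays string through both steps.
const normalize = SketchBuilder.create(parseString)
  .step((input) => input.trim())
  .step((input) => input.replace(/\s+/g, " "))
  .build();

// Output changes from string to number at the second step.
const countWords = SketchBuilder.create(parseString)
  .step((input) => input.trim())
  .step((input) => input.split(/\s+/).length)
  .build();
```

Here `normalize.run("  checkout   is failing  ")` resolves to "checkout is failing", and `countWords` shows the Output type moving from string to number. Keeping stages untyped at runtime while the generics carry the types is one common way to get per-step inference; the real library may do this differently.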

API at a glance

API | Input | Output
.step(fn) | Current output | The value returned by fn
.use(op) | Current output | The output of another PipelineOp
.parallel({ ... }) | Current output, passed to every branch | An object keyed by branch name
.prompt(agent) | Current output converted with String(input) | Agent text output
.extract(extractor) | Current output converted with String(input) | Extractor schema data
.build() | Builder state | Runnable Pipeline<Input, Output>
pipeline.run(input) | Original Input | Final Output
pipeline.batch(inputs, options) | Iterable of original Input | Array of final Output values
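Conceptually, batch(...) runs the same built pipeline once per input and returns the outputs in input order. The helper below is a hypothetical sketch, not Anvia's batch: `batchSketch` and its `concurrency` option are assumptions made for illustration (the real options are covered under Batch Runs). It caps how many runs are in flight with a small worker pool.

```typescript
// Hypothetical sketch, not Anvia's batch implementation.
// Anything with an async run(...) method, such as a built pipeline.
interface Runnable<Input, Output> {
  run(input: Input): Promise<Output>;
}

// Assumed option shape for this sketch only.
interface BatchOptions {
  concurrency?: number;
}

async function batchSketch<Input, Output>(
  pipeline: Runnable<Input, Output>,
  inputs: Iterable<Input>,
  options: BatchOptions = {},
): Promise<Output[]> {
  const items = Array.from(inputs);
  const results = new Array<Output>(items.length);
  const limit = Math.max(1, options.concurrency ?? items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++; // claim the index before awaiting
      results[index] = await pipeline.run(items[index] as Input);
    }
  }

  const workers = Array.from(
    { length: Math.min(limit, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results; // same order as inputs, whatever order runs finish in
}
```

Results are written by index rather than pushed, so the returned array lines up with the inputs even when later runs finish first.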

3. Return New Shapes

Pipelines are not limited to strings. A step can return an object, array, number, or any other TypeScript value.

const SupportTicketInput = z.object({
  customer: z.string(),
  subject: z.string(),
  body: z.string(),
});

type NormalizedTicket = {
  customer: string;
  title: string;
  words: number;
};

const normalizeTicket = new PipelineBuilder(SupportTicketInput)
  .step((ticket): NormalizedTicket => ({
    customer: ticket.customer.trim(),
    title: ticket.subject.trim().toLowerCase(),
    words: ticket.body.trim().split(/\s+/).length,
  }))
  .build();

const ticket = await normalizeTicket.run({
  customer: " Acme Co. ",
  subject: " Checkout is failing ",
  body: "Enterprise checkout fails after payment retries.",
});

console.log(ticket.title);

The first step receives the parsed SupportTicketInput value. The built pipeline returns NormalizedTicket.

4. Use Arrays and Numbers

const executiveNotes = new PipelineBuilder(z.array(z.string()))
  .step((notes) => notes.map((note) => `- ${note}`).join("\n"))
  .build();

const notes = await executiveNotes.run([
  "Checkout failures increased after payment retry changes.",
  "Enterprise customers are affected.",
]);

const formatPrice = new PipelineBuilder(z.number())
  .step((dollars) => Math.round(dollars * 100))
  .step((cents) => ({
    cents,
    formatted: `$${(cents / 100).toFixed(2)}`,
  }))
  .build();

const price = await formatPrice.run(12.5);

console.log(price.formatted);

5. Branch Into Different Output Types

const ticketSignals = new PipelineBuilder(z.string())
  .parallel({
    title: new PipelineBuilder(z.string())
      .step((ticket) => ticket.split(".")[0] ?? ticket)
      .build(),
    wordCount: new PipelineBuilder(z.string())
      .step((ticket) => ticket.trim().split(/\s+/).length)
      .build(),
    urgent: new PipelineBuilder(z.string())
      .step((ticket) => ticket.toLowerCase().includes("outage"))
      .build(),
  })
  .build();

const signals = await ticketSignals.run(
  "Checkout outage affects enterprise customers. Payment retries fail.",
);

console.log(signals.title, signals.wordCount, signals.urgent);

The result is typed as:

{
  title: string;
  wordCount: number;
  urgent: boolean;
}
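A rough sketch of how a parallel stage can fan one value out to several branches and assemble a keyed, typed result. `parallelSketch` is a hypothetical helper, not Anvia's code; each branch is assumed to be any object with an async `run` method, like a built pipeline. The mapped type derives the result shape from the branch map, which is how a result typed like the one above can be inferred.

```typescript
// Hypothetical sketch, not Anvia's .parallel(...) implementation.
interface Runnable<Input, Output> {
  run(input: Input): Promise<Output>;
}

// Map each branch name to the Output type of that branch.
type BranchResults<B> = {
  [K in keyof B]: B[K] extends Runnable<any, infer O> ? O : never;
};

async function parallelSketch<
  Input,
  B extends Record<string, Runnable<Input, unknown>>,
>(input: Input, branches: B): Promise<BranchResults<B>> {
  const names = Object.keys(branches) as Array<keyof B & string>;
  // Start every branch with the same input, then wait for all of them.
  const values = await Promise.all(
    names.map((name) => branches[name].run(input)),
  );
  const result: Record<string, unknown> = {};
  names.forEach((name, i) => {
    result[name] = values[i];
  });
  // Keys were filled dynamically, so assert the mapped shape.
  return result as unknown as BranchResults<B>;
}
```

Starting every branch before awaiting any of them is what lets independent branches overlap; the mapped type is what turns a map of branches into an object typed per key.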

6. Format Before Prompting or Extracting

.prompt(agent) and .extract(extractor) call String(input) internally. For strings, that is usually fine. For objects, add a formatting step first so the model receives intentional text instead of [object Object].

const ticketPipeline = new PipelineBuilder(SupportTicketInput)
  .step(
    (ticket) =>
      [
        `Customer: ${ticket.customer}`,
        `Subject: ${ticket.subject}`,
        `Body: ${ticket.body}`,
      ].join("\n"),
  )
  // summarizer is an agent and triageExtractor an extractor, both defined elsewhere.
  .prompt(summarizer)
  .extract(triageExtractor)
  .build();

const triage = await ticketPipeline.run({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Enterprise checkout fails after payment retries.",
});
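The formatting step matters because of plain JavaScript behavior: String(...) on an ordinary object returns "[object Object]", not its fields. The sketch below compares that default with an explicit formatter; `formatTicket` is a hypothetical helper that mirrors the step above.

```typescript
type Ticket = { customer: string; subject: string; body: string };

// Hypothetical helper mirroring the formatting step: one labeled line per field.
function formatTicket(ticket: Ticket): string {
  return [
    `Customer: ${ticket.customer}`,
    `Subject: ${ticket.subject}`,
    `Body: ${ticket.body}`,
  ].join("\n");
}

const sampleTicket: Ticket = {
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Enterprise checkout fails after payment retries.",
};

// What an unformatted object turns into: "[object Object]".
const implicitText = String(sampleTicket);

// What the model should receive instead: three labeled lines.
const explicitText = formatTicket(sampleTicket);
```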

7. Use Defaults and Metadata

Schemas can apply defaults, transforms, and validation before any stage runs. The first stage receives the parsed schema output.

const SearchInput = z.object({
  query: z.string().min(1),
  limit: z.number().int().positive().default(10),
});

const searchPipeline = new PipelineBuilder(SearchInput)
  .step(({ query, limit }) => search(query, limit))
  .build();

const results = await searchPipeline.run({ query: "anvia" });
// The first step receives the parsed input: { query: "anvia", limit: 10 }
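Here the schema's input and output types differ: callers may omit `limit`, but the first step always sees it. The hand-written `parseSearch` below is a hypothetical stand-in for `SearchInput.parse(...)`, not Zod's implementation; in Zod, the two shapes correspond to `z.input<typeof SearchInput>` and `z.output<typeof SearchInput>`.

```typescript
// What callers may pass to run(...): limit is optional (the schema input type).
type SearchArgs = { query: string; limit?: number };

// What the first stage receives: limit is always present (the schema output type).
type ParsedSearch = { query: string; limit: number };

// Hypothetical stand-in for SearchInput.parse(...): validate, then fill defaults.
function parseSearch(value: SearchArgs): ParsedSearch {
  if (value.query.length < 1) throw new RangeError("query must not be empty");
  const limit = value.limit ?? 10; // mirrors .default(10)
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError("limit must be a positive integer");
  }
  return { query: value.query, limit };
}

const parsed = parseSearch({ query: "anvia" });
```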

Pass metadata as the second argument when you also want the pipeline to carry a name or description:

const searchPipeline = new PipelineBuilder(
  SearchInput,
  { name: "Search Pipeline", description: "Validates and runs a search." },
)
  .step(({ query, limit }) => search(query, limit))
  .build();

console.log(searchPipeline.name);

Awaiting run(...) with invalid input throws a ZodError before any stage runs, so downstream stages can assume the schema has already been satisfied.

8. Add Capabilities Gradually

Build pipelines in this order:

  1. Use Steps for ordinary transforms.
  2. Use Prompt Steps when a stage needs an agent.
  3. Use Extractor Steps when a stage should return typed data.
  4. Use Parallel Branches when independent work can run at the same time.
  5. Use Batch Runs when the same pipeline should process many inputs.
  6. Read Example: Research Pipeline for a complete fetch, summarize, and extract workflow.

Minimal End-to-End Shape

const ticketPipeline = new PipelineBuilder(z.string())
  .step((ticket) => ticket.trim())
  .step((ticket) => `Summarize this ticket:\n\n${ticket}`)
  .prompt(summarizer)
  .build();

const summary = await ticketPipeline.run("Checkout fails for enterprise users.");

Keep the first version linear. Add branching and composition only after the simple flow is clear.