Build a Pipeline

Compose functions, agents, extractors, and parallel branches.

Use this path when one prompt is not enough and the workflow needs explicit steps.

Goal

By the end, you should know how to:

  • create a pipeline
  • add transform steps
  • call agents from a pipeline
  • run extractors
  • add parallel branches when work can happen independently

Path

  1. Read Pipeline Builder for the core API.
  2. Read Steps for ordinary transform steps.
  3. Read Prompt Steps to call agents.
  4. Read Extractor Steps to return typed data.
  5. Read Parallel Branches when independent work can run side by side.
  6. Read Composition Patterns for larger workflows.

When To Use a Pipeline

Use a pipeline when the workflow has named stages that should be easy to test:

  • normalize input
  • ask an agent
  • extract fields
  • enrich with a tool or database call
  • run parallel checks
  • return a final object

Do not use a pipeline just to send one prompt. Start with an agent, and add a pipeline once the workflow needs several explicit stages.

Minimal Shape

import { PipelineBuilder } from "@anvia/core";

type TicketInput = {
  customer: string;
  subject: string;
  body: string;
};

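const pipeline = new PipelineBuilder<TicketInput>()
  // Step 1: trim stray whitespace from every field.
  .step((ticket) => ({
    customer: ticket.customer.trim(),
    subject: ticket.subject.trim(),
    body: ticket.body.trim(),
  }))
  // Step 2: receives the output of step 1 and derives the final object.
  .step((ticket) => ({
    title: ticket.subject.toLowerCase(),
    customer: ticket.customer,
    // An empty body counts as zero words ("".split(/\s+/) would report one).
    words: ticket.body === "" ? 0 : ticket.body.split(/\s+/).length,
  }))
  .build();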

const result = await pipeline.run({
  customer: " Acme Co. ",
  subject: " Checkout is failing ",
  body: "Enterprise checkout fails after payment retries.",
});

console.log(result.title);
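
Because each step is an ordinary function, it can be pulled out and checked without building a pipeline. The sketch below is plain TypeScript with no Anvia API involved; normalizeTicket and describeTicket are hypothetical names that mirror the two steps above, with a guard so an empty body counts as zero words.

```typescript
// Hypothetical stand-alone versions of the two steps (not part of Anvia).
type TicketInput = { customer: string; subject: string; body: string };

// Stage 1: trim whitespace from every field.
function normalizeTicket(ticket: TicketInput): TicketInput {
  return {
    customer: ticket.customer.trim(),
    subject: ticket.subject.trim(),
    body: ticket.body.trim(),
  };
}

// Stage 2: derive a lowercase title and a word count.
function describeTicket(ticket: TicketInput) {
  return {
    title: ticket.subject.toLowerCase(),
    customer: ticket.customer,
    // "".split(/\s+/) yields [""], so the empty body is handled explicitly.
    words: ticket.body === "" ? 0 : ticket.body.split(/\s+/).length,
  };
}

const described = describeTicket(
  normalizeTicket({
    customer: " Acme Co. ",
    subject: " Checkout is failing ",
    body: "Enterprise checkout fails after payment retries.",
  }),
);

console.log(described.title); // "checkout is failing"
console.log(described.words); // 6
```

Keeping stages as named functions like these means a unit test can target one stage at a time, and the same functions can be handed to .step(...) unchanged.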

Agent and Extractor Shape

import { AgentBuilder, ExtractorBuilder, PipelineBuilder } from "@anvia/core";
import { OpenAIClient } from "@anvia/openai";
import { z } from "zod";

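// Assumption: the API key is supplied through an environment variable.
const apiKey = process.env.OPENAI_API_KEY;

const model = new OpenAIClient({ apiKey }).completionModel("gpt-5");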

const summarizer = new AgentBuilder("summarizer", model)
  .instructions("Summarize support tickets in one paragraph.")
  .build();

const extractor = new ExtractorBuilder(
  model,
  z.object({
    priority: z.enum(["low", "medium", "high"]),
    summary: z.string(),
  }),
).build();

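// TicketInput is the type defined under Minimal Shape.
const pipeline = new PipelineBuilder<TicketInput>()
  // Format the ticket object as a single prompt string.
  .step(
    (ticket) =>
      [
        `Customer: ${ticket.customer}`,
        `Subject: ${ticket.subject}`,
        `Body: ${ticket.body}`,
      ].join("\n"),
  )
  // Send the prompt to the summarizer agent.
  .prompt(summarizer)
  // Return typed data that matches the extractor's schema.
  .extract(extractor)
  .build();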

const result = await pipeline.run({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Checkout is failing for all enterprise users.",
});

console.log(result.priority);

When the pipeline input is an object, add a step that formats it into a prompt string before .prompt(...) or .extract(...).
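
The formatting step can also be written as a named function so the prompt text can be inspected on its own. This is a minimal sketch in plain TypeScript; formatTicketPrompt is a hypothetical helper name, not something Anvia provides.

```typescript
// Hypothetical helper (not part of Anvia): turns a ticket object into prompt text.
type TicketInput = { customer: string; subject: string; body: string };

function formatTicketPrompt(ticket: TicketInput): string {
  // One labelled line per field, joined with newlines.
  return [
    `Customer: ${ticket.customer}`,
    `Subject: ${ticket.subject}`,
    `Body: ${ticket.body}`,
  ].join("\n");
}

const promptText = formatTicketPrompt({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Checkout is failing for all enterprise users.",
});

console.log(promptText);
```

Assuming .step(...) accepts any function of the current pipeline value, as the examples here suggest, the helper could replace the inline arrow function as .step(formatTicketPrompt).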

Parallel Branch Shape

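import { PipelineBuilder } from "@anvia/core";

// TicketInput is the type defined under Minimal Shape.
const checks = new PipelineBuilder<TicketInput>()
  // Pass only the ticket body on to the branches.
  .step((ticket) => ticket.body)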
  .parallel({
    uppercase: new PipelineBuilder<string>()
      .step((value) => value.toUpperCase())
      .build(),
    length: new PipelineBuilder<string>()
      .step((value) => value.length)
      .build(),
    urgent: new PipelineBuilder<string>()
      .step((value) => value.toLowerCase().includes("failing"))
      .build(),
  })
  .build();

const result = await checks.run({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Checkout is failing for all enterprise users.",
});

console.log(result.uppercase, result.length, result.urgent);
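
As a rough mental model, independent branches behave like a fan-out over the same input whose results are collected under the branch names. The sketch below shows that idea with Promise.all in plain TypeScript. It is not Anvia's implementation, and toUppercase, countChars, isUrgent, and runChecks are hypothetical names.

```typescript
// Conceptual sketch only (not Anvia's implementation): three independent checks.
const toUppercase = async (value: string) => value.toUpperCase();
const countChars = async (value: string) => value.length;
const isUrgent = async (value: string) => value.toLowerCase().includes("failing");

async function runChecks(body: string) {
  // Every branch is started before any result is awaited;
  // no branch reads another branch's output.
  const [uppercase, charCount, urgent] = await Promise.all([
    toUppercase(body),
    countChars(body),
    isUrgent(body),
  ]);
  // Results are collected into one object keyed by branch name.
  return { uppercase, length: charCount, urgent };
}

runChecks("Checkout is failing for all enterprise users.").then((checked) => {
  console.log(checked.uppercase, checked.length, checked.urgent);
});
```

A branch that needs another branch's result does not fit this shape; it belongs in a later sequential step instead.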

Add Next

Need              Read
Many inputs       Batch Runs
Agents as steps   Agents in Pipelines
Typed outputs     Return Structured Output