Pipeline Builder
Compose multi-step workflows with Anvia pipelines.
Use PipelineBuilder when a workflow should be explicit, testable, and made from named stages instead of one large prompt.
1. Start With an Input Schema
import { z } from "zod";
import { PipelineBuilder } from "@anvia/core/pipeline";
const pipeline = new PipelineBuilder(z.string());Pass a Zod schema to PipelineBuilder. The schema defines the input accepted by run(...), validates every run at runtime, and gives TypeScript the parsed value type for the first stage.
PipelineBuilder tracks two types from that schema:
| Type | Meaning |
|---|---|
Input | The value accepted by run(...). This is the schema input type. |
Output | The current value after the latest stage. It starts as the parsed schema output and changes as stages return new values. |
TypeScript infers the output after every .step(...), .use(...), .parallel(...), .prompt(...), and .extract(...).
2. Build, Then Run
const normalizeTicket = new PipelineBuilder(z.string())
.step((input) => input.trim())
.step((input) => input.replace(/\s+/g, " "))
.build();
const result = await normalizeTicket.run(" checkout is failing ");
console.log(result);The builder describes the pipeline. .build() returns the runnable Pipeline<Input, Output>. run(...) and batch(...) live on the built pipeline, not on the builder.
API at a glance
| API | Input | Output |
|---|---|---|
.step(fn) | Current output | The value returned by fn |
.use(op) | Current output | The output of another PipelineOp |
.parallel({ ... }) | Current output, passed to every branch | An object keyed by branch name |
.prompt(agent) | Current output converted with String(input) | Agent text output |
.extract(extractor) | Current output converted with String(input) | Extractor schema data |
.build() | Builder state | Runnable Pipeline<Input, Output> |
pipeline.run(input) | Original Input | Final Output |
pipeline.batch(inputs, options) | Iterable of original Input | Array of final Output values |
3. Return New Shapes
Pipelines are not limited to strings. A step can return an object, array, number, or any other TypeScript value.
const SupportTicketInput = z.object({
customer: z.string(),
subject: z.string(),
body: z.string(),
});
type NormalizedTicket = {
customer: string;
title: string;
words: number;
};
const normalizeTicket = new PipelineBuilder(SupportTicketInput)
.step((ticket): NormalizedTicket => ({
customer: ticket.customer.trim(),
title: ticket.subject.trim().toLowerCase(),
words: ticket.body.trim().split(/\s+/).length,
}))
.build();
const ticket = await normalizeTicket.run({
customer: " Acme Co. ",
subject: " Checkout is failing ",
body: "Enterprise checkout fails after payment retries.",
});
console.log(ticket.title);The first step receives the parsed SupportTicketInput value. The built pipeline returns NormalizedTicket.
4. Use Arrays and Numbers
const executiveNotes = new PipelineBuilder(z.array(z.string()))
.step((notes) => notes.map((note) => `- ${note}`).join("\n"))
.build();
const notes = await executiveNotes.run([
"Checkout failures increased after payment retry changes.",
"Enterprise customers are affected.",
]);const formatPrice = new PipelineBuilder(z.number())
.step((dollars) => Math.round(dollars * 100))
.step((cents) => ({
cents,
formatted: `$${(cents / 100).toFixed(2)}`,
}))
.build();
const price = await formatPrice.run(12.5);
console.log(price.formatted);5. Branch Into Different Output Types
const ticketSignals = new PipelineBuilder(z.string())
.parallel({
title: new PipelineBuilder(z.string())
.step((ticket) => ticket.split(".")[0] ?? ticket)
.build(),
wordCount: new PipelineBuilder(z.string())
.step((ticket) => ticket.trim().split(/\s+/).length)
.build(),
urgent: new PipelineBuilder(z.string())
.step((ticket) => ticket.toLowerCase().includes("outage"))
.build(),
})
.build();
const signals = await ticketSignals.run(
"Checkout outage affects enterprise customers. Payment retries fail.",
);
console.log(signals.title, signals.wordCount, signals.urgent);The result is typed as:
{
title: string;
wordCount: number;
urgent: boolean;
}6. Format Before Prompting or Extracting
.prompt(agent) and .extract(extractor) call String(input) internally. For strings, that is usually fine. For objects, add a formatting step first so the model receives intentional text instead of [object Object].
const ticketPipeline = new PipelineBuilder(SupportTicketInput)
.step(
(ticket) =>
[
`Customer: ${ticket.customer}`,
`Subject: ${ticket.subject}`,
`Body: ${ticket.body}`,
].join("\n"),
)
.prompt(summarizer)
.extract(triageExtractor)
.build();
const triage = await ticketPipeline.run({
customer: "Acme Co.",
subject: "Checkout is failing",
body: "Enterprise checkout fails after payment retries.",
});7. Use Defaults and Metadata
Schemas can apply defaults, transforms, and validation before any stage runs. The first stage receives the parsed schema output.
const SearchInput = z.object({
query: z.string().min(1),
limit: z.number().int().positive().default(10),
});
const searchPipeline = new PipelineBuilder(SearchInput)
.step(({ query, limit }) => search(query, limit))
.build();
const results = await searchPipeline.run({ query: "anvia" });
// returns `SearchInput` parsed: { query: "anvia", limit: 10 }Pass metadata as the second argument when you also want the pipeline to carry a name or description:
const searchPipeline = new PipelineBuilder(
SearchInput,
{ name: "Search Pipeline", description: "Validates and runs a search." },
)
.step(({ query, limit }) => search(query, limit))
.build();
console.log(searchPipeline.name);Invalid input throws ZodError before any stage runs, so downstream code can assume the schema has already been satisfied.
8. Add Capabilities Gradually
Build pipelines in this order:
- Use Steps for ordinary transforms.
- Use Prompt Steps when a stage needs an agent.
- Use Extractor Steps when a stage should return typed data.
- Use Parallel Branches when independent work can run at the same time.
- Use Batch Runs when the same pipeline should process many inputs.
- Read Example: Research Pipeline for a complete fetch, summarize, and extract workflow.
Minimal End-to-End Shape
const ticketPipeline = new PipelineBuilder(z.string())
.step((ticket) => ticket.trim())
.step((ticket) => `Summarize this ticket:\n\n${ticket}`)
.prompt(summarizer)
.build();
const summary = await ticketPipeline.run("Checkout fails for enterprise users.");Keep the first version linear. Add branching and composition only after the simple flow is clear.
