Pipeline Builder
Compose multi-step workflows with Anvia pipelines.
Use PipelineBuilder when a workflow should be explicit, testable, and made from named stages instead of one large prompt.
1. Start With Input and Output Types
```typescript
import { PipelineBuilder } from "@anvia/core";

const pipeline = new PipelineBuilder<string>();
```

PipelineBuilder<Input, Output = Input> tracks two types:
| Type | Meaning |
|---|---|
| Input | The value accepted by run(...) after .build(). |
| Output | The current value after the latest stage. It starts as Input and changes as stages return new values. |
Most pipelines only set the first generic. TypeScript infers the output after every .step(...), .use(...), .parallel(...), .prompt(...), and .extract(...).
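
To make the inference rule concrete, here is a minimal sketch of how a builder can carry an Input type and an evolving Output type. MiniBuilder is a hypothetical stand-in written for this illustration; it is not the @anvia/core implementation and models only a step-like method, build, and run.

```typescript
// Hypothetical stand-in that only illustrates the typing idea.
// It is NOT the @anvia/core implementation.
type Stage = (value: unknown) => unknown;

class MiniBuilder<Input, Output = Input> {
  private readonly stages: readonly Stage[];

  constructor(stages: readonly Stage[] = []) {
    this.stages = stages;
  }

  // Returns a new builder whose Output is whatever fn returns (unwrapped if it is a promise).
  step<Next>(fn: (value: Output) => Next): MiniBuilder<Input, Awaited<Next>> {
    const stage: Stage = (value) => fn(value as Output);
    return new MiniBuilder<Input, Awaited<Next>>([...this.stages, stage]);
  }

  // Freezes the stage list into an object with an async run(...).
  build(): { run(input: Input): Promise<Output> } {
    const stages = this.stages;
    return {
      async run(input: Input): Promise<Output> {
        let current: unknown = input;
        for (const stage of stages) {
          current = await stage(current); // await also covers async steps
        }
        return current as Output;
      },
    };
  }
}

// Output starts as string, becomes string[], then number.
const countWords = new MiniBuilder<string>()
  .step((text) => text.trim().split(/\s+/)) // MiniBuilder<string, string[]>
  .step((words) => words.length) // MiniBuilder<string, number>
  .build();

// This annotation compiles only because the final Output was inferred as number.
const wordCount: Promise<number> = countWords.run(" checkout is failing ");
wordCount.then((n) => console.log(n)); // prints 3
```

Only the first generic is written by hand in the sketch; each later type comes from the return type of the previous step, which mirrors the behavior described above.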
2. Build, Then Run
```typescript
const normalizeTicket = new PipelineBuilder<string>()
  .step((input) => input.trim())
  .step((input) => input.replace(/\s+/g, " "))
  .build();

const result = await normalizeTicket.run(" checkout is failing ");
console.log(result);
```

The builder describes the pipeline. .build() returns the runnable Pipeline<Input, Output>. run(...) and batch(...) live on the built pipeline, not on the builder.
API at a glance
| API | Input | Output |
|---|---|---|
| .step(fn) | Current output | The value returned by fn |
| .use(op) | Current output | The output of another PipelineOp |
| .parallel({ ... }) | Current output, passed to every branch | An object keyed by branch name |
| .prompt(agent) | Current output converted with String(input) | Agent text output |
| .extract(extractor) | Current output converted with String(input) | Extractor schema data |
| .build() | Builder state | Runnable Pipeline<Input, Output> |
| pipeline.run(input) | Original Input | Final Output |
| pipeline.batch(inputs, options) | Iterable of original Input | Array of final Output values |
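
The last two rows differ only in shape: run(...) maps one Input to one Output, while batch(...) maps an iterable of Inputs to an array of Outputs. The sketch below shows that relationship with plain TypeScript. Runnable and batchSketch are hypothetical names for this illustration. The sketch assumes inputs are processed one at a time and results keep input order, and it does not model the options argument; the real batch(...) may behave differently.

```typescript
// Hypothetical stand-in for a built pipeline: anything exposing an async run(...).
type Runnable<Input, Output> = {
  run(input: Input): Promise<Output>;
};

// Sketch only: processes inputs sequentially and keeps results in input order.
// The options argument of the real batch(...) is not modeled here.
async function batchSketch<Input, Output>(
  pipeline: Runnable<Input, Output>,
  inputs: Iterable<Input>,
): Promise<Output[]> {
  const outputs: Output[] = [];
  for (const input of inputs) {
    outputs.push(await pipeline.run(input));
  }
  return outputs;
}

// A tiny hand-written "pipeline" that trims and collapses whitespace.
const normalize: Runnable<string, string> = {
  async run(input: string): Promise<string> {
    return input.trim().replace(/\s+/g, " ");
  },
};

// One input in, one output out.
normalize.run("  checkout   is failing ").then((one) => console.log(one));
// prints "checkout is failing"

// Many inputs in, an array of outputs out.
batchSketch(normalize, ["  a  b ", "c   d"]).then((many) => console.log(many));
// prints [ 'a b', 'c d' ]
```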
3. Return New Shapes
Pipelines are not limited to strings. A step can return an object, array, number, or any other TypeScript value.
```typescript
type SupportTicketInput = {
  customer: string;
  subject: string;
  body: string;
};

type NormalizedTicket = {
  customer: string;
  title: string;
  words: number;
};

const normalizeTicket = new PipelineBuilder<SupportTicketInput>()
  .step((ticket): NormalizedTicket => ({
    customer: ticket.customer.trim(),
    title: ticket.subject.trim().toLowerCase(),
    words: ticket.body.trim().split(/\s+/).length,
  }))
  .build();

const ticket = await normalizeTicket.run({
  customer: " Acme Co. ",
  subject: " Checkout is failing ",
  body: "Enterprise checkout fails after payment retries.",
});
console.log(ticket.title);
```

The first step receives SupportTicketInput. The built pipeline returns NormalizedTicket.
4. Use Arrays and Numbers
```typescript
const executiveNotes = new PipelineBuilder<string[]>()
  .step((notes) => notes.map((note) => `- ${note}`).join("\n"))
  .build();

const notes = await executiveNotes.run([
  "Checkout failures increased after payment retry changes.",
  "Enterprise customers are affected.",
]);
```

```typescript
const formatPrice = new PipelineBuilder<number>()
  .step((dollars) => Math.round(dollars * 100))
  .step((cents) => ({
    cents,
    formatted: `$${(cents / 100).toFixed(2)}`,
  }))
  .build();

const price = await formatPrice.run(12.5);
console.log(price.formatted);
```

5. Branch Into Different Output Types
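
Conceptually, a parallel stage hands the same input to every branch and collects the results into an object keyed by branch name. The following stand-alone sketch shows that idea with Promise.all and a mapped type. runBranches, Branches, and BranchResults are hypothetical names for this illustration; this is not how @anvia/core necessarily implements .parallel(...), and the branches here are plain functions rather than built pipelines.

```typescript
// Hypothetical helper, not part of @anvia/core.
type Branches<Input> = Record<string, (input: Input) => unknown>;

// Maps each branch name to the (awaited) return type of that branch.
type BranchResults<B extends Branches<any>> = {
  [K in keyof B]: Awaited<ReturnType<B[K]>>;
};

async function runBranches<Input, B extends Branches<Input>>(
  input: Input,
  branches: B,
): Promise<BranchResults<B>> {
  const table: Branches<Input> = branches;
  const names = Object.keys(table);
  // Start every branch before awaiting any of them so async work can overlap.
  const values = await Promise.all(names.map((name) => table[name](input)));
  const result: Record<string, unknown> = {};
  names.forEach((name, index) => {
    result[name] = values[index];
  });
  // Every key of branches was filled in above, so the mapped type holds.
  return result as unknown as BranchResults<B>;
}

runBranches("Checkout outage affects enterprise customers. Payment retries fail.", {
  title: (ticket: string) => ticket.split(".")[0] ?? ticket,
  wordCount: (ticket: string) => ticket.trim().split(/\s+/).length,
  urgent: (ticket: string) => ticket.toLowerCase().includes("outage"),
}).then((signals) => {
  // signals is typed as { title: string; wordCount: number; urgent: boolean }
  console.log(signals.title, signals.wordCount, signals.urgent);
});
```

The library version of the same ticket example follows, with each branch written as its own built pipeline.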
```typescript
const ticketSignals = new PipelineBuilder<string>()
  .parallel({
    title: new PipelineBuilder<string>()
      .step((ticket) => ticket.split(".")[0] ?? ticket)
      .build(),
    wordCount: new PipelineBuilder<string>()
      .step((ticket) => ticket.trim().split(/\s+/).length)
      .build(),
    urgent: new PipelineBuilder<string>()
      .step((ticket) => ticket.toLowerCase().includes("outage"))
      .build(),
  })
  .build();

const signals = await ticketSignals.run(
  "Checkout outage affects enterprise customers. Payment retries fail.",
);
console.log(signals.title, signals.wordCount, signals.urgent);
```

The result is typed as:

```typescript
{
  title: string;
  wordCount: number;
  urgent: boolean;
}
```

6. Format Before Prompting or Extracting
.prompt(agent) and .extract(extractor) call String(input) internally. For strings, that is usually fine. For objects, add a formatting step first so the model receives intentional text instead of [object Object].
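
A quick check in plain TypeScript shows what String(...) produces for different values and why a formatting function helps. formatTicket is a hypothetical helper for this illustration, and the ticket type is repeated here only so the snippet stands alone.

```typescript
type SupportTicketInput = {
  customer: string;
  subject: string;
  body: string;
};

const rawTicket: SupportTicketInput = {
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Enterprise checkout fails after payment retries.",
};

// Plain objects stringify to a fixed placeholder, so the content is lost.
console.log(String(rawTicket)); // prints "[object Object]"

// Arrays are joined with commas, which is rarely the layout a prompt needs.
console.log(String(["first note", "second note"])); // prints "first note,second note"

// Hypothetical helper: turn the object into intentional, labeled text.
function formatTicket(ticket: SupportTicketInput): string {
  return [
    `Customer: ${ticket.customer}`,
    `Subject: ${ticket.subject}`,
    `Body: ${ticket.body}`,
  ].join("\n");
}

// String(...) leaves an existing string unchanged.
console.log(String(formatTicket(rawTicket)));
```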
```typescript
// summarizer (an agent) and triageExtractor (an extractor) are assumed to be defined elsewhere.
const ticketPipeline = new PipelineBuilder<SupportTicketInput>()
  // Turn the object into labeled text before it reaches the model.
  .step(
    (ticket) =>
      [
        `Customer: ${ticket.customer}`,
        `Subject: ${ticket.subject}`,
        `Body: ${ticket.body}`,
      ].join("\n"),
  )
  .prompt(summarizer)
  .extract(triageExtractor)
  .build();

const triage = await ticketPipeline.run({
  customer: "Acme Co.",
  subject: "Checkout is failing",
  body: "Enterprise checkout fails after payment retries.",
});
```

7. Add Capabilities Gradually
Build pipelines in this order:
- Use Steps for ordinary transforms.
- Use Prompt Steps when a stage needs an agent.
- Use Extractor Steps when a stage should return typed data.
- Use Parallel Branches when independent work can run at the same time.
- Use Batch Runs when the same pipeline should process many inputs.
- Read Example: Research Pipeline for a complete fetch, summarize, and extract workflow.
Minimal End-to-End Shape
```typescript
// summarizer is an agent assumed to be defined elsewhere.
const ticketPipeline = new PipelineBuilder<string>()
  .step((ticket) => ticket.trim())
  .step((ticket) => `Summarize this ticket:\n\n${ticket}`)
  .prompt(summarizer)
  .build();

const summary = await ticketPipeline.run("Checkout fails for enterprise users.");
```

Keep the first version linear. Add branching and composition only after the simple flow is clear.
