Pipeline
Typed pipeline composition and batch execution.
Import from @anvia/core or @anvia/core/pipeline.
PipelineOp
interface PipelineOp<Input = unknown, Output = unknown> {
run(input: Input): Output | Promise<Output>;
}Purpose: minimal interface for anything runnable as a pipeline stage.
Return behavior: returns or resolves one output for one input.
Notable errors: implementations can throw arbitrary errors.
PipelineBatchOptions
interface PipelineBatchOptions {
concurrency: number;
}Purpose: controls bounded parallelism for Pipeline.batch(...).
Return behavior: used as input.
Notable errors: invalid values are normalized to at least 1.
Pipeline
class Pipeline<Input, Output> implements PipelineOp<Input, Awaited<Output>> {
readonly id: string;
readonly name: string | undefined;
readonly description: string | undefined;
readonly metadata: JsonObject | undefined;
run(input: Input, options?: PipelineRunOptions): Promise<Awaited<Output>>;
batch<I extends Iterable<Input>>(
inputs: I,
options: PipelineBatchOptions,
): Promise<Array<Awaited<Output>>>;
graph(): PipelineGraph;
}Purpose: runnable pipeline returned by PipelineBuilder.build().
Return behavior: run(...) resolves the final stage output; batch(...) preserves input order; graph() returns inspectable pipeline metadata, nodes, and edges.
Notable errors: forwards stage errors.
Pipeline Graph Types
type PipelineMetadata = {
id?: string;
name?: string;
description?: string;
metadata?: JsonObject;
};
type PipelineStageMetadata = {
id?: string;
name?: string;
description?: string;
metadata?: JsonObject;
};
type PipelineStageKind =
| "input"
| "step"
| "pipeline"
| "parallel"
| "branch"
| "agent"
| "extractor"
| "output";
type PipelineGraphNode = {
id: string;
kind: PipelineStageKind;
label: string;
description?: string;
metadata?: JsonObject;
agentId?: string;
agentName?: string;
pipelineId?: string;
branchKey?: string;
};
type PipelineGraphEdge = {
id: string;
source: string;
target: string;
label?: string;
};
type PipelineGraph = PipelineMetadata & {
id: string;
nodes: PipelineGraphNode[];
edges: PipelineGraphEdge[];
};Purpose: automatic graph metadata for Studio and other inspectors.
Return behavior: stage ids and labels are generated from build order unless optional metadata supplies better values.
Notable errors: none directly.
Pipeline Run Events
type PipelineRunEvent =
| { type: "stage_started"; node: PipelineGraphNode }
| { type: "stage_completed"; node: PipelineGraphNode; durationMs: number }
| { type: "stage_failed"; node: PipelineGraphNode; durationMs: number; error: unknown };
type PipelineRunObserver = {
onEvent(event: PipelineRunEvent): void | Promise<void>;
};
type PipelineRunOptions = {
observer?: PipelineRunObserver;
};Purpose: metadata-only execution events for runtimes such as Studio.
Return behavior: pass an observer to pipeline.run(input, { observer }) to receive stage status changes.
Notable errors: observer errors propagate to the run.
PipelineBuilder
class PipelineBuilder<Input, Output = Input> {
constructor();
constructor(metadata: PipelineMetadata);
constructor<S extends z.ZodType>(schema: S): PipelineBuilder<z.infer<S>, z.infer<S>>;
constructor<S extends z.ZodType>(
schema: S,
metadata: PipelineMetadata,
): PipelineBuilder<z.infer<S>, z.infer<S>>;
step<Next>(
fn: (input: Awaited<Output>) => Next | Promise<Next>,
metadata?: PipelineStageMetadata,
): PipelineBuilder<Input, Awaited<Next>>;
use<Next>(
op: PipelineOp<Awaited<Output>, Next>,
metadata?: PipelineStageMetadata,
): PipelineBuilder<Input, Awaited<Next>>;
parallel<Branches extends Record<string, PipelineOp<Awaited<Output>, unknown>>>(
branches: Branches,
metadata?: PipelineStageMetadata,
): PipelineBuilder<Input, ParallelOutput<Branches>>;
prompt(agent: Agent<CompletionModel>, metadata?: PipelineStageMetadata): PipelineBuilder<Input, string>;
extract<T>(extractor: Extractor<T, CompletionModel>, metadata?: PipelineStageMetadata): PipelineBuilder<Input, T>;
build(): Pipeline<Input, Awaited<Output>>;
}Purpose: typed composition of transform functions, operations, agents, and extractors.
Return behavior: each composition method returns a new builder with the inferred output type. The Zod schema overloads parse input at runtime, so the inferred Input type flows through the chain and invalid inputs throw ZodError before any stage runs.
Notable errors: forwards errors from transform functions, nested operations, agents, or extractors. Schema overloads additionally throw ZodError on input that does not match the schema.
For workflow guidance, see Pipeline Builder.
