@anvia/server
Server-side event stream helpers for Anvia applications.
@anvia/server converts an async iterable of agent events into a streaming HTTP Response. It supports both JSONL and Server-Sent Events (SSE) formats.
Install
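pnpm add @anvia/server
No peer dependencies. Works with any framework whose handlers return a Response object (Next.js, Hono, Bun, Deno, etc.). Express writes to its own res object instead, so it needs a small adapter (see Framework Examples).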
Quick Start
import { createEventStream } from "@anvia/server";
// In a route handler:
return createEventStream(agent.prompt("Draft a reply.").stream(), {
  format: "jsonl",
});
This produces a Response with content-type: application/x-ndjson, cache-control: no-cache, no-transform, and connection: keep-alive.
createEventStream
The primary export. Converts an async iterable into a streaming Response.
function createEventStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    format?: "jsonl" | "sse";
    headers?: HeadersInit;
    status?: number;
    statusText?: string;
    jsonl?: JsonlStreamOptions;
    sse?: SseStreamOptions;
  },
): Response;

| Option | Default | Purpose |
|---|---|---|
| format | "jsonl" | Stream format: JSONL or SSE |
| headers | -- | Additional response headers |
| status | 200 | HTTP status code |
| statusText | -- | HTTP status text |
JSONL vs SSE
JSONL (default): one JSON object per line, content-type: application/x-ndjson. Best for most Anvia client transports.
SSE: Server-Sent Events with data: fields, content-type: text/event-stream. Use when you need browser EventSource compatibility or SSE-specific tooling.
// JSONL (default)
createEventStream(events);
// SSE
createEventStream(events, { format: "sse" });
Low-Level Streams
Use these when you need a ReadableStream<Uint8Array> instead of a full Response:
createJsonlStream
function createJsonlStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    serialize?: (event: TEvent | { type: "error"; error: unknown }) => string;
  },
): ReadableStream<Uint8Array>;
Each event becomes one JSON line. If the iterable throws, the stream emits { type: "error", error } and closes.
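The line-per-event encoding and the error-to-event behavior described above can be approximated with a plain async generator. This is a minimal sketch, not the library's implementation: toJsonlLine, jsonlLines, and failingSource are hypothetical names, and forwarding only the error message (rather than the raw error value) is an assumption made here because Error instances serialize to {} under JSON.stringify.

```typescript
// Hypothetical stand-ins for illustration; not the @anvia/server implementation.
type StreamErrorEvent = { type: "error"; error: unknown };

// Encode one event as a single line of JSON terminated by "\n".
function toJsonlLine<TEvent>(event: TEvent | StreamErrorEvent): string {
  return JSON.stringify(event) + "\n";
}

// Yield one JSONL line per event. If the source iterable throws, yield a
// final error line and finish normally instead of propagating the exception.
async function* jsonlLines<TEvent>(
  events: AsyncIterable<TEvent>,
): AsyncGenerator<string> {
  try {
    for await (const event of events) {
      yield toJsonlLine(event);
    }
  } catch (error) {
    // Assumption: forward only the message, since Error objects stringify to "{}".
    const message = error instanceof Error ? error.message : String(error);
    yield toJsonlLine({ type: "error", error: message });
  }
}

// Example source that fails after its first event.
async function* failingSource(): AsyncGenerator<{ type: string; text: string }> {
  yield { type: "text", text: "hello" };
  throw new Error("upstream failed");
}
```

Iterating jsonlLines(failingSource()) yields the text event's line followed by one error line, after which the generator completes. A byte stream would additionally encode each line with TextEncoder before enqueueing it.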
createSseStream
function createSseStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    eventName?: string | ((event: TEvent | { type: "error"; error: unknown }) => string | undefined);
    serialize?: (event: TEvent | { type: "error"; error: unknown }) => string;
    retry?: number;
  },
): ReadableStream<Uint8Array>;
Each event becomes a data: field in an SSE message. Use eventName to set custom event types.
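For readers unfamiliar with the SSE wire format, the sketch below shows how a serialized event and an optional event name map onto event: and data: fields. The field syntax follows the Server-Sent Events format; formatSseMessage and formatSseRetry are hypothetical helpers, and the assumption that the retry option corresponds to the SSE retry: field (in milliseconds) is not confirmed by the text above.

```typescript
// Hypothetical helpers for illustration; not part of @anvia/server.

// Build one SSE message from an already-serialized payload.
function formatSseMessage(data: string, eventName?: string): string {
  const lines: string[] = [];
  if (eventName !== undefined) {
    lines.push(`event: ${eventName}`);
  }
  // A payload containing line breaks needs one data: field per line.
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the message.
  return lines.join("\n") + "\n\n";
}

// Assumption: a retry value is sent as the SSE retry: field, which sets the
// client's reconnection delay in milliseconds.
function formatSseRetry(milliseconds: number): string {
  return `retry: ${milliseconds}\n\n`;
}
```

Without an event name, browsers deliver the message to EventSource's message handler; with one, a listener registered under that name receives it.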
Framework Examples
Next.js App Router
// app/api/chat/route.ts
import { createEventStream } from "@anvia/server";
export async function POST(request: Request) {
  const { message } = await request.json();
  return createEventStream(agent.prompt(message).stream());
}
Hono
import { createEventStream } from "@anvia/server";
app.post("/api/chat", async (c) => {
  const { message } = await c.req.json();
  return createEventStream(agent.prompt(message).stream());
});
Express
import { createEventStream } from "@anvia/server";
app.post("/api/chat", async (req, res) => {
  // Requires the express.json() middleware so that req.body is parsed.
  const response = createEventStream(agent.prompt(req.body.message).stream());
  // Copy the status and headers from the Response onto the Express response.
  res.status(response.status);
  response.headers.forEach((value, key) => res.setHeader(key, value));
  // Forward each chunk as it arrives, then end the Express response.
  const reader = response.body!.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    res.write(value);
  }
  res.end();
});
Error Handling
If the async iterable throws, both JSONL and SSE streams emit an error event and close gracefully. The client receives a structured error rather than a broken connection.
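Because the error arrives in-band as an ordinary event, a client has to inspect each event's type rather than rely on the HTTP status or a dropped connection. The following is a rough client-side sketch for the JSONL format; JsonlBuffer and handleEvent are hypothetical names, and the event shape beyond the type field is assumed.

```typescript
// Hypothetical client-side helpers; the event shape beyond `type` is assumed.
type ParsedEvent = { type: string; [key: string]: unknown };

class JsonlBuffer {
  private pending = "";

  // Accept a decoded text chunk and return every event whose line is complete.
  push(chunk: string): ParsedEvent[] {
    this.pending += chunk;
    const lines = this.pending.split("\n");
    // The last element is either "" or a partial line; keep it for the next chunk.
    this.pending = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as ParsedEvent);
  }
}

// Separate ordinary events from the in-band error event.
function handleEvent(event: ParsedEvent): string {
  if (event.type === "error") {
    return `stream failed: ${String(event.error)}`;
  }
  return `received ${event.type}`;
}
```

In a browser or Node client, the chunks would come from reading response.body and decoding each Uint8Array with a TextDecoder in streaming mode before passing it to push.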
Related
- Streaming Events for event types
- Readable Streams for stream internals
- Server Reference for full API types
- @anvia/react for the client-side counterpart
