@anvia/server

Server-side event stream helpers for Anvia applications.

@anvia/server converts an async iterable of agent events into a streaming HTTP Response. It supports both JSONL and Server-Sent Events (SSE) formats.

Install

pnpm add @anvia/server

No peer dependencies. Works with any framework that returns a Response object (Next.js, Hono, Express, Bun, Deno, etc.).

Quick Start

import { createEventStream } from "@anvia/server";

// In a route handler:
return createEventStream(agent.prompt("Draft a reply.").stream(), {
  format: "jsonl",
});

This produces a Response with content-type: application/x-ndjson, cache-control: no-cache, no-transform, and connection: keep-alive.

createEventStream

The primary export. Converts an async iterable into a streaming Response.

function createEventStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    format?: "jsonl" | "sse";
    headers?: HeadersInit;
    status?: number;
    statusText?: string;
    jsonl?: JsonlStreamOptions;
    sse?: SseStreamOptions;
  },
): Response;
OptionDefaultPurpose
format"jsonl"Stream format: JSONL or SSE
headers--Additional response headers
status200HTTP status code
statusText--HTTP status text

JSONL vs SSE

JSONL (default): one JSON object per line, content-type: application/x-ndjson. Best for most Anvia client transports.

SSE: Server-Sent Events with data: fields, content-type: text/event-stream. Use when you need browser EventSource compatibility or SSE-specific tooling.

// JSONL (default)
createEventStream(events);

// SSE
createEventStream(events, { format: "sse" });

Low-Level Streams

Use these when you need a ReadableStream<Uint8Array> instead of a full Response:

createJsonlStream

function createJsonlStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    serialize?: (event: TEvent | { type: "error"; error: unknown }) => string;
  },
): ReadableStream<Uint8Array>;

Each event becomes one JSON line. If the iterable throws, the stream emits { type: "error", error } and closes.

createSseStream

function createSseStream<TEvent>(
  events: AsyncIterable<TEvent>,
  options?: {
    eventName?: string | ((event: TEvent | { type: "error"; error: unknown }) => string | undefined);
    serialize?: (event: TEvent | { type: "error"; error: unknown }) => string;
    retry?: number;
  },
): ReadableStream<Uint8Array>;

Each event becomes a data: field in an SSE message. Use eventName to set custom event types.

Framework Examples

Next.js App Router

// app/api/chat/route.ts
import { createEventStream } from "@anvia/server";

export async function POST(request: Request) {
  const { message } = await request.json();
  return createEventStream(agent.prompt(message).stream());
}

Hono

import { createEventStream } from "@anvia/server";

app.post("/api/chat", async (c) => {
  const { message } = await c.req.json();
  return createEventStream(agent.prompt(message).stream());
});

Express

import { createEventStream } from "@anvia/server";

app.post("/api/chat", async (req, res) => {
  const response = createEventStream(agent.prompt(req.body.message).stream());
  res.status(response.status);
  response.headers.forEach((value, key) => res.setHeader(key, value));
  const body = response.body!;
  const reader = body.getReader();
  const pump = async () => {
    const { done, value } = await reader.read();
    if (done) { res.end(); return; }
    res.write(value);
    await pump();
  };
  await pump();
});

Error Handling

If the async iterable throws, both JSONL and SSE streams emit an error event and close gracefully. The client receives a structured error rather than a broken connection.