Express

04 Streaming

Stream Anvia run events from an Express route.

Express uses Node response objects. Read from Anvia's Web ReadableStream and write NDJSON chunks to res.

1. Add /api/support/stream

import { Router } from "express";
import { z } from "zod";
import { supportAgent } from "../ai/support-agent";

const SupportStreamRequest = z.object({
  message: z.string().trim().min(1, "message is required"),
});

export const supportRouter = Router();

supportRouter.post("/support/stream", async (req, res, next) => {
  try {
    const parsed = SupportStreamRequest.safeParse(req.body);

    if (!parsed.success) {
      return res.status(400).json({
        error: { code: "bad_request", message: parsed.error.issues[0]?.message },
      });
    }

    res.setHeader("Content-Type", "application/x-ndjson");
    res.setHeader("Cache-Control", "no-cache");

    const reader = supportAgent.prompt(parsed.data.message).readableStream().getReader();
    const decoder = new TextDecoder();

    while (true) {
      const next = await reader.read();
      if (next.done) break;
      res.write(decoder.decode(next.value));
    }

    res.end();
  } catch (error) {
    next(error);
  }
});

2. Consume The Stream

const response = await fetch("http://localhost:3000/api/support/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Draft a support reply." }),
});

const reader = response.body?.getReader();
const decoder = new TextDecoder();

while (reader) {
  const next = await reader.read();
  if (next.done) break;

  for (const line of decoder.decode(next.value).split("\n")) {
    if (line.trim()) console.log(JSON.parse(line));
  }
}

3. Operational Notes

Disable proxy buffering for this route and keep request timeouts long enough for model calls. Clients should handle both final and error stream events.

Next

Add auth, request-local tools, and retrieval in Tools and Context. Related guides: Readable Streams and Streaming Events.