Express
04 Streaming
Stream Anvia run events from an Express route.
Express hands each route a Node.js response object (res), not a Web Response. To stream a run, read chunks from Anvia's Web ReadableStream and write each NDJSON chunk to res as it arrives.
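The bridging step can be tried in isolation before wiring it into a route. The sketch below is a minimal illustration under stated assumptions, not Anvia's implementation: fakeRunStream is a hypothetical stand-in for a run stream, its event fields are invented, and pump is an illustrative helper name. It copies bytes from a Web ReadableStream into any Node Writable, which is the interface Express exposes as res.

```typescript
import { Writable } from "node:stream";

// Hypothetical stand-in for an Anvia run stream: a Web ReadableStream that
// emits NDJSON bytes. The event fields here are invented for illustration.
function fakeRunStream(): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const lines = ['{"type":"delta","text":"Hello"}\n', '{"type":"final"}\n'];
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const line of lines) controller.enqueue(encoder.encode(line));
      controller.close();
    },
  });
}

// Reads every chunk from a Web stream and writes it to a Node writable.
async function pump(source: ReadableStream<Uint8Array>, sink: Writable): Promise<void> {
  const reader = source.getReader();
  try {
    while (true) {
      const chunk = await reader.read();
      if (chunk.done) break;
      // write() returns false when the sink's buffer is full; wait for "drain".
      if (!sink.write(chunk.value)) {
        await new Promise<void>((resolve) => sink.once("drain", resolve));
      }
    }
  } finally {
    reader.releaseLock();
    sink.end();
  }
}
```

Writing the raw bytes avoids a decode step on the server. Waiting for the drain event is optional for short replies, but it keeps memory bounded when a client reads slowly.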
1. Add /api/support/stream
import { Router } from "express";
import { z } from "zod";
import { supportAgent } from "../ai/support-agent";
const SupportStreamRequest = z.object({
  message: z.string().trim().min(1, "message is required"),
});
export const supportRouter = Router();
supportRouter.post("/support/stream", async (req, res, next) => {
  try {
    const parsed = SupportStreamRequest.safeParse(req.body);
    if (!parsed.success) {
      return res.status(400).json({
        error: { code: "bad_request", message: parsed.error.issues[0]?.message },
      });
    }
    res.setHeader("Content-Type", "application/x-ndjson");
    res.setHeader("Cache-Control", "no-cache");
    const reader = supportAgent.prompt(parsed.data.message).readableStream().getReader();
    const decoder = new TextDecoder();
    while (true) {
      // Named `chunk` so it does not shadow Express's `next` callback.
      const chunk = await reader.read();
      if (chunk.done) break;
      // `stream: true` keeps multi-byte characters intact across chunk boundaries.
      res.write(decoder.decode(chunk.value, { stream: true }));
    }
    res.end();
  } catch (error) {
    next(error);
  }
});
2. Consume The Stream
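const response = await fetch("http://localhost:3000/api/support/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Draft a support reply." }),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
// A network chunk can end mid-line, so hold the unfinished tail between reads.
let buffer = "";
while (reader) {
  const chunk = await reader.read();
  if (chunk.done) break;
  buffer += decoder.decode(chunk.value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";
  for (const line of lines) {
    if (line.trim()) console.log(JSON.parse(line));
  }
}
// Parse a last line that arrived without a trailing newline.
if (buffer.trim()) console.log(JSON.parse(buffer));
3. Operational Notes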
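Disable response buffering in any reverse proxy in front of this route (nginx, for example, honors an X-Accel-Buffering: no response header), and set request timeouts long enough to cover a full model call. Clients should handle both final and error stream events.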
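One way a client might act on the final and error events is sketched below. This guide names those two events but does not show their fields, so the StreamEvent shape (a type field plus arbitrary extras) is an assumption, and NdjsonBuffer and summarize are hypothetical helper names rather than part of Anvia.

```typescript
// Assumed event shape: only the "final" and "error" names come from this
// guide; the `type` field and any other fields are assumptions.
type StreamEvent = { type: string; [key: string]: unknown };

// Accumulates decoded text and returns only complete NDJSON lines as objects.
class NdjsonBuffer {
  private pending = "";

  push(text: string): StreamEvent[] {
    this.pending += text;
    const lines = this.pending.split("\n");
    // The last element is either "" or an unfinished line; keep it for later.
    this.pending = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as StreamEvent);
  }

  // Call once the stream ends to parse a last line with no trailing newline.
  flush(): StreamEvent[] {
    const rest = this.pending.trim();
    this.pending = "";
    return rest ? [JSON.parse(rest) as StreamEvent] : [];
  }
}

type StreamOutcome = {
  status: "completed" | "failed" | "incomplete";
  events: StreamEvent[];
};

// Classifies a run by the events it produced. An "error" event takes
// precedence; a stream with neither event is reported as incomplete.
function summarize(events: StreamEvent[]): StreamOutcome {
  if (events.some((event) => event.type === "error")) return { status: "failed", events };
  if (events.some((event) => event.type === "final")) return { status: "completed", events };
  return { status: "incomplete", events };
}
```

Feed each decoded chunk to push, call flush when the reader reports done, and pass the collected events to summarize. Reporting an incomplete status lets the caller tell a dropped connection apart from a run that ended normally.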
Next
Add auth, request-local tools, and retrieval in Tools and Context. Related guides: Readable Streams and Streaming Events.
