NestJS
04 Streaming
Stream Anvia run events from a NestJS controller.
With the default Express adapter, inject @Res() and pipe Anvia's Web stream into the Node response.
1. Add A Streaming Controller Method
import { BadRequestException, Body, Controller, Post, Res } from "@nestjs/common";
import type { Response } from "express";
import { Readable } from "node:stream";
import { z } from "zod";
import { SupportAgentService } from "../ai/support-agent.service";
const SupportStreamRequest = z.object({
message: z.string().trim().min(1, "message is required"),
});
@Controller("api/support")
export class SupportController {
constructor(private readonly supportAgent: SupportAgentService) {}
@Post("stream")
async stream(@Body() body: unknown, @Res() res: Response) {
const parsed = SupportStreamRequest.safeParse(body);
if (!parsed.success) {
throw new BadRequestException(parsed.error.issues[0]?.message);
}
res.setHeader("Content-Type", "application/x-ndjson");
res.setHeader("Cache-Control", "no-cache");
const stream = Readable.fromWeb(this.supportAgent.streamSupport(parsed.data.message));
stream.pipe(res);
}
}2. Consume The Stream
const response = await fetch("http://localhost:3000/api/support/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message: "Draft a support reply." }),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const next = await reader.read();
if (next.done) break;
for (const line of decoder.decode(next.value).split("\n")) {
if (line.trim()) console.log(JSON.parse(line));
}
}3. Adapter Notes
If your Nest app uses the Fastify adapter, use Fastify reply handling instead of Express Response.
Next
Add auth, request-local tools, and retrieval in Tools and Context. Related guides: Readable Streams and Streaming Events.
