NestJS

04 Streaming

Stream Anvia run events from a NestJS controller.

With the default Express adapter, inject @Res() and pipe Anvia's Web stream into the Node response.

1. Add A Streaming Controller Method

import { BadRequestException, Body, Controller, Post, Res } from "@nestjs/common";
import type { Response } from "express";
import { Readable } from "node:stream";
import { z } from "zod";
import { SupportAgentService } from "../ai/support-agent.service";

const SupportStreamRequest = z.object({
  message: z.string().trim().min(1, "message is required"),
});

@Controller("api/support")
export class SupportController {
  constructor(private readonly supportAgent: SupportAgentService) {}

  @Post("stream")
  async stream(@Body() body: unknown, @Res() res: Response) {
    const parsed = SupportStreamRequest.safeParse(body);

    if (!parsed.success) {
      throw new BadRequestException(parsed.error.issues[0]?.message);
    }

    res.setHeader("Content-Type", "application/x-ndjson");
    res.setHeader("Cache-Control", "no-cache");

    const stream = Readable.fromWeb(this.supportAgent.streamSupport(parsed.data.message));
    stream.pipe(res);
  }
}

2. Consume The Stream

const response = await fetch("http://localhost:3000/api/support/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Draft a support reply." }),
});

const reader = response.body?.getReader();
const decoder = new TextDecoder();

while (reader) {
  const next = await reader.read();
  if (next.done) break;

  for (const line of decoder.decode(next.value).split("\n")) {
    if (line.trim()) console.log(JSON.parse(line));
  }
}

3. Adapter Notes

If your Nest app uses the Fastify adapter, use Fastify reply handling instead of Express Response.

Next

Add auth, request-local tools, and retrieval in Tools and Context. Related guides: Readable Streams and Streaming Events.