lenec ru

← все посты

Streaming ответов от Claude API в Node: как сделать правильно

14K

Когда я в первый раз прикрутил Claude к интерфейсу с чатом, ответ просто появлялся целиком через 4–8 секунд. Технически работало. Эмоционально — отвратительно. Любой, кто видел стриминг в ChatGPT, ждёт, что слова будут появляться по мере генерации. Без этого LLM-чат ощущается сломанным.

Расскажу, как сделать стриминг от Claude в Node — от SDK-варианта до прокидывания токенов клиенту через Server-Sent Events. С нюансами, на которых я сам набил шишек.

Базовый стрим из SDK

Anthropic SDK поддерживает стрим из коробки. Принципиальная разница с обычным messages.create — флаг stream: true или метод stream().

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

const stream = client.messages.stream({
  model: "claude-sonnet-4-5",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Расскажи в трёх абзацах про реки России." }],
});

stream.on("text", (text) => {
  process.stdout.write(text);
});

const final = await stream.finalMessage();
console.log("\n---\nUsage:", final.usage);

Этот вариант я использую, когда стрим не выходит за пределы серверного процесса. SDK даёт удобные события: text (только текстовые куски), message (полное сообщение в конце), error. Под капотом это SSE от Anthropic с типизированными событиями.

Что приходит в стриме на самом деле

Если копнуть глубже — Anthropic шлёт не просто "следующее слово". Поток состоит из событий с типами. Самые важные:

  • message_start — начало сообщения, известны usage и id.
  • content_block_start — начался новый блок (text, tool_use или thinking).
  • content_block_delta — приращение к текущему блоку. Тут летит текст по кусочкам.
  • content_block_stop — блок закончился.
  • message_delta — финальные метаданные (stop_reason, output_tokens).
  • message_stop — конец.

В content_block_delta есть подвиды: text_delta для обычного текста, input_json_delta для аргументов tool use, thinking_delta для extended thinking. Если ты обрабатываешь стрим вручную — придётся различать.

const stream = client.messages.stream({...});

for await (const event of stream) {
  if (event.type === "content_block_delta") {
    if (event.delta.type === "text_delta") {
      process.stdout.write(event.delta.text);
    }
    if (event.delta.type === "input_json_delta") {
      // Аргументы инструмента летят как частичный JSON
      // Полностью соберутся к content_block_stop
    }
  }
  if (event.type === "message_delta") {
    console.log("\nstop:", event.delta.stop_reason);
  }
}

Прокидываем стрим в браузер

В реальном чате стрим нужен на клиенте. Самый простой вариант — Server-Sent Events: однонаправленный канал, нативная поддержка в браузере через EventSource или fetch с ReadableStream.

На Node 20+ с нативным fetch и Express всё компактно:

import express from "express";
import Anthropic from "@anthropic-ai/sdk";

const app = express();
app.use(express.json());
const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

app.post("/api/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // если за nginx
  res.flushHeaders();

  const { messages } = req.body;
  const stream = client.messages.stream({
    model: "claude-sonnet-4-5",
    max_tokens: 2048,
    messages,
  });

  stream.on("text", (text) => {
    res.write(`data: ${JSON.stringify({ type: "text", text })}\n\n`);
  });

  stream.on("error", (err) => {
    res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`);
    res.end();
  });

  const final = await stream.finalMessage();
  res.write(`data: ${JSON.stringify({ type: "done", usage: final.usage })}\n\n`);
  res.end();
});

app.listen(3000);

На клиенте слушаешь поток через fetch и парсишь куски:

const res = await fetch("/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ messages }),
});

const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });

  const parts = buffer.split("\n\n");
  buffer = parts.pop() || "";

  for (const part of parts) {
    if (!part.startsWith("data: ")) continue;
    const data = JSON.parse(part.slice(6));
    if (data.type === "text") appendToUi(data.text);
    if (data.type === "done") console.log("usage", data.usage);
  }
}

Грабли, на которые я наступал

Буферизация nginx. Если у тебя проксирующий nginx между Node и клиентом, по дефолту он будет буферизовать ответ до байтов 4–8 КБ. Стрим встанет колом. Нужно proxy_buffering off в location либо заголовок X-Accel-Buffering: no от приложения. Я пишу второй — переносится с проксей.

Сжатие. Похожая история: если включён gzip на ответ, выдача будет копиться, пока буфер не наберётся. Для SSE гзип отключай. Многие фреймворки это делают автоматически по Content-Type, но проверить стоит.

Heartbeat. Браузеры закрывают неактивные соединения, и иногда промежуточные прокси тоже. Если ответ модели генерится дольше 30–60 секунд — кидай : keepalive\n\n раз в 15 секунд. Это комментарий в SSE, клиент его не увидит, но соединение живёт.

const heartbeat = setInterval(() => {
  res.write(": keepalive\n\n");
}, 15000);

stream.on("end", () => clearInterval(heartbeat));

Отмена с клиента. Если пользователь закрыл вкладку, стрим со стороны клиента оборвётся, но твой запрос к Anthropic продолжит лить токены за деньги. Подпишись на req.on("close") и руби стрим:

req.on("close", () => {
  stream.controller.abort();
});

Anthropic SDK это спокойно проглотит, и ты не доплатишь за уже не нужные токены.

Ошибки в середине стрима. Anthropic может закрыть соединение посередине с ошибкой — например, при overloaded на их стороне. SDK выкинет это в error. Не молчи: пошли клиенту событие с типом ошибки и кодом, чтобы UI показал нормальное "что-то сломалось, попробуйте ещё". Запрос с ретраем — отдельный разговор, но в стрим подхватывать запросто не получится: модель уже начала генерить, сшивать стримы — плохая идея.

Tool use в стриме

Когда модель решает вызвать инструмент, в стриме идут блоки tool_use. Их аргументы тоже стримятся, но как кусочки JSON-строки. SDK даёт удобный helper:

const stream = client.messages.stream({...});

stream.on("inputJson", (delta, snapshot) => {
  // delta — кусок, snapshot — собранный пока JSON
});

const final = await stream.finalMessage();
for (const block of final.content) {
  if (block.type === "tool_use") {
    // здесь полный input уже распарсен
  }
}

На практике я редко делаю что-то с частичным JSON. Дожидаюсь finalMessage, проверяю stop_reason: "tool_use", вызываю функции, шлю результат и стримлю следующий шаг. Пользователь видит "печатание" в каждом из шагов, между шагами короткая пауза. Это нормально и понятно.

Extended thinking

Если включил extended thinking, в стрим прилетят блоки thinking с типом thinking_delta. Их обычно не показывают пользователю как обычный текст. Я отображаю их в свернутой панели "показать рассуждения" или просто игнорирую. В SDK для них есть отдельное событие thinking.

Когда стрим не нужен

Не везде он оправдан. Если ответ короткий (классификация, извлечение поля, JSON-выход) — выгоднее обычный messages.create. Меньше кода, проще обработка ошибок и ретраев, нет риска полузакрытого соединения.

Стрим даёт смысл там, где пользователь ждёт длинный человеческий текст — чат, объяснение, генерация черновика. Везде остальное — обычный синхронный вызов.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.