lenec ru

← все посты

Streaming ответов от Claude API в Node: пошагово

17K

Каждое нормальное приложение, в котором есть LLM, рано или поздно требует streaming. Без него пользователь смотрит на пустой экран 8 секунд, после чего ему вываливается простыня текста. Со streaming — он видит, как ответ печатается на лету, и UX совсем другой. В этой статье — практика, как сделать streaming с Claude API в Node, разобрать события и не сломать SSE при отдаче на фронт.

Простой стрим в SDK

В Anthropic SDK есть удобный метод messages.stream:

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Расскажи про SSR' }]
});

for await (const event of stream) {
  if (event.type === 'content_block_delta') {
    if (event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
}

const final = await stream.finalMessage();
console.log('\nstop reason:', final.stop_reason);
console.log('usage:', final.usage);

Это рабочий минимум. for await отдаёт события по мере их поступления, а finalMessage() в конце даёт полный ответ и метрики.

Что за события приходят

Claude API эмитит несколько типов событий:

  • message_start — начало сообщения, в нём базовая информация и пустой usage.
  • content_block_start — начало блока контента (текст или tool_use).
  • content_block_delta — собственно дельта: новый кусок текста или новый кусок аргументов tool_use.
  • content_block_stop — закрытие блока.
  • message_delta — дельта на уровне сообщения, тут приходит финальный stop_reason и usage.
  • message_stop — конец стрима.

В реальном коде ты обычно слушаешь только content_block_delta для текста и для tool_use, а на финальном message_delta снимаешь usage и stop_reason.

Tool use в стриме

Это место, где я наступил, когда первый раз делал. Tool use тоже приходит чанками, и аргументы инструмента склеиваются из дельт:

const toolBuffer: Record<string, { name: string; input: string }> = {};

for await (const event of stream) {
  if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
    toolBuffer[event.index] = {
      name: event.content_block.name,
      input: '',
    };
  }
  if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') {
    toolBuffer[event.index].input += event.delta.partial_json;
  }
  if (event.type === 'content_block_stop' && toolBuffer[event.index]) {
    const tool = toolBuffer[event.index];
    const args = JSON.parse(tool.input);
    // тут вызываем инструмент с args
  }
}

То есть аргументы JSON приходят кусками partial_json, ты их склеиваешь в строку и парсишь только в content_block_stop. До этого момента JSON может быть невалидным.

Отдача стрима на фронт через SSE

На бэке у меня Hono, на фронте — обычный fetch со ReadableStream. SSE-формат: каждое событие — две строки event: и data:, в data — JSON.

app.post('/api/chat', async (c) => {
  return new Response(
    new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();
        const stream = client.messages.stream({
          model: 'claude-sonnet-4-5',
          max_tokens: 1024,
          messages: [{ role: 'user', content: c.req.body.message }],
        });

        try {
          for await (const event of stream) {
            if (event.type === 'content_block_delta' &&
                event.delta.type === 'text_delta') {
              const data = JSON.stringify({ text: event.delta.text });
              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
            }
          }
          controller.enqueue(encoder.encode('data: [DONE]\n\n'));
          controller.close();
        } catch (err) {
          controller.error(err);
        }
      },
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache, no-transform',
        'X-Accel-Buffering': 'no', // важно для nginx, чтобы не буферизовал
      },
    }
  );
});

X-Accel-Buffering: no — мой первый прокол. Без этого заголовка Nginx буферизует ответ и стрим приходит на фронт целиком в конце. Отладка на localhost ничего не показывала, в продакшене — пятисекундная задержка.

Read на фронте

const res = await fetch('/api/chat', {
  method: 'POST',
  body: JSON.stringify({ message }),
  headers: { 'Content-Type': 'application/json' },
});

const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n\n');
  buffer = lines.pop()!; // незаконченный — назад в буфер
  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6);
    if (data === '[DONE]') return;
    const { text } = JSON.parse(data);
    appendToUi(text);
  }
}

Buffer-приём важен: дельта может прийти посреди строки, и если ты сразу её парсишь — JSON.parse падает.

Отмена стрима

Пользователь нажал «Stop». Нужно прервать на бэке и не платить за оставшиеся токены.

// на бэке
stream.controller.abort();

// или через AbortSignal в опциях
const stream = client.messages.stream({ ... }, { signal: c.req.raw.signal });

SDK уважает AbortSignal. Если фронт оборвал fetch (например, пользователь закрыл вкладку), сигнал доходит до бэка, бэк прерывает стрим к Anthropic, и ты не платишь за неполученное.

Что важно знать

  • Streaming не ускоряет генерацию — он лишь даёт пользователю «текст по мере готовности». Общее время до конца ответа то же.
  • Биллинг при streaming тот же. Платишь по факту: сколько токенов на входе и сколько на выходе. Прерванный стрим — платишь только за сгенерированное.
  • Все три провайдера (Anthropic, OpenAI, Google) используют SSE-формат, но с немного разными именами полей. Если делаешь универсальный слой — учти.

Что в итоге

Streaming в Claude API — три основных шага: messages.stream, обработка content_block_delta для текста, отдача на фронт через ReadableStream и SSE. Tool use требует отдельного склеивания partial_json. AbortSignal — обязателен, иначе платишь за брошенные стримы. После настройки UX скачет: пользователь больше не смотрит на пустой экран, и ощущения от продукта другие.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.