Streaming ответов от Claude API в Node: пошагово
Каждое нормальное приложение, в котором есть LLM, рано или поздно требует streaming. Без него пользователь смотрит на пустой экран 8 секунд, после чего ему вываливается простыня текста. Со streaming — он видит, как ответ печатается на лету, и UX совсем другой. В этой статье — практика, как сделать streaming с Claude API в Node, разобрать события и не сломать SSE при отдаче на фронт.
Простой стрим в SDK
В Anthropic SDK есть удобный метод messages.stream:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
const stream = client.messages.stream({
model: 'claude-sonnet-4-5',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Расскажи про SSR' }]
});
for await (const event of stream) {
if (event.type === 'content_block_delta') {
if (event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
}
}
const final = await stream.finalMessage();
console.log('\nstop reason:', final.stop_reason);
console.log('usage:', final.usage);Это рабочий минимум. for await отдаёт события по мере их поступления, а finalMessage() в конце даёт полный ответ и метрики.
Что за события приходят
Claude API эмитит несколько типов событий:
- message_start — начало сообщения, в нём базовая информация и пустой usage.
- content_block_start — начало блока контента (текст или tool_use).
- content_block_delta — собственно дельта: новый кусок текста или новый кусок аргументов tool_use.
- content_block_stop — закрытие блока.
- message_delta — дельта на уровне сообщения, тут приходит финальный stop_reason и usage.
- message_stop — конец стрима.
В реальном коде ты обычно слушаешь только content_block_delta для текста и для tool_use, а на финальном message_delta снимаешь usage и stop_reason.
Tool use в стриме
Это место, где я наступил, когда первый раз делал. Tool use тоже приходит чанками, и аргументы инструмента склеиваются из дельт:
const toolBuffer: Record<string, { name: string; input: string }> = {};
for await (const event of stream) {
if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
toolBuffer[event.index] = {
name: event.content_block.name,
input: '',
};
}
if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') {
toolBuffer[event.index].input += event.delta.partial_json;
}
if (event.type === 'content_block_stop' && toolBuffer[event.index]) {
const tool = toolBuffer[event.index];
const args = JSON.parse(tool.input);
// тут вызываем инструмент с args
}
}То есть аргументы JSON приходят кусками partial_json, ты их склеиваешь в строку и парсишь только в content_block_stop. До этого момента JSON может быть невалидным.
Отдача стрима на фронт через SSE
На бэке у меня Hono, на фронте — обычный fetch со ReadableStream. SSE-формат: каждое событие — две строки event: и data:, в data — JSON.
app.post('/api/chat', async (c) => {
return new Response(
new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
const stream = client.messages.stream({
model: 'claude-sonnet-4-5',
max_tokens: 1024,
messages: [{ role: 'user', content: c.req.body.message }],
});
try {
for await (const event of stream) {
if (event.type === 'content_block_delta' &&
event.delta.type === 'text_delta') {
const data = JSON.stringify({ text: event.delta.text });
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
}
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
controller.close();
} catch (err) {
controller.error(err);
}
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
'X-Accel-Buffering': 'no', // важно для nginx, чтобы не буферизовал
},
}
);
});X-Accel-Buffering: no — мой первый прокол. Без этого заголовка Nginx буферизует ответ и стрим приходит на фронт целиком в конце. Отладка на localhost ничего не показывала, в продакшене — пятисекундная задержка.
Read на фронте
const res = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ message }),
headers: { 'Content-Type': 'application/json' },
});
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop()!; // незаконченный — назад в буфер
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') return;
const { text } = JSON.parse(data);
appendToUi(text);
}
}Buffer-приём важен: дельта может прийти посреди строки, и если ты сразу её парсишь — JSON.parse падает.
Отмена стрима
Пользователь нажал «Stop». Нужно прервать на бэке и не платить за оставшиеся токены.
// на бэке
stream.controller.abort();
// или через AbortSignal в опциях
const stream = client.messages.stream({ ... }, { signal: c.req.raw.signal });SDK уважает AbortSignal. Если фронт оборвал fetch (например, пользователь закрыл вкладку), сигнал доходит до бэка, бэк прерывает стрим к Anthropic, и ты не платишь за неполученное.
Что важно знать
- Streaming не ускоряет генерацию — он лишь даёт пользователю «текст по мере готовности». Общее время до конца ответа то же.
- Биллинг при streaming тот же. Платишь по факту: сколько токенов на входе и сколько на выходе. Прерванный стрим — платишь только за сгенерированное.
- Все три провайдера (Anthropic, OpenAI, Google) используют SSE-формат, но с немного разными именами полей. Если делаешь универсальный слой — учти.
Что в итоге
Streaming в Claude API — три основных шага: messages.stream, обработка content_block_delta для текста, отдача на фронт через ReadableStream и SSE. Tool use требует отдельного склеивания partial_json. AbortSignal — обязателен, иначе платишь за брошенные стримы. После настройки UX скачет: пользователь больше не смотрит на пустой экран, и ощущения от продукта другие.