API Gateway паттерн: Kong, rate limiting, authentication, request transformation
Вы читаете 10 ГБ файл через fs.createReadStream, обрабатываете каждую строку и пишете в базу. Через минуту процесс падает с OOM. Проблема не в файле — проблема в том, что вы игнорируете backpressure. Разбираемся, как Node.js streams управляют потоком данных и почему без правильной обработки backpressure ваше приложение съест всю память.
Проблема: producer быстрее consumer
Backpressure возникает, когда источник данных (producer) генерирует данные быстрее, чем получатель (consumer) может их обработать. Классический сценарий: читаете файл с NVMe SSD (500 МБ/с), а пишете в PostgreSQL через сеть (5 МБ/с). Разница в скорости — 100x. Без механизма backpressure все промежуточные данные буферизуются в памяти, и через несколько секунд процесс упирается в лимит heap.
Node.js streams решают эту проблему через сигнальный протокол: когда writable stream не успевает обрабатывать данные, он возвращает false из метода write(). Это сигнал producer'у: «остановись, я переполнен». Producer должен приостановить чтение и ждать события drain, которое сигнализирует: «буфер освободился, можно продолжать».
Критический момент: backpressure в Node.js — это кооперативный протокол, а не принудительный. Если вы игнорируете возвращаемое значение write() и продолжаете писать, Node.js будет буферизовать данные без ограничений, пока не кончится память.
Streams API: Readable, Writable, pipe
В Node.js четыре типа streams: Readable (источник данных), Writable (приёмник), Duplex (и то, и другое, например TCP socket), и Transform (Duplex с трансформацией данных на лету).
Readable streams
Readable stream имеет два режима: paused (по умолчанию) и flowing. В paused режиме вы вручную вызываете readable.read() для получения данных. В flowing режиме stream автоматически пушит данные через событие data.
const fs = require('fs');
const readable = fs.createReadStream('large-file.txt');
// Flowing mode — данные приходят автоматически
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readable.on('end', () => {
console.log('Stream finished');
});
Проблема flowing mode: если вы не контролируете скорость потребления, readable будет пушить данные так быстро, как может их прочитать. Если ваш обработчик data медленный (например, пишет в базу), данные накапливаются где-то в памяти.
Writable streams
Writable stream принимает данные через метод write(chunk). Ключевой момент — возвращаемое значение:
const writable = fs.createWriteStream('output.txt');
const canContinue = writable.write('some data');
if (!canContinue) {
// Внутренний буфер заполнен, нужно остановиться
writable.once('drain', () => {
// Буфер освободился, можно продолжать
});
}
Внутренний буфер writable stream ограничен параметром highWaterMark (по умолчанию 16 КБ). Когда буфер превышает этот порог, write() возвращает false. Это не жёсткий лимит — вы можете продолжать писать, но Node.js сигнализирует: «притормози».
pipe() и автоматический backpressure
Метод pipe() автоматически управляет backpressure между readable и writable:
const fs = require('fs');
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');
readable.pipe(writable);
Внутри pipe() реализован протокол pause/resume: когда writable.write() возвращает false, pipe() вызывает readable.pause(). Когда writable эмитит drain, pipe() вызывает readable.resume().
Важно: в продакшене используйте pipeline() из stream/promises, а не pipe(). Причина — обработка ошибок: pipe() не уничтожает streams при ошибке, что приводит к утечкам файловых дескрипторов. pipeline() автоматически вызывает destroy() на всех streams в цепочке при любой ошибке.
const { pipeline } = require('stream/promises');
const fs = require('fs');
await pipeline(
fs.createReadStream('input.txt'),
fs.createWriteStream('output.txt')
);
Ручное управление: pause, resume, drain
Когда pipe() не подходит (например, вам нужна кастомная логика обработки), управляйте backpressure вручную:
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Writable переполнен, останавливаем readable
readable.pause();
}
});
writable.on('drain', () => {
// Writable освободился, возобновляем readable
readable.resume();
});
readable.on('end', () => {
writable.end();
});
Этот паттерн критичен для любого кода, где вы обрабатываете данные вручную. Без проверки canContinue и вызова pause() readable будет продолжать пушить данные, а writable — буферизовать их в памяти.
Transform streams: обработка с контролем backpressure
Transform stream — это Duplex stream, где writable сторона связана с readable: данные, записанные в writable, трансформируются и выходят через readable. Классические примеры — gzip, шифрование, парсинг CSV.
Создание Transform stream:
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Трансформируем данные
const transformed = chunk.toString().toUpperCase();
// Пушим в readable сторону
this.push(transformed);
// Сигнализируем, что готовы к следующему chunk
callback();
}
}
const transform = new UpperCaseTransform();
await pipeline(
fs.createReadStream('input.txt'),
transform,
fs.createWriteStream('output.txt')
);
Backpressure в Transform streams
Transform stream имеет два внутренних буфера: один на writable стороне (входящие данные), другой на readable стороне (исходящие данные). Backpressure работает в обе стороны:
- Если readable буфер заполнен (downstream consumer медленный),
this.push()возвращаетfalse - Если вы продолжаете вызывать
callback()несмотря наfalse, upstream producer продолжает писать, и данные накапливаются в памяти
Правильная обработка backpressure в Transform:
class BackpressureAwareTransform extends Transform {
_transform(chunk, encoding, callback) {
const transformed = this.processChunk(chunk);
const canPushMore = this.push(transformed);
if (!canPushMore) {
// Readable буфер заполнен, ждём drain
this.once('drain', callback);
} else {
// Всё ок, готовы к следующему chunk
callback();
}
}
processChunk(chunk) {
// Ваша логика трансформации
return chunk.toString().toUpperCase();
}
}
На практике базовый класс Transform обрабатывает большинство случаев автоматически — если вы просто вызываете this.push() и callback(), backpressure работает. Проблемы возникают, когда вы пушите несколько chunk'ов за один _transform вызов (one-to-many трансформация) или когда делаете асинхронные операции внутри _transform.
_flush() для буферизованных данных
Метод _flush() вызывается после того, как все входящие данные обработаны, но до того, как readable сторона закрывается. Используйте его для вывода буферизованных данных:
class LineBufferTransform extends Transform {
constructor(options) {
super(options);
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// Последняя строка может быть неполной
this.buffer = lines.pop();
for (const line of lines) {
this.push(line + '\n');
}
callback();
}
_flush(callback) {
// Выводим остаток буфера
if (this.buffer.length > 0) {
this.push(this.buffer + '\n');
}
callback();
}
}
Практический кейс: стриминг CSV в PostgreSQL
Реальная задача: прочитать 5 ГБ CSV файл, распарсить строки, и загрузить в PostgreSQL. Наивная реализация через fs.readFile() и массив упадёт с OOM. Правильное решение — streams с backpressure.
const { pipeline } = require('stream/promises');
const fs = require('fs');
const { Transform } = require('stream');
const { Client } = require('pg');
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
// Transform для парсинга CSV
class CSVParser extends Transform {
constructor() {
super({ objectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Сохраняем неполную строку
for (const line of lines) {
if (line.trim()) {
const [name, email, age] = line.split(',');
this.push({ name, email, age: parseInt(age) });
}
}
callback();
}
_flush(callback) {
if (this.buffer.trim()) {
const [name, email, age] = this.buffer.split(',');
this.push({ name, email, age: parseInt(age) });
}
callback();
}
}
// Writable для вставки в БД
class DBWriter extends require('stream').Writable {
constructor(client) {
super({ objectMode: true, highWaterMark: 10 });
this.client = client;
}
async _write(row, encoding, callback) {
try {
await this.client.query(
'INSERT INTO users (name, email, age) VALUES ($1, $2, $3)',
[row.name, row.email, row.age]
);
callback();
} catch (err) {
callback(err);
}
}
}
// Pipeline с автоматическим backpressure
await pipeline(
fs.createReadStream('users.csv'),
new CSVParser(),
new DBWriter(client)
);
await client.end();
console.log('Import complete');
Ключевые моменты:
objectMode: trueв Transform — chunk'и это JS объекты, а не Buffer'ыhighWaterMark: 10в DBWriter — ограничиваем буфер до 10 объектов, чтобы не перегружать БДpipeline()автоматически управляет backpressure: если БД медленная, CSVParser приостанавливается, и fs.createReadStream тоже останавливается- Пиковое потребление памяти — десятки мегабайт, независимо от размера файла
Подводные камни
Flowing mode без контроля. Если вы добавляете data listener на readable stream, он переключается в flowing mode и начинает пушить данные максимально быстро. Без вызова pause() при backpressure вы теряете контроль над потоком.
Игнорирование возвращаемого значения write(). Самая частая ошибка. Если write() вернул false, а вы продолжаете писать — данные буферизуются в памяти без ограничений.
Забытый _flush() в Transform. Если ваш Transform буферизует данные (например, собирает строки из chunk'ов), без _flush() последний кусок данных потеряется.
pipe() вместо pipeline(). pipe() не уничтожает streams при ошибке, что приводит к утечкам. Всегда используйте pipeline() из stream/promises.
highWaterMark как жёсткий лимит. highWaterMark — это порог, при котором write() возвращает false, а не максимальный размер буфера. Node.js продолжит принимать данные, если вы продолжите писать.
Вывод
Backpressure — это не опциональная оптимизация, а обязательный механизм для работы с большими объёмами данных. Без него ваше приложение либо упадёт с OOM, либо будет жрать гигабайты памяти на задачах, которые должны занимать мегабайты. Используйте pipeline() для автоматического управления backpressure, проверяйте возвращаемое значение write() при ручной обработке, и всегда реализуйте _flush() в Transform streams с буферизацией. Streams в Node.js — мощный инструмент, но только если вы понимаете, как работает backpressure.