Streams в Node.js: readable, writable, transform и backpressure handling
Streams в Node.js — это не просто абстракция для работы с данными по частям. Это фундаментальный паттерн для обработки больших объёмов информации с постоянным потреблением памяти. Разберём типы streams, проблему backpressure и как правильно строить pipeline для обработки гигабайтных файлов.
Типы streams: четыре базовых класса
Readable — источник данных
Readable stream производит данные, которые можно читать по частям (chunks). Примеры: fs.createReadStream(), http.IncomingMessage, process.stdin.
const fs = require('fs');
const readable = fs.createReadStream('large-file.txt', {
highWaterMark: 64 * 1024 // размер буфера 64 KB
});
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readable.on('end', () => {
console.log('Stream finished');
});
readable.on('error', (err) => {
console.error('Stream error:', err);
});
Readable работает в двух режимах:
- Flowing mode — данные автоматически читаются и передаются через событие
data. Включается при подписке наdataили вызовеresume() - Paused mode — данные нужно явно запрашивать через
read(). По умолчанию stream в этом режиме
Writable — приёмник данных
Writable stream принимает данные для записи. Примеры: fs.createWriteStream(), http.ServerResponse, process.stdout.
const writable = fs.createWriteStream('output.txt');
writable.write('First chunk\n');
writable.write('Second chunk\n');
writable.end('Final chunk\n'); // закрывает stream после записи
writable.on('finish', () => {
console.log('All data written');
});
writable.on('error', (err) => {
console.error('Write error:', err);
});
Метод write() возвращает boolean: true — можно писать дальше, false — внутренний буфер заполнен, нужно подождать события drain. Это ключ к пониманию backpressure.
Duplex — двунаправленный stream
Duplex stream одновременно Readable и Writable. Пример: net.Socket (TCP-соединение) — можно читать данные от клиента и писать ответ.
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
this.data = [];
}
_read(size) {
if (this.data.length) {
this.push(this.data.shift());
} else {
this.push(null); // конец stream
}
}
_write(chunk, encoding, callback) {
this.data.push(chunk);
callback();
}
}
Transform — трансформация данных
Transform stream — это Duplex, который изменяет данные на лету. Читает из входа, трансформирует, пишет в выход. Примеры: zlib.createGzip(), crypto.createCipher().
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
const upperChunk = chunk.toString().toUpperCase();
this.push(upperChunk);
callback();
}
}
// использование
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
Проблема backpressure
Backpressure возникает, когда producer (Readable) генерирует данные быстрее, чем consumer (Writable) их обрабатывает. Без контроля это приводит к переполнению памяти.
Наивная реализация (ПЛОХО):
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
writable.write(chunk); // игнорируем возвращаемое значение
});
readable.on('end', () => {
writable.end();
});
Проблема: если writable.write() вернёт false (буфер заполнен), мы продолжаем читать из readable. Данные накапливаются в памяти, пока writable не успеет записать. Для файла в 10 GB это OOM.
Правильная реализация с backpressure:
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// буфер writable заполнен, останавливаем чтение
readable.pause();
}
});
writable.on('drain', () => {
// буфер writable освободился, возобновляем чтение
readable.resume();
});
readable.on('end', () => {
writable.end();
});
readable.on('error', (err) => console.error('Read error:', err));
writable.on('error', (err) => console.error('Write error:', err));
Теперь если write() вернёт false, мы вызываем pause() — Readable перестаёт читать данные. Когда Writable освободит буфер, он испустит событие drain, и мы вызовем resume(). Память под контролем.
Pipeline API: композиция streams
Ручное управление backpressure утомительно и чревато ошибками. С Node.js 10+ используйте stream.pipeline() — он автоматически обрабатывает backpressure, ошибки и cleanup.
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
Pipeline автоматически:
- Управляет backpressure между всеми streams
- Пробрасывает ошибки из любого stream в callback
- Закрывает все streams при завершении или ошибке (вызывает
destroy())
С промисами (Node.js 15+):
const { pipeline } = require('stream/promises');
async function compressFile(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Compression complete');
}
compressFile('input.txt', 'input.txt.gz').catch(console.error);
Обработка больших файлов без загрузки в память
Классическая задача: подсчитать количество строк в файле 5 GB. Загрузка в память через fs.readFile() — OOM. Решение — stream:
const fs = require('fs');
const readline = require('readline');
async function countLines(filePath) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity // обрабатывает \r\n как одну строку
});
let count = 0;
for await (const line of rl) {
count++;
}
return count;
}
countLines('huge-file.txt').then(count => {
console.log(`Total lines: ${count}`);
});
Память: ~64 KB (размер буфера stream), независимо от размера файла.
Фильтрация и трансформация
Задача: прочитать лог-файл, отфильтровать строки с ERROR, записать в новый файл.
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
const readline = require('readline');
class ErrorFilter extends Transform {
constructor(options) {
super(options);
this._buffer = '';
}
_transform(chunk, encoding, callback) {
this._buffer += chunk.toString();
const lines = this._buffer.split('\n');
// последняя строка может быть неполной
this._buffer = lines.pop();
for (const line of lines) {
if (line.includes('ERROR')) {
this.push(line + '\n');
}
}
callback();
}
_flush(callback) {
// обрабатываем остаток буфера
if (this._buffer && this._buffer.includes('ERROR')) {
this.push(this._buffer + '\n');
}
callback();
}
}
async function filterErrors(input, output) {
await pipeline(
fs.createReadStream(input),
new ErrorFilter(),
fs.createWriteStream(output)
);
}
filterErrors('app.log', 'errors.log').catch(console.error);
Практический пример: парсинг CSV на 10GB с трансформацией
Задача: прочитать CSV-файл 10 GB, отфильтровать строки где age > 30, преобразовать в JSON, записать в новый файл. Память должна оставаться постоянной.
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
class CSVParser extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this._buffer = '';
this._headers = null;
}
_transform(chunk, encoding, callback) {
this._buffer += chunk.toString();
const lines = this._buffer.split('\n');
this._buffer = lines.pop(); // сохраняем неполную строку
for (const line of lines) {
if (!line.trim()) continue;
if (!this._headers) {
this._headers = line.split(',').map(h => h.trim());
} else {
const values = line.split(',').map(v => v.trim());
const obj = {};
this._headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
}
callback();
}
_flush(callback) {
if (this._buffer.trim() && this._headers) {
const values = this._buffer.split(',').map(v => v.trim());
const obj = {};
this._headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
callback();
}
}
class AgeFilter extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
}
_transform(obj, encoding, callback) {
if (parseInt(obj.age) > 30) {
this.push(obj);
}
callback();
}
}
class JSONStringify extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this._first = true;
}
_transform(obj, encoding, callback) {
const prefix = this._first ? '[\n ' : ',\n ';
this._first = false;
this.push(prefix + JSON.stringify(obj));
callback();
}
_flush(callback) {
this.push('\n]\n');
callback();
}
}
async function processCSV(input, output) {
const startTime = Date.now();
const startMem = process.memoryUsage().heapUsed;
await pipeline(
fs.createReadStream(input, { highWaterMark: 64 * 1024 }),
new CSVParser(),
new AgeFilter(),
new JSONStringify(),
fs.createWriteStream(output)
);
const endTime = Date.now();
const endMem = process.memoryUsage().heapUsed;
console.log(`Processed in ${endTime - startTime} ms`);
console.log(`Memory delta: ${((endMem - startMem) / 1024 / 1024).toFixed(2)} MB`);
}
processCSV('users-10gb.csv', 'filtered-users.json').catch(console.error);
Ключевые моменты:
- objectMode: true — Transform работает с объектами, а не буферами
- _buffer — накапливаем неполные строки между chunks
- _flush — обрабатываем остаток буфера в конце stream
- pipeline — автоматический backpressure между всеми этапами
Результат: файл 10 GB обрабатывается с потреблением памяти ~100 MB (зависит от highWaterMark). Без streams пришлось бы загрузить весь файл в память — 10 GB + overhead на парсинг.
Мониторинг и отладка streams
Для отслеживания backpressure и производительности:
const { pipeline } = require('stream/promises');
class MonitorTransform extends Transform {
constructor(name, options) {
super(options);
this.name = name;
this.bytesProcessed = 0;
this.startTime = Date.now();
}
_transform(chunk, encoding, callback) {
this.bytesProcessed += chunk.length;
const elapsed = (Date.now() - this.startTime) / 1000;
const throughput = (this.bytesProcessed / 1024 / 1024 / elapsed).toFixed(2);
console.log(`[${this.name}] Processed: ${(this.bytesProcessed / 1024 / 1024).toFixed(2)} MB, Throughput: ${throughput} MB/s`);
this.push(chunk);
callback();
}
}
await pipeline(
fs.createReadStream('input.txt'),
new MonitorTransform('Reader'),
zlib.createGzip(),
new MonitorTransform('Compressor'),
fs.createWriteStream('output.txt.gz')
);
Подводные камни
- Не забывайте обрабатывать ошибки — без
error-handler stream может упасть с unhandled exception. Pipeline автоматически пробрасывает ошибки в callback - Закрывайте streams — незакрытый stream держит file descriptor. Pipeline автоматически вызывает
destroy() - objectMode и производительность — работа с объектами медленнее буферов. Используйте только когда нужна структурированная обработка
- highWaterMark — размер внутреннего буфера. Больше значение — меньше системных вызовов, но больше памяти. По умолчанию 16 KB, для больших файлов ставьте 64-256 KB
Вывод
Streams в Node.js — это мощный инструмент для обработки больших объёмов данных с постоянным потреблением памяти. Четыре типа streams (Readable, Writable, Duplex, Transform) покрывают все сценарии. Backpressure — критичная проблема, решаемая через pause()/resume() или автоматически через pipeline(). Pipeline API упрощает композицию streams, автоматически управляя backpressure, ошибками и cleanup. В реальных проектах streams позволяют обрабатывать файлы в десятки гигабайт с потреблением памяти в десятки мегабайт — разница в три порядка.