Node.js Streams: обработка больших файлов без утечек памяти
Вызов fs.readFile('data.csv') на файле в 10 GB — гарантированный OOM. Весь файл загружается в память целиком. Streams решают проблему: данные обрабатываются порциями (chunks), и в памяти одновременно находится лишь небольшой буфер. Разберём типы стримов, правильную обработку ошибок и напишем парсер гигабайтного CSV.
Проблема fs.readFile на больших файлах
Наивный подход:
import { readFile } from 'fs/promises';
// 10 GB файл → 10 GB в RAM → OOM kill
const content = await readFile('/data/logs.csv', 'utf-8');
const lines = content.split('\n');
Со стримами тот же файл обрабатывается при потреблении ~64 KB памяти (размер одного chunk). Не важно, 1 MB файл или 100 GB — RSS остаётся стабильным.
Типы стримов
- Readable — источник данных (fs.createReadStream, http.IncomingMessage, process.stdin).
- Writable — приёмник (fs.createWriteStream, http.ServerResponse, process.stdout).
- Transform — преобразование на лету (zlib.createGzip, crypto.createCipher, ваш парсер).
- Duplex — и чтение, и запись независимо (net.Socket, WebSocket).
Все стримы — EventEmitter. Ключевые события: data, end, error, drain, close.
pipeline() vs .pipe()
Старый способ через .pipe() не обрабатывает ошибки автоматически:
// ПЛОХО: ошибка в middle не закроет source и dest
source.pipe(middle).pipe(dest);
pipeline() из stream/promises — правильный способ с Node 18+:
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
await pipeline(
createReadStream('/data/logs.csv'),
createGzip(),
createWriteStream('/data/logs.csv.gz'),
);
// Все стримы корректно закрыты, ошибки пробрасываются
pipeline автоматически:
- Пробрасывает ошибки из любого звена цепочки.
- Вызывает
destroy()на всех стримах при ошибке — нет утечек file descriptors. - Обрабатывает backpressure: если Writable не успевает — Readable приостанавливается.
Практика: парсим CSV 10 GB через Transform
import { Transform, TransformCallback } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
interface LogEntry {
timestamp: string;
level: string;
message: string;
}
class CsvParser extends Transform {
private buffer = '';
constructor() {
super({ objectMode: true }); // выдаём объекты, не буферы
}
_transform(chunk: Buffer, _enc: string, cb: TransformCallback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop()!; // неполная строка остаётся в буфере
for (const line of lines) {
const [timestamp, level, message] = line.split(',');
if (timestamp && level) {
this.push({ timestamp, level, message } as LogEntry);
}
}
cb();
}
_flush(cb: TransformCallback) {
if (this.buffer.trim()) {
const [timestamp, level, message] = this.buffer.split(',');
this.push({ timestamp, level, message } as LogEntry);
}
cb();
}
}
let errors = 0;
const counter = new Transform({
objectMode: true,
transform(entry: LogEntry, _enc, cb) {
if (entry.level === 'ERROR') errors++;
cb();
},
});
await pipeline(
createReadStream('/data/logs.csv'),
new CsvParser(),
counter,
);
console.log(`Errors found: ${errors}`);
Этот код обработает 10 GB CSV при ~2 MB RSS. Скорость — около 500 MB/s на SSD.
stream/consumers и stream/promises
Node 18+ добавил удобные утилиты для потребления стримов:
import { Readable } from 'stream';
import { text, json, buffer } from 'stream/consumers';
// Прочитать весь стрим как текст (для небольших стримов)
const body = await text(request);
// Async iteration — самый чистый способ чтения
const readable = createReadStream('/data/file.txt');
for await (const chunk of readable) {
process(chunk);
}
Async iteration работает с backpressure из коробки: следующий chunk запрашивается только после обработки текущего.
Подводные камни
- highWaterMark — размер внутреннего буфера (по умолчанию 16 KB для строк, 16 объектов для objectMode). Слишком маленький — много системных вызовов. Слишком большой — теряется смысл стриминга. Для файлов оптимально 64-256 KB.
- objectMode — стрим передаёт объекты вместо буферов. highWaterMark считается в штуках объектов, не в байтах. Забыли включить — получите
[object Object]в выводе. - destroy() — всегда вызывайте при ошибке. Без destroy файловый дескриптор не закроется, и через тысячу запросов получите EMFILE.
- Backpressure — если writable.write() вернул false, нужно ждать события 'drain'. pipeline делает это автоматически, ручной .pipe() — тоже, но кастомный код часто забывает.
- Error без listener — необработанное событие 'error' на стриме крашит процесс. pipeline решает это, но при ручной работе всегда вешайте .on('error').
Streams — фундамент эффективной работы с данными в Node.js. Используйте pipeline из stream/promises, пишите Transform для кастомной логики и не забывайте про backpressure. Файл любого размера обрабатывается при константном потреблении памяти.