lenec ru

← все посты

Node.js Streams: обработка больших файлов без утечек памяти

18K

Вызов fs.readFile('data.csv') на файле в 10 GB — гарантированный OOM. Весь файл загружается в память целиком. Streams решают проблему: данные обрабатываются порциями (chunks), и в памяти одновременно находится лишь небольшой буфер. Разберём типы стримов, правильную обработку ошибок и напишем парсер гигабайтного CSV.

Проблема fs.readFile на больших файлах

Наивный подход:

import { readFile } from 'fs/promises';

// 10 GB файл → 10 GB в RAM → OOM kill
const content = await readFile('/data/logs.csv', 'utf-8');
const lines = content.split('\n');

Со стримами тот же файл обрабатывается при потреблении ~64 KB памяти (размер одного chunk). Не важно, 1 MB файл или 100 GB — RSS остаётся стабильным.

Типы стримов

  • Readable — источник данных (fs.createReadStream, http.IncomingMessage, process.stdin).
  • Writable — приёмник (fs.createWriteStream, http.ServerResponse, process.stdout).
  • Transform — преобразование на лету (zlib.createGzip, crypto.createCipher, ваш парсер).
  • Duplex — и чтение, и запись независимо (net.Socket, WebSocket).

Все стримы — EventEmitter. Ключевые события: data, end, error, drain, close.

pipeline() vs .pipe()

Старый способ через .pipe() не обрабатывает ошибки автоматически:

// ПЛОХО: ошибка в middle не закроет source и dest
source.pipe(middle).pipe(dest);

pipeline() из stream/promises — правильный способ с Node 18+:

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

await pipeline(
  createReadStream('/data/logs.csv'),
  createGzip(),
  createWriteStream('/data/logs.csv.gz'),
);
// Все стримы корректно закрыты, ошибки пробрасываются

pipeline автоматически:

  • Пробрасывает ошибки из любого звена цепочки.
  • Вызывает destroy() на всех стримах при ошибке — нет утечек file descriptors.
  • Обрабатывает backpressure: если Writable не успевает — Readable приостанавливается.

Практика: парсим CSV 10 GB через Transform

import { Transform, TransformCallback } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
}

class CsvParser extends Transform {
  private buffer = '';

  constructor() {
    super({ objectMode: true }); // выдаём объекты, не буферы
  }

  _transform(chunk: Buffer, _enc: string, cb: TransformCallback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop()!; // неполная строка остаётся в буфере

    for (const line of lines) {
      const [timestamp, level, message] = line.split(',');
      if (timestamp && level) {
        this.push({ timestamp, level, message } as LogEntry);
      }
    }
    cb();
  }

  _flush(cb: TransformCallback) {
    if (this.buffer.trim()) {
      const [timestamp, level, message] = this.buffer.split(',');
      this.push({ timestamp, level, message } as LogEntry);
    }
    cb();
  }
}

let errors = 0;
const counter = new Transform({
  objectMode: true,
  transform(entry: LogEntry, _enc, cb) {
    if (entry.level === 'ERROR') errors++;
    cb();
  },
});

await pipeline(
  createReadStream('/data/logs.csv'),
  new CsvParser(),
  counter,
);
console.log(`Errors found: ${errors}`);

Этот код обработает 10 GB CSV при ~2 MB RSS. Скорость — около 500 MB/s на SSD.

stream/consumers и stream/promises

Node 18+ добавил удобные утилиты для потребления стримов:

import { Readable } from 'stream';
import { text, json, buffer } from 'stream/consumers';

// Прочитать весь стрим как текст (для небольших стримов)
const body = await text(request);

// Async iteration — самый чистый способ чтения
const readable = createReadStream('/data/file.txt');
for await (const chunk of readable) {
  process(chunk);
}

Async iteration работает с backpressure из коробки: следующий chunk запрашивается только после обработки текущего.

Подводные камни

  • highWaterMark — размер внутреннего буфера (по умолчанию 16 KB для строк, 16 объектов для objectMode). Слишком маленький — много системных вызовов. Слишком большой — теряется смысл стриминга. Для файлов оптимально 64-256 KB.
  • objectMode — стрим передаёт объекты вместо буферов. highWaterMark считается в штуках объектов, не в байтах. Забыли включить — получите [object Object] в выводе.
  • destroy() — всегда вызывайте при ошибке. Без destroy файловый дескриптор не закроется, и через тысячу запросов получите EMFILE.
  • Backpressure — если writable.write() вернул false, нужно ждать события 'drain'. pipeline делает это автоматически, ручной .pipe() — тоже, но кастомный код часто забывает.
  • Error без listener — необработанное событие 'error' на стриме крашит процесс. pipeline решает это, но при ручной работе всегда вешайте .on('error').

Streams — фундамент эффективной работы с данными в Node.js. Используйте pipeline из stream/promises, пишите Transform для кастомной логики и не забывайте про backpressure. Файл любого размера обрабатывается при константном потреблении памяти.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.