lenec ru

← все посты

Streams в Node.js: readable, writable, transform и backpressure handling

12K

Streams в Node.js — это не просто абстракция для работы с данными по частям. Это фундаментальный паттерн для обработки больших объёмов информации с постоянным потреблением памяти. Разберём типы streams, проблему backpressure и как правильно строить pipeline для обработки гигабайтных файлов.

Типы streams: четыре базовых класса

Readable — источник данных

Readable stream производит данные, которые можно читать по частям (chunks). Примеры: fs.createReadStream(), http.IncomingMessage, process.stdin.

const fs = require('fs');

const readable = fs.createReadStream('large-file.txt', { 
  highWaterMark: 64 * 1024  // размер буфера 64 KB
});

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readable.on('end', () => {
  console.log('Stream finished');
});

readable.on('error', (err) => {
  console.error('Stream error:', err);
});

Readable работает в двух режимах:

  • Flowing mode — данные автоматически читаются и передаются через событие data. Включается при подписке на data или вызове resume()
  • Paused mode — данные нужно явно запрашивать через read(). По умолчанию stream в этом режиме

Writable — приёмник данных

Writable stream принимает данные для записи. Примеры: fs.createWriteStream(), http.ServerResponse, process.stdout.

const writable = fs.createWriteStream('output.txt');

writable.write('First chunk\n');
writable.write('Second chunk\n');
writable.end('Final chunk\n');  // закрывает stream после записи

writable.on('finish', () => {
  console.log('All data written');
});

writable.on('error', (err) => {
  console.error('Write error:', err);
});

Метод write() возвращает boolean: true — можно писать дальше, false — внутренний буфер заполнен, нужно подождать события drain. Это ключ к пониманию backpressure.

Duplex — двунаправленный stream

Duplex stream одновременно Readable и Writable. Пример: net.Socket (TCP-соединение) — можно читать данные от клиента и писать ответ.

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }
  
  _read(size) {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      this.push(null);  // конец stream
    }
  }
  
  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    callback();
  }
}

Transform — трансформация данных

Transform stream — это Duplex, который изменяет данные на лету. Читает из входа, трансформирует, пишет в выход. Примеры: zlib.createGzip(), crypto.createCipher().

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }
}

// использование
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);

Проблема backpressure

Backpressure возникает, когда producer (Readable) генерирует данные быстрее, чем consumer (Writable) их обрабатывает. Без контроля это приводит к переполнению памяти.

Наивная реализация (ПЛОХО):

const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  writable.write(chunk);  // игнорируем возвращаемое значение
});

readable.on('end', () => {
  writable.end();
});

Проблема: если writable.write() вернёт false (буфер заполнен), мы продолжаем читать из readable. Данные накапливаются в памяти, пока writable не успеет записать. Для файла в 10 GB это OOM.

Правильная реализация с backpressure:

const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  
  if (!canContinue) {
    // буфер writable заполнен, останавливаем чтение
    readable.pause();
  }
});

writable.on('drain', () => {
  // буфер writable освободился, возобновляем чтение
  readable.resume();
});

readable.on('end', () => {
  writable.end();
});

readable.on('error', (err) => console.error('Read error:', err));
writable.on('error', (err) => console.error('Write error:', err));

Теперь если write() вернёт false, мы вызываем pause() — Readable перестаёт читать данные. Когда Writable освободит буфер, он испустит событие drain, и мы вызовем resume(). Память под контролем.

Pipeline API: композиция streams

Ручное управление backpressure утомительно и чревато ошибками. С Node.js 10+ используйте stream.pipeline() — он автоматически обрабатывает backpressure, ошибки и cleanup.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Pipeline автоматически:

  • Управляет backpressure между всеми streams
  • Пробрасывает ошибки из любого stream в callback
  • Закрывает все streams при завершении или ошибке (вызывает destroy())

С промисами (Node.js 15+):

const { pipeline } = require('stream/promises');

async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

compressFile('input.txt', 'input.txt.gz').catch(console.error);

Обработка больших файлов без загрузки в память

Классическая задача: подсчитать количество строк в файле 5 GB. Загрузка в память через fs.readFile() — OOM. Решение — stream:

const fs = require('fs');
const readline = require('readline');

async function countLines(filePath) {
  const fileStream = fs.createReadStream(filePath);
  
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity  // обрабатывает \r\n как одну строку
  });
  
  let count = 0;
  
  for await (const line of rl) {
    count++;
  }
  
  return count;
}

countLines('huge-file.txt').then(count => {
  console.log(`Total lines: ${count}`);
});

Память: ~64 KB (размер буфера stream), независимо от размера файла.

Фильтрация и трансформация

Задача: прочитать лог-файл, отфильтровать строки с ERROR, записать в новый файл.

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
const readline = require('readline');

class ErrorFilter extends Transform {
  constructor(options) {
    super(options);
    this._buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    
    // последняя строка может быть неполной
    this._buffer = lines.pop();
    
    for (const line of lines) {
      if (line.includes('ERROR')) {
        this.push(line + '\n');
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    // обрабатываем остаток буфера
    if (this._buffer && this._buffer.includes('ERROR')) {
      this.push(this._buffer + '\n');
    }
    callback();
  }
}

async function filterErrors(input, output) {
  await pipeline(
    fs.createReadStream(input),
    new ErrorFilter(),
    fs.createWriteStream(output)
  );
}

filterErrors('app.log', 'errors.log').catch(console.error);

Практический пример: парсинг CSV на 10GB с трансформацией

Задача: прочитать CSV-файл 10 GB, отфильтровать строки где age > 30, преобразовать в JSON, записать в новый файл. Память должна оставаться постоянной.

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');

class CSVParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this._buffer = '';
    this._headers = null;
  }
  
  _transform(chunk, encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    this._buffer = lines.pop();  // сохраняем неполную строку
    
    for (const line of lines) {
      if (!line.trim()) continue;
      
      if (!this._headers) {
        this._headers = line.split(',').map(h => h.trim());
      } else {
        const values = line.split(',').map(v => v.trim());
        const obj = {};
        this._headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(obj);
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    if (this._buffer.trim() && this._headers) {
      const values = this._buffer.split(',').map(v => v.trim());
      const obj = {};
      this._headers.forEach((header, i) => {
        obj[header] = values[i];
      });
      this.push(obj);
    }
    callback();
  }
}

class AgeFilter extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
  }
  
  _transform(obj, encoding, callback) {
    if (parseInt(obj.age) > 30) {
      this.push(obj);
    }
    callback();
  }
}

class JSONStringify extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this._first = true;
  }
  
  _transform(obj, encoding, callback) {
    const prefix = this._first ? '[\n  ' : ',\n  ';
    this._first = false;
    this.push(prefix + JSON.stringify(obj));
    callback();
  }
  
  _flush(callback) {
    this.push('\n]\n');
    callback();
  }
}

async function processCSV(input, output) {
  const startTime = Date.now();
  const startMem = process.memoryUsage().heapUsed;
  
  await pipeline(
    fs.createReadStream(input, { highWaterMark: 64 * 1024 }),
    new CSVParser(),
    new AgeFilter(),
    new JSONStringify(),
    fs.createWriteStream(output)
  );
  
  const endTime = Date.now();
  const endMem = process.memoryUsage().heapUsed;
  
  console.log(`Processed in ${endTime - startTime} ms`);
  console.log(`Memory delta: ${((endMem - startMem) / 1024 / 1024).toFixed(2)} MB`);
}

processCSV('users-10gb.csv', 'filtered-users.json').catch(console.error);

Ключевые моменты:

  • objectMode: true — Transform работает с объектами, а не буферами
  • _buffer — накапливаем неполные строки между chunks
  • _flush — обрабатываем остаток буфера в конце stream
  • pipeline — автоматический backpressure между всеми этапами

Результат: файл 10 GB обрабатывается с потреблением памяти ~100 MB (зависит от highWaterMark). Без streams пришлось бы загрузить весь файл в память — 10 GB + overhead на парсинг.

Мониторинг и отладка streams

Для отслеживания backpressure и производительности:

const { pipeline } = require('stream/promises');

class MonitorTransform extends Transform {
  constructor(name, options) {
    super(options);
    this.name = name;
    this.bytesProcessed = 0;
    this.startTime = Date.now();
  }
  
  _transform(chunk, encoding, callback) {
    this.bytesProcessed += chunk.length;
    
    const elapsed = (Date.now() - this.startTime) / 1000;
    const throughput = (this.bytesProcessed / 1024 / 1024 / elapsed).toFixed(2);
    
    console.log(`[${this.name}] Processed: ${(this.bytesProcessed / 1024 / 1024).toFixed(2)} MB, Throughput: ${throughput} MB/s`);
    
    this.push(chunk);
    callback();
  }
}

await pipeline(
  fs.createReadStream('input.txt'),
  new MonitorTransform('Reader'),
  zlib.createGzip(),
  new MonitorTransform('Compressor'),
  fs.createWriteStream('output.txt.gz')
);

Подводные камни

  • Не забывайте обрабатывать ошибки — без error-handler stream может упасть с unhandled exception. Pipeline автоматически пробрасывает ошибки в callback
  • Закрывайте streams — незакрытый stream держит file descriptor. Pipeline автоматически вызывает destroy()
  • objectMode и производительность — работа с объектами медленнее буферов. Используйте только когда нужна структурированная обработка
  • highWaterMark — размер внутреннего буфера. Больше значение — меньше системных вызовов, но больше памяти. По умолчанию 16 KB, для больших файлов ставьте 64-256 KB

Вывод

Streams в Node.js — это мощный инструмент для обработки больших объёмов данных с постоянным потреблением памяти. Четыре типа streams (Readable, Writable, Duplex, Transform) покрывают все сценарии. Backpressure — критичная проблема, решаемая через pause()/resume() или автоматически через pipeline(). Pipeline API упрощает композицию streams, автоматически управляя backpressure, ошибками и cleanup. В реальных проектах streams позволяют обрабатывать файлы в десятки гигабайт с потреблением памяти в десятки мегабайт — разница в три порядка.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.