lenec ru

← все посты

API Gateway паттерн: Kong, rate limiting, authentication, request transformation

18K

Вы читаете 10 ГБ файл через fs.createReadStream, обрабатываете каждую строку и пишете в базу. Через минуту процесс падает с OOM. Проблема не в файле — проблема в том, что вы игнорируете backpressure. Разбираемся, как Node.js streams управляют потоком данных и почему без правильной обработки backpressure ваше приложение съест всю память.

Проблема: producer быстрее consumer

Backpressure возникает, когда источник данных (producer) генерирует данные быстрее, чем получатель (consumer) может их обработать. Классический сценарий: читаете файл с NVMe SSD (500 МБ/с), а пишете в PostgreSQL через сеть (5 МБ/с). Разница в скорости — 100x. Без механизма backpressure все промежуточные данные буферизуются в памяти, и через несколько секунд процесс упирается в лимит heap.

Node.js streams решают эту проблему через сигнальный протокол: когда writable stream не успевает обрабатывать данные, он возвращает false из метода write(). Это сигнал producer'у: «остановись, я переполнен». Producer должен приостановить чтение и ждать события drain, которое сигнализирует: «буфер освободился, можно продолжать».

Критический момент: backpressure в Node.js — это кооперативный протокол, а не принудительный. Если вы игнорируете возвращаемое значение write() и продолжаете писать, Node.js будет буферизовать данные без ограничений, пока не кончится память.

Streams API: Readable, Writable, pipe

В Node.js четыре типа streams: Readable (источник данных), Writable (приёмник), Duplex (и то, и другое, например TCP socket), и Transform (Duplex с трансформацией данных на лету).

Readable streams

Readable stream имеет два режима: paused (по умолчанию) и flowing. В paused режиме вы вручную вызываете readable.read() для получения данных. В flowing режиме stream автоматически пушит данные через событие data.

const fs = require('fs');
const readable = fs.createReadStream('large-file.txt');

// Flowing mode — данные приходят автоматически
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readable.on('end', () => {
  console.log('Stream finished');
});

Проблема flowing mode: если вы не контролируете скорость потребления, readable будет пушить данные так быстро, как может их прочитать. Если ваш обработчик data медленный (например, пишет в базу), данные накапливаются где-то в памяти.

Writable streams

Writable stream принимает данные через метод write(chunk). Ключевой момент — возвращаемое значение:

const writable = fs.createWriteStream('output.txt');

const canContinue = writable.write('some data');
if (!canContinue) {
  // Внутренний буфер заполнен, нужно остановиться
  writable.once('drain', () => {
    // Буфер освободился, можно продолжать
  });
}

Внутренний буфер writable stream ограничен параметром highWaterMark (по умолчанию 16 КБ). Когда буфер превышает этот порог, write() возвращает false. Это не жёсткий лимит — вы можете продолжать писать, но Node.js сигнализирует: «притормози».

pipe() и автоматический backpressure

Метод pipe() автоматически управляет backpressure между readable и writable:

const fs = require('fs');
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.pipe(writable);

Внутри pipe() реализован протокол pause/resume: когда writable.write() возвращает false, pipe() вызывает readable.pause(). Когда writable эмитит drain, pipe() вызывает readable.resume().

Важно: в продакшене используйте pipeline() из stream/promises, а не pipe(). Причина — обработка ошибок: pipe() не уничтожает streams при ошибке, что приводит к утечкам файловых дескрипторов. pipeline() автоматически вызывает destroy() на всех streams в цепочке при любой ошибке.

const { pipeline } = require('stream/promises');
const fs = require('fs');

await pipeline(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt')
);

Ручное управление: pause, resume, drain

Когда pipe() не подходит (например, вам нужна кастомная логика обработки), управляйте backpressure вручную:

const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Writable переполнен, останавливаем readable
    readable.pause();
  }
});

writable.on('drain', () => {
  // Writable освободился, возобновляем readable
  readable.resume();
});

readable.on('end', () => {
  writable.end();
});

Этот паттерн критичен для любого кода, где вы обрабатываете данные вручную. Без проверки canContinue и вызова pause() readable будет продолжать пушить данные, а writable — буферизовать их в памяти.

Transform streams: обработка с контролем backpressure

Transform stream — это Duplex stream, где writable сторона связана с readable: данные, записанные в writable, трансформируются и выходят через readable. Классические примеры — gzip, шифрование, парсинг CSV.

Создание Transform stream:

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Трансформируем данные
    const transformed = chunk.toString().toUpperCase();
    
    // Пушим в readable сторону
    this.push(transformed);
    
    // Сигнализируем, что готовы к следующему chunk
    callback();
  }
}

const transform = new UpperCaseTransform();

await pipeline(
  fs.createReadStream('input.txt'),
  transform,
  fs.createWriteStream('output.txt')
);

Backpressure в Transform streams

Transform stream имеет два внутренних буфера: один на writable стороне (входящие данные), другой на readable стороне (исходящие данные). Backpressure работает в обе стороны:

  • Если readable буфер заполнен (downstream consumer медленный), this.push() возвращает false
  • Если вы продолжаете вызывать callback() несмотря на false, upstream producer продолжает писать, и данные накапливаются в памяти

Правильная обработка backpressure в Transform:

class BackpressureAwareTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const transformed = this.processChunk(chunk);
    
    const canPushMore = this.push(transformed);
    
    if (!canPushMore) {
      // Readable буфер заполнен, ждём drain
      this.once('drain', callback);
    } else {
      // Всё ок, готовы к следующему chunk
      callback();
    }
  }
  
  processChunk(chunk) {
    // Ваша логика трансформации
    return chunk.toString().toUpperCase();
  }
}

На практике базовый класс Transform обрабатывает большинство случаев автоматически — если вы просто вызываете this.push() и callback(), backpressure работает. Проблемы возникают, когда вы пушите несколько chunk'ов за один _transform вызов (one-to-many трансформация) или когда делаете асинхронные операции внутри _transform.

_flush() для буферизованных данных

Метод _flush() вызывается после того, как все входящие данные обработаны, но до того, как readable сторона закрывается. Используйте его для вывода буферизованных данных:

class LineBufferTransform extends Transform {
  constructor(options) {
    super(options);
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    
    // Последняя строка может быть неполной
    this.buffer = lines.pop();
    
    for (const line of lines) {
      this.push(line + '\n');
    }
    
    callback();
  }
  
  _flush(callback) {
    // Выводим остаток буфера
    if (this.buffer.length > 0) {
      this.push(this.buffer + '\n');
    }
    callback();
  }
}

Практический кейс: стриминг CSV в PostgreSQL

Реальная задача: прочитать 5 ГБ CSV файл, распарсить строки, и загрузить в PostgreSQL. Наивная реализация через fs.readFile() и массив упадёт с OOM. Правильное решение — streams с backpressure.

const { pipeline } = require('stream/promises');
const fs = require('fs');
const { Transform } = require('stream');
const { Client } = require('pg');

const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();

// Transform для парсинга CSV
class CSVParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Сохраняем неполную строку
    
    for (const line of lines) {
      if (line.trim()) {
        const [name, email, age] = line.split(',');
        this.push({ name, email, age: parseInt(age) });
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    if (this.buffer.trim()) {
      const [name, email, age] = this.buffer.split(',');
      this.push({ name, email, age: parseInt(age) });
    }
    callback();
  }
}

// Writable для вставки в БД
class DBWriter extends require('stream').Writable {
  constructor(client) {
    super({ objectMode: true, highWaterMark: 10 });
    this.client = client;
  }
  
  async _write(row, encoding, callback) {
    try {
      await this.client.query(
        'INSERT INTO users (name, email, age) VALUES ($1, $2, $3)',
        [row.name, row.email, row.age]
      );
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// Pipeline с автоматическим backpressure
await pipeline(
  fs.createReadStream('users.csv'),
  new CSVParser(),
  new DBWriter(client)
);

await client.end();
console.log('Import complete');

Ключевые моменты:

  • objectMode: true в Transform — chunk'и это JS объекты, а не Buffer'ы
  • highWaterMark: 10 в DBWriter — ограничиваем буфер до 10 объектов, чтобы не перегружать БД
  • pipeline() автоматически управляет backpressure: если БД медленная, CSVParser приостанавливается, и fs.createReadStream тоже останавливается
  • Пиковое потребление памяти — десятки мегабайт, независимо от размера файла

Подводные камни

Flowing mode без контроля. Если вы добавляете data listener на readable stream, он переключается в flowing mode и начинает пушить данные максимально быстро. Без вызова pause() при backpressure вы теряете контроль над потоком.

Игнорирование возвращаемого значения write(). Самая частая ошибка. Если write() вернул false, а вы продолжаете писать — данные буферизуются в памяти без ограничений.

Забытый _flush() в Transform. Если ваш Transform буферизует данные (например, собирает строки из chunk'ов), без _flush() последний кусок данных потеряется.

pipe() вместо pipeline(). pipe() не уничтожает streams при ошибке, что приводит к утечкам. Всегда используйте pipeline() из stream/promises.

highWaterMark как жёсткий лимит. highWaterMark — это порог, при котором write() возвращает false, а не максимальный размер буфера. Node.js продолжит принимать данные, если вы продолжите писать.

Вывод

Backpressure — это не опциональная оптимизация, а обязательный механизм для работы с большими объёмами данных. Без него ваше приложение либо упадёт с OOM, либо будет жрать гигабайты памяти на задачах, которые должны занимать мегабайты. Используйте pipeline() для автоматического управления backpressure, проверяйте возвращаемое значение write() при ручной обработке, и всегда реализуйте _flush() в Transform streams с буферизацией. Streams в Node.js — мощный инструмент, но только если вы понимаете, как работает backpressure.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.