lenec ru

← все посты

Node.js worker_threads: параллелизм для CPU-интенсивных задач

12K

Node.js — однопоточный. Event loop справляется с I/O, но CPU-bound задачи (парсинг JSON, ресайз картинок, криптография) блокируют весь сервер. Модуль worker_threads даёт настоящие потоки с изолированным V8 внутри одного процесса. Разберём API, напишем параллельный парсер и выберем пул воркеров.

Зачем worker_threads

Event loop обрабатывает тысячи соединений, пока каждый callback занимает микросекунды. Но CPU-bound операция на 200 мс блокирует весь цикл — остальные запросы стоят в очереди. Варианты решения:

  • child_process.fork() — отдельный процесс, ~30 MB на каждый.
  • cluster — масштабирование по ядрам, но полная копия приложения в каждом воркере.
  • worker_threads — лёгкий поток (~5 MB), общая память через SharedArrayBuffer, быстрый обмен через MessagePort.

Worker_threads — золотая середина: дешевле процессов, но с настоящим параллелизмом на нескольких ядрах CPU.

API: Worker, parentPort, workerData

// main.ts
import { Worker } from 'worker_threads';

const worker = new Worker('./worker.ts', {
  workerData: { filePath: '/data/large.json' },
});
worker.on('message', (r) => console.log('Parsed:', r.count));
worker.on('error', console.error);
// worker.ts
import { parentPort, workerData } from 'worker_threads';
import { readFileSync } from 'fs';

const data = JSON.parse(readFileSync(workerData.filePath, 'utf-8'));
parentPort?.postMessage({ count: data.length });

Основные примитивы:

  • workerData — данные при создании воркера, передаются через structured clone.
  • parentPort — двусторонний канал связи с главным потоком.
  • MessageChannel — дополнительные каналы для сложных сценариев (стриминг, fan-out).
  • transferList — массив ArrayBuffer, которые передаются без копирования (zero-copy).

Пример: параллельный парсинг JSON

Задача: распарсить 8 файлов по 100 MB. Последовательно — 12 секунд. На 4 воркерах — 3.5 секунды (ускорение в 3.4x):

import { Worker } from 'worker_threads';
import { cpus } from 'os';

function parseInWorker(filePath: string): Promise<{ records: number }> {
  return new Promise((resolve, reject) => {
    const w = new Worker('./json-worker.ts', { workerData: { filePath } });
    w.on('message', resolve);
    w.on('error', reject);
  });
}

async function parseAll(files: string[]) {
  const concurrency = Math.min(files.length, cpus().length);
  const queue = [...files];
  const results: { records: number }[] = [];

  await Promise.all(
    Array.from({ length: concurrency }, async () => {
      while (queue.length) {
        results.push(await parseInWorker(queue.shift()!));
      }
    })
  );
  return results;
}

Паттерн простой: создаём N промисов-воркеров, каждый берёт задачу из очереди, пока она не опустеет. Это даёт равномерную загрузку даже при разном размере файлов.

SharedArrayBuffer и Atomics

postMessage клонирует данные через structured clone. Для массива на 100 MB это ~200 мс overhead. SharedArrayBuffer решает проблему — общая память без копирования:

// main.ts
const shared = new SharedArrayBuffer(1024 * 1024);
const view = new Int32Array(shared);

const worker = new Worker('./compute.ts', { workerData: { buffer: shared } });
Atomics.wait(view, 0, 0); // ждём сигнала от воркера
console.log('Result:', view[1]);
// compute.ts
import { workerData } from 'worker_threads';
const view = new Int32Array(workerData.buffer);
view[1] = heavyComputation();
Atomics.store(view, 0, 1);
Atomics.notify(view, 0);

SharedArrayBuffer оправдан для числовых данных: матрицы, аудиобуферы, пиксели изображений. Для обычных JS-объектов postMessage с transferList достаточно.

Пул воркеров: piscina vs workerpool

Создавать Worker на каждую задачу дорого (~5 мс на инициализацию V8). Пул переиспользует потоки:

import Piscina from 'piscina';

const pool = new Piscina({
  filename: './json-worker.ts',
  maxThreads: 4,
  idleTimeout: 30_000,
});
const result = await pool.run({ filePath: '/data/chunk-1.json' });

Сравнение библиотек:

  • piscina — от Node.js core team. Поддержка transferList, отмена через AbortController, встроенные метрики (waitTime, runTime). Рекомендация по умолчанию.
  • workerpool — проще API, работает и в браузере. Достаточно для базовых сценариев без transferList.
  • Свой пул — оправдан при специфичной логике: приоритеты задач, warm-up, кастомная балансировка нагрузки.

Подводные камни

  • Serialization overhead — передача 100 MB объекта через postMessage ~200 мс. Используйте transferList для ArrayBuffer (zero-copy) или SharedArrayBuffer для числовых данных.
  • Память — каждый воркер это отдельный V8 heap (~5-15 MB baseline). 20 воркеров = +300 MB RSS. Не создавайте больше потоков, чем ядер CPU.
  • Изоляция — глобальные переменные, кеши, DB-соединения не шарятся между потоками. Каждый воркер создаёт свои.
  • Error handling — необработанная ошибка в воркере убивает только этот поток. Без listener на 'error' промис зависнет навсегда.
  • Native addons — не все C++ аддоны thread-safe. Проверяйте документацию перед использованием в воркерах.

Worker_threads — инструмент для CPU-bound задач от 10+ мс. Для коротких операций overhead съест выигрыш. Начните с piscina, измерьте throughput и масштабируйте потоки под железо.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.