lenec ru

← все посты

Python type hints: mypy, pyright и строгая типизация на практике

18K

Node.js — однопоточный. Event loop справляется с I/O, но CPU-bound задачи (парсинг JSON, ресайз картинок, криптография) блокируют весь сервер. Модуль worker_threads даёт настоящие потоки с изолированным V8 внутри одного процесса. Разберём API, напишем параллельный парсер и выберем пул воркеров.

Зачем worker_threads

CPU-bound операция на 200 мс блокирует event loop — все запросы ждут. Варианты:

  • child_process.fork() — отдельный процесс, ~30 MB на каждый.
  • cluster — масштабирование по ядрам, но полная копия приложения.
  • worker_threads — лёгкий поток (~5 MB), общая память через SharedArrayBuffer.

API: Worker, parentPort, workerData

// main.ts
import { Worker } from 'worker_threads';

const worker = new Worker('./worker.ts', {
  workerData: { filePath: '/data/large.json' },
});
worker.on('message', (r) => console.log('Parsed:', r.count));
worker.on('error', console.error);
// worker.ts
import { parentPort, workerData } from 'worker_threads';
import { readFileSync } from 'fs';

const data = JSON.parse(readFileSync(workerData.filePath, 'utf-8'));
parentPort?.postMessage({ count: data.length });

Примитивы: workerData — данные при создании (structured clone), parentPort — канал с главным потоком, MessageChannel — дополнительные каналы для сложных сценариев.

Пример: параллельный парсинг JSON

8 файлов по 100 MB. Последовательно — 12 с. На 4 воркерах — 3.5 с:

import { Worker } from 'worker_threads';
import { cpus } from 'os';

function parseInWorker(filePath: string): Promise<{ records: number }> {
  return new Promise((resolve, reject) => {
    const w = new Worker('./json-worker.ts', { workerData: { filePath } });
    w.on('message', resolve);
    w.on('error', reject);
  });
}

async function parseAll(files: string[]) {
  const concurrency = Math.min(files.length, cpus().length);
  const queue = [...files];

  await Promise.all(
    Array.from({ length: concurrency }, async () => {
      while (queue.length) await parseInWorker(queue.shift()!);
    })
  );
}

SharedArrayBuffer и Atomics

postMessage клонирует данные. Для больших числовых массивов — SharedArrayBuffer:

const shared = new SharedArrayBuffer(1024 * 1024);
const view = new Int32Array(shared);

const worker = new Worker('./compute.ts', { workerData: { buffer: shared } });
Atomics.wait(view, 0, 0); // ждём сигнала
console.log('Result:', view[1]);
// compute.ts
import { workerData } from 'worker_threads';
const view = new Int32Array(workerData.buffer);
view[1] = heavyComputation();
Atomics.store(view, 0, 1);
Atomics.notify(view, 0);

Оправдан для матриц, аудио, пикселей. Для объектов postMessage достаточно.

Пул воркеров: piscina vs workerpool

Создавать Worker на каждую задачу дорого (~5 мс). Пул переиспользует потоки:

import Piscina from 'piscina';

const pool = new Piscina({
  filename: './json-worker.ts',
  maxThreads: 4,
  idleTimeout: 30_000,
});
const result = await pool.run({ filePath: '/data/chunk-1.json' });
  • piscina — от Node.js core team. transferList, AbortController, метрики. Рекомендация по умолчанию.
  • workerpool — проще, работает в браузере. Для базовых сценариев.
  • Свой пул — при специфичной логике (приоритеты, warm-up).

Подводные камни

  • Serialization — передача 100 MB через postMessage ~200 мс. Используйте transferList (zero-copy) или SharedArrayBuffer.
  • Память — каждый воркер ~5-15 MB. 20 воркеров = +300 MB RSS.
  • Изоляция — глобальные переменные, кеши, DB-соединения не шарятся.
  • Ошибки — без listener на 'error' промис зависнет навсегда.
  • Native addons — не все C++ аддоны thread-safe.

Worker_threads — инструмент для CPU-bound задач от 10+ мс. Для коротких операций overhead съест выигрыш. Начните с piscina, измерьте throughput и масштабируйте потоки под железо.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.