lenec ru

← все посты

BullMQ: очереди задач в Node — практическая настройка

17K

BullMQ — мой постоянный спутник в Node-проектах последние пару лет. Отправка писем, генерация PDF, синхронизация с внешним API, scheduled cron-задачи — всё это удобнее жить в очереди, чем висеть в HTTP-обработчике. Расскажу свою рабочую конфигурацию и грабли, которые ловятся не сразу.

Версии: BullMQ 5.x, Node 20, Redis 7. Можно подменить Redis на KeyDB — работает идентично.

Зачем очередь

Любой HTTP-запрос длится миллисекунды. Если в нём начинается работа на секунды (отправить почту, дёрнуть платёжного провайдера, конвертировать видео), пользователь получает таймаут, а сервер занят дольше, чем должен. Очередь даёт три вещи:

  • Async-выполнение: API кладёт job, отвечает 202, воркер обрабатывает.
  • Retry с экспонентой: внешний API упал — повторим через минуту, потом через пять.
  • Параллелизм и контроль: одновременно работают N воркеров, остальные ждут.

Минимальный сетап

pnpm add bullmq ioredis

Connection делаю явно через ioredis. BullMQ требует maxRetriesPerRequest: null для устойчивости.

// src/queue/connection.ts
import { Redis } from 'ioredis';

export const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});

connection.on('error', (err) => console.error('redis error', err));

Очередь

// src/queue/email.queue.ts
import { Queue } from 'bullmq';
import { connection } from './connection';

export type EmailJob = {
  to: string;
  subject: string;
  template: 'welcome' | 'reset-password';
  data: Record<string, unknown>;
};

export const emailQueue = new Queue<EmailJob>('email', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: 'exponential', delay: 30_000 },
    removeOnComplete: { age: 3600, count: 1000 },
    removeOnFail: { age: 24 * 3600 },
  },
});

Что важно:

  • attempts: для большинства задач 3–5 попыток — нормальный потолок. Если после пяти не получилось, дальше шансов мало, лучше ручное вмешательство.
  • backoff exponential: 30 сек, 60, 120, 240, 480 — растёт по двойке. Не долбит внешний API, если он упал.
  • removeOnComplete: автоудаление через час, но не больше 1000 последних. Без этого Redis быстро распухает.
  • removeOnFail: failed-задачи держим сутки, потом убираем. На дольше — собирай в логах.

Воркер

// src/queue/email.worker.ts
import { Worker } from 'bullmq';
import { connection } from './connection';
import { sendMail } from '../email/send';

export const emailWorker = new Worker<EmailJob>(
  'email',
  async (job) => {
    const { to, subject, template, data } = job.data;
    await sendMail({ to, subject, template, data });
  },
  {
    connection,
    concurrency: 10,
    limiter: { max: 50, duration: 1000 }, // не больше 50/сек
    autorun: true,
  },
);

emailWorker.on('failed', (job, err) => {
  console.error(`email failed`, job?.id, err);
});
emailWorker.on('completed', (job) => {
  console.log(`email done`, job.id);
});

Concurrency 10 — это сколько одновременных задач берёт один процесс воркера. Limiter — внешний троттлинг (например, ограничения провайдера на отправку). Если ты ходишь во внешний API с лимитом «100 запросов в секунду», ставишь limiter и можешь не бояться.

Отдельный процесс под воркер

Я почти всегда запускаю воркер отдельным процессом, не вместе с веб-сервером. Это:

  • Изолирует CPU/RAM нагрузку: тяжёлый job не мешает HTTP.
  • Даёт независимый деплой: новый код воркера выкатывается отдельно.
  • Упрощает скейлинг: больше нагрузки на email — больше воркер-инстансов.

В systemd выглядит так. Файл /etc/systemd/system/myapp-worker.service:

[Unit]
Description=MyApp BullMQ worker
After=network.target

[Service]
Type=simple
User=app
WorkingDirectory=/opt/myapp
ExecStart=/usr/bin/node /opt/myapp/dist/worker.js
Restart=on-failure
RestartSec=5
Environment=NODE_ENV=production
EnvironmentFile=/opt/myapp/.env

[Install]
WantedBy=multi-user.target

Cron-задачи через repeatable

Если нужна периодика, BullMQ умеет это сам. Не запускай отдельный node-cron, добавляй repeatable job в очередь.

await emailQueue.add(
  'daily-digest',
  { type: 'digest' as const },
  {
    repeat: { pattern: '0 8 * * *', tz: 'Europe/Moscow' },
    jobId: 'daily-digest', // фиксированный, чтобы не плодить дубли
  },
);

Главное — фиксированный jobId. Иначе при каждом запуске приложения BullMQ создаст новую repeat-конфигурацию, и через неделю у тебя будет десяток клонов одной cron-задачи. Эта ошибка стоит мне нескольких бессонных часов: digest улетал по 7 раз в день.

Идемпотентность

Воркер должен быть готов к тому, что job выполнится дважды. Ретраи, рестарты, перезапуск Redis — всё это поводы повторного запуска. Самый простой подход: на каждое внешнее действие — уникальный ключ.

// при отправке письма
await sendMail({
  to,
  subject,
  idempotencyKey: `email:${job.id}`,
});
// в платёжном провайдере
await pay({
  amount,
  reference: `payment:${job.id}`,
});

Большинство SaaS API поддерживают idempotency-key или похожее поле. Если внутренний сервис свой — добавь свою таблицу idempotency_log и сверяй.

Очередь Dead Letter

В BullMQ есть концепция moveToFailed с reason. Job, который не удался по всем попыткам, остаётся в failed-таблице. Я держу простой алерт на количество failed jobs за час: если > 10 — слать уведомление. Под капотом — простой запрос к Redis (queue.getJobCounts()) каждые 5 минут.

setInterval(async () => {
  const counts = await emailQueue.getJobCounts('failed');
  if (counts.failed > 10) sendAlert(`email queue: ${counts.failed} failed`);
}, 5 * 60_000);

Bull Board для UI

Собственный admin-интерфейс делает bull-board. Подключение в Express:

import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
});
app.use('/admin/queues', basicAuth({ users: { admin: process.env.ADMIN_PASS! } }), serverAdapter.getRouter());

Реальное спасение для отладки: видно, что в failed, можно перезапустить руками, посмотреть payload. Защити паролем, иначе кто угодно увидит данные пользователей.

Подводные камни

Connection между Queue и Worker

Если ты используешь один и тот же ioredis-инстанс для очереди и воркера, BullMQ просит, чтобы это было два разных. На практике BullMQ сам кломнирует connection через duplicate(), но если ты передаёшь готовый объект — лучше создавать отдельный.

graceful shutdown

На SIGTERM воркер должен закончить текущие job-ы. У BullMQ есть worker.close(), который ждёт. Если не закрывать — job-ы останутся в active и зависнут до stalled-проверки (по умолчанию 30 секунд + перезапуск).

process.on('SIGTERM', async () => {
  await emailWorker.close();
  await connection.quit();
  process.exit(0);
});

Stalled jobs

BullMQ считает job stalled, если воркер не подал признак жизни в течение stalledInterval (по умолчанию 30 секунд). Stalled job возвращается в очередь и берётся другим воркером. Если у тебя долгие job-ы (минуты), увеличь stalledInterval или регулярно вызывай job.updateProgress() — это считается lock renewal.

Job не запускается

Самая частая причина — воркер не запущен или указывает не на ту очередь. Имя очереди в new Queue(name) и new Worker(name) должно совпадать. Опечатки в одну букву ловятся не сразу. Я сделал себе констант с именами и импортирую из одного места.

Шпаргалка

  • Один процесс — веб, другой — воркер. Деплой и масштабирование отдельные.
  • Connection через ioredis с maxRetriesPerRequest: null.
  • Defaults: 3–5 attempts, exponential backoff, removeOnComplete с лимитом.
  • Repeatable jobs — фиксированный jobId.
  • Идемпотентность — уникальный ключ на внешнее действие.
  • Bull Board за паролем для отладки.
  • Алерт на failed-счётчик.
  • SIGTERM → worker.close() → connection.quit().

BullMQ — стабильный, удобный инструмент, который очень аккуратно ложится на Node-стек. На серьёзных нагрузках держал у меня тысячи jobs в минуту без сюрпризов. Главное — настроить retry, идемпотентность и graceful shutdown с самого начала, а не доделывать потом.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.