BullMQ: очереди задач в Node — практическая настройка
BullMQ — мой постоянный спутник в Node-проектах последние пару лет. Отправка писем, генерация PDF, синхронизация с внешним API, scheduled cron-задачи — всё это удобнее жить в очереди, чем висеть в HTTP-обработчике. Расскажу свою рабочую конфигурацию и грабли, которые ловятся не сразу.
Версии: BullMQ 5.x, Node 20, Redis 7. Можно подменить Redis на KeyDB — работает идентично.
Зачем очередь
Любой HTTP-запрос длится миллисекунды. Если в нём начинается работа на секунды (отправить почту, дёрнуть платёжного провайдера, конвертировать видео), пользователь получает таймаут, а сервер занят дольше, чем должен. Очередь даёт три вещи:
- Async-выполнение: API кладёт job, отвечает 202, воркер обрабатывает.
- Retry с экспонентой: внешний API упал — повторим через минуту, потом через пять.
- Параллелизм и контроль: одновременно работают N воркеров, остальные ждут.
Минимальный сетап
pnpm add bullmq ioredisConnection делаю явно через ioredis. BullMQ требует maxRetriesPerRequest: null для устойчивости.
// src/queue/connection.ts
import { Redis } from 'ioredis';
export const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
connection.on('error', (err) => console.error('redis error', err));Очередь
// src/queue/email.queue.ts
import { Queue } from 'bullmq';
import { connection } from './connection';
export type EmailJob = {
to: string;
subject: string;
template: 'welcome' | 'reset-password';
data: Record<string, unknown>;
};
export const emailQueue = new Queue<EmailJob>('email', {
connection,
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 30_000 },
removeOnComplete: { age: 3600, count: 1000 },
removeOnFail: { age: 24 * 3600 },
},
});Что важно:
- attempts: для большинства задач 3–5 попыток — нормальный потолок. Если после пяти не получилось, дальше шансов мало, лучше ручное вмешательство.
- backoff exponential: 30 сек, 60, 120, 240, 480 — растёт по двойке. Не долбит внешний API, если он упал.
- removeOnComplete: автоудаление через час, но не больше 1000 последних. Без этого Redis быстро распухает.
- removeOnFail: failed-задачи держим сутки, потом убираем. На дольше — собирай в логах.
Воркер
// src/queue/email.worker.ts
import { Worker } from 'bullmq';
import { connection } from './connection';
import { sendMail } from '../email/send';
export const emailWorker = new Worker<EmailJob>(
'email',
async (job) => {
const { to, subject, template, data } = job.data;
await sendMail({ to, subject, template, data });
},
{
connection,
concurrency: 10,
limiter: { max: 50, duration: 1000 }, // не больше 50/сек
autorun: true,
},
);
emailWorker.on('failed', (job, err) => {
console.error(`email failed`, job?.id, err);
});
emailWorker.on('completed', (job) => {
console.log(`email done`, job.id);
});Concurrency 10 — это сколько одновременных задач берёт один процесс воркера. Limiter — внешний троттлинг (например, ограничения провайдера на отправку). Если ты ходишь во внешний API с лимитом «100 запросов в секунду», ставишь limiter и можешь не бояться.
Отдельный процесс под воркер
Я почти всегда запускаю воркер отдельным процессом, не вместе с веб-сервером. Это:
- Изолирует CPU/RAM нагрузку: тяжёлый job не мешает HTTP.
- Даёт независимый деплой: новый код воркера выкатывается отдельно.
- Упрощает скейлинг: больше нагрузки на email — больше воркер-инстансов.
В systemd выглядит так. Файл /etc/systemd/system/myapp-worker.service:
[Unit]
Description=MyApp BullMQ worker
After=network.target
[Service]
Type=simple
User=app
WorkingDirectory=/opt/myapp
ExecStart=/usr/bin/node /opt/myapp/dist/worker.js
Restart=on-failure
RestartSec=5
Environment=NODE_ENV=production
EnvironmentFile=/opt/myapp/.env
[Install]
WantedBy=multi-user.targetCron-задачи через repeatable
Если нужна периодика, BullMQ умеет это сам. Не запускай отдельный node-cron, добавляй repeatable job в очередь.
await emailQueue.add(
'daily-digest',
{ type: 'digest' as const },
{
repeat: { pattern: '0 8 * * *', tz: 'Europe/Moscow' },
jobId: 'daily-digest', // фиксированный, чтобы не плодить дубли
},
);Главное — фиксированный jobId. Иначе при каждом запуске приложения BullMQ создаст новую repeat-конфигурацию, и через неделю у тебя будет десяток клонов одной cron-задачи. Эта ошибка стоит мне нескольких бессонных часов: digest улетал по 7 раз в день.
Идемпотентность
Воркер должен быть готов к тому, что job выполнится дважды. Ретраи, рестарты, перезапуск Redis — всё это поводы повторного запуска. Самый простой подход: на каждое внешнее действие — уникальный ключ.
// при отправке письма
await sendMail({
to,
subject,
idempotencyKey: `email:${job.id}`,
});
// в платёжном провайдере
await pay({
amount,
reference: `payment:${job.id}`,
});Большинство SaaS API поддерживают idempotency-key или похожее поле. Если внутренний сервис свой — добавь свою таблицу idempotency_log и сверяй.
Очередь Dead Letter
В BullMQ есть концепция moveToFailed с reason. Job, который не удался по всем попыткам, остаётся в failed-таблице. Я держу простой алерт на количество failed jobs за час: если > 10 — слать уведомление. Под капотом — простой запрос к Redis (queue.getJobCounts()) каждые 5 минут.
setInterval(async () => {
const counts = await emailQueue.getJobCounts('failed');
if (counts.failed > 10) sendAlert(`email queue: ${counts.failed} failed`);
}, 5 * 60_000);Bull Board для UI
Собственный admin-интерфейс делает bull-board. Подключение в Express:
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullMQAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', basicAuth({ users: { admin: process.env.ADMIN_PASS! } }), serverAdapter.getRouter());Реальное спасение для отладки: видно, что в failed, можно перезапустить руками, посмотреть payload. Защити паролем, иначе кто угодно увидит данные пользователей.
Подводные камни
Connection между Queue и Worker
Если ты используешь один и тот же ioredis-инстанс для очереди и воркера, BullMQ просит, чтобы это было два разных. На практике BullMQ сам кломнирует connection через duplicate(), но если ты передаёшь готовый объект — лучше создавать отдельный.
graceful shutdown
На SIGTERM воркер должен закончить текущие job-ы. У BullMQ есть worker.close(), который ждёт. Если не закрывать — job-ы останутся в active и зависнут до stalled-проверки (по умолчанию 30 секунд + перезапуск).
process.on('SIGTERM', async () => {
await emailWorker.close();
await connection.quit();
process.exit(0);
});Stalled jobs
BullMQ считает job stalled, если воркер не подал признак жизни в течение stalledInterval (по умолчанию 30 секунд). Stalled job возвращается в очередь и берётся другим воркером. Если у тебя долгие job-ы (минуты), увеличь stalledInterval или регулярно вызывай job.updateProgress() — это считается lock renewal.
Job не запускается
Самая частая причина — воркер не запущен или указывает не на ту очередь. Имя очереди в new Queue(name) и new Worker(name) должно совпадать. Опечатки в одну букву ловятся не сразу. Я сделал себе констант с именами и импортирую из одного места.
Шпаргалка
- Один процесс — веб, другой — воркер. Деплой и масштабирование отдельные.
- Connection через ioredis с
maxRetriesPerRequest: null. - Defaults: 3–5 attempts, exponential backoff, removeOnComplete с лимитом.
- Repeatable jobs — фиксированный jobId.
- Идемпотентность — уникальный ключ на внешнее действие.
- Bull Board за паролем для отладки.
- Алерт на failed-счётчик.
- SIGTERM → worker.close() → connection.quit().
BullMQ — стабильный, удобный инструмент, который очень аккуратно ложится на Node-стек. На серьёзных нагрузках держал у меня тысячи jobs в минуту без сюрпризов. Главное — настроить retry, идемпотентность и graceful shutdown с самого начала, а не доделывать потом.