Outbox pattern без Kafka: реализация на RabbitMQ и NATS
Outbox pattern в учебниках всегда показывают на Kafka. Это создаёт ложное впечатление, что без Kafka паттерн «не настоящий». На самом деле outbox решает универсальную проблему — атомарность записи в БД и публикации события — независимо от того, какой брокер стоит дальше. RabbitMQ, NATS, AWS SNS+SQS, Redis Streams — везде эта же боль: commit в Postgres и publish в брокер не атомарны.
Я внедрял outbox с Kafka, RabbitMQ и NATS — последний раз полгода назад. У каждого варианта свои нюансы: разные гарантии у брокеров, разная семантика подтверждений, разные подходы к идемпотентности на стороне consumer'а. Разберу, как делать outbox с RabbitMQ и NATS, и где они отличаются от Kafka-варианта.
Базовый шаблон тот же
Независимо от брокера, ядро outbox одинаковое:
- Бизнес-операция и запись в таблицу
outboxидут в одной транзакции. - Отдельный воркер читает таблицу, публикует в брокер, помечает отправленные.
- Гарантия — at-least-once. Дедуп на стороне consumer'а.
Меняется только то, как именно воркер публикует и как consumer гарантирует exactly-effective-once.
Outbox с RabbitMQ
RabbitMQ — традиционный брокер с очередями, exchanges, маршрутизацией. Семантика отличается от Kafka: сообщения не остаются в очереди после прочтения, очередь имеет конечный размер, нет «партиций» в кафкианском смысле.
Подтверждения
Главное, что нужно настроить — publisher confirms. Без них producer не знает, дошло ли сообщение в брокер. Включается на канале:
val factory = ConnectionFactory().apply {
host = "rabbitmq"
isAutomaticRecoveryEnabled = true
}
val connection = factory.newConnection()
val channel = connection.createChannel().apply {
confirmSelect()
}
fun publish(rows: List<OutboxRow>): Set<Long> {
val acked = ConcurrentHashMap.newKeySet<Long>()
val pending = ConcurrentHashMap<Long, Long>() // deliveryTag -> outboxId
channel.addConfirmListener({ tag, multiple ->
if (multiple) {
pending.entries.removeIf { it.key <= tag && acked.add(it.value) }
} else {
pending.remove(tag)?.also { acked.add(it) }
}
}, { _, _ -> /* nack обработать */ })
rows.forEach { row ->
val tag = channel.nextPublishSeqNo
pending[tag] = row.id
channel.basicPublish(
"events",
row.routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
row.payload
)
}
channel.waitForConfirmsOrDie(5_000)
return acked
}Без confirmSelect() и явного ожидания ack'ов вы будете отмечать sent_at на сообщения, которые могли не дойти. Это превращает at-least-once в «иногда теряем».
Persistent + durable
Очередь должна быть durable, exchange — durable, сообщение — persistent. Иначе при перезагрузке RabbitMQ сообщения исчезнут.
channel.exchangeDeclare("events", BuiltinExchangeType.TOPIC, true)
channel.queueDeclare("orders.events", true, false, false, null)
channel.queueBind("orders.events", "events", "orders.*")На consumer-стороне — manual ack. Получили, обработали, ack'ом подтверждаем. Без ack'а сообщение остаётся в очереди и при разрыве соединения вернётся.
Идемпотентность consumer'а
RabbitMQ может доставить сообщение дважды (re-delivery после разрыва). Поэтому consumer обязан быть идемпотентным. Стандартный паттерн — таблица processed_events с уникальным id события:
@Transactional
fun handle(event: OrderCreated) {
val inserted = jdbc.update(
"INSERT INTO processed_events (event_id, processed_at) VALUES (?, now()) ON CONFLICT DO NOTHING",
event.id
) > 0
if (!inserted) {
log.info("event ${event.id} already processed, skipping")
return
}
inventoryService.reserve(event.orderId)
}Идентификатор события — это либо id из outbox-таблицы producer'а (если у вас контроль над всем стеком), либо UUID, который producer генерирует и кладёт в payload/headers.
Outbox с NATS
NATS — лёгкий брокер с разными режимами доставки. Базовый core NATS даёт fire-and-forget с at-most-once: если сообщение не дошло, оно потерялось. Для надёжной доставки нужен JetStream — слой персистентности поверх NATS.
JetStream даёт at-least-once при правильной настройке. С него и работает outbox.
Стрим и публикация
val nc = Nats.connect("nats://nats:4222")
val js = nc.jetStreamManagement()
// Стрим создаётся один раз при инициализации
js.addStream(StreamConfiguration.builder()
.name("ORDERS")
.subjects("orders.>")
.storageType(StorageType.File)
.retentionPolicy(RetentionPolicy.Limits)
.maxAge(Duration.ofDays(7))
.build())
val jsPublish = nc.jetStream()
fun publishOutboxRow(row: OutboxRow): Boolean {
val ack = jsPublish.publish(
"orders.${row.eventType}",
row.payload,
PublishOptions.builder()
.messageId(row.id.toString()) // дедуп на стороне JetStream
.build()
)
return ack.seqno > 0
}Ключевая фича — messageId. JetStream по нему дедуплицирует: если пришло сообщение с тем же id в окне дедупликации (по умолчанию 2 минуты), оно отбрасывается. Это даёт частичный effectively-once на стороне брокера.
Это важно для outbox: воркер мог упасть после publish, но до UPDATE sent_at. При перезапуске он снова возьмёт ту же строку и опубликует с тем же messageId — JetStream её отфильтрует.
Pull vs push consumer
NATS JetStream даёт два типа consumer'ов. Pull consumer более предсказуемый: вы сами запрашиваете batch сообщений, обрабатываете, ack'аете.
val consumer = js.subscribe(
"orders.>",
PullSubscribeOptions.builder()
.durable("inventory-svc")
.build()
)
while (running) {
val messages = consumer.fetch(100, Duration.ofSeconds(5))
messages.forEach { msg ->
try {
handle(parseEvent(msg.data))
msg.ack()
} catch (e: Exception) {
msg.nakWithDelay(Duration.ofSeconds(30))
}
}
}Durable name (inventory-svc) — это «офсет» consumer'а, JetStream запоминает, до какого сообщения он дочитал. После перезапуска продолжит с того же места.
Дедуп на consumer-стороне
Несмотря на JetStream-дедуп на producer-стороне, consumer всё равно может получить одно сообщение дважды (повторная доставка после разрыва ack-таймаута). Поэтому идемпотентность на стороне consumer'а тоже нужна — тот же паттерн с processed_events.
Сравнение с Kafka
Чтобы понимать, чем отличаются варианты.
Kafka. Order гарантирован внутри партиции, retention настраивается на уровне топика, потребление — pull-only через consumer group. Дедуп на producer-стороне через transactional producer + idempotent producer. Из коробки лучше всего поддерживает outbox через CDC (Debezium).
RabbitMQ. Очереди, не лог. Сообщение исчезает после ack'а. Order только при single consumer на queue. Persistent confirms — отдельный механизм. Хорошо для command-like сценариев и rpc-стилевой коммуникации.
NATS JetStream. Лог-семантика, как у Kafka, но легче. Нативная дедупликация по messageId, что упрощает outbox. Меньше операционной нагрузки, чем Kafka. Подходит для low-latency сценариев.
Для outbox конкретно: если у вас нет Kafka и не планируется, NATS JetStream — самый простой выбор. RabbitMQ работает, но требует больше ручной работы с подтверждениями.
Один воркер или distributed
Воркер outbox можно запускать в одном экземпляре или в нескольких. С RabbitMQ и NATS JetStream нет встроенного «партиционирования» как в Kafka — поэтому масштабирование делается через локальный механизм.
Простейший подход — leader election: один из инстансов сервиса берёт на себя роль outbox publisher'а, остальные ждут. Делается через Postgres advisory lock:
-- В каждом инстансе при старте
SELECT pg_try_advisory_lock(42);
-- Только один получит TRUE и станет publisher'омНа умеренных нагрузках одного воркера хватает с запасом. На высоких — партиционирование outbox-таблицы по hash(aggregate_id) и каждый воркер обрабатывает свою партицию.
SELECT id, aggregate_id, event_type, payload
FROM outbox
WHERE sent_at IS NULL
AND ('x' || substring(md5(aggregate_id::text), 1, 8))::bit(32)::int % :total_workers = :worker_id
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;Подводные камни
Несколько вещей, которые я ловил в проде.
RabbitMQ: переполнение очереди. Если consumer медленный, очередь растёт. Без x-max-length или TTL она съест всю память брокера. Конкретные значения зависят от объёма, но иметь обе границы — обязательно.
NATS: max_age vs max_msgs. Аналогично RabbitMQ: stream должен иметь хотя бы один из лимитов. Иначе он растёт бесконечно. На дисковом сторадже это не сразу видно, потом вдруг диск заканчивается.
Дедуп-окно JetStream. По умолчанию 2 минуты. Если воркер outbox задержался дольше (упал, перезапустился через 5 минут), второй publish того же messageId уже не отфильтруется. Увеличивайте duplicate_window до значения, превышающего ваш максимальный downtime воркера.
Routing keys в RabbitMQ. Topic exchange с гибким матчингом — мощно, но легко создать routing key, на который никто не подписан, и потерять сообщение тихо. Включите alternate exchange, чтобы такие сообщения шли в DLQ.
Heartbeats. На long-running consumer'ах RabbitMQ AMQP-соединение может рваться по таймауту. Настраивайте heartbeat и автоматическое переподключение.
Что запомнить
Outbox pattern не привязан к Kafka. Базовая идея — атомарная запись в БД и асинхронная доставка — переносится на любой брокер. Меняются детали: подтверждения у producer'а, дедуп, способ ack'ать у consumer'а.
RabbitMQ хорош для command-like интеграций и сценариев, где уже есть AMQP в стеке. NATS JetStream — лучший выбор, когда нужен лёгкий персистентный брокер без операционной нагрузки Kafka. Kafka остаётся королём для массивных потоков и аналитики.
Главное — не считать, что без Kafka «outbox не настоящий». Принципы те же, инструмент другой. Выбирайте по реальным требованиям, не по статьям про микросервисы 2018 года.