lenec ru

← все посты

Outbox pattern без Kafka: реализация на RabbitMQ и NATS

14K

Outbox pattern в учебниках всегда показывают на Kafka. Это создаёт ложное впечатление, что без Kafka паттерн «не настоящий». На самом деле outbox решает универсальную проблему — атомарность записи в БД и публикации события — независимо от того, какой брокер стоит дальше. RabbitMQ, NATS, AWS SNS+SQS, Redis Streams — везде эта же боль: commit в Postgres и publish в брокер не атомарны.

Я внедрял outbox с Kafka, RabbitMQ и NATS — последний раз полгода назад. У каждого варианта свои нюансы: разные гарантии у брокеров, разная семантика подтверждений, разные подходы к идемпотентности на стороне consumer'а. Разберу, как делать outbox с RabbitMQ и NATS, и где они отличаются от Kafka-варианта.

Базовый шаблон тот же

Независимо от брокера, ядро outbox одинаковое:

  1. Бизнес-операция и запись в таблицу outbox идут в одной транзакции.
  2. Отдельный воркер читает таблицу, публикует в брокер, помечает отправленные.
  3. Гарантия — at-least-once. Дедуп на стороне consumer'а.

Меняется только то, как именно воркер публикует и как consumer гарантирует exactly-effective-once.

Outbox с RabbitMQ

RabbitMQ — традиционный брокер с очередями, exchanges, маршрутизацией. Семантика отличается от Kafka: сообщения не остаются в очереди после прочтения, очередь имеет конечный размер, нет «партиций» в кафкианском смысле.

Подтверждения

Главное, что нужно настроить — publisher confirms. Без них producer не знает, дошло ли сообщение в брокер. Включается на канале:

val factory = ConnectionFactory().apply {
    host = "rabbitmq"
    isAutomaticRecoveryEnabled = true
}
val connection = factory.newConnection()
val channel = connection.createChannel().apply {
    confirmSelect()
}

fun publish(rows: List<OutboxRow>): Set<Long> {
    val acked = ConcurrentHashMap.newKeySet<Long>()
    val pending = ConcurrentHashMap<Long, Long>() // deliveryTag -> outboxId
    
    channel.addConfirmListener({ tag, multiple ->
        if (multiple) {
            pending.entries.removeIf { it.key <= tag && acked.add(it.value) }
        } else {
            pending.remove(tag)?.also { acked.add(it) }
        }
    }, { _, _ -> /* nack обработать */ })
    
    rows.forEach { row ->
        val tag = channel.nextPublishSeqNo
        pending[tag] = row.id
        channel.basicPublish(
            "events",
            row.routingKey,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            row.payload
        )
    }
    channel.waitForConfirmsOrDie(5_000)
    return acked
}

Без confirmSelect() и явного ожидания ack'ов вы будете отмечать sent_at на сообщения, которые могли не дойти. Это превращает at-least-once в «иногда теряем».

Persistent + durable

Очередь должна быть durable, exchange — durable, сообщение — persistent. Иначе при перезагрузке RabbitMQ сообщения исчезнут.

channel.exchangeDeclare("events", BuiltinExchangeType.TOPIC, true)
channel.queueDeclare("orders.events", true, false, false, null)
channel.queueBind("orders.events", "events", "orders.*")

На consumer-стороне — manual ack. Получили, обработали, ack'ом подтверждаем. Без ack'а сообщение остаётся в очереди и при разрыве соединения вернётся.

Идемпотентность consumer'а

RabbitMQ может доставить сообщение дважды (re-delivery после разрыва). Поэтому consumer обязан быть идемпотентным. Стандартный паттерн — таблица processed_events с уникальным id события:

@Transactional
fun handle(event: OrderCreated) {
    val inserted = jdbc.update(
        "INSERT INTO processed_events (event_id, processed_at) VALUES (?, now()) ON CONFLICT DO NOTHING",
        event.id
    ) > 0
    
    if (!inserted) {
        log.info("event ${event.id} already processed, skipping")
        return
    }
    
    inventoryService.reserve(event.orderId)
}

Идентификатор события — это либо id из outbox-таблицы producer'а (если у вас контроль над всем стеком), либо UUID, который producer генерирует и кладёт в payload/headers.

Outbox с NATS

NATS — лёгкий брокер с разными режимами доставки. Базовый core NATS даёт fire-and-forget с at-most-once: если сообщение не дошло, оно потерялось. Для надёжной доставки нужен JetStream — слой персистентности поверх NATS.

JetStream даёт at-least-once при правильной настройке. С него и работает outbox.

Стрим и публикация

val nc = Nats.connect("nats://nats:4222")
val js = nc.jetStreamManagement()

// Стрим создаётся один раз при инициализации
js.addStream(StreamConfiguration.builder()
    .name("ORDERS")
    .subjects("orders.>")
    .storageType(StorageType.File)
    .retentionPolicy(RetentionPolicy.Limits)
    .maxAge(Duration.ofDays(7))
    .build())

val jsPublish = nc.jetStream()

fun publishOutboxRow(row: OutboxRow): Boolean {
    val ack = jsPublish.publish(
        "orders.${row.eventType}",
        row.payload,
        PublishOptions.builder()
            .messageId(row.id.toString()) // дедуп на стороне JetStream
            .build()
    )
    return ack.seqno > 0
}

Ключевая фича — messageId. JetStream по нему дедуплицирует: если пришло сообщение с тем же id в окне дедупликации (по умолчанию 2 минуты), оно отбрасывается. Это даёт частичный effectively-once на стороне брокера.

Это важно для outbox: воркер мог упасть после publish, но до UPDATE sent_at. При перезапуске он снова возьмёт ту же строку и опубликует с тем же messageId — JetStream её отфильтрует.

Pull vs push consumer

NATS JetStream даёт два типа consumer'ов. Pull consumer более предсказуемый: вы сами запрашиваете batch сообщений, обрабатываете, ack'аете.

val consumer = js.subscribe(
    "orders.>",
    PullSubscribeOptions.builder()
        .durable("inventory-svc")
        .build()
)

while (running) {
    val messages = consumer.fetch(100, Duration.ofSeconds(5))
    messages.forEach { msg ->
        try {
            handle(parseEvent(msg.data))
            msg.ack()
        } catch (e: Exception) {
            msg.nakWithDelay(Duration.ofSeconds(30))
        }
    }
}

Durable name (inventory-svc) — это «офсет» consumer'а, JetStream запоминает, до какого сообщения он дочитал. После перезапуска продолжит с того же места.

Дедуп на consumer-стороне

Несмотря на JetStream-дедуп на producer-стороне, consumer всё равно может получить одно сообщение дважды (повторная доставка после разрыва ack-таймаута). Поэтому идемпотентность на стороне consumer'а тоже нужна — тот же паттерн с processed_events.

Сравнение с Kafka

Чтобы понимать, чем отличаются варианты.

Kafka. Order гарантирован внутри партиции, retention настраивается на уровне топика, потребление — pull-only через consumer group. Дедуп на producer-стороне через transactional producer + idempotent producer. Из коробки лучше всего поддерживает outbox через CDC (Debezium).

RabbitMQ. Очереди, не лог. Сообщение исчезает после ack'а. Order только при single consumer на queue. Persistent confirms — отдельный механизм. Хорошо для command-like сценариев и rpc-стилевой коммуникации.

NATS JetStream. Лог-семантика, как у Kafka, но легче. Нативная дедупликация по messageId, что упрощает outbox. Меньше операционной нагрузки, чем Kafka. Подходит для low-latency сценариев.

Для outbox конкретно: если у вас нет Kafka и не планируется, NATS JetStream — самый простой выбор. RabbitMQ работает, но требует больше ручной работы с подтверждениями.

Один воркер или distributed

Воркер outbox можно запускать в одном экземпляре или в нескольких. С RabbitMQ и NATS JetStream нет встроенного «партиционирования» как в Kafka — поэтому масштабирование делается через локальный механизм.

Простейший подход — leader election: один из инстансов сервиса берёт на себя роль outbox publisher'а, остальные ждут. Делается через Postgres advisory lock:

-- В каждом инстансе при старте
SELECT pg_try_advisory_lock(42);
-- Только один получит TRUE и станет publisher'ом

На умеренных нагрузках одного воркера хватает с запасом. На высоких — партиционирование outbox-таблицы по hash(aggregate_id) и каждый воркер обрабатывает свою партицию.

SELECT id, aggregate_id, event_type, payload
FROM outbox
WHERE sent_at IS NULL
  AND ('x' || substring(md5(aggregate_id::text), 1, 8))::bit(32)::int % :total_workers = :worker_id
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;

Подводные камни

Несколько вещей, которые я ловил в проде.

RabbitMQ: переполнение очереди. Если consumer медленный, очередь растёт. Без x-max-length или TTL она съест всю память брокера. Конкретные значения зависят от объёма, но иметь обе границы — обязательно.

NATS: max_age vs max_msgs. Аналогично RabbitMQ: stream должен иметь хотя бы один из лимитов. Иначе он растёт бесконечно. На дисковом сторадже это не сразу видно, потом вдруг диск заканчивается.

Дедуп-окно JetStream. По умолчанию 2 минуты. Если воркер outbox задержался дольше (упал, перезапустился через 5 минут), второй publish того же messageId уже не отфильтруется. Увеличивайте duplicate_window до значения, превышающего ваш максимальный downtime воркера.

Routing keys в RabbitMQ. Topic exchange с гибким матчингом — мощно, но легко создать routing key, на который никто не подписан, и потерять сообщение тихо. Включите alternate exchange, чтобы такие сообщения шли в DLQ.

Heartbeats. На long-running consumer'ах RabbitMQ AMQP-соединение может рваться по таймауту. Настраивайте heartbeat и автоматическое переподключение.

Что запомнить

Outbox pattern не привязан к Kafka. Базовая идея — атомарная запись в БД и асинхронная доставка — переносится на любой брокер. Меняются детали: подтверждения у producer'а, дедуп, способ ack'ать у consumer'а.

RabbitMQ хорош для command-like интеграций и сценариев, где уже есть AMQP в стеке. NATS JetStream — лучший выбор, когда нужен лёгкий персистентный брокер без операционной нагрузки Kafka. Kafka остаётся королём для массивных потоков и аналитики.

Главное — не считать, что без Kafka «outbox не настоящий». Принципы те же, инструмент другой. Выбирайте по реальным требованиям, не по статьям про микросервисы 2018 года.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.