lenec ru

← все посты

Inbox pattern: как не обработать одно событие дважды

17K

Outbox pattern закрывает половину задачи: producer гарантирует, что событие будет опубликовано хотя бы раз. Но «хотя бы раз» означает «возможно, дважды». На стороне consumer'а это превращается в проблему: один и тот же OrderCreated приходит трижды, и каждый раз сервис уведомлений шлёт письмо, биллинг списывает деньги, склад резервирует товар.

Inbox pattern — зеркало outbox на стороне consumer'а. Идея простая: прежде чем обработать событие, запишите его id в локальную таблицу. Если уже есть — отбрасывайте без обработки. Это даёт effectively-once на стороне получателя, что вместе с at-least-once producer'а складывается в нормальную доставку.

За двенадцать лет я видел три-четыре варианта реализации, и в половине случаев первая попытка содержала тонкую ошибку, которая стреляла на проде. Разберу базовую схему, типичные грабли и какие альтернативы есть.

Зачем inbox

Без inbox любой ретрай в брокере (Kafka rebalance, RabbitMQ redelivery, NATS повторная доставка после ack-таймаута) приводит к повторной обработке. Кодом это решается двумя способами: либо весь обработчик идемпотентен по бизнес-логике (часто невозможно), либо есть явный дедуп.

Бизнес-идемпотентность работает там, где можно её естественно встроить. UPDATE inventory SET reserved = true WHERE order_id = ? — повторный вызов ничего не изменит. Но если обработчик создаёт новую запись или вызывает внешний API — повтор становится дублем.

Inbox даёт универсальное решение: дедуп ставится один раз в инфраструктурном слое, всё бизнес-кодом не парится.

Минимальная схема

CREATE TABLE inbox (
    event_id     UUID PRIMARY KEY,
    event_type   TEXT NOT NULL,
    source       TEXT NOT NULL,
    received_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    processed_at TIMESTAMPTZ
);

CREATE INDEX inbox_unprocessed_idx ON inbox (received_at)
    WHERE processed_at IS NULL;

event_id — это глобальный идентификатор события. Кто его генерирует — отдельный вопрос (см. ниже). PRIMARY KEY обеспечивает дедуп на уровне БД: попытка вставить уже существующий id провалится.

processed_at отделён от received_at: между этими моментами событие лежит в inbox, но ещё не обработано. Это окно нужно для отказоустойчивости — если обработчик упадёт между записью в inbox и реальной обработкой, при перезапуске мы видим «есть в inbox, не processed» и докатываем.

Алгоритм обработки

Версия для синхронного pull-консьюмера:

fun handle(event: ReceivedEvent) {
    transaction {
        val inserted = jdbc.update(
            """INSERT INTO inbox (event_id, event_type, source)
               VALUES (?, ?, ?)
               ON CONFLICT (event_id) DO NOTHING""",
            event.id, event.type, event.source
        ) > 0
        
        if (!inserted) {
            log.info("event ${event.id} already in inbox, skipping")
            return@transaction
        }
        
        // Бизнес-логика. В той же транзакции, что и пометка processed.
        when (event.type) {
            "OrderCreated" -> orderService.handleCreated(event.payload)
            "OrderPaid" -> orderService.handlePaid(event.payload)
            else -> log.warn("unknown event type ${event.type}")
        }
        
        jdbc.update(
            "UPDATE inbox SET processed_at = now() WHERE event_id = ?",
            event.id
        )
    }
    
    ackToBroker(event.brokerHandle)
}

Ключевые свойства:

  • Вставка в inbox и бизнес-логика — одна транзакция в БД. Если транзакция откатилась, нет ни записи в inbox, ни побочных эффектов.
  • Ack брокеру — после коммита транзакции. Если упали до ack — событие придёт повторно, но inbox его отфильтрует.
  • Если упали внутри транзакции — она откатится, событие повторится через брокер, обработается с нуля.

Откуда брать event_id

Самая частая ошибка — попытаться использовать «естественный» бизнес-id (order_id, user_id) как event_id. Это не работает: одна сущность может породить много событий разных типов.

Правильные источники:

UUID, сгенерированный producer'ом. При создании события producer кладёт UUID в headers/payload. Этот id живёт с событием через брокер до consumer'а. Просто, надёжно, не зависит от брокера.

ID из outbox-таблицы producer'а. Если outbox.id монотонный и уникальный в рамках producer-сервиса, можно использовать составной ключ (source_service, outbox_id). Удобно, когда у вас несколько producer'ов, и id'ы у них пересекаются.

Composite из брокера. Для Kafka — topic + partition + offset. Гарантирует уникальность в рамках топика. Для NATS JetStream — stream + sequence. Не работает, если consumer перебрасывают между топиками или происходит реплей.

Я предпочитаю UUID от producer'а: переносится между брокерами, не зависит от инфраструктуры, и при миграции стека ничего не ломается.

Когда inbox мешает

Несколько случаев, когда inbox добавляет проблем больше, чем решает.

Очень высокая нагрузка на чтение/запись

Каждое событие — INSERT в inbox + UPDATE на завершение + бизнес-операция. На 50 000 событий в секунду эта таблица становится горячей точкой. Нужно партиционирование, отдельный tablespace, иногда — отдельный кластер БД.

Альтернатива — пропустить inbox и использовать бизнес-идемпотентность там, где она возможна. Inbox остаётся только для тех обработчиков, где иначе нельзя.

Долгие обработчики

Обработчик отправляет письмо через внешний API, занимает 3 секунды. Если держать транзакцию открытой всё это время, БД задыхается от блокировок.

Решение — двухфазный inbox:

  1. Получили событие → записали в inbox со статусом received, ack'нули брокеру.
  2. Отдельный воркер читает received из inbox, обрабатывает, переводит в processed.

Это превращает обработку из синхронной в асинхронную и снимает нагрузку с БД. Платите дополнительной задержкой и сложностью retry-логики на обработчике.

Стейт-фул событие

Inbox дедуплицирует event_id, но не дедуплицирует смысл. Если producer два раза прислал «начислить 100 рублей» с разными event_id (баг или преднамеренно), inbox обработает оба — потому что для него это разные события.

Здесь нужна другая защита: бизнес-идемпотентность по бизнес-ключу (operation_id). Inbox не заменяет её, дополняет.

Очистка inbox

Таблица растёт быстрее, чем вы думаете. На 1000 событий в секунду — 86 миллионов строк в день. Через неделю PRIMARY KEY-индекс размером с память.

Стратегии чистки:

  • Удаление по возрасту: DELETE WHERE processed_at < now() - interval '7 days' пачками по cron.
  • Партиционирование по дате: DROP PARTITION вместо DELETE.
  • Архивация в холодное хранилище: события старше N перемещаются в S3.

Минимальное окно хранения — это максимально возможный лаг между doing-the-thing и при котором событие может прийти повторно. Обычно 24-48 часов хватает: брокеры с длительной задержкой повторной доставки — экзотика.

Inbox vs идемпотентность на уровне бизнес-операции

Иногда возникает спор: «зачем inbox, если у меня каждая операция идемпотентна?» Разберу различие.

Бизнес-идемпотентность работает на уровне действия: «зарезервировать товар X для заказа Y» — повторный вызов с теми же параметрами не меняет ничего, потому что флаг уже выставлен.

Inbox работает на уровне сообщения: «событие с id Z уже было обработано, не повторять». Ему всё равно, какая бизнес-логика исполнялась.

Они дополняют друг друга. Бизнес-идемпотентность — первая линия обороны, она спасает от логических дублей (например, при ручном retry). Inbox — вторая линия, спасает от дублей в брокере. Я обычно держу обе там, где это критично.

Подводные камни

Несколько граблей, на которые я наступал.

Race condition между producer'ами. Два producer'а сгенерировали разные события с одним и тем же event_id (UUID-коллизия в теории невозможна, но я видел баг с детерминированной генерацией). Inbox молча отбрасывает второе. Лекарство — UUID v4 (случайный) или композит с источником.

Когда inbox используется в нескольких сервисах, удобно делать (source, event_id) primary key вместо одного event_id. Так точно нельзя случайно сжать события из разных источников.

Inbox без транзакции. Записали в inbox, обработали, потом упали до UPDATE processed_at. При перезапуске вторая попытка увидит запись в inbox и пропустит. Бизнес-логика выполнилась один раз, но процесс «не знает» об этом — может прислать дубликат внешнему API.

Лекарство — обработка и UPDATE в одной транзакции. Или явный статус «обрабатывается» с idempotent-проверкой при перезапуске.

«Просроченные» события. Событие лежит в inbox со статусом received неделю — обработчик упал и забыл про него. Нужен мониторинг: алерт по количеству received, которые старше N минут.

Внешние эффекты вне транзакции. Обработчик в одной транзакции с inbox делает HTTP-вызов наружу. Транзакция держит строку в inbox в блокировке всё это время. Обработка тормозит, БД страдает. Внешние вызовы — после коммита, в отдельном шаге, с собственной идемпотентностью на их стороне.

Что запомнить

Inbox pattern — зеркало outbox, и без него at-least-once producer'а превращается в «обработали несколько раз». Базовая схема — таблица с PRIMARY KEY по event_id, INSERT-ON-CONFLICT-DO-NOTHING, обработка в одной транзакции с пометкой processed_at, ack брокеру после коммита.

Тонкости — в правильном выборе event_id (UUID от producer'а, не бизнес-ключ), в чистке таблицы, в работе с долгими обработчиками. Если у вас уже есть outbox на стороне producer'ов, inbox — естественный следующий шаг для consumer'ов, и стоит он на порядок дешевле, чем разбор инцидента с двойными списаниями.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.