lenec ru

← все посты

Инкрементальные модели в dbt: как настроить, чтобы они не ломались

13K

Первый раз я переключила тяжёлую таблицу фактов с materialized='table' на materialized='incremental' — и через неделю мне в Slack постучался аналитик: «У нас в дашборде дубли». Оказалось, я неправильно выбрала unique_key, и часть строк подтягивалась дважды. Тогда же я поняла, что инкрементальные модели — это не «добавил флаг и сэкономил время», а отдельный паттерн со своими ловушками.

Расскажу, как я в итоге собираю инкрементальные модели в проектах на Postgres и Snowflake, чтобы они переживали и поздно прилетающие данные, и рестейты, и переключение источников. Примеры — на dbt-core 1.8, синтаксис в 1.7+ совпадает.

Когда инкрементальная модель действительно нужна

Прежде чем переключать материализацию, я задаю себе три вопроса. Если хотя бы на один отвечаю «нет», оставляю обычную table.

  • Полный пересчёт занимает дольше 5–10 минут или съедает заметную часть кредитов в Snowflake.
  • В источнике есть надёжная колонка времени события или загрузки (event_timestamp, updated_at, _loaded_at).
  • Можно чётко определить «новую порцию данных» относительно того, что уже лежит в таблице.

Когда у меня ETL работал по 4 часа, я узнала, что забыла индекс — и оказалось, что инкремент тут вообще не нужен, табличку просто не на чем было считать. Сначала смотри план запроса, и только потом переключай материализацию.

Базовый шаблон, который работает

Вот заготовка, которую я копирую почти во все факт-таблицы. Дальше буду показывать, что в ней важно.

{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
  )
}}

with source as (
    select *
    from {{ ref('stg_events') }}
    {% if is_incremental() %}
      where _loaded_at >= (
          select coalesce(max(_loaded_at), '1900-01-01') - interval '2 days'
          from {{ this }}
      )
    {% endif %}
)

select
    event_id,
    user_id,
    event_type,
    event_ts,
    _loaded_at
from source

Что тут важного:

  • Фильтр в CTE, а не в финальном select. Так оптимизатор Postgres и Snowflake может прокинуть условие в источник.
  • Окно с запасом: - interval '2 days'. Это страховка от поздних данных. Об этом подробнее ниже.
  • Фильтр по _loaded_at, а не по event_ts. Бизнесовая дата события может прийти задним числом, а вот метка «когда мы это загрузили» — монотонна.
  • Стратегия merge — про неё тоже отдельно.

Как выбрать unique_key

Это первая вещь, на которой я когда-то споткнулась. unique_key должен реально однозначно идентифицировать строку в финальной таблице. Не в источнике, а в том виде, в каком она появится после твоих преобразований.

Несколько правил, к которым я пришла:

  • Если в источнике есть стабильный первичный ключ — бери его.
  • Если строка — это «факт за день» (агрегат), ключ обычно составной: ['user_id', 'event_date']. dbt поддерживает массивы в unique_key с версии 1.5.
  • Если ключ нестабилен (например, ты его сам генерируешь через md5), убедись, что хеш считается из тех же колонок и в том же порядке, иначе после рефакторинга получишь дубликаты.

Проверка простая: после первого полного запуска и одного инкрементального прогона выполни запрос:

select event_id, count(*)
from analytics.fct_events
group by 1
having count(*) > 1;

Если что-то нашлось — стратегия или ключ выбраны неправильно. Лучше поймать на дев-схеме, чем в проде.

Стратегии: append, merge, delete+insert, insert_overwrite

В dbt четыре основные стратегии. Я пользуюсь так:

append

Просто добавляет новые строки. Подходит, если гарантировано, что одна и та же строка не прилетит дважды и не обновляется. Например, иммутабельный лог событий, где у каждого события есть свой event_id и он никогда не меняется.

Минус: dbt не проверяет уникальность. Если источник дёрнули дважды — получишь дубли. Поэтому я append почти не использую без отдельных тестов на уникальность.

merge

Дефолт в большинстве адаптеров. Делает MERGE INTO по unique_key. Если строка с таким ключом есть — обновляет, нет — вставляет. На Postgres до версии 15 транслируется в delete + insert через временную таблицу, на 16+ — настоящий MERGE. На Snowflake и BigQuery — нативный merge.

Пользуюсь по умолчанию для всех таблиц, где строки могут обновляться (статусы заказов, профили пользователей, агрегаты).

delete+insert

Удаляет строки с матчащимся ключом и вставляет новые. По смыслу как merge, но без обновления — полная перезапись. Удобно, если меняется много колонок и проще переписать строку целиком.

insert_overwrite

Перезаписывает целиком указанные партиции. Работает только на адаптерах с партиционированием (BigQuery, Spark, недавно — Snowflake через micro-partitions). Идеально для ежедневных снапшотов: пересчитываешь партицию за день и заменяешь её целиком.

{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'event_date', 'data_type': 'date'}
  )
}}

На BigQuery это самый дешёвый способ переcчитать «вчера и сегодня».

Поздно прилетающие данные

Самая частая боль. Пример: события из мобильного приложения. Пользователь был оффлайн, событие случилось вчера в 23:50, но прилетело сегодня в 09:00. Если фильтровать строго where event_ts > max(event_ts), ты его потеряешь.

Что я делаю:

  1. Фильтрую по технической колонке загрузки (_loaded_at или _ingested_at), а не по бизнесовой.
  2. Беру окно с запасом: max(_loaded_at) - interval '2 days'. Размер окна подбираю по реальному распределению задержек.
  3. В стратегии — merge или delete+insert, чтобы повторно прилетевшие строки обновились, а не задвоились.

Окно «2 дня» — это компромисс. Слишком маленькое — потеряешь поздние события. Слишком большое — каждый прогон будет почти как фуллскан. Я обычно строю гистограмму задержек и беру 99-й перцентиль, округлённый вверх.

select
    date_trunc('hour', _loaded_at - event_ts) as delay,
    count(*)
from raw.events
where _loaded_at >= current_date - interval '30 days'
group by 1
order by 1;

Что делать при изменении схемы

Параметр on_schema_change — это про то, что dbt делает, если колонки в SQL-модели и в существующей таблице разошлись.

  • ignore — дефолт. dbt молча игнорирует новые колонки. Обычно не то, что нужно.
  • fail — падает с ошибкой. Хорошо для строгих DWH, плохо если у тебя 30 моделей и одна сломала ночной прогон.
  • append_new_columns — добавляет новые колонки в существующую таблицу с NULL в исторических данных. Я ставлю это почти везде по умолчанию.
  • sync_all_columns — добавляет новые и удаляет исчезнувшие. Использую только для моделей, которыми владею целиком.

Важно: on_schema_change не пересчитывает исторические данные. Если новая колонка должна быть посчитана и для старых строк — нужен --full-refresh.

Когда нужен --full-refresh

Инкрементальная модель — это не «один раз настроил и забыл». Я планирую полные пересчёты в нескольких случаях:

  • Изменилась логика трансформации (например, поменяла маппинг событий).
  • Добавила новую колонку, которую нужно посчитать ретроспективно.
  • Заметили баг в исторических данных.
  • Раз в месяц-квартал — просто чтобы сверить с источником.

В CI у меня отдельный джоб, который раз в неделю гоняет dbt build --full-refresh --select tag:weekly_full. Помечаю им только модели, для которых полный пересчёт реально дешевле, чем разбираться, что там накопилось за неделю.

Тесты, без которых я не выкатываю инкрементальную модель

Минимальный набор:

version: 2

models:
  - name: fct_events
    columns:
      - name: event_id
        tests:
          - unique
          - not_null
      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_users')
              field: user_id
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: _loaded_at
          interval: 6

recency из dbt-utils — это страховка от тихих поломок. Если по какой-то причине инкремент перестал догонять, тест упадёт раньше, чем аналитики заметят, что цифры остановились.

Дополнительно — кастомный тест на отсутствие «дырок». В одном из проектов я писала такой:

-- tests/no_gaps_in_fct_events.sql
with days as (
    select generate_series(
        current_date - interval '7 days',
        current_date - interval '1 day',
        interval '1 day'
    )::date as d
),
actual as (
    select event_date, count(*) as cnt
    from {{ ref('fct_events') }}
    where event_date >= current_date - interval '7 days'
    group by 1
)
select d
from days
left join actual on days.d = actual.event_date
where actual.cnt is null or actual.cnt = 0;

Идемпотентность и ручные перезапуски

Хорошая инкрементальная модель — идемпотентная. Если запустить её дважды подряд, результат должен быть одинаковым. Проверяю так: на дев-схеме делаю dbt run --select my_model два раза и сравниваю количество строк и контрольные суммы.

select count(*), sum(amount), max(_loaded_at)
from analytics_dev.fct_orders;

Если после второго прогона цифры поехали — где-то в логике учитывается «текущее состояние» таблицы непредсказуемым образом. Чаще всего это баг в фильтре или забытый distinct.

Что я бы хотела знать раньше

Несколько вещей, которые сэкономили бы мне пару выходных:

  • Не делай инкрементальную модель из любопытства. Замерь время полного пересчёта. Если оно меньше 5 минут — не трогай.
  • Никогда не используй append без теста на уникальность. Никогда.
  • Имя CTE с фильтром оставляй простым (source, filtered_source). Когда через полгода придётся отлаживать, ты не захочешь вспоминать, что значит cte_42.
  • Документируй, какое окно поздних данных ты заложила. Я пишу прямо в описании модели в schema.yml: «Учитывает события, прилетевшие с задержкой до 48 часов».
  • В Postgres проверь, что у целевой таблицы есть индекс по unique_key. Без него MERGE превращается в фуллскан, и инкремент работает медленнее, чем полный пересчёт.

Инкрементальные модели — это не магия и не серебряная пуля. Это компромисс: ты меняешь простоту на скорость и платишь за это дополнительной сложностью. Если эта сложность окупается — отлично. Если нет — оставайся на table, выспись и сэкономь себе и команде нервы.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.