Инкрементальные модели в dbt: как настроить, чтобы они не ломались
Первый раз я переключила тяжёлую таблицу фактов с materialized='table' на materialized='incremental' — и через неделю мне в Slack постучался аналитик: «У нас в дашборде дубли». Оказалось, я неправильно выбрала unique_key, и часть строк подтягивалась дважды. Тогда же я поняла, что инкрементальные модели — это не «добавил флаг и сэкономил время», а отдельный паттерн со своими ловушками.
Расскажу, как я в итоге собираю инкрементальные модели в проектах на Postgres и Snowflake, чтобы они переживали и поздно прилетающие данные, и рестейты, и переключение источников. Примеры — на dbt-core 1.8, синтаксис в 1.7+ совпадает.
Когда инкрементальная модель действительно нужна
Прежде чем переключать материализацию, я задаю себе три вопроса. Если хотя бы на один отвечаю «нет», оставляю обычную table.
- Полный пересчёт занимает дольше 5–10 минут или съедает заметную часть кредитов в Snowflake.
- В источнике есть надёжная колонка времени события или загрузки (
event_timestamp,updated_at,_loaded_at). - Можно чётко определить «новую порцию данных» относительно того, что уже лежит в таблице.
Когда у меня ETL работал по 4 часа, я узнала, что забыла индекс — и оказалось, что инкремент тут вообще не нужен, табличку просто не на чем было считать. Сначала смотри план запроса, и только потом переключай материализацию.
Базовый шаблон, который работает
Вот заготовка, которую я копирую почти во все факт-таблицы. Дальше буду показывать, что в ней важно.
{{
config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge',
on_schema_change='append_new_columns'
)
}}
with source as (
select *
from {{ ref('stg_events') }}
{% if is_incremental() %}
where _loaded_at >= (
select coalesce(max(_loaded_at), '1900-01-01') - interval '2 days'
from {{ this }}
)
{% endif %}
)
select
event_id,
user_id,
event_type,
event_ts,
_loaded_at
from source
Что тут важного:
- Фильтр в CTE, а не в финальном
select. Так оптимизатор Postgres и Snowflake может прокинуть условие в источник. - Окно с запасом:
- interval '2 days'. Это страховка от поздних данных. Об этом подробнее ниже. - Фильтр по
_loaded_at, а не поevent_ts. Бизнесовая дата события может прийти задним числом, а вот метка «когда мы это загрузили» — монотонна. - Стратегия
merge— про неё тоже отдельно.
Как выбрать unique_key
Это первая вещь, на которой я когда-то споткнулась. unique_key должен реально однозначно идентифицировать строку в финальной таблице. Не в источнике, а в том виде, в каком она появится после твоих преобразований.
Несколько правил, к которым я пришла:
- Если в источнике есть стабильный первичный ключ — бери его.
- Если строка — это «факт за день» (агрегат), ключ обычно составной:
['user_id', 'event_date']. dbt поддерживает массивы вunique_keyс версии 1.5. - Если ключ нестабилен (например, ты его сам генерируешь через
md5), убедись, что хеш считается из тех же колонок и в том же порядке, иначе после рефакторинга получишь дубликаты.
Проверка простая: после первого полного запуска и одного инкрементального прогона выполни запрос:
select event_id, count(*)
from analytics.fct_events
group by 1
having count(*) > 1;
Если что-то нашлось — стратегия или ключ выбраны неправильно. Лучше поймать на дев-схеме, чем в проде.
Стратегии: append, merge, delete+insert, insert_overwrite
В dbt четыре основные стратегии. Я пользуюсь так:
append
Просто добавляет новые строки. Подходит, если гарантировано, что одна и та же строка не прилетит дважды и не обновляется. Например, иммутабельный лог событий, где у каждого события есть свой event_id и он никогда не меняется.
Минус: dbt не проверяет уникальность. Если источник дёрнули дважды — получишь дубли. Поэтому я append почти не использую без отдельных тестов на уникальность.
merge
Дефолт в большинстве адаптеров. Делает MERGE INTO по unique_key. Если строка с таким ключом есть — обновляет, нет — вставляет. На Postgres до версии 15 транслируется в delete + insert через временную таблицу, на 16+ — настоящий MERGE. На Snowflake и BigQuery — нативный merge.
Пользуюсь по умолчанию для всех таблиц, где строки могут обновляться (статусы заказов, профили пользователей, агрегаты).
delete+insert
Удаляет строки с матчащимся ключом и вставляет новые. По смыслу как merge, но без обновления — полная перезапись. Удобно, если меняется много колонок и проще переписать строку целиком.
insert_overwrite
Перезаписывает целиком указанные партиции. Работает только на адаптерах с партиционированием (BigQuery, Spark, недавно — Snowflake через micro-partitions). Идеально для ежедневных снапшотов: пересчитываешь партицию за день и заменяешь её целиком.
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={'field': 'event_date', 'data_type': 'date'}
)
}}
На BigQuery это самый дешёвый способ переcчитать «вчера и сегодня».
Поздно прилетающие данные
Самая частая боль. Пример: события из мобильного приложения. Пользователь был оффлайн, событие случилось вчера в 23:50, но прилетело сегодня в 09:00. Если фильтровать строго where event_ts > max(event_ts), ты его потеряешь.
Что я делаю:
- Фильтрую по технической колонке загрузки (
_loaded_atили_ingested_at), а не по бизнесовой. - Беру окно с запасом:
max(_loaded_at) - interval '2 days'. Размер окна подбираю по реальному распределению задержек. - В стратегии —
mergeилиdelete+insert, чтобы повторно прилетевшие строки обновились, а не задвоились.
Окно «2 дня» — это компромисс. Слишком маленькое — потеряешь поздние события. Слишком большое — каждый прогон будет почти как фуллскан. Я обычно строю гистограмму задержек и беру 99-й перцентиль, округлённый вверх.
select
date_trunc('hour', _loaded_at - event_ts) as delay,
count(*)
from raw.events
where _loaded_at >= current_date - interval '30 days'
group by 1
order by 1;
Что делать при изменении схемы
Параметр on_schema_change — это про то, что dbt делает, если колонки в SQL-модели и в существующей таблице разошлись.
ignore— дефолт. dbt молча игнорирует новые колонки. Обычно не то, что нужно.fail— падает с ошибкой. Хорошо для строгих DWH, плохо если у тебя 30 моделей и одна сломала ночной прогон.append_new_columns— добавляет новые колонки в существующую таблицу сNULLв исторических данных. Я ставлю это почти везде по умолчанию.sync_all_columns— добавляет новые и удаляет исчезнувшие. Использую только для моделей, которыми владею целиком.
Важно: on_schema_change не пересчитывает исторические данные. Если новая колонка должна быть посчитана и для старых строк — нужен --full-refresh.
Когда нужен --full-refresh
Инкрементальная модель — это не «один раз настроил и забыл». Я планирую полные пересчёты в нескольких случаях:
- Изменилась логика трансформации (например, поменяла маппинг событий).
- Добавила новую колонку, которую нужно посчитать ретроспективно.
- Заметили баг в исторических данных.
- Раз в месяц-квартал — просто чтобы сверить с источником.
В CI у меня отдельный джоб, который раз в неделю гоняет dbt build --full-refresh --select tag:weekly_full. Помечаю им только модели, для которых полный пересчёт реально дешевле, чем разбираться, что там накопилось за неделю.
Тесты, без которых я не выкатываю инкрементальную модель
Минимальный набор:
version: 2
models:
- name: fct_events
columns:
- name: event_id
tests:
- unique
- not_null
- name: user_id
tests:
- not_null
- relationships:
to: ref('dim_users')
field: user_id
tests:
- dbt_utils.recency:
datepart: hour
field: _loaded_at
interval: 6
recency из dbt-utils — это страховка от тихих поломок. Если по какой-то причине инкремент перестал догонять, тест упадёт раньше, чем аналитики заметят, что цифры остановились.
Дополнительно — кастомный тест на отсутствие «дырок». В одном из проектов я писала такой:
-- tests/no_gaps_in_fct_events.sql
with days as (
select generate_series(
current_date - interval '7 days',
current_date - interval '1 day',
interval '1 day'
)::date as d
),
actual as (
select event_date, count(*) as cnt
from {{ ref('fct_events') }}
where event_date >= current_date - interval '7 days'
group by 1
)
select d
from days
left join actual on days.d = actual.event_date
where actual.cnt is null or actual.cnt = 0;
Идемпотентность и ручные перезапуски
Хорошая инкрементальная модель — идемпотентная. Если запустить её дважды подряд, результат должен быть одинаковым. Проверяю так: на дев-схеме делаю dbt run --select my_model два раза и сравниваю количество строк и контрольные суммы.
select count(*), sum(amount), max(_loaded_at)
from analytics_dev.fct_orders;
Если после второго прогона цифры поехали — где-то в логике учитывается «текущее состояние» таблицы непредсказуемым образом. Чаще всего это баг в фильтре или забытый distinct.
Что я бы хотела знать раньше
Несколько вещей, которые сэкономили бы мне пару выходных:
- Не делай инкрементальную модель из любопытства. Замерь время полного пересчёта. Если оно меньше 5 минут — не трогай.
- Никогда не используй
appendбез теста на уникальность. Никогда. - Имя CTE с фильтром оставляй простым (
source,filtered_source). Когда через полгода придётся отлаживать, ты не захочешь вспоминать, что значитcte_42. - Документируй, какое окно поздних данных ты заложила. Я пишу прямо в описании модели в
schema.yml: «Учитывает события, прилетевшие с задержкой до 48 часов». - В Postgres проверь, что у целевой таблицы есть индекс по
unique_key. Без негоMERGEпревращается в фуллскан, и инкремент работает медленнее, чем полный пересчёт.
Инкрементальные модели — это не магия и не серебряная пуля. Это компромисс: ты меняешь простоту на скорость и платишь за это дополнительной сложностью. Если эта сложность окупается — отлично. Если нет — оставайся на table, выспись и сэкономь себе и команде нервы.