errgroup vs каналы: как организовать параллельную работу в Go
В Go одна и та же задача параллельной обработки может быть написана пятью разными способами. Каналы, WaitGroup, errgroup, semaphore, fan-out/fan-in. Когда ревьюешь чужой код, часто видишь все пять подходов в одном проекте, и это не всегда оправдано.
Расскажу, чем удобен errgroup, в каких случаях он избыточен, и где «классические» каналы лучше. Без идеологии, просто практика по итогам нескольких проектов.
Что даёт errgroup из коробки
Пакет golang.org/x/sync/errgroup — тонкая обёртка над sync.WaitGroup с тремя удобствами:
- Любая горутина возвращает первую ошибку, остальные отменяются через ctx.
- Не нужен явный WaitGroup — счётчик внутри.
- Через
SetLimitможно ограничить число одновременно выполняемых задач.
Минимальный пример:
g, ctx := errgroup.WithContext(parent)
for _, url := range urls {
url := url
g.Go(func() error {
return fetch(ctx, url)
})
}
if err := g.Wait(); err != nil {
return err
}
Если один из fetch упал, ctx отменяется, остальные горутины получают ctx.Done() и завершаются. Без errgroup пришлось бы вручную городить cancel-функцию и атомик с первой ошибкой.
Где errgroup идеален
Параллельные независимые операции
Это сценарий №1: запрашиваешь данные у трёх внешних сервисов параллельно, собираешь результат. Если хотя бы один упал — вся операция упала. Errgroup пишется в десять строк.
var (
profile *Profile
orders []Order
balance int64
)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
var err error
profile, err = profileSvc.Get(gctx, userID)
return err
})
g.Go(func() error {
var err error
orders, err = orderSvc.List(gctx, userID)
return err
})
g.Go(func() error {
var err error
balance, err = walletSvc.Balance(gctx, userID)
return err
})
if err := g.Wait(); err != nil {
return nil, err
}
Никаких каналов, никаких атомиков. Понятно с первого взгляда.
Bounded параллелизм
Когда надо обработать миллион записей, но бить по внешнему API только 50 одновременных запросами:
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(50)
for _, item := range items {
item := item
g.Go(func() error {
return process(gctx, item)
})
}
return g.Wait()
Раньше для этого писали семафор на канале. Сейчас SetLimit делает то же самое, но без канал-юнглинга в ревью.
Pipeline-ы со стейджами
Каждый stage — отдельная g.Go:
in := make(chan Item)
out := make(chan Result)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(in)
return produce(gctx, in)
})
g.Go(func() error {
defer close(out)
return transform(gctx, in, out)
})
g.Go(func() error {
return consume(gctx, out)
})
return g.Wait()
Тут уже без каналов не обойтись — они переносят данные между стейджами. Но errgroup продолжает удобно собирать жизненный цикл всей конструкции.
Где errgroup мешает
Если первая ошибка не должна отменять остальные
Бывает, что нужно дойти до конца: например, попытаться загрузить 100 файлов и собрать список тех, что упали. Errgroup в стандартном виде остановит работу на первой ошибке. Можно обходить: возвращать nil из g.Go и собирать ошибки в shared-структуру вручную, но это пишется так:
var (
mu sync.Mutex
failed []FailedItem
)
for _, item := range items {
item := item
g.Go(func() error {
if err := process(item); err != nil {
mu.Lock()
failed = append(failed, FailedItem{Item: item, Err: err})
mu.Unlock()
}
return nil // не отменяем остальные
})
}
_ = g.Wait()
Вышло хуже, чем явный канал результатов и одна обычная WaitGroup. Здесь errgroup только мешает: его главная фича — отмена остальных — нам не нужна.
Когда нужен fan-out с разными типами результатов
Если параллельные задачи возвращают разные типы данных, и тебе нужно агрегировать их в общий аккумулятор по мере поступления — каналы лучше. Через g.Go можно только пробросить через замыкания, что быстро превращается в кашу.
Долгоживущие воркеры
Errgroup рассчитан на «запустили N задач, дождались результата». Если у тебя есть постоянный пул воркеров, который читает задачи из очереди, errgroup не подходит. Тут классический паттерн: chan Job, N горутин-консьюмеров, WaitGroup для ожидания. Errgroup усложнит код, потому что его подход «задача — горутина», а у тебя «воркер — горутина, задач много».
Каналы напрямую: когда они уместнее
Backpressure и ритмическая обработка
Канал с буфером — это естественный backpressure. Producer заблокируется, если consumer не успевает. С errgroup такой контроль приходится городить вручную.
jobs := make(chan Job, 100) // буфер 100
for i := 0; i < 10; i++ {
go worker(ctx, jobs)
}
for _, j := range jobsToDo {
select {
case jobs <- j:
case <-ctx.Done():
return ctx.Err()
}
}
close(jobs)
Если consumer тормозит, продьюсер блокируется на 100-й задаче. Хочешь больше — увеличь буфер. Хочешь без backpressure — убери буфер или переходи на errgroup с большим лимитом.
Динамический набор задач
Если задачи приходят в систему постоянно (RabbitMQ, Kafka, NATS), а не «нам нужно обработать ровно эти 1000 штук», каналы — естественная абстракция.
Антипаттерны, которые встречаю
g.Go без перехвата ctx
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
return doStuff(ctx) // используем родительский ctx, а не gctx
})
Здесь смысл errgroup частично теряется: даже если другая задача упадёт и errgroup отменит свой ctx, эта задача не узнает. Используй gctx.
Захват переменной цикла
for _, item := range items {
g.Go(func() error {
return process(item) // race! item может быть последним
})
}
Со стандартом Go 1.22 это поправили — каждая итерация даёт свою копию. Но если у тебя более старая версия, без item := item ты получишь race condition. Linter это ловит, но не всегда.
Слишком большой g.SetLimit
Видел SetLimit(10000), потому что «у нас 10 тысяч задач». Это эквивалент полного отсутствия лимита и убийства внешнего API. Лимит должен соответствовать реальной capacity внешней системы. Чаще это число между 5 и 100.
Что выбирать
Простая мнемоника, которую держу в голове:
- Конечный набор параллельных задач, остановиться на первой ошибке — errgroup.
- Bounded параллелизм с лимитом — errgroup с SetLimit.
- Pipeline со стейджами — errgroup как «контейнер», каналы для передачи данных.
- Долгоживущие воркеры с очередью задач — каналы напрямую и WaitGroup.
- Нужно собрать ошибки от всех задач — каналы или явный slice с mutex, errgroup тут лишний.
Самое неудачное, что можно сделать — взять один подход и тащить его во все случаи. Тогда либо начинаешь городить семафоры на каналах ради лимита, либо превращаешь errgroup в самопальный воркер-пул. Проще честно использовать оба инструмента в зависимости от задачи.
Из реальной практики: когда я переписывал часть финтех-сервиса с самописных каналов на errgroup, размер кода в этой части упал процентов на 30, а количество багов с гонками — до нуля. Но другие куски, где был воркер-пул на потребление событий из NATS, я оставил как есть. Если бы стал тащить туда errgroup, получился бы фарш.