lenec ru

← все посты

errgroup vs каналы: как организовать параллельную работу в Go

14K

В Go одна и та же задача параллельной обработки может быть написана пятью разными способами. Каналы, WaitGroup, errgroup, semaphore, fan-out/fan-in. Когда ревьюешь чужой код, часто видишь все пять подходов в одном проекте, и это не всегда оправдано.

Расскажу, чем удобен errgroup, в каких случаях он избыточен, и где «классические» каналы лучше. Без идеологии, просто практика по итогам нескольких проектов.

Что даёт errgroup из коробки

Пакет golang.org/x/sync/errgroup — тонкая обёртка над sync.WaitGroup с тремя удобствами:

  • Любая горутина возвращает первую ошибку, остальные отменяются через ctx.
  • Не нужен явный WaitGroup — счётчик внутри.
  • Через SetLimit можно ограничить число одновременно выполняемых задач.

Минимальный пример:

g, ctx := errgroup.WithContext(parent)

for _, url := range urls {
	url := url
	g.Go(func() error {
		return fetch(ctx, url)
	})
}

if err := g.Wait(); err != nil {
	return err
}

Если один из fetch упал, ctx отменяется, остальные горутины получают ctx.Done() и завершаются. Без errgroup пришлось бы вручную городить cancel-функцию и атомик с первой ошибкой.

Где errgroup идеален

Параллельные независимые операции

Это сценарий №1: запрашиваешь данные у трёх внешних сервисов параллельно, собираешь результат. Если хотя бы один упал — вся операция упала. Errgroup пишется в десять строк.

var (
	profile  *Profile
	orders   []Order
	balance  int64
)

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
	var err error
	profile, err = profileSvc.Get(gctx, userID)
	return err
})
g.Go(func() error {
	var err error
	orders, err = orderSvc.List(gctx, userID)
	return err
})
g.Go(func() error {
	var err error
	balance, err = walletSvc.Balance(gctx, userID)
	return err
})
if err := g.Wait(); err != nil {
	return nil, err
}

Никаких каналов, никаких атомиков. Понятно с первого взгляда.

Bounded параллелизм

Когда надо обработать миллион записей, но бить по внешнему API только 50 одновременных запросами:

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(50)

for _, item := range items {
	item := item
	g.Go(func() error {
		return process(gctx, item)
	})
}
return g.Wait()

Раньше для этого писали семафор на канале. Сейчас SetLimit делает то же самое, но без канал-юнглинга в ревью.

Pipeline-ы со стейджами

Каждый stage — отдельная g.Go:

in := make(chan Item)
out := make(chan Result)

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
	defer close(in)
	return produce(gctx, in)
})
g.Go(func() error {
	defer close(out)
	return transform(gctx, in, out)
})
g.Go(func() error {
	return consume(gctx, out)
})
return g.Wait()

Тут уже без каналов не обойтись — они переносят данные между стейджами. Но errgroup продолжает удобно собирать жизненный цикл всей конструкции.

Где errgroup мешает

Если первая ошибка не должна отменять остальные

Бывает, что нужно дойти до конца: например, попытаться загрузить 100 файлов и собрать список тех, что упали. Errgroup в стандартном виде остановит работу на первой ошибке. Можно обходить: возвращать nil из g.Go и собирать ошибки в shared-структуру вручную, но это пишется так:

var (
	mu     sync.Mutex
	failed []FailedItem
)

for _, item := range items {
	item := item
	g.Go(func() error {
		if err := process(item); err != nil {
			mu.Lock()
			failed = append(failed, FailedItem{Item: item, Err: err})
			mu.Unlock()
		}
		return nil // не отменяем остальные
	})
}
_ = g.Wait()

Вышло хуже, чем явный канал результатов и одна обычная WaitGroup. Здесь errgroup только мешает: его главная фича — отмена остальных — нам не нужна.

Когда нужен fan-out с разными типами результатов

Если параллельные задачи возвращают разные типы данных, и тебе нужно агрегировать их в общий аккумулятор по мере поступления — каналы лучше. Через g.Go можно только пробросить через замыкания, что быстро превращается в кашу.

Долгоживущие воркеры

Errgroup рассчитан на «запустили N задач, дождались результата». Если у тебя есть постоянный пул воркеров, который читает задачи из очереди, errgroup не подходит. Тут классический паттерн: chan Job, N горутин-консьюмеров, WaitGroup для ожидания. Errgroup усложнит код, потому что его подход «задача — горутина», а у тебя «воркер — горутина, задач много».

Каналы напрямую: когда они уместнее

Backpressure и ритмическая обработка

Канал с буфером — это естественный backpressure. Producer заблокируется, если consumer не успевает. С errgroup такой контроль приходится городить вручную.

jobs := make(chan Job, 100) // буфер 100

for i := 0; i < 10; i++ {
	go worker(ctx, jobs)
}

for _, j := range jobsToDo {
	select {
	case jobs <- j:
	case <-ctx.Done():
		return ctx.Err()
	}
}
close(jobs)

Если consumer тормозит, продьюсер блокируется на 100-й задаче. Хочешь больше — увеличь буфер. Хочешь без backpressure — убери буфер или переходи на errgroup с большим лимитом.

Динамический набор задач

Если задачи приходят в систему постоянно (RabbitMQ, Kafka, NATS), а не «нам нужно обработать ровно эти 1000 штук», каналы — естественная абстракция.

Антипаттерны, которые встречаю

g.Go без перехвата ctx

g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
	return doStuff(ctx) // используем родительский ctx, а не gctx
})

Здесь смысл errgroup частично теряется: даже если другая задача упадёт и errgroup отменит свой ctx, эта задача не узнает. Используй gctx.

Захват переменной цикла

for _, item := range items {
	g.Go(func() error {
		return process(item) // race! item может быть последним
	})
}

Со стандартом Go 1.22 это поправили — каждая итерация даёт свою копию. Но если у тебя более старая версия, без item := item ты получишь race condition. Linter это ловит, но не всегда.

Слишком большой g.SetLimit

Видел SetLimit(10000), потому что «у нас 10 тысяч задач». Это эквивалент полного отсутствия лимита и убийства внешнего API. Лимит должен соответствовать реальной capacity внешней системы. Чаще это число между 5 и 100.

Что выбирать

Простая мнемоника, которую держу в голове:

  • Конечный набор параллельных задач, остановиться на первой ошибке — errgroup.
  • Bounded параллелизм с лимитом — errgroup с SetLimit.
  • Pipeline со стейджами — errgroup как «контейнер», каналы для передачи данных.
  • Долгоживущие воркеры с очередью задач — каналы напрямую и WaitGroup.
  • Нужно собрать ошибки от всех задач — каналы или явный slice с mutex, errgroup тут лишний.

Самое неудачное, что можно сделать — взять один подход и тащить его во все случаи. Тогда либо начинаешь городить семафоры на каналах ради лимита, либо превращаешь errgroup в самопальный воркер-пул. Проще честно использовать оба инструмента в зависимости от задачи.

Из реальной практики: когда я переписывал часть финтех-сервиса с самописных каналов на errgroup, размер кода в этой части упал процентов на 30, а количество багов с гонками — до нуля. Но другие куски, где был воркер-пул на потребление событий из NATS, я оставил как есть. Если бы стал тащить туда errgroup, получился бы фарш.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.