前言
本文介绍 conc - 一个更好的 go 并发库, sourcegraph 在日常开发中使用go原生并发出现了问题,由此开发了 conc ,相比标准并发代码更优雅,代码更少,下面展示一个例子,可以看出代码减少了许多.
type propagatedPanic struct {
val any
stack []byte
}
func main() {
done := make(chan *propagatedPanic)
go func() {
defer func() {
if v := recover(); v != nil {
done <- &propagatedPanic{
val: v,
stack: debug.Stack(),
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic()
}()
if val := <-done; val != nil {
panic(val)
}
}
// conc
func main() {
var wg conc.WaitGroup
wg.Go(doSomethingThatMightPanic)
// panics with a nice stacktrace
wg.Wait()
}
安装
使用以下命令进行安装: go get github.com/sourcegraph/conc
接下来对 conc 如何使用进行介绍。
介绍
conc.WaitGroup
conc.WaitGroup 与标准库 sync.WaitGroup 的区别在于 conc.WaitGroup中子goroutine中的panic会被传递给Wait方法的调用方,避免去 recover goroutine的panic.
例子:
func main() {
var count atomic.Int64
var wg conc.WaitGroup
for i := 1; i < 100; i++ {
wg.Go(func() {
count.Add(1)
})
}
wg.Wait()
fmt.Println(count.Load())
}
如果想要 recover 某个 goroutine 发生的panic,可以使用 WaitAndRecover 方法:
func main() {
var count atomic.Int64
var wg conc.WaitGroup
for i := 0; i < 10; i++ {
wg.Go(func() {
if i == 7 {
panic("bad thing")
}
count.Add(1)
})
}
wg.WaitAndRecover()
fmt.Println(count.Load())
}
goroutine 池
Pool 是用于处理并发任务的 goroutine 池,Pool中的goroutine是延迟启动的,所以创建一个新的Pool是廉价的。产生的 goroutine 永远不会多于提交的任务。池是高效的,但不是零成本。它不应该用于非常短的任务。启动和拆卸的开销约为 1μs,每个任务的开销约为 300ns。
例子:
func main() {
p := pool.New().WithMaxGoroutines(3)
for i := 0; i < 5; i++ {
p.Go(func() {
fmt.Println("conc")
})
}
p.Wait()
}
使用WithContext可以创建一个传递Context的Pool,通过这个父Context来控制池中的goroutine。默认情况下,在取消父Context之前,Pool中的Context不会取消。如果需要在出现 panic 或错误时取消 context 可以通过配置 WithCancelOnError来实现,例子:
func main() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError()
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("I will cancel all other tasks!")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
}
ResultPool是一个执行返回泛型结果的任务池。使用Go()在池中执行任务,然后由Wait()返回任务的结果。例子:
p := pool.NewWithResults[int]()
for i := 0; i < 10; i++ {
i := i
p.Go(func() int {
return i * 2
})
}
res := p.Wait()
// Result order is nondeterministic, so sort them first
sort.Ints(res)
fmt.Println(res)
Stream
Pool 执行任务返回的顺序是无序的,想要有序的结果可以使用 Stream 。要使用Stream,您需要提交一定数量的 Task,每个任务都会返回一个回调。每个任务都将在任务池中同时执行,并且回调将按照任务提交的顺序顺序执行。 任务提交完需使用 Wait() 方法等待任务执行完并传递 panic.
同Pool一样,Stream也不适用于非常短的任务。启动和拆卸会增加几微秒的开销,每个任务的开销大约是500ns。
例子:
func main() {
times := []int{20, 52, 16, 45, 4, 80}
stream := stream2.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
stream.Go(func() stream2.Callback {
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(dur) }
})
}
stream.Wait()
}
小结
conc 相较于标准库的并发处理库,代码更加简洁,使用更加方便,是一套非常适合初学者的并发工具。