控制并发有两种经典的方式,一种是waitgroup, 另外一种是context。

什么是waitGourp

waitGroup可以控制多个goroutine同时完成

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(2)
  4. go func() {
  5. time.Sleep(2*time.Second)
  6. fmt.Println("1号完成")
  7. wg.Done()
  8. }()
  9. go func() {
  10. time.Sleep(2*time.Second)
  11. fmt.Println("2号完成")
  12. wg.Done()
  13. }()
  14. wg.Wait()
  15. fmt.Println("好了,大家都干完了,放工")
  16. }

一个很简单的例子,一定要例子中两个goroutine同时做完,才算是完成,先做好的需要等其他未完成的,所有的goroutine要都全部完成才可以。

这是一种控制并发的方式,这种尤其适用用于,好多个goroutine协同做一件事情的时候,因为每个goroutine做的都是这件事情的一部分,只有全部的goroutine都完成,这件事情才算是完成,这就是等待的方式。

在实际的业务中,我们可能会有一种场景,需要我们主动的通知某一个goroutine结束,比如我们开启一个后台goroutine一直做监控。现在不需要了,就要通知这个goroutine结束,不然它会一直跑,就泄漏了。

chan通知

我们都知一个goroutine启动后,我们是无法控制他的。大部分情况是等待它自己结束,那么如果这个goroutine是一个不会自己结束的goroutine呢?

这种情况下,一种傻瓜式的方法是监控全局变量,其他地方通过修改这个变量完成结束通知,然后后台goroutine不停的检查这个变量。如果发现被通知关闭了,就自我结束。

但是我们首先要保证这个变量是线程安全的,基于此,更好的方式的使用:chan + select

  1. func main() {
  2. stop := make(chan bool)
  3. go func() {
  4. for {
  5. select {
  6. case <-stop:
  7. fmt.Println("监控退出,停止了...")
  8. return
  9. default:
  10. fmt.Println("goroutine监控中...")
  11. time.Sleep(2 * time.Second)
  12. }
  13. }
  14. }()
  15. time.Sleep(10 * time.Second)
  16. fmt.Println("可以了,通知监控停止")
  17. stop<- true
  18. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  19. time.Sleep(5 * time.Second)
  20. }

这种chan + select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性。如果有很多goroutine都需要控制结束怎么办?如果这些goroutine又衍生了其他更多的goroutine怎么办?如果一层层无穷尽的goroutine呢?这就是非常复杂了,即使我们定义很多的chan也很难解决这个问题,因为goroutine的关系就导致了这种场景非常复杂。

初始context

上面说的这种场景是存在的,比如一个网络请求request,每个request都需要开启一个goroutine做一些事情。这些goroutine又可能会开启其他goroutine。所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的。这就是go语言为我们提供的contex,称为上下文非常贴切,它就是goroutine的上下文。

  1. func main() {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. go func(ctx context.Context) {
  4. for {
  5. select {
  6. case <-ctx.Done():
  7. fmt.Println("监控退出,停止了...")
  8. return
  9. default:
  10. fmt.Println("goroutine监控中...")
  11. time.Sleep(2 * time.Second)
  12. }
  13. }
  14. }(ctx)
  15. time.Sleep(10 * time.Second)
  16. fmt.Println("可以了,通知监控停止")
  17. cancel()
  18. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  19. time.Sleep(5 * time.Second)
  20. }

context.Backgroup(),返回一个空的Context。这个空的Context一般用于整个Context树的根节点,然后我们使用context.WithCancel(parent)函数,创建一个可以取消的子Context,然后当作参数传给goroutine适应,这样就可以使用这子context跟踪这个goroutine。

Context 控制多个goroutine

  1. func main() {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. go watch(ctx,"【监控1】")
  4. go watch(ctx,"【监控2】")
  5. go watch(ctx,"【监控3】")
  6. time.Sleep(10 * time.Second)
  7. fmt.Println("可以了,通知监控停止")
  8. cancel()
  9. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  10. time.Sleep(5 * time.Second)
  11. }
  12. func watch(ctx context.Context, name string) {
  13. for {
  14. select {
  15. case <-ctx.Done():
  16. fmt.Println(name,"监控退出,停止了...")
  17. return
  18. default:
  19. fmt.Println(name,"goroutine监控中...")
  20. time.Sleep(2 * time.Second)
  21. }
  22. }
  23. }

示例中启动3个监控goroutine进行不断低监控,每一个使用了Context进行跟踪,当我们使用context函数通知取消时,这3个goroutine都会被结束。这就是context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个context或者衍生的子context都会收到通知,这时候就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。

Context接口

context接口定义比较简洁,我们看下这个接口的方法:

  1. type Context interface {
  2. Deadline() (deadline time.Time, ok bool)
  3. Done() <-chan struct{}
  4. Err() error
  5. Value(key interface{}) interface{}
  6. }

Deadline方法是获取设置的截止时间的意思,第一个返回是截止时间,到了这个时间,context会自动发起取消请求。第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。

Done方法返回一个只读chan, 类型为struct{}。我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求。我们通过Done方法收到了这个信号后,就应该清理操作,让后退出goroutine,释放资源。

Err方法返回取消的错误原因,因为什么context被取消。

Value方法取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。

以上四个方法中常用的就是Done了,如果context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到context取消的信号了,以下是这个方法的经典用法。

  1. func Stream(ctx context.Context, out chan<- Value) error {
  2. for {
  3. v, err := DoSomething(ctx)
  4. if err != nil {
  5. return err
  6. }
  7. select {
  8. case <-ctx.Done():
  9. return ctx.Err()
  10. case out <- v:
  11. }
  12. }
  13. }

context接口并不需要我们实现,go内置已经帮我们实现了2个,代码中最开始都是以这两个内置的作为最顶层的partent context,衍生出更多的Contezt。

  1. var (
  2. background = new(emptyCtx)
  3. todo = new(emptyCtx)
  4. )
  5. func Background() Context {
  6. return background
  7. }
  8. func TODO() Context {
  9. return todo
  10. }

一个是backgourd, 主要用于main函数,初始化以及测试代码中,作为context这个树结构的最顶层的context, 也就是context。

他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的context。

  1. type emptyCtx int
  2. func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
  3. return
  4. }
  5. func (*emptyCtx) Done() <-chan struct{} {
  6. return nil
  7. }
  8. func (*emptyCtx) Err() error {
  9. return nil
  10. }
  11. func (*emptyCtx) Value(key interface{}) interface{} {
  12. return nil
  13. }

这就是emptyCtx实现Context接口的方法,可以看到,这些方法什么都没有做,直接返回都是nil或者零值。

树状结构图

  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2332713/1625708215973-f7673ff6-bf3b-4d21-a970-5bb81edfe7f0.png#clientId=u09a8657d-833f-4&from=paste&height=197&id=u921369cc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=276&originWidth=702&originalType=binary&ratio=1&size=151738&status=done&style=none&taskId=ucef233ea-639f-401f-9df3-7762cd6f61b&width=500)<br />可以看出,cancelCtx也是一棵树,当触发cancel时,会cancel本结点和其子树的所有cancelCtx。

参考

Go语言实战笔记(二十)| Go Context
Go Context的踩坑经历