context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。

context 用来解决 goroutine 之间退出通知元数据传递的功能。

控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context

Value函数并没有任何保证,编译器不会检查传进来的参数是否是合理。

什么是WaitGroup

它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(2)
  4. go func() {
  5. time.Sleep(2*time.Second)
  6. fmt.Println("first")
  7. wg.Done()
  8. }()
  9. go func() {
  10. time.Sleep(2*time.Second)
  11. fmt.Println("second")
  12. wg.Done()
  13. }()
  14. wg.Wait()
  15. fmt.Println("all done")
  16. }

一定要例子中的2个goroutine同时做完,才算是完成

可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如开启一个后台goroutine一直做事情,比如监控,定时任务等现在不需要了,就需要通知这个goroutine结束

  1. func main() {
  2. stop := make(chan bool)
  3. go func() {
  4. for {
  5. select {
  6. case <-stop:
  7. fmt.Println("break")
  8. return
  9. default:
  10. fmt.Println("watch ing")
  11. time.Sleep(1 * time.Second)
  12. }
  13. }
  14. }()
  15. time.Sleep(5 * time.Second)
  16. fmt.Println("stop")
  17. stop <- true
  18. fmt.Println(5 * time.Second)
  19. }

定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。
有了以上的逻辑,就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。

如果有一层层的无穷尽的goroutine,不太好控制

  1. func main() {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. go func(ctx context.Context) {
  4. for {
  5. select {
  6. case <-ctx.Done():
  7. fmt.Println("stop,break...")
  8. return
  9. default:
  10. fmt.Println("goroutine watching...")
  11. time.Sleep(2 * time.Second)
  12. }
  13. }
  14. }(ctx)
  15. time.Sleep(10 * time.Second)
  16. fmt.Println("all done")
  17. cancel()
  18. // 为了检测监控过是否停止,如果没有监控输出,就表示停止了
  19. time.Sleep(5 * time.Second)
  20. }

重写,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。
context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。
在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。
那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。

Context控制多个goroutine

  1. func main() {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. go watch(ctx,"【监控1】")
  4. go watch(ctx,"【监控2】")
  5. go watch(ctx,"【监控3】")
  6. time.Sleep(10 * time.Second)
  7. fmt.Println("可以了,通知监控停止")
  8. cancel()
  9. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  10. time.Sleep(5 * time.Second)
  11. }
  12. func watch(ctx context.Context, name string) {
  13. for {
  14. select {
  15. case <-ctx.Done():
  16. fmt.Println(name,"监控退出,停止了...")
  17. return
  18. default:
  19. fmt.Println(name,"goroutine监控中...")
  20. time.Sleep(2 * time.Second)
  21. }
  22. }
  23. }

启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。

  1. type Context interface {
  2. Deadline() (deadline time.Time, ok bool)
  3. Done() <-chan struct{}
  4. Err() error
  5. Value(key interface{}) interface{}
  6. }

Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。

Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。

Err方法返回取消的错误原因,因为什么Context被取消。

Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。

如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。

  1. func Stream(ctx context.Context, out chan<- Value) error {
  2. for {
  3. v, err := DoSomething(ctx)
  4. if err != nil {
  5. return err
  6. }
  7. select {
  8. case <-ctx.Done():
  9. return ctx.Err()
  10. case out <- v:
  11. }
  12. }
  13. }

Context的继承衍生

  1. func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  2. func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
  3. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  4. func WithValue(parent Context, key, val interface{}) Context

这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。
通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。

WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。
WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到

大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。

WithValue传递元数据

  1. var key string="name"
  2. func main() {
  3. ctx, cancel := context.WithCancel(context.Background())
  4. //附加值
  5. valueCtx:=context.WithValue(ctx,key,"【监控1】")
  6. go watch(valueCtx)
  7. time.Sleep(10 * time.Second)
  8. fmt.Println("可以了,通知监控停止")
  9. cancel()
  10. //为了检测监控过是否停止,如果没有监控输出,就表示停止了
  11. time.Sleep(5 * time.Second)
  12. }
  13. func watch(ctx context.Context) {
  14. for {
  15. select {
  16. case <-ctx.Done():
  17. //取出值
  18. fmt.Println(ctx.Value(key),"监控退出,停止了...")
  19. return
  20. default:
  21. //取出值
  22. fmt.Println(ctx.Value(key),"goroutine监控中...")
  23. time.Sleep(2 * time.Second)
  24. }
  25. }
  26. }

通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。
我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。
这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。
记住,使用WithValue传值,一般是必须的值,不要什么值都传递。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. func main() {
  7. ctx := context.Background()
  8. process(ctx)
  9. ctx = context.WithValue(ctx, "traceId", "rolle")
  10. process(ctx)
  11. }
  12. func process(ctx context.Context) {
  13. traceId, ok := ctx.Value("traceId").(string)
  14. if ok {
  15. fmt.Printf("process over. trace_id=%s\n", traceId)
  16. } else {
  17. fmt.Printf("process over. no trace_id\n")
  18. }
  19. }

运行结果

  1. process over. no trace_id
  2. process over. trace_id=rolle
  1. func main() {
  2. ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
  3. defer cancel()
  4. go task(ctx)
  5. time.Sleep(time.Second * 10)
  6. }
  7. func task(ctx context.Context) {
  8. ch := make(chan struct{}, 0)
  9. go func() {
  10. // 模拟4秒耗时任务
  11. time.Sleep(time.Second * 4)
  12. ch <- struct{}{}
  13. }()
  14. select {
  15. case <-ch:
  16. fmt.Println("done")
  17. case <-ctx.Done():
  18. fmt.Println("timeout")
  19. }
  20. }

Context 使用原则

  1. 不要把Context放在结构体中,要以参数的方式传递
  2. 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
  3. 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
  4. Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
  5. Context是线程安全的,可以放心的在多个goroutine中传递

超时控制

  1. 通过context的WithTimeout设置一个有效时间为800毫秒的context。
  2. 该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。
  3. 有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?

这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的,对于上游传递过来的context还剩多少时间,我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。
注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。
总结

上面的超时控制是搭配使用了ctx.Done和time.After。
Done通道负责监听context啥时候完事,如果在time.After设置的超时时间到了,你还没完事,那我就不等了,执行超时后的逻辑代码。

  1. func AsyncCall() {
  2. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
  3. defer cancel()
  4. go func(ctx context.Context) {
  5. // 发送HTTP请求
  6. }()
  7. select {
  8. case <-ctx.Done():
  9. fmt.Println("call successfully!!!")
  10. return
  11. case <-time.After(time.Duration(time.Millisecond * 900)):
  12. fmt.Println("timeout!!!")
  13. return
  14. }
  15. }

使用通道

  1. func AsyncCall() {
  2. ctx := context.Background()
  3. done := make(chan struct{}, 1)
  4. go func(ctx context.Context) {
  5. // 发送HTTP请求
  6. done <- struct{}{}
  7. }()
  8. select {
  9. case <-done:
  10. fmt.Println("call successfully!!!")
  11. return
  12. case <-time.After(time.Duration(800 * time.Millisecond)):
  13. fmt.Println("timeout!!!")
  14. return
  15. }
  16. }