context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。
context 用来解决 goroutine 之间退出通知、元数据传递的功能。
控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context
Value函数并没有任何保证,编译器不会检查传进来的参数是否是合理。
什么是WaitGroup
它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。
func main() {var wg sync.WaitGroupwg.Add(2)go func() {time.Sleep(2*time.Second)fmt.Println("first")wg.Done()}()go func() {time.Sleep(2*time.Second)fmt.Println("second")wg.Done()}()wg.Wait()fmt.Println("all done")}
一定要例子中的2个goroutine同时做完,才算是完成
可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如开启一个后台goroutine一直做事情,比如监控,定时任务等现在不需要了,就需要通知这个goroutine结束
func main() {stop := make(chan bool)go func() {for {select {case <-stop:fmt.Println("break")returndefault:fmt.Println("watch ing")time.Sleep(1 * time.Second)}}}()time.Sleep(5 * time.Second)fmt.Println("stop")stop <- truefmt.Println(5 * time.Second)}
定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。
有了以上的逻辑,就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。
如果有一层层的无穷尽的goroutine,不太好控制
func main() {ctx, cancel := context.WithCancel(context.Background())go func(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("stop,break...")returndefault:fmt.Println("goroutine watching...")time.Sleep(2 * time.Second)}}}(ctx)time.Sleep(10 * time.Second)fmt.Println("all done")cancel()// 为了检测监控过是否停止,如果没有监控输出,就表示停止了time.Sleep(5 * time.Second)}
重写,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。
context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。
在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。
那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。
Context控制多个goroutine
func main() {ctx, cancel := context.WithCancel(context.Background())go watch(ctx,"【监控1】")go watch(ctx,"【监控2】")go watch(ctx,"【监控3】")time.Sleep(10 * time.Second)fmt.Println("可以了,通知监控停止")cancel()//为了检测监控过是否停止,如果没有监控输出,就表示停止了time.Sleep(5 * time.Second)}func watch(ctx context.Context, name string) {for {select {case <-ctx.Done():fmt.Println(name,"监控退出,停止了...")returndefault:fmt.Println(name,"goroutine监控中...")time.Sleep(2 * time.Second)}}}
启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
type Context interface {Deadline() (deadline time.Time, ok bool)Done() <-chan struct{}Err() errorValue(key interface{}) interface{}}
Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。
Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。
Err方法返回取消的错误原因,因为什么Context被取消。
Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。
如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。
func Stream(ctx context.Context, out chan<- Value) error {for {v, err := DoSomething(ctx)if err != nil {return err}select {case <-ctx.Done():return ctx.Err()case out <- v:}}}
Context的继承衍生
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)func WithValue(parent Context, key, val interface{}) Context
这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。
通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。
WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。
WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到
大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。
WithValue传递元数据
var key string="name"func main() {ctx, cancel := context.WithCancel(context.Background())//附加值valueCtx:=context.WithValue(ctx,key,"【监控1】")go watch(valueCtx)time.Sleep(10 * time.Second)fmt.Println("可以了,通知监控停止")cancel()//为了检测监控过是否停止,如果没有监控输出,就表示停止了time.Sleep(5 * time.Second)}func watch(ctx context.Context) {for {select {case <-ctx.Done()://取出值fmt.Println(ctx.Value(key),"监控退出,停止了...")returndefault://取出值fmt.Println(ctx.Value(key),"goroutine监控中...")time.Sleep(2 * time.Second)}}}
通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。
我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。
这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。
记住,使用WithValue传值,一般是必须的值,不要什么值都传递。
package mainimport ("context""fmt")func main() {ctx := context.Background()process(ctx)ctx = context.WithValue(ctx, "traceId", "rolle")process(ctx)}func process(ctx context.Context) {traceId, ok := ctx.Value("traceId").(string)if ok {fmt.Printf("process over. trace_id=%s\n", traceId)} else {fmt.Printf("process over. no trace_id\n")}}
运行结果
process over. no trace_idprocess over. trace_id=rolle
func main() {ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)defer cancel()go task(ctx)time.Sleep(time.Second * 10)}func task(ctx context.Context) {ch := make(chan struct{}, 0)go func() {// 模拟4秒耗时任务time.Sleep(time.Second * 4)ch <- struct{}{}}()select {case <-ch:fmt.Println("done")case <-ctx.Done():fmt.Println("timeout")}}
Context 使用原则
- 不要把Context放在结构体中,要以参数的方式传递
- 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
- 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
- Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
- Context是线程安全的,可以放心的在多个goroutine中传递
超时控制
- 通过context的WithTimeout设置一个有效时间为800毫秒的context。
- 该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。
- 有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?
这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的,对于上游传递过来的context还剩多少时间,我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。
注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。
总结
上面的超时控制是搭配使用了ctx.Done和time.After。
Done通道负责监听context啥时候完事,如果在time.After设置的超时时间到了,你还没完事,那我就不等了,执行超时后的逻辑代码。
func AsyncCall() {ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))defer cancel()go func(ctx context.Context) {// 发送HTTP请求}()select {case <-ctx.Done():fmt.Println("call successfully!!!")returncase <-time.After(time.Duration(time.Millisecond * 900)):fmt.Println("timeout!!!")return}}
使用通道
func AsyncCall() {ctx := context.Background()done := make(chan struct{}, 1)go func(ctx context.Context) {// 发送HTTP请求done <- struct{}{}}()select {case <-done:fmt.Println("call successfully!!!")returncase <-time.After(time.Duration(800 * time.Millisecond)):fmt.Println("timeout!!!")return}}
