context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。
context 用来解决 goroutine 之间退出通知
、元数据传递
的功能。
控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context
Value函数并没有任何保证,编译器不会检查传进来的参数是否是合理。
什么是WaitGroup
它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("first")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("second")
wg.Done()
}()
wg.Wait()
fmt.Println("all done")
}
一定要例子中的2个goroutine同时做完,才算是完成
可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如开启一个后台goroutine一直做事情,比如监控,定时任务等现在不需要了,就需要通知这个goroutine结束
func main() {
stop := make(chan bool)
go func() {
for {
select {
case <-stop:
fmt.Println("break")
return
default:
fmt.Println("watch ing")
time.Sleep(1 * time.Second)
}
}
}()
time.Sleep(5 * time.Second)
fmt.Println("stop")
stop <- true
fmt.Println(5 * time.Second)
}
定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。
有了以上的逻辑,就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。
如果有一层层的无穷尽的goroutine,不太好控制
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("stop,break...")
return
default:
fmt.Println("goroutine watching...")
time.Sleep(2 * time.Second)
}
}
}(ctx)
time.Sleep(10 * time.Second)
fmt.Println("all done")
cancel()
// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
重写,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。
context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。
在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。
那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。
Context控制多个goroutine
func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"【监控1】")
go watch(ctx,"【监控2】")
go watch(ctx,"【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"监控退出,停止了...")
return
default:
fmt.Println(name,"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。
Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。
Err方法返回取消的错误原因,因为什么Context被取消。
Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。
如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
Context的继承衍生
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。
通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。
WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。
WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到
大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。
WithValue传递元数据
var key string="name"
func main() {
ctx, cancel := context.WithCancel(context.Background())
//附加值
valueCtx:=context.WithValue(ctx,key,"【监控1】")
go watch(valueCtx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context) {
for {
select {
case <-ctx.Done():
//取出值
fmt.Println(ctx.Value(key),"监控退出,停止了...")
return
default:
//取出值
fmt.Println(ctx.Value(key),"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。
我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。
这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。
记住,使用WithValue传值,一般是必须的值,不要什么值都传递。
package main
import (
"context"
"fmt"
)
func main() {
ctx := context.Background()
process(ctx)
ctx = context.WithValue(ctx, "traceId", "rolle")
process(ctx)
}
func process(ctx context.Context) {
traceId, ok := ctx.Value("traceId").(string)
if ok {
fmt.Printf("process over. trace_id=%s\n", traceId)
} else {
fmt.Printf("process over. no trace_id\n")
}
}
运行结果
process over. no trace_id
process over. trace_id=rolle
func main() {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()
go task(ctx)
time.Sleep(time.Second * 10)
}
func task(ctx context.Context) {
ch := make(chan struct{}, 0)
go func() {
// 模拟4秒耗时任务
time.Sleep(time.Second * 4)
ch <- struct{}{}
}()
select {
case <-ch:
fmt.Println("done")
case <-ctx.Done():
fmt.Println("timeout")
}
}
Context 使用原则
- 不要把Context放在结构体中,要以参数的方式传递
- 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
- 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
- Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
- Context是线程安全的,可以放心的在多个goroutine中传递
超时控制
- 通过context的WithTimeout设置一个有效时间为800毫秒的context。
- 该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。
- 有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?
这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的,对于上游传递过来的context还剩多少时间,我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。
注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。
总结
上面的超时控制是搭配使用了ctx.Done和time.After。
Done通道负责监听context啥时候完事,如果在time.After设置的超时时间到了,你还没完事,那我就不等了,执行超时后的逻辑代码。
func AsyncCall() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
defer cancel()
go func(ctx context.Context) {
// 发送HTTP请求
}()
select {
case <-ctx.Done():
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(time.Millisecond * 900)):
fmt.Println("timeout!!!")
return
}
}
使用通道
func AsyncCall() {
ctx := context.Background()
done := make(chan struct{}, 1)
go func(ctx context.Context) {
// 发送HTTP请求
done <- struct{}{}
}()
select {
case <-done:
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(800 * time.Millisecond)):
fmt.Println("timeout!!!")
return
}
}