源码地址:https://github.com/golang/go/blob/master/src/context/context.go
Context接口方法
type Context interface {Deadline() (deadline time.Time, ok bool)Done() <-chan struct{}Err() errorValue(key interface{}) interface{}}
Deadline()
该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此时deadline为一个初始值的time.Time值
Done()
该方法返回一个channel,需要在select-case语句中使用,如”case <-context.Done():”。
当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;
当context还未关闭时,Done()返回nil。
Err()
该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如Deadline context,关闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同:
- 因deadline关闭:“context deadline exceeded”;
- 因主动关闭: “context canceled”。
当context关闭后,Err()返回context的关闭原因;
当context还未关闭时,Err()返回nil;
Value()
有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息。Value()方法就是用于此种类型的context,该方法根据key值查询map中的value。
canceler接口
type canceler interface {cancel(removeFromParent bool, err error)Done() <-chan struct{}}
emptyCtx
context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现了Context,本身不包含任何值,仅用于其他context的父节点。
emptyCtx类型定义如下代码所示:
type emptyCtx intfunc (*emptyCtx) Deadline() (deadline time.Time, ok bool) {return}func (*emptyCtx) Done() <-chan struct{} {return nil}func (*emptyCtx) Err() error {return nil}func (*emptyCtx) Value(key interface{}) interface{} {return nil}
background/todo
context包中定义了一个公用的emptCtx和todo全局变量,,可以使用context.Background()和TODO()获取它,源码如下:
var (background = new(emptyCtx)todo = new(emptyCtx))func Background() Context {return background}func TODO() Context {return todo}
cancelCtx
type cancelCtx struct {Contextmu sync.Mutex // protects following fieldsdone chan struct{} // created lazily, closed by first cancel callchildren map[canceler]struct{} // set to nil by the first cancel callerr error // set to non-nil by the first cancel call}
children中记录了由此context派生的所有child,此context被cancel时会把其中的所有child都cancel掉。
cancelCtx与deadline和value无关,所以只需要实现Done()和Err()外露接口即可。
Done()实现
按照Context定义,Done()接口只需要返回一个只读channel,对于cancelCtx来说只需要返回成员变量done。done的创建采用“懒汉式”创建
func (c *cancelCtx) Done() <-chan struct{} {c.mu.Lock()if c.done == nil {c.done = make(chan struct{})}d := c.donec.mu.Unlock()return d}
由于cancelCtx没有指定初始化函数,所以cancelCtx.done可能还未分配,所以需要考虑初始化。
cancelCtx.done会在context被cancel时关闭,所以cancelCtx.done的值一般经历如下三个阶段:
nil —> chan struct{} —> closed chan。
Err()实现
按照Context定义,Err()只需要返回一个error告知context被关闭的原因。对于cancelCtx来说只需要返回成员变量err即可。
还是直接看下源码:
func (c *cancelCtx) Err() error {c.mu.Lock()err := c.errc.mu.Unlock()return err}
cancelCtx.err默认是nil,在context被cancel时指定一个error变量:var Canceled = errors.New("context canceled")。
cancel()实现
cancel()内部方法是理解cancelCtx的最关键的方法,其作用是关闭自己和其后代,其后代存储在cancelCtx.children的map中,其中key值即后代对象,value值并没有意义,这里使用map只是为了方便查询而已。
cancel方法实现伪代码如下所示:
func (c *cancelCtx) cancel(removeFromParent bool, err error) {c.mu.Lock()c.err = err //设置一个error,说明关闭原因close(c.done) //将channel关闭,以此通知派生的contextfor child := range c.children { //遍历所有children,逐个调用cancel方法child.cancel(false, err)}c.children = nilc.mu.Unlock()if removeFromParent { //正常情况下,需要将自己从parent删除removeChild(c.Context, c)}}
实际上,WithCancel()返回的第二个用于cancel context的方法正是此cancel()。
func parentCancelCtx(parent Context) (*cancelCtx, bool) {done := parent.Done()if done == closedchan || done == nil {return nil, false}p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)if !ok {return nil, false}p.mu.Lock()ok = p.done == donep.mu.Unlock()if !ok {return nil, false}return p, true}func removeChild(parent Context, child canceler) {p, ok := parentCancelCtx(parent)if !ok {return}p.mu.Lock()if p.children != nil {delete(p.children, child)}p.mu.Unlock()}
WithCancel()方法实现
WithCancel()方法作了三件事:
- 初始化一个cancelCtx实例
- 将cancelCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
- 返回cancelCtx实例和cancel()方法
其实现源码如下所示:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {c := newCancelCtx(parent)propagateCancel(parent, &c) //将自身添加到父节点return &c, func() { c.cancel(true, Canceled) }}
propagateCancel 函数
func propagateCancel(parent Context, child canceler) {done := parent.Done()// 父节点是个空节点if done == nil {return // parent is never canceled}select {case <-done:// parent is already canceledchild.cancel(false, parent.Err())returndefault:}if p, ok := parentCancelCtx(parent); ok {p.mu.Lock()if p.err != nil {// parent has already been canceledchild.cancel(false, p.err)} else {if p.children == nil {p.children = make(map[canceler]struct{})}p.children[child] = struct{}{}}p.mu.Unlock()} else {atomic.AddInt32(&goroutines, +1)go func() {select {case <-parent.Done():child.cancel(false, parent.Err())case <-child.Done():}}()}}
- 如果父节点也支持cancel,也就是说其父节点肯定有children成员,那么把新context添加到children里即可;
- 如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到children里;
- 如果所有的父节点均不支持cancel,则启动一个协程等待父节点结束,然后再把当前context结束。
典型使用案例
一个典型的使用cancel context的例子如下所示:
package mainimport ("fmt""time""context")func HandelRequest(ctx context.Context) {go WriteRedis(ctx)go WriteDatabase(ctx)for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running")time.Sleep(2 * time.Second)}}}func WriteRedis(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteRedis Done.")returndefault:fmt.Println("WriteRedis running")time.Sleep(2 * time.Second)}}}func WriteDatabase(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteDatabase Done.")returndefault:fmt.Println("WriteDatabase running")time.Sleep(2 * time.Second)}}}func main() {ctx, cancel := context.WithCancel(context.Background())go HandelRequest(ctx)time.Sleep(5 * time.Second)fmt.Println("It's time to stop all sub goroutines!")cancel()//Just for test whether sub goroutines exit or nottime.Sleep(5 * time.Second)}
上面代码中协程HandelRequest()用于处理某个请求,其又会创建两个协程:WriteRedis()、WriteDatabase(),main协程创建创建context,并把context在各子协程间传递,main协程在适当的时机可以cancel掉所有子协程。
程序输出如下所示:
HandelRequest runningWriteDatabase runningWriteRedis runningHandelRequest runningWriteDatabase runningWriteRedis runningHandelRequest runningWriteDatabase runningWriteRedis runningIt's time to stop all sub goroutines!WriteDatabase Done.HandelRequest Done.WriteRedis Done.
timerCtx
type timerCtx struct {cancelCtxtimer *time.Timer // Under cancelCtx.mu.deadline time.Time}
timerCtx在cancelCtx基础上增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器。
由此,衍生出WithDeadline()和WithTimeout()。实现上这两种类型实现原理一样,只不过使用语境不一样:
- deadline: 指定最后期限,比如context将2018.10.20 00:00:00之时自动结束
- timeout: 指定最长存活时间,比如context将在30s后结束。
对于接口来说,timerCtx在cancelCtx基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。
Deadline()实现
Deadline()方法仅仅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法设置的。
cancel()接口实现
cancel()方法基本继承cancelCtx,只需要额外把timer关闭。
timerCtx被关闭后,timerCtx.cancelCtx.err将会存储关闭原因:
- 如果deadline到来之前手动关闭,则关闭原因与cancelCtx显示一致;
- 如果deadline到来时自动关闭,则原因为:”context deadline exceeded”
WithDeadline()方法实现
WithDeadline()方法实现步骤如下:
- 初始化一个timerCtx实例
- 将timerCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
- 启动定时器,定时器到期后会自动cancel本context
- 返回timerCtx实例和cancel()方法
也就是说,timerCtx类型的context不仅支持手动cancel,也会在定时器到来后自动cancel。
WithTimeout()方法实现
WithTimeout()实际调用了WithDeadline,二者实现原理一致。
看代码会非常清晰:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {return WithDeadline(parent, time.Now().Add(timeout))}
典型使用案例
下面例子中使用WithTimeout()获得一个context并在其子协程中传递:
package mainimport ("fmt""time""context")func HandelRequest(ctx context.Context) {go WriteRedis(ctx)go WriteDatabase(ctx)for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running")time.Sleep(2 * time.Second)}}}func WriteRedis(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteRedis Done.")returndefault:fmt.Println("WriteRedis running")time.Sleep(2 * time.Second)}}}func WriteDatabase(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteDatabase Done.")returndefault:fmt.Println("WriteDatabase running")time.Sleep(2 * time.Second)}}}func main() {ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)go HandelRequest(ctx)time.Sleep(10 * time.Second)}
主协程中创建一个10s超时的context,并将其传递给子协程,10s自动关闭context。程序输出如下:
HandelRequest runningWriteRedis runningWriteDatabase runningHandelRequest runningWriteRedis runningWriteDatabase runningHandelRequest runningWriteRedis runningWriteDatabase runningHandelRequest Done.WriteDatabase Done.WriteRedis Done.
valueCtx
type valueCtx struct {Contextkey, val interface{}}func stringify(v interface{}) string {switch s := v.(type) {case stringer:return s.String()case string:return s}return "<not Stringer>"}func (c *valueCtx) String() string {return contextName(c.Context) + ".WithValue(type " +reflectlite.TypeOf(c.key).String() +", val " + stringify(c.val) + ")"}func (c *valueCtx) Value(key interface{}) interface{} {if c.key == key {return c.val}return c.Context.Value(key)}
valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。
由于valueCtx既不需要cancel,也不需要deadline,那么只需要实现Value()接口即可。
Value()实现
由valueCtx数据结构定义可见,valueCtx.key和valueCtx.val分别代表其key和value值。 实现也很简单:
func (c *valueCtx) Value(key interface{}) interface{} {if c.key == key {return c.val}return c.Context.Value(key)}
这里有个细节需要关注一下,即当前context查找不到key时,会向父节点查找,如果查询不到则最终返回interface{}。也就是说,可以通过子context查询到父的value值。
WithValue()方法实现
func WithValue(parent Context, key, val interface{}) Context {if parent == nil {panic("cannot create context from nil parent")}if key == nil {panic("nil key")}if !reflectlite.TypeOf(key).Comparable() {panic("key is not comparable")}return &valueCtx{parent, key, val}}
典型使用案例
下面示例程序展示valueCtx的用法:
package mainimport ("fmt""time""context")func HandelRequest(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))time.Sleep(2 * time.Second)}}}func main() {ctx := context.WithValue(context.Background(), "parameter", "1")go HandelRequest(ctx)time.Sleep(10 * time.Second)}
上例main()中通过WithValue()方法获得一个context,需要指定一个父context、key和value。然后通将该context传递给子协程HandelRequest,子协程可以读取到context的key-value。
注意:本例中子协程无法自动结束,因为context是不支持cancle的,也就是说<-ctx.Done()永远无法返回。如果需要返回,需要在创建context时指定一个可以cancel的context作为父节点,使用父节点的cancel()在适当的时机结束整个context。
