context是Go并发编程中常用到一种编程模式。本文将从为什么需要context,深入了解context的实现原理,以了解如何使用context。

前言

这篇文章将介绍Golang并发编程中常用到一种编程模式:context。本文将从为什么需要context出发,深入了解context的实现原理,以及了解如何使用context。

为什么需要context

在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作。
熟悉channel的朋友应该都见过使用done channel来处理此类问题。比如以下这个例子:

  1. func main() {
  2. messages := make(chan int, 10)
  3. done := make(chan bool)
  4. defer close(messages)
  5. // consumer
  6. go func() {
  7. ticker := time.NewTicker(1 * time.Second)
  8. for _ = range ticker.C {
  9. select {
  10. case <-done:
  11. fmt.Println("child process interrupt...")
  12. return
  13. default:
  14. fmt.Printf("send message: %d\n", <-messages)
  15. }
  16. }
  17. }()
  18. // producer
  19. for i := 0; i < 10; i++ {
  20. messages <- i
  21. }
  22. time.Sleep(5 * time.Second)
  23. close(done)
  24. time.Sleep(1 * time.Second)
  25. fmt.Println("main process exit!")
  26. }

上述例子中定义了一个buffer为0的channel done, 子协程运行着定时任务。如果主协程需要在某个时刻发送消息通知子协程中断任务退出,那么就可以让子协程监听这个done channel,一旦主协程关闭done channel,那么子协程就可以推出了,这样就实现了主协程通知子协程的需求。这很好,但是这也是有限的。

如果我们可以在简单的通知上附加传递额外的信息来控制取消:为什么取消,或者有一个它必须要完成的最终期限,更或者有多个取消选项,我们需要根据额外的信息来判断选择执行哪个取消选项。

考虑下面这种情况:假如主协程中有多个任务1, 2, …m,主协程对这些任务有超时控制;而其中任务1又有多个子任务1, 2, …n,任务1对这些子任务也有自己的超时控制,那么这些子任务既要感知主协程的取消信号,也需要感知任务1的取消信号。

如果还是使用done channel的用法,我们需要定义两个done channel,子任务们需要同时监听这两个done channel。嗯,这样其实好像也还行哈。但是如果层级更深,如果这些子任务还有子任务,那么使用done channel的方式将会变得非常繁琐且混乱。

我们需要一种优雅的方案来实现这样一种机制:

  • 上层任务取消后,所有的下层任务都会被取消;
  • 中间某一层的任务取消后,只会将当前任务的下层任务取消,而不会影响上层的任务以及同级任务。

这个时候context就派上用场了。我们首先看看context的结构设计和实现原理。

context是什么

context接口

先看Context接口结构,看起来非常简单。

  1. type Context interface {
  2. Deadline() (deadline time.Time, ok bool)
  3. Done() <-chan struct{}
  4. Err() error
  5. Value(key interface{}) interface{}
  6. }

Context接口包含四个方法:

  • Deadline返回绑定当前context的任务被取消的截止时间;如果没有设定期限,将返回ok == false。
  • Done 当绑定当前context的任务被取消时,将返回一个关闭的channel;如果当前context不会被取消,将返回nil。
  • Err 如果Done返回的channel没有关闭,将返回nil;如果Done返回的channel已经关闭,将返回非空的值表示任务结束的原因。如果是context被取消,Err将返回Canceled;如果是context超时,Err将返回DeadlineExceeded。
  • Value 返回context存储的键值对中当前key对应的值,如果没有对应的key,则返回nil。

可以看到Done方法返回的channel正是用来传递结束信号以抢占并中断当前任务;Deadline方法指示一段时间后当前goroutine是否会被取消;以及一个Err方法,来解释goroutine被取消的原因;而Value则用于获取特定于当前任务树的额外信息。而context所包含的额外信息键值对是如何存储的呢?其实可以想象一颗树,树的每个节点可能携带一组键值对,如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点,具体后面会说到。
再来看看context包中的其他关键内容。

emptyCtx

emptyCtx是一个int类型的变量,但实现了context的接口。emptyCtx没有超时时间,不能取消,也不能存储任何额外信息,所以emptyCtx用来作为context树的根节点。

  1. // An emptyCtx is never canceled, has no values, and has no deadline. It is not
  2. // struct{}, since vars of this type must have distinct addresses.
  3. type emptyCtx int
  4. func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
  5. return
  6. }
  7. func (*emptyCtx) Done() <-chan struct{} {
  8. return nil
  9. }
  10. func (*emptyCtx) Err() error {
  11. return nil
  12. }
  13. func (*emptyCtx) Value(key interface{}) interface{} {
  14. return nil
  15. }
  16. func (e *emptyCtx) String() string {
  17. switch e {
  18. case background:
  19. return "context.Background"
  20. case todo:
  21. return "context.TODO"
  22. }
  23. return "unknown empty Context"
  24. }
  25. var (
  26. background = new(emptyCtx)
  27. todo = new(emptyCtx)
  28. )
  29. func Background() Context {
  30. return background
  31. }
  32. func TODO() Context {
  33. return todo
  34. }

但我们一般不会直接使用emptyCtx,而是使用由emptyCtx实例化的两个变量,分别可以通过调用Background和TODO方法得到,但这两个context在实现上是一样的。那么Background和TODO方法得到的context有什么区别呢?可以看一下官方的解释:

  1. // Background returns a non-nil, empty Context. It is never canceled, has no
  2. // values, and has no deadline. It is typically used by the main function,
  3. // initialization, and tests, and as the top-level Context for incoming
  4. // requests.
  5. // TODO returns a non-nil, empty Context. Code should use context.TODO when
  6. // it's unclear which Context to use or it is not yet available (because the
  7. // surrounding function has not yet been extended to accept a Context
  8. // parameter).

Background和TODO只是用于不同场景下: Background通常被用于主函数、初始化以及测试中,作为一个顶层的context,也就是说一般我们创建的context都是基于Background;而TODO是在不确定使用什么context的时候才会使用。
下面将介绍两种不同功能的基础context类型:valueCtx和cancelCtx。

valueCtx

valueCtx结构体

  1. type valueCtx struct {
  2. Context
  3. key, val interface{}
  4. }
  5. func (c *valueCtx) Value(key interface{}) interface{} {
  6. if c.key == key {
  7. return c.val
  8. }
  9. return c.Context.Value(key)
  10. }

valueCtx利用一个Context类型的变量来表示父节点context,所以当前context继承了父context的所有信息;valueCtx类型还携带一组键值对,也就是说这种context可以携带额外的信息。valueCtx实现了Value方法,用以在context链路上获取key对应的值,如果当前context上不存在需要的key,会沿着context链向上寻找key对应的值,直到根节点。

WithValue

WithValue用以向context添加键值对:

  1. func WithValue(parent Context, key, val interface{}) Context {
  2. if key == nil {
  3. panic("nil key")
  4. }
  5. if !reflect.TypeOf(key).Comparable() {
  6. panic("key is not comparable")
  7. }
  8. return &valueCtx{parent, key, val}
  9. }

这里添加键值对不是在原context结构体上直接添加,而是以此context作为父节点,重新创建一个新的valueCtx子节点,将键值对添加在子节点上,由此形成一条context链。获取value的过程就是在这条context链上由尾部上前搜寻:

image.png

cancelCtx

cancelCtx结构体

  1. type cancelCtx struct {
  2. Context
  3. mu sync.Mutex // protects following fields
  4. done chan struct{} // created lazily, closed by first cancel call
  5. children map[canceler]struct{} // set to nil by the first cancel call
  6. err error // set to non-nil by the first cancel call
  7. }
  8. type canceler interface {
  9. cancel(removeFromParent bool, err error)
  10. Done() <-chan struct{}
  11. }

跟valueCtx类似,cancelCtx中也有一个context变量作为父节点;变量done表示一个channel,用来表示传递关闭信号;children表示一个map,存储了当前context节点下的子节点;err用于存储错误信息表示任务结束的原因。
再来看一下cancelCtx实现的方法:

  1. func (c *cancelCtx) Done() <-chan struct{} {
  2. c.mu.Lock()
  3. if c.done == nil {
  4. c.done = make(chan struct{})
  5. }
  6. d := c.done
  7. c.mu.Unlock()
  8. return d
  9. }
  10. func (c *cancelCtx) Err() error {
  11. c.mu.Lock()
  12. err := c.err
  13. c.mu.Unlock()
  14. return err
  15. }
  16. func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  17. if err == nil {
  18. panic("context: internal error: missing cancel error")
  19. }
  20. c.mu.Lock()
  21. if c.err != nil {
  22. c.mu.Unlock()
  23. return // already canceled
  24. }
  25. // 设置取消原因
  26. c.err = err
  27. 设置一个关闭的channel或者将done channel关闭,用以发送关闭信号
  28. if c.done == nil {
  29. c.done = closedchan
  30. } else {
  31. close(c.done)
  32. }
  33. // 将子节点context依次取消
  34. for child := range c.children {
  35. // NOTE: acquiring the child's lock while holding parent's lock.
  36. child.cancel(false, err)
  37. }
  38. c.children = nil
  39. c.mu.Unlock()
  40. if removeFromParent {
  41. // 将当前context节点从父节点上移除
  42. removeChild(c.Context, c)
  43. }
  44. }

可以发现cancelCtx类型变量其实也是canceler类型,因为cancelCtx实现了canceler接口。 Done方法和Err方法没必要说了,cancelCtx类型的context在调用cancel方法时会设置取消原因,将done channel设置为一个关闭channel或者关闭channel,然后将子节点context依次取消,如果有需要还会将当前节点从父节点上移除。

WithCancel

WithCancel函数用来创建一个可取消的context,即cancelCtx类型的context。WithCancel返回一个context和一个CancelFunc,调用CancelFunc即可触发cancel操作。直接看源码:

  1. type CancelFunc func()
  2. func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
  3. c := newCancelCtx(parent)
  4. propagateCancel(parent, &c)
  5. return &c, func() { c.cancel(true, Canceled) }
  6. }
  7. // newCancelCtx returns an initialized cancelCtx.
  8. func newCancelCtx(parent Context) cancelCtx {
  9. // 将parent作为父节点context生成一个新的子节点
  10. return cancelCtx{Context: parent}
  11. }
  12. func propagateCancel(parent Context, child canceler) {
  13. if parent.Done() == nil {
  14. // parent.Done()返回nil表明父节点以上的路径上没有可取消的context
  15. return // parent is never canceled
  16. }
  17. // 获取最近的类型为cancelCtx的祖先节点
  18. if p, ok := parentCancelCtx(parent); ok {
  19. p.mu.Lock()
  20. if p.err != nil {
  21. // parent has already been canceled
  22. child.cancel(false, p.err)
  23. } else {
  24. if p.children == nil {
  25. p.children = make(map[canceler]struct{})
  26. }
  27. // 将当前子节点加入最近cancelCtx祖先节点的children中
  28. p.children[child] = struct{}{}
  29. }
  30. p.mu.Unlock()
  31. } else {
  32. go func() {
  33. select {
  34. case <-parent.Done():
  35. child.cancel(false, parent.Err())
  36. case <-child.Done():
  37. }
  38. }()
  39. }
  40. }
  41. func parentCancelCtx(parent Context) (*cancelCtx, bool) {
  42. for {
  43. switch c := parent.(type) {
  44. case *cancelCtx:
  45. return c, true
  46. case *timerCtx:
  47. return &c.cancelCtx, true
  48. case *valueCtx:
  49. parent = c.Context
  50. default:
  51. return nil, false
  52. }
  53. }
  54. }

之前说到cancelCtx取消时,会将后代节点中所有的cancelCtx都取消,propagateCancel即用来建立当前节点与祖先节点这个取消关联逻辑。

  1. 如果parent.Done()返回nil,表明父节点以上的路径上没有可取消的context,不需要处理;
  2. 如果在context链上找到到cancelCtx类型的祖先节点,则判断这个祖先节点是否已经取消,如果已经取消就取消当前节点;否则将当前节点加入到祖先节点的children列表。
  3. 否则开启一个协程,监听parent.Done()和child.Done(),一旦parent.Done()返回的channel关闭,即context链中某个祖先节点context被取消,则将当前context也取消。

这里或许有个疑问,为什么是祖先节点而不是父节点?这是因为当前context链可能是这样的:

image.png

当前cancelCtx的父节点context并不是一个可取消的context,也就没法记录children。

timerCtx

  1. type timerCtx struct {
  2. cancelCtx
  3. timer *time.Timer // Under cancelCtx.mu.
  4. deadline time.Time
  5. }
  6. func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
  7. return c.deadline, true
  8. }
  9. func (c *timerCtx) cancel(removeFromParent bool, err error) {
  10. 将内部的cancelCtx取消
  11. c.cancelCtx.cancel(false, err)
  12. if removeFromParent {
  13. // Remove this timerCtx from its parent cancelCtx's children.
  14. removeChild(c.cancelCtx.Context, c)
  15. }
  16. c.mu.Lock()
  17. if c.timer != nil {
  18. 取消计时器
  19. c.timer.Stop()
  20. c.timer = nil
  21. }
  22. c.mu.Unlock()
  23. }

timerCtx内部使用cancelCtx实现取消,另外使用定时器timer和过期时间deadline实现定时取消的功能。timerCtx在调用cancel方法,会先将内部的cancelCtx取消,如果需要则将自己从cancelCtx祖先节点上移除,最后取消计时器。

WithDeadline

WithDeadline返回一个基于parent的可取消的context,并且其过期时间deadline不晚于所设置时间d。

  1. func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
  2. if cur, ok := parent.Deadline(); ok && cur.Before(d) {
  3. // The current deadline is already sooner than the new one.
  4. return WithCancel(parent)
  5. }
  6. c := &timerCtx{
  7. cancelCtx: newCancelCtx(parent),
  8. deadline: d,
  9. }
  10. // 建立新建context与可取消context祖先节点的取消关联关系
  11. propagateCancel(parent, c)
  12. dur := time.Until(d)
  13. if dur <= 0 {
  14. c.cancel(true, DeadlineExceeded) // deadline has already passed
  15. return c, func() { c.cancel(false, Canceled) }
  16. }
  17. c.mu.Lock()
  18. defer c.mu.Unlock()
  19. if c.err == nil {
  20. c.timer = time.AfterFunc(dur, func() {
  21. c.cancel(true, DeadlineExceeded)
  22. })
  23. }
  24. return c, func() { c.cancel(true, Canceled) }
  25. }
  1. 如果父节点parent有过期时间并且过期时间早于给定时间d,那么新建的子节点context无需设置过期时间,使用WithCancel创建一个可取消的context即可;
  2. 否则,就要利用parent和过期时间d创建一个定时取消的timerCtx,并建立新建context与可取消context祖先节点的取消关联关系,接下来判断当前时间距离过期时间d的时长dur:

  3. 如果dur小于0,即当前已经过了过期时间,则直接取消新建的timerCtx,原因为DeadlineExceeded;

  4. 否则,为新建的timerCtx设置定时器,一旦到达过期时间即取消当前timerCtx。

    WithTimeout

    与WithDeadline类似,WithTimeout也是创建一个定时取消的context,只不过WithDeadline是接收一个过期时间点,而WithTimeout接收一个相对当前时间的过期时长timeout:

    1. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    2. return WithDeadline(parent, time.Now().Add(timeout))
    3. }

    context的使用

    首先使用context实现文章开头done channel的例子来示范一下如何更优雅实现协程间取消信号的同步:

    1. func main() {
    2. messages := make(chan int, 10)
    3. // producer
    4. for i := 0; i < 10; i++ {
    5. messages <- i
    6. }
    7. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    8. // consumer
    9. go func(ctx context.Context) {
    10. ticker := time.NewTicker(1 * time.Second)
    11. for _ = range ticker.C {
    12. select {
    13. case <-ctx.Done():
    14. fmt.Println("child process interrupt...")
    15. return
    16. default:
    17. fmt.Printf("send message: %d\n", <-messages)
    18. }
    19. }
    20. }(ctx)
    21. defer close(messages)
    22. defer cancel()
    23. select {
    24. case <-ctx.Done():
    25. time.Sleep(1 * time.Second)
    26. fmt.Println("main process exit!")
    27. }
    28. }

    这个例子中,只要让子线程监听主线程传入的ctx,一旦ctx.Done()返回空channel,子线程即可取消执行任务。但这个例子还无法展现context的传递取消信息的强大优势。
    阅读过net/http包源码的朋友可能注意到在实现http server时就用到了context, 下面简单分析一下。
    1、首先Server在开启服务时会创建一个valueCtx,存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx。

    1. func (srv *Server) Serve(l net.Listener) error {
    2. ...
    3. var tempDelay time.Duration // how long to sleep on accept failure
    4. baseCtx := context.Background() // base is always background, per Issue 16220
    5. ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    6. for {
    7. rw, e := l.Accept()
    8. ...
    9. tempDelay = 0
    10. c := srv.newConn(rw)
    11. c.setState(c.rwc, StateNew) // before Serve can return
    12. go c.serve(ctx)
    13. }
    14. }

    2、建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

    1. func (c *conn) serve(ctx context.Context) {
    2. c.remoteAddr = c.rwc.RemoteAddr().String()
    3. ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
    4. ...
    5. ctx, cancelCtx := context.WithCancel(ctx)
    6. c.cancelCtx = cancelCtx
    7. defer cancelCtx()
    8. ...
    9. for {
    10. w, err := c.readRequest(ctx)
    11. ...
    12. serverHandler{c.server}.ServeHTTP(w, w.req)
    13. ...
    14. }
    15. }

    3、读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消方法。 ```go func (c conn) readRequest(ctx context.Context) (w response, err error) {

    req, err := readRequest(c.bufr, keepHostHeader)

    ctx, cancelCtx := context.WithCancel(ctx) req.ctx = ctx

    w = &response{

    1. conn: c,
    2. cancelCtx: cancelCtx,
    3. req: req,
    4. reqBody: req.Body,
    5. handlerHeader: make(Header),
    6. contentLength: -1,
    7. closeNotifyCh: make(chan bool, 1),
    8. // We populate these ahead of time so we're not
    9. // reading from req.Header after their Handler starts
    10. // and maybe mutates it (Issue 14940)
    11. wants10KeepAlive: req.wantsHttp10KeepAlive(),
    12. wantsClose: req.wantsClose(),

    }

    … return w, nil }

``` 这样处理的目的主要有以下几点:

  • 一旦请求超时,即可中断当前请求;
  • 在处理构建response过程中如果发生错误,可直接调用response对象的cancelCtx方法结束当前请求;
  • 在处理构建response完成之后,调用response对象的cancelCtx方法结束当前请求。

在整个server处理流程中,使用了一条context链贯穿Server、Connection、Request,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

总结

context主要用于父子任务之间的同步取消信号,本质上是一种协程调度的方式。另外在使用context时有两点值得注意:上游任务仅仅使用context通知下游任务不再需要,但不会直接干涉和中断下游任务的执行,由下游任务自行决定后续的处理操作,也就是说context的取消操作是无侵入的;context是线程安全的,因为context本身是不可变的(immutable),因此可以放心地在多个协程中传递使用。


原文链接: https://juejin.im/post/5e52688c51882549417fc671