概述
Go中goroutine之间没有父与子的关系,多个gorountine都是平行的被调度,不存在所谓的子进程退出后的通知机制。 如果一个goroutine中创建一个新的goroutine,在这个新的goroutine又创建一个新的 goroutine,最终形成一个树状的结构,如何通知这个树状上的所有goroutine退出?仅依靠语法层面的支持显然比较难处理。为此 Go 1.7 提供了一个标准库 context 来解决这个问题。它提供两种功能:
1. 消息通知:将取消或者超时等意外情况通知整个调用树上的每一个goroutine。
2. 数据传递:数据可以传递给整个goroutine调用树上的每一个goroutine
例如
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan bool)
go func() {
for{
select {
case <-c:
fmt.Println("game over")
return
default:
fmt.Println("waiting")
time.Sleep(1*time.Second)
}
}
}()
fmt.Println("start")
time.Sleep(10*time.Second)
fmt.Println("send message")
c<-true
}
上面程序执行结果
start
waiting
waiting
waiting
waiting
waiting
waiting
waiting
waiting
waiting
waiting
send message
game over
使用Go Context重写上面的示例,把原来的chan 换成Context,使用Context控制goroutine,其中context.Background()
返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)
函数,创建一个可取消的子Context。
在goroutine中,使用select调用<-ctx.Done()
判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行执行默认context.WithCancel(parent)
函数的第二个返回值cancel是一个函数,它是CancelFunc
类型的。调用它就可以发出取消指令,然后goroutine就会收到信号,就会返回结束。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx,cancel := context.WithCancel(context.Background())
go func(c context.Context) {
for{
select {
case <-c.Done():
fmt.Println("game over")
return
default:
fmt.Println("waiting")
time.Sleep(1*time.Second)
}
}
}(ctx)
fmt.Println("start")
time.Sleep(10*time.Second)
fmt.Println("send message")
cancel()
}
数据结构
Context的调用是链式的,第一个创建Context 的goroutine被称为root节点。root节点负责创建一个实现Context接口的具体对象 ,并将该对象作为参数传递到其新拉起的goroutine ,下游的goroutine可以继续封装该对象,再传递到更下游的goroutine。Context对象在传递的过程中最终形成一个树状的数据结构,这样通过位于root节点(树的根节点)的Context 对象就能遍历整个Context对象树,通知和消息就可以通过root节点传递出去 ,实现上游goroutine对下游goroutine的消息传递。
Context接口
Context作为一个interface,所有的Context对象都要实现该interface,context的使用者在调用接口中都使用Context作为参数类型。
type Context interface{
Deadline()(deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface
}
方法 | 说明 |
---|---|
Deadline()(deadline time.Time, ok bool) | 如果Context实现了超时控制,该方法返回超时时间true。否则ok为false |
Done() <-chan struct{} | Done方法返回一个只读的chan,类型是stuct{},使用<-chan struct{}来通知退出,供被调用的goroutine监听。 |
Err() error | 当Done()返回的chan收到通知后,才可以访问Err()获知被取消的原因 |
Value(key interface{}) interface | 可以 访问上游 goroutine 传递给下游 goroutine的值 |
canceler
canceler是一个拓展接口,规定了取消通知的 Context 具体类型需要实现的接口。context包中的具体类型cancelCtx 和timerCtx 都实现了该接口。
type canceler interface {
//创建cancel接口实例的 goroutine 调用cancel方法通知后续创建的 goroutine退出
cancel(removeFromParent bool, err error)
////Done 方法返回的 chan需要后端 goroutine 来监听,并及时退出
Done() <-chan struct{}
}
emptyCtx
emptyCtx实现了Context接口,但是所有的方法都是空实现,不具备任何功能,其存在的目的就是作为Context对象树的root节点,因为 context 包的使用思路就是不停地调用 context 包提供的包装函数来创建具有特殊功能的 Context 实例 ,每一个 Context 实例的创建都以 上一个 Context 对象为参数 ,最终形成一个树状的结构 。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
//......
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
//这两者返回值是一样的,文档上建议main函数可以使用Background()创建root context
cancelCtx
可以认为他与emptyCtx最大的区别在于,具体实现了cancel函数。即他可以向子goroutine传递cancel消息。
type canceler interface {
//创建cancel接口实例的 goroutine 调用cancel方法通知后续创建的 goroutine退出
cancel(removeFromParent bool, err error)
////Done 方法返回的 chan需要后端 goroutine 来监听,并及时退出
Done() <-chan struct{}
}
timerCtx
另一个实现Context接口的具体类型,内部封装了cancelCtx类型实例,同时拥有deadline变量,用于实现定时退出通知。
valueCtx
实现了Context接口的具体类型,内部分装cancelCtx类型实例,同时封装了一个kv存储变量,valueCtx可用于传递通知消息。
API函数
- 请求链取消,并发多服务调用情况下,比如一个请求进来,启动3个goroutine进行 RequestA 、RequestB 、RequestC三个服务的调用。这时候只要有其中一个服务错误,就返回错误,同时取消另外两个请求服务。可以通过 WithCancel 方法来实现。
- 超时控制,对服务请求进行超时限制,可以通过 WithDeadline 和 WithTimeout 方法来实现。
- 数据共享,一个请求的用户信息,一般业务场景,我们会有一个专门的中间件来校验用户信息,然后把用户信息注入到context中,或者共享给派生出来的多个goroutine使用,可以通过 WithValue 方法实现。
在context包内部已经为我们实现好了两个空的Context,可以通过调用Background()和TODO()方法获取。一般的将它们作为Context的根,往下派生。
func Background() Context
func TODO() Context
除了root context可以使用Background()
创建以外,其余的context都应该从cancelCtx
,timerCtx
,valueCtx
中选取一个来构建具体对象,主要使用With包装函数用来构建不同功能的Context具体对象。
下面四个with 函数参数都需要接收一个Context ,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
WithCancel
返回一个cancel函数,调研cancel函数的时候,会触发context.done函数
func WithCancel(parent Context) (Context, CancelFunc)
eg:
package main
import (
"context"
"log"
"os"
"time"
)
var logger *log.Logger
func downloadFile(ctx context.Context) {
for {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return
default:
logger.Printf("work")
}
}
}
func main() {
logger = log.New(os.Stdout, "cancel——Ctx", log.Ltime)
ctx, cancel := context.WithCancel(context.Background())
go downloadFile(ctx)
time.Sleep(5 * time.Second)
cancel()
logger.Printf("cancel done")
}
WithDeadline
创建带有超时通知的Context,内部创建一个 timerCtx 的类型实例
func WithDeadline(parent Context, deadline time.Time)(Context, CancelFunc)
WithTimeout
创建一个带有超时通知的 Context 具体对象 ,内部创建一个 timerCtx 的类型实例。具体差别在于传递绝对或相对时间。
func WithTimeout(parent Context, timeout time.Duration)(Context, CancelFunc)
eg
package main
import (
"context"
"log"
"os"
"time"
)
var logger *log.Logger
func downloadFile(ctx context.Context) {
for {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
logger.Printf("timeout done")
return
default:
logger.Printf("work")
}
}
}
func main() {
logger = log.New(os.Stdout, "timeoutCtx——", log.Ltime)
//ctx, _ := context.WithTimeout(context.Background(),3*time.Second)//相对时间
ctx, _ := context.WithDeadline(context.Background(),time.Now().Add(3*time.Second))//绝对时间
go downloadFile(ctx)
time.Sleep(5 * time.Second)
logger.Printf("finsh")
}
WithValue
func WithValue(parent Context, key, val interface{}) Context
源码分析
源码地址:https://github.com/golang/go/blob/master/src/context/context.go
context的数据结构
emptyCtx
只是一个uint类型的变量,其目的只是为了作为第一个goroutine ctx的parent,因此他不需要,也没法保存子类上下文结构。cancelCtx
的数据结构:
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
Context接口保存的就是父类的context。children map[canceler]struct{}保存的是所有直属与这个context的子类context。done chan struct{}用于发送退出信号。
我们查看创建cancelCtx的APIfunc WithCancel(...)...
:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
propagateCancel
函数的作用是将自己注册至parent context。我们稍后会讲解这个函数。
timerCtx
的数据结构:
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
timerCtx继承于cancelCtx,并为定时退出功能新增自己的数据结构。
当派生出的子 Context 的deadline在父Context之后,直接返回了一个父Context的拷贝。故语义上等效为父。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
//以下内容与定时退出机制有关,在本文不作过多分析和解释
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
timerCtx查看parent context的方法是timerCtx.cancelCtx.Context
。
4. valueCtx
的数据结构:
type valueCtx struct {
Context
key, val interface{}
}
相较于timerCtx而言非常简单,没有继承于cancelCtx struct,而是直接继承于Context接口。
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
辅助函数
这里我们会有两个疑问,第一,valueCtx为什么没有propagateCancel函数向parent context注册自己。既然没有注册,为何ctxb超时后能通知ctxc一起退出。第二,valueCtx是如何存储children和parent context结构的。相较于同样绑定Context接口的cancelCtx,valueCtx
并没有children数据。
第二个问题能解决一半第一个问题,即为何不向parent context注册。先说结论:valueCtx的children context注册在valueCtx的parent context上。函数func propagateCancel(...)
负责注册信息,我们先看一下他的构造:
func propagateCancel
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
这个函数的主要逻辑如下:接收parent context 和 child canceler方法,若parent为emptyCtx,则不注册;否则通过funcparentCancelCtx
寻找最近的一个*cancelCtx
;若该cancelCtx已经结束,则调用child的cancel方法,否则向该cancelCtx
注册child。
func parentCancelCtx
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
func parentCancelCtx
从parentCtx中向上迭代寻找第一个*cancelCtx
并返回。从函数逻辑中可以看到,只有当parent.(type)为*valueCtx
的时候,parent才会向上迭代而不是立即返回。否则该函数都是直接返回或返回经过包装的*cancelCtx
。因此我们可以发现,valueCtx是依赖于parentCtx的*cancelCtx
结构的。
至于第二个问题,事实上,parentCtx根本无需,也没有办法通过Done()方法通知valueCtx,valueCtx也没有额外实现Done()方法。可以理解为:valueCtx与parentCtx公用一个done channel,当parentCtx调用了cancel方法并关闭了done channel时,监听valueCtx的done channel的goroutine同样会收到退出信号。另外,当parentCtx没有实现cancel方法(如emptyCtx)时,可以认为valueCtx也是无法cancel的。
func (c *cancelCtx) cancel
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
该方法的主要逻辑如下:若外部err为空,则代表这是一个非法的cancel操作,抛出panic;若cancelCtx内部err不为空,说明该Ctx已经执行过cancel操作,直接返回;关闭done channel,关联该Ctx的goroutine收到退出通知;遍历children,若有的话,执行child.cancel操作;调用removeChild
将自己从parent context中移除。
参考
https://blog.golang.org/context
视频笔记:如何正确使用 Context - Jack Lindamood
https://www.jianshu.com/p/15fbda1536bc
https://www.cnblogs.com/yjf512/p/10399190.html