(年初的时候 go 语言的学习提上了日程,前一篇 sync.pool 阅读之后,阅读代码进度本该更快些,奈何身体被掏空,所以这篇文章断断续续一个月终于攒起来了。)
channel 是 golang 中用于 goroutine 之间通讯的数据结构,有以下特点:
- 线程安全
- 创建 channel 时返回的是指针,不需要考虑拷贝的问题
- 顺序通讯,写入和读出的顺序一致
源码位置 go/src/runtime/chan.go
hchan
channel 对应的数据结构
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
参数意义
qcount uint // 表示 channel 中元素的个数
dataqsiz uint // 表示 channel 的大小长度
buf unsafe.Pointer // 存储元素的环形队列头指针
elemsize uint16 // 表示此 channel 能存储元素的大小
closed uint32 // channel 是否关闭了
elemtype *_type // 表示此 channel 能存储元素的类型
sendx uint // 表示发送操作对应 buf 的下标,超过 dataqsiz 之后清 0(因为是循环队列嘛)
recvx uint // 表示接收操作对应 buf 的下标
recvq waitq // 等待接收操作的 goroutine 队列
sendq waitq // 等待发送操作的 goroutine 队列
lock mutex // channel 的锁
waitq
用来表示等待发送或者接受的 goroutine 队列(用 sudog 表示队列一个节点)
type waitq struct {
first *sudog
last *sudog
}
参数意义
first goroutine指针,队首指针
last goroutine指针,队尾指针
函数
enqueue
两种情况:
- 队列为空,将元素放入队尾将 first 指针和 last 指针赋好值
- 队列不为空,直接将元素放入队尾
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil
x := q.last
if x == nil {
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
sgp.prev = x
x.next = sgp
q.last = sgp
}
dequeue
从队列头开始遍历
- first 指针为空,说明队列为空,则直接返回空
- 如果队列只有一个元素了,将元素取出,并且清空 first 指针和 last 指针
- 队列还有很多元素,直接将 first 指针对应的元素去除
- 最后判断如果这个元素 (sudog——在 channel 中用来表示等待接收或者发送的 goroutine 的) 在 select 结构中并且 select 结构有其他接口,就跳过,继续遍历下一个节点。
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil
}
if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
continue
}
return sgp
}
}
sudog
sudog 是在等待对 channel 发送或者接受的 goroutine
为什么有了 goroutine 还要有一个 sudog?
- 因为 goroutine 和等待的 channel 是多对多的关系,一个 goroutine 可能在等待多个 channel,一个 channel 也可能有很多 goroutine 在等待,所以用 sudog 表示这个等待中的 goroutine
- sudog 是 channel 等待或者接发送链表的一个 node
sudog 通过 acquireSudog 创建,releaseSudog 销毁
- 在 go/src/runtime/proc.go 中
- go 会维护一个全局的缓存(有锁),然后每个调度器(P)有自己的缓存
- 创建 sudog 时会先从 P 的缓存中找,没有就到全局缓存中找,在没有才 new 一个
- 销毁 sudog 的时候先判断 P 是不是满了,如果满了就将一半缓存放到全局缓存然后再把 sudog 放到自己缓存
- 全局缓存的生存周期时两次 GC 的间隔,go/src/runtime/mgc.go 中 clearpools() 函数中可以看到,每次 GC 都会清理全局缓存
type sudog struct {
g *g
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
parent *sudog
waitlink *sudog
waittail *sudog
c *hchan
}
创建 sudog——acquireSudog
大概逻辑就是现在当前 goroutine 所在调度器 (P) 的缓存中找,如果没有就从全局缓存中找,如果还没有就 new 一个
func acquireSudog() *sudog {
mp := acquirem()
pp := mp.p.ptr()
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp)
return s
}
销毁 sudog——releaseSudog
大概逻辑就是如果当前 goroutine 所在调度器 (P) 的缓存满了,就将调度器 (P) 的缓存一半放入全局缓存,然后在把 sudog 放入
func releaseSudog(s *sudog) {
if s.elem != nil {
throw("runtime: sudog with non-nil elem")
}
if s.isSelect {
throw("runtime: sudog with non-false isSelect")
}
if s.next != nil {
throw("runtime: sudog with non-nil next")
}
if s.prev != nil {
throw("runtime: sudog with non-nil prev")
}
if s.waitlink != nil {
throw("runtime: sudog with non-nil waitlink")
}
if s.c != nil {
throw("runtime: sudog with non-nil c")
}
gp := getg()
if gp.param != nil {
throw("runtime: releaseSudog with non-nil gp.param")
}
mp := acquirem()
pp := mp.p.ptr()
if len(pp.sudogcache) == cap(pp.sudogcache) {
var first, last *sudog
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if first == nil {
first = p
} else {
last.next = p
}
last = p
}
lock(&sched.sudoglock)
last.next = sched.sudogcache
sched.sudogcache = first
unlock(&sched.sudoglock)
}
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}
创建 channel
go 中所有的 channel 的创建都会使用 make 关键字,make(arg1, arg2) 函数最终会调用到 runtime.makechan 和 runtime.makechan64,下面讲解 go 在编译时期是如何做这些事情的
typecheck.go
编译器会将 make(arg1, arg2) 转化成 OMAKE 类型的节点,并在类型检查阶段将 OMAKE 类型的节点按照 arg1 的类型转化为 OMAKECHAN,OMAKEMAP,OMAKESLICE 等类型
func typecheck1(n *Node, top int) (res *Node) {
...
switch n.Op {
...
case OMAKE:
...
switch t.Etype {
...
case TCHAN:
l = nil
if i < len(args) {
....
} else {
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
...
}
...
}
walk.go
OMAKECHAN 类型的节点最终会在 SSA 中间代码生成之前被转化成 runtime.makechan 或者 runtime.makechan64
func walkexpr(n *Node, init *Nodes) *Node {
...
switch n.Op {
...
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
...
}
...
}
makechan64
check 一下 size 是否是 int,然后就执行 makechan 了
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
makechan
- 安全检查: channel 能存的元素类型大小是否超过 2^16
- 判断hchanSize是否关于 maxAlign 对齐,判断元素对齐是否 maxAlign 小,如果大 maxAlign 就没用了,这里 hchanSize 设计十分巧妙,位运算神操作优化,可以看另一篇文章关于 2 的 n 次幂对齐
- 判断申请的空间大小是否 uint64 大,判断所需空间是否超过最大可申请空间,判断 size 是否小于 0(非法)
然后就是给 hchan 申请内存空间了
- 无缓冲的 size=0 的,只需要给 hchan 申请 hchansize 大小的内存空间即可
- 有缓冲,但是元素是非指针类型的,就申请 hchanSize+mem 大小的连续内存空间, 并将 hchanSize 之后的首地址赋值给 buf
- 有缓冲,并且元素类型是指针的,hchan 和底层 buf 内存就可以分开申请不用连续
- 给其他变量赋值
- 返回 hchan 指针,注意这里返回的是指针,所以 channel 在各函数之间传递时,就不是值传递了
为什么元素类型是非指针 hchan 和 buf 要在一段地址连续的内存中,而指针类型的则可以分开
这是源码注释的原话:
Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
buf points into the same allocation, elemtype is persistent.
SudoG’s are referenced from their owning thread so they can’t be collected.
TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
猜想:
大概意思是,当 channel 中元素类型不包含指针时,gc 时需要回收这段空间的,当 channel 中元素类型包含指针时,这些指针被自己所在线程引用 gc 是不能回收,所以当元素不包含指针时申请一段连续的空间可以减小 gc 的压力
func makechan(t *chantype, size int) *hchan {
elem := t.elem
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
发送
具体编译时做的转换可参考 makechan,代码都在类似的地方
chansend
- 首先检测 channel 是否为空, 如果为空直接报错
- check 是否开启了竞争检测,golang 的竞争检测通过 ThreadSanitizer 库 (C++) 做的
- 然后 kill 掉一些不用加锁就可以判断的情况,如果是非阻塞并且 channel 未关闭,size = 0 或者 channel 满了, 直接返回 false(发送失败)
- 如果已经有 goroutine 在等待了,就直接调 send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) 发给那个 goroutine
- 如果没有 goroutine 在等待.
- 如果 channel 是非阻塞并且还地方,就放入 buffer 中,如果没地方了就直接返回 false
- 如果 channel 是阻塞并且不在 select 中或者在 select 中且没有其他出口的,就将创建一个 sudog,将 sudog 初始化并且放入待发送队列 (sendq), 并且调用 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) 使当前 goroutine 陷入沉睡直到被唤醒(已经发出去了)
- 清理这个过程的垃圾数据
第四步中如果有 goroutine 在等待就直接发送,会影响非阻塞 channel 数据的顺序吗?
不会,channel 的数据由唯一全局锁保护,读写互斥,假设一个 goroutine 来读 channel,只有两种情况:
- channel buffer 中有数据,这时 goroutine 会直接读取数据,不会被阻塞。
- channel buffer 中没有数据,这时 goroutine 会被阻塞。
只有当 buffer 中有数据且有 goroutine 被阻塞时,顺序才会被打乱,但这两个条件是互斥的,有数据就不可能阻塞,阻塞就不可能有数据。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
KeepAlive(ep)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
racereadpc
go/src/runtime/race_amd64.s
PC: 指令计数器寄存器
FP: 函数的帧指针,引用函数的参数。使用形如 symbol+offset(FP) 的方式,引用函数的输入参数。例如 arg0+0(FP),arg1+8(FP),使用 FP 不加 symbol 时,无法通过编译,在汇编层面来讲,symbol 并没有什么用,加 symbol 主要是为了提升代码可读性。
SP: 当前函数栈帧的底部
SB: 全局静态基指针,一般用来声明函数或全局变量
参数 0 放在 DI 通用寄存器
参数 1 放在 SI 通用寄存器
参数 2 放在 DX 通用寄存器
参数 3 放在 CX 通用寄存器
// void runtime·racereadpc(void *addr, void *callpc, void *pc)
TEXT runtime·racereadpc(SB), NOSPLIT, $0-24
MOVQ addr+0(FP), RARG1
MOVQ callpc+8(FP), RARG2
MOVQ pc+16(FP), RARG3
ADDQ $1, RARG3 // pc is function start, tsan wants return address
// void __tsan_read_pc(ThreadState *thr, void *addr, void *callpc, void *pc);
MOVQ $__tsan_read_pc(SB), AX
JMP racecalladdr<>(SB)
send
用于给 goroutine 直接发送数据
- 如果数据没问题就直接将数据拷贝到 x := <- c 表达式 x 的内存地址上
- 然后将该 goroutine 放到处理器 (P) 的 runnext 上面等待执行,这里不是直接让 goroutine 执行,而是等下一次调度的时候直接调这个 goroutine
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
接收
具体编译时做的转换可参考 makechan,代码都在类似的地方
chanrecv
两种接收方式:
chanrecv1 是丢弃 channel 出来的元素,类似 <- c 这中表达式
chanrecv2 是使用 channel 出来的元素,类似 elem := <- c
最终都会调用到 chanrecv
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
- 判断 chan 是否为 nil,如果是直接报错
- kill 掉一些不用枷锁就可以判断的情况,如果是非阻塞并且队列为空并且 channel 未关闭就返回 false
- 如果 channel 已经关闭了,就清空 ep 中的数据,立即返回
- 如果已经有 sendq 在等待了 (发送端提到过,如果没有 goroutine 等待接受,就加入 sendq), 就直接接收这个元素
- 如果此时没有 goroutine 等待发送
- 如果是非阻塞且 buffer 中有数据直接从 buffer 中取出,如果没有数据直接返回 false
- 如果是阻塞的且当前 goroutine 没在 select 中或者在 select 中但没有其他出口,就把自己加入 recvq,然后调用 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3),等待被唤醒(如果被唤醒说明有有数据来了)
- 清理这个过程中的垃圾数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
关闭 channel
关闭 channel 大概逻辑就是,将 buffer 中的数据都释放掉,然后 close 设置为 0
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
chan.dot
digraph {
bgcolor="#C6CFD532";
node [shape=record, fontsize="8", margin="0.04", height=0.2, color=gray]
edge [fontname="Inconsolata, Consolas", fontsize=10, arrowhead=normal]
hchan [shape=record,label="{qcount|dataqsiz|buf|elemsize|closed|elemtype|<sendx>sendx|<recvx>recvx|recvq|sendq|lock}",xlabel="hchan"]
waitq[shape=record,label="{<first>first|<last>last}",xlabel="waitq"]
sudog[shape=record,label="{g|isSelect|next|prev|elem|acquiretime|releasetime|ticket|parent|waitlink|waittail|c}",xlabel="sudog"]
hchan:sendx -> waitq [label="发送队列"]
hchan:recvx -> waitq [label="接收队列"]
waitq:first -> sudog
waitq:last -> sudog
}
我每天都在努力, 只是想证明我是认真的活着.
https://www.cnblogs.com/wuwangchuxin0924/p/13022386.html