基础操作

channel是一种类型,一种引用类型。声明栗子如下:

  1. var ch1 chan int // 声明一个传递整型的通道
  2. var ch2 chan bool // 声明一个传递布尔型的通道
  3. var ch3 chan []int // 声明一个传递int切片的通道

创建也十分简单:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。
举个小例子:

func main() {
    ch := make(chan int,5)
    go func(ch chan int) {
        for true {
            ret := <-ch
            fmt.Println("接收成功", ret)
        }

    }(ch)
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second)
        ch <- 10
    }
    close(ch)
    fmt.Println("发送成功")
}

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:

  • 对一个关闭的通道再发送值就会导致panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致panic。


    原理

    channel基于go的并发调度实现,本身并不复杂,代码全写在chan.go里。

其中看下关键的数据结构 hchan

type hchan struct {
   qcount   uint               // 缓冲区(数组)已有元素个数
   dataqsiz uint               // 缓冲区的最大容量
   buf      unsafe.Pointer     // 缓冲区指针
   elemsize uint16             // 元素大小
   closed   uint32             // 关闭标记,0没关闭,1关闭
   elemtype *_type             // 数据项类型
   sendx    uint               // 发送索引
   recvx    uint               // 接收索引
   recvq    waitq              // 接收者队列,也就是等待读消息的goroutine队列
   sendq    waitq              // 发送者队列,也就是等待写消息的goroutine队列
   lock mutex                  // 互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}

// sudog 代表goroutine
type waitq struct {
    first *sudog
    last  *sudog
}

直接看字段很明显就可以猜到大概的逻辑(排除无缓冲类型channel先):

  1. 首先确认的是channel维护一个固定大小的缓冲数组,一个接收者队列、一个发送者队列。
  2. 根据上面的例子,当执行到 ch <- 10 时,也就是准备把10传输到channel了,如果接收者队列刚好有则出列并把数据发给接收者,也就是执行到 ret := <-ch,如果没有,则考虑存到缓冲区。
  3. 当缓冲区的大小没超过最大容量,则把数据存到缓存区并大小+1,当超过缓存区时,那根据是否阻塞来决定后续是跳出方法还是进行阻塞等待。

当然上面的步骤都是猜测,我们根据上面的例子直接看源码是怎么实现的。

初始化

首先看初始化channel逻辑 ch := make(chan int,5),其对应的方法为:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    //编译器检查元素类型
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    //获取需要分配的内存
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // 如果 hchan 中的元素不包含有指针,那么就没什么和 GC 相关的信息了
    var c *hchan
    //多实用switch少用if else
    switch {
    //队列或元素大小为0
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    //元素中不包含指针    
    case elem.kind&kindNoPointers != 0:
        //元素不包含指针。
        //一次调用分配hchan和buf。
        //这种情况,gc不会对 channel 中的元素进行 scan
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素中包含指针
        //区别:调用了两次分配空间的函数
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

从上面方法总结出:
make函数在创建channel的时候会在该进程的heap区申请一块内存,创建一个hchan结构体,返回执行该内存的指针,所以获取的的ch变量本身就是一个指针,在函数之间传递的时候是同一个channel。

发送

看发送的逻辑 ch <- 10,其对应的方法为:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //如果channel为nil
    if c == nil {
        if !block {
            return false
        }
        // nil channel 发送数据会永远阻塞下去
        // 挂起当前 goroutine
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    if debugChan {
        print("chansend: chan=", c, "\n")
    }
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }
    //检查是否阻塞
    //检查是否关闭,channel关闭之后不能发送
    //检查channel容量&&检查channel 接收队列是否为空
    //检查channel队列是否满了
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    lock(&c.lock)

    // 如果channel已经关闭,则直接抛异常
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 接收者队列中有接收者协程在等待
    if sg := c.recvq.dequeue(); sg != nil {
        // 直接把要发的数据拷贝给这个接收者
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    //如果channel队列还没满
    if c.qcount < c.dataqsiz {
        // 通道缓冲区中有可用空间。发送的元素入队。
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        //将 goroutine 的数据拷贝到 buffer 中
        typedmemmove(c.elemtype, qp, ep)
        // 将发送 index 加一
        c.sendx++
        //满了,重置index
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        //长度加1
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ------- 如果缓存区满了 --------

    // 非阻塞的话就直接跳出
    if !block {
        unlock(&c.lock)
        return false
    }

    //在 channel 上阻塞,receiver 会帮我们完成后续的工作
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
    c.sendq.enqueue(mysg)
    // 将这个发送 goroutine 从 Grunning -> Gwaiting
    // 进入休眠
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    //确保发送的值保持活动状态,直到接收者将其复制出来
    KeepAlive(ep)

    // 唤醒
    //如果不是合法的唤醒
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    //唤醒
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        //唤醒字后发现channel被关闭了
        panic(plainError("send on closed channel"))
    }
    //可唤醒
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

从上面的方法总结出大概的步骤:

  1. 先锁住整个channel。
  2. 如果发送数据的过程中,先发现等待队列不为空,说明有阻塞等待获取channel中数据的协程,那么直接从recvq接收者队列中取出协程,写入数据,就完成了。
  3. 如果recvq接收者队列为空,那么判断缓冲区是否满了,如果没满,把数据拷贝到缓冲区buf中,结束。如果满了,那么当前协程阻塞,封装成一个sudog放入发送队列sendq发送者队列。
  4. 释放lock

接收

看看接收的逻辑 ret := <-ch,其对应的方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }
    //如果channel为空
    if c == nil {
        if !block {
            return
        }
        // 挂起当前 goroutine
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 同上
    // 非阻塞且没内容可收的情况下要直接返回
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    //加锁
    lock(&c.lock)

    // 当前channel没有数据可读
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // 发送者队列中有发送者协程在等待
    // 直接从该 sudog 中获取数据拷贝到当前 g 即可
    if sg := c.sendq.dequeue(); sg != nil {
        // 这里会先处理协程自己的数据,如果没数据再处理传进来的ep
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    //还有可读数据
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 直接从 buffer 里拷贝数据
        typedmemclr(c.elemtype, qp)
        // 接收索引 +1
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buffer 元素计数 -1
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // 没有可用的发送者,阻塞当前channel
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 进入接收者队列
    c.recvq.enqueue(mysg)
    //等待 Grunning -> Gwaiting
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // 非正常唤醒
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    //唤醒
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

从上面的逻辑总结下步骤:

  1. 锁住整个channel。
  2. 尝试在sendq发送者队列中获取等待的发送者协程。
  3. 如果有等待的发送者协程,而且缓冲区里没数据,取出发送者协程并读取数据,然后唤醒这个协程,结束读取释放锁。
  4. 如果有等待发送者协程,且有缓冲区(缓冲区满了),从缓冲区队列首取数据,再从发送者队列取出一个发送者协程,将发送者协程中的数据存放到buf队列尾,结束读取释放锁。
  5. 如果没有等待的发送者协程,且缓冲区有数据,直接读取缓冲区数据,结束释放锁。
  6. 如果没有等待的发送者协程,且没有缓冲区或者缓冲区为空,将当前发送者协程加入到发送者队列,进入睡眠,等待被写的入发送者协程唤醒,结束读取释放锁。