什么是 CSP
不要通过共享内存来通信,而要通过通信来实现内存共享。
CSP 经常被认为是 Go 在并发编程上成功的关键因素。CSP 全称是 “Communicating Sequential Processes”,这也是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤其是并发编程的代码。
channel 底层的数据结构是什么
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
buf
指向底层循环数组,只有缓冲型的 channel 才有。
sendx
,recvx
均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。sendq
,recvq
分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。waitq
是 sudog
的一个双向链表,而 sudog
实际上是对 goroutine 的一个封装:
type waitq struct {
first *sudog
last *sudog
}
goroutine 是 Go 中实现并发的重要机制,channel 是 goroutine 之间进行通信的重要桥梁。
创建
ch := make(chan int) // channel 必须定义其传递的数据类型
也可以这样
var ch chan int
“发送”和“接收”是 channel 的两个基本操作。
ch <- x // channel 接收数据 x
x <- ch // channel 发送数据并赋值给 x
<- ch // channel 发送数据,忽略接受者
通道缓冲区
通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
形象解释无缓冲和有缓冲的区别:
无缓冲是同步的,例如 make(chan int),就是一个送信人去你家门口送信,你不在家他不走,你一定要接下信,他才会走,无缓冲保证信能到你手上。
有缓冲是异步的,例如 make(chan int, 1),就是一个送信人去你家仍到你家的信箱,转身就走,除非你的信箱满了,他必须等信箱空下来,有缓冲的保证信能进你家的邮箱。
从 channel 接收数据的过程是怎样的
channel buffer
可以通过 make(chan int)
创建channel,此类 channel 称之为非缓冲通道。事实上 channel 可以定义缓冲大小,如下:
chInt := make(chan int) // unbuffered channel 非缓冲通道
chBool := make(chan bool, 0) // unbuffered channel 非缓冲通道
chStr := make(chan string, 2) // bufferd channel 缓冲通道
需要注意的是,程序中必须同时有不同的 goroutine 对非缓冲通道进行发送和接收操作,否则会造成阻塞。
与非缓冲通道不同,缓冲通道可以在同一个 goroutine 内接收容量范围内的数据,即便没有另外的 goroutine 进行读取操作,如下代码可以正常执行:
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
}
close() 函数可以用于关闭 channel,关闭后的 channel 中如果有缓冲数据,依然可以读取,但是无法再发送数据给已经关闭的channel。
func main() {
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
res := 0
for v := range ch {
res += v
}
fmt.Println(res)
}
从一个关闭的 channel 仍然能读出数据吗
先创建了一个有缓冲的 channel,向其发送一个元素,然后关闭此 channel。之后两次尝试从 channel 中读取数据,第一次仍然能正常读出值。第二次返回的 ok 为 false,说明 channel 已关闭,且通道里没有数据。
func main() {
ch := make(chan int, 5)
ch <- 18
close(ch)
x, ok := <-ch
if ok {
fmt.Println("received: ", x)
}
x, ok = <-ch
if !ok {
fmt.Println("channel closed, data invalid.")
}
}
运行结果
received: 18
channel closed, data invalid.
channel应用
定时任务
与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务。
有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定:
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。
定时执行某个任务,也比较简单:
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
}
解耦生产方和消费方
服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {}
无限循环里,从某个 channel 消费工作任务并执行:
func main() {
taskCh := make(chan int, 100)
go worker(taskCh)
// 塞任务
for i := 0; i < 10; i++ {
taskCh <- i
}
// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。
控制并发数
有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。
这里,limit <- 1
放在 func 内部而不是外部,原因是:
如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。
limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。
还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。
如何优雅地关闭 channel
关于 channel 的使用,有几点不方便的地方:
- 在不改变 channel 自身状态的情况下,无法获知一个 channel 是否关闭。
- 关闭一个 closed channel 会导致 panic。所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。
- 向一个 closed channel 发送数据会导致 panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据是很危险的事情。
一个比较粗糙的检查 channel 是否关闭的函数:
```go
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() { c := make(chan T) fmt.Println(IsClosed(c)) // false close(c) fmt.Println(IsClosed(c)) // true }
看一下代码,其实存在很多问题。首先,IsClosed 函数是一个有副作用的函数。每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数,干活就干活,还顺手牵羊!
其次,IsClosed 函数返回的结果仅代表调用那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作,改变了它的这种状态。例如,IsClosed 函数返回 true,但这时有另一个 goroutine 关闭了 channel,而你还拿着这个过时的 “channel 未关闭”的信息,向其发送数据,就会导致 panic 的发生。当然,一个 channel 不会被重复关闭两次,如果 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。
有一条广泛流传的关闭 channel 的原则:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。
比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样没法确定其他 sender 的情况,这时也不能贸然关闭 channel。<br />但是上面所说的并不是最本质的,最本质的原则就只有一条:
> don't close (or send values to) closed channels.
有两个不那么优雅地关闭 channel 的方法:
1. 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。<br />
1. 使用 sync.Once 来保证只关闭一次。<br />
到底应该如何优雅地关闭 channel?
根据 sender 和 receiver 的个数,分下面几种情况:
- 一个 sender,一个 receiver<br />
- 一个 sender, M 个 receiver<br />
- N 个 sender,一个 reciver<br />
- N 个 sender, M 个 receiver<br />
对于 1,2,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。重点关注第 3,4 种情况。
第 3 种情形下,优雅关闭 channel 的方法是:the only receiver says "please stop sending more" by closing an additional signal channel。<br />解决方案就是增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止接收数据。代码如下:
```go
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// the receiver
go func() {
for value := range dataCh {
if value == Max-1 {
fmt.Println("send stop signal to senders.")
close(stopCh)
return
}
fmt.Println(value)
}
}()
select {
case <- time.After(time.Hour):
}
}
这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。
需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。
最后一种情况,优雅关闭 channel 的方法是:any one of them says “let’s end the game” by notifying a moderator to close an additional signal channel。
和第 3 种情况不同,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// It must be a buffered channel.
toStop := make(chan string, 1)
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
select {
case <- time.After(time.Hour):
}
}
代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。
这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。
如果,我们把 toStop 的容量声明成 Num(senders) + Num(receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。
可以看到,这里同样没有真正关闭 dataCh,原样同第 3 种情况。
以上,就是最基本的一些情形,但已经能覆盖几乎所有的情况及其变种了。只要记住:
don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
以及更本质的原则:
don’t close (or send values to) closed channels.
select 语句
select 专门用于通道发送和接收操作,看起来和 switch 很相似,但是进行选择和判断的方法完全不同。
在下述例子中,通过 select 的使用,保证了 worker 中的事务可以执行完毕后才退出 main 函数
func strWorker(ch chan string) {
time.Sleep(1 * time.Second)
fmt.Println("do something with strWorker...")
ch <- "str"
}
func intWorker(ch chan int) {
time.Sleep(2 * time.Second)
fmt.Println("do something with intWorker...")
ch <- 1
}
func main() {
chStr := make(chan string)
chInt := make(chan int)
go strWorker(chStr)
go intWorker(chInt)
for i := 0; i < 2; i++ {
select {
case <-chStr:
fmt.Println("get value from strWorker")
case <-chInt:
fmt.Println("get value from intWorker")
}
}
}
通过 channel 实现同步机制
一个经典的例子如下,main 函数中起了一个 goroutine,通过非缓冲队列的使用,能够保证在 goroutine 执行结束之前 main 函数不会提前退出。
func worker(done chan bool){
fmt.Println("start working...")
done <- true
fmt.Println("end working...")
}
func main() {
done := make(chan bool, 1)
go worker(done)
<- done
}