概述
goroutine 遵循不要通过共享内存来通信,而应该通过通信来共享内存(Don’t communicate by sharing memory; share memory by communicating)。而channel是对后半句的完美实现。我们可以使用channel在两个或多个goroutine之间传递消息。在声明一个通道变量的时候,必须确定通道类型的传递的元素类型,通过channel传递对象的过程和调用函数时的参数传递类型必须一致。
要想理解 channel 要先知道 CSP 模型。CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,由 Tony Hoare 于 1977 年提出。简单来说,CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体。Go 语言实现了 CSP 部分理论,goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。
channel本质上是一个队列,是一个容器。
定义&初始化
channel的声明形式如下,与一般的变量声明不同的地方仅仅是在类型之前加了chan关键字,channel属于引用类型,因此调用者和被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。
var 变量名称 chan 通道元素类型
初始化一个chanel使用内置的函数make(),在创建通道的时候必须确定该通道元素类型,下面的代码中初始化了一个chan int
类型的通道,只能往ch这个通道里发送int
类型的数据,当然接收也只能是int
类型的数据。而chan string代表了一个元素类型为string的通道类型。
ch:=make(chan int)
make函数还可以可接受两个参数。第一个参数是代表了将被初始化的值的类型的字面量(比如chan int),而第二个参数是可选参数,它表示该通道的容量。尽管这个参数是int类型的,但是它代表了该通道最多可以暂存的数据的个数,是不能小于0的。
make(chan Type) //等价于make(chan Type, 0)
make(chan Type, capacity)
若我们想要初始化一个长度为3且元素类型为int的通道值,则需要这样写:
make(chan int, 3)
一个通道类似于一个先进先出(FIFO)的队列,即:越早被放入(或称发送)到通道值的数据会越先被取出(或称接收)在channel的用法中。
chan数据发送与接收都是用到左尖括号与减号组合(<-),一个左尖括号紧接着一个减号的组合形似一个箭头,箭头的方向代表了元素值的传输方向。
channel <- v //发送值到channel
<- channel //取出channel里的一个值
ch <- 2 //发送数值2给这个通道
x:=<-ch //从通道里读取值,并把读取的值赋值给x变量
x, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空
<-ch //从通道里读取值,然后忽略
发送数据
发送操作<-
在通道的后面,向channel发送(写入)数据通常会导致程序阻塞,直到有其他goroutine从这个channel中读取数据。下面的程序会出现死锁:
func foo(in chan int) {
fmt.Println(<-in)
}
func main() {
out := make(chan int)
out <- 1
go foo(out)
}
接收数据
value := <- ch
value, ok := <-ch //功能同上,同时检查通道是否已关闭或者是否为空
因此需要特别注意的是:channel接收和发送数据都是阻塞的,当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它 Go 协程从信道读取到数据,才会解除阻塞。与此类似,当读取信道的数据时,如果没有其它的协程把数据写入到这个信道,那么读取过程就会一直阻塞着。
关于接受chan中可选的第二个布尔值字段的判断逻辑
- true
:读到通道数据,不确定是否关闭,可能channel还有保存的数据,但channel已关闭。
- false
:通道关闭,无数据读到。
关闭通道
内置的close
函数关闭。
close(ch)
eg:
package main
import "fmt"
func main() {
c := make(chan int ,3)
for i:=1;i<=cap(c);i++{
c<-i
}
close(c)
for v:=range c{
fmt.Print(v)
}
}
打印结果
123
上面的for range 语句也可以等于下面的效果一样
for v, ok := <- c; ok ; v, ok = <- c {
// do something with v
}
注意
1通道一旦关闭,再对它进行发送操作,就会引发 panic。
2如果我们试图关闭一个已经关闭了的通道,会引发 panic。
当我们把接收表达式的结果同时赋给两个变量时,第二个变量的类型就是一定bool类型。它的值如果为false就说明通道已经关闭,并且再没有元素值可取了。注意,如果通道关闭时,里面还有元素值未被取出,那么接收表达式的第一个结果,仍会是通道中的某一个元素值,而第二个结果值一定会是true。因此,通过接收表达式的第二个结果值,来判断通道是否关闭是可能有延时的。由于通道的收发操作有上述特性,所以除非有特殊的保障措施,我们千万不要让接收方关闭通道,而应当让发送方做这件事。
eg:
用v,ok := <-ch
+ select
操作判断channel是否关闭
package main
import "fmt"
func sendMsg(c chan int) {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}
func main() {
c := make(chan int)
go sendMsg(c)
for {
select {
case v,ok:=<-c:
if !ok{// 通道关闭
return
}else {
fmt.Print(v)
}
}
}
}
执行结果,
0123456789
通道类型
当容量为0时,我们可以称通道为非缓冲通道,也就是不带缓冲的通道。而当容量大于0时,我们可以称为缓冲通道
无缓冲通道
无缓冲的通道(unbuffered channel) 的读取和写入操作在各自的goroutine内部都是阻塞的。意思就是,如果管道满了,一个对channel放入数据的操作就会阻塞,直到有某个routine从channel中取出数据,这个放入数据的操作才会执行。相反同理,如果管道是空的,一个从channel取出数据的操作就会阻塞,直到某个routine向这个channel中放入数据,这个取出数据的操作才会执行
主routine要向channel中放入一个数据,但是因为channel没有缓冲,相当于channel一直都是满的,所以这里会发生阻塞,主routine在这里一阻塞,造成死锁!
因此使用无缓冲通道要求发送的gorountine和接收的goroutine同时准备就绪,分别负责接收和发送,否则会出现先执行发送或接收操作的 goroutine 阻塞等待。
func main() {
ch := make(chan int)
ch <- 2
fmt.Println("发送成功")
}
上面的程序执行结果:
fatal error: all goroutines are asleep - deadlock!
对于无缓冲的channel,放入操作和取出操作不能再同一个routine中,而且应该是先确保有某个routine对它执行取出操作,然后才能在另一个routine中执行放入操作
package main
import "fmt"
func receiveData(c chan int){
r := <-c
fmt.Printf("接收数据:%d\n",r)
}
func main() {
ch := make(chan int)
go receiveData(ch) // 通过goroutine从通道接收值
ch <- 2
fmt.Println("发送成功")
}
使用无缓冲通道进行通信将导致发送和接收的goroutine
同步化。因此,无缓冲通道也被称为同步通道
。
或者如下eg:goroutine同步,使用时间休眠方式在工程代码是不可取的
package main
import (
"fmt"
"time"
)
func hello(){
fmt.Println("hello goroutine")
}
func main() {
go hello()
time.Sleep(time.Second)
fmt.Println("main goroutine")
}
使用channel 数据同步
package main
import (
"fmt"
)
func hello(c chan<- bool) {
fmt.Println("hello goroutine")
c <- true
}
func main() {
c := make(chan bool)
go hello(c)
<-c
fmt.Println("main goroutine")
}
执行结果:
hello goroutine
main goroutine
缓冲通道
缓冲通道,其实是一个队列,这个队列的最大容量就是我们使用make
函数创建通道时,通过第二个参数指定
package main
import "fmt"
func main() {
c := make(chan int, 3)
c <- 1
fmt.Println("send data one")
c <- 2
fmt.Println("send data two")
c <- 3
fmt.Println("send data three")
c <- 4
fmt.Println("send data four")
}
在执行ch <- 4这一行操作就会发生阻塞,因为前三行的放入数据的操作已经把channel填满了。造成程序死锁,上面程序执行结果如下所示:
$ go run bufferchan.go
send data one
send data two
send data three
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
对一个没有数据的缓冲队列获取数据 也会造成死锁
package main
import "fmt"
func main() {
c := make(chan int, 3)
fmt.Println(<-c)
}
上面程序执行结果
fatal error: all goroutines are asleep - deadlock!
设置成锁形式 ,只是为了演示,不建议使用
package main
import (
"fmt"
"sync"
)
var (
c = make(chan struct{}, 1) //容量为1的缓冲信道
sum int
)
func increment(x int, wg *sync.WaitGroup) {
c <- struct{}{}
sum += x
wg.Done()
<-c
}
func main() {
var wg sync.WaitGroup
v := 100
wg.Add(v)
for i := 1; i <= v; i++ {
go increment(i, &wg)
}
wg.Wait()
fmt.Println(fmt.Sprintf("1-%d的和是:%d", v, sum))
}
单向通道
限制一个通道只可以接收,但是不能发送;或者限制一个通道只能发送,但是不能接收,这种通道我们称为单向通道。定义单向通道也很简单,只需要在定义的时候,带上<-
即可。
var send chan<- int //只能发送
var receive <-chan int //只能接收
以<-操作符在chan关键字的前后位置代表不同含义的单向通道,在后面是只能发送,对应发送操作;在前面是只能接收,对应接收操作。
单向通道应用于函数或者方法的参数比较多,比如
func counter(out chan<- int) {
}
例子这样的,只能进行发送操作,防止误操作,使用了接收操作,如果使用了接收操作,在编译的时候就会报错的。
eg:
package main
import "fmt"
func sendData(c chan<- int) {
c <- 1
}
func readData(c <-chan int) {
v := <-c
fmt.Println(v)
}
func main() {
c := make(chan int)
go sendData(c)
readData(c)
}
执行结果:
1
遍历channel
for range 循环用于在一个信道关闭之前,从信道接收数据。当管道关闭的时候是不需要判断管道的状态
package main
import "fmt"
func SendData(c chan int) {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}
func main() {
c := make(chan int)
go SendData(c)
for v :=range c{
fmt.Print(v)
}
}
执行结果
0123456789
顺序输出
chan嵌套chan,chan中的值也是一个chan,在执行的时候按照先后顺序添加
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func work(idx int,wg *sync.WaitGroup)chan int{
c := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(2))*time.Second)
c <-idx
wg.Done()
}()
return c
}
func main() {
rand.Seed(time.Now().UnixNano())
reqList :=[]int{1,2,3,4,5,6}
res :=make(chan chan int,len(reqList))
var wg sync.WaitGroup
for _,x:=range reqList{
wg.Add(1)
res<-work(x,&wg)
}
go func(res <-chan chan int) {
for v:=range res{
fmt.Println(<-v)
}
}(res)
close(res)
wg.Wait()
}
链式传递
package main
import (
"fmt"
)
func producer(nums...int)<-chan int{
c :=make(chan int)
go func() {
defer close(c)
for _,n:=range nums{
c<-n
}
}()
return c
}
func work(inch <-chan int)<-chan int{
c :=make(chan int)
go func() {
defer close(c)
for n :=range inch{
c<-n*2
}
}()
return c
}
func main() {
p :=producer(1,2,3,4,5)
res :=work(p)
for v:=range res{
fmt.Println(v)
}
}