一、并发
Go 语言支持并发,我们只需要通过 go 关键字来开启 goroutine 即可。
goroutine 是轻量级线程,goroutine 的调度是由 Golang 运行时进行管理的。
二、通道
通道(channel)是Go语言中一种非常独特的数据结构。它可用于在不同Goroutine之间传递类型化的数据,并且是并发安全的。
通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。
ch <- v // 把 v 发送到通道 chv := <-ch // 从 ch 接收数据并把值赋给 v
声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:
ch := make(chan int)
:::info 注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。 :::
使用 chose 关闭通道,关闭后的通道不能再向其输入:
close(ch)
通道工作示意图:
通道缓冲区
通道有带缓冲和非缓冲之分。缓冲通道中可以缓存N个数据。我们在初始化一个通道值的时候必须指定这个N。相对的,非缓冲通道不会缓存任何数据。发送方在向通道值发送数据的时候会立即被阻塞,直到有某一个接收方已从该通道值中接收了这条数据。非缓冲的通道值的初始化方法如下:
make(chan int, 0)
通过 make 的第二个参数设置缓冲区缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
:::info 注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。 :::
示例:
// 这里我们定义了一个可以存储整数类型的带缓冲通道缓冲区大小为2
ch := make(chan int, 2)
// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据,而不用立刻需要去同步读取数据
ch <- 1
ch <- 2
// 获取这两个数据
fmt.Println(<-ch)
fmt.Println(<-ch)
如果将通道缓冲区改为1,则执行此段程序会报错:
fatal error: all goroutines are asleep - deadlock!
单向通道
默认情况下,通道都是双向的,即**双向通道**。如果数据只能在通道中单向传输,那么该通道就被称作**单向通道**。我们在初始化一个通道值的时候不能指定它为单向。但是,在编写类型声明的时候,我们却是可以这样做的。例如:
type Receiver <-chan int
类型Receiver代表了一个只可从中接收数据的单向通道类型。这样的通道也被称为接收通道。在关键字chan左边的接收操作符<-形象地表示出了数据的流向。相对应的,如果我们想声明一个发送通道类型,那么应该这样:
type Sender chan<- int
这次<-被放在了chan的右边,并且“箭头”直指“通道”。我们可以把一个双向通道值赋予上述类型的变量,就像这样:
var myChannel = make(chan int, 3)
var sender Sender = myChannel
var receiver Receiver = myChannel
但是,反之则是不行的。像下面这样的代码是通不过编译的:
var myChannel1 chan int = sender
单向通道的主要作用是约束程序对通道值的使用方式。比如,我们调用一个函数时给予它一个发送通道作为参数,以此来约束它只能向该通道发送数据。又比如,一个函数将一个接收通道作为结果返回,以此来约束调用该函数的代码只能从这个通道中接收数据。
举个例子:
type Sender chan<-int
type Receiver <-chan int
func main() {
var myChannel = make(chan int, 1)
var number = 6
func(){
var sender Sender = myChannel
sender <- number
fmt.Println("sent!")
}()
func(){
var receiver Receiver = myChannel
fmt.Println("Received!", <-receiver)
}()
}
通道阻塞
通道阻塞的条件:
- 1.输入Channel的数据量>Channel能接受的量
- Channel输出的数据量>Channel内现有的数据量
不阻塞的情况:输入数据量<=channel缓冲区大小 && 输出数据量<=channel缓冲区大小
var items = make(chan int, 10) // channel缓冲区大小为10
func producer(index int) {
items <- index
}
func consumer(index int) {
fmt.Printf("第 %d 个消费者消费了第 %d 个商品 \n", index, <- items)
}
func main() {
for i := 0; i < 10; i++ {
go producer(i) // 输入10个数据
}
for i := 0; i < 10; i++ {
go consumer(i) // 输出10个数据
}
time.Sleep(time.Second * 10)
}
阻塞的情况1:输入的数据量 > channel缓冲区大小
var items = make(chan int, 10) // channel缓冲区大小为10
func producer(index int) {
items <- index
}
func consumer(index int) {
fmt.Printf("第 %d 个消费者消费了第 %d 个商品 \n", index, <- items)
}
func main() {
for i := 0; i < 100; i++ {
go producer(i) // 输入100个数据
}
for i := 0; i < 10; i++ {
go consumer(i) // 输出10个数据
}
time.Sleep(time.Second * 10)
}
阻塞的情况2:输出的数据量 > channel缓冲区大小
var items = make(chan int, 10) // channel缓冲区大小为10
func producer(index int) {
items <- index
}
func consumer(index int) {
fmt.Printf("第 %d 个消费者消费了第 %d 个商品 \n", index, <- items)
}
func main() {
for i := 0; i < 10; i++ {
go producer(i) // 输入10个数据
}
for i := 0; i < 100; i++ {
go consumer(i) // 输出100个数据
}
time.Sleep(time.Second * 10)
}
三、协程
协程的相关概念
协程的创建
通常执行程序,都是顺序执行的:
func main() {
Loop()
Loop()
}
func Loop() {
for i := 0; i <= 10; i++ {
fmt.Printf("%d ", i)
}
}
上面的程序,无论执行多少次,都会依次输出:
0 1 2 3 4 5 6 7 8 9 10 0 1 2 3 4 5 6 7 8 9 10
在go语言中,使用 go 关键字启动一个协程。
func main() {
go Loop()
go Loop()
time.Sleep(time.Second * 5)
}
func Loop() {
for i := 0; i <= 10; i++ {
time.Sleep(time.Microsecond * 10)
fmt.Printf("%d ", i)
}
}
可以看到,控制台输出将会两个循环依次执行:
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 10 10
而之所以在主程序中添加 time.Sleep ,是因为运行太快的话,协程还未将输出结果发送到控制台,主线程就结束了。
如果设置不同的间隔时间,将会发现两个协程一个快一个慢。
func main() {
go Loop1()
go Loop2()
time.Sleep(time.Second)
}
func Loop1() {
for i := 0; i <= 10; i++ {
time.Sleep(time.Microsecond * 10)
fmt.Printf("%d ", i)
}
}
func Loop2() {
for i := 0; i <= 10; i++ {
time.Sleep(time.Microsecond * 1000)
fmt.Printf("%d ", i)
}
}
输出:
0 0 1 1 2 3 2 3 4 5 6 4 5 7 8 6 9 7 10 8 9 10
与主线程并发执行
主线程实际上也可看做一条协程,比如如下程序,主线程中的循环将于协程中的循环一起执行:
func main() {
go Loop()
Loop()
time.Sleep(time.Second * 10)
}
输出:
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 10 10
设置CPU最大核心数
fmt.Println(runtime.NumCPU()) // 获取CPU最大核心数
runtime.GOMAXPROCS(runtime.NumCPU() - 2) // 设置CPU最大核心数
go Loop1()
go Loop2()
time.Sleep(time.Second)
四、协程通讯
协程与主线程之间的通讯
以下实例通过两个 goroutine 来计算数字之和:
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[len(s)/2:], c) // 将前三位相加
go sum(s[:len(s)/2], c) // 将后三位相加
x, y := <-c, <-c // 从通道 c 中接收数据
fmt.Println(x, y, x+y)
}
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 把 sum 发送到通道 c
}
在协程完成了一次相加操作之后,将相加的结果通过通道传递,再从主线程中取出通道中的数据。
多个协程之间通讯
go中的协程通讯需要用到通道(chan),一个简单的例子如下:
// 定义一个通道
var chanInt chan int = make(chan int, 3)
func main() {
go send()
go receive()
time.Sleep(time.Second * 5)
}
// 向通道发送数据
func send() {
time.Sleep(time.Second)
chanInt <- 1
time.Sleep(time.Second)
chanInt <- 2
time.Sleep(time.Second)
chanInt <- 3
}
// 获取通道中的数据
func receive() {
num := <- chanInt // 读取chanInt中的数据
fmt.Println(num)
num = <- chanInt
fmt.Println(num)
num = <- chanInt
fmt.Println(num)
}
send 和 receive 两个方法本身应该并发执行,由于send方法不停地向 chanInt 通道发送数据,而 receive 方法不停地读取chanInt 通道的数据,即可完成协程间的通讯。
通过select从通道中读取数据
如果存在多个通道,可以使用select语句选择从不同的通道中读取数据。
// 定义多个通道
var chanInt chan int = make(chan int, 3)
var chanSting chan string = make(chan string)
var chanBool chan bool = make(chan bool)
func main() {
go send()
go receive()
time.Sleep(time.Second * 10)
chanSting <- "send over"
chanBool <- true
}
// 向通道发送数据
func send() {
time.Sleep(time.Second)
chanInt <- 1
time.Sleep(time.Second)
chanInt <- 2
time.Sleep(time.Second)
chanInt <- 3
}
// 获取通道中的数据
func receive() {
for {
select {
case num := <- chanInt:
fmt.Println(num)
case str := <- chanSting:
fmt.Println(str)
case <- chanBool:
fmt.Println("运行结束")
default:
fmt.Println("unknown channel")
}
}
}
输出:
1
2
3
send over
运行结束
五、协程同步
协程同步需要用到sync.WaitGroup 工具,相关的方法有:
Add添加协程记录Done移出协程记录Wait同步等待所有记录的携程全部结束 ```go var WG sync.WaitGroup
func main() { read() // 主线程中读取文件 go write() // 启动协程往文件中写数据 WG.Wait() // 等待写入完成
// 后续操作
fmt.Println("All done.")
time.Sleep(time.Second * 60)
}
func read() { for i := 1; i < 3; i++ { WG.Add(1) fmt.Println(“read data: “, i) } }
func write() { for i := 1; i < 3; i++ { time.Sleep(time.Second) fmt.Println(“write data: “, i) WG.Done() } }
打印出的结果为:
```go
read data: 1
read data: 2
write data: 1
write data: 2
All done.
注意Add的次数跟Done的次数得一致,否则会抛出一个死锁的错误:
fatal error: all goroutines are asleep - deadlock!
比如将 read 方法中的 WG.Add(1) 换为 WG.Add(i) ,这样相当于加的delta为3,而只Done了2次。就会报上面的错误。从其源码中就可以看出,每次Done是将其delta减一。
并发的应用
抢票问题
经典的抢票程序:有10张票,有100个人来抢票,每个人最多只能购买1张。
func main() {
wg := sync.WaitGroup{}
tickets := make(chan int, 10)
for i := 0; i < 10; i++ {
go func(i int) {
wg.Add(1)
tickets <- i
wg.Done()
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
wg.Add(1)
select {
case ticket := <- tickets:
fmt.Printf("第%d个人抢到了第%d张票\n", i, ticket)
default:
}
wg.Done()
}(i)
}
wg.Wait()
}
执行后,每次输出会不一样:
第3个人抢到了第6张票
第0个人抢到了第0张票
第1个人抢到了第1张票
第5个人抢到了第2张票
第6个人抢到了第3张票
第2个人抢到了第4张票
第7个人抢到了第5张票
第11个人抢到了第8张票
第8个人抢到了第9张票
第4个人抢到了第7张票
