1 什么是协程

协程是与其他函数或方法一起并发运行的函数或方法。Go协程可以看作是轻量级线程,与线程相比,创建一个 Go 协程的成本很小。

1.1 协程与线程的对比

  • 协程的成本极低。堆栈大小只有若干KB(2或4KB),并且可以根据应用的需求进行增减。而线程必须指定堆栈的大小,其堆栈是固定不变的(一般默认2MB)。固定了栈的大小导致两个问题:
    • 一是对于很多只需要很小的栈空间的线程来说是一个巨大的浪费
    • 二是对于少数需要巨大栈空间的线程来说又面临栈溢出的风险
  • 协程会复用(Multiplex)数量更少的 OS 线程。即使程序有数以千计的协程,也可能只有一个线程
    • 如果该线程中的某一Go协程发生了阻塞(比如说等待用户输入),那么系统会再创建一个OS线程,并把其余协程都移动到这个新的OS线程。所有这一切都在运行时进行,作为程序员,我们没有直接面临这些复杂的细节,而是有一个简洁的 API 来处理并发。
    • Go内置半抢占式的协作调度器,在用户态进行协程的调度。
  • Go协程使用信道(Channel)来进行通信。信道用于防止多个协程访问共享内存时发生竞态条件(Race Condition)。信道可以看作是协程之间通信的管道。

    1.2 启动协程

    调用函数或者方法时,在前面加上关键字go,可以让一个新的Go协程并发地运行。需要注意:

  • 启动一个新的协程时,协程的调用会立即返回。程序控制不会去等待Go协程执行完毕。在调用Go协程之后,程序控制会立即返回到代码的下一行,忽略该协程的任何返回值。

  • 如果希望运行其他Go协程,Go 主协程必须继续运行着。如果Go主协程终止,则程序终止,于是其他协程也不会继续运行

使用示例如下:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func numbers() {
  7. for i := 1; i <= 5; i++ {
  8. time.Sleep(250 * time.Millisecond)
  9. fmt.Printf("%d ", i)
  10. }
  11. }
  12. func alphabets() {
  13. for i := 'a'; i <= 'e'; i++ {
  14. time.Sleep(400 * time.Millisecond)
  15. fmt.Printf("%c ", i)
  16. }
  17. }
  18. func main() {
  19. go numbers() //启动协程
  20. go alphabets() //启动协程
  21. //等待子协程允许完毕,后面介绍更高级的信道方式,这里就简单的等待
  22. time.Sleep(3000 * time.Millisecond)
  23. fmt.Println("main terminated")
  24. }
  25. //输出:1 a 2 3 b 4 c 5 d e main terminated

下图可以清晰的看到三个协程的运行关系:
4 协程与并发 - 图1

2 信道

2.1 信道的创建

信道可以想像成协程之间通信的管道。如同管道中的水会从一端流到另一端,通过使用信道,数据也可以从一端发送,在另一端接收。
所有信道都关联了一个类型。信道只能运输这种类型的数据,而运输其他类型的数据都是非法的chan T表示T类型的信道,使用make函数进行初始化。例如:

a := make(chan int)

2.2 信道的收发

信道旁的箭头方向指定了是发送数据还是接收数据

data := <- a // 读取信道a,保存值到data
a <- data // 写入信道a

发送与接收默认是阻塞的。当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它协程从信道读取到数据,才会解除阻塞。与此类似,当读取信道的数据时,如果没有其它的协程把数据写入到这个信道,那么读取过程就会一直阻塞着。**信道的这种特性能够帮助Go协程之间进行高效的通信,不需要用到其他编程语言常见的显式锁或条件变量。

借助阻塞这个特性,我们可以用一个读操作等待子协程结束,而不是使用sleep:

func hello(done chan bool) {  
    fmt.Println("Hello world goroutine")
    done <- true//子协程结束,写入数据
}
func main() {  
    done := make(chan bool)//创建bool信道
    go hello(done)
    <-done //读操作,一直阻塞直到子协程结束
    fmt.Println("main function")
}

2.3 小心死锁

使用信道需要考虑的一个重点是死锁。

  • 当Go协程给一个信道发送数据时,照理说会有其他Go协程来接收数据。如果没有的话,程序就会在运行时触发 panic,形成死锁。
  • 当有Go协程等着从一个信道接收数据时,我们期望其他的Go协程会向该信道写入数据,要不然程序就会触发 panic。

    2.4 关闭信道和range遍历

    数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭。
    func producer(chnl chan int) {  
      for i := 0; i < 10; i++ {
          chnl <- i
      }
      close(chnl)//关闭信道
    }
    func main() {  
      ch := make(chan int)
      go producer(ch)
      for {
          v, ok := <-ch //判断信道是否关闭
          if ok == false {
              break
          }
          fmt.Println("Received ", v, ok)
      }
    }
    
    上面的语句里,如果成功接收信道所发送的数据,那么 ok 等于 true。而如果 ok 等于 false,说明我们试图读取一个关闭的通道。从关闭的信道读取到的值会是该信道类型的零值。

或者我们可以用range遍历信道,代替上面示例中的for循环:

func main() {  
    ch := make(chan int)
    go producer(ch)
    for v := range ch {//range可以在信道关闭后自动结束,不用显示的判断
        fmt.Println("Received ",v)
    }
}

2.5 缓冲信道

上面无缓冲信道的发送和接收过程是阻塞的,读写操作会一直阻塞。
我们还可以创建一个有缓冲的信道(Buffered Channel)。只在缓冲已满的情况,才会阻塞向缓冲信道发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据

通过向 make 函数时再传递一个表示容量的参数(指定缓冲的大小,sizeof(type) * capacity),就可以创建缓冲信道。

ch := make(chan type, capacity)//capacity 应该大于 0。无缓冲信道的容量默认为 0

缓冲区容量和长度的区别:

  • 容量是指信道可以存储的值的数量(总的大小)。我们在使用make函数创建缓冲信道的时候会指定容量大小。
  • 长度是指信道中当前排队的元素个数(当前保存的大小)。

使用示例如下:

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i //写入两个值之后缓冲区满,阻塞等待缓冲区空闲
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)//缓冲大小为2
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)
    }
}

2.6 select

select 语句用于在多个发送/接收信道操作中进行选择。该语法与 switch 类似,所不同的是,这里的每个 case 语句都是信道操作。

  • select 语句会一直阻塞,直到发送/接收操作准备就绪。如果有多个信道操作准备完毕,select 会随机地选取其中之一执行。
  • 在没有case准备就绪时,可以执行select语句中的默认情况(Default Case),这通常用于防止select语句一直阻塞,没有信道可用时会立刻返回。

使用示例:

func server1(ch chan string) {  
    time.Sleep(6 * time.Second)
    ch <- "from server1"
}
func server2(ch chan string) {  
    time.Sleep(3 * time.Second)
    ch <- "from server2"

}
func main() {  
    output1 := make(chan string)
    output2 := make(chan string)
    go server1(output1)
    go server2(output2)
    select {//一直阻塞,直到某个信道可用
    case s1 := <-output1:
        fmt.Println(s1)
    case s2 := <-output2:
        fmt.Println(s2)
    }
}

3 WaitGroup

3.1 WaitGroup的使用

WaitGroup可以用来等待一批go协程执行结束,类似于C++的std::Thread::join。使用示例如下:

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {//waitgroup参数指针,因为要修改内部的值,不能是值传递
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()//子协程结束,调用done减少计数器
}

func main() {
    no := 3
    var wg sync.WaitGroup //定义waitgroup
    for i := 0; i < no; i++ {
        wg.Add(1)//+1,增加计数器
        go process(i, &wg)
    }
    wg.Wait()//等待协程结束,计数器为0
    fmt.Println("All go routines finished executing")
}

3.2 实现一个协程池

基本思路:

  • 创建一个Go协程池,监听一个等待作业分配的输入型缓冲信道
  • 将作业添加到该输入型缓冲信道中
  • 作业完成后,再将结果写入一个输出型缓冲信道
  • 从输出型缓冲信道读取并打印结果

代码和解析如下:

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)
//定义任务和结果两个结构体
type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job //包含job结构体
    sumofdigits int
}
//创建任务和结果的两个缓冲信道
var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)
//计算一个整数每一位相加的和
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
//遍历job信道,计算后每个job的数字并将结果写入reslut信道
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
//初始化waitgroup,并开启多个协程开始计算
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()//等待协程池中协程都执行完毕
    close(results)
}
//创建job,添加到job信道中
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
//打印所有计算结果
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)//这里会一直读取result信道,直到信道关闭

    noOfWorkers := 10
    createWorkerPool(noOfWorkers)

    <-done//等待result打印完毕
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

4 协程的同步手段

4.1 互斥与Mutex

Mutex用于提供一种加锁机制(Locking Mechanism),可确保在某时刻只有一个协程在临界区运行,以防止出现竞态条件。
Mutex可以在sync包内找到。Mutex 定义了两个方法:LockUnlock。所有在 Lock 和 Unlock 之间的代码,都只能由一个Go协程执行,于是就可以避免竞态条件。

mutex.Lock()
x = x + 1  
mutex.Unlock()

使用示例:

//互斥锁保证线程同步
package main

import (
    "fmt"
    "sync"
)

var total struct { //全局的结构体变量
    sync.Mutex //互斥锁
    value      int
}

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i <= 100; i++ {
        total.Lock() //加锁
        total.value++
        total.Unlock() //解锁
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go worker(&wg)
    go worker(&wg)
    wg.Wait()

    fmt.Println(total.value)
}

4.2 原子操作

用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的sync/atomic包对原子操作提供了丰富的支持:
sync/atomic包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value原子对象提供了LoadStore两个原子方法,分别用于加载和保存数据,返回值和参数都是interface{}类型。

//原子操作实现线程同步
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var total uint64

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    var i uint64
    for i = 0; i <= 100; i++ {
        atomic.AddUint64(&total, 1) //原子操作,线程安全的
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go worker(&wg)
    go worker(&wg)
    wg.Wait()

    fmt.Println(atomic.LoadUint64(&total)) //读取值
}

4.3 阻塞信道

上面的示例我们也可以用信道来实现互斥(还是推荐实际中使用Mutex),使用大小为1的缓冲信道可以导致可写阻塞,这样其他协程就不能继续执行,只能等待阻塞结束。
在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而Go语言却另辟蹊径,它将共享的值通过Channel传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个Goroutine能够拥有该资源。

//使用channel实现线程同步
package main

import (
    "fmt"
    "sync"
)

var total uint64

func worker(wg *sync.WaitGroup, ch chan bool) {
    defer wg.Done()
    var i uint64
    for i = 0; i <= 100; i++ {
        ch <- true //信道被写入值,其他协程到这一句也想写入值,就会阻塞等待信道可写
        total++
        <-ch //本协程读取信道,信道空了,其他协程可以写入了
    }
}

func main() {
    ch := make(chan bool, 1) // 创建大小为1的chan
    var wg sync.WaitGroup
    wg.Add(2)
    go worker(&wg, ch)
    go worker(&wg, ch)
    wg.Wait()

    fmt.Println(total) //读取值
}

不仅如此,我们还可以通过设置chan的缓存大小来控制最大并发数。

5 常见并发模型

5.1 生产者消费者模型

通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题。

Go可以使用带缓冲区的chan作为成功队列,由不同的协程负责接入和读取,很简单的实现生产者消费者模型:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
    for i := 0; ; i++ {
        out <- i * factor //往信道缓冲区内写入数据
    }
}

// 消费者
func Consumer(in <-chan int) {
    for v := range in {
        fmt.Println(v) //从信道读取数据打印
    }
}
func main() {
    ch := make(chan int, 64) // 成果队列,大小为64
    //开启了2个Producer生产流水线,分别用于生成3和5的倍数的序列
    //然后开启1个Consumer消费者线程,打印获取的结果
    go Producer(3, ch) // 生成 3 的倍数的序列
    go Producer(5, ch) // 生成 5 的倍数的序列
    go Consumer(ch)    // 消费 生成的队列

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
}

5.2 发布订阅模型

发布订阅(publish/subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则成为订阅者(subscriber),生产者和消费者是M:N的关系。在传统生产者和消费者模型中,是将消息发送到一个队列中,而发布订阅模型则是将消息发布给一个主题。
在发布订阅模型中,每条消息都会传送给多个订阅者发布者通常不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加,是一种松散的耦合关系,这使得系统的复杂性可以随时间的推移而增长。在现实生活中,像天气预报之类的应用就可以应用这个并发模式。

示例代码如下:

// 发布订阅模型实现
package pubsub

import (
    "sync"
    "time"
)

type (
    subscriber chan interface{}         // 订阅者为一个管道
    topicFunc  func(v interface{}) bool // 主题为一个过滤器
)

// 发布者对象
type Publisher struct {
    m           sync.RWMutex             // 读写锁,保护订阅者map
    buffer      int                      // 订阅队列的缓存大小
    timeout     time.Duration            // 发布超时时间
    subscribers map[subscriber]topicFunc // 订阅者信息
}

// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    return &Publisher{ //返回对象指针
        buffer:      buffer,
        timeout:     publishTimeout,
        subscribers: make(map[subscriber]topicFunc), //创建订阅者map
    }
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
    return p.SubscribeTopic(nil)
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    ch := make(chan interface{}, p.buffer)
    p.m.Lock()
    p.subscribers[ch] = topic
    p.m.Unlock()
    return ch
}

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
    p.m.Lock()
    defer p.m.Unlock() //函数退出时解锁

    delete(p.subscribers, sub) //根据key删除map中一项
    close(sub)                 //关闭chan
}

// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
    p.m.RLock()
    defer p.m.RUnlock()

    var wg sync.WaitGroup
    for sub, topic := range p.subscribers {
        wg.Add(1)
        go p.sendTopic(sub, topic, v, &wg)
    }
    wg.Wait()
}

// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    for sub := range p.subscribers {
        delete(p.subscribers, sub)
        close(sub)
    }
}

// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
    defer wg.Done()
    if topic != nil && !topic(v) {
        return
    }
    //监听sub chan写入成功或超时
    select {
    case sub <- v:
    case <-time.After(p.timeout):
    }
}

我们可以选择订阅全部,或指定自定义函数只订阅符合要求的消息,返回chan对象:

all := p.Subscribe() //添加一个订阅者,订阅全部消息
//添加一个订阅者,只关系有golang字符串的内容
golang := p.SubscribeTopic(func(v interface{}) bool {
    if s, ok := v.(string); ok {
        return strings.Contains(s, "golang")
    }
    return false
})