Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。
Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。
那这里等待的条件是什么呢?等待的条件,可以是某个变量达到了某个阈值或者某个时间点,也可以是一组变量分别都达到了某个阈值,还可以是某个对象的状态满足了特定的条件。总结来讲,等待的条件是一种可以用来计算结果是 true 还是 false 的条件。

Cond的基本用法

标准库中的 Cond 并发原语初始化的时候,需要关联一个 Locker 接口的实例,一般我们使用 Mutex 或者 RWMutex。
Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列。
我们分别看下它的三个方法Broadcast、Signal和Wait方法。

  • Signal

允许调用者Caller唤醒一个等待此Cond的goroutine,如果此时没有等待的goroutine,则无需通知waiter,如果Cond等待队列中有一个或多个等待的goroutine,则需要从等待队列中移除第一个goroutine并把它唤醒。

  • Broadcast

允许调用者Caller唤醒所有等待此Cond的goroutine,如果此时没有等待的goroutine,显然无需通知waiter,如果Cond等待队列中有一个或多个等待的goroutine,则清空所有等待的goroutine,并全部唤醒。

  • Wait

会把调用者Caller放入Cond的等待队列中并阻塞,直到被Signal或者Broadcast方法从等待队列中移除并唤醒。
注意:调用Signal和Broadcast方法,不强求持有c.L的锁,调用Wait方法是必须要持有c.L的锁。

示例

10 个运动员进入赛场之后需要先做拉伸活动活动筋骨 ,在自己的赛道上做好准备;等所有的运动员都准备好之后,裁判员才会打响发令枪。
每个运动员做好准备之后,将 ready 加一,表明自己做好准备了,同时调用 Broadcast 方法通知裁判员。因为裁判员只有一个,所以这里可以直接替换成 Signal 方法调用。
调用 Broadcast 方法的时候,我们并没有请求 c.L 锁,只是在更改等待变量的时候才使用到了锁。
裁判员会等待运动员都准备好。虽然每个运动员准备好之后都唤醒了裁判员,但是裁判员被唤醒之后需要检查等待条件是否满足(运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待。

  1. func main() {
  2. c := sync.NewCond(&sync.Mutex{})
  3. ready := 0
  4. for i := 0; i < 10; i ++{
  5. go func(i int) {
  6. time.Sleep(time.Second * time.Duration(rand.Int63n(10)))
  7. // 加锁更改等待条件
  8. c.L.Lock()
  9. ready++
  10. c.L.Unlock()
  11. fmt.Printf("运动员%d已准备就绪\n",i)
  12. // 广播唤醒等待者,这里可以使用Broadcast和Signal
  13. c.Signal()
  14. }(i)
  15. }
  16. c.L.Lock()
  17. for ready != 10 {
  18. c.Wait()
  19. log.Println("裁判员被唤醒一次")
  20. }
  21. c.L.Unlock()
  22. log.Println("所有运动员都准备就绪,比赛开始。。。")
  23. }

Cond实现原理

  1. type Cond struct {
  2. noCopy noCopy
  3. // 当观察或者修改等待条件的时候需要加锁
  4. L Locker
  5. // 等待队列
  6. notify notifyList
  7. checker copyChecker
  8. }
  9. func NewCond(l Locker) *Cond {
  10. return &Cond{L: l}
  11. }
  12. func (c *Cond) Wait() {
  13. c.checker.check()
  14. // 增加到等待队列中
  15. t := runtime_notifyListAdd(&c.notify)
  16. c.L.Unlock()
  17. // 阻塞休眠直到被唤醒
  18. runtime_notifyListWait(&c.notify, t)
  19. c.L.Lock()
  20. }
  21. func (c *Cond) Signal() {
  22. c.checker.check()
  23. runtime_notifyListNotifyOne(&c.notify)
  24. }
  25. func (c *Cond) Broadcast() {
  26. c.checker.check()
  27. runtime_notifyListNotifyAll(&c.notify
  28. }

runtime_notifyListXXX 是运行时实现的方法,实现了一个等待 / 通知的队列。
copyChecker 是一个辅助结构,可以在运行时检查 Cond 是否被复制使用。
Signal 和 Broadcast 只涉及到 notifyList 数据结构,不涉及到锁。
Wait 把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。在阻塞休眠期间,调用者是不持有锁的,这样能让其他 goroutine 有机会检查或者更新等待变量。