7.5 sleep与wakeup

调度和锁有助于隐藏一个进程对另一个进程的存在,但到目前为止,我们还没有帮助进程进行有意交互的抽象。为解决这个问题已经发明了许多机制。Xv6使用了一种称为sleepwakeup的方法,它允许一个进程在等待事件时休眠,而另一个进程在事件发生后将其唤醒。睡眠和唤醒通常被称为序列协调(sequence coordination)或条件同步机制(conditional synchronization mechanisms)。

为了说明,让我们考虑一个称为信号量(semaphore)的同步机制,它可以协调生产者和消费者。信号量维护一个计数并提供两个操作。“V”操作(对于生产者)增加计数。“P”操作(对于使用者)等待计数为非零,然后递减并返回。如果只有一个生产者线程和一个消费者线程,并且它们在不同的CPU上执行,并且编译器没有进行过积极的优化,那么此实现将是正确的:

  1. struct semaphore {
  2. struct spinlock lock;
  3. int count;
  4. };
  5. void V(struct semaphore* s) {
  6. acquire(&s->lock);
  7. s->count += 1;
  8. release(&s->lock);
  9. }
  10. void P(struct semaphore* s) {
  11. while (s->count == 0)
  12. ;
  13. acquire(&s->lock);
  14. s->count -= 1;
  15. release(&s->lock);
  16. }

上面的实现代价昂贵。如果生产者很少采取行动,消费者将把大部分时间花在while循环中,希望得到非零计数。消费者的CPU可以找到比通过反复轮询s->count繁忙等待更有成效的工作。要避免繁忙等待,消费者需要一种方法来释放CPU,并且只有在V增加计数后才能恢复。

这是朝着这个方向迈出的一步,尽管我们将看到这是不够的。让我们想象一对调用,sleepwakeup,工作流程如下。Sleep(chan)在任意值chan上睡眠,称为等待通道(wait channel)。Sleep将调用进程置于睡眠状态,释放CPU用于其他工作。Wakeup(chan)唤醒所有在chan上睡眠的进程(如果有),使其sleep调用返回。如果没有进程在chan上等待,则wakeup不执行任何操作。我们可以将信号量实现更改为使用sleepwakeup(更改的行添加了注释):

  1. void V(struct semaphore* s) {
  2. acquire(&s->lock);
  3. s->count += 1;
  4. wakeup(s); // !pay attention
  5. release(&s->lock);
  6. }
  7. void P(struct semaphore* s) {
  8. while (s->count == 0)
  9. sleep(s); // !pay attention
  10. acquire(&s->lock);
  11. s->count -= 1;
  12. release(&s->lock);
  13. }

P现在放弃CPU而不是自旋,这很好。然而,事实证明,使用此接口设计sleepwakeup而不遭受所谓的丢失唤醒(lost wake-up)问题并非易事。假设P在第9行发现s->count==0。当P在第9行和第10行之间时,V在另一个CPU上运行:它将s->count更改为非零,并调用wakeup,这样就不会发现进程处于休眠状态,因此不会执行任何操作。现在P继续在第10行执行:它调用sleep并进入睡眠。这会导致一个问题:P正在休眠,等待调用V,而V已经被调用。除非我们运气好,生产者再次呼叫V,否则消费者将永远等待,即使count为非零。

这个问题的根源是V在错误的时刻运行,违反了P仅在s->count==0时才休眠的不变量。保护不变量的一种不正确的方法是将锁的获取(下面以黄色突出显示)移动到P中,以便其检查count和调用sleep是原子的:

  1. void V(struct semaphore* s) {
  2. acquire(&s->lock);
  3. s->count += 1;
  4. wakeup(s);
  5. release(&s->lock);
  6. }
  7. void P(struct semaphore* s) {
  8. acquire(&s->lock); // !pay attention
  9. while (s->count == 0)
  10. sleep(s);
  11. s->count -= 1;
  12. release(&s->lock);
  13. }

人们可能希望这个版本的P能够避免丢失唤醒,因为锁阻止V在第10行和第11行之间执行。它确实这样做了,但它会导致死锁:P在睡眠时持有锁,因此V将永远阻塞等待锁。

我们将通过更改sleep的接口来修复前面的方案:调用方必须将条件锁(condition lock)传递给sleep,以便在调用进程被标记为asleep并在睡眠通道上等待后sleep可以释放锁。如果有一个并发的V操作,锁将强制它在P将自己置于睡眠状态前一直等待,因此wakeup将找到睡眠的消费者并将其唤醒。一旦消费者再次醒来,sleep会在返回前重新获得锁。我们新的正确的sleep/wakeup方案可用如下(更改以黄色突出显示):

  1. void V(struct semaphore* s) {
  2. acquire(&s->lock);
  3. s->count += 1;
  4. wakeup(s);
  5. release(&s->lock);
  6. }
  7. void P(struct semaphore* s) {
  8. acquire(&s->lock);
  9. while (s->count == 0)
  10. sleep(s, &s->lock); // !pay attention
  11. s->count -= 1;
  12. release(&s->lock);
  13. }

P持有s->lock的事实阻止VP检查s->count和调用sleep之间试图唤醒它。然而请注意,我们需要sleep释放s->lock并使消费者进程进入睡眠状态的操作是原子的。