1. 正如我们现在所知,需要锁和条件变量来解决广泛的相关且有趣的并发问题。多年前最早意识到这一点的人之一是 **Edsger Dijkstra**(虽然很难知道确切的历史 [GR92]),他以其著名的图论中的“最短路径”算法 [D59] 而闻名,这是一个早期的论战关于名为 Goto Statements "Considered Harmful" [D68a](多棒的标题!)的结构化编程,以及在我们将在这里研究的情况下,引入称为**信号量(semaphore)** [D68b, D72] 的同步原语。事实上,Dijkstra 及其同事发明了信号量作为与同步相关的所有事物的单一原语;正如您将看到的,**可以将信号量用作锁和条件变量**。

关键的问题:如何使用信号量(HOW TO USE SEMAPHORES) 我们如何使用信号量来代替锁和条件变量?信号量的定义是什么?什么是二元信号量(binary semaphore)?用锁和条件变量构建信号量是否简单?用信号量构造锁和条件变量呢?

31.1 信号量:定义 Semaphores: A Definition

信号量是一个具有整数值的对象,可以用两个例程来操作;在POSIX标准中,这些例程是sem_wait()和sem_post()。因为信号量的初始值决定了它的行为,所以在调用任何其他例程与信号量交互之前,我们必须首先将它初始化为某个值,如图31.1中的代码所示。
在历史上,sem_wait()被Dijkstra称为P(),而sem_post()被称为V()。这些缩写形式来自荷兰语;有趣的是,随着时间的推移,它们来源于哪些荷兰语词汇已经发生了变化。最初,P()来自“passering”(通过),V()来自“vrijgave”(释放);后来,Dijkstra写道,P()来自“prolaag”,是“probeer”(荷兰语是“尝试”)和“verlaag”(“减少”)的缩写,V()来自“verhoog”,意思是“增加”。有时候,人们会把他们叫上叫下。使用荷兰语来打动你的朋友,或者迷惑他们,或者两者兼得。详情请参见https://news.ycombinator.com/item?id=8761539)。

  1. #include <semaphore.h>
  2. sem_t s;
  3. sem_init(&s, 0, 1);

Figure 31.1: Initializing A Semaphore
在图中,我们声明了一个信号量 s 并通过传入 1 作为第三个参数将其初始化为值 1。在我们将看到的所有示例中,sem_init() 的第二个参数将被设置为 0;这表明信号量在同一进程中的线程之间共享。有关信号量其他用法的详细信息,请参阅手册页(即,如何使用它们来同步不同进程之间的访问),这需要为第二个参数设置不同的值。
在初始化信号量之后,我们可以调用两个函数中的一个来与它交互,sem_wait()或sem_post()。这两个函数的行为如图31.2所示。

  1. int sem_wait(sem_t *s) {
  2. decrement the value of semaphore s by one
  3. wait if value of semaphore s is negative
  4. }
  5. int sem_post(sem_t *s) {
  6. increment the value of semaphore s by one
  7. if there are one or more threads waiting, wake one
  8. }

Figure 31.2: Semaphore: Definitions Of Wait And Post
现在,我们不关心这些例程的实现,这显然需要一些注意;由于有多个线程调用sem_wait()和sem_post(),显然需要管理这些临界区域。现在我们将重点讨论如何使用这些原语;稍后我们将讨论它们是如何构建的。
我们应该在这里讨论接口的一些重要方面。首先,我们可以看到 sem_wait() 要么立即返回(因为当我们调用 sem_wait() 时信号量的值是 1 或更高),要么它会导致调用者暂停执行,等待后续post。当然,多个调用线程可能会调用 sem_wait(),从而全部排队等待被唤醒
其次,我们可以看到 sem_post() 不像 sem_wait() 那样等待某些特定条件成立。相反,它只是增加信号量的值,然后,如果有一个线程等待唤醒,则唤醒其中一个线程。
第三,当信号量为负数时,它的值等于等待线程的数量[D68b]。虽然信号量的用户通常看不到这个值,但这个不变式值得了解,也许可以帮助您记住信号量是如何工作的
不要(还)担心信号量中可能出现的竞争条件(race conditions);假设他们所做的操作是原子执行的。我们很快就会使用锁和条件变量来实现这一点。

31.2 二元信号量(锁) Binary Semaphores (Locks)

我们现在准备使用信号量。我们的第一个用途是我们已经熟悉的用途:使用信号量作为锁。代码片段见图 31.3;在其中,您会看到我们只是用 sem_wait()/sem_post() 包围了感兴趣的临界区域。然而,完成这项工作的关键是信号量 m 的初始值(在图中初始化为 X)。X应该是什么?

  1. sem_t m;
  2. sem_init(&m, 0, X); // initialize to X; what should X be?
  3. sem_wait(&m);
  4. // critical section here
  5. sem_post(&m);

Figure 31.3: A Binary Semaphore (That Is, A Lock)
... (试着在继续之前考虑一下)...
回头看看上面sem_wait()和sem_post()例程的定义,我们可以看到初始值应该是1。
为了更清楚地说明这一点,让我们想象一个有两个线程的场景。第一个线程(线程0)调用sem_wait();它将首先减少信号量的值,将其更改为0。然后,只有当值不大于等于0时,它才会等待。因为这个值是0,所以sem_wait()将直接返回,调用线程将继续执行;线程0现在可以自由进入临界区。如果在线程0处于临界区时没有其他线程尝试获取锁,当它调用sem_post()时,它将简单地将信号量的值恢复为1(而不会唤醒正在等待的线程,因为没有线程等待)。图31.4显示了这个场景的轨迹。
image.png
更有趣的情况是,线程0“持有锁”(即,它已经调用了sem_wait(),但还没有调用sem_post()),而另一个线程(线程1)试图通过调用sem_wait()进入临界区。在这种情况下,线程1将把信号量的值减少到-1,从而等待(使自己进入睡眠状态并放弃处理器)。当线程0再次运行时,它最终将调用sem_post(),将信号量的值加回0,然后唤醒等待的线程(线程1),然后该线程将能够为自己获取锁。当线程1结束时,它将再次增加信号量的值,再次将其恢复为1。
图31.5显示了这个示例的一个轨迹。除了线程操作外,该图还显示了每个线程的调度状态(scheduler state):Run(线程正在运行)、Ready(即,可运行但未运行)和Sleep(线程被阻塞)。注意,当线程1试图获取已经持有的锁时,它就进入了睡眠状态;只有当线程0再次运行时,线程1才会被唤醒并可能再次运行。
image.png
如果您想完成自己的示例,请尝试一个场景,其中多个线程排队等待一个锁。在这样的跟踪过程中,信号量的值是多少?
因此,我们可以将信号量用作锁。因为锁只有两种状态(持有和未持有),所以我们有时将作为锁的信号量称为二元信号量(binary semaphore)。注意,如果您仅以这种二元方式使用信号量,那么它可以用比我们在这里介绍的广义信号量更简单的方式实现。

31.3 信号量用作排序 Semaphores For Ordering

信号量对于并发程序中的事件排序也很有用。例如,线程可能希望等待列表变为非空,以便从列表中删除一个元素。在这种使用模式中,我们经常发现一个线程等待某事发生,另一个线程使某事发生,然后发出信号,表明它已经发生,从而唤醒正在等待的线程。因此,我们使用信号量作为排序原语(ordering primitive)(类似于前面条件变量(condition variables)的使用)。
下面是一个简单的例子。假设一个线程创建了另一个线程,然后希望等待它完成执行(图31.6)。当这个程序运行时,我们希望看到以下内容:
parent: begin
child
parent: end

  1. sem_t s;
  2. void *child(void *arg) {
  3. printf("child\n");
  4. sem_post(&s); // signal here: child is done
  5. return NULL;
  6. }
  7. int main(int argc, char *argv[]) {
  8. sem_init(&s, 0, X); // what should X be?
  9. printf("parent: begin\n");
  10. pthread_t c;
  11. Pthread_create(&c, NULL, child, NULL);
  12. sem_wait(&s); // wait here for child
  13. printf("parent: end\n");
  14. return 0;
  15. }

Figure 31.6: A Parent Waiting For Its Child
那么,问题是如何使用信号量来达到这个效果;事实证明,答案相对容易理解。正如您在代码中看到的,父进程调用sem_wait()和子进程调用sem_post()来等待子进程完成其执行的条件变为真。然而,这就提出了一个问题:这个信号量的初始值应该是多少?
(再次强调,在这里思考一下,而不是向前阅读)
当然,答案是信号量的值应该设置为0。有两种情况需要考虑。首先,让我们假设父线程创建了子线程,但子线程还没有运行(也就是说,它在就绪队列中,但还没有运行)。在这种情况下(图31.7),父线程会在子进程调用sem_post()之前调用sem_wait();我们希望父线程等待子线程执行。只有当信号量的值不大于0时才会发生这种情况;因此,0是初始值。父线程运行,将信号量减1(为-1),然后等待(休眠)。当子线程最终运行时,它将调用sem_post(),将信号量的值增加到0,并唤醒父程序,然后父线程将从sem_wait()返回并完成程序。
image.png
第二种情况(图31.8)发生在子线程在父线程获得调用sem_wait()的机会之前运行完成时。在这种情况下,子线程将首先调用sem_post(),从而将信号量的值从0增加到1。当父线程有机会运行时,它将调用sem_wait()并发现信号量的值为1;因此,父线程将减少该值(为0)并从sem_wait()返回,而不等待,这也达到了预期的效果。
image.png

31.4 生产者/消费者(有界缓冲区)问题 The Producer/Consumer (Bounded Buffer) Problem

我们将在本章中遇到的下一个问题称为生产者/消费者问题,有时也称为有界缓冲区问题[D72]。这个问题在前一章的条件变量中有详细的描述;详情请看那里。

Aside:设置信号量的值 我们已经看到了两个初始化信号量的例子。在第一种情况下,我们将值设为1,将信号量作为锁使用;在第二种情况下,为0,使用信号量排序。信号量初始化的一般规则是什么? Perry Kivolowitz提出了一种简单的思考方法,即考虑初始化后您愿意立即放弃的资源数量。对于锁,它是1,因为您愿意在初始化后立即锁定(放弃)锁。在排序的情况下,它是0,因为一开始没有东西可以给出;只有当子线程完成时,才创建资源,此时,该值将增加到1。在以后的信号量问题上试试这种思路,看看是否有帮助。

第一次尝试 First Attempt

我们解决这个问题的第一次尝试引入了两个信号量,empty和full,线程将分别使用它们来指示缓冲区条目(entry)何时被清空或被填充。put和get例程的代码如图31.9所示,我们解决生产者和消费者问题的尝试如图31.10所示。

  1. int buffer[MAX];
  2. int fill = 0;
  3. int use = 0;
  4. void put(int value) {
  5. buffer[fill] = value; // Line F1
  6. fill = (fill + 1) % MAX; // Line F2
  7. }
  8. int get() {
  9. int tmp = buffer[use]; // Line G1
  10. use = (use + 1) % MAX; // Line G2
  11. return tmp;
  12. }

Figure 31.9: The Put And Get Routines

  1. sem_t empty;
  2. sem_t full;
  3. void *producer(void *arg) {
  4. int i;
  5. for (i = 0; i < loops; i++) {
  6. sem_wait(&empty); // Line P1
  7. put(i); // Line P2
  8. sem_post(&full); // Line P3
  9. }
  10. }
  11. void *consumer(void *arg) {
  12. int tmp = 0;
  13. while (tmp != -1) {
  14. sem_wait(&full); // Line C1
  15. tmp = get(); // Line C2
  16. sem_post(&empty); // Line C3
  17. printf("%d\n", tmp);
  18. }
  19. }
  20. int main(int argc, char *argv[]) {
  21. // ...
  22. sem_init(&empty, 0, MAX); // MAX are empty
  23. sem_init(&full, 0, 0); // 0 are full
  24. // ...
  25. }

Figure 31.10: Adding The Full And Empty Conditions
在本例中,生产者首先等待缓冲区变为空,以便将数据放入其中,而消费者在使用缓冲区之前也同样等待缓冲区被填充。让我们首先假设MAX=1(数组中只有一个缓冲区),看看这是否有效。
再想象一下有两个线程,一个生产者和一个消费者。让我们来看看单个CPU上的一个特定场景。假设消费者首先运行。因此,消费者将到达图31.10中的C1行,调用sem_wait(&full)。因为full被初始化为0,所以调用full会减少(为-1),阻塞消费者,并等待另一个线程在full上调用sem_post()。
假设生产者接着运行。它将命中P1行,从而调用sem_wait(&empty)例程。与消费者不同的是,生产者将继续执行这一行,因为empty被初始化为MAX(在本例中为1)。因此,empty将被递减为0,生产者将把一个数据值放入buffer的第一个条目中(第P2行)。然后生产者将继续到P3并调用sem_post(&full),将full信号量的值从-1更改为0并唤醒消费者(例如,将它从blocked移动到ready)。
在这种情况下,可能会发生两种情况之一。如果生产者继续运行,它将循环并再次点击P1。然而,这一次它会阻塞,因为empty信号量的值是0。如果生产者被中断,消费者开始运行,它将从sem_wait(&full)(行C1)返回,发现缓冲区已满,并消耗它。在任何一种情况下,我们都能达到预期的行为。
您可以使用更多线程(例如,多个生产者和多个消费者)来尝试相同的示例。它应该还是有效的。
现在让我们假设MAX大于1(假设MAX=10)。对于本例,让我们假设有多个生产者和多个消费者。现在我们有一个问题:竞争条件(race condition)。看到它发生的地方了吗?如果你看不出来,这里有个提示:仔细看看put()和get()代码。
好的,让我们来理解这个问题。假设两个生产者(Pa和Pb)几乎同时调用put()。假设生产者Pa首先运行,并开始填充第一个缓冲区条目(在第F1行填充=0)。在Pa有机会将填充计数器增加到1之前,它被中断了。生产者Pb开始运行,在第F1行,它也将其数据放入缓冲区的第0个元素,这意味着旧的数据被覆盖!这种行为是不允许的;我们不希望生产者的任何数据丢失。

一个解决方案:添加互斥 A Solution: Adding Mutual Exclusion

如你所见,这里我们忘记了互斥。缓冲区的填充和索引在缓冲区中的递增是一个临界区域,因此必须小心保护。让我们使用我们的朋友,二元信号量,并添加一些锁。图31.11显示了我们的尝试。

  1. void *producer(void *arg) {
  2. int i;
  3. for (i = 0; i < loops; i++) {
  4. sem_wait(&mutex); // Line P0 (NEW LINE)
  5. sem_wait(&empty); // Line P1
  6. put(i); // Line P2
  7. sem_post(&full); // Line P3
  8. sem_post(&mutex); // Line P4 (NEW LINE)
  9. }
  10. }
  11. void *consumer(void *arg) {
  12. int i;
  13. for (i = 0; i < loops; i++) {
  14. sem_wait(&mutex); // Line C0 (NEW LINE)
  15. sem_wait(&full); // Line C1
  16. int tmp = get(); // Line C2
  17. sem_post(&empty); // Line C3
  18. sem_post(&mutex); // Line C4 (NEW LINE)
  19. printf("%d\n", tmp);
  20. }
  21. }

Figure 31.11: Adding Mutual Exclusion (Incorrectly)
现在,我们已经在整个代码的put()/get()部分添加了一些锁,如NEW LINE注释所示。那似乎是正确的主意,但也行不通。为什么?死锁(Deadlock)。为什么会发生死锁?花点时间考虑一下;试着找出出现死锁的情况。程序死锁的步骤顺序是什么?

阻止死锁 Avoiding Deadlock

好了,既然你已经算出来了,这就是答案。想象两个线程,一个生产者和一个消费者。消费者首先运行。它获取互斥锁(行C0),然后在full信号量上调用sem_wait()(行C1);因为还没有数据,这个调用会导致消费者阻塞,从而让出CPU;但重要的是,消费者仍然掌握着这把互斥锁(mutex)。
然后运行一个生产者。它有要生成的数据,如果它能够运行,它就能够唤醒消费者线程,一切都很好。不幸的是,它所做的第一件事是在二元互斥量上调用sem_wait()(第P0行)。锁已经被持有。因此,生产者现在也只能等待了。
这里有一个简单的循环。消费者持有互斥锁并等待某人发出full的信号。生产者可以发出full的信号,但正在等待互斥锁。因此,生产者和消费者彼此都在等待对方:一个典型的死锁。

最后,一个可行方案 At Last, A Working Solution

要解决这个问题,我们必须简单地减小锁的范围。图31.12显示了正确的解决方案。正如你所看到的,我们只是将互斥锁的获取和释放移动到临界区附近,full和empty的等待和信号代码被留在外面。其结果是一个简单且有效的有界缓冲区,这是多线程程序中常用的模式。现在理解它;供以后使用。在未来的岁月里,你会感谢我们的。或者至少,当你在期末考试或工作面试中被问到同样的问题时,你会感谢我们。

实际上,出于模块化的目的,将互斥锁acquire/release放在put()和get()函数中可能更自然。

  1. void *producer(void *arg) {
  2. int i;
  3. for (i = 0; i < loops; i++) {
  4. sem_wait(&empty); // Line P1
  5. sem_wait(&mutex); // Line P1.5 (MUTEX HERE)
  6. put(i); // Line P2
  7. sem_post(&mutex); // Line P2.5 (AND HERE)
  8. sem_post(&full); // Line P3
  9. }
  10. }
  11. void *consumer(void *arg) {
  12. int i;
  13. for (i = 0; i < loops; i++) {
  14. sem_wait(&full); // Line C1
  15. sem_wait(&mutex); // Line C1.5 (MUTEX HERE)
  16. int tmp = get(); // Line C2
  17. sem_post(&mutex); // Line C2.5 (AND HERE)
  18. sem_post(&empty); // Line C3
  19. printf("%d\n", tmp);
  20. }
  21. }

Figure 31.12: Adding Mutual Exclusion (Correctly)

31.5 读写锁 Reader-Writer Locks

另一个经典问题源于对更灵活的锁定原语的渴望,该原语承认不同的数据结构访问可能需要不同类型的锁定。例如,假设有许多并发的列表操作,包括插入和简单的查找。插入会改变列表的状态(因此传统的临界区是有意义的),而查找只是读取数据结构;只要能够保证没有插入操作,就可以允许许多查找同时进行。我们现在将开发一种特殊类型的锁来支持这种类型的操作,称为读写锁(reader-writer lock)[CHP71]。图31.13提供了这种锁的代码。
代码非常简单。如果某个线程想要更新有问题的数据结构,它应该调用一对新的同步操作:rwlock_acquire_writelock()来获取写锁,rwlock_release_writelock()来释放锁。在内部,它们只是使用writelock信号量来确保只有一个writer能够获得锁,从而进入临界区来更新相关的数据结构。
更有趣的是获取和释放读锁(read lock)的一对例程。当获取读锁时,reader首先获取lock,然后增加readers变量以跟踪数据结构中当前有多少个reader。在rwlock_acquire_readlock()中执行的重要步骤发生在第一个reader获得锁的时候;在这种情况下,reader也通过调用writellock信号量上的sem_wait()来获得写锁,然后通过调用sem_post()来释放锁。
因此,一旦一个reader获得了读锁,就会允许更多的readers也获得读锁;然而,任何希望获得写锁的线程都必须等待,直到所有的读取操作完成;最后一个退出临界区的reader在”writelock”上调用sem_post(),因此允许等待的 write 获取锁。
这种方法是可行的(正如预期的那样),但也有一些缺点,特别是涉及到公平性时。特别是,reader很容易饿死writer。针对这个问题存在更复杂的解决方案;也许你能想到一个更好的实现?提示:考虑一下,当一个writer正在等待时,您需要做些什么来防止更多的reader进入锁。

  1. typedef struct _rwlock_t {
  2. sem_t lock; // binary semaphore (basic lock)
  3. sem_t writelock; // allow ONE writer/MANY readers
  4. int readers; // #readers in critical section
  5. } rwlock_t;
  6. void rwlock_init(rwlock_t *rw) {
  7. rw->readers = 0;
  8. sem_init(&rw->lock, 0, 1);
  9. sem_init(&rw->writelock, 0, 1);
  10. }
  11. void rwlock_acquire_readlock(rwlock_t *rw) {
  12. sem_wait(&rw->lock);
  13. rw->readers++;
  14. if (rw->readers == 1) // first reader gets writelock
  15. sem_wait(&rw->writelock);
  16. sem_post(&rw->lock);
  17. }
  18. void rwlock_release_readlock(rwlock_t *rw) {
  19. sem_wait(&rw->lock);
  20. rw->readers--;
  21. if (rw->readers == 0) // last reader lets it go
  22. sem_post(&rw->writelock);
  23. sem_post(&rw->lock);
  24. }
  25. void rwlock_acquire_writelock(rwlock_t *rw) {
  26. sem_wait(&rw->writelock);
  27. }
  28. void rwlock_release_writelock(rwlock_t *rw) {
  29. sem_post(&rw->writelock);
  30. }

Figure 31.13: A Simple Reader-Writer Lock
最后,应该注意读写锁的使用要谨慎。它们通常会增加更多的开销(特别是在更复杂的实现中),因此与仅仅使用简单而快速的锁定原语相比,最终不会提高性能[CB08]。无论哪种方式,它们都再次展示了我们如何以一种有趣而有用的方式使用信号量。

Tip:简单和愚蠢可以更好(希尔定律) 你永远不应该低估这样一种观念,即简单而愚蠢的方法可能是最好的方法。对于锁定,有时一个简单的自选锁效果最好,因为它容易实现且快速。虽然像读写锁这样的东西听起来很酷,但它们很复杂,而复杂可能意味着缓慢。因此,总是先尝试简单而愚蠢的方法。 这种追求简单的想法在很多地方都能找到。一个早期的来源是Mark Hill的论文[H87],该论文研究了如何为cpu设计缓存。Hill发现简单的直接映射缓存比奇特的集关联设计工作得更好(一个原因是,在缓存中,更简单的设计可以实现更快的查找)。正如希尔对他的作品的简洁总结:“大而笨更好。”因此我们把这个类似的建议称为希尔定律(Hill’s Law)。

31.6 餐厅哲学家 The Dining Philosophers

Dijkstra提出并解决的最著名的并发问题之一是餐厅哲学家的问题[D71]。这个问题很有名,因为它很有趣,在智力上也很有趣;但是,其实际效用较低。然而,它的名气迫使它在这里上榜;事实上,你可能会在一些面试中被问到这个问题,如果你错过了这个问题而没有得到这份工作,你会非常讨厌你的操作系统教授。相反,如果你得到了这份工作,请随时给你的操作系统教授发一封感谢信,或者一些股票期权。
这个问题的基本设置是这样的(如图31.14所示):假设有五位“哲学家”围坐在一张桌子旁。每一对哲学家之间都有一个叉子(因此总共有五个)。哲学家们每个人都有思考的时间,不需要任何叉子,也有吃饭的时间。为了吃东西,哲学家需要两把叉子,一把在他们的左边,一把在他们的右边。对这些叉子的争用以及随之而来的同步问题,使得这成为我们在并发编程中研究的一个问题。
image.png
下面是每个哲学家的基本循环,假设从0到4(包括)每个哲学家都有一个唯一的线程标识符p:

  1. while (1) {
  2. think();
  3. get_forks(p);
  4. eat();
  5. put_forks(p);
  6. }
  1. 因此,关键的挑战是编写例程get_forks()和put_forks(),这样就不会出现死锁,不会有哲学家挨饿而永远得不到食物,并且并发性很高(也就是说,尽可能多的哲学家在同一时间吃东西)。<br />按照Downey的解决方案[D08],我们将使用一些辅助函数来帮助我们找到解决方案。它们是:
  1. int left(int p) { return p; }
  2. int right(int p) { return (p + 1) % 5; }
  1. 当哲学家 p 希望引用他们左边的叉子时,他们只需调用 left(p)。类似地,哲学家 p 右边的叉子是通过调用 right(p) 来引用的;其中的模运算符处理最后一个哲学家 (p=4) 试图抓住他们右边的叉子的一种情况,即叉子 0。<br />我们还需要一些信号量来解决这个问题。让我们假设我们有五个,每个分叉一个: `sem_t forks[5]`

失败的解决方案 Broken Solution

我们尝试解决问题的第一个解决方案。假设我们将每个信号量(在 forks 数组中)初始化为值 1。还假设每个哲学家都知道自己的数字 (p)。因此,我们可以编写 get_forks() 和 put_forks() 例程(图 31.15)。

  1. void get_forks(int p) {
  2. sem_wait(&forks[left(p)]);
  3. sem_wait(&forks[right(p)]);
  4. }
  5. void put_forks(int p) {
  6. sem_post(&forks[left(p)]);
  7. sem_post(&forks[right(p)]);
  8. }

Figure 31.15: The get forks() And put forks() Routines
这个(失败的)解决方案背后的直觉如下。为了得到这些叉子,我们只需在每把叉子上加一把“锁”:先拿左边的那个,然后再拿右边的那个。当我们吃完,我们就释放它们。简单的,不是吗?不幸的是,在这种情况下,简单意味着失败。你能看出出现的问题吗?想想。
问题是死锁(deadlock)。如果每个哲学家碰巧在任何一个哲学家能抓住他们右边的叉子之前抓住了他们左边的叉子,那么他们就会永远地拿着一个叉子等待另一个叉子。具体来说,哲学家0拿着叉子0,哲学家1拿着叉子1,哲学家2拿着叉子2,哲学家3拿着叉子3,哲学家4拿着叉子4;所有的叉子都是被获得了,而所有的哲学家都在等待另一个哲学家拥有的叉子。我们很快会更详细地研究死锁;就目前而言,可以肯定地说,这不是一个可行的解决方案。

解决方案:打破依赖关系 A Solution: Breaking The Dependency

解决这个问题最简单的方法是改变叉子的获取方式,至少要有一位哲学家这样做;的确,这就是Dijkstra自己解决问题的方法。具体来说,让我们假设哲学家4(编号最大的哲学家)以不同于其他哲学家的顺序获得叉子(图31.16);put_forks()代码保持不变。

  1. void get_forks(int p) {
  2. if (p == 4) {
  3. sem_wait(&forks[right(p)]);
  4. sem_wait(&forks[left(p)]);
  5. } else {
  6. sem_wait(&forks[left(p)]);
  7. sem_wait(&forks[right(p)]);
  8. }
  9. }

Figure 31.16: Breaking The Dependency In get forks()
因为最后一个哲学家试图先抓住右,再抓住左,所以不存在每个哲学家抓住一个叉子却困在等待另一个叉子的情况;等待的循环被打破了。仔细考虑这个解决方案的结果,并说服自己它是有效的。
还有其他类似的“著名”问题,例如,吸烟者的问题(cigarette smoker’s problem)睡着的理发师的问题(sleeping barber problem)。其中大多数只是考虑并发性的借口;有些名字很吸引人。如果你有兴趣学习更多知识,或者只是想要更多的以并行方式思考的练习,可以查阅它们[D08]。

31.7 线程限流 Thread Throttling

信号量的另一个简单用例偶尔会出现,因此我们在这里给出它。具体的问题是:程序员如何防止“太多”线程同时做某事,从而使系统陷入停滞?回答:确定“太多”的阈值,然后使用信号量来限制并发执行相关代码段的线程数。我们称这种方法为限流(throttling)[T99],并将其视为一种准入控制(admission control)形式。
让我们考虑一个更具体的例子。假设您创建了数百个线程来并行处理某个问题。但是,在代码的某个部分,每个线程需要大量的内存来执行部分计算;我们把这部分代码称为内存密集区(memory-intensive region)。如果所有线程同时进入内存密集区,那么所有内存分配请求的总和将超过机器上的物理内存数量。结果,机器将开始超负荷(即向磁盘交换页面),整个计算速度会变慢。
一个简单的信号量就可以解决这个问题。通过将信号量的值初始化为您希望一次进入内存密集区的最大线程数,然后在该区域周围放置 sem_wait() 和 sem_post(),信号量可以自然地限制同时并发进入内存密集危险区域代码的线程数。

31.8 如何实现信号量 How To Implement Semaphores

最后,让我们使用低级同步原语、锁和条件变量来构建我们自己的信号量版本,称为……(这里鼓声响起来)……Zemaphores!这个任务相当简单,如图31.17所示。

  1. typedef struct __Zem_t {
  2. int value;
  3. pthread_cond_t cond;
  4. pthread_mutex_t lock;
  5. } Zem_t;
  6. // only one thread can call this
  7. void Zem_init(Zem_t *s, int value) {
  8. s->value = value;
  9. Cond_init(&s->cond);
  10. Mutex_init(&s->lock);
  11. }
  12. void Zem_wait(Zem_t *s) {
  13. Mutex_lock(&s->lock);
  14. while (s->value <= 0)
  15. Cond_wait(&s->cond, &s->lock);
  16. s->value--;
  17. Mutex_unlock(&s->lock);
  18. }
  19. void Zem_post(Zem_t *s) {
  20. Mutex_lock(&s->lock);
  21. s->value++;
  22. Cond_signal(&s->cond);
  23. Mutex_unlock(&s->lock);
  24. }

Figure 31.17: Implementing Zemaphores With Locks And CVs
在上面的代码中,我们只使用一个锁和一个条件变量,加上一个状态变量来跟踪信号量的值。自己研究代码,直到真正理解它。开始!
Zemaphore和Dijkstra定义的纯信号量之间的一个细微区别是,我们不维护信号量的值为负时反映等待线程数的不变条件;事实上,这个值永远不会低于零。这种行为更容易实现,并且与当前的Linux实现相匹配。
奇怪的是,从信号量构建条件变量是一个非常棘手的问题。一些经验丰富的并行程序员试图在Windows环境中这样做,许多不同的错误接踵而至[B04]。您可以自己尝试一下,看看能否找出为什么用信号量构建条件变量比表面上看起来更具有挑战性的问题。

31.9 总结 Summary

信号量是编写并发程序的强大而灵活的原语。由于它们的简单性和实用性,一些程序员专门使用它们,避免使用锁和条件变量。
在本章中,我们只介绍了几个经典问题和解决方案。如果你有兴趣了解更多,你可以参考许多其他资料。Allen Downey关于并发和用信号量编程的书[D08]是一个很好的(而且免费的参考)。这本书有许多难题,您可以解决,以提高您对特定信号量和一般并发性的理解。成为一个真正的并发专家需要多年的努力;超越课堂所学无疑是掌握这类问题的关键。

Tip:小心泛化(BE CAREFUL WITH GENERALIZATION) 因此,抽象的泛化(generalization)技术在系统设计中非常有用,在系统设计中,一个好的想法可以稍微扩展一些,从而解决更大类别的问题。然而,在推广时要小心;正如兰普森警告我们的那样:“不要一概而论;泛化通常是错误的”[L83]。 我们可以把信号量看作是锁和条件变量的泛化;然而,这样的泛化是必要的吗?而且,考虑到在信号量上实现条件变量的难度,也许这种泛化并不像你想象的那么通用。

References

[B04] “Implementing Condition Variables with Semaphores” by Andrew Birrell. December 2004.
这是一份有趣的读物,讲述了在信号量上实现CVs有多么困难,以及作者和同事在此过程中所犯的错误。因为这个团队已经做了大量的并发编程;例如,Birrell以编写各种线程编程指南而闻名。
[CB08] “Real-world Concurrency” by Bryan Cantrill, Jeff Bonwick. ACM Queue. Volume 6, No. 5. September 2008.
这是一篇很好的文章,作者是来自Sun公司的一些内核黑客,讨论了并发代码面临的实际问题。
[CHP71] “Concurrent Control with Readers and Writers” by P.J. Courtois, F. Heymans, D.L. Parnas. Communications of the ACM, 14:10, October 1971.
介绍了Reader与Writer的问题,并给出了简单的解决方法。后来的工作引入了更复杂的解决方案,这里略过,因为它们相当复杂。
[D59] “A Note on Two Problems in Connexion with Graphs” by E. W. Dijkstra. Numerische Mathematik 1, 269271, 1959.
Available: http://www-m3.ma.tum.de/twiki/pub/MN0506/WebHome/dijkstra.pdf
你能相信人们在1959年就开始研究算法了吗?我们不能。甚至在电脑使用起来很有趣之前,这些人就有一种感觉,他们会改变世界……
[D68a] “Go-to Statement Considered Harmful” by E.W. Dijkstra. CACM, volume 11(3), March 1968. http://www.cs.utexas.edu/users/EWD/ewd02xx/EWD215.PDF
有时被认为是软件工程领域的开端。
[D68b] “The Structure of the THE Multiprogramming System” by E.W. Dijkstra. CACM, volume 11(5), 1968.
最早指出计算机科学中的系统工作是一项引人入胜的智力努力的论文之一。还强烈主张分层系统形式的模块化。
[D72] “Information Streams Sharing a Finite Buffer” by E.W. Dijkstra. Information Processing Letters 1, 1972.
http://www.cs.utexas.edu/users/EWD/ewd03xx/EWD329.PDF
Dijkstra发明了一切吗?没有,但也差不多了。他当然是第一个清楚地写下并发代码中存在的问题的人。然而,操作系统设计的实践者知道Dijkstra所描述的许多问题,因此,也许对他过于信任是一种误解。
[D08] “The Little Book of Semaphores” by A.B. Downey.
http://greenteapress.com/semaphores/
一本关于信号量的好书(而且是免费的!)有很多有趣的问题要解决,如果你喜欢这类事情的话。
[D71] “Hierarchical ordering of sequential processes” by E.W. Dijkstra.
http://www.cs.utexas.edu/users/EWD/ewd03xx/EWD310.PDF
提出了许多共同的问题,包括吃饭的哲学家。关于这个问题的维基百科页面也很有用。
[GR92] “Transaction Processing: Concepts and Techniques” by Jim Gray, Andreas Reuter. Morgan Kaufmann, September 1992.
在第 485 页第 8.8 节的顶部,我们发现特别幽默的确切引用:“大约在 1960 年,第一个多处理器具有测试和设置指令……大概是操作系统实现者制定了适当的算法,尽管 Dijkstra 是通常被认为在多年后发明了信号量。”哦,啪!
[H87] “Aspects of Cache Memory and Instruction Buffer Performance” by Mark D. Hill. Ph.D. Dissertation, U.C. Berkeley, 1987.
Hill的论文为那些痴迷于早期系统缓存的人所做。定量论文的一个很好的例子。
[L83] “Hints for Computer Systems Design” by Butler Lampson. ACM Operating Systems Review, 15:5, October 1983.
著名的系统研究员兰普森喜欢在计算机系统设计中使用提示(hints)。提示通常是正确的,但也可能是错误的;在这种使用中,signal() 告诉等待线程它改变了等待线程等待的条件,但不相信当等待线程唤醒时条件将处于所需状态。在这篇关于设计系统提示的论文中,兰普森的一般提示之一是你应该使用提示。它并不像听起来那么令人困惑。
[T99] “Re: NT kernel guy playing with Linux” by Linus Torvalds. June 27, 1999.
https://yarchive.net/comp/linux/semaphores.html
李纳斯本人对信号量的效用的回应,包括我们在文中提到的节流情况。和往常一样,莱纳斯有点无礼,但信息量很大。

Homework (Code)

在本作业中,我们将使用信号量来解决一些众所周知的并发问题。其中很多都取自唐尼出色的《Little Book of Semaphores》,它很好地汇集了许多经典问题并引入了一些新的变体;感兴趣的读者应该查看小书以获得更多乐趣。
下面的每个问题都提供了一个代码框架;您的工作是填充代码,使其在给定的信号量下工作。在Linux上,您将使用本机信号量;在Mac(没有信号量支持)上,你必须首先构建一个实现(使用锁和条件变量,如本章所述)。好运!

Questions

  1. 第一个问题是实现和测试fork/join问题(fork/join problem)的解决方案,如本文所述。尽管文本中描述了这个解决方案,但你自己输入它的行为是值得的;甚至巴赫也会改写《维瓦尔第》,让即将成为大师的人可以向现有大师学习。详情请参阅fork-join.c。给子线程添加sleep(1)调用以确保它有效。
  2. 现在让我们通过研究会合问题(rendezvous problem)来推广一下。问题如下:您有两个线程,每个线程都即将进入代码中的会合点。任何一方都不应该在另一方进入之前退出这部分代码。考虑为这个任务使用两个信号量,详细信息请参阅rendezvous.c。
  3. 现在通过实现barrier synchronization(屏障同步)的通用解决方案更进一步。假设在一段连续的代码中有两个点,称为 P1 和 P2。在 P1 和 P2 之间设置屏障(barrier)可确保所有线程在任何一个线程执行 P2 之前执行 P1。您的任务:编写代码以实现可以以这种方式使用的 barrier() 函数。可以安全地假设您知道 N(正在运行的程序中的线程总数)并且所有 N 个线程都将尝试进入屏障。同样,您可能应该使用两个信号量来实现解决方案,并使用其他一些整数来辅助计算。有关详细信息,请参阅 barrier.c。

参考The Little Book of Semaphores的3.6 Barrier一节得到了答案(这本书很有趣,写得很好很清晰),解答如下:

  1. typedef struct __barrier_t {
  2. // add semaphores and other information here
  3. sem_t mut;
  4. sem_t barr;
  5. long long count;
  6. } barrier_t;
  7. // the single barrier we are using for this program
  8. barrier_t b;
  9. void barrier_init(barrier_t *b, int num_threads) {
  10. // initialization code goes here
  11. sem_init(&(b->mut), 0, 1);
  12. sem_init(&(b->barr), 0, 0);
  13. b->count = num_threads;
  14. }
  15. void barrier(barrier_t *b) {
  16. // barrier code goes here
  17. sem_wait(&(b->mut));
  18. --(b->count);
  19. sem_post(&(b->mut));
  20. if (b->count == 0) sem_post(&(b->barr));
  21. sem_wait(&(b->barr));
  22. sem_post(&(b->barr));
  23. }
  1. 现在让我们来解决reader-writer problem(读取器-写入器问题),就像文章中描述的那样。在第一次时,不要担心饥饿。详见reader-writer.c中的代码。将 sleep() 调用添加到您的代码中以证明它按您的预期工作。你能证明饥饿(starvation)问题的存在吗?

参考The Little Book of Semaphores的4.2.1和4.2.2节得到了答案,解答如下:

  1. typedef struct __rwlock_t {
  2. int readers;
  3. sem_t mutex;
  4. sem_t roomEmpty;
  5. } rwlock_t;
  6. void rwlock_init(rwlock_t *rw) {
  7. rw->readers = 0;
  8. sem_init(&rw->mutex, 0, 1);
  9. sem_init(&rw->roomEmpty, 0, 1);
  10. }
  11. void rwlock_acquire_readlock(rwlock_t *rw) {
  12. sem_wait(&rw->mutex);
  13. ++rw->readers;
  14. if (rw->readers == 1) {
  15. sem_wait(&rw->roomEmpty);
  16. }
  17. sem_post(&rw->mutex);
  18. }
  19. void rwlock_release_readlock(rwlock_t *rw) {
  20. sem_wait(&rw->mutex);
  21. --rw->readers;
  22. if (rw->readers == 0) {
  23. sem_post(&rw->roomEmpty);
  24. }
  25. sem_post(&rw->mutex);
  26. }
  27. void rwlock_acquire_writelock(rwlock_t *rw) {
  28. sem_wait(&rw->roomEmpty);
  29. }
  30. void rwlock_release_writelock(rwlock_t *rw) {
  31. sem_post(&rw->roomEmpty);
  32. }
  1. 让我们再来看看reader-writer问题,但这次是担心饥饿(starvation)。你如何确保所有的 readers 和 writers 最终都能取得进展?详见reader-writer-nostarve.c。

参考The Little Book of Semaphores的4.2.3、4.2.4和4.2.5节得到了答案,解答如下:

  1. typedef struct __rwlock_t {
  2. int readers;
  3. sem_t mutex;
  4. sem_t rootEmpty;
  5. sem_t turnstile;
  6. } rwlock_t;
  7. void rwlock_init(rwlock_t *rw) {
  8. rw->readers = 0;
  9. sem_init(&rw->mutex, 0, 1);
  10. sem_init(&rw->rootEmpty, 0, 1);
  11. sem_init(&rw->turnstile, 0, 1);
  12. }
  13. void rwlock_acquire_readlock(rwlock_t *rw) {
  14. sem_wait(&rw->turnstile);
  15. sem_post(&rw->turnstile);
  16. sem_wait(&rw->mutex);
  17. ++rw->readers;
  18. if (rw->readers == 1) {
  19. sem_wait(&rw->rootEmpty);
  20. }
  21. sem_post(&rw->mutex);
  22. }
  23. void rwlock_release_readlock(rwlock_t *rw) {
  24. sem_wait(&rw->mutex);
  25. --rw->readers;
  26. if (rw->readers == 0) {
  27. sem_post(&rw->rootEmpty);
  28. }
  29. sem_post(&rw->mutex);
  30. }
  31. void rwlock_acquire_writelock(rwlock_t *rw) {
  32. sem_wait(&rw->turnstile);
  33. sem_wait(&rw->rootEmpty);
  34. }
  35. void rwlock_release_writelock(rwlock_t *rw) {
  36. sem_post(&rw->turnstile);
  37. sem_post(&rw->rootEmpty);
  38. }
  1. 使用信号量来构建一个非饿死互斥锁(no-starve mutex),在这个互斥锁中,任何试图获取互斥锁的线程最终都会获得它。有关更多信息,请参阅mutex-nostarave.c中的代码。

参考The Little Book of Semaphores的4.3节得到了答案,解答如下:

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <unistd.h>
  4. #include <pthread.h>
  5. #include "common_threads.h"
  6. //
  7. // Here, you have to write (almost) ALL the code. Oh no!
  8. // How can you show that a thread does not starve
  9. // when attempting to acquire this mutex you build?
  10. //
  11. typedef struct __ns_mutex_t {
  12. int room1;
  13. int room2;
  14. sem_t mutex;
  15. sem_t t1;
  16. sem_t t2; //turnstiles1, turnstiles2
  17. } ns_mutex_t;
  18. void ns_mutex_init(ns_mutex_t *m) {
  19. m->room1 = 0;
  20. m->room2 = 0;
  21. sem_init(&m->mutex, 0, 1);
  22. sem_init(&m->t1, 0, 1);
  23. sem_init(&m->t2, 0, 0);
  24. }
  25. void ns_mutex_acquire(ns_mutex_t *m) {
  26. sem_wait(&m->mutex);
  27. m->room1 += 1;
  28. sem_post(&m->mutex);
  29. sem_wait(&m->t1);
  30. m->room2 += 1;
  31. sem_wait(&m->mutex);
  32. m->room1 -= 1;
  33. if (m->room1 == 0) {
  34. sem_post(&m->mutex);
  35. sem_post(&m->t2);
  36. } else {
  37. sem_post(&m->mutex);
  38. sem_post(&m->t1);
  39. }
  40. sem_wait(&m->t2);
  41. m->room2 -= 1;
  42. }
  43. void ns_mutex_release(ns_mutex_t *m) {
  44. if (m->room2 == 0) {
  45. sem_post(&m->t1);
  46. } else {
  47. sem_post(&m->t2);
  48. }
  49. }
  50. typedef struct __tinfo_t {
  51. int thread_id;
  52. } tinfo_t;
  53. ns_mutex_t m;
  54. void *worker(void *arg) {
  55. tinfo_t *t = (tinfo_t *) arg;
  56. ns_mutex_acquire(&m);
  57. printf("child %d: before\n", t->thread_id);
  58. printf("child %d: after\n", t->thread_id);
  59. ns_mutex_release(&m);
  60. return NULL;
  61. }
  62. int main(int argc, char *argv[]) {
  63. assert(argc == 2);
  64. int num_threads = atoi(argv[1]);
  65. assert(num_threads > 0);
  66. pthread_t p[num_threads];
  67. tinfo_t t[num_threads];
  68. ns_mutex_init(&m);
  69. printf("parent: begin\n");
  70. int i;
  71. for (i = 0; i < num_threads; i++) {
  72. t[i].thread_id = i;
  73. Pthread_create(&p[i], NULL, worker, &t[i]);
  74. }
  75. for (i = 0; i < num_threads; i++)
  76. Pthread_join(p[i], NULL);
  77. printf("parent: end\n");
  78. return 0;
  79. }
  1. 喜欢这些问题呢?看唐尼的免费书就像他们。别忘了,玩得开心!但是,当你写代码的时候,你总是这样做,不是吗?