问题定义

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

解决方案

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
用信号量解决该问题的重点如下图所示:
image.png
注意到每一个进程最先开始的P操作不能调换前后位置,必须是先P一下是否有资源再去上互斥锁,如果先上了互斥锁,再去P资源,可能会导致资源不足无法P但其他的进程又不能提供该资源(因为上了互斥锁),便会造成循环等待(死锁)

具体实现

下面以生产者进程不断地向数组添加数据(写入100次),消费者从数组读取数据并求和为例,给出基于信号量解决生产者-消费者问题的程序代码。该程序假设有一个生产者进程和两个消费者进程,创建了fullid、enptyid和mutxid三个信号量,共进程间同步访问临界区。同时,还建立4个共享主存区,其中array用户维护生产者,消费者进程之间的共享数据,sum保存当前求和结果,而set和get分别记录当前生产者和消费者进程的读写次数。

  1. #include <fcntl.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <sys/ipc.h>
  6. #include <sys/sem.h>
  7. #include <sys/shm.h>
  8. #include <sys/stat.h>
  9. #include <sys/types.h>
  10. #include <sys/wait.h>
  11. #include <time.h>
  12. #include <unistd.h>
  13. #define MAXSEM 5
  14. const int ARRAY_SIZE = 100;
  15. const int producerSpeed = 10000;
  16. const int customerASpeed = 40000;
  17. const int customerBSpeed = 50000;
  18. // 对信号量数组semnum编号的信号量做P操作
  19. int P(int semid, int semnum) {
  20. struct sembuf sops = {semnum, -1, 0};
  21. return (semop(semid, &sops, 1));
  22. }
  23. // 对信号量数组semnum编号的信号量做V操作
  24. int V(int semid, int semnum) {
  25. struct sembuf sops = {semnum, +1, 0};
  26. return (semop(semid, &sops, 1));
  27. }
  28. // 赋值结构体
  29. union semun {
  30. int val;
  31. };
  32. // 有则创建,无则获取
  33. int flag = IPC_CREAT | IPC_EXCL | 00666;
  34. int main() {
  35. // 清除系统内的所有的信号量
  36. system("ipcrm -a");
  37. key_t key;
  38. // 声明三个信号量
  39. int fullid, emptyid, mutxid;
  40. // 创建信号量fullid,emptyid,mutxid
  41. key = ftok("..", 1);
  42. fullid = semget(key, 1, IPC_CREAT | IPC_EXCL | 00666);
  43. key = ftok("..", 2);
  44. emptyid = semget(key, 1, IPC_CREAT | IPC_EXCL | 00666);
  45. key = ftok("..", 3);
  46. mutxid = semget(key, 1, IPC_CREAT | IPC_EXCL | 00666);
  47. // 为信号量赋值
  48. union semun arg;
  49. arg.val = 0;
  50. if (semctl(fullid, 0, SETVAL, arg) == -1)
  51. perror("semctl setval error");
  52. arg.val = MAXSEM;
  53. if (semctl(emptyid, 0, SETVAL, arg) == -1)
  54. perror("semctl setval error");
  55. arg.val = 1;
  56. if (semctl(mutxid, 0, SETVAL, arg) == -1)
  57. perror("semctl setval error");
  58. // 创建共享内存
  59. int *array, *sum, *set, *get;
  60. key = ftok("..", 4);
  61. int arrayid =
  62. shmget(key, sizeof(int) * MAXSEM, 0666 | IPC_CREAT | IPC_EXCL);
  63. if (arrayid == -1)
  64. perror("shmget array: ");
  65. key = ftok("..", 5);
  66. int sumid = shmget(key, sizeof(int), 0666 | IPC_CREAT | IPC_EXCL);
  67. if (sumid == -1)
  68. perror("shmget sum: ");
  69. key = ftok("..", 6);
  70. int getid = shmget(key, sizeof(int), 0666 | IPC_CREAT | IPC_EXCL);
  71. if (getid == -1)
  72. perror("shmget get: ");
  73. key = ftok("..", 7);
  74. int setid = shmget(key, sizeof(int), 0666 | IPC_CREAT | IPC_EXCL);
  75. if (setid == -1)
  76. perror("shmget set: ");
  77. if (fork() != 0) {
  78. // parent add
  79. // 挂载共享内存
  80. array = (int *)shmat(arrayid, NULL, 0);
  81. set = (int *)shmat(setid, NULL, 0);
  82. get = (int *)shmat(getid, NULL, 0);
  83. sum = (int *)shmat(sumid, NULL, 0);
  84. for (int i = 0; i < ARRAY_SIZE; i++) {
  85. P(emptyid, 0);
  86. P(mutxid, 0);
  87. printf("add num:%d\n", i + 1);
  88. array[*(set) % MAXSEM] = i + 1;
  89. (*set)++;
  90. usleep(producerSpeed);
  91. V(mutxid, 0);
  92. V(fullid, 0);
  93. }
  94. } else {
  95. if (fork() == 0) {
  96. // children1 customerA
  97. // 挂载共享内存
  98. array = (int *)shmat(arrayid, NULL, 0);
  99. set = (int *)shmat(setid, NULL, 0);
  100. get = (int *)shmat(getid, NULL, 0);
  101. sum = (int *)shmat(sumid, NULL, 0);
  102. while (*get < ARRAY_SIZE) {
  103. P(fullid, 0);
  104. P(mutxid, 0);
  105. *sum += array[*(get) % MAXSEM];
  106. // printf("The ComsumerA index %d\n", *get);
  107. // printf("A: The `mutxid` is %d\n", semctl(mutxid, 0, GETVAL,
  108. // 0));
  109. printf("The ComsumerA Get Number %d\n", array[*(get) % MAXSEM]);
  110. if (*get == ARRAY_SIZE - 1)
  111. printf("In ComsumerA, The sum is %d \n", *sum);
  112. (*get)++;
  113. usleep(customerASpeed);
  114. V(mutxid, 0);
  115. V(emptyid, 0);
  116. }
  117. } else {
  118. // children2 customerB
  119. // 挂载共享内存
  120. array = (int *)shmat(arrayid, NULL, 0);
  121. set = (int *)shmat(setid, NULL, 0);
  122. get = (int *)shmat(getid, NULL, 0);
  123. sum = (int *)shmat(sumid, NULL, 0);
  124. while (*get < ARRAY_SIZE) {
  125. P(fullid, 0);
  126. P(mutxid, 0);
  127. *sum += array[*(get) % MAXSEM];
  128. // printf("The ComsumerB index %d\n", *get);
  129. // printf("B: The `mutxid` is %d\n", semctl(mutxid, 0, GETVAL,
  130. // 0));
  131. printf("The ComsumerB Get Number %d\n", array[*(get) % MAXSEM]);
  132. if (*get == ARRAY_SIZE - 1)
  133. printf("In ComsumerB, The sum is %d \n", *sum);
  134. (*get)++;
  135. usleep(customerBSpeed);
  136. V(mutxid, 0);
  137. V(emptyid, 0);
  138. }
  139. }
  140. }
  141. shmdt(array);
  142. shmdt(sum);
  143. shmdt(get);
  144. shmdt(set);
  145. // semctl(fullid, 0, IPC_RMID);
  146. // semctl(mutxid, 0, IPC_RMID);
  147. // semctl(emptyid, 0, IPC_RMID);
  148. return 0;
  149. }

实现时踩的坑

一、System V信号量和POSIX信号量

信号量有两种实现:传统的System V信号量和新的POSIX信号量。它们所提供的函数很容易被区分:对于所有System V信号量函数,在它们的名字里面没有下划线。例如,应该是semget()而不是sem_get()。然而,所有的的POSIX信号量函数都有一个下划线。下面列出了它们提供的所有函数清单:

Systm V POSIX
semctl() sem_getvalue()
semget() sem_post()
semop() sem_timedwait()
sem_trywait()
sem_wait()
sem_destroy()
sem_init()
sem_close()
sem_open()
sem_unlink()

使用上的区别

1、XSI system V的信号量是信号量集,可以包括多个信号灯(有个数组),每个操作可以同时操作多个信号灯
posix是单个信号灯,POSIX有名信号灯支持进程间通信,无名信号灯放在共享内存中时可以用于进程间通信。
2、POSIX信号量在有些平台并没有被实现,比如:SUSE8,而SYSTEM V大多数LINUX/UNIX都已经实现。两者都可以用于进程和线程间通信。但一般来说,system v信号量用于 进程间同步、有名信号灯既可用于线程间的同步,又可以用于进程间的同步、posix无名用于同一个进程的不同线程间,如果无名信号量要用于进程间同步,信号量要放在共享内存中。
3、POSIX有两种类型的信号量,有名信号量和无名信号量。有名信号量像system v信号量一样由一个名字标识。
4、POSIX通过sem_open单一的调用就完成了信号量的创建、初始化和权限的设置,而system v要两步。也就是说posix 信号是多线程,多进程安全的,而system v不是,可能会出现问题。
5、system V信号量通过一个int类型的值来标识自己(类似于调用open()返回的fd),而sem_open函数返回sem_t类型(长整形)作为posix信号量的标识值。
6、对于System V信号量你可以控制每次自增或是自减的信号量计数,而在Posix里面,信号量计数每次只能自增或是自减1。
7、Posix无名信号量提供一种非常驻的信号量机制。
8、相关进程: 如果进程是从一已经存在的进程创建,并最终操作这个创建进程的资源,那么这些进程被称为相关的。

总结

1、System V的信号量一般用于进程同步, 且是内核持续的, api为:semget、semctl、semop
2、Posix的有名信号量一般用于进程同步, 有名信号量是内核持续的. 有名信号量的api为:sem_open、sem_close、sem_unlink
3、Posix的无名信号量一般用于线程同步, 无名信号量是进程持续的, 无名信号量的api为:sem_init、sem_destroy

二、<Linux/sem.h><sys/sem.h><semaphore.h>实现的信号量的不同

  • 头文件<linux/sem.h><sys/sem.h>不同,前者是内核的头文件,后者是glibc的头文件,否则将会出现内存申请了无法读取的问题
  • <sys/sem.h>为XSI(最初是Unix System V)信号灯提供接口。这些不是POSIX基本标准的一部分(它们在XSI选项中,这主要是为了与传统的Unix兼容),虽然它们还没有被认为是过时/废弃的,但许多程序员认为它们应当废弃。
  • <semaphore.h> 定义了 POSIX 信号量,它们的设计方式使得它们可以完全在用户空间中实现,除非进程将调用内核进入睡眠状态。它们的性能应该接近最佳,但它们不像 XSI 信号量那么有特色。 POSIX 信号量还为您提供了是否需要进程本地信号量(用于多线程环境,甚至在某些情况下,单线程程序中的信号处理程序)或进程共享信号量的选择,在后一种情况下,您还可以选择是让系统按名称在共享命名空间中分配它,还是自己获取共享内存并在共享内存中对其进行初始化。

    总结

    为了进程同步,应该使用<sys/sem.h>或者<semaphore.h>的信号量,书本《Linux操作系统实验教程》中P132页的实例代码给出的头文件为错误头文件,易误导读者。