生产者和消费者模型 和 条件变量

    生产者
    容器装满后不再生产阻塞 并通知消费者去消费
    容器
    消费者
    容器无内容后不再消费阻塞 并通知生产者去生产

    1. /*
    2. 生产者消费者模型 也叫有限缓冲模型
    3. 生产者向容器中添加节点 消费者从容器中删除节点
    4. 条件变量 不是锁 但是可以在某个条件满足后阻塞线程或解阻塞线程
    5. pthread_cond_t
    6. int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict cond_attr);
    7. 初始化条件变量
    8. int pthread_cond_destroy(pthread_cond_t *cond);
    9. 销毁条件变量
    10. int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    11. 阻塞函数 等待被唤醒(在信号或广播唤醒前一直等待)
    12. pthread_cond_wait的线程在等待期间会将锁给让出去 当前线程在等待时是不拿锁的
    13. 在被唤醒等待结束不阻塞时 锁又会还给这个在等待的线程
    14. int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
    15. 阻塞函数 等待被唤醒(在信号或广播唤醒或阻塞的实际还没有超过设定的abstime前 一直等待)
    16. 等待期间会将锁给让出去 当前线程在等待时是不拿锁的
    17. 在被唤醒等待结束不阻塞时 锁又会还给这个在等待的线
    18. int pthread_cond_signal(pthread_cond_t *cond);
    19. 唤醒1个或多个在等待的线程
    20. int pthread_cond_broadcast(pthread_cond_t *cond);
    21. 唤醒所有在等待的线程
    22. */
    23. #include <stdio.h>
    24. #include <pthread.h>
    25. #include <string.h>
    26. #include <unistd.h>
    27. #include <stdlib.h> //malloc
    28. pthread_rwlock_t rwlock; //全局读写锁量
    29. void *produce(void *arg);
    30. void *custom(void *arg);
    31. //使用链表作为容器 假设这个容器不会满 但是会空
    32. typedef struct Node
    33. {
    34. int data;
    35. struct Node *next;
    36. } Node;
    37. Node *head = NULL; //头节点
    38. //创建条件变量
    39. pthread_cond_t cond;
    40. int main()
    41. {
    42. //初始化互斥量
    43. pthread_rwlock_init(&rwlock, NULL);
    44. //初始化条件变量
    45. pthread_cond_init(&cond, NULL);
    46. // 创建3个子线程 主线程只用于管理系统资源
    47. pthread_t ptids[5],
    48. ctids[5]; //线程数组 5个生产者 5个消费者
    49. for (int i = 0; i < 5; i++)
    50. {
    51. int ret = pthread_create(&ptids[i], NULL, produce, NULL);
    52. if (ret != 0)
    53. {
    54. printf("%s\n", strerror(ret));
    55. }
    56. //不join了 直接将10个线程都分离 由系统回收
    57. pthread_detach(ptids[i]);
    58. }
    59. for (int i = 0; i < 5; i++)
    60. {
    61. int ret = pthread_create(&ctids[i], NULL, custom, NULL);
    62. if (ret != 0)
    63. {
    64. printf("%s\n", strerror(ret));
    65. }
    66. //不join了 直接将10个线程都分离 由系统回收
    67. pthread_detach(ctids[i]);
    68. }
    69. while (1)
    70. {
    71. sleep(10);
    72. break;
    73. }
    74. pthread_cond_destroy(&cond);
    75. //子线程执行10秒后就将锁销毁
    76. pthread_rwlock_destroy(&rwlock);
    77. //pthread_exit(NULL); //让主线程退出 当主线程退出时 不会影响其他正常运行的线程
    78. return 0; //不会执行
    79. }
    80. // 这整个链表就像栈 生产和消费的过程就是出栈和入栈 生产出来加在队首 消费也优先消费队首的节点
    81. // 生产和消费都是写操作
    82. void *produce(void *arg) //生产线程 向容器添加内容 不断创建节点添加到链表中--新节点直接作为新的头节点
    83. {
    84. while (1)
    85. {
    86. pthread_rwlock_wrlock(&rwlock);
    87. Node *new_node = (Node *)malloc(sizeof(Node));
    88. new_node->data = rand() % 1000;
    89. new_node->next = head;
    90. head = new_node;
    91. printf("%ld produce node %d\n
    92. ", pthread_self(), new_node->data);
    93. //只要生产了一个就通知消费者消费 唤醒在等待的消费者
    94. pthread_cond_signal(&cond);
    95. pthread_rwlock_unlock(&rwlock);
    96. usleep(1000); //每个线程在产生一个新节点后至少1ms不生产新节点
    97. }
    98. return NULL;
    99. }
    100. void *custom(void *arg)
    101. {
    102. while (1)
    103. {
    104. pthread_rwlock_wrlock(&rwlock);
    105. if (head == NULL)
    106. {
    107. //1head此时为null说明生产线程还没有生产节点 容器为空 需要将锁给生产线程,让生产线程先生产节点先
    108. // pthread_rwlock_unlock(&rwlock);
    109. // usleep(1000); //每个线程在消费一个节点后至少1ms不争夺锁
    110. // continue;
    111. //2 没有数据阻塞等待 当生产者生产了一个节点会发信号通知 这里就会解除阻塞
    112. pthread_cond_wait(&cond, &rwlock); //当前消费者拿着锁在等待 生产者拿不到锁怎么生产?
    113. //并不会这样 pthread_cond_wait在等待期间会将锁给让出去 当前线程在等待时是不拿锁的
    114. //当唤醒后不再等待 锁又归还给当前线程
    115. pthread_rwlock_unlock(&rwlock);
    116. continue;
    117. }
    118. Node *temp = head;
    119. head = head->next;
    120. printf("%ld custom node %d\n
    121. ", pthread_self(), temp->data);
    122. free(temp); //释放头节点
    123. pthread_rwlock_unlock(&rwlock);
    124. usleep(1000); //每个线程在消费一个节点后至少1ms不消费新节点
    125. }
    126. return NULL;
    127. }



    信号量
    相比条件变量使用起来更灵活方便

    1. /*
    2. 信号量(信号灯) 与条件变量类似 用于阻塞线程 但一样不是锁
    3. 类型 sem_t
    4. #include <semaphore.h>
    5. int sem_init(sem_t *sem, int pshared, unsigned int value);
    6. 初始化信号量
    7. sem 要初始化的信号量
    8. pshared 此信号量用于线程间还是进程间 为0是线程 非0是进程(信号量在共享内存中)
    9. 但LinuxThreads仅实现线程间共享,因此所有非0值的pshared输入都将使sem_init()返回-1,且置errno为ENOSYS。
    10. value 信号量中断的值(总共有几个灯)
    11. int sem_destroy(sem_t *sem);
    12. 销毁信号量 释放资源
    13. 释放自己占用的一切资源,被注销的信号量sem要求已没有线程在等待该信号量了,否则返回-1,且置errno为EBUSY。
    14. wait对信号量加锁
    15. int sem_wait(sem_t *sem);
    16. 若当前亮着的灯为0个则阻塞直到亮灯不为0个 不为0不会阻塞 将亮着的灯关掉一个(-1)
    17. int sem_trywait(sem_t *sem);
    18. sem_trywait()为sem_wait()的非阻塞版,如果信号量计数大于0,则信号量立即减1并返回0,否则立即返回-1,errno置为EAGAIN。
    19. int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    20. sem_timedwait和sem_wait一样当信号量计数为0会阻塞,但sem_timedwait的abs_timeout参数指定了调用应该阻塞的时间限制
    21. 当阻塞超时时返回失败(errno 设置为 ETIMEDOUT)
    22. 尽量不要使用sem_timedwait函数来实现延时等待的功能,若要使用该延时等待的功能,建议使用sem_trywait+usleep 实现的延时阻塞
    23. int sem_post(sem_t *sem);
    24. 解锁一个信号量(开一个灯(+1)) 唤醒在wait的线程(它获得了一个信号量锁)
    25. 它是一个“原子操作”,即同时对同一个信号量做加“1”操作的两个线程是不会冲突的,一个先加1一个后加1最后是加2
    26. int sem_getvalue(sem_t *sem, int *sval);
    27. 读取sem中信号量计数,存于*sval中,并返回0。
    28. */
    29. #include <stdio.h>
    30. #include <pthread.h>
    31. #include <string.h>
    32. #include <unistd.h>
    33. #include <stdlib.h> //malloc
    34. #include <semaphore.h>
    35. pthread_rwlock_t rwlock; //全局读写锁量
    36. void *produce(void *arg);
    37. void *custom(void *arg);
    38. //使用链表作为容器 假设这个容器不会满 但是会空
    39. typedef struct Node
    40. {
    41. int data;
    42. struct Node *next;
    43. } Node;
    44. Node *head = NULL; //头节点
    45. //创建条两个信号量
    46. sem_t csem, psem;
    47. int main()
    48. {
    49. //初始化互斥量
    50. pthread_rwlock_init(&rwlock, NULL);
    51. //初始化信号量
    52. //这个10限定了容器中最多只会有10个节点 生产者灭的灯数+消费者亮的灯数=消费者灭的灯数+生产者亮的灯数=10 这个式子永远是恒等的
    53. sem_init(&psem, 0, 10); //生产者的信号量初始值为10 这个值其实可以设置为容器的上限
    54. sem_init(&csem, 0, 0); //消费的信号量初始值为0 这意味着消费者一开始是阻塞的(因为生产者还没生产 容器为空不允许消费)
    55. // 创建子线程 主线程只用于管理系统资源
    56. pthread_t ptids[5],
    57. ctids[5]; //线程数组 5个生产者 5个消费者
    58. for (int i = 0; i < 5; i++)
    59. {
    60. int ret = pthread_create(&ptids[i], NULL, produce, NULL);
    61. if (ret != 0)
    62. {
    63. printf("%s\n", strerror(ret));
    64. }
    65. //不join了 直接将10个线程都分离 由系统回收
    66. pthread_detach(ptids[i]);
    67. }
    68. for (int i = 0; i < 5; i++)
    69. {
    70. int ret = pthread_create(&ctids[i], NULL, custom, NULL);
    71. if (ret != 0)
    72. {
    73. printf("%s\n", strerror(ret));
    74. }
    75. //不join了 直接将10个线程都分离 由系统回收
    76. pthread_detach(ctids[i]);
    77. }
    78. while (1)
    79. {
    80. sleep(10);
    81. break;
    82. }
    83. sem_destroy(&csem);
    84. sem_destroy(&psem);
    85. //子线程执行10秒后就将锁销毁
    86. pthread_rwlock_destroy(&rwlock);
    87. // pthread_exit(NULL); //让主线程退出 当主线程退出时 不会影响其他正常运行的线程
    88. return 0; //不会执行
    89. }
    90. // 这整个链表就像栈 生产和消费的过程就是出栈和入栈 生产出来加在队首 消费也优先消费队首的节点
    91. // 生产和消费都是写操作
    92. //生产者灭的灯数+消费者亮的灯数=消费者灭的灯数+生产者亮的灯数=10 这个式子永远是恒等的
    93. void *produce(void *arg) //生产线程 向容器添加内容 不断创建节点添加到链表中--新节点直接作为新的头节点
    94. {
    95. while (1)
    96. {
    97. sem_wait(&psem); //每生产一次 生产者亮灯数减1 当亮灯数为0 阻塞等待消费者消费
    98. pthread_rwlock_wrlock(&rwlock);
    99. Node *new_node = (Node *)malloc(sizeof(Node));
    100. new_node->data = rand() % 1000;
    101. new_node->next = head;
    102. head = new_node;
    103. printf("%ld produce node %d\n", pthread_self(), new_node->data);
    104. //只要生产了一个就通知消费者消费 唤醒在等待的消费者
    105. pthread_rwlock_unlock(&rwlock);
    106. sem_post(&csem); //已经生产了一个节点消费者亮灯数+1 并唤醒消费者线程中在wait的线程去消费
    107. usleep(1000); //每个线程在产生一个新节点后至少1ms不生产新节点
    108. }
    109. return NULL;
    110. }
    111. void *custom(void *arg)
    112. {
    113. while (1)
    114. {
    115. //消费者不再需要单独地判断容器是否为空
    116. //当容器为空 则消费者亮灯数为0 自然阻塞 等待生产者生产将消费者灯点亮
    117. sem_wait(&csem);
    118. pthread_rwlock_wrlock(&rwlock);
    119. Node *temp = head;
    120. head = head->next;
    121. printf("%ld custom node %d\n", pthread_self(), temp->data);
    122. free(temp); //释放头节点
    123. pthread_rwlock_unlock(&rwlock);
    124. sem_post(&psem); //已经消费了一个节点灭了一个消费者灯 需要量一个生产者灯 并唤醒生产者线程中在wait的线程去生产
    125. usleep(1000); //每个线程在消费一个节点后至少1ms不消费新节点
    126. }
    127. return NULL;
    128. }