生产者和消费者模型 和 条件变量
生产者
容器装满后不再生产阻塞 并通知消费者去消费
容器
消费者
容器无内容后不再消费阻塞 并通知生产者去生产
/*生产者消费者模型 也叫有限缓冲模型生产者向容器中添加节点 消费者从容器中删除节点条件变量 不是锁 但是可以在某个条件满足后阻塞线程或解阻塞线程pthread_cond_tint pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict cond_attr);初始化条件变量int pthread_cond_destroy(pthread_cond_t *cond);销毁条件变量int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);阻塞函数 等待被唤醒(在信号或广播唤醒前一直等待)pthread_cond_wait的线程在等待期间会将锁给让出去 当前线程在等待时是不拿锁的在被唤醒等待结束不阻塞时 锁又会还给这个在等待的线程int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);阻塞函数 等待被唤醒(在信号或广播唤醒或阻塞的实际还没有超过设定的abstime前 一直等待)等待期间会将锁给让出去 当前线程在等待时是不拿锁的在被唤醒等待结束不阻塞时 锁又会还给这个在等待的线int pthread_cond_signal(pthread_cond_t *cond);唤醒1个或多个在等待的线程int pthread_cond_broadcast(pthread_cond_t *cond);唤醒所有在等待的线程*/#include <stdio.h>#include <pthread.h>#include <string.h>#include <unistd.h>#include <stdlib.h> //mallocpthread_rwlock_t rwlock; //全局读写锁量void *produce(void *arg);void *custom(void *arg);//使用链表作为容器 假设这个容器不会满 但是会空typedef struct Node{int data;struct Node *next;} Node;Node *head = NULL; //头节点//创建条件变量pthread_cond_t cond;int main(){//初始化互斥量pthread_rwlock_init(&rwlock, NULL);//初始化条件变量pthread_cond_init(&cond, NULL);// 创建3个子线程 主线程只用于管理系统资源pthread_t ptids[5],ctids[5]; //线程数组 5个生产者 5个消费者for (int i = 0; i < 5; i++){int ret = pthread_create(&ptids[i], NULL, produce, NULL);if (ret != 0){printf("%s\n", strerror(ret));}//不join了 直接将10个线程都分离 由系统回收pthread_detach(ptids[i]);}for (int i = 0; i < 5; i++){int ret = pthread_create(&ctids[i], NULL, custom, NULL);if (ret != 0){printf("%s\n", strerror(ret));}//不join了 直接将10个线程都分离 由系统回收pthread_detach(ctids[i]);}while (1){sleep(10);break;}pthread_cond_destroy(&cond);//子线程执行10秒后就将锁销毁pthread_rwlock_destroy(&rwlock);//pthread_exit(NULL); //让主线程退出 当主线程退出时 不会影响其他正常运行的线程return 0; //不会执行}// 这整个链表就像栈 生产和消费的过程就是出栈和入栈 生产出来加在队首 消费也优先消费队首的节点// 生产和消费都是写操作void *produce(void *arg) //生产线程 向容器添加内容 不断创建节点添加到链表中--新节点直接作为新的头节点{while (1){pthread_rwlock_wrlock(&rwlock);Node *new_node = (Node *)malloc(sizeof(Node));new_node->data = rand() % 1000;new_node->next = head;head = new_node;printf("%ld produce node %d\n", pthread_self(), new_node->data);//只要生产了一个就通知消费者消费 唤醒在等待的消费者pthread_cond_signal(&cond);pthread_rwlock_unlock(&rwlock);usleep(1000); //每个线程在产生一个新节点后至少1ms不生产新节点}return NULL;}void *custom(void *arg){while (1){pthread_rwlock_wrlock(&rwlock);if (head == NULL){//1head此时为null说明生产线程还没有生产节点 容器为空 需要将锁给生产线程,让生产线程先生产节点先// pthread_rwlock_unlock(&rwlock);// usleep(1000); //每个线程在消费一个节点后至少1ms不争夺锁// continue;//2 没有数据阻塞等待 当生产者生产了一个节点会发信号通知 这里就会解除阻塞pthread_cond_wait(&cond, &rwlock); //当前消费者拿着锁在等待 生产者拿不到锁怎么生产?//并不会这样 pthread_cond_wait在等待期间会将锁给让出去 当前线程在等待时是不拿锁的//当唤醒后不再等待 锁又归还给当前线程pthread_rwlock_unlock(&rwlock);continue;}Node *temp = head;head = head->next;printf("%ld custom node %d\n", pthread_self(), temp->data);free(temp); //释放头节点pthread_rwlock_unlock(&rwlock);usleep(1000); //每个线程在消费一个节点后至少1ms不消费新节点}return NULL;}
信号量
相比条件变量使用起来更灵活方便
/*信号量(信号灯) 与条件变量类似 用于阻塞线程 但一样不是锁类型 sem_t#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);初始化信号量sem 要初始化的信号量pshared 此信号量用于线程间还是进程间 为0是线程 非0是进程(信号量在共享内存中)但LinuxThreads仅实现线程间共享,因此所有非0值的pshared输入都将使sem_init()返回-1,且置errno为ENOSYS。value 信号量中断的值(总共有几个灯)int sem_destroy(sem_t *sem);销毁信号量 释放资源释放自己占用的一切资源,被注销的信号量sem要求已没有线程在等待该信号量了,否则返回-1,且置errno为EBUSY。wait对信号量加锁int sem_wait(sem_t *sem);若当前亮着的灯为0个则阻塞直到亮灯不为0个 不为0不会阻塞 将亮着的灯关掉一个(-1)int sem_trywait(sem_t *sem);sem_trywait()为sem_wait()的非阻塞版,如果信号量计数大于0,则信号量立即减1并返回0,否则立即返回-1,errno置为EAGAIN。int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);sem_timedwait和sem_wait一样当信号量计数为0会阻塞,但sem_timedwait的abs_timeout参数指定了调用应该阻塞的时间限制当阻塞超时时返回失败(errno 设置为 ETIMEDOUT)尽量不要使用sem_timedwait函数来实现延时等待的功能,若要使用该延时等待的功能,建议使用sem_trywait+usleep 实现的延时阻塞int sem_post(sem_t *sem);解锁一个信号量(开一个灯(+1)) 唤醒在wait的线程(它获得了一个信号量锁)它是一个“原子操作”,即同时对同一个信号量做加“1”操作的两个线程是不会冲突的,一个先加1一个后加1最后是加2int sem_getvalue(sem_t *sem, int *sval);读取sem中信号量计数,存于*sval中,并返回0。*/#include <stdio.h>#include <pthread.h>#include <string.h>#include <unistd.h>#include <stdlib.h> //malloc#include <semaphore.h>pthread_rwlock_t rwlock; //全局读写锁量void *produce(void *arg);void *custom(void *arg);//使用链表作为容器 假设这个容器不会满 但是会空typedef struct Node{int data;struct Node *next;} Node;Node *head = NULL; //头节点//创建条两个信号量sem_t csem, psem;int main(){//初始化互斥量pthread_rwlock_init(&rwlock, NULL);//初始化信号量//这个10限定了容器中最多只会有10个节点 生产者灭的灯数+消费者亮的灯数=消费者灭的灯数+生产者亮的灯数=10 这个式子永远是恒等的sem_init(&psem, 0, 10); //生产者的信号量初始值为10 这个值其实可以设置为容器的上限sem_init(&csem, 0, 0); //消费的信号量初始值为0 这意味着消费者一开始是阻塞的(因为生产者还没生产 容器为空不允许消费)// 创建子线程 主线程只用于管理系统资源pthread_t ptids[5],ctids[5]; //线程数组 5个生产者 5个消费者for (int i = 0; i < 5; i++){int ret = pthread_create(&ptids[i], NULL, produce, NULL);if (ret != 0){printf("%s\n", strerror(ret));}//不join了 直接将10个线程都分离 由系统回收pthread_detach(ptids[i]);}for (int i = 0; i < 5; i++){int ret = pthread_create(&ctids[i], NULL, custom, NULL);if (ret != 0){printf("%s\n", strerror(ret));}//不join了 直接将10个线程都分离 由系统回收pthread_detach(ctids[i]);}while (1){sleep(10);break;}sem_destroy(&csem);sem_destroy(&psem);//子线程执行10秒后就将锁销毁pthread_rwlock_destroy(&rwlock);// pthread_exit(NULL); //让主线程退出 当主线程退出时 不会影响其他正常运行的线程return 0; //不会执行}// 这整个链表就像栈 生产和消费的过程就是出栈和入栈 生产出来加在队首 消费也优先消费队首的节点// 生产和消费都是写操作//生产者灭的灯数+消费者亮的灯数=消费者灭的灯数+生产者亮的灯数=10 这个式子永远是恒等的void *produce(void *arg) //生产线程 向容器添加内容 不断创建节点添加到链表中--新节点直接作为新的头节点{while (1){sem_wait(&psem); //每生产一次 生产者亮灯数减1 当亮灯数为0 阻塞等待消费者消费pthread_rwlock_wrlock(&rwlock);Node *new_node = (Node *)malloc(sizeof(Node));new_node->data = rand() % 1000;new_node->next = head;head = new_node;printf("%ld produce node %d\n", pthread_self(), new_node->data);//只要生产了一个就通知消费者消费 唤醒在等待的消费者pthread_rwlock_unlock(&rwlock);sem_post(&csem); //已经生产了一个节点消费者亮灯数+1 并唤醒消费者线程中在wait的线程去消费usleep(1000); //每个线程在产生一个新节点后至少1ms不生产新节点}return NULL;}void *custom(void *arg){while (1){//消费者不再需要单独地判断容器是否为空//当容器为空 则消费者亮灯数为0 自然阻塞 等待生产者生产将消费者灯点亮sem_wait(&csem);pthread_rwlock_wrlock(&rwlock);Node *temp = head;head = head->next;printf("%ld custom node %d\n", pthread_self(), temp->data);free(temp); //释放头节点pthread_rwlock_unlock(&rwlock);sem_post(&psem); //已经消费了一个节点灭了一个消费者灯 需要量一个生产者灯 并唤醒生产者线程中在wait的线程去生产usleep(1000); //每个线程在消费一个节点后至少1ms不消费新节点}return NULL;}
