1. 线程安全

线程安全中涉及到的概念:

  • 临界资源:多个线程都能访问到的资源
  • 临界区: 执行访问临界资源的代码片段
  • 线程不安全:多线程访问临界资源导致的临界资源出现二义性的问题

在linux操作系统中,提供了多种手段来保证线程安全:互斥量、读写锁、条件变量、信号量。

2. 互斥锁

2.1 初始化

互斥锁的初始化分为静态、动态方式两种,分别如下:

  1. #include <pthread.h>
  2. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 静态初始化互斥锁
  3. // 动态初始化互斥锁
  4. // mutex:出参,返回互斥锁变量
  5. // attr:互斥锁属性,默认情况下为NULL
  6. int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);

互斥锁的类型如下:

  1. PTHREAD_MUTEX_NORMAL:最普通的互斥锁。
  2. PTHREAD_MUTEX_RECURSIVE_NP:支持递归的互斥锁,互斥锁内部维持锁的持有者和加锁计数,线程每次枷锁,都会导致加锁计数+1,每次解锁都会导致加锁计数-1,当加锁计数为0时,线程放弃对锁的占用,其他线程可以获取该锁。
  3. PTHREAD_MUTEX_ERRORCHECK_NP:可以检测递归的互斥锁,互斥锁内部会记录当前持有锁的线程id,当该线程再次加锁时,会返回错误码EDEADLK。解锁时,如果发现解锁的线程不是持有锁的线程,便会返回EPERM
  4. PTHREAD_MUTEX_ADAPTIVE_NP:自适应锁,在一段时间内尝试获取锁,如获取不到,则进入等待状态。

设置/获取互斥锁的类型:

  1. int pthread_mutexattr_settype (pthread_mutexattr_t *attr, int kind);
  2. int pthread_mutexattr_gettype (const pthread_mutexattr_t *attr, int *kind);

2.2 销毁

  1. int pthread_mutex_destroy(pthread_mutex_t *mutex);

销毁函数的注意事项如下:

  1. 使用静态初始化的互斥锁不需要销毁
  2. 不要销毁一个已经加锁的互斥量或正在与条件变量配合使用的互斥量(返回EBUSY错误码)
  3. 不要在销毁后再次对互斥量加锁。

2.3 加锁

  1. int pthread_mutex_lock(pthread_mutex_t *mutex);
  2. int pthread_mutex_trylock(pthread_mutex_t *mutex);
  3. int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abs_timeout);

下面详细解释各接口的用法:

  1. pthread_mutex_lock:该接口为阻塞接口,当互斥量被其他线程占用时,当前线程会被阻塞在pthread_mutex_lock处。
  2. pthread_mutex_trylock:该接口为非阻塞接口,无论是否加锁成功,均会立刻返回,经常与循环一起使用。
  3. pthread_mutex_timedlock:带有超时时间的加锁接口,在时间范围内,若加锁成功会立刻返回,若超时,则说明加锁失败,经常与循环一起使用。

    2.4 解锁

    1. int pthread_mutex_unlock(pthread_mutex_t *mutex);

    该接口为统一的解锁接口,上述三个加锁接口均可以使用该接口来解锁。

    2.5 互斥锁的本质

  4. 互斥锁内部有一个计数器,只能为0或者1。

  5. 当计数器为1时,表示线程可获得该互斥锁,获取成功后,计数器取值变为0。
  6. 当计数器为0时,表示该互斥锁已被其他线程获取,无法重复加锁。

注意:互斥锁释放后,每个线程获取该互斥锁的概率是一样,并不存在先来先得。

2.6 示例

  1. #define THREAD_MAX 5
  2. pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
  3. int g_param = 0;
  4. void *work_thread(void *arg)
  5. {
  6. for (int i = 0; i < 3; ++i) {
  7. pthread_mutex_lock(&g_mutex);
  8. ++g_param;
  9. printf("线程id:%d, 参数:%d \n", pthread_self(), g_param);
  10. pthread_mutex_unlock(&g_mutex);
  11. sleep(1);
  12. }
  13. return NULL;
  14. }
  15. int main ()
  16. {
  17. pthread_t tid[THREAD_MAX];
  18. for (int i = 0; i < THREAD_MAX; ++i) {
  19. pthread_create(&tid[i], NULL, work_thread, NULL);
  20. }
  21. for (int i = 0; i < THREAD_MAX; ++i) {
  22. pthread_join(tid[i], NULL);
  23. }
  24. return 0;
  25. }

3. 读写锁

大部分情况下,对于共享变量的访问特点:只是读取共享变量的值,而不是修改,只有在少数情况下,才会真正的修改共享变量的值。
这种情况下,读操作之间并发操作是安全的,当进行写操作时,必须锁住读操作才能保证结果是正确的,即多个线程可以同时读,只有一个线程可以写。

3.1 初始化与销毁

  1. #include <pthread.h>
  2. //销毁
  3. int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
  4. //初始化
  5. int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
  1. 默认属性下的读写锁:进程内部竞争读写锁,读者优先。

3.2 读者加锁

  1. #include <pthread.h>
  2. int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); //阻塞类型的读加锁接口
  3. int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); //非阻塞类型的读加锁接口
  1. 对于读者加锁,读写锁内部会维护一个引用计数,每次读者加锁,引用计数+1,当释放读加锁形式的读写锁时,引用计数-1,当引用计数为0时,该锁被完全释放。

3.3 写者加锁

  1. #include <pthread.h>
  2. int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); // 非阻塞写
  3. int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 阻塞写
  1. 写者加锁采用的时独占模式,若该锁当前被写加锁,则其他线程(读线程/写线程)均无法获得该锁。

3.4 解锁

  1. #include <pthread.h>
  2. int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
  1. 此接口为解锁统一接口。

3.5 读写锁的竞争策略

  1. PTHREAD_RWLOCK_PREFER_READER_NP, //读者优先
  2. PTHREAD_RWLOCK_PREFER_WRITER_NP, //很唬人, 但是也是读者优先
  3. PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP, //写者优先
  4. PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP
  5. #include <pthread.h>
  6. // 设置竞争策略
  7. int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
  8. // 获取竞争策略
  9. int pthread_rwlockattr_getkind_np(const pthread_rwlockattr_t *attr, int *pref);

4. 信号量

信号量是另一个种保证线程安全的方式,支持唤醒指定数量的线程。

4.1 初始化

  1. #include <semaphore.h>
  2. int sem_init(sem_t* sem, int pshared, unsigned int value);

参数解释如下:

  1. sem:函数出参,返回创建好的信号量对象。
  2. pshared:指示是否在进程间共享,若为0,则表示只在进程内线程之间共享,即线程同步;若为1,则表示在进程之间共享,即进程同步。
  3. value:信号量内部的引用计数初始值。当引用计数为0时,表示该信号量被其他线程占有,无法获取;若大于0,则表示该信号量为空闲状态,可以供其他线程使用。
  4. 返回值:0表示初始化成功,-1表示初始化失败,错误码在errno中。

    4.2 销毁

    1. int sem_destroy(sem_t* sem);

    4.3 唤醒

    1. int sem_post(sem_t* sem);

    此函数用于将信号量的资源计数+1,同时解锁信号量,这样其他使用sem_wait阻塞的线程会被唤醒。

    4.4 等待

    1. int sem_wait(sem_t* sem);
    2. int sem_trywait(sem_t* sem);
    3. int sem_timedwait(sem_t* sem, const struct timespec* abs_timeout);

    具体函数解释如下:

  5. sem_wait: 阻塞版本的等待函数。当检测到等待信号量的资源计数大于0时,将互斥量资源计数-1,然后此函数会返回。返回值为0时,表示成功,为-1是表示失败。

  6. sem_trywait:非阻塞版本的等待函数。当信号量资源计数=0时,也会马上返回(结果为-1,errno=EAGAIN
  7. sem_timedwait:带有时间的等待函数。等待指定时间后,函数会返回-1,errno=ETIMEOUT。

    4.5 实例

    ```cpp class CTask { public: CTask(int nTaskID) { m_nTaskID = nTaskID; }

    void doTask() { std::cout << “handle a task,taskid:” << m_nTaskID << “threadid:” << pthread_self() << std::endl; }

private: int m_nTaskID; };

pthread_mutex_t g_mutex; std::list g_list; sem_t g_sem;

void consumer_thread(void args) { int pid = pthread_self(); CTask *pTask = nullptr;

  1. while (true)
  2. {
  3. if (g_list.empty())
  4. continue;
  5. if (0 != sem_wait(&g_sem))
  6. continue;
  7. pthread_mutex_lock(&g_mutex);
  8. pTask = g_list.front();
  9. g_list.pop_front();
  10. pthread_mutex_unlock(&g_mutex);
  11. pTask->doTask();
  12. delete pTask;
  13. sleep(1);
  14. }

}

void producer_thread(void args) { int taskid = 0;

  1. while (true)
  2. {
  3. CTask *pTask = new CTask(taskid);
  4. taskid++;
  5. pthread_mutex_lock(&g_mutex);
  6. g_list.push_back(pTask);
  7. pthread_mutex_unlock(&g_mutex);
  8. // 唤醒消费者线程
  9. sem_post(&g_sem);
  10. sleep(1);
  11. }

}

int main() { pthread_mutex_init(&g_mutex, NULL); sem_init(&g_sem, 0, 5);

  1. pthread_t tid_consumer[5];
  2. for (int i = 0; i < 5; ++i)
  3. {
  4. pthread_create(&tid_consumer[i], NULL, consumer_thread, NULL);
  5. }
  6. pthread_t tid_producer;
  7. pthread_create(&tid_producer, NULL, producer_thread, NULL);
  8. pthread_join(tid_producer, NULL);
  9. for (int i = 0; i < 5; ++i)
  10. {
  11. pthread_join(tid_consumer[i], NULL);
  12. }
  13. pthread_mutex_destroy(&g_mutex);
  14. sem_destroy(&g_sem);
  15. return 0;

}

  1. <a name="o6afj"></a>
  2. ### 5. 条件变量
  3. 在平时开发过程中,我们总会有这样的需求:
  4. ```cpp
  5. pthread_mutex_lock(&g_mutex);
  6. while (condition is false)
  7. {
  8. pthread_mutex_unlock(&g_mutex);
  9. sleep(n);
  10. pthread_mutex_lock(&g_mutex);
  11. }
  12. // 真正执行流程
  13. dosomething();

此段代码的真正意义为:定时去查询条件是否满足。若条件不满足,则让出互斥锁交由别的线程去处理,同时等待n秒获取互斥锁后继续查询条件是否满足,若条件满足则获取互斥锁执行相关操作。但此段代码严重的效率问题:无论条件在何时被满足,我们都需要等待n秒后才能得知这一结果。因此我们需要以下的机制:

  1. 在条件不满足时,放出互斥锁等待。
  2. 条件一旦满足,其他线程能够立刻释放互斥锁并通知操作线程进行处理。

这也是条件变量的工作机制。

5.1 初始化

与互斥量类似,条件变量也支持两种初始化方式:

  1. // 动态初始化
  2. int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attr);
  3. // 静态初始化
  4. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

5.2 销毁

  1. int pthread_cond_destroy(pthread_cond_t* cond);

5.3 等待

条件变量需要绑定一个互斥量来实现线程同步,等待函数的签名如下:

  1. int pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex);
  2. int pthread_cond_timedwait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex, const struct timespec* restrict abstime);
  1. 注意:`abstime`为绝对时间,也就是说:假设要等待5秒,我们需要设置的参数为:当前时间+5得到的绝对时间。

5.4 通知

  1. int pthread_cond_signal(pthread_cond_t* cond);
  2. int pthread_cond_broadcast(pthread_cond_t* cond);
  1. `pthread_cond_signal`函数只会唤醒一个线程,但`pthread_cond_broadcast`会唤醒多个线程。

5.5 实例

  1. class CTask
  2. {
  3. public:
  4. CTask(int nTaskID)
  5. {
  6. m_nTaskID = nTaskID;
  7. }
  8. void doTask()
  9. {
  10. std::cout << "handle a task,taskid:" << m_nTaskID << " threadid:" << pthread_self() << std::endl;
  11. }
  12. private:
  13. int m_nTaskID;
  14. };
  15. pthread_mutex_t g_mutex;
  16. std::list<CTask *> g_list;
  17. pthread_cond_t g_cond;
  18. void *consumer_thread(void *args)
  19. {
  20. int pid = pthread_self();
  21. CTask *pTask = nullptr;
  22. while (true)
  23. {
  24. pthread_mutex_lock(&g_mutex);
  25. while (g_list.empty())
  26. {
  27. pthread_cond_wait(&g_cond, &g_mutex);
  28. }
  29. pTask = g_list.front();
  30. g_list.pop_front();
  31. pthread_mutex_unlock(&g_mutex);
  32. pTask->doTask();
  33. delete pTask;
  34. sleep(1);
  35. }
  36. }
  37. void *producer_thread(void *args)
  38. {
  39. int taskid = 0;
  40. while (true)
  41. {
  42. for (int i = 0; i < 2; ++i)
  43. {
  44. CTask *pTask = new CTask(taskid);
  45. taskid++;
  46. pthread_mutex_lock(&g_mutex);
  47. g_list.push_back(pTask);
  48. pthread_mutex_unlock(&g_mutex);
  49. }
  50. // pthread_cond_signal(&g_cond);
  51. pthread_cond_broadcast(&g_cond);
  52. sleep(1);
  53. }
  54. }
  55. int main()
  56. {
  57. pthread_mutex_init(&g_mutex, NULL);
  58. pthread_cond_init(&g_cond, NULL);
  59. pthread_t tid_consumer[5];
  60. for (int i = 0; i < 5; ++i)
  61. {
  62. pthread_create(&tid_consumer[i], NULL, consumer_thread, NULL);
  63. }
  64. pthread_t tid_producer;
  65. pthread_create(&tid_producer, NULL, producer_thread, NULL);
  66. pthread_join(tid_producer, NULL);
  67. for (int i = 0; i < 5; ++i)
  68. {
  69. pthread_join(tid_consumer[i], NULL);
  70. }
  71. pthread_mutex_destroy(&g_mutex);
  72. pthread_cond_destroy(&g_cond);
  73. return 0;
  74. }

5.6 注意事项

  1. 虚假唤醒:条件变量等待可能会被系统的某些信号唤醒,但此时条件并不满足,因此我们需要再次验证条件是否满足后在进行相关操作。
  2. 条件变量条件满足通知只会发送一次,若不处理将会永久消失,因此一定要保证wait函数在通知发送之前调用。
  3. 为什么条件变量要与互斥量绑定在一起? ```cpp // 假设不绑定 pthread_mutex_lock(&g_mutex); while (condition is false) { pthread_mutex_unlock(&g_mutex);

    cond_wait(&cond);

    pthread_mutex_lock(&g_mutex); }

// 真正执行流程 dosomething(); ``` 上述代码与条件变量的区别在于:条件变量解锁与等待是原子操作,但上述代码不是。我们假设:此线程玩执行玩解锁操作之后时间片丢失,此时另一个开始处理且条件满足并发出信号,此时因此处理线程还没有得到时间片从而导致该通知丢失,从而导致无限期等待下去。