1. 封装线程类

来自thread.hthread.cpp文件

1.1 核心逻辑

线程开发 - 图1

1.2 成员变量

  1. class Thread
  2. {
  3. ...
  4. ...
  5. private:
  6. ///线程ID 用户态的线程ID和内核线程ID不是一个概念 调试时候需要拿到内核中的ID
  7. pid_t m_id = -1;
  8. ///线程号
  9. pthread_t m_thread = 0; //unsigned long int
  10. ///线程执行的回调函数
  11. std::function<void()> m_cb;
  12. ///线程名称
  13. std::string m_name;
  14. //信号量对象
  15. Semaphore m_sem;
  16. };

· 补充说明:开始定义线程局部变量

目的:存储管理当前线程的类对象Threadthis指针 和 当前线程的命名
thread_local是C++11引入的类型符)
image.png

1.3 接口

1.3.1 构造函数

功能:利用pthread库开启运行线程,并且置一个信号量去等待线程完全开启后再退出构造函数

  1. Thread::Thread(std::function<void()> cb, const std::string& name)
  2. :m_cb(cb)
  3. {
  4. m_name = name.size() > 0 ? name : "UNKNOW";
  5. //创建线程
  6. int ret = pthread_create(&m_thread, nullptr, &Thread::run, this);
  7. if(ret < 0)
  8. {
  9. KIT_LOG_ERROR(g_logger) << "Thread:: pthread_create fail, ret= " << ret << " thread name= " << m_name;
  10. throw std::logic_error("pthread_create fail!!");
  11. }
  12. //试想 当执行完上面的API线程可能不会马上运行 手动等待一下直到线程完全开始运行
  13. m_sem.wait();
  14. }

· 补充说明:线程类对象之间应禁止拷贝赋值

原理:使用C++11特性,在成员函数尾部添加=delete表示该成员函数被禁用。发生相应可调用场合会自动屏蔽函数调用。
image.png

1.3.2 析构函数

  1. /**
  2. * @brief 线程类析构函数
  3. */
  4. ~Thread();
  5. Thread:: ~Thread()
  6. {
  7. //析构时候不马上杀死线程 而是将其置为分离态
  8. if(m_thread)
  9. pthread_detach(m_thread);
  10. }

** 私有接口

· run()

功能:pthread库指定的线程格式,用于作为线程回调函数。其中包含线程真正需要去执行的函数体,以及对线程的一系列初始化。

注意:必须声明为static静态函数。

  1. /**
  2. * @brief 线程回调函数(固定格式)
  3. * @return void*
  4. */
  5. static void *run(void *);
  6. void *Thread::run(void * arg)
  7. {
  8. //接收传入的this指针 因为是个static方法 因此依靠传入this指针很重要
  9. Thread* thread = (Thread*)arg;
  10. //初始化线程局部变量
  11. t_thread = thread;
  12. t_thread_name = thread->m_name;
  13. //获取内核线程ID
  14. thread->m_id = GetThreadId(); //util.cpp定义的方法
  15. //给内核线程命名 但是只能接收小于等于16个字符
  16. pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
  17. //防止m_cb中智能指针的引用不被释放 交换一次会减少引用次数
  18. std::function<void()> cb;
  19. cb.swap(thread->m_cb);
  20. //确保线程已经完全初始化好了 再进行唤醒
  21. thread->m_sem.notify();
  22. //调取执行函数
  23. cb();
  24. return 0;
  25. }

· 小技巧:在线程回调函数里面顺利使用类成员

原理:在调用pthread_create()线程API的时候,传入参数this指针,就能通过这个指针去访问到类中的成员,否则类成员静态方法中无法使用非静态成员变量。或者,另外写一个普通成员函数,在线程回调函数中去调用那个函数也能解决该问题。

  • **Thread::Thread()**中:

image.png

  • **void *Thread::run(void * arg)**线程回调函数中:

image.png

1.3.3 join()

功能:以阻塞态的形式回收处理线程资源

  1. /**
  2. * @brief 线程回收处理函数
  3. */
  4. void join();
  5. void Thread::join()
  6. {
  7. if(m_thread)
  8. {
  9. int ret = pthread_join(m_thread, nullptr);
  10. if(ret < 0)
  11. {
  12. KIT_LOG_ERROR(g_logger) << "Thread:: pthread_join fail, ret= " << ret << " thread name= " << m_name;
  13. throw std::logic_error("pthread_join fail!!");
  14. }
  15. m_thread = 0;
  16. }
  17. }

1.3.4 其他常用接口

  1. /**
  2. * @brief 获取线程ID
  3. * @return pid_t
  4. */
  5. pid_t getId() const {return m_id;}
  6. /**
  7. * @brief 获取线程名称
  8. * @return const std::string&
  9. */
  10. const std::string& getName() const {return m_name;}
  11. /**
  12. * @brief 获取当前正在运行的线程指针
  13. * @return Thread*
  14. */
  15. static Thread* GetThis();
  16. Thread* Thread::GetThis()
  17. {
  18. return t_thread;
  19. }
  20. /**
  21. * @brief 获取当前运行线程的名称
  22. * @return const std::string&
  23. */
  24. static const std::string& GetName();
  25. const std::string& Thread::GetName()
  26. {
  27. return t_thread_name;
  28. }
  29. /**
  30. * @brief 设置当前运行线程的名称
  31. * @param[in] name 线程名称
  32. */
  33. static void SetName(const std::string& name);
  34. void Thread::SetName(const std::string& name)
  35. {
  36. if(t_thread)
  37. {
  38. t_thread->m_name = name;
  39. }
  40. t_thread_name = name;
  41. }

C++知识补充复习1:线程局部变量(TLS thread-local-storage)

出处:《深入理解C++11新特性解析与应用》P215
概念:有线程生命周期以及线程可见性的变量。
特点:线程创建开始时,创建变量;线程结束时自动释放。

解决的问题:全局变量/静态变量在多线程环境下的共享问题。并非所有情况下,这种类型的变量都需要进行存储空间的共享。例如POSIX标准下的错误码全局变量errno,我们希望在某个线程下错误码是全局生效的,但是又不希望这个错误码会被其他线程所影响,因此引出线程局部变量,使得每个线程在”全局”上可以定义一个变量,但是空间的存储不是和所有线程共享。

本质上:由于线程是共享同一进程下的资源,因此线程局部变量的存储空间也在所依附的进程下。其他线程是能看到这一块空间的,但是会有线程局部特性的函数限制其他线程对该空间的访问。

C++知识补充复习2:区域锁(Scoped Locking)

概念:区域锁(Scoped Locking)并非一种锁的种类,而是使用锁的一种模式。这种概念是C++中RAII(Resource Acquisition Is Initialization)模式具体体现之一,”资源需要即初始化”。基本思想:C++中的资源应该交由对象管理,资源在对象的构造函数中完成初始化,在对象的析构函数中被释放。
拿现在锁的这个例子,一个管理锁的对象在构造函数中完成对锁的初始化,在析构函数中完成对锁的资源释放。


2. 封装互斥锁、信号量

来自mutex.hmutex.cpp文件
思想:运用C++中RAII的模式思想,完成由对象管理锁、信号量资源。

2.1 信号量代码如下:

  1. /**
  2. * @brief 信号量类
  3. */
  4. class Semaphore : Noncopyable
  5. {
  6. public:
  7. /**
  8. * @brief 信号量类构造函数
  9. * @param[in] count 计数基准值, 一般以0位基准
  10. */
  11. Semaphore(uint32_t count = 0);
  12. /**
  13. * @brief 信号量类析构函数
  14. */
  15. ~Semaphore();
  16. //数-1
  17. void wait();
  18. //数+1
  19. void notify();
  20. private:
  21. //禁止拷贝赋值
  22. Semaphore(const Semaphore &) = delete;
  23. Semaphore(const Semaphore &&) = delete;
  24. Semaphore& operator=(const Semaphore &) = delete;
  25. private:
  26. /// 信号量结构体
  27. sem_t m_semaphore;
  28. };
  29. Semaphore::Semaphore(uint32_t count)
  30. {
  31. //初始化信号量
  32. if(sem_init(&m_semaphore, 0, count) < 0)
  33. {
  34. KIT_LOG_ERROR(g_logger) << "Semaphore::Semaphore sem_init fail";
  35. throw std::logic_error("sem_init fail!!");
  36. }
  37. }
  38. Semaphore::~Semaphore()
  39. {
  40. //销毁信号量
  41. sem_destroy(&m_semaphore);
  42. }
  43. //数-1
  44. void Semaphore::wait()
  45. {
  46. while(1)
  47. {
  48. //递减1 函数成功时返回0; 出错返回-1
  49. if(!sem_wait(&m_semaphore))
  50. break;
  51. //如果中断就继续
  52. if(errno == EINTR)
  53. continue;
  54. KIT_LOG_ERROR(g_logger) << "Semaphore::wait sem_wait error";
  55. throw std::logic_error("sem_wait error!!");
  56. }
  57. }
  58. //数+1
  59. void Semaphore::notify()
  60. {
  61. //递增1 函数成功时返回0; 出错返回-1
  62. if(sem_post(&m_semaphore) < 0)
  63. {
  64. KIT_LOG_ERROR(g_logger) << "Semaphore::notify sem_post error";
  65. throw std::logic_error("sem_post error!!");
  66. }
  67. }

2.2 互斥锁代码如下:

  1. /**
  2. * @brief 局部区域互斥锁模板类
  3. * @tparam T
  4. */
  5. template<class T>
  6. class ScopedLockImpl
  7. {
  8. public:
  9. /**
  10. * @brief 构造时加锁
  11. * @param[in] mutex 互斥锁变量
  12. */
  13. ScopedLockImpl(T &mutex)
  14. :m_mutex(mutex)
  15. {
  16. m_mutex.lock();
  17. m_locked = true;
  18. }
  19. /**
  20. * @brief 析构时解锁
  21. */
  22. ~ScopedLockImpl()
  23. {
  24. unlock();
  25. }
  26. void lock()
  27. {
  28. if(!m_locked)
  29. {
  30. m_locked = true;
  31. m_mutex.lock();
  32. }
  33. }
  34. void unlock()
  35. {
  36. if(m_locked)
  37. {
  38. m_mutex.unlock();
  39. m_locked = false;
  40. }
  41. }
  42. private:
  43. /// 锁资源
  44. T& m_mutex;
  45. /// 上锁状态
  46. bool m_locked;
  47. };
  48. /**
  49. * @brief 互斥锁类
  50. */
  51. class Mutex : Noncopyable
  52. {
  53. public:
  54. typedef ScopedLockImpl<Mutex> Lock;
  55. /**
  56. * @brief 互斥锁类构造函数
  57. */
  58. Mutex() {pthread_mutex_init(&m_mutex, nullptr);}
  59. /**
  60. * @brief 互斥锁类析构函数
  61. */
  62. ~Mutex() {pthread_mutex_destroy(&m_mutex);}
  63. /**
  64. * @brief 加锁
  65. */
  66. void lock() {pthread_mutex_lock(&m_mutex);}
  67. /**
  68. * @brief 解锁
  69. */
  70. void unlock() {pthread_mutex_unlock(&m_mutex);}
  71. private:
  72. /// 互斥锁
  73. pthread_mutex_t m_mutex;
  74. };

2.3 读写锁代码如下:

  1. /**
  2. * @brief 局部区域读锁模板类
  3. * @tparam T 锁类型
  4. */
  5. template<class T>
  6. class ReadScopedLockImpl
  7. {
  8. public:
  9. ReadScopedLockImpl(T &mutex)
  10. :m_mutex(mutex)
  11. {
  12. m_mutex.rdlock();
  13. m_locked = true;
  14. }
  15. ~ReadScopedLockImpl()
  16. {
  17. unlock();
  18. }
  19. void lock()
  20. {
  21. if(!m_locked)
  22. {
  23. m_locked = true;
  24. m_mutex.rdlock();
  25. }
  26. }
  27. void unlock()
  28. {
  29. if(m_locked)
  30. {
  31. m_mutex.unlock();
  32. m_locked = false;
  33. }
  34. }
  35. private:
  36. T& m_mutex;
  37. bool m_locked;
  38. };
  39. /**
  40. * @brief 局部区域写锁模板类
  41. * @tparam T 锁类型
  42. */
  43. template<class T>
  44. class WriteScopedLockImpl
  45. {
  46. public:
  47. WriteScopedLockImpl(T &mutex)
  48. :m_mutex(mutex)
  49. {
  50. m_mutex.wrlock();
  51. m_locked = true;
  52. }
  53. ~WriteScopedLockImpl()
  54. {
  55. unlock();
  56. }
  57. void lock()
  58. {
  59. if(!m_locked)
  60. {
  61. m_locked = true;
  62. m_mutex.wrlock();
  63. }
  64. }
  65. void unlock()
  66. {
  67. if(m_locked)
  68. {
  69. m_mutex.unlock();
  70. m_locked = false;
  71. }
  72. }
  73. private:
  74. T& m_mutex;
  75. bool m_locked;
  76. };
  77. /**
  78. * @brief 读写锁类
  79. */
  80. class RWMutex: Noncopyable
  81. {
  82. public:
  83. typedef ReadScopedLockImpl<RWMutex> ReadLock;
  84. typedef WriteScopedLockImpl<RWMutex> WriteLock;
  85. /**
  86. * @brief 读写锁类构造函数 初始化读写锁
  87. */
  88. RWMutex() {pthread_rwlock_init(&m_rwlock, nullptr);}
  89. /**
  90. * @brief 读写锁类构造函数 初始化读写锁
  91. */
  92. ~RWMutex() {pthread_rwlock_destroy(&m_rwlock);}
  93. /**
  94. * @brief 读加锁
  95. */
  96. void rdlock() {pthread_rwlock_rdlock(&m_rwlock);}
  97. /**
  98. * @brief 写加锁
  99. */
  100. void wrlock() {pthread_rwlock_wrlock(&m_rwlock);}
  101. /**
  102. * @brief 读写全部解锁
  103. */
  104. void unlock() {pthread_rwlock_unlock(&m_rwlock);}
  105. private:
  106. /// 读写锁
  107. pthread_rwlock_t m_rwlock;
  108. };

2.4 自旋锁代码如下:

  1. **
  2. * @brief 自旋锁类
  3. */
  4. class SpinMutex: Noncopyable
  5. {
  6. public:
  7. typedef ScopedLockImpl<SpinMutex> Lock;
  8. /**
  9. * @brief 自旋锁类构造函数
  10. */
  11. SpinMutex() {pthread_spin_init(&m_mutex, 0);}
  12. /**
  13. * @brief 自旋锁类析构函数
  14. */
  15. ~SpinMutex() {pthread_spin_destroy(&m_mutex);}
  16. /**
  17. * @brief 加锁
  18. */
  19. void lock() {pthread_spin_lock(&m_mutex);}
  20. /**
  21. * @brief 解锁
  22. */
  23. void unlock() {pthread_spin_unlock(&m_mutex);}
  24. private:
  25. /// 自旋锁
  26. pthread_spinlock_t m_mutex;
  27. };

2.5 原子操作代码如下:

  1. **
  2. * @brief 原子锁类
  3. */
  4. class CASMutex: Noncopyable
  5. {
  6. public:
  7. typedef ScopedLockImpl<CASMutex> Lock;
  8. /**
  9. * @brief 原子锁类构造函数
  10. */
  11. CASMutex(){ m_mutex.clear();}
  12. /**
  13. * @brief 原子锁类析构函数
  14. */
  15. ~CASMutex(){}
  16. /**
  17. * @brief 加锁
  18. */
  19. void lock()
  20. {
  21. //执行本次原子操作之前 所有读原子操作必须全部完成
  22. while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
  23. }
  24. /**
  25. * @brief 解锁
  26. */
  27. void unlock()
  28. {
  29. //执行本次原子操作之前 所有写原子操作必须全部完成
  30. std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
  31. }
  32. private:
  33. /// 原子锁
  34. volatile std::atomic_flag m_mutex;
  35. };

C++知识补充复习3:信号量semaphore

概念:信号量是一种特殊的变量,对其操作访问都是源原子操作。且只允许等待wait和发送post操作

作用:通常用于保护共享资源,保证资源在任何时刻只有一个线程(进程)在访问。

线程开发 - 图6

主要使用的是POSIX 信号量,定义在头文件<semaphore.h>, 底层是一个非负整数,通过原子操作对其加减,控制线程(进程)对共享资源的访问;SYSTEM V 信号量定义在头文件<sys/sem.h>,底层是一个信号量集合,同样通过原子操作管理集合,但是操作比较复杂。

  • POSIX 信号量API:

    • int sem_init(sem_t *sem, int pshared, unsigned int value); 初始化——-构造
    • int sem_wait(sem_t *sem); 减1操作———-wait
    • int sem_post(sem_t *sem); 加1操作———-notify
    • int sem_destroy(sem_t *sem); 释放———-析构
  • SYSTEM V 信号量API: ```cpp union semun //信号量集合 系统定义 { int val; struct semid_ds buf; unsigned short arry; };

struct sembuf //信号量属性集合 系统定义好的 { unsigned short sem_num; //非负计数 short sem_op; //操作形式 加/减 short sem_flg; //模式 };

int semget(key_t key, int nsems, int semflg); //创建或引用一个信号量集合 int semctl(int semid, int semnum, int cmd, …); //对信号量集合指定操作 int semop(int semid, struct sembuf *sops, size_t nsops); //修改并自动执行信号量的操作指令 ```

  • 信号量和互斥锁之间的区别:

① 概念上,互斥锁只保证线程间对一个资源的互斥访问;信号量不仅保证互斥,还能控制线程同步
可以实现多个同类资源的多线程有序访问。
②用法上,互斥锁的加锁和解锁必须由同一个线程完成;信号量可以一个线程释放,另一个线程得到