1. 封装线程类
1.1 核心逻辑
1.2 成员变量
class Thread{......private:///线程ID 用户态的线程ID和内核线程ID不是一个概念 调试时候需要拿到内核中的IDpid_t m_id = -1;///线程号pthread_t m_thread = 0; //unsigned long int///线程执行的回调函数std::function<void()> m_cb;///线程名称std::string m_name;//信号量对象Semaphore m_sem;};
· 补充说明:开始定义线程局部变量
目的:存储管理当前线程的类对象Thread的this指针 和 当前线程的命名
(thread_local是C++11引入的类型符)
1.3 接口
1.3.1 构造函数
功能:利用pthread库开启运行线程,并且置一个信号量去等待线程完全开启后再退出构造函数
Thread::Thread(std::function<void()> cb, const std::string& name):m_cb(cb){m_name = name.size() > 0 ? name : "UNKNOW";//创建线程int ret = pthread_create(&m_thread, nullptr, &Thread::run, this);if(ret < 0){KIT_LOG_ERROR(g_logger) << "Thread:: pthread_create fail, ret= " << ret << " thread name= " << m_name;throw std::logic_error("pthread_create fail!!");}//试想 当执行完上面的API线程可能不会马上运行 手动等待一下直到线程完全开始运行m_sem.wait();}
· 补充说明:线程类对象之间应禁止拷贝赋值
原理:使用C++11特性,在成员函数尾部添加=delete表示该成员函数被禁用。发生相应可调用场合会自动屏蔽函数调用。
1.3.2 析构函数
/*** @brief 线程类析构函数*/~Thread();Thread:: ~Thread(){//析构时候不马上杀死线程 而是将其置为分离态if(m_thread)pthread_detach(m_thread);}
** 私有接口
· run()
功能:pthread库指定的线程格式,用于作为线程回调函数。其中包含线程真正需要去执行的函数体,以及对线程的一系列初始化。
注意:必须声明为static静态函数。
/*** @brief 线程回调函数(固定格式)* @return void**/static void *run(void *);void *Thread::run(void * arg){//接收传入的this指针 因为是个static方法 因此依靠传入this指针很重要Thread* thread = (Thread*)arg;//初始化线程局部变量t_thread = thread;t_thread_name = thread->m_name;//获取内核线程IDthread->m_id = GetThreadId(); //util.cpp定义的方法//给内核线程命名 但是只能接收小于等于16个字符pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());//防止m_cb中智能指针的引用不被释放 交换一次会减少引用次数std::function<void()> cb;cb.swap(thread->m_cb);//确保线程已经完全初始化好了 再进行唤醒thread->m_sem.notify();//调取执行函数cb();return 0;}
· 小技巧:在线程回调函数里面顺利使用类成员
原理:在调用pthread_create()线程API的时候,传入参数this指针,就能通过这个指针去访问到类中的成员,否则类成员静态方法中无法使用非静态成员变量。或者,另外写一个普通成员函数,在线程回调函数中去调用那个函数也能解决该问题。
**Thread::Thread()**中:

**void *Thread::run(void * arg)**线程回调函数中:

1.3.3 join()
功能:以阻塞态的形式回收处理线程资源
/*** @brief 线程回收处理函数*/void join();void Thread::join(){if(m_thread){int ret = pthread_join(m_thread, nullptr);if(ret < 0){KIT_LOG_ERROR(g_logger) << "Thread:: pthread_join fail, ret= " << ret << " thread name= " << m_name;throw std::logic_error("pthread_join fail!!");}m_thread = 0;}}
1.3.4 其他常用接口
/*** @brief 获取线程ID* @return pid_t*/pid_t getId() const {return m_id;}/*** @brief 获取线程名称* @return const std::string&*/const std::string& getName() const {return m_name;}/*** @brief 获取当前正在运行的线程指针* @return Thread**/static Thread* GetThis();Thread* Thread::GetThis(){return t_thread;}/*** @brief 获取当前运行线程的名称* @return const std::string&*/static const std::string& GetName();const std::string& Thread::GetName(){return t_thread_name;}/*** @brief 设置当前运行线程的名称* @param[in] name 线程名称*/static void SetName(const std::string& name);void Thread::SetName(const std::string& name){if(t_thread){t_thread->m_name = name;}t_thread_name = name;}
C++知识补充复习1:线程局部变量(TLS thread-local-storage)
出处:《深入理解C++11新特性解析与应用》P215
概念:有线程生命周期以及线程可见性的变量。
特点:线程创建开始时,创建变量;线程结束时自动释放。
解决的问题:全局变量/静态变量在多线程环境下的共享问题。并非所有情况下,这种类型的变量都需要进行存储空间的共享。例如POSIX标准下的错误码全局变量errno,我们希望在某个线程下错误码是全局生效的,但是又不希望这个错误码会被其他线程所影响,因此引出线程局部变量,使得每个线程在”全局”上可以定义一个变量,但是空间的存储不是和所有线程共享。
本质上:由于线程是共享同一进程下的资源,因此线程局部变量的存储空间也在所依附的进程下。其他线程是能看到这一块空间的,但是会有线程局部特性的函数限制其他线程对该空间的访问。
C++知识补充复习2:区域锁(Scoped Locking)
概念:区域锁(Scoped Locking)并非一种锁的种类,而是使用锁的一种模式。这种概念是C++中RAII(Resource Acquisition Is Initialization)模式具体体现之一,”资源需要即初始化”。基本思想:C++中的资源应该交由对象管理,资源在对象的构造函数中完成初始化,在对象的析构函数中被释放。
拿现在锁的这个例子,一个管理锁的对象在构造函数中完成对锁的初始化,在析构函数中完成对锁的资源释放。
2. 封装互斥锁、信号量
来自mutex.h、mutex.cpp文件
思想:运用C++中RAII的模式思想,完成由对象管理锁、信号量资源。
2.1 信号量代码如下:
/*** @brief 信号量类*/class Semaphore : Noncopyable{public:/*** @brief 信号量类构造函数* @param[in] count 计数基准值, 一般以0位基准*/Semaphore(uint32_t count = 0);/*** @brief 信号量类析构函数*/~Semaphore();//数-1void wait();//数+1void notify();private://禁止拷贝赋值Semaphore(const Semaphore &) = delete;Semaphore(const Semaphore &&) = delete;Semaphore& operator=(const Semaphore &) = delete;private:/// 信号量结构体sem_t m_semaphore;};Semaphore::Semaphore(uint32_t count){//初始化信号量if(sem_init(&m_semaphore, 0, count) < 0){KIT_LOG_ERROR(g_logger) << "Semaphore::Semaphore sem_init fail";throw std::logic_error("sem_init fail!!");}}Semaphore::~Semaphore(){//销毁信号量sem_destroy(&m_semaphore);}//数-1void Semaphore::wait(){while(1){//递减1 函数成功时返回0; 出错返回-1if(!sem_wait(&m_semaphore))break;//如果中断就继续if(errno == EINTR)continue;KIT_LOG_ERROR(g_logger) << "Semaphore::wait sem_wait error";throw std::logic_error("sem_wait error!!");}}//数+1void Semaphore::notify(){//递增1 函数成功时返回0; 出错返回-1if(sem_post(&m_semaphore) < 0){KIT_LOG_ERROR(g_logger) << "Semaphore::notify sem_post error";throw std::logic_error("sem_post error!!");}}
2.2 互斥锁代码如下:
/*** @brief 局部区域互斥锁模板类* @tparam T*/template<class T>class ScopedLockImpl{public:/*** @brief 构造时加锁* @param[in] mutex 互斥锁变量*/ScopedLockImpl(T &mutex):m_mutex(mutex){m_mutex.lock();m_locked = true;}/*** @brief 析构时解锁*/~ScopedLockImpl(){unlock();}void lock(){if(!m_locked){m_locked = true;m_mutex.lock();}}void unlock(){if(m_locked){m_mutex.unlock();m_locked = false;}}private:/// 锁资源T& m_mutex;/// 上锁状态bool m_locked;};/*** @brief 互斥锁类*/class Mutex : Noncopyable{public:typedef ScopedLockImpl<Mutex> Lock;/*** @brief 互斥锁类构造函数*/Mutex() {pthread_mutex_init(&m_mutex, nullptr);}/*** @brief 互斥锁类析构函数*/~Mutex() {pthread_mutex_destroy(&m_mutex);}/*** @brief 加锁*/void lock() {pthread_mutex_lock(&m_mutex);}/*** @brief 解锁*/void unlock() {pthread_mutex_unlock(&m_mutex);}private:/// 互斥锁pthread_mutex_t m_mutex;};
2.3 读写锁代码如下:
/*** @brief 局部区域读锁模板类* @tparam T 锁类型*/template<class T>class ReadScopedLockImpl{public:ReadScopedLockImpl(T &mutex):m_mutex(mutex){m_mutex.rdlock();m_locked = true;}~ReadScopedLockImpl(){unlock();}void lock(){if(!m_locked){m_locked = true;m_mutex.rdlock();}}void unlock(){if(m_locked){m_mutex.unlock();m_locked = false;}}private:T& m_mutex;bool m_locked;};/*** @brief 局部区域写锁模板类* @tparam T 锁类型*/template<class T>class WriteScopedLockImpl{public:WriteScopedLockImpl(T &mutex):m_mutex(mutex){m_mutex.wrlock();m_locked = true;}~WriteScopedLockImpl(){unlock();}void lock(){if(!m_locked){m_locked = true;m_mutex.wrlock();}}void unlock(){if(m_locked){m_mutex.unlock();m_locked = false;}}private:T& m_mutex;bool m_locked;};/*** @brief 读写锁类*/class RWMutex: Noncopyable{public:typedef ReadScopedLockImpl<RWMutex> ReadLock;typedef WriteScopedLockImpl<RWMutex> WriteLock;/*** @brief 读写锁类构造函数 初始化读写锁*/RWMutex() {pthread_rwlock_init(&m_rwlock, nullptr);}/*** @brief 读写锁类构造函数 初始化读写锁*/~RWMutex() {pthread_rwlock_destroy(&m_rwlock);}/*** @brief 读加锁*/void rdlock() {pthread_rwlock_rdlock(&m_rwlock);}/*** @brief 写加锁*/void wrlock() {pthread_rwlock_wrlock(&m_rwlock);}/*** @brief 读写全部解锁*/void unlock() {pthread_rwlock_unlock(&m_rwlock);}private:/// 读写锁pthread_rwlock_t m_rwlock;};
2.4 自旋锁代码如下:
*** @brief 自旋锁类*/class SpinMutex: Noncopyable{public:typedef ScopedLockImpl<SpinMutex> Lock;/*** @brief 自旋锁类构造函数*/SpinMutex() {pthread_spin_init(&m_mutex, 0);}/*** @brief 自旋锁类析构函数*/~SpinMutex() {pthread_spin_destroy(&m_mutex);}/*** @brief 加锁*/void lock() {pthread_spin_lock(&m_mutex);}/*** @brief 解锁*/void unlock() {pthread_spin_unlock(&m_mutex);}private:/// 自旋锁pthread_spinlock_t m_mutex;};
2.5 原子操作代码如下:
*** @brief 原子锁类*/class CASMutex: Noncopyable{public:typedef ScopedLockImpl<CASMutex> Lock;/*** @brief 原子锁类构造函数*/CASMutex(){ m_mutex.clear();}/*** @brief 原子锁类析构函数*/~CASMutex(){}/*** @brief 加锁*/void lock(){//执行本次原子操作之前 所有读原子操作必须全部完成while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));}/*** @brief 解锁*/void unlock(){//执行本次原子操作之前 所有写原子操作必须全部完成std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);}private:/// 原子锁volatile std::atomic_flag m_mutex;};
C++知识补充复习3:信号量semaphore
概念:信号量是一种特殊的变量,对其操作访问都是源原子操作。且只允许等待wait和发送post操作
作用:通常用于保护共享资源,保证资源在任何时刻只有一个线程(进程)在访问。

主要使用的是POSIX 信号量,定义在头文件<semaphore.h>, 底层是一个非负整数,通过原子操作对其加减,控制线程(进程)对共享资源的访问;SYSTEM V 信号量定义在头文件<sys/sem.h>,底层是一个信号量集合,同样通过原子操作管理集合,但是操作比较复杂。
POSIX 信号量API:
int sem_init(sem_t *sem, int pshared, unsigned int value);初始化——-构造int sem_wait(sem_t *sem);减1操作———-waitint sem_post(sem_t *sem);加1操作———-notifyint sem_destroy(sem_t *sem);释放———-析构
SYSTEM V 信号量API: ```cpp union semun //信号量集合 系统定义 { int val; struct semid_ds buf; unsigned short arry; };
struct sembuf //信号量属性集合 系统定义好的 { unsigned short sem_num; //非负计数 short sem_op; //操作形式 加/减 short sem_flg; //模式 };
int semget(key_t key, int nsems, int semflg); //创建或引用一个信号量集合 int semctl(int semid, int semnum, int cmd, …); //对信号量集合指定操作 int semop(int semid, struct sembuf *sops, size_t nsops); //修改并自动执行信号量的操作指令 ```
- 信号量和互斥锁之间的区别:
① 概念上,互斥锁只保证线程间对一个资源的互斥访问;信号量不仅保证互斥,还能控制线程同步
可以实现多个同类资源的多线程有序访问。
②用法上,互斥锁的加锁和解锁必须由同一个线程完成;信号量可以一个线程释放,另一个线程得到
