1. 封装线程类
1.1 核心逻辑
1.2 成员变量
class Thread
{
...
...
private:
///线程ID 用户态的线程ID和内核线程ID不是一个概念 调试时候需要拿到内核中的ID
pid_t m_id = -1;
///线程号
pthread_t m_thread = 0; //unsigned long int
///线程执行的回调函数
std::function<void()> m_cb;
///线程名称
std::string m_name;
//信号量对象
Semaphore m_sem;
};
· 补充说明:开始定义线程局部变量
目的:存储管理当前线程的类对象Thread
的this
指针 和 当前线程的命名
(thread_local
是C++11引入的类型符)
1.3 接口
1.3.1 构造函数
功能:利用pthread库开启运行线程,并且置一个信号量去等待线程完全开启后再退出构造函数
Thread::Thread(std::function<void()> cb, const std::string& name)
:m_cb(cb)
{
m_name = name.size() > 0 ? name : "UNKNOW";
//创建线程
int ret = pthread_create(&m_thread, nullptr, &Thread::run, this);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "Thread:: pthread_create fail, ret= " << ret << " thread name= " << m_name;
throw std::logic_error("pthread_create fail!!");
}
//试想 当执行完上面的API线程可能不会马上运行 手动等待一下直到线程完全开始运行
m_sem.wait();
}
· 补充说明:线程类对象之间应禁止拷贝赋值
原理:使用C++11特性,在成员函数尾部添加=delete
表示该成员函数被禁用。发生相应可调用场合会自动屏蔽函数调用。
1.3.2 析构函数
/**
* @brief 线程类析构函数
*/
~Thread();
Thread:: ~Thread()
{
//析构时候不马上杀死线程 而是将其置为分离态
if(m_thread)
pthread_detach(m_thread);
}
** 私有接口
· run()
功能:pthread库指定的线程格式,用于作为线程回调函数。其中包含线程真正需要去执行的函数体,以及对线程的一系列初始化。
注意:必须声明为static
静态函数。
/**
* @brief 线程回调函数(固定格式)
* @return void*
*/
static void *run(void *);
void *Thread::run(void * arg)
{
//接收传入的this指针 因为是个static方法 因此依靠传入this指针很重要
Thread* thread = (Thread*)arg;
//初始化线程局部变量
t_thread = thread;
t_thread_name = thread->m_name;
//获取内核线程ID
thread->m_id = GetThreadId(); //util.cpp定义的方法
//给内核线程命名 但是只能接收小于等于16个字符
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
//防止m_cb中智能指针的引用不被释放 交换一次会减少引用次数
std::function<void()> cb;
cb.swap(thread->m_cb);
//确保线程已经完全初始化好了 再进行唤醒
thread->m_sem.notify();
//调取执行函数
cb();
return 0;
}
· 小技巧:在线程回调函数里面顺利使用类成员
原理:在调用pthread_create()
线程API的时候,传入参数this
指针,就能通过这个指针去访问到类中的成员,否则类成员静态方法中无法使用非静态成员变量。或者,另外写一个普通成员函数,在线程回调函数中去调用那个函数也能解决该问题。
**Thread::Thread()**
中:
**void *Thread::run(void * arg)**
线程回调函数中:
1.3.3 join()
功能:以阻塞态的形式回收处理线程资源
/**
* @brief 线程回收处理函数
*/
void join();
void Thread::join()
{
if(m_thread)
{
int ret = pthread_join(m_thread, nullptr);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "Thread:: pthread_join fail, ret= " << ret << " thread name= " << m_name;
throw std::logic_error("pthread_join fail!!");
}
m_thread = 0;
}
}
1.3.4 其他常用接口
/**
* @brief 获取线程ID
* @return pid_t
*/
pid_t getId() const {return m_id;}
/**
* @brief 获取线程名称
* @return const std::string&
*/
const std::string& getName() const {return m_name;}
/**
* @brief 获取当前正在运行的线程指针
* @return Thread*
*/
static Thread* GetThis();
Thread* Thread::GetThis()
{
return t_thread;
}
/**
* @brief 获取当前运行线程的名称
* @return const std::string&
*/
static const std::string& GetName();
const std::string& Thread::GetName()
{
return t_thread_name;
}
/**
* @brief 设置当前运行线程的名称
* @param[in] name 线程名称
*/
static void SetName(const std::string& name);
void Thread::SetName(const std::string& name)
{
if(t_thread)
{
t_thread->m_name = name;
}
t_thread_name = name;
}
C++知识补充复习1:线程局部变量(TLS thread-local-storage)
出处:《深入理解C++11新特性解析与应用》P215
概念:有线程生命周期以及线程可见性的变量。
特点:线程创建开始时,创建变量;线程结束时自动释放。
解决的问题:全局变量/静态变量在多线程环境下的共享问题。并非所有情况下,这种类型的变量都需要进行存储空间的共享。例如POSIX标准下的错误码全局变量errno,我们希望在某个线程下错误码是全局生效的,但是又不希望这个错误码会被其他线程所影响,因此引出线程局部变量,使得每个线程在”全局”上可以定义一个变量,但是空间的存储不是和所有线程共享。
本质上:由于线程是共享同一进程下的资源,因此线程局部变量的存储空间也在所依附的进程下。其他线程是能看到这一块空间的,但是会有线程局部特性的函数限制其他线程对该空间的访问。
C++知识补充复习2:区域锁(Scoped Locking)
概念:区域锁(Scoped Locking)并非一种锁的种类,而是使用锁的一种模式。这种概念是C++中RAII(Resource Acquisition Is Initialization)模式具体体现之一,”资源需要即初始化”。基本思想:C++中的资源应该交由对象管理,资源在对象的构造函数中完成初始化,在对象的析构函数中被释放。
拿现在锁的这个例子,一个管理锁的对象在构造函数中完成对锁的初始化,在析构函数中完成对锁的资源释放。
2. 封装互斥锁、信号量
来自mutex.h
、mutex.cpp
文件
思想:运用C++中RAII的模式思想,完成由对象管理锁、信号量资源。
2.1 信号量代码如下:
/**
* @brief 信号量类
*/
class Semaphore : Noncopyable
{
public:
/**
* @brief 信号量类构造函数
* @param[in] count 计数基准值, 一般以0位基准
*/
Semaphore(uint32_t count = 0);
/**
* @brief 信号量类析构函数
*/
~Semaphore();
//数-1
void wait();
//数+1
void notify();
private:
//禁止拷贝赋值
Semaphore(const Semaphore &) = delete;
Semaphore(const Semaphore &&) = delete;
Semaphore& operator=(const Semaphore &) = delete;
private:
/// 信号量结构体
sem_t m_semaphore;
};
Semaphore::Semaphore(uint32_t count)
{
//初始化信号量
if(sem_init(&m_semaphore, 0, count) < 0)
{
KIT_LOG_ERROR(g_logger) << "Semaphore::Semaphore sem_init fail";
throw std::logic_error("sem_init fail!!");
}
}
Semaphore::~Semaphore()
{
//销毁信号量
sem_destroy(&m_semaphore);
}
//数-1
void Semaphore::wait()
{
while(1)
{
//递减1 函数成功时返回0; 出错返回-1
if(!sem_wait(&m_semaphore))
break;
//如果中断就继续
if(errno == EINTR)
continue;
KIT_LOG_ERROR(g_logger) << "Semaphore::wait sem_wait error";
throw std::logic_error("sem_wait error!!");
}
}
//数+1
void Semaphore::notify()
{
//递增1 函数成功时返回0; 出错返回-1
if(sem_post(&m_semaphore) < 0)
{
KIT_LOG_ERROR(g_logger) << "Semaphore::notify sem_post error";
throw std::logic_error("sem_post error!!");
}
}
2.2 互斥锁代码如下:
/**
* @brief 局部区域互斥锁模板类
* @tparam T
*/
template<class T>
class ScopedLockImpl
{
public:
/**
* @brief 构造时加锁
* @param[in] mutex 互斥锁变量
*/
ScopedLockImpl(T &mutex)
:m_mutex(mutex)
{
m_mutex.lock();
m_locked = true;
}
/**
* @brief 析构时解锁
*/
~ScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
m_locked = true;
m_mutex.lock();
}
}
void unlock()
{
if(m_locked)
{
m_mutex.unlock();
m_locked = false;
}
}
private:
/// 锁资源
T& m_mutex;
/// 上锁状态
bool m_locked;
};
/**
* @brief 互斥锁类
*/
class Mutex : Noncopyable
{
public:
typedef ScopedLockImpl<Mutex> Lock;
/**
* @brief 互斥锁类构造函数
*/
Mutex() {pthread_mutex_init(&m_mutex, nullptr);}
/**
* @brief 互斥锁类析构函数
*/
~Mutex() {pthread_mutex_destroy(&m_mutex);}
/**
* @brief 加锁
*/
void lock() {pthread_mutex_lock(&m_mutex);}
/**
* @brief 解锁
*/
void unlock() {pthread_mutex_unlock(&m_mutex);}
private:
/// 互斥锁
pthread_mutex_t m_mutex;
};
2.3 读写锁代码如下:
/**
* @brief 局部区域读锁模板类
* @tparam T 锁类型
*/
template<class T>
class ReadScopedLockImpl
{
public:
ReadScopedLockImpl(T &mutex)
:m_mutex(mutex)
{
m_mutex.rdlock();
m_locked = true;
}
~ReadScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
m_locked = true;
m_mutex.rdlock();
}
}
void unlock()
{
if(m_locked)
{
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex;
bool m_locked;
};
/**
* @brief 局部区域写锁模板类
* @tparam T 锁类型
*/
template<class T>
class WriteScopedLockImpl
{
public:
WriteScopedLockImpl(T &mutex)
:m_mutex(mutex)
{
m_mutex.wrlock();
m_locked = true;
}
~WriteScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
m_locked = true;
m_mutex.wrlock();
}
}
void unlock()
{
if(m_locked)
{
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex;
bool m_locked;
};
/**
* @brief 读写锁类
*/
class RWMutex: Noncopyable
{
public:
typedef ReadScopedLockImpl<RWMutex> ReadLock;
typedef WriteScopedLockImpl<RWMutex> WriteLock;
/**
* @brief 读写锁类构造函数 初始化读写锁
*/
RWMutex() {pthread_rwlock_init(&m_rwlock, nullptr);}
/**
* @brief 读写锁类构造函数 初始化读写锁
*/
~RWMutex() {pthread_rwlock_destroy(&m_rwlock);}
/**
* @brief 读加锁
*/
void rdlock() {pthread_rwlock_rdlock(&m_rwlock);}
/**
* @brief 写加锁
*/
void wrlock() {pthread_rwlock_wrlock(&m_rwlock);}
/**
* @brief 读写全部解锁
*/
void unlock() {pthread_rwlock_unlock(&m_rwlock);}
private:
/// 读写锁
pthread_rwlock_t m_rwlock;
};
2.4 自旋锁代码如下:
**
* @brief 自旋锁类
*/
class SpinMutex: Noncopyable
{
public:
typedef ScopedLockImpl<SpinMutex> Lock;
/**
* @brief 自旋锁类构造函数
*/
SpinMutex() {pthread_spin_init(&m_mutex, 0);}
/**
* @brief 自旋锁类析构函数
*/
~SpinMutex() {pthread_spin_destroy(&m_mutex);}
/**
* @brief 加锁
*/
void lock() {pthread_spin_lock(&m_mutex);}
/**
* @brief 解锁
*/
void unlock() {pthread_spin_unlock(&m_mutex);}
private:
/// 自旋锁
pthread_spinlock_t m_mutex;
};
2.5 原子操作代码如下:
**
* @brief 原子锁类
*/
class CASMutex: Noncopyable
{
public:
typedef ScopedLockImpl<CASMutex> Lock;
/**
* @brief 原子锁类构造函数
*/
CASMutex(){ m_mutex.clear();}
/**
* @brief 原子锁类析构函数
*/
~CASMutex(){}
/**
* @brief 加锁
*/
void lock()
{
//执行本次原子操作之前 所有读原子操作必须全部完成
while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
}
/**
* @brief 解锁
*/
void unlock()
{
//执行本次原子操作之前 所有写原子操作必须全部完成
std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
}
private:
/// 原子锁
volatile std::atomic_flag m_mutex;
};
C++知识补充复习3:信号量semaphore
概念:信号量是一种特殊的变量,对其操作访问都是源原子操作。且只允许等待wait
和发送post
操作
作用:通常用于保护共享资源,保证资源在任何时刻只有一个线程(进程)在访问。
主要使用的是POSIX 信号量,定义在头文件<semaphore.h>
, 底层是一个非负整数,通过原子操作对其加减,控制线程(进程)对共享资源的访问;SYSTEM V 信号量定义在头文件<sys/sem.h>
,底层是一个信号量集合,同样通过原子操作管理集合,但是操作比较复杂。
POSIX 信号量API:
int sem_init(sem_t *sem, int pshared, unsigned int value);
初始化——-构造int sem_wait(sem_t *sem);
减1操作———-waitint sem_post(sem_t *sem);
加1操作———-notifyint sem_destroy(sem_t *sem);
释放———-析构
SYSTEM V 信号量API: ```cpp union semun //信号量集合 系统定义 { int val; struct semid_ds buf; unsigned short arry; };
struct sembuf //信号量属性集合 系统定义好的 { unsigned short sem_num; //非负计数 short sem_op; //操作形式 加/减 short sem_flg; //模式 };
int semget(key_t key, int nsems, int semflg); //创建或引用一个信号量集合 int semctl(int semid, int semnum, int cmd, …); //对信号量集合指定操作 int semop(int semid, struct sembuf *sops, size_t nsops); //修改并自动执行信号量的操作指令 ```
- 信号量和互斥锁之间的区别:
① 概念上,互斥锁只保证线程间对一个资源的互斥访问;信号量不仅保证互斥,还能控制线程同步
可以实现多个同类资源的多线程有序访问。
②用法上,互斥锁的加锁和解锁必须由同一个线程完成;信号量可以一个线程释放,另一个线程得到