3.1 什么是RAII,手动代码实现
RAII(Resource Acquisitong Is Initialization) C++之父Bjarne Stroustrup提出; 使用局部对象来管理资源的技术称为资源获取即初始化;它的生命周期是由操作系统来管理的,无需人工介入;资源的销毁容易忘记,造成死锁或内存泄漏。
手动实现RAII管理mutex资源-锁自动释放
#include <iostream>#include <mutex>#include <shared_mutex>#include <thread>using namespace std;//RAIIclass XMutex {public:XMutex(mutex& mux): mux_(mux){cout << "Lock" << endl;mux_.lock();}~XMutex(){cout << "Unlock" << endl;mux_.unlock();}private:mutex& mux_;};static mutex mux;void TestMutex(int status){XMutex lock(mux);if (status == 1) {cout << "=1" << endl;} else {cout << "!=1" << endl;}}int main(){TestMutex(1);TestMutex(2);}
3.2 C++支持的RAII管理互斥资源 lock_guard
- C++11实现严格基于作用域的互斥体所有权包装器
- adopt_lock C++ 类型为adopt_lock_t,假设调用方已拥有互斥的所有权
通过
{}控制锁的临界区 ```cpp // CLASS TEMPLATE lock_guard templateclass lock_guard { // class with destructor that unlocks a mutex public: using mutex_type = _Mutex; explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
_MyMutex.lock();
}
lock_guard(_Mutex& _Mtx, adopt_lock_t) : _MyMutex(_Mtx) {} // construct but don’t lock
~lock_guard() noexcept {
_MyMutex.unlock();}
lock_guard(const lock_guard&) = delete; lock_guard& operator=(const lock_guard&) = delete;
private: _Mutex& _MyMutex; };
```cpp
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
static mutex gmutex;
void TestLockGuard(int i)
{
gmutex.lock();
{
//已经拥有锁,不lock
lock_guard<mutex> lock(gmutex, adopt_lock);
//结束释放锁
}
{
lock_guard<mutex> lock(gmutex);
cout << "begin thread " << i << endl;
}
for (;;) {
{
lock_guard<mutex> lock(gmutex);
cout << i << "[in]" << endl;
}
this_thread::sleep_for(500ms);
}
}
int main()
{
for (int i = 0; i < 3; i++) {
thread th(TestLockGuard, i + 1);
th.detach();
}
getchar();
}
3.3 unique_lock C++11
unique_lockC++11实现可移动的互斥体所有权包装器- 支持临时释放锁
unlock - 支持
adopt_lock(已经拥有锁,不加锁,出栈区会释放) - 支持
defer_lock(延后拥有,不加锁,出栈区不释放) - 支持
try_to_lock尝试获得互斥的所有权而不阻塞,获取失败退出栈区不会释放,通过owns_lock()函数判断 ```cppinclude
include
include
include
include
using namespace std;
int main() { { static mutex mux;
{
unique_lock<mutex> lock(mux);
lock.unlock(); // 可以提前释放锁,然后再加锁
// 业务代码
lock.lock();
}
{
mux.lock();
// 已经拥有锁,不锁定,退出解锁
unique_lock<mutex> lock(mux, adopt_lock);
}
{
// 延后枷锁,不拥有,退出不解锁
unique_lock<mutex> lock(mux, defer_lock);
// 加锁,退出栈区解锁
lock.lock();
}
{
// 尝试加锁,不阻塞,失败不拥有锁
unique_lock<mutex> lock(mux, try_to_lock);
if (lock.owns_lock()) {
cout << "own_lock" << endl;
} else {
cout << "not own_lock" << endl;
}
}
}
}
<a name="nOvwI"></a>
# 3.4 shared_lock C++14
shared_lock C++14 实现可移动的共享互斥体所有权封装器
```cpp
explicit shared_lock(mutex_type& _Mtx)
: _Pmtx(_STD addressof(_Mtx)), _Owns(true) { // construct with mutex and lock shared
_Mtx.lock_shared();
}
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
using namespace std;
int main()
{
{
//共享锁
static shared_timed_mutex tmux;
//读取锁 共享锁
{
//调用共享锁
shared_lock<shared_timed_mutex> lock(tmux);
cout << "read data" << endl;
//退出栈区 释放共享锁
}
//写入锁 互斥锁
{
unique_lock<shared_timed_mutex> lock(tmux);
cout << "write data" << endl;
}
}
}
3.5 scoped_lock C++17
scoped_lock C++17 用于多个互斥体的免死锁RAII封装器,类似lock
演示死锁
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
using namespace std;
static mutex mux1;
static mutex mux2;
void TestScope1()
{
//模拟死锁 停100ms等另一个线程锁mux2
this_thread::sleep_for(100ms);
cout << this_thread::get_id() << " begin mux1 lock" << endl;
mux1.lock();
cout << this_thread::get_id() << " begin mux2 lock" << endl;
mux2.lock();
cout << "TestScope1" << endl;
this_thread::sleep_for(1000ms);
mux1.unlock();
mux2.unlock();
}
void TestScope2()
{
cout << this_thread::get_id() << " begin mux2 lock" << endl;
mux2.lock();
this_thread::sleep_for(500ms);
cout << this_thread::get_id() << " begin mux1 lock" << endl;
mux1.lock(); //死锁
cout << "TestScope2" << endl;
this_thread::sleep_for(1500ms);
mux1.unlock();
mux2.unlock();
}
int main()
{
{
//演示死锁情况
{
thread th(TestScope1);
th.detach();
}
{
thread th(TestScope2);
th.detach();
}
}
getchar();
}

解决:
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
// #include <scoped_lock>
using namespace std;
static mutex mux1;
static mutex mux2;
void TestScope1()
{
//模拟死锁 停100ms等另一个线程锁mux2
this_thread::sleep_for(100ms);
cout << this_thread::get_id() << " begin lock" << endl;
// c++11
lock(mux1, mux2); // lock multiple locks, without deadlock
// c++17
// scoped_lock lock(mux1, mux2); // 解决死锁 class with destructor that unlocks mutexes
cout << "TestScope1" << endl;
this_thread::sleep_for(1000ms);
}
void TestScope2()
{
cout << this_thread::get_id() << " begin lock" << endl;
cout << this_thread::get_id() << " begin mux2 lock" << endl;
mux2.lock();
this_thread::sleep_for(500ms);
cout << this_thread::get_id() << " begin mux1 lock" << endl;
mux1.lock(); //死锁
cout << "TestScope2" << endl;
this_thread::sleep_for(1500ms);
mux1.unlock();
mux2.unlock();
}
int main()
{
{
//演示死锁情况
{
thread th(TestScope1);
th.detach();
}
{
thread th(TestScope2);
th.detach();
}
}
getchar();
}

3.6 使用互斥锁+list模拟线程通信
- 封装线程基类XThread控制线程启动和停止
- 模拟消息服务器线程 接收字符串消息,并模拟处理
- 通过unique_lock和mutex互斥访问list
消息队列 - 主线程定时发送消息给子线程
main函数
#include <iostream>
#include "XMsgServer.h"
#include <sstream>
using namespace std;
int main()
{
XMsgServer server;
server.Start();
for (int i = 0; i < 10; i++) {
stringstream ss;
ss << "msg: " << i + 1;
server.SendMsg(ss.str());
this_thread::sleep_for(500ms);
}
server.Stop();
getchar();
return 0;
}
XThread
// XThread.h
#pragma once
#include <thread>
class XThread
{
public:
//启动线程
virtual void Start();
//设置线程退出标志 并等待
virtual void Stop();
//等待线程退出(阻塞)
virtual void Wait();
//线程是否退出
bool is_exit();
private:
//线程入口
virtual void Main() = 0;
bool is_exit_ = false;
std::thread th_;
};
// XThread.cpp
#include "XThread.h"
using namespace std;
void XThread::Start()
{
is_exit_ = false;
th_ = thread(&XThread::Main, this);
}
void XThread::Stop()
{
is_exit_ = true;
Wait();
}
void XThread::Wait()
{
if (th_.joinable()) {
th_.join();
}
}
bool XThread::is_exit()
{
return is_exit_;
}
XMsgServer
// XMsgServer.h
#pragma once
#include "XThread.h"
#include <string >
#include <list>
#include <mutex>
class XMsgServer: public XThread
{
public:
// 给当前线程发消息
void SendMsg(std::string msg);
private:
// 处理消息的线程入口函数
void Main() override;
// 消息队列缓冲
std::list<std::string> msgs_;
// 互斥访问消息队列
std::mutex mutex_;
};
// XMsgServer.cpp
#include "XMsgServer.h"
#include <iostream>
using namespace std;
void XMsgServer::SendMsg(std::string msg)
{
unique_lock<mutex> lock(mutex_);
msgs_.push_back(msg);
}
//处理消息的线程入口函数
void XMsgServer::Main()
{
while (!is_exit()) {
this_thread::sleep_for(10ms);
unique_lock<mutex> lock(mutex_);
if (msgs_.empty()) {
continue;
}
while (!msgs_.empty()) {
//消息处理业务逻辑
cout << "recv: " << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
3.7 条件变量
3.7.1 生产者-消费者模型
- 生产者和消费者共享资源变量 list队列
- 生产者生产一个产品,通知消费者消费
- 消费者阻塞等待信息—-获取信号后消费产品(取出list队列中数据)
一、改变共享变量的线程步骤
准备好信号量
std::condition_variable cv;- 获得
std::mutex常通过std::unique_lockunique_lock lock(mux);
- 获得
- 在获取锁时进行修改
msgs_.push_back(data);
- 在获取锁时进行修改
3.释放锁并通知读取线程
lock.unlock(); cv.notify_one(); // 通知一个等待信号线程 cv.notify_all(); // 通知所有等待信号线程二、等待信号读取共享变量的线程步骤
1.获得与改变变量线程共用的mutex
unique_lock lock(mux);- wait()等待信号通知
- 2.1 无lambda表达式 ```cpp // 解锁lock,并阻塞等待 notify_one notify_all通知 cv.wait(lock);
// 接收到通知会再次获取锁标注,也就是说如果此时mux资源被占用,wait函数会阻塞 msgs.front(); // 处理数据 msgs.pop_front();
- 2.2 lambda表达式 cv.wait(lock, []{ return !msgs.empty(); });<br />只在std::unique_lock<std::mutex>上工作的 std::condition_varible
```cpp
#include <iostream>
#include <list>
#include <mutex>
#include <sstream>
#include <thread>
using namespace std;
list<string> msgs_;
condition_variable cv;
mutex mux;
void ThreadWrite()
{
for (int i = 0;; i++) {
stringstream ss;
ss << "Write msg " << i;
unique_lock<mutex> lock(mux);
msgs_.push_back(ss.str());
lock.unlock();
cv.notify_one(); // 发送信号
this_thread::sleep_for(1s);
}
}
void ThreadRead(int i)
{
for (;;) {
cout << "read msg" << endl;
unique_lock<mutex> lock(mux);
// cv.wait(lock); //解锁、阻塞等待信号
cv.wait(lock, [i]() {
cout << i << " wait" << endl;
return !msgs_.empty();
});
//获取信号后锁定
while (!msgs_.empty()) {
cout << i << " read " << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
int main(int argc, char* argv[])
{
thread th(ThreadWrite);
th.detach();
for (int i = 0; i < 3; i++) {
thread th(ThreadRead, i + 1);
th.detach();
}
getchar();
return 0;
}

main.cpp
#include "xmsgserver.h"
#include <iostream>
#include <sstream>
using namespace std;
int main(int argc, char* argv[])
{
XMsgServer server;
server.Start();
for (int i = 0; i < 10; i++) {
stringstream ss;
ss << "msg: " << i;
server.SendMsg(ss.str());
this_thread::sleep_for(500ms);
}
server.Stop();
cout << "Server stoped" << endl;
getchar();
}
// xthread.h
#pragma once
#include <thread>
class XThread {
public:
// 启动线程
virtual void Start();
virtual void Stop();
virtual void Wait();
bool is_exit();
protected:
bool is_exit_;
private:
virtual void Main() = 0;
std::thread th_;
};
// xthread.cpp
#include "xthread.h"
using namespace std;
void XThread::Start()
{
is_exit_ = false;
th_ = thread(&XThread::Main, this);
}
void XThread::Stop()
{
is_exit_ = true;
Wait();
}
void XThread::Wait()
{
if (th_.joinable()) {
th_.join();
}
}
bool XThread::is_exit()
{
return is_exit_;
}
// xmsgserver.h
#pragma once
#include "xthread.h"
#include <list>
#include <mutex>
#include <string>
class XMsgServer : public XThread {
public:
// 给当前线程发消息
void SendMsg(std::string msg);
void Stop() override;
private:
// 处理消息的线程入口函数
void Main() override;
// 消息队列缓冲
std::list<std::string> msgs_;
// 互斥访问消息队列
std::mutex mutex_;
std::condition_variable cv_;
};
// xmsgserver.cpp
#include "xmsgserver.h"
#include <iostream>
using namespace std;
void XMsgServer::SendMsg(std::string msg)
{
unique_lock<mutex> lock(mutex_);
msgs_.push_back(msg);
lock.unlock();
cv_.notify_one();
}
void XMsgServer::Stop()
{
is_exit_ = true;
cv_.notify_all();
Wait();
}
void XMsgServer::Main()
{
while (!is_exit()) {
unique_lock<mutex> lock(mutex_);
cv_.wait(lock, [this]() {
cout << "is_exit: " << is_exit() << endl;
if (is_exit()) {
return true;
}
return !msgs_.empty();
});
while (!msgs_.empty()) {
//消息处理业务逻辑
cout << "recv: " << msgs_.front() << endl;
msgs_.pop_front();
}
}
}

