3.1 什么是RAII,手动代码实现

RAII(Resource Acquisitong Is Initialization) C++之父Bjarne Stroustrup提出; 使用局部对象来管理资源的技术称为资源获取即初始化;它的生命周期是由操作系统来管理的,无需人工介入;资源的销毁容易忘记,造成死锁或内存泄漏。

手动实现RAII管理mutex资源-锁自动释放

  1. #include <iostream>
  2. #include <mutex>
  3. #include <shared_mutex>
  4. #include <thread>
  5. using namespace std;
  6. //RAII
  7. class XMutex {
  8. public:
  9. XMutex(mutex& mux)
  10. : mux_(mux)
  11. {
  12. cout << "Lock" << endl;
  13. mux_.lock();
  14. }
  15. ~XMutex()
  16. {
  17. cout << "Unlock" << endl;
  18. mux_.unlock();
  19. }
  20. private:
  21. mutex& mux_;
  22. };
  23. static mutex mux;
  24. void TestMutex(int status)
  25. {
  26. XMutex lock(mux);
  27. if (status == 1) {
  28. cout << "=1" << endl;
  29. } else {
  30. cout << "!=1" << endl;
  31. }
  32. }
  33. int main()
  34. {
  35. TestMutex(1);
  36. TestMutex(2);
  37. }

3.2 C++支持的RAII管理互斥资源 lock_guard

  • C++11实现严格基于作用域的互斥体所有权包装器
  • adopt_lock C++ 类型为adopt_lock_t,假设调用方已拥有互斥的所有权
  • 通过{}控制锁的临界区 ```cpp // CLASS TEMPLATE lock_guard template class lock_guard { // class with destructor that unlocks a mutex public: using mutex_type = _Mutex;

    explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock

    1. _MyMutex.lock();

    }

    lock_guard(_Mutex& _Mtx, adopt_lock_t) : _MyMutex(_Mtx) {} // construct but don’t lock

    ~lock_guard() noexcept {

      _MyMutex.unlock();
    

    }

    lock_guard(const lock_guard&) = delete; lock_guard& operator=(const lock_guard&) = delete;

private: _Mutex& _MyMutex; };

```cpp
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;

static mutex gmutex;
void TestLockGuard(int i)
{
    gmutex.lock();
    {
        //已经拥有锁,不lock
        lock_guard<mutex> lock(gmutex, adopt_lock);
        //结束释放锁
    }

    {
        lock_guard<mutex> lock(gmutex);
        cout << "begin thread " << i << endl;
    }

    for (;;) {
        {
            lock_guard<mutex> lock(gmutex);
            cout << i << "[in]" << endl;
        }
        this_thread::sleep_for(500ms);
    }
}

int main()
{
    for (int i = 0; i < 3; i++) {
        thread th(TestLockGuard, i + 1);
        th.detach();
    }
    getchar();
}

3.3 unique_lock C++11

  • unique_lock C++11实现可移动的互斥体所有权包装器
  • 支持临时释放锁unlock
  • 支持adopt_lock(已经拥有锁,不加锁,出栈区会释放)
  • 支持 defer_lock(延后拥有,不加锁,出栈区不释放)
  • 支持try_to_lock 尝试获得互斥的所有权而不阻塞,获取失败退出栈区不会释放,通过owns_lock()函数判断 ```cpp

    include

    include

    include

    include

    include

using namespace std;

int main() { { static mutex mux;

    {
        unique_lock<mutex> lock(mux);
        lock.unlock(); // 可以提前释放锁,然后再加锁
        // 业务代码
        lock.lock();
    }
    {
        mux.lock();
        // 已经拥有锁,不锁定,退出解锁
        unique_lock<mutex> lock(mux, adopt_lock);
    }
    {
        // 延后枷锁,不拥有,退出不解锁
        unique_lock<mutex> lock(mux, defer_lock);
        // 加锁,退出栈区解锁
        lock.lock();
    }
    {
        // 尝试加锁,不阻塞,失败不拥有锁
        unique_lock<mutex> lock(mux, try_to_lock);
        if (lock.owns_lock()) {
            cout << "own_lock" << endl;
        } else {
            cout << "not own_lock" << endl;
        }
    }
}

}

<a name="nOvwI"></a>
# 3.4 shared_lock C++14
shared_lock C++14 实现可移动的共享互斥体所有权封装器
```cpp
explicit shared_lock(mutex_type& _Mtx)
    : _Pmtx(_STD addressof(_Mtx)), _Owns(true) { // construct with mutex and lock shared
        _Mtx.lock_shared();
    }
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>

using namespace std;

int main()
{
    {
        //共享锁
        static shared_timed_mutex tmux;
        //读取锁 共享锁
        {
            //调用共享锁
            shared_lock<shared_timed_mutex> lock(tmux);
            cout << "read data" << endl;
            //退出栈区 释放共享锁
        }
        //写入锁 互斥锁
        {
            unique_lock<shared_timed_mutex> lock(tmux);
            cout << "write data" << endl;
        }
    }
}

3.5 scoped_lock C++17

scoped_lock C++17 用于多个互斥体的免死锁RAII封装器,类似lock
演示死锁

#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
using namespace std;

static mutex mux1;
static mutex mux2;

void TestScope1()
{
    //模拟死锁 停100ms等另一个线程锁mux2
    this_thread::sleep_for(100ms);
    cout << this_thread::get_id() << " begin mux1 lock" << endl;
    mux1.lock();
    cout << this_thread::get_id() << " begin mux2 lock" << endl;
    mux2.lock();
    cout << "TestScope1" << endl;
    this_thread::sleep_for(1000ms);

    mux1.unlock();
    mux2.unlock();
}
void TestScope2()
{
    cout << this_thread::get_id() << " begin mux2 lock" << endl;
    mux2.lock();
    this_thread::sleep_for(500ms);
    cout << this_thread::get_id() << " begin mux1 lock" << endl;
    mux1.lock(); //死锁
    cout << "TestScope2" << endl;
    this_thread::sleep_for(1500ms);
    mux1.unlock();
    mux2.unlock();
}

int main()
{
    {
        //演示死锁情况
        {
            thread th(TestScope1);
            th.detach();
        }
        {
            thread th(TestScope2);
            th.detach();
        }
    }
    getchar();
}

image.png
解决:

#include <iostream>
#include <mutex>
#include <string>
#include <thread>
// #include <scoped_lock>

using namespace std;

static mutex mux1;
static mutex mux2;

void TestScope1()
{
    //模拟死锁 停100ms等另一个线程锁mux2
    this_thread::sleep_for(100ms);
    cout << this_thread::get_id() << " begin lock" << endl;

    // c++11
    lock(mux1, mux2); // lock multiple locks, without deadlock

    // c++17
    // scoped_lock lock(mux1, mux2); // 解决死锁 class with destructor that unlocks mutexes

    cout << "TestScope1" << endl;
    this_thread::sleep_for(1000ms);
}

void TestScope2()
{
    cout << this_thread::get_id() << " begin lock" << endl;
    cout << this_thread::get_id() << " begin mux2 lock" << endl;
    mux2.lock();
    this_thread::sleep_for(500ms);
    cout << this_thread::get_id() << " begin mux1 lock" << endl;
    mux1.lock(); //死锁
    cout << "TestScope2" << endl;
    this_thread::sleep_for(1500ms);
    mux1.unlock();
    mux2.unlock();
}

int main()
{
    {
        //演示死锁情况
        {
            thread th(TestScope1);
            th.detach();
        }
        {
            thread th(TestScope2);
            th.detach();
        }
    }
    getchar();
}

image.png

3.6 使用互斥锁+list模拟线程通信

  • 封装线程基类XThread控制线程启动和停止
  • 模拟消息服务器线程 接收字符串消息,并模拟处理
  • 通过unique_lock和mutex互斥访问list 消息队列
  • 主线程定时发送消息给子线程

main函数

#include <iostream>
#include "XMsgServer.h"
#include <sstream>
using namespace std;
int main()
{
    XMsgServer server;
    server.Start();
    for (int i = 0; i < 10; i++) {
        stringstream ss;
        ss << "msg: " << i + 1;
        server.SendMsg(ss.str());
        this_thread::sleep_for(500ms);
    }
    server.Stop();
    getchar();
    return 0;
}

XThread

// XThread.h
#pragma once
#include <thread>

class XThread
{
public:
    //启动线程
    virtual void Start();

    //设置线程退出标志 并等待
    virtual void Stop();

    //等待线程退出(阻塞)
    virtual void Wait();

    //线程是否退出
    bool is_exit();
private:
    //线程入口
    virtual void Main() = 0;
    bool is_exit_ = false;
    std::thread th_;
}; 


// XThread.cpp
#include "XThread.h"
using namespace std;
void XThread::Start()
{
    is_exit_ = false;
    th_ = thread(&XThread::Main, this);
}

void XThread::Stop()
{
    is_exit_ = true;
    Wait();
}

void XThread::Wait()
{
    if (th_.joinable()) {
        th_.join();
    }
}

bool XThread::is_exit()
{
    return is_exit_;
}

XMsgServer

// XMsgServer.h
#pragma once
#include "XThread.h"
#include <string >
#include <list>
#include <mutex>
class XMsgServer: public XThread
{
public:
    // 给当前线程发消息
    void SendMsg(std::string msg);

private:
    // 处理消息的线程入口函数
    void Main() override;

    // 消息队列缓冲
    std::list<std::string> msgs_;

    // 互斥访问消息队列
    std::mutex mutex_;
};


// XMsgServer.cpp
#include "XMsgServer.h"
#include <iostream>

using namespace std;

void XMsgServer::SendMsg(std::string msg)
{
    unique_lock<mutex> lock(mutex_);
    msgs_.push_back(msg);
}

//处理消息的线程入口函数
void XMsgServer::Main()
{
    while (!is_exit()) {
        this_thread::sleep_for(10ms);
        unique_lock<mutex> lock(mutex_);
        if (msgs_.empty()) {
            continue;
        }
        while (!msgs_.empty()) {
            //消息处理业务逻辑
            cout << "recv: " << msgs_.front() << endl;
            msgs_.pop_front();
        }

    }
}

3.7 条件变量

3.7.1 生产者-消费者模型

  • 生产者和消费者共享资源变量 list队列
  • 生产者生产一个产品,通知消费者消费
  • 消费者阻塞等待信息—-获取信号后消费产品(取出list队列中数据)

一、改变共享变量的线程步骤

  • 准备好信号量

    std::condition_variable cv;
    
    1. 获得std::mutex 常通过 std::unique_lock
      unique_lock lock(mux);
      
    1. 在获取锁时进行修改
      msgs_.push_back(data);
      
  • 3.释放锁并通知读取线程

    lock.unlock();
    cv.notify_one(); // 通知一个等待信号线程
    cv.notify_all(); // 通知所有等待信号线程
    

    二、等待信号读取共享变量的线程步骤

  • 1.获得与改变变量线程共用的mutex

    unique_lock lock(mux);
    
    1. wait()等待信号通知
  • 2.1 无lambda表达式 ```cpp // 解锁lock,并阻塞等待 notify_one notify_all通知 cv.wait(lock);

// 接收到通知会再次获取锁标注,也就是说如果此时mux资源被占用,wait函数会阻塞 msgs.front(); // 处理数据 msgs.pop_front();


- 2.2 lambda表达式 cv.wait(lock, []{ return !msgs.empty(); });<br />只在std::unique_lock<std::mutex>上工作的 std::condition_varible

```cpp
#include <iostream>
#include <list>
#include <mutex>
#include <sstream>
#include <thread>

using namespace std;

list<string> msgs_;
condition_variable cv;
mutex mux;
void ThreadWrite()
{
    for (int i = 0;; i++) {
        stringstream ss;
        ss << "Write msg " << i;
        unique_lock<mutex> lock(mux);
        msgs_.push_back(ss.str());
        lock.unlock();
        cv.notify_one(); // 发送信号
        this_thread::sleep_for(1s);
    }
}
void ThreadRead(int i)
{
    for (;;) {
        cout << "read msg" << endl;
        unique_lock<mutex> lock(mux);
        // cv.wait(lock); //解锁、阻塞等待信号
        cv.wait(lock, [i]() {
            cout << i << " wait" << endl;
            return !msgs_.empty();
        });

        //获取信号后锁定
        while (!msgs_.empty()) {
            cout << i << " read " << msgs_.front() << endl;
            msgs_.pop_front();
        }
    }
}

int main(int argc, char* argv[])
{
    thread th(ThreadWrite);
    th.detach();
    for (int i = 0; i < 3; i++) {
        thread th(ThreadRead, i + 1);
        th.detach();
    }
    getchar();
    return 0;
}

image.png

main.cpp

#include "xmsgserver.h"
#include <iostream>
#include <sstream>

using namespace std;

int main(int argc, char* argv[])
{
    XMsgServer server;
    server.Start();
    for (int i = 0; i < 10; i++) {
        stringstream ss;
        ss << "msg: " << i;
        server.SendMsg(ss.str());
        this_thread::sleep_for(500ms);
    }
    server.Stop();
    cout << "Server stoped" << endl;
    getchar();
}
// xthread.h

#pragma once
#include <thread>

class XThread {
public:
    // 启动线程
    virtual void Start();
    virtual void Stop();
    virtual void Wait();
    bool is_exit();

protected:
    bool is_exit_;

private:
    virtual void Main() = 0;
    std::thread th_;
};


// xthread.cpp
#include "xthread.h"

using namespace std;

void XThread::Start()
{
    is_exit_ = false;
    th_ = thread(&XThread::Main, this);
}

void XThread::Stop()
{
    is_exit_ = true;
    Wait();
}

void XThread::Wait()
{
    if (th_.joinable()) {
        th_.join();
    }
}

bool XThread::is_exit()
{
    return is_exit_;
}
// xmsgserver.h
#pragma once
#include "xthread.h"
#include <list>
#include <mutex>
#include <string>

class XMsgServer : public XThread {
public:
    // 给当前线程发消息
    void SendMsg(std::string msg);
    void Stop() override;

private:
    // 处理消息的线程入口函数
    void Main() override;

    // 消息队列缓冲
    std::list<std::string> msgs_;

    // 互斥访问消息队列
    std::mutex mutex_;

    std::condition_variable cv_;
};

// xmsgserver.cpp
#include "xmsgserver.h"
#include <iostream>
using namespace std;

void XMsgServer::SendMsg(std::string msg)
{
    unique_lock<mutex> lock(mutex_);
    msgs_.push_back(msg);
    lock.unlock();
    cv_.notify_one();
}

void XMsgServer::Stop()
{
    is_exit_ = true;
    cv_.notify_all();
    Wait();
}

void XMsgServer::Main()
{
    while (!is_exit()) {
        unique_lock<mutex> lock(mutex_);
        cv_.wait(lock, [this]() {
            cout << "is_exit: " << is_exit() << endl;
            if (is_exit()) {
                return true;
            }
            return !msgs_.empty();
        });
        while (!msgs_.empty()) {
            //消息处理业务逻辑
            cout << "recv: " << msgs_.front() << endl;
            msgs_.pop_front();
        }
    }
}

image.png