这周学习了多线程并发的相关知识,写一个读书笔记以作记录。包括进程、线程、并发、mutex等等。
一、进程、线程、并发
- 进程:简而言之就是一个运行的程序,如点开一个exe文件就打开了一个进程。
 - 线程:进程中的不同执行路径,即在一个进程中运行了多个功能。每一个进程都至少包含有一个线程,即主线程,主线程与进程的关系是相互依存的。
 - 并发:分为多线程和多进程,多进程同时运行和多线程同时运行的情况。例如,同时打开两个QQ客户端是多进程,VStudio中多个窗口线程是多线程并发。
 
二、thread 库
以前,由于系统环境的不同,如Windows和Linux,需要选择不同的线程库进行代码编写,可移植性不高。C11之后有了标准的线程库:<u>std::thread</u>。除此之外,C还提供了另外三个库来支持多线程编程,分别是 <mutex>, <condition_variable> 和 <future>。
thread类
thread类是thread库多线程实现的基础。以下是其构造函数及示例代码:
构造函数
thread():默认构造函数,创建一个空的std::thread执行对象(在线程池的实现中需要提前创建一定数量的线程对象)。thread(Fn&& fn, Args&&...):初始化构造函数,创建一个std::thread对象,该对象可被 joinable,新产生的线程会调用fn函数(即可调用对象),该函数的参数由args给出。thread(const thread&) = delete:拷贝构造函数(被禁用),意味着std::thread对象不可拷贝构造,线程在同一时刻仅能由一个线程对象运行。thread(thread&& x):转移/移动构造函数,调用成功之后x不再代表任何std::thread执行对象,相当于将“线程”的所有权转给了另外的线程对象。
例子
以下示例展示了如何创建线程:
#include <iostream> // std::cout#include <thread> // std::threadvoid func1() {for (int i = 0; i != 10; ++i) {std::cout << "thread 1 print " << i << std::endl;}}void func2(int n) {std::cout << "thread 2 print " << n << std::endl;}int main() {std::thread t1(func1);std::thread t2(func2, 123);std::cout << "main, foo and bar now execute concurrently...\n";t1.join(); // 阻塞直到第一个线程完成t2.join(); // 阻塞直到第二个线程完成std::cout << "thread 1 and thread 2 completed.\n";system("pause");return 0;}
结果:
thread 1 print 0thread 1 print 1...thread 2 print 123...main, foo and bar now execute concurrently...thread 1 and thread 2 completed.
重要函数
join():阻塞主线程直到子线程执行完毕。detach():将线程与主线程分离,线程被运行时库接管。若在Linux环境下运行,即使Ctrl+C退出主线程,子线程依旧运行。get_id():获取线程 ID,返回一个类型为std::thread::id的对象。joinable():检查线程是否可被 join。swap():交换两个线程对象所代表的底层句柄资源。
例子
以下示例展示了 swap 函数的使用:
#include <iostream>#include <thread>#include <chrono>void foo() {std::this_thread::sleep_for(std::chrono::seconds(1));}void bar() {std::this_thread::sleep_for(std::chrono::seconds(1));}int main() {std::thread t1(foo);std::thread t2(bar);std::cout << "thread 1 id: " << t1.get_id() << std::endl;std::cout << "thread 2 id: " << t2.get_id() << std::endl;std::swap(t1, t2);std::cout << "after std::swap(t1, t2):" << std::endl;std::cout << "thread 1 id: " << t1.get_id() << std::endl;std::cout << "thread 2 id: " << t2.get_id() << std::endl;t1.swap(t2);std::cout << "after t1.swap(t2):" << std::endl;std::cout << "thread 1 id: " << t1.get_id() << std::endl;std::cout << "thread 2 id: " << t2.get_id() << std::endl;t1.join();t2.join();}
结果:
thread 1 id: ...thread 2 id: ...after std::swap(t1, t2):thread 1 id: ...thread 2 id: ...after t1.swap(t2):thread 1 id: ...thread 2 id: ...
其他函数
yield():当前线程放弃执行,操作系统调度另一线程继续执行。sleep_until:线程休眠至某个指定的时刻,被重新唤醒。sleep_for:线程休眠某个指定的时间片,详见例子。
#include <iostream>#include <chrono>#include <thread>int main() {std::cout << "Hello waiter" << std::endl;std::chrono::milliseconds dura(2000);std::this_thread::sleep_for(dura);std::cout << "Waited 2000 ms\n";}
三、mutex 库
互斥
互斥是指散布在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务不能运行其余程序片段,需等待该任务运行完毕。
mutex 类
std::mutex 是最基本的互斥量类,不支持递归地对 std::mutex 对象上锁,而 std::recursive_mutex 则可以。
常用函数
lock():锁住互斥量,可能发生死锁。unlock():解锁,释放对互斥量的所有权。try_lock():尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程不会被阻塞。可能发生死锁。std::recursive_mutex:允许同一个线程多次上锁解锁。
例子
#include <iostream> // std::cout#include <thread> // std::thread#include <mutex> // std::mutex, std::lock_guard#include <stdexcept> // std::logic_errorstd::mutex mtx;void print_even(int x) {if (x % 2 == 0) std::cout << x << " is even\n";else throw (std::logic_error("not even"));}void print_thread_id(int id) {try {std::lock_guard<std::mutex> lck(mtx);print_even(id);} catch (std::logic_error&) {std::cout << "[exception caught]\n";}}int main() {std::thread threads[10];for (int i = 0; i < 10; ++i)threads[i] = std::thread(print_thread_id, i + 1);for (auto& th : threads) th.join();return 0;}
输出:
[exception caught]2 is even...10 is even
unique_lock
unique_lock 对象以独占所有权的方式管理 mutex 对象的上锁和解锁操作。
四、条件变量 condition_variable
条件变量 (Condition Variable) 在多线程程序中用于实现“等待->唤醒”逻辑。
例子
#include <thread> // std::thread#include <mutex> // std::mutex, std::unique_lock#include <condition_variable> // std::condition_variablestd::mutex mtx; // 全局互斥锁std::condition_variable cv; // 全局条件变量bool ready = false; // 全局标志位void do_print_id(int id) {std::unique_lockstd::mutex lck(mtx);while (!ready) // 如果标志位不为 true,则等待cv.wait(lck); // 当前线程被阻塞,直到全局标志位变为 true 后被唤醒std::cout << "thread " << id << '\n'; // 打印线程编号}void go() {std::unique_lockstd::mutex lck(mtx);ready = true; // 设置全局标志位为 truecv.notify_all(); // 唤醒所有线程}int main() {std::thread threads[10];// 创建10个线程for (int i = 0; i < 10; ++i)threads[i] = std::thread(do_print_id, i);std::cout << "10 threads ready to race...\n";go(); // 唤醒所有等待线程for (auto& th : threads)th.join(); // 主线程等待所有子线程完成return 0;}
.
输出:
10 threads ready to race...thread 0thread 1thread 2thread 3thread 4thread 5thread 6thread 7thread 8thread 9
解释
在上述代码中,当调用 cv.wait() 时,如果 ready 变量不为 true,当前线程将被阻塞。当 go() 函数调用 cv.notify_all() 时,所有被阻塞的线程将被唤醒并继续执行。
std::condition_variable 提供的其他函数
wait_for:等待一段指定的时间。wait_until:等待直到某个指定的时刻。
输出:
thread 0thread 1thread 2...thread 9
五、异步调用 future
std::async
std::async 提供了一种启动异步任务的机制,可以通过 std::future 对象获取异步操作的结果。
例子
#include <iostream>#include <future>#include <chrono>bool is_prime(int x) {for (int i = 2; i < x; ++i) {if (x % i == 0) return false;}return true;}int main() {// 异步启动任务std::future<bool> fut = std::async(is_prime, 700020007);std::cout << "please wait";std::chrono::milliseconds span(100);while (fut.wait_for(span) != std::future_status::ready)std::cout << ".";bool ret = fut.get();std::cout << "\nfinal result: " << std::boolalpha << ret << std::endl;return 0;}
输出:
please wait........final result: true
std::promise
std::promise 是一个用于在一个线程中设置值,并允许另一个线程通过 std::future 获取该值的类。
例子
#include <iostream>#include <thread>#include <future>void set_value(std::promise<int>& prom) {prom.set_value(10);}int main() {std::promise<int> prom;std::future<int> fut = prom.get_future();std::thread t(set_value, std::ref(prom));std::cout << "value: " << fut.get() << std::endl;t.join();return 0;}
输出:
value: 10
六、原子操作 atomic
std::atomic 是 C++11 提供的用于实现原子操作的类型,避免数据竞争。
std::atomic_flag
std::atomic_flag 是一个简单的原子布尔类型,可以用于实现互斥锁。
#include <iostream>#include <atomic>#include <thread>#include <vector>#include <sstream>std::atomic_flag lock = ATOMIC_FLAG_INIT;std::stringstream stream;void append_number(int x) {while (lock.test_and_set()) {} // 自旋锁stream << "thread#" << x << "\n";lock.clear();}int main() {std::vector<std::thread> threads;for (int i = 0; i < 10; ++i)threads.push_back(std::thread(append_number, i));for (auto& th : threads)th.join();std::cout << stream.str();return 0;}
输出:
thread#0thread#1...thread#9
例子:std::atomic
#include <iostream>#include <atomic>#include <thread>#include <vector>std::atomic<bool> ready(false);std::atomic_flag winner = ATOMIC_FLAG_INIT;void count1m(int id) {while (!ready) {}for (int i = 0; i < 1000000; ++i) {}if (!winner.test_and_set()) {std::cout << "winner: " << id << std::endl;}}int main() {std::vector<std::thread> threads;for (int i = 0; i < 10; ++i)threads.push_back(std::thread(count1m, i));ready = true;for (auto& th : threads)th.join();return 0;}
输出:
winner: 0
七、锁的详解
std::lock_guard
std::lock_guard 是 C++11 中引入的一个 RAII 风格的锁管理器,用于自动管理锁的生命周期。
特点
- 简单易用:
std::lock_guard在构造时获取锁,在析构时释放锁。 - 异常安全:即使在异常情况下,
std::lock_guard也能保证锁的释放。 
例子
#include <iostream>#include <thread>#include <mutex>std::mutex mtx; // 全局互斥锁void print_even(int x) {if (x % 2 == 0) std::cout << x << " is even\n";else throw std::logic_error("not even");}void print_thread_id(int id) {try {std::lock_guard<std::mutex> lck(mtx); // 自动获取并释放锁print_even(id);} catch (std::logic_error&) {std::cout << "[exception caught]\n";}}int main() {std::thread threads[10];for (int i = 0; i < 10; ++i)threads[i] = std::thread(print_thread_id, i + 1);for (auto& th : threads) th.join();return 0;}
输出:
[exception caught]2 is even[exception caught]4 is even...10 is even
std::unique_lock
std::unique_lock 提供了比 std::lock_guard 更加灵活的锁管理。
特点
- 灵活性:
std::unique_lock可以延迟锁定、提前解锁及再次锁定。 - 条件变量:在使用条件变量时,必须使用 
std::unique_lock。 
例子
#include <iostream>#include <thread>#include <mutex>#include <condition_variable>std::mutex mtx;std::condition_variable cv;bool ready = false;void print_id(int id) {std::unique_lock<std::mutex> lck(mtx);cv.wait(lck, [] { return ready; }); // 等待 ready 为 truestd::cout << "thread " << id << '\n';}void go() {std::unique_lock<std::mutex> lck(mtx);ready = true;cv.notify_all(); // 唤醒所有等待的线程}int main() {std::thread threads[10];for (int i = 0; i < 10; ++i)threads[i] = std::thread(print_id, i);std::this_thread::sleep_for(std::chrono::seconds(1));go();for (auto& th : threads) th.join();return 0;}
输出:
thread 0thread 1thread 2...thread 9
std::shared_lock
std::shared_lock 是 C++14 引入的用于共享所有权的锁管理器,适用于读多写少的场景。
特点
- 共享模式:允许多个线程共享读取权限,但写操作必须独占。
 - 提升并发性:适用于读多写少的场景,提升并发性能。
 
例子
#include <iostream>#include <shared_mutex>#include <thread>#include <vector>std::shared_mutex shared_mtx;void reader(int id) {std::shared_lock<std::shared_mutex> lock(shared_mtx);std::cout << "Reader " << id << " is reading.\n";std::this_thread::sleep_for(std::chrono::milliseconds(50));}void writer(int id) {std::unique_lock<std::shared_mutex> lock(shared_mtx);std::cout << "Writer " << id << " is writing.\n";std::this_thread::sleep_for(std::chrono::milliseconds(50));}int main() {std::vector<std::thread> threads;for (int i = 0; i < 5; ++i) {threads.emplace_back(reader, i);}for (int i = 0; i < 2; ++i) {threads.emplace_back(writer, i);}for (auto& th : threads) {th.join();}return 0;}
输出:
Reader 0 is reading.Reader 1 is reading.Reader 2 is reading.Writer 0 is writing.Reader 3 is reading.Reader 4 is reading.Writer 1 is writing.
八、线程池
介绍
线程池是一种常用的多线程设计模式,它通过复用一组线程来执行任务,从而避免频繁创建和销毁线程的开销。线程池在需要频繁创建和销毁线程的场景中尤其有用,例如服务器处理请求、并行计算等。
实现
以下是一个简单的线程池实现:
#include <iostream>#include <vector>#include <thread>#include <queue>#include <mutex>#include <condition_variable>#include <functional>#include <future>class ThreadPool {public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;};inline ThreadPool::ThreadPool(size_t threads) : stop(false) {for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;) {std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});}template<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;}inline ThreadPool::~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();}
使用线程池
int main() {ThreadPool pool(4);auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 1, 2);auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 3, 4);std::cout << "1 + 2 = " << result1.get() << std::endl;std::cout << "3 * 4 = " << result2.get() << std::endl;return 0;}
输出:
1 + 2 = 33 * 4 = 12
优点
- 复用线程:避免频繁创建销毁线程的开销。
 - 任务并行:提高任务处理的并行度。
 - 管理方便:统一管理线程和任务。
 
