这周学习了多线程并发的相关知识,写一个读书笔记以作记录。包括进程、线程、并发、mutex等等。

一、进程、线程、并发

  • 进程:简而言之就是一个运行的程序,如点开一个exe文件就打开了一个进程。
  • 线程:进程中的不同执行路径,即在一个进程中运行了多个功能。每一个进程都至少包含有一个线程,即主线程,主线程与进程的关系是相互依存的。
  • 并发:分为多线程和多进程,多进程同时运行和多线程同时运行的情况。例如,同时打开两个QQ客户端是多进程,VStudio中多个窗口线程是多线程并发。

二、thread

以前,由于系统环境的不同,如Windows和Linux,需要选择不同的线程库进行代码编写,可移植性不高。C11之后有了标准的线程库:<u>std::thread</u>。除此之外,C还提供了另外三个库来支持多线程编程,分别是 <mutex>, <condition_variable><future>

thread

thread类是thread库多线程实现的基础。以下是其构造函数及示例代码:

构造函数

  • thread():默认构造函数,创建一个空的 std::thread 执行对象(在线程池的实现中需要提前创建一定数量的线程对象)。
  • thread(Fn&& fn, Args&&...):初始化构造函数,创建一个 std::thread 对象,该对象可被 joinable,新产生的线程会调用 fn 函数(即可调用对象),该函数的参数由 args 给出。
  • thread(const thread&) = delete:拷贝构造函数(被禁用),意味着 std::thread 对象不可拷贝构造,线程在同一时刻仅能由一个线程对象运行。
  • thread(thread&& x):转移/移动构造函数,调用成功之后 x 不再代表任何 std::thread 执行对象,相当于将“线程”的所有权转给了另外的线程对象。

例子

以下示例展示了如何创建线程:

  1. #include <iostream> // std::cout
  2. #include <thread> // std::thread
  3. void func1() {
  4. for (int i = 0; i != 10; ++i) {
  5. std::cout << "thread 1 print " << i << std::endl;
  6. }
  7. }
  8. void func2(int n) {
  9. std::cout << "thread 2 print " << n << std::endl;
  10. }
  11. int main() {
  12. std::thread t1(func1);
  13. std::thread t2(func2, 123);
  14. std::cout << "main, foo and bar now execute concurrently...\n";
  15. t1.join(); // 阻塞直到第一个线程完成
  16. t2.join(); // 阻塞直到第二个线程完成
  17. std::cout << "thread 1 and thread 2 completed.\n";
  18. system("pause");
  19. return 0;
  20. }

结果

  1. thread 1 print 0
  2. thread 1 print 1
  3. ...
  4. thread 2 print 123
  5. ...
  6. main, foo and bar now execute concurrently...
  7. thread 1 and thread 2 completed.

重要函数

  • join():阻塞主线程直到子线程执行完毕。
  • detach():将线程与主线程分离,线程被运行时库接管。若在Linux环境下运行,即使Ctrl+C退出主线程,子线程依旧运行。
  • get_id():获取线程 ID,返回一个类型为 std::thread::id 的对象。
  • joinable():检查线程是否可被 join。
  • swap():交换两个线程对象所代表的底层句柄资源。

例子

以下示例展示了 swap 函数的使用:

  1. #include <iostream>
  2. #include <thread>
  3. #include <chrono>
  4. void foo() {
  5. std::this_thread::sleep_for(std::chrono::seconds(1));
  6. }
  7. void bar() {
  8. std::this_thread::sleep_for(std::chrono::seconds(1));
  9. }
  10. int main() {
  11. std::thread t1(foo);
  12. std::thread t2(bar);
  13. std::cout << "thread 1 id: " << t1.get_id() << std::endl;
  14. std::cout << "thread 2 id: " << t2.get_id() << std::endl;
  15. std::swap(t1, t2);
  16. std::cout << "after std::swap(t1, t2):" << std::endl;
  17. std::cout << "thread 1 id: " << t1.get_id() << std::endl;
  18. std::cout << "thread 2 id: " << t2.get_id() << std::endl;
  19. t1.swap(t2);
  20. std::cout << "after t1.swap(t2):" << std::endl;
  21. std::cout << "thread 1 id: " << t1.get_id() << std::endl;
  22. std::cout << "thread 2 id: " << t2.get_id() << std::endl;
  23. t1.join();
  24. t2.join();
  25. }

结果

  1. thread 1 id: ...
  2. thread 2 id: ...
  3. after std::swap(t1, t2):
  4. thread 1 id: ...
  5. thread 2 id: ...
  6. after t1.swap(t2):
  7. thread 1 id: ...
  8. thread 2 id: ...

其他函数

  • yield():当前线程放弃执行,操作系统调度另一线程继续执行。
  • sleep_until:线程休眠至某个指定的时刻,被重新唤醒。
  • sleep_for:线程休眠某个指定的时间片,详见例子。
  1. #include <iostream>
  2. #include <chrono>
  3. #include <thread>
  4. int main() {
  5. std::cout << "Hello waiter" << std::endl;
  6. std::chrono::milliseconds dura(2000);
  7. std::this_thread::sleep_for(dura);
  8. std::cout << "Waited 2000 ms\n";
  9. }

三、mutex

互斥

互斥是指散布在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务不能运行其余程序片段,需等待该任务运行完毕。

mutex

std::mutex 是最基本的互斥量类,不支持递归地对 std::mutex 对象上锁,而 std::recursive_mutex 则可以。

常用函数

  • lock():锁住互斥量,可能发生死锁。
  • unlock():解锁,释放对互斥量的所有权。
  • try_lock():尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程不会被阻塞。可能发生死锁。
  • std::recursive_mutex:允许同一个线程多次上锁解锁。

例子

  1. #include <iostream> // std::cout
  2. #include <thread> // std::thread
  3. #include <mutex> // std::mutex, std::lock_guard
  4. #include <stdexcept> // std::logic_error
  5. std::mutex mtx;
  6. void print_even(int x) {
  7. if (x % 2 == 0) std::cout << x << " is even\n";
  8. else throw (std::logic_error("not even"));
  9. }
  10. void print_thread_id(int id) {
  11. try {
  12. std::lock_guard<std::mutex> lck(mtx);
  13. print_even(id);
  14. } catch (std::logic_error&) {
  15. std::cout << "[exception caught]\n";
  16. }
  17. }
  18. int main() {
  19. std::thread threads[10];
  20. for (int i = 0; i < 10; ++i)
  21. threads[i] = std::thread(print_thread_id, i + 1);
  22. for (auto& th : threads) th.join();
  23. return 0;
  24. }

输出

  1. [exception caught]
  2. 2 is even
  3. ...
  4. 10 is even

unique_lock

unique_lock 对象以独占所有权的方式管理 mutex 对象的上锁和解锁操作。

四、条件变量 condition_variable

条件变量 (Condition Variable) 在多线程程序中用于实现“等待->唤醒”逻辑。

例子

  1. #include <thread> // std::thread
  2. #include <mutex> // std::mutex, std::unique_lock
  3. #include <condition_variable> // std::condition_variable
  4. std::mutex mtx; // 全局互斥锁
  5. std::condition_variable cv; // 全局条件变量
  6. bool ready = false; // 全局标志位
  7. void do_print_id(int id) {
  8. std::unique_lockstd::mutex lck(mtx);
  9. while (!ready) // 如果标志位不为 true,则等待
  10. cv.wait(lck); // 当前线程被阻塞,直到全局标志位变为 true 后被唤醒
  11. std::cout << "thread " << id << '\n'; // 打印线程编号
  12. }
  13. void go() {
  14. std::unique_lockstd::mutex lck(mtx);
  15. ready = true; // 设置全局标志位为 true
  16. cv.notify_all(); // 唤醒所有线程
  17. }
  18. int main() {
  19. std::thread threads[10];
  20. // 创建10个线程
  21. for (int i = 0; i < 10; ++i)
  22. threads[i] = std::thread(do_print_id, i);
  23. std::cout << "10 threads ready to race...\n";
  24. go(); // 唤醒所有等待线程
  25. for (auto& th : threads)
  26. th.join(); // 主线程等待所有子线程完成
  27. return 0;
  28. }

.

输出:

  1. 10 threads ready to race...
  2. thread 0
  3. thread 1
  4. thread 2
  5. thread 3
  6. thread 4
  7. thread 5
  8. thread 6
  9. thread 7
  10. thread 8
  11. thread 9

解释

在上述代码中,当调用 cv.wait() 时,如果 ready 变量不为 true,当前线程将被阻塞。当 go() 函数调用 cv.notify_all() 时,所有被阻塞的线程将被唤醒并继续执行。

std::condition_variable 提供的其他函数

  • wait_for:等待一段指定的时间。
  • wait_until:等待直到某个指定的时刻。

输出

  1. thread 0
  2. thread 1
  3. thread 2
  4. ...
  5. thread 9

五、异步调用 future

std::async

std::async 提供了一种启动异步任务的机制,可以通过 std::future 对象获取异步操作的结果。

例子

  1. #include <iostream>
  2. #include <future>
  3. #include <chrono>
  4. bool is_prime(int x) {
  5. for (int i = 2; i < x; ++i) {
  6. if (x % i == 0) return false;
  7. }
  8. return true;
  9. }
  10. int main() {
  11. // 异步启动任务
  12. std::future<bool> fut = std::async(is_prime, 700020007);
  13. std::cout << "please wait";
  14. std::chrono::milliseconds span(100);
  15. while (fut.wait_for(span) != std::future_status::ready)
  16. std::cout << ".";
  17. bool ret = fut.get();
  18. std::cout << "\nfinal result: " << std::boolalpha << ret << std::endl;
  19. return 0;
  20. }

输出

  1. please wait........
  2. final result: true

std::promise

std::promise 是一个用于在一个线程中设置值,并允许另一个线程通过 std::future 获取该值的类。

例子

  1. #include <iostream>
  2. #include <thread>
  3. #include <future>
  4. void set_value(std::promise<int>& prom) {
  5. prom.set_value(10);
  6. }
  7. int main() {
  8. std::promise<int> prom;
  9. std::future<int> fut = prom.get_future();
  10. std::thread t(set_value, std::ref(prom));
  11. std::cout << "value: " << fut.get() << std::endl;
  12. t.join();
  13. return 0;
  14. }

输出

  1. value: 10

六、原子操作 atomic

std::atomic 是 C++11 提供的用于实现原子操作的类型,避免数据竞争。

std::atomic_flag

std::atomic_flag 是一个简单的原子布尔类型,可以用于实现互斥锁。

  1. #include <iostream>
  2. #include <atomic>
  3. #include <thread>
  4. #include <vector>
  5. #include <sstream>
  6. std::atomic_flag lock = ATOMIC_FLAG_INIT;
  7. std::stringstream stream;
  8. void append_number(int x) {
  9. while (lock.test_and_set()) {} // 自旋锁
  10. stream << "thread#" << x << "\n";
  11. lock.clear();
  12. }
  13. int main() {
  14. std::vector<std::thread> threads;
  15. for (int i = 0; i < 10; ++i)
  16. threads.push_back(std::thread(append_number, i));
  17. for (auto& th : threads)
  18. th.join();
  19. std::cout << stream.str();
  20. return 0;
  21. }

输出

  1. thread#0
  2. thread#1
  3. ...
  4. thread#9

例子:std::atomic

  1. #include <iostream>
  2. #include <atomic>
  3. #include <thread>
  4. #include <vector>
  5. std::atomic<bool> ready(false);
  6. std::atomic_flag winner = ATOMIC_FLAG_INIT;
  7. void count1m(int id) {
  8. while (!ready) {}
  9. for (int i = 0; i < 1000000; ++i) {}
  10. if (!winner.test_and_set()) {
  11. std::cout << "winner: " << id << std::endl;
  12. }
  13. }
  14. int main() {
  15. std::vector<std::thread> threads;
  16. for (int i = 0; i < 10; ++i)
  17. threads.push_back(std::thread(count1m, i));
  18. ready = true;
  19. for (auto& th : threads)
  20. th.join();
  21. return 0;
  22. }

输出

  1. winner: 0

七、锁的详解

std::lock_guard

std::lock_guard 是 C++11 中引入的一个 RAII 风格的锁管理器,用于自动管理锁的生命周期。

特点

  • 简单易用std::lock_guard 在构造时获取锁,在析构时释放锁。
  • 异常安全:即使在异常情况下,std::lock_guard 也能保证锁的释放。

例子

  1. #include <iostream>
  2. #include <thread>
  3. #include <mutex>
  4. std::mutex mtx; // 全局互斥锁
  5. void print_even(int x) {
  6. if (x % 2 == 0) std::cout << x << " is even\n";
  7. else throw std::logic_error("not even");
  8. }
  9. void print_thread_id(int id) {
  10. try {
  11. std::lock_guard<std::mutex> lck(mtx); // 自动获取并释放锁
  12. print_even(id);
  13. } catch (std::logic_error&) {
  14. std::cout << "[exception caught]\n";
  15. }
  16. }
  17. int main() {
  18. std::thread threads[10];
  19. for (int i = 0; i < 10; ++i)
  20. threads[i] = std::thread(print_thread_id, i + 1);
  21. for (auto& th : threads) th.join();
  22. return 0;
  23. }

输出

  1. [exception caught]
  2. 2 is even
  3. [exception caught]
  4. 4 is even
  5. ...
  6. 10 is even

std::unique_lock

std::unique_lock 提供了比 std::lock_guard 更加灵活的锁管理。

特点

  • 灵活性std::unique_lock 可以延迟锁定、提前解锁及再次锁定。
  • 条件变量:在使用条件变量时,必须使用 std::unique_lock

例子

  1. #include <iostream>
  2. #include <thread>
  3. #include <mutex>
  4. #include <condition_variable>
  5. std::mutex mtx;
  6. std::condition_variable cv;
  7. bool ready = false;
  8. void print_id(int id) {
  9. std::unique_lock<std::mutex> lck(mtx);
  10. cv.wait(lck, [] { return ready; }); // 等待 ready 为 true
  11. std::cout << "thread " << id << '\n';
  12. }
  13. void go() {
  14. std::unique_lock<std::mutex> lck(mtx);
  15. ready = true;
  16. cv.notify_all(); // 唤醒所有等待的线程
  17. }
  18. int main() {
  19. std::thread threads[10];
  20. for (int i = 0; i < 10; ++i)
  21. threads[i] = std::thread(print_id, i);
  22. std::this_thread::sleep_for(std::chrono::seconds(1));
  23. go();
  24. for (auto& th : threads) th.join();
  25. return 0;
  26. }

输出

  1. thread 0
  2. thread 1
  3. thread 2
  4. ...
  5. thread 9

std::shared_lock

std::shared_lock 是 C++14 引入的用于共享所有权的锁管理器,适用于读多写少的场景。

特点

  • 共享模式:允许多个线程共享读取权限,但写操作必须独占。
  • 提升并发性:适用于读多写少的场景,提升并发性能。

例子

  1. #include <iostream>
  2. #include <shared_mutex>
  3. #include <thread>
  4. #include <vector>
  5. std::shared_mutex shared_mtx;
  6. void reader(int id) {
  7. std::shared_lock<std::shared_mutex> lock(shared_mtx);
  8. std::cout << "Reader " << id << " is reading.\n";
  9. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  10. }
  11. void writer(int id) {
  12. std::unique_lock<std::shared_mutex> lock(shared_mtx);
  13. std::cout << "Writer " << id << " is writing.\n";
  14. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  15. }
  16. int main() {
  17. std::vector<std::thread> threads;
  18. for (int i = 0; i < 5; ++i) {
  19. threads.emplace_back(reader, i);
  20. }
  21. for (int i = 0; i < 2; ++i) {
  22. threads.emplace_back(writer, i);
  23. }
  24. for (auto& th : threads) {
  25. th.join();
  26. }
  27. return 0;
  28. }

输出

  1. Reader 0 is reading.
  2. Reader 1 is reading.
  3. Reader 2 is reading.
  4. Writer 0 is writing.
  5. Reader 3 is reading.
  6. Reader 4 is reading.
  7. Writer 1 is writing.

八、线程池

介绍

线程池是一种常用的多线程设计模式,它通过复用一组线程来执行任务,从而避免频繁创建和销毁线程的开销。线程池在需要频繁创建和销毁线程的场景中尤其有用,例如服务器处理请求、并行计算等。

实现

以下是一个简单的线程池实现:

  1. #include <iostream>
  2. #include <vector>
  3. #include <thread>
  4. #include <queue>
  5. #include <mutex>
  6. #include <condition_variable>
  7. #include <functional>
  8. #include <future>
  9. class ThreadPool {
  10. public:
  11. ThreadPool(size_t);
  12. template<class F, class... Args>
  13. auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
  14. ~ThreadPool();
  15. private:
  16. std::vector<std::thread> workers;
  17. std::queue<std::function<void()>> tasks;
  18. std::mutex queue_mutex;
  19. std::condition_variable condition;
  20. bool stop;
  21. };
  22. inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
  23. for (size_t i = 0; i < threads; ++i)
  24. workers.emplace_back([this] {
  25. for (;;) {
  26. std::function<void()> task;
  27. {
  28. std::unique_lock<std::mutex> lock(this->queue_mutex);
  29. this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
  30. if (this->stop && this->tasks.empty())
  31. return;
  32. task = std::move(this->tasks.front());
  33. this->tasks.pop();
  34. }
  35. task();
  36. }
  37. });
  38. }
  39. template<class F, class... Args>
  40. auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
  41. using return_type = typename std::result_of<F(Args...)>::type;
  42. auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  43. std::future<return_type> res = task->get_future();
  44. {
  45. std::unique_lock<std::mutex> lock(queue_mutex);
  46. if (stop)
  47. throw std::runtime_error("enqueue on stopped ThreadPool");
  48. tasks.emplace([task]() { (*task)(); });
  49. }
  50. condition.notify_one();
  51. return res;
  52. }
  53. inline ThreadPool::~ThreadPool() {
  54. {
  55. std::unique_lock<std::mutex> lock(queue_mutex);
  56. stop = true;
  57. }
  58. condition.notify_all();
  59. for (std::thread& worker : workers)
  60. worker.join();
  61. }

使用线程池

  1. int main() {
  2. ThreadPool pool(4);
  3. auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 1, 2);
  4. auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 3, 4);
  5. std::cout << "1 + 2 = " << result1.get() << std::endl;
  6. std::cout << "3 * 4 = " << result2.get() << std::endl;
  7. return 0;
  8. }

输出

  1. 1 + 2 = 3
  2. 3 * 4 = 12

优点

  • 复用线程:避免频繁创建销毁线程的开销。
  • 任务并行:提高任务处理的并行度。
  • 管理方便:统一管理线程和任务。

参考资料