这周学习了多线程并发的相关知识,写一个读书笔记以作记录。包括进程、线程、并发、mutex等等。
一、进程、线程、并发
- 进程:简而言之就是一个运行的程序,如点开一个exe文件就打开了一个进程。
- 线程:进程中的不同执行路径,即在一个进程中运行了多个功能。每一个进程都至少包含有一个线程,即主线程,主线程与进程的关系是相互依存的。
- 并发:分为多线程和多进程,多进程同时运行和多线程同时运行的情况。例如,同时打开两个QQ客户端是多进程,VStudio中多个窗口线程是多线程并发。
二、thread
库
以前,由于系统环境的不同,如Windows和Linux,需要选择不同的线程库进行代码编写,可移植性不高。C11之后有了标准的线程库:<u>std::thread</u>
。除此之外,C还提供了另外三个库来支持多线程编程,分别是 <mutex>
, <condition_variable>
和 <future>
。
thread
类
thread
类是thread
库多线程实现的基础。以下是其构造函数及示例代码:
构造函数
thread()
:默认构造函数,创建一个空的std::thread
执行对象(在线程池的实现中需要提前创建一定数量的线程对象)。thread(Fn&& fn, Args&&...)
:初始化构造函数,创建一个std::thread
对象,该对象可被 joinable,新产生的线程会调用fn
函数(即可调用对象),该函数的参数由args
给出。thread(const thread&) = delete
:拷贝构造函数(被禁用),意味着std::thread
对象不可拷贝构造,线程在同一时刻仅能由一个线程对象运行。thread(thread&& x)
:转移/移动构造函数,调用成功之后x
不再代表任何std::thread
执行对象,相当于将“线程”的所有权转给了另外的线程对象。
例子
以下示例展示了如何创建线程:
#include <iostream> // std::cout
#include <thread> // std::thread
void func1() {
for (int i = 0; i != 10; ++i) {
std::cout << "thread 1 print " << i << std::endl;
}
}
void func2(int n) {
std::cout << "thread 2 print " << n << std::endl;
}
int main() {
std::thread t1(func1);
std::thread t2(func2, 123);
std::cout << "main, foo and bar now execute concurrently...\n";
t1.join(); // 阻塞直到第一个线程完成
t2.join(); // 阻塞直到第二个线程完成
std::cout << "thread 1 and thread 2 completed.\n";
system("pause");
return 0;
}
结果:
thread 1 print 0
thread 1 print 1
...
thread 2 print 123
...
main, foo and bar now execute concurrently...
thread 1 and thread 2 completed.
重要函数
join()
:阻塞主线程直到子线程执行完毕。detach()
:将线程与主线程分离,线程被运行时库接管。若在Linux环境下运行,即使Ctrl+C退出主线程,子线程依旧运行。get_id()
:获取线程 ID,返回一个类型为std::thread::id
的对象。joinable()
:检查线程是否可被 join。swap()
:交换两个线程对象所代表的底层句柄资源。
例子
以下示例展示了 swap
函数的使用:
#include <iostream>
#include <thread>
#include <chrono>
void foo() {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void bar() {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main() {
std::thread t1(foo);
std::thread t2(bar);
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
std::swap(t1, t2);
std::cout << "after std::swap(t1, t2):" << std::endl;
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
t1.swap(t2);
std::cout << "after t1.swap(t2):" << std::endl;
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
t1.join();
t2.join();
}
结果:
thread 1 id: ...
thread 2 id: ...
after std::swap(t1, t2):
thread 1 id: ...
thread 2 id: ...
after t1.swap(t2):
thread 1 id: ...
thread 2 id: ...
其他函数
yield()
:当前线程放弃执行,操作系统调度另一线程继续执行。sleep_until
:线程休眠至某个指定的时刻,被重新唤醒。sleep_for
:线程休眠某个指定的时间片,详见例子。
#include <iostream>
#include <chrono>
#include <thread>
int main() {
std::cout << "Hello waiter" << std::endl;
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);
std::cout << "Waited 2000 ms\n";
}
三、mutex
库
互斥
互斥是指散布在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务不能运行其余程序片段,需等待该任务运行完毕。
mutex
类
std::mutex
是最基本的互斥量类,不支持递归地对 std::mutex
对象上锁,而 std::recursive_mutex
则可以。
常用函数
lock()
:锁住互斥量,可能发生死锁。unlock()
:解锁,释放对互斥量的所有权。try_lock()
:尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程不会被阻塞。可能发生死锁。std::recursive_mutex
:允许同一个线程多次上锁解锁。
例子
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error
std::mutex mtx;
void print_even(int x) {
if (x % 2 == 0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id(int id) {
try {
std::lock_guard<std::mutex> lck(mtx);
print_even(id);
} catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto& th : threads) th.join();
return 0;
}
输出:
[exception caught]
2 is even
...
10 is even
unique_lock
unique_lock
对象以独占所有权的方式管理 mutex
对象的上锁和解锁操作。
四、条件变量 condition_variable
条件变量 (Condition Variable) 在多线程程序中用于实现“等待->唤醒”逻辑。
例子
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥锁
std::condition_variable cv; // 全局条件变量
bool ready = false; // 全局标志位
void do_print_id(int id) {
std::unique_lockstd::mutex lck(mtx);
while (!ready) // 如果标志位不为 true,则等待
cv.wait(lck); // 当前线程被阻塞,直到全局标志位变为 true 后被唤醒
std::cout << "thread " << id << '\n'; // 打印线程编号
}
void go() {
std::unique_lockstd::mutex lck(mtx);
ready = true; // 设置全局标志位为 true
cv.notify_all(); // 唤醒所有线程
}
int main() {
std::thread threads[10];
// 创建10个线程
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // 唤醒所有等待线程
for (auto& th : threads)
th.join(); // 主线程等待所有子线程完成
return 0;
}
.
输出:
10 threads ready to race...
thread 0
thread 1
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9
解释
在上述代码中,当调用 cv.wait()
时,如果 ready
变量不为 true
,当前线程将被阻塞。当 go()
函数调用 cv.notify_all()
时,所有被阻塞的线程将被唤醒并继续执行。
std::condition_variable
提供的其他函数
wait_for
:等待一段指定的时间。wait_until
:等待直到某个指定的时刻。
输出:
thread 0
thread 1
thread 2
...
thread 9
五、异步调用 future
std::async
std::async
提供了一种启动异步任务的机制,可以通过 std::future
对象获取异步操作的结果。
例子
#include <iostream>
#include <future>
#include <chrono>
bool is_prime(int x) {
for (int i = 2; i < x; ++i) {
if (x % i == 0) return false;
}
return true;
}
int main() {
// 异步启动任务
std::future<bool> fut = std::async(is_prime, 700020007);
std::cout << "please wait";
std::chrono::milliseconds span(100);
while (fut.wait_for(span) != std::future_status::ready)
std::cout << ".";
bool ret = fut.get();
std::cout << "\nfinal result: " << std::boolalpha << ret << std::endl;
return 0;
}
输出:
please wait........
final result: true
std::promise
std::promise
是一个用于在一个线程中设置值,并允许另一个线程通过 std::future
获取该值的类。
例子
#include <iostream>
#include <thread>
#include <future>
void set_value(std::promise<int>& prom) {
prom.set_value(10);
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(set_value, std::ref(prom));
std::cout << "value: " << fut.get() << std::endl;
t.join();
return 0;
}
输出:
value: 10
六、原子操作 atomic
std::atomic
是 C++11 提供的用于实现原子操作的类型,避免数据竞争。
std::atomic_flag
std::atomic_flag
是一个简单的原子布尔类型,可以用于实现互斥锁。
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <sstream>
std::atomic_flag lock = ATOMIC_FLAG_INIT;
std::stringstream stream;
void append_number(int x) {
while (lock.test_and_set()) {} // 自旋锁
stream << "thread#" << x << "\n";
lock.clear();
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i)
threads.push_back(std::thread(append_number, i));
for (auto& th : threads)
th.join();
std::cout << stream.str();
return 0;
}
输出:
thread#0
thread#1
...
thread#9
例子:std::atomic
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
std::atomic<bool> ready(false);
std::atomic_flag winner = ATOMIC_FLAG_INIT;
void count1m(int id) {
while (!ready) {}
for (int i = 0; i < 1000000; ++i) {}
if (!winner.test_and_set()) {
std::cout << "winner: " << id << std::endl;
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i)
threads.push_back(std::thread(count1m, i));
ready = true;
for (auto& th : threads)
th.join();
return 0;
}
输出:
winner: 0
七、锁的详解
std::lock_guard
std::lock_guard
是 C++11 中引入的一个 RAII 风格的锁管理器,用于自动管理锁的生命周期。
特点
- 简单易用:
std::lock_guard
在构造时获取锁,在析构时释放锁。 - 异常安全:即使在异常情况下,
std::lock_guard
也能保证锁的释放。
例子
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx; // 全局互斥锁
void print_even(int x) {
if (x % 2 == 0) std::cout << x << " is even\n";
else throw std::logic_error("not even");
}
void print_thread_id(int id) {
try {
std::lock_guard<std::mutex> lck(mtx); // 自动获取并释放锁
print_even(id);
} catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto& th : threads) th.join();
return 0;
}
输出:
[exception caught]
2 is even
[exception caught]
4 is even
...
10 is even
std::unique_lock
std::unique_lock
提供了比 std::lock_guard
更加灵活的锁管理。
特点
- 灵活性:
std::unique_lock
可以延迟锁定、提前解锁及再次锁定。 - 条件变量:在使用条件变量时,必须使用
std::unique_lock
。
例子
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id(int id) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [] { return ready; }); // 等待 ready 为 true
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all(); // 唤醒所有等待的线程
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_id, i);
std::this_thread::sleep_for(std::chrono::seconds(1));
go();
for (auto& th : threads) th.join();
return 0;
}
输出:
thread 0
thread 1
thread 2
...
thread 9
std::shared_lock
std::shared_lock
是 C++14 引入的用于共享所有权的锁管理器,适用于读多写少的场景。
特点
- 共享模式:允许多个线程共享读取权限,但写操作必须独占。
- 提升并发性:适用于读多写少的场景,提升并发性能。
例子
#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>
std::shared_mutex shared_mtx;
void reader(int id) {
std::shared_lock<std::shared_mutex> lock(shared_mtx);
std::cout << "Reader " << id << " is reading.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
void writer(int id) {
std::unique_lock<std::shared_mutex> lock(shared_mtx);
std::cout << "Writer " << id << " is writing.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(reader, i);
}
for (int i = 0; i < 2; ++i) {
threads.emplace_back(writer, i);
}
for (auto& th : threads) {
th.join();
}
return 0;
}
输出:
Reader 0 is reading.
Reader 1 is reading.
Reader 2 is reading.
Writer 0 is writing.
Reader 3 is reading.
Reader 4 is reading.
Writer 1 is writing.
八、线程池
介绍
线程池是一种常用的多线程设计模式,它通过复用一组线程来执行任务,从而避免频繁创建和销毁线程的开销。线程池在需要频繁创建和销毁线程的场景中尤其有用,例如服务器处理请求、并行计算等。
实现
以下是一个简单的线程池实现:
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
}
使用线程池
int main() {
ThreadPool pool(4);
auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 1, 2);
auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 3, 4);
std::cout << "1 + 2 = " << result1.get() << std::endl;
std::cout << "3 * 4 = " << result2.get() << std::endl;
return 0;
}
输出:
1 + 2 = 3
3 * 4 = 12
优点
- 复用线程:避免频繁创建销毁线程的开销。
- 任务并行:提高任务处理的并行度。
- 管理方便:统一管理线程和任务。