任务

除了线程之外,C++还有可以异步处理任务,这种方式处理任务需要包含<future>头文件。任务由一个参数化工作包和两个组件组成:promise和future,两者构建一条数据通道。promise执行工作包并将结果放入数据通道,对应的future可以获取结果,两个通信端可以在不同的线程中运行。特别的是future可以在之后的某个时间点获取结果,所以通过promise计算结果与通过future查询结果的步骤是分开的。

将任务视为通信端间的数据通道

任务的行为类似于通信点之间的数据通道。数据通道的一端称为promise,另一端称为future。这些端点可以存在于相同的线程中,也可以存在于不同的线程中。promise将其结果放入数据通道,future会在晚些时候把结果取走。

任务 - 图1

任务 vs. 线程

任务与线程有很大的不同。

  1. // asyncVersusThread.cpp
  2. #include <future>
  3. #include <thread>
  4. #include <iostream>
  5. int main() {
  6. std::cout << std::endl;
  7. int res;
  8. std::thread t([&] {res = 2000 + 11; });
  9. t.join();
  10. std::cout << "res: " << res << std::endl;
  11. auto fut = std::async([] {return 2000 + 11; });
  12. std::cout << "fut.get(): " << fut.get() << std::endl;
  13. std::cout << std::endl;
  14. }

线程tstd::async异步调用函数同时计算2000和11的和。主线程通过共享变量res获取其线程t的计算结果,并在第14行中显示它。第16行中,使用std::async在发送方(promise)和接收方(future)之间创建数据通道。future 变量使用fut.get()(第17行),通过数据通道获得计算结果。fut.get为阻塞调用。

下面是程序输出的结果:

任务 - 图2

基于这个程序,我想强调线程和任务之间的区别。

任务 vs. 线程

标准 线程 任务
构成元素 创建线程和子线程 promise和future
通讯方式 共享变量 通信通道
创建线程 必定创建 可选
同步方式 通过join()(等待) 使用get阻塞式调用
线程中的异常 子线程和创建线程终止 返回promise的值
通信类型 变量值 变量值、通知和异常

线程需要包含<thread>头文件,任务需要包含<future>头文件。

创建线程和子线程之间的通信需要使用共享变量,任务通过其隐式的数据通道保护数据通信。因此,任务不需要互斥锁之类的保护机制。

虽然,可以使用共享变量(的可变)来在子线程及其创建线程之间进行通信,但任务的通信方式更为明确。future只能获取一次任务的结果(通过调用fut.get()),多次调用它会导致未定义的行为(而std::shared_future可以查询多次)。

创建线程需要等待子线程汇入。而使用fut.get()时,该调用将一直阻塞,直到获取结果为止。

如果子线程中抛出异常,创建的线程将终止,创建者和整个进程也将终止。相反,promise可以将异常发送给future,而future必须对异常进行处理。

一个promise可以对应于一个或多个future。它可以发送值、异常,或者只是通知,可以使用它们替换条件变量。

std::async是创建future最简单的方法。

std::async

std::async的行为类似于异步函数调用,可调用带有参数的函数。std::async是一个可变参数模板,因此可以接受任意数量的参数。对std::async的调用会返回一个future 的对象fut。可以通过fut.get()获得结果。

std::async应该首选

C++运行时决定std::async是否在独立的线程中执行,决策可能取决于可用的CPU内核的数量、系统的利用率或工作包的大小。通过使用std::async,只需要指定运行的任务,C++运行时会自动管理线程。

可以指定std::async的启动策略。

启动策略

使用启动策略,可以显式地指定异步调用应该在同一线程(std::launch::deferred)中执行,还是在不同线程(std::launch::async)中执行。

及早求值)与惰性求值)

及早求值与惰性求值是计算结果表达式的两种策略。在及早求值的情况下,立即计算表达式,而在惰性求值 的情况下,仅在需要时才计算表达式。及早求值通常称为贪婪求值,而惰性求值通常称为按需调用。使用惰性求值,可以节省时间和计算资源。

调用auto fut = std::async(std::launch::deferred,…)的特殊之处在于,promise可能不会立即执行,调用fut.get()时才执行对应的promise 。这意味着,promise只在future调用fut.get()时计算得到结果。

  1. // asyncLazy.cpp
  2. #include <chrono>
  3. #include <future>
  4. #include <iostream>
  5. int main() {
  6. std::cout << std::endl;
  7. auto begin = std::chrono::system_clock::now();
  8. auto asyncLazy = std::async(std::launch::deferred,
  9. [] {return std::chrono::system_clock::now(); });
  10. auto asyncEager = std::async(std::launch::async,
  11. [] {return std::chrono::system_clock::now(); });
  12. std::this_thread::sleep_for(std::chrono::seconds(1));
  13. auto lazyStart = asyncLazy.get() - begin;
  14. auto eagerStart = asyncEager.get() - begin;
  15. auto lazyDuration = std::chrono::duration<double>(lazyStart).count();
  16. auto eagerDuration = std::chrono::duration<double>(eagerStart).count();
  17. std::cout << "asyncLazy evaluated after : " << lazyDuration
  18. << " seconds." << std::endl;
  19. std::cout << "asyncEager evaluated after : " << eagerDuration
  20. << " seconds." << std::endl;
  21. std::cout << std::endl;
  22. }

两个std::async调用(第13行和第16行)都返回当前时间点。但是,第一个调用是lazy,第二个调用是eager。第21行中的asyncLazy.get()调用触发了第13行promise的执行——短睡一秒(第19行)。这对于asyncEager来说是不存在的,asyncEager.get()会立即获取执行结果。

下面就是该程序输出的结果:

任务 - 图3

不必把future绑定到变量上。

发后即忘)(Fire and Forget)

发后即忘是比较特殊的future。因为其future不受某个变量的约束,所以只是在原地执行。对于一个发后即忘的future,相应的promise运行在一个不同的线程中,所以可以立即开始(这是通过std::launch::async策略完成的)。

我们对普通的future和发后即忘的future进行比较。

  1. auto fut= std::async([]{ return 2011; });
  2. std::cout << fut.get() << std::endl;
  3. std::async(std::launch::async,
  4. []{ std::cout << "fire and forget" << std::endl; });

发后即忘的future看起来很有美好,但有一个很大的缺点。std::async创建的future会等待promise完成,才会进行析构。这种情况下,等待和阻塞就没有太大的区别了。future的析构函数会中阻塞程序的进程,当使用发后即忘的future时,这一点变得更加明显,看起来程序上是并发的,但实际上是串行运行的。

  1. // fireAndForgetFutures.cpp
  2. #include <chrono>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. int main() {
  7. std::cout << std::endl;
  8. std::async(std::launch::async, [] {
  9. std::this_thread::sleep_for(std::chrono::seconds(2));
  10. std::cout << "first thread" << std::endl;
  11. });
  12. std::async(std::launch::async, [] {
  13. std::this_thread::sleep_for(std::chrono::seconds(2));
  14. std::cout << "second thread" << std::endl; }
  15. );
  16. std::cout << "main thread" << std::endl;
  17. std::cout << std::endl;
  18. }

程序在线程中执行两个promise,这样就会产生发后即忘的future。future在析构函数中阻塞线程,直到相关的promise完成。promise是按照源代码顺序执行的,执行顺序与执行时间无关。

任务 - 图4

std::async是一种方便的机制,可用于在分解较大的计算任务。

并行计算

标量乘积的计算可分布在四个异步调用中。

  1. // dotProductAsync.cpp
  2. #include <iostream>
  3. #include <future>
  4. #include <random>
  5. #include <vector>
  6. #include <numeric>
  7. using namespace std;
  8. static const int NUM = 100000000;
  9. long long getDotProduct(vector<int>& v, vector<int>& w) {
  10. auto vSize = v.size();
  11. auto future1 = async([&] {
  12. return inner_product(&v[0], &v[vSize / 4], &w[0], 0LL);
  13. });
  14. auto future2 = async([&] {
  15. return inner_product(&v[vSize / 4], &v[vSize / 2], &w[vSize / 4], 0LL);
  16. });
  17. auto future3 = async([&] {
  18. return inner_product(&v[vSize / 2], &v[vSize * 3 / 4], &w[vSize / 2], 0LL);
  19. });
  20. auto future4 = async([&] {
  21. return inner_product(&v[vSize * 3 / 4], &v[vSize], &w[vSize * 3 / 4], 0LL);
  22. });
  23. return future1.get() + future2.get() + future3.get() + future4.get();
  24. }
  25. int main() {
  26. cout << endl;
  27. random_device seed;
  28. // generator
  29. mt19937 engine(seed());
  30. // distribution
  31. uniform_int_distribution<int> dist(0, 100);
  32. // fill the vector
  33. vector<int> v, w;
  34. v.reserve(NUM);
  35. w.reserve(NUM);
  36. for (int i = 0; i < NUM; ++i) {
  37. v.push_back(dist(engine));
  38. w.push_back(dist(engine));
  39. }
  40. cout << "getDotProduct(v, w): " << getDotProduct(v, w) << endl;
  41. cout << endl;
  42. }

该程序使用了随机库和时间库,创建两个向量vw并用随机数填充(第50-56行),每个向量添加(第53 - 56行)1亿个元素。第54和55行中的dist(engine)生成均匀分布在0到100之间的随机数。标量乘积的计算在getDotProduct中进行(第13 - 34行)。内部实现中,std::async使用标准库算法std::inner_product。最后,使用future获取结果进行相加,就得到了最终结果。

任务 - 图5

std::packaged_task通常也用于并发。

std::packaged_task

std::packaged_task是用于异步调用的包装器。通过pack.get_future()可以获得相关的future。可以使用可调用操作符pack(pack())执行std::packaged_task

处理std::packaged_task通常包括四个步骤:

I. 打包:

  1. std::packaged_task<int(int, int)> sumTask([](int a, int b){ return a + b; });

II. 创建future:

  1. std::future<int> sumResult= sumTask.get_future();

III. 执行计算:

  1. sumTask(2000, 11);

IV. 查询结果:

  1. sumResult.get();

下面的示例,展示了这四个步骤。

  1. // packagedTask.cpp
  2. #include <utility>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. #include <deque>
  7. class SumUp {
  8. public:
  9. int operator()(int beg, int end) {
  10. long long int sum{ 0 };
  11. for (int i = beg; i < end; ++i) sum += i;
  12. return static_cast<int>(sum);
  13. }
  14. };
  15. int main() {
  16. std::cout << std::endl;
  17. SumUp sumUp1;
  18. SumUp sumUp2;
  19. SumUp sumUp3;
  20. SumUp sumUp4;
  21. // wrap the task
  22. std::packaged_task<int(int, int)> sumTask1(sumUp1);
  23. std::packaged_task<int(int, int)> sumTask2(sumUp2);
  24. std::packaged_task<int(int, int)> sumTask3(sumUp3);
  25. std::packaged_task<int(int, int)> sumTask4(sumUp4);
  26. // create the futures
  27. std::future<int> sumResult1 = sumTask1.get_future();
  28. std::future<int> sumResult2 = sumTask2.get_future();
  29. std::future<int> sumResult3 = sumTask3.get_future();
  30. std::future<int> sumResult4 = sumTask4.get_future();
  31. // push the task on the container
  32. std::deque<std::packaged_task<int(int, int)>> allTasks;
  33. allTasks.push_back(std::move(sumTask1));
  34. allTasks.push_back(std::move(sumTask2));
  35. allTasks.push_back(std::move(sumTask3));
  36. allTasks.push_back(std::move(sumTask4));
  37. int begin{ 1 };
  38. int increment{ 2500 };
  39. int end = begin + increment;
  40. // preform each calculation in a separate thread
  41. while (!allTasks.empty()) {
  42. std::packaged_task<int(int, int)> myTask = std::move(allTasks.front());
  43. allTasks.pop_front();
  44. std::thread sumThread(std::move(myTask), begin, end);
  45. begin = end;
  46. end += increment;
  47. sumThread.detach();
  48. }
  49. // pick up the results
  50. auto sum = sumResult1.get() + sumResult2.get() +
  51. sumResult3.get() + sumResult4.get();
  52. std::cout << "sum of 0 .. 10000 = " << sum << std::endl;
  53. std::cout << std::endl;
  54. }

这段程序的是计算从0到10000的整数和。创建四个std::packaged_task的对象,并且每个std::packaged_task有自己的线程,并使用future来汇总结果。当然,也可以直接使用Gaußschen Summenformel(高斯求和公式)。真奇怪,我没有找到英文网页。(译者注:打开网页就是最熟悉的高斯求和公式,也就是等差数列求和公式)。翻了下维基百科,确实没有相关的英文页面。)

I. 打包任务:程序将工作包打包进std::packaged_task(第28 - 31行)的实例中,工作包就是SumUp的实例(第9 - 16行),使用函数操作符完成任务(第11 - 15行)。函数操作符将begend - 1的所有整数相加并返回结果。第28 - 31行中的std::packaged_task实例可以处理需要两个int参数的函数调用,并返回一个int: int(int, int)类型的任务包。

II.创建future:第34到37行中,使用std::packaged_task创建future对象,这时std::packaged_task对象属于通信通道中的promise。future的类型有明确定义:std::future<int> sumResult1 = sumTask1.get_future(),也可以让编译器来确认future的具体类型:auto sumResult1 sumTask1.get_future()

III. 进行计算:开始计算。将任务包移动到std::deque(第40 - 44行)中,while循环(第51 - 58行)会执行每个任务包。为此,将std::deque的队头任务包移动到一个std::packaged_task实例中(第52行),并将这个实例移动到一个新线程中(第54行),并让这个线程在后台运行(第57行)。因为packaged_task对象不可复制的,所以会在52和54行中使用move语义。这个限制不仅适用于所有的promise实例,但也适用于future和线程实例。但有一个例外:std::shared_future

IV. 查询结果:最后一步中,从每个future获取计算的结果,并把它们加起来(第61行)。

任务 - 图6

下表展示std::packaged_task pack的接口

成员函数 函数描述
pack.swap(pack2)std::swap(pack, pack2) 交换对象
pack.valid() 检查对象中的函数是否合法
pack.get_future() 返回future
pack.make_ready_at_thread_exit(ex) 执行的函数,如果线程还存在,那么结果还是可用的
pack.reset() 重置任务的状态,擦除之前执行的结果

std::asyncstd::promise相比,std::packaged_task可以复位并重复使用。下面的程序展示了std::packaged_task的“特殊”使用方式。

  1. // packagedTaskReuse.cpp
  2. #include <functional>
  3. #include <future>
  4. #include <iostream>
  5. #include <utility>
  6. #include <vector>
  7. void calcProducts(std::packaged_task<int(int, int)>& task,
  8. const std::vector<std::pair<int, int>>& pairs) {
  9. for (auto& pair : pairs) {
  10. auto fut = task.get_future();
  11. task(pair.first, pair.second);
  12. std::cout << pair.first << " * " << pair.second << " = " << fut.get()<<
  13. std::endl;
  14. task.reset();
  15. }
  16. }
  17. int main() {
  18. std::cout << std::endl;
  19. std::vector<std::pair<int, int>> allPairs;
  20. allPairs.push_back(std::make_pair(1, 2));
  21. allPairs.push_back(std::make_pair(2, 3));
  22. allPairs.push_back(std::make_pair(3, 4));
  23. allPairs.push_back(std::make_pair(4, 5));
  24. std::packaged_task<int(int, int)> task{ [](int fir, int sec) {
  25. return fir * sec; }
  26. };
  27. calcProducts(task, allPairs);
  28. std::cout << std::endl;
  29. std::thread t(calcProducts, std::ref(task), allPairs);
  30. t.join();
  31. std::cout << std::endl;
  32. }

函数calcProduct(第9行)有两个参数:taskpairs。使用任务包task来计算pairs中的每个整数对的乘积(第13行),并在第16行重置任务task。这样,calcProduct就能在主线程(第34行)和另外开启的线程(第38行)中运行。下面是程序的输出。

任务 - 图7

std::promise和std::future

std::promisestd::future可以完全控制任务。

promise和future是一对强有力的组合。promise可以将值、异常或通知放入数据通道。一个promise可以对应多个std::shared_future对象。

下面是std::promisestd::future用法的示例。两个通信端点都可以在不同的的线程中,因此通信可以在线程间发生。

  1. // promiseFuture.cpp
  2. #include <future>
  3. #include <iostream>
  4. #include <thread>
  5. #include <utility>
  6. void product(std::promise<int>&& intPromise, int a, int b) {
  7. intPromise.set_value(a * b);
  8. }
  9. struct Div {
  10. void operator()(std::promise<int>&& intPromise, int a, int b) const {
  11. intPromise.set_value(a / b);
  12. }
  13. };
  14. int main() {
  15. int a = 20;
  16. int b = 10;
  17. std::cout << std::endl;
  18. // define the promises
  19. std::promise<int> prodPromise;
  20. std::promise<int> divPromise;
  21. // get the futures
  22. std::future<int> prodResult = prodPromise.get_future();
  23. std::future<int> divResult = divPromise.get_future();
  24. // calculate the result in a separate thread
  25. std::thread prodThread(product, std::move(prodPromise), a, b);
  26. Div div;
  27. std::thread divThread(div, std::move(divPromise), a, b);
  28. // get the result
  29. std::cout << "20*10 = " << prodResult.get() << std::endl;
  30. std::cout << "20/10 = " << divResult.get() << std::endl;
  31. prodThread.join();
  32. divThread.join();
  33. std::cout << std::endl;
  34. }

将函数product(第8 -10行)、prodPromise(第32行)以及数字ab放入线程Thread prodThread(第36行)中。prodThread的第一个参数需要一个可调用的参数,上面程序中就是函数乘积函数。函数需要一个类型右值引用的promise(std::promise<int>&& intPromise)和两个数字。std::move(第36行)创建一个右值引用。剩下的就简单了,divThread(第38行)将ab分开传入。

future通过prodResult.get()divResult.get()获取结果

任务 - 图8

std::promise

std::promise允许设置一个值、一个通知或一个异常。此外,promise可以以延迟的方式提供结果。

std::promise prom的成员函数

成员函数 函数描述
prom.swap(prom2)std::swap(prom, prom2) 交换对象
prom.get_future() 返回future
prom.set_value(val) 设置值
prom.set_exception(ex) 设置异常
prom.set_value_at_thread_exit(val) promise退出前存储该值
prom.set_exception_at_thread_exit(ex) promise退出前存储该异常

如果多次对promise设置值或异常,则会抛出std::future_error

std::future

std::future可以完成的事情有:

  • 从promise中获取值。
  • 查询promise值是否可获取。
  • 等待promise通知,这种等待可以用一个时间段或一个绝对的时间点来完成。
  • 创建共享future(std::shared_future)。

future实例fut的成员函数

成员函数 函数描述
fut.share() 返回std::shared_future
fut.get() 返回可以是值或异常
fut.valid() 检查当前实例是否可用调用fut.get()。使用get()之后,返回false
fut.wait() 等待结果
fut.wait_for(relTime) relTime时间段内等待获取结果,并返回std:: future_status实例
fut.wait_until(absTime) absTime时间点前等待获取结果,并返回std:: future_status实例

wait不同,wait_forwait_until会返回future的状态。

std::future_status

future和共享future的wait_forwait_until成员函数将返回其状态。有三种可能:

  1. enum class future_status {
  2. ready,
  3. timeout,
  4. deferred
  5. };

下表描述了每种状态:

状态 描述
deferred 函数还未运行
ready 结果已经准备就绪
timeout 结果超时得到,视为过期

使用wait_forwait_until可以一直等到相关的promise完成。

  1. // waitFor.cpp
  2. #include <iostream>
  3. #include <future>
  4. #include <thread>
  5. #include <chrono>
  6. using namespace std::literals::chrono_literals;
  7. void getAnswer(std::promise<int> intPromise) {
  8. std::this_thread::sleep_for(3s);
  9. intPromise.set_value(42);
  10. }
  11. int main() {
  12. std::cout << std::endl;
  13. std::promise<int> answerPromise;
  14. auto fut = answerPromise.get_future();
  15. std::thread prodThread(getAnswer, std::move(answerPromise));
  16. std::future_status status{};
  17. do {
  18. status = fut.wait_for(0.2s);
  19. std::cout << "... doing something else" << std::endl;
  20. } while (status != std::future_status::ready);
  21. std::cout << std::endl;
  22. std::cout << "The Answer: " << fut.get() << '\n';
  23. prodThread.join();
  24. std::cout << std::endl;
  25. }

在futurefut在等待promise时,可以执行其他操作。

任务 - 图9

如果多次获取futurefut的结果,会抛出std::future_error异常。

promise和future是一对一的关系,而std::shared_future支持一个promise 对应多个future。

std::shared_future

创建std::shared_future的两种方式:

  1. 通过promise实例prom创建std::shared_future:std::shared_future<int> fut = prom.get_future()
  2. 使用futfut.share()进行创建。执行了fut.share()后,fut.valid()会返回false。

共享future是与相应的promise相关联的,可以获取promise的结果。共享future与std::future有相同的接口。

除了有std::future的功能外,std::shared_future还允许和其他future查询关联promise的值。

std::shared_future的操作很特殊,下面的代码中就直接创建了一个std::shared_future

  1. // sharedFuture.cpp
  2. #include <future>
  3. #include <iostream>
  4. #include <thread>
  5. #include <utility>
  6. std::mutex coutMutex;
  7. struct Div {
  8. void operator()(std::promise<int>&& intPromise, int a, int b) {
  9. intPromise.set_value(a / b);
  10. }
  11. };
  12. struct Requestor {
  13. void operator()(std::shared_future<int> shaFut) {
  14. // lock std::cout
  15. std::lock_guard<std::mutex> coutGuard(coutMutex);
  16. // get the thread id
  17. std::cout << "threadId(" << std::this_thread::get_id() << "): ";
  18. std::cout << "20/10= " << shaFut.get() << std::endl;
  19. }
  20. };
  21. int main() {
  22. std::cout << std::endl;
  23. // define the promises
  24. std::promise<int> divPromise;
  25. // get the futures
  26. std::shared_future<int> divResult = divPromise.get_future();
  27. // calculate the result in a separate thread
  28. Div div;
  29. std::thread divThread(div, std::move(divPromise), 20, 10);
  30. Requestor req;
  31. std::thread sharedThread1(req, divResult);
  32. std::thread sharedThread2(req, divResult);
  33. std::thread sharedThread3(req, divResult);
  34. std::thread sharedThread4(req, divResult);
  35. std::thread sharedThread5(req, divResult);
  36. divThread.join();
  37. sharedThread1.join();
  38. sharedThread2.join();
  39. sharedThread3.join();
  40. sharedThread4.join();
  41. sharedThread5.join();
  42. std::cout << std::endl;
  43. }

promise和future的工作包都是函数对象。第46行中将divPromise移动到线程divThread中执行,因此会将std::shared_future复制到5个线程中(第49 - 53行)。与只能移动的std::future对象不同,可以std::shared_future对象可以进行复制。

主线程在第57到61行等待子线程完成它们的任务。

任务 - 图10

前面提到过,可以通过使用std::future的成员函数创建std::shared_future。我们把上面的代码改一下。

  1. // sharedFutureFromFuture.cpp
  2. #include <future>
  3. #include <iostream>
  4. #include <thread>
  5. #include <utility>
  6. std::mutex coutMutex;
  7. struct Div {
  8. void operator()(std::promise<int>&& intPromise, int a, int b) {
  9. intPromise.set_value(a / b);
  10. }
  11. };
  12. struct Requestor {
  13. void operator()(std::shared_future<int> shaFut) {
  14. // lock std::cout
  15. std::lock_guard<std::mutex> coutGuard(coutMutex);
  16. // get the thread id
  17. std::cout << "threadId(" << std::this_thread::get_id() << "): ";
  18. std::cout << "20/10= " << shaFut.get() << std::endl;
  19. }
  20. };
  21. int main() {
  22. std::cout << std::boolalpha << std::endl;
  23. // define the promises
  24. std::promise<int> divPromise;
  25. // get the futures
  26. std::future<int> divResult = divPromise.get_future();
  27. std::cout << "divResult.valid(): " << divResult.valid() << std::endl;
  28. // calculate the result in a separate thread
  29. Div div;
  30. std::thread divThread(div, std::move(divPromise), 20, 10);
  31. std::cout << "divResult.valid(): " << divResult.valid() << std::endl;
  32. std::shared_future<int> sharedResult = divResult.share();
  33. std::cout << "divResult.valid(): " << divResult.valid() << "\n\n";
  34. Requestor req;
  35. std::thread sharedThread1(req, sharedResult);
  36. std::thread sharedThread2(req, sharedResult);
  37. std::thread sharedThread3(req, sharedResult);
  38. std::thread sharedThread4(req, sharedResult);
  39. std::thread sharedThread5(req, sharedResult);
  40. divThread.join();
  41. sharedThread1.join();
  42. sharedThread2.join();
  43. sharedThread3.join();
  44. sharedThread4.join();
  45. sharedThread5.join();
  46. std::cout << std::endl;
  47. }

std::future(第44行和第50行)前两次调用divResult.valid()都返回true。第52行执行divResult.share()之后,因为该操作使得状态转换为共享,所以在执行到第54行时,程序会返回false。

任务 - 图11

异常

如果std::asyncstd::packaged_task的工作包抛出错误,则异常会存储在共享状态中。当futurefut调用fut.get()时,异常将重新抛出。

std::promise prom提供了相同的功能,但是它有一个成员函数prom.set_value(std::current_exception())可以将异常设置为共享状态。

数字除以0是未定义的行为,函数executeDivision显示计算结果或异常。

  1. // promiseFutureException.cpp
  2. #include <exception>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. #include <utility>
  7. #ifdef WIN32
  8. #include <string>
  9. #endif
  10. struct Div {
  11. void operator()(std::promise<int>&& intPromise, int a, int b){
  12. try {
  13. if (b == 0) {
  14. std::string errMess = std::string("Illegal division by zero: ") +
  15. std::to_string(a) + "/" + std::to_string(b);
  16. throw std::runtime_error(errMess);
  17. }
  18. intPromise.set_value(a / b);
  19. }
  20. catch (...) {
  21. intPromise.set_exception(std::current_exception());
  22. }
  23. }
  24. };
  25. void executeDivision(int nom, int denom) {
  26. std::promise<int> divPromise;
  27. std::future<int> divResult = divPromise.get_future();
  28. Div div;
  29. std::thread divThread(div, std::move(divPromise), nom, denom);
  30. // get the result or the exception
  31. try {
  32. std::cout << nom << "/" << denom << " = " << divResult.get() << std::endl;
  33. }
  34. catch (std::runtime_error& e) {
  35. std::cout << e.what() << std::endl;
  36. }
  37. divThread.join();
  38. }
  39. int main() {
  40. std::cout << std::endl;
  41. executeDivision(20, 0);
  42. executeDivision(20, 10);
  43. std::cout << std::endl;
  44. }

这个程序中,promise会处理分母为0的情况。如果分母为0,则在第24行中将异常设置为返回值:intPromise.set_exception(std::current_exception())。future需要在try-catch中处理异常(第37 - 42行)。

下面是程序的输出。

任务 - 图12

std::current_exception和std::make_exception_ptr

std::current_exception()捕获当前异常对象,并创建一个 std:: exception_ptrstd::exception_ptr保存异常对象的副本或引用。如果在没有异常处理时调用该函数,则返回一个空的std::exception_ptr

为了不在try/catch中使用intPromise.set_exception(std::current_exception())检索抛出的异常,可以直接调用intPromise.set_exception(std::make_exception_ptr(std::runtime_error(errMess)))

如果在std::promise销毁之前没有调用设置类的成员函数,或是在std::packaged_task调用它,那么std::future_error异常和错误代码std::future_errc::broken_promise将存储在共享future中。

通知

任务是条件变量的一种替代方式。如果使用promise和future来同步线程,它们与条件变量有很多相同之处。大多数时候,promise和future是更好的选择。

在看例子之前,先了解下任务和条件变量的差异。

对比标准 条件变量 任务
多重同步 Yes No
临界区保护 Yes No
接收错误处理机制 No Yes
伪唤醒 Yes No
未唤醒 Yes No

与promise和future相比,条件变量的优点是可以多次同步线程,而promise只能发送一次通知,因此必须使用更多promise和future对,才能模拟出条件变量的功能。如果只同步一次,那条件变量正确的使用方式或许将更具大的挑战。promise和future对不需要共享变量,所以不需要锁,并且不大可能出现伪唤醒或未唤醒的情况。除了这些,任务还可以处理异常。所以,在同步线程上我会更偏重于选择任务,而不是条件变量。

还记得使用条件变量有多难吗?如果忘记了,这里展示了两个线程同步所需的关键部分。

  1. void waitingForWork(){
  2. std::cout << "Worker: Waiting for work." << std::endl;
  3. std::unique_lock<std::mutex> lck(mutex_);
  4. condVar.wait(lck, []{ return dataReady; });
  5. doTheWork();
  6. std::cout << "Work done." << std::endl;
  7. }
  8. void setDataReady(){
  9. std::lock_guard<std::mutex> lck(mutex_);
  10. dataReady=true;
  11. std::cout << "Sender: Data is ready." << std::endl;
  12. condVar.notify_one();
  13. }

函数setDataReady为同步通知,函数waitingForWork为同步等待。

使用任务完成相同的工作流程。

  1. // promiseFutureSynchronise.cpp
  2. #include <future>
  3. #include <iostream>
  4. #include <utility>
  5. void doTheWork() {
  6. std::cout << "Processing shared data." << std::endl;
  7. }
  8. void waitingForWork(std::future<void>&& fut) {
  9. std::cout << "Worker: Waiting for work." << std::endl;
  10. fut.wait();
  11. doTheWork();
  12. std::cout << "Work done." << std::endl;
  13. }
  14. void setDataReady(std::promise<void>&& prom) {
  15. std::cout << "Sender: Data is ready." << std::endl;
  16. prom.set_value();
  17. }
  18. int main() {
  19. std::cout << std::endl;
  20. std::promise<void> sendReady;
  21. auto fut = sendReady.get_future();
  22. std::thread t1(waitingForWork, std::move(fut));
  23. std::thread t2(setDataReady, std::move(sendReady));
  24. t1.join();
  25. t2.join();
  26. std::cout << std::endl;
  27. }

是不是非常简单?

通过sendReady(第32行)获得了一个futurefut(第33行),promise使用其返回值void (std::promise<void> sendReady)进行通信,并且只能够发送通知。两个通信端点分别移动到线程t1t2中(第35行和第36行),调用fut.wait()(第15行)等待promise的通知(prom.set_value()(第24行))。

程序结构和输出,与条件变量章节程序的输出一致。

任务 - 图13