假设你乘飞机去国外度假。当你到达机场并办理完各种登机手续后,还需要等待机场广播通知登机时(可能要等很多个小时),你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯价格不菲的机场咖啡。不过,从根本上来说你就在等待一件事情:机场广播能够登机的时间。之前飞机的班次在之后没有可参考性,因为当你在再次度假的时候,可能会选择等待另一班飞机。

C++标准库模型将这种一次性事件称为期望值(future)。当线程需要等待特定的一次性事件时,某种程度上来说就需要知道这个事件在未来的期望结果。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望值的状态会变为就绪(ready)。一个期望值可能是数据相关的(比如,你的登机口编号),也可能不是。当事件发生时(并且期望状态为就绪),并且这个期望值就不能被重置。

C++标准库中,有两种期望值,使用两种类型模板实现,声明在<future>头文件中:唯一期望值(unique futures)(std::future<>)和共享期望值(shared futures)(std::shared_future<>)。仿照了std::unique_ptrstd::shared_ptrstd::future的实例只能与一个指定事件相关联,而std::shared_future的实例就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如std::unique_ptrstd::shared_ptr的模板参数就是相关联的数据类型。与数据无关处,可以使用std::future<void>std::shared_future<void>的特化模板。虽然,我希望用于线程间的通讯,但是期望值对象本身并不提供同步访问。当多个线程需要访问一个独立期望值对象时,必须使用互斥量或类似同步机制对访问进行保护,如在第3章所述。不过,在将要阅读到的4.2.5节中,多个线程会对一个std::shared_future<>实例的副本进行访问,即使他们是同一个异步结果,也不需要同步期望值。

并行技术规范将这两个模板类在std::experimental命名空间中进行了扩展:std::experimental::future<>std::experimental::shared_future<>。这个命名空间就是为了将其与std命名空间中的模板类进行区分,实验性的命名空间中为这两个模板类添加了更多的功能。尤为重要的是std::experimental中的内容与代码质量无关(我希望这里也会有较高质量的实现),需要强调是这个命名空间提供的类和函数都不是标准的,这个命名空间中类和函数的语法和语义,很可能与纳入C++标准(也就是std命名空间)后有所不同。如果想要使用这两个试验性的模板类,需要包含<experimental/future>头文件。

最简单的一次性事件,就是一个后台运行出的计算结果。第2章中已经清楚了std::thread执行的任务不能有返回值,并且我能保证,这个问题将在使用期望值解决——现在就来看看是怎么解决的。

4.2.1 后台任务的返回值

假设,你有一个需要长时间的运算,你需要其能计算出一个有效的值,但是你现在并不迫切需要这个值。可能你已经找到了生命、宇宙,以及万物的答案,就像道格拉斯·亚当斯[1]一样。你可以启动一个新线程来执行这个计算,这就意味着你需要计算的结果,因为std::thread并不提供直接接收返回值的机制。这里就需要std::async函数模板(也是在头文件<future>中声明的)。

当不着急要任务结果时,可以使用std::async启动一个异步任务。与std::thread对象等待的方式不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数;并且会阻塞线程直到期望值状态为就绪为止;之后,返回计算结果。下面清单中代码就是一个简单的例子。

清单4.6 使用std::future从异步任务中获取返回值

  1. #include <future>
  2. #include <iostream>
  3. int find_the_answer_to_ltuae();
  4. void do_other_stuff();
  5. int main()
  6. {
  7. std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
  8. do_other_stuff();
  9. std::cout << "The answer is " << the_answer.get() << std::endl;
  10. }

std::thread 做的方式一样,std::async允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在std::ref中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。就如std::thread,当参数为右值时,拷贝操作将使用移动的方式转移原始数据。这就允许使用“只移动”类型作为函数对象和参数。来看一下下面的程序清单:

清单4.7 使用std::async向函数传递参数

  1. #include <future>
  2. #include <string>
  3. struct X {
  4. void foo(int, std::string const&);
  5. std::string bar(std::string const&);
  6. };
  7. X x;
  8. auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针
  9. auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
  10. struct Y {
  11. double operator()(double);
  12. };
  13. Y y;
  14. auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
  15. auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
  16. X baz(X&);
  17. auto f6 = std::async(baz, std::ref(x)); // 调用baz(x)
  18. class move_only {
  19. public:
  20. move_only();
  21. move_only(move_only&&);
  22. move_only(move_only const&) = delete;
  23. move_only& operator=(move_only&&);
  24. move_only& operator=(move_only const&) = delete;
  25. void operator()();
  26. };
  27. auto f5 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到

默认情况下,期望值是否等待取决于std::async是否启动一个线程,或是否有任务正在进行同步。大多数情况下(估计这就是你想要的结果),也可以在函数调用之前向std::async传递一个额外参数,这个参数的类型是std::launch,还可以是std::launch::defered,表明函数调用被延迟到wait()或get()函数调用时才执行,std::launch::async表明函数必须在其所在的独立线程上执行,std::launch::deferred | std::launch::async表明实现可以选择这两种方式的一种。最后一个选项是默认的,当函数调用被延迟,它可能不会在运行了。如下所示:

  1. auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
  2. auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在wait()或get()调用时执行
  3. auto f8 = std::async(
  4. std::launch::deferred | std::launch::async,
  5. baz, std::ref(x)); // 实现选择执行方式
  6. auto f9 = std::async(baz, std::ref(x));
  7. f7.wait(); // 调用延迟函数

本章的后面和第8章中,将会再次看到这段程序,使用std::async会更容易让算法分割到各个任务中,这样程序就能并发的执行了。不过,这不是让std::future与任务实例相关联的唯一方式;你也可以将任务包装入std::packaged_task<>实例中,或通过编写代码的方式,使用std::promise<>类型模板显示设置值。与std::promise<>对比,std::packaged_task<>具有更高层的抽象,所以我们从“高抽象”的模板说起。

4.2.3 使用(std::)promises

当一个应用需要处理很多网络连接时,它会使用不同线程尝试连接每个接口,因为这能使网络尽早联通,尽早执行程序。当连接较少的时候,工作没有问题(也就是线程数量比较少)。不幸的是,随着连接数量的增长,这种方式变的越来越不合适;因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子:系统资源被创建的线程消耗殆尽,系统连接网络的能力会变的极差。因此通过少数线程(可能只有一个)处理网络连接,每个线程同时处理多个连接事件,对需要处理大量的网络连接的应用而言是普遍的做法。

考虑一个线程处理多个连接事件,来自不同的端口连接的数据包基本上是以乱序方式进行处理的;同样的,数据包也将以乱序的方式进入队列。很多情况下,另一些应用不是等待数据成功的发送,就是等待一批(新的)来自指定网络接口的数据接收成功。

std::promise<T>提供设定值的方式(类型为T),这个类型会和后面看到的std::future<T>对象相关联。一对std::promise/std::future会为这种方式提供一个可行的机制;期望值可以阻塞等待线程,同时,提供数据的线程可以使用组合中的承诺值来对相关值进行设置,并将期望值的状态置为“就绪”。

可以通过一个给定的std::promise的get_future()成员函数来获取与之相关的std::future对象,跟std::packaged_task的用法类似。当承诺值已经设置完毕(使用set_value()成员函数),对应期望值的状态变为“就绪”,并且可用于检索已存储的值。当在设置值之前销毁std::promise,将会存储一个异常。在4.2.4节中,会详细描述异常是如何传送到线程的。

清单4.10中,单线程处理多接口的实现,如同之前所说,在这个例子中,可以使用一对std::promise<bool>/ std::future<bool>找出一块传出成功的数据块;与期望值相关的只是一个简单的“成功/失败”标识。对于传入包,与期望值相关的数据就是数据包的有效负载。

清单4.10 使用承诺值解决单线程多连接问题

  1. #include <future>
  2. void process_connections(connection_set& connections)
  3. {
  4. while (!done(connections)) // 1
  5. {
  6. for (connection_iterator // 2
  7. connection
  8. = connections.begin(),
  9. end = connections.end();
  10. connection != end;
  11. ++connection) {
  12. if (connection->has_incoming_data()) // 3
  13. {
  14. data_packet data = connection->incoming();
  15. std::promise<payload_type>& p = connection->get_promise(data.id); // 4
  16. p.set_value(data.payload);
  17. }
  18. if (connection->has_outgoing_data()) // 5
  19. {
  20. outgoing_packet data = connection->top_of_outgoing_queue();
  21. connection->send(data.payload);
  22. data.promise.set_value(true); // 6
  23. }
  24. }
  25. }
  26. }