实现一个小型自动化并行库——std::future

大多数复杂的任务都能分解为很多子任务。对于所有子任务,我们可以通过画一张有向图无环图来描述哪些子任务键是有依赖的。我们来看一个例子,假设我们想要产出一个字符串"foo bar foo barthis that ",我们只能通过一个个字符进行产生,然后将这些词汇拼接在一起。为了完成这项工作,我们提供了三个函数createconcattwice

考虑到这一点,我们可以通过绘制DAG图来看一下词组间相互的依赖关系:

实现一个小型自动化并行库——std::future - 图1

实现过程中,当一个CPU核上串行的完成这些工作并没有什么问题。通过依赖关系在多个CPU核上执行任务,当所依赖的任务未完成时,只能处于等待状态。

即使使用std::async,这样写出的代码也太无趣了。因为子任务间的依赖关系需要提前建模。本节中,我们将实现两个简单的辅助库,帮助我们将createconcattwice函数转换成异步的。这样,我们就能找到一种更为优雅的方式,来设置依赖关系图。执行过程中,代码将会以一种智能的方式进行并行计算,并尽快将整个图完成。

How to do it…

本节中,我们将实现一些函数用来模拟计算敏感型任务,这些任务会互相依赖,我们的任务就是让这些任务尽可能的并行执行:

  1. 包含必要的头文件,并声明所使用的命名空间:

    1. #include <iostream>
    2. #include <iomanip>
    3. #include <thread>
    4. #include <string>
    5. #include <sstream>
    6. #include <future>
    7. using namespace std;
    8. using namespace chrono_literals;
  2. 需要对输出进行同步,所以可以使用之前章节中的同步辅助函数来帮助我们:

    1. struct pcout : public stringstream {
    2. static inline mutex cout_mutex;
    3. ~pcout() {
    4. lock_guard<mutex> l {cout_mutex};
    5. cout << rdbuf();
    6. cout.flush();
    7. }
    8. };
  3. 现在,我们对三个字符串转换函数进行实现。第一个函数会通过一个C风格的字符串来创建一个std::string对象。我们会让这个函数休眠3秒,以模拟计算复杂度:

    1. static string create(const char *s)
    2. {
    3. pcout{} << "3s CREATE " << quoted(s) << '\n';
    4. this_thread::sleep_for(3s);
    5. return {s};
    6. }
  4. 下一个函数需要两个字符串对象作为参数,并且返回拼接后的结果。我们让其休眠5秒:

    1. static string concat(const string &a, const string &b)
    2. {
    3. pcout{} << "5s CONCAT "
    4. << quoted(a) << " "
    5. << quoted(b) << '\n';
    6. this_thread::sleep_for(5s);
    7. return a + b;
    8. }
  5. 最后一个函数接收一个字符串作为参数,并返回自己和自己拼接后的结果。我们让其休眠3秒:

    1. static string twice(const string &s)
    2. {
    3. pcout{} << "3s TWICE " << quoted(s) << '\n';
    4. this_thread::sleep_for(3s);
    5. return s + s;
    6. }
  6. 对于串行任务来说,这就已经准备好了,但是我们想使用并行的方式来完成。所以,我们还需要实现一些辅助函数。这里需要注意了,下面三个函数看起来有些复杂。asynchronize能接收一个函数f,并返回一个其捕获到的可调用对象。我们可以传入任意数量的参数到这个可调用的对象中,然后其会将这些参数连同f捕获到另一个可调用对象中,并且将这个可调用对象返回给我们。最后一个可调用对象不需要任何参数。之后,其会将参数传入f中,并异步的执行函数f:

    1. template <typename F>
    2. static auto asynchronize(F f)
    3. {
    4. return [f](auto ... xs) {
    5. return [=] () {
    6. return async(launch::async, f, xs...);
    7. };
    8. };
    9. }
  7. 接下来这个函数,将会使用下一步(也就是第8步)中我们声明的函数。其能接受一个函数f,并且将该函数捕获到一个可调用的对象中并返回。该对象可以被多个future对象所调用。然后,对future对象使用.get(),来获取f中的结果:

    1. template <typename F>
    2. static auto fut_unwrap(F f)
    3. {
    4. return [f](auto ... xs) {
    5. return f(xs.get()...);
    6. };
    7. }
  8. 最后一个辅助函数能够接受一个函数f。其会返回一个持有f函数的可调用对象。这个可调用对象可以传入任意个参数,并且会将函数f与这些参数让另一个可调用对象获取。最后,返回给我们的可调用对象无需任何参数。然后,就可以调用xs...包中获取到所有可调用对象。这些对象会返回很多futrue,这些future对象需要使用fut_unwarp进行展开。future展开,并会通过std::async对实际函数f进行执行,在通过future返回函数f执行的结果:

    1. template <typename F>
    2. static auto async_adapter(F f)
    3. {
    4. return [f](auto ... xs) {
    5. return [=] () {
    6. return async(launch::async,
    7. fut_unwrap(f), xs()...);
    8. };
    9. };
    10. }
  9. OK,完成以上工作的感觉就是“疯狂”,这种表达式的嵌套让我想起了电影《盗梦空间》的场景(上一步的代码中,Lambda表达式会继续返回一个Lambda表达式)。这段带有魔法的代码,我们会在后面来详细的了解。现在,让我们异步的使用createconcattwice函数。async_adapter是一个非常简单的函数,其会等待future参数,并返回一个future的结果,其会将同步世界转换成异步世界。我们对concattwice使用这个函数。我们必须对create使用asynchronize,因为其会返回一个future,不过我们会使用future对象获取到的值,而非future对象本身。任务的依赖链,需要从create开始:

    1. int main()
    2. {
    3. auto pcreate (asynchronize(create));
    4. auto pconcat (async_adapter(concat));
    5. auto ptwice (async_adapter(twice));
  10. 现在我们有了可以自动并行化的函数,其与同步代码的函数名相同,不过添加了前缀p。现在,让我们来设置一些比较复杂依赖关系树。首先,我们创建两个字符串"foo""bar",然后进行拼接,返回"foo bar"。在twice中,字符串将会和自身进行拼接。然后,创建了字符串"this""that",拼接得到"this that"。最后,我们拼接的结果为"foo bar foo bar this that",结果将会保存在变量callable中。最后,调用callable().get()进行计算,并等待返回值,然后将返回值进行打印。我们没有调用callable()时,计算不会开始,在我们对其进行调用后,就是见证奇迹的时刻:

    1. auto result (
    2. pconcat(
    3. ptwice(
    4. pconcat(
    5. pcreate("foo "),
    6. pcreate("bar "))),
    7. pconcat(
    8. pcreate("this "),
    9. pcreate("that "))));
    10. cout << "Setup done. Nothing executed yet.\n";
    11. cout << result().get() << '\n';
    12. }
  11. 编译并运行程序,我们就会看到create每一次调用所产生的字符串,然后其他函数也开始执行。这个过程好像是通过智能调度来完成的,整个程序使用16秒完成。如果使用串行的方式,将会使用30s完成。需要注意的是,我们使用4核的机器来运行程序,也就是有4次create调用在同时进行。如果机器没有太多和CPU,那么运行时间会更长:

    1. $ ./chains
    2. Setup done. Nothing executed yet.
    3. 3s CREATE "foo "
    4. 3s CREATE "bar "
    5. 3s CREATE "this "
    6. 3s CREATE "that "
    7. 5s CONCAT "this " "that "
    8. 5s CONCAT "foo " "bar "
    9. 3s TWICE"foo bar "
    10. 5s CONCAT "foo bar foo bar " "this that "
    11. foo bar foo bar this that

How it works…

本节例子的串行版本,可能看起来如下:

  1. int main()
  2. {
  3. string result {
  4. concat(
  5. twice(
  6. concat(
  7. create("foo "),
  8. create("bar "))),
  9. concat(
  10. create("this "),
  11. create("that "))) };
  12. cout << result << '\n';
  13. }

本节中,我们完成了一些辅助函数,async_adapterasynchronize,其能帮助我们对createconcattwice函数进行包装。然后调用其异步版本pcreatepconcatptwice

先不看这两个函数复杂的实现,我们先来看一下我们获得了什么。

串行版本的代码可能类似如下写法:

  1. string result {concat( ... )};
  2. cout << result << '\n';

并行版本的写法:

  1. auto result (pconcat( ... ));
  2. cout << result().get() << '\n';

好了!现在就是最复杂的环节了。并行最后的结果并不是string,而是一个能够返回一个future<string>实例的可调用对象,我们可以对返回值调用get()得到函数运算后的值。这看起来可能很疯狂。

所以,我们为什么要返回future对象呢?问题在于我们的createconcattwice函数运行起得来都非常慢。不过,我们通过依赖关系树可以看到,数据流还是有可以独立的部分,也就是可并行的部分。让我们来看一下下面两个例子的流水:

实现一个小型自动化并行库——std::future - 图2

左侧边是单核的流水。所有函数一个接一个的在CPU上进行。这样时间累加起来就是30秒。

右侧边是多核的流水。函数会通过依赖关系并行的运行。在有4个核的机器上,我们将同时创建4个子字符串,然后对其进行拼接,等等的操作。并行版本需要16秒就能完成任务。如果我们没法让函数本身变的更快,则我们无法再进行加速。4个CPU的情况下,我们能有如此的加速,其实我们可以以更好的方式进行调度。

应该怎么做?

我们通常会写成如下的模式:

  1. auto a (async(launch::async, create, "foo "));
  2. auto b (async(launch::async, create, "bar "));
  3. auto c (async(launch::async, create, "this "));
  4. auto d (async(launch::async, create, "that "));
  5. auto e (async(launch::async, concat, a.get(), b.get()));
  6. auto f (async(launch::async, concat, c.get(), d.get()));
  7. auto g (async(launch::async, twice, e.get()));
  8. auto h (async(launch::async, concat, g.get(), f.get()));

a , b , cd都可以作为一个不错的开始,因为会创建对应的子字符串,并且会在后台同时进行创建。不幸的是,这段代码将会在初始化e的时候被阻塞。为了拼接ab,我们需要调用get()函数来获取这两个值,函数会对程序进行阻塞,直到获得相应的值为止。这明显不是一个好方法,因为并行代码会在第一个get()调用时阻塞。我们需要更好的策略来解决这个问题。

OK,现在让我们来看看我们在例子中完成的比较复杂的辅助函数。第一个就是asynchronize

  1. template <typename F>
  2. static auto asynchronize(F f)
  3. {
  4. return [f](auto ... xs) {
  5. return [=] () {
  6. return async(launch::async, f, xs...);
  7. };
  8. };
  9. }

当我们有一个函数int f(int, int)时,我们可以进行如下的操作:

  1. auto f2 ( asynchronize(f) );
  2. auto f3 ( f2(1, 2) );
  3. auto f4 ( f3() );
  4. int result { f4.get() };

f2就是异步版本的f。其调用方式与f完全相同。之后,其会返回可调用对象,并保存在f3中。现在f3得到了f和参数12,不过函数还没运行,只是捕获过程。

我们调用f3()时,最后就会得到一个future实例,因为f3中的返回值是async(launch::async, f, 1, 2);的返回值。某种意义上来说f3表示为集获取函数和函数参数,与抛出std::async返回值与一身的变量

内部Lambda表达式只通过捕获进行获取,但不接受任何输入参数。因此,可以让任务并行的方式分发,而不会遭遇任何方式的阻塞。我们对同样复杂的async_adapter函数采取同样的策略:

  1. template <typename F>
  2. static auto async_adapter(F f)
  3. {
  4. return [f](auto ... xs) {
  5. return [=] () {
  6. return async(launch::async, fut_unwrap(f), xs()...);
  7. };
  8. };
  9. }

函数能够返回一个函数f的模拟函数,因为其能接受相同的参数。然后,函数会返回一个可调用对象,并且也不接受任何参数,这里返回的可调用对象与其他辅助函数所返回的有所不同。

async(launch::async, fut_unwrap(f), xs()...);是什么意思呢?其中xs()...部分意味着,所有参数都保存在xs包中,供可调用对象使用,并且返回的可调用对象都不需要参数。那些可调用对象通过自身的方式生产future变量,通过对future变量调用get()获得实际返回值。这也就是fut_unwrap所要完成的事情:

  1. template <typename F>
  2. static auto fut_unwrap(F f)
  3. {
  4. return [f](auto ... xs) {
  5. return f(xs.get()...);
  6. };
  7. }

fut_unwrap会将函数f转换为一个可调用对象,其能接受一组参数。函数对象执行之后可以对所有的future对象调用.get(),从而获得f函数实际的执行结果。

我们花点时间来消化一下上面的内容。当主函数中调用这些函数,使用auto result (pconcat(...));的方式创建调用链,将所有子字符串最后拼接成一个长字符串。这时对async的调用还未完成。然后,当调用result()时,我们则获得async的返回值,并对其返回值调用.get(),这就能保证任何线程不会发生阻塞。实际上,在async调用前,不会有get()的调用。

最后,我们可以对result()的返回值调用.get(),从而获取最终的结果字符串。