1 线程间task划分

1.1 对数据进行分组划分

最简单的分配方式:第一组N个元素分配一个线程,下一组N个元素再分配一个线程,以此类推,如下图所示。不管数据怎么分,每个线程都会对分配给它的元素进行操作,不过并不会和其他线程进行沟通,直到处理完成。执行线程独立的执行这些任务,结果在会有主线程中合并

8 如何进行并发化设计 - 图1

1.2 递归划分

递归划分数据如下图:
8 如何进行并发化设计 - 图2
重要的是:对一个很大的数据集进行排序时,当每层递归都产生一个新线程,最后就会产生大量的线程。会看到大量线程对性能的影响,如果有太多的线程存在,那么应用将会运行的很慢。如果数据集过于庞大,会将线程耗尽。两个选择:

  • 在递归的基础上进行任务的划分,只需要将一定数量的数据打包后,交给线程即可。std::async()可以处理这种简单的情况。
  • 另一种选择是使用std::thread::hardware_concurrency()函数来确定线程的数量。然后,可以将已排序的数据推到线程安全的栈上。线程无所事事时,不是已经完成对自己数据块的梳理,就是在等待一组排序数据的产生;线程可以从栈上获取这组数据,并且对其排序。

代码示例:

  1. /*
  2. 使用std::thread::hardware_concurrency()函数来确定线程的数量。然后,可以将已排序的数据推到线程安全的栈上.
  3. 仅演示想法,而不是作为工业代码。如果使用C++ 17编译器,最好使用标准库提供的并行算法
  4. */
  5. #include <list>
  6. #include <vector>
  7. #include <future>
  8. #include <algorithm>
  9. #include "19_threadsafe_stack.hpp"
  10. template <typename T>
  11. struct sorter // 1
  12. {
  13. struct chunk_to_sort
  14. {
  15. std::list<T> data;
  16. std::promise<std::list<T>> promise;
  17. };
  18. threadsafe_stack<chunk_to_sort> chunks; // 2 使用线程安全栈保存data数据块
  19. std::vector<std::thread> threads; // 3 保存线程
  20. unsigned const max_thread_count; //最大支持的线程数,根据硬件获得
  21. std::atomic<bool> end_of_data;
  22. sorter() : max_thread_count(std::thread::hardware_concurrency() - 1),
  23. end_of_data(false)
  24. {
  25. }
  26. ~sorter() // 4
  27. {
  28. end_of_data = true; // 5
  29. for (unsigned i = 0; i < threads.size(); ++i)
  30. {
  31. threads[i].join(); // 6
  32. }
  33. }
  34. //从线程安全栈中获取一个数据块,进行排序
  35. void try_sort_chunk()
  36. {
  37. std::shared_ptr<chunk_to_sort> chunk = chunks.pop(); // 7
  38. if (chunk)
  39. {
  40. sort_chunk(chunk); // 8
  41. }
  42. }
  43. //排序的主函数
  44. std::list<T> do_sort(std::list<T> &chunk_data) // 9
  45. {
  46. if (chunk_data.empty())
  47. {
  48. return chunk_data;
  49. }
  50. std::list<T> result;
  51. result.splice(result.begin(), chunk_data, chunk_data.begin()); //转移数据到result列表中
  52. T const &partition_val = *result.begin();
  53. //10 对data进行第一次快速排序,根据data第一个元素排序,表达式true放左边,false放右边,返回值为中间分组点
  54. typename std::list<T>::iterator divide_point = std::partition(chunk_data.begin(),
  55. chunk_data.end(),
  56. [&](T const &val) { return val < partition_val; });
  57. chunk_to_sort new_lower_chunk; //定义新的data类型保存上面排序结果的左部分数据块
  58. new_lower_chunk.data.splice(new_lower_chunk.data.end(),
  59. chunk_data, chunk_data.begin(),
  60. divide_point);
  61. std::future<std::list<T>> new_lower = new_lower_chunk.promise.get_future();
  62. chunks.push(std::move(new_lower_chunk)); // 11 将上面的左部分数据块保存到线程安全栈中
  63. if (threads.size() < max_thread_count) // 12
  64. { //启动新线程处理左部分数据块的排序
  65. threads.push_back(std::thread(&sorter<T>::sort_thread, this));
  66. }
  67. std::list<T> new_higher(do_sort(chunk_data)); //在本线程继续处理右部分的排序
  68. result.splice(result.end(), new_higher);
  69. //等下期望值变为ready,即表示新线程的排序任务已经结束
  70. while (new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready) // 13
  71. {
  72. try_sort_chunk(); // 14 如果没有结束,本线程等待,并尝试处理安全栈中数据
  73. }
  74. result.splice(result.begin(), new_lower.get());
  75. return result;
  76. }
  77. //对数据进行排序,并把结果保存在承诺值中
  78. void sort_chunk(std::shared_ptr<chunk_to_sort> const &chunk)
  79. {
  80. chunk->promise.set_value(do_sort(chunk->data)); // 15
  81. }
  82. void sort_thread()
  83. {
  84. while (!end_of_data) // 16
  85. {
  86. try_sort_chunk(); // 17
  87. std::this_thread::yield(); // 18
  88. }
  89. }
  90. };
  91. template <typename T>
  92. std::list<T> parallel_quick_sort(std::list<T> input) // 19 并行快速排序函数
  93. {
  94. if (input.empty())
  95. {
  96. return input;
  97. }
  98. sorter<T> s;
  99. return s.do_sort(input); // 20
  100. }

1.3 基于任务类型划分

对分工的排序,也就是从并发分离关注结果;每个线程都有不同的任务,这意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理;不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是良好的设计,每一段代码只对自己的部分负责

多线程下有两个危险需要分离关注。第一个是对错误担忧的分离,主要表现为线程间共享着很多的数据,或者不同的线程要相互等待,这两种情况都是因为线程间很密切的交互。这种情况发生时,就需要看一下为什么需要这么多交互。当所有交互都有同样的问题,就应该使用单线程来解决,并将引用同一源的线程提取出来。或者当有两个线程需要频繁的交流,在没有其他线程时,就可以将这两个线程合为一个线程。
当通过任务类型对线程间的任务进行划分时,不应该让线程处于完全隔离的状态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段,来让每个线程执行。

2 并发性能的影响因素

2.1 乒乓缓存

当两个线程并发的在不同处理器上执行,并且对同一数据进行读取,通常不会出现问题;因为数据将会拷贝到每个线程的缓存中,并且让两个处理器同时进行处理。
乒乓缓存:当有线程对数据进行修改的时候,这个修改需要更新到其他核芯的缓存中去,就要耗费一定的时间。根据线程的操作性质,以及使用到的内存序,这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。当一个处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情,所以对于整个应用来说这是一个坏消息。
减少两个线程对同一个内存位置的竞争,能减轻乒乓缓存,但还是可能因为伪共享导致乒乓缓存。

2.2 伪共享

假设你有一个int类型的数组,并且有一组线程可以访问数组中的元素,且对数组的访问很频繁(包括更新)。通常int类型的大小要小于一个缓存行,同一个缓存行中可以存储多个数据项(某个线程所要访问的数据过于接近另一线程的数据)。因此,即使每个线程都能对数据中的成员进行访问,硬件缓存还是会产生乒乓缓存。每当线程访问0号数据项,并对其值进行更新时,缓存行的所有权就需要转移给执行该线程的处理器,这仅是为了更新1号数据项的线程获取1号线程的所有权。缓存行是共享的,因此使用伪共享来称呼这种方式。

解决办法就是对数据进行构造,让同一线程访问的数据项存在临近的内存中(就像是放在同一缓存行中),这样那些能被独立线程访问的数据将分布在相距很远的地方,并且可能是存储在不同的缓存行中。
C++17标准在头文件<new>中定义了std::hardware_destructive_interference_size它指定了当前编译目标可能共享的连续字节的最大数目。如果确保数据间隔大于等于这个字节数,就不会有错误的共享存在了

3 三个标准函数的并行实现

3.1 并行实现std::for_each

std::for_each的原理很简单:其对某个范围中的元素,依次调用用户提供的函数。
为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。事先知道元素数量,所以可以处理前对数据进行分组划分

  1. /*
  2. 使用std::async实现std::for_each
  3. */
  4. #include <iterator>
  5. #include <algorithm>
  6. #include <future>
  7. template <typename Iterator, typename Func>
  8. void parallel_for_each(Iterator first, Iterator last, Func f)
  9. {
  10. unsigned long const length = std::distance(first, last);
  11. if (!length)
  12. return;
  13. unsigned long const min_per_thread = 25;
  14. if (length < (2 * min_per_thread))
  15. {
  16. // 如果数据小于两个线程的能力,直接使用单线程for_each
  17. std::for_each(first, last, f);
  18. }
  19. else
  20. {
  21. Iterator const mid_point = first + length / 2;
  22. // 对数据分为两部分,异步执行一部分
  23. std::future<void> first_half = std::async(&parallel_for_each<Iterator, Func>, first, mid_point, f);
  24. parallel_for_each(mid_point, last, f); // 迭代执行另一部分
  25. first_half.get(); // 通过future传播异常,后面可以跟异常处理
  26. }
  27. }

3.2 并行实现std::find

当得到想要的答案就中断其他任务的执行,所以不能等待线程处理对剩下的元素进行处理。如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。

中断其他线程的一个办法就是使用原子变量作为一个标识,处理过每一个元素后就对这个标识进行检查。如果标识被设置,就有线程找到了匹配元素,所以算法就可以停止并返回了。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。

  1. //使用std::async实现的并行find算法
  2. #include <atomic>
  3. #include <future>
  4. template <typename Iterator, typename MatchType> // done为已经找到的标识
  5. Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,
  6. std::atomic<bool> &done)
  7. {
  8. try
  9. {
  10. unsigned long const length = std::distance(first, last);
  11. unsigned long const min_per_thread = 25;
  12. if (length < (2 * min_per_thread)) // 如果数据不足以分成两部分,直接单线程查找
  13. {
  14. for (; (first != last) && !done.load(); ++first) // 4
  15. {
  16. if (*first == match)
  17. {
  18. done = true; // 如果找到,将done置为true,这样其他线程在进入for循环时判断done为true会直接返回
  19. return first;
  20. }
  21. }
  22. return last; // 6
  23. }
  24. else
  25. {
  26. Iterator const mid_point = first + (length / 2); // 7数据分成两部分
  27. // 异步查找第一部分,同时继续传递原子bool done
  28. std::future<Iterator> async_result = std::async(&parallel_find_impl<Iterator, MatchType>,
  29. mid_point, last, match, std::ref(done));
  30. // 迭代执行另一部分
  31. Iterator const direct_result = parallel_find_impl(first, mid_point, match, done);
  32. // 返回结果,direct_result == mid_point,意味着结果在第一部分中,返回future期望值结果
  33. return (direct_result == mid_point) ? async_result.get() : direct_result;
  34. }
  35. }
  36. catch (...)
  37. {
  38. done = true; // 11
  39. throw;
  40. }
  41. }
  42. template <typename Iterator, typename MatchType>
  43. Iterator parallel_find(Iterator first, Iterator last, MatchType match)
  44. {
  45. std::atomic<bool> done(false); // 原子bool类型,用于标识已经找到
  46. return parallel_find_impl(first, last, match, done); // 12
  47. }