10.3 C++标准库中的并行算法

标准库中的大多数被执行策略重载的算法都在<algorithm><numeric>头文件中。包括有:all_of,any_of,none_of,for_each,for_each_n,find,find_if,find_end,find_first_of,adjacent_find,count,count_if,mismatch,equal,search,search_n,copy,copy_n,copy_if,move,swap_ranges,transform,replace,replace_if,replace_copy,replace_copy_if,fill,fill_n,generate,generate_n,remove,remove_if,remove_copy,remove_copy_if,unique,unique_copy,reverse,reverse_copy,rotate,rotate_copy,is_partitioned,partition,stable_partition,partition_copy,sort,stable_sort,partial_sort,partial_sort_copy,is_sorted,is_sorted_until,nth_element,merge,inplace_merge,includes,set_union,set_intersection,set_difference,set_symmetric_difference,is_heap,is_heap_until,min_element,max_element,minmax_element,lexicographical_compare,reduce,transform_reduce,exclusive_scan,inclusive_scan,transform_exclusive_scan,transform_inclusive_scan和adjacent_difference 。

所有C++并发化的算法都在这个列表中了。像std::accumlate这样的算法,严格来说是一个连续的累积,但广义算法std::reduce也出现了在这个列表中——标准中适当的警告,即如果约简运算不同时具有关联性和交换性时,由于未指定的运算顺序,结果可能不确定。

对于列表中的每一个算法,每个”普通”算法的重载都有一个新的参数(第一个参数),这个参数将传入执行策略——“普通”重载的相应参数在此执行策略参数之后。例如,std::sort有两个没有执行策略的“普通”重载:

  1. template<class RandomAccessIterator>
  2. void sort(RandomAccessIterator first, RandomAccessIterator last);
  3. template<class RandomAccessIterator, class Compare>
  4. void sort(
  5. RandomAccessIterator first, RandomAccessIterator last, Compare comp);

因此,还具有两个有执行策略的重载:

  1. template<class ExecutionPolicy, class RandomAccessIterator>
  2. void sort(
  3. ExecutionPolicy&& exec,
  4. RandomAccessIterator first, RandomAccessIterator last);
  5. template<class ExecutionPolicy, class RandomAccessIterator, class Compare>
  6. void sort(
  7. ExecutionPolicy&& exec,
  8. RandomAccessIterator first, RandomAccessIterator last, Compare comp);

有执行策略和没有执行策略的函数列表间有一个重要的区别,会影响到一些算法:如果“普通”算法允许输入迭代器或输出迭代器,那执行策略的重载则需要前向迭代器。因为输入迭代器是单向迭代的:只能访问当前元素,并且不能将迭代器存储到以前的元素。输出迭代器只允许写入当前元素:不能在写入后面的元素后,后退再写入前面的元素。

C++标准库中的迭代器类别

C++标准库定义了五类迭代器:输入迭代器、输出迭代器、正向迭代器、双向迭代器和随机访问迭代器。

输入迭代器是用于检索值的单向迭代器。通常用于控制台或网络的输入,或生成序列。该迭代器的任何副本都是无效的。

输出迭代器是用于向单向迭代器写入值。通常输出到文件或向容器添加值。该迭代器会使该迭代器的任何副本失效。

前向迭代器是通过数据不变进行单向迭代的多路径迭代器。虽然迭代器不能返回到前一个元素,但是可以存储前面元素的副本,并使用它们引用。前向迭代器返回对元素的实际引用,因此可以用于读写(如果目标是不是常量)。

双向迭代器是像前向迭代器一样的多路径迭代器,但是它也可以后向访问之前的元素。

随机访问迭代器是可以像双向迭代器一样前进和后退的多路径迭代器,是它们比单个元素大的跨距前进和后退,并且可以使用数组索引运算符,在偏移位置直接访问元素。

展示一下std::copy的“普通”函数签名:

  1. template<class InputIterator, class OutputIterator>
  2. OutputIterator copy(
  3. InputIterator first, InputIterator last, OutputIterator result);

带有执行策略的版本:

  1. template<class ExecutionPolicy,
  2. class ForwardIterator1, class ForwardIterator2>
  3. ForwardIterator2 copy(
  4. ExecutionPolicy&& policy,
  5. ForwardIterator1 first, ForwardIterator1 last,
  6. ForwardIterator2 result);

虽然,模板参数的命名没有从编译器的角度带来任何影响,但从C++标准的角度来看:标准库算法模板参数的名称表示语义约束的类型,并且算法的操作将依赖于这些约束,且具有特定的语义。对于输入迭代器与前向迭代器,前者对迭代器的引用返回允许取消代理类型,代理类型可转换为迭代器的值类型,而后者对迭代器的引用返回要求取消对值的实际引用,并且所有相同的迭代器都返回对相一值的引用。

这对于并行性很重要:这意味着迭代器可以自由地复制,并等价地使用。此外,增加正向迭代器不会使其他失效副本也很重要,因为这意味着单线程可以在迭代器的副本上操作,需要时增加副本,而不必担心使其他线程的迭代器失效。如果带有执行策略的重载允许使用输入迭代器,将强制线程序列化,对于从源序列读取唯一迭代器的访问,会限制其并行的可能性。

10.3.1 并行算法示例

最简单的例子就是并行循环:对容器的每个元素进行处理。这是令人尴尬的并行场景:每个项目都是独立的,所以有最大的并行性。使用支持OpenMP的编译器,代码就可以写成:

  1. #pragma omp parallel for
  2. for(unsigned i=0;i<v.size();++i){
  3. do_stuff(v[i]);
  4. }

使用C++标准库算法,可以改写成:

  1. std::for_each(std::execution::par,v.begin(),v.end(),do_stuff);

标准库将创建内部线程,并对数据进行划分,且对每个元素x调用do_stuff(x)。其中在线程间划分元素是一个实现细节。

执行策略的选择

std::execution::par是最常使用的策略,除非实现提供了更适合的非标准策略。如果代码适合并行化,那应该与std::execution::par一起工作。某些情况下,可以使用std::execution::par_unseq代替。这可能根本没什么用(没有任何标准的执行策略可以保证能达到并行性的级别),但它可以给库额外的空间,通过重新排序和交错任务执行来提高代码的性能,以换取对代码更严格的要求。更严格的要求中值得注意的是,访问元素或对元素执行操作时不使用同步。这意味着不能使用互斥量或原子变量,或前面章节中描述的任何其他同步机制,以确保多线程的访问是安全的。相反,必须依赖于算法本身,而不是使用多个线程访问同一个元素,在调用并行算法外使用外部同步,从而避免其他线程访问数据。

代码10.1中的示例,可以使用std::execution::par,但不能使用std::execution::par_unseq。使用内部互斥量同步意味着使用std::execution::par_unseq,这将会导致未定义行为的发生。

代码10.1 具有内部同步并行算法的类

  1. class X{
  2. mutable std::mutex m;
  3. int data;
  4. public:
  5. X():data(0){}
  6. int get_value() const{
  7. std::lock_guard guard(m);
  8. return data;
  9. }
  10. void increment(){
  11. std::lock_guard guard(m);
  12. ++data;
  13. }
  14. };
  15. void increment_all(std::vector<X>& v){
  16. std::for_each(std::execution::par,v.begin(),v.end(),
  17. [](X& x){
  18. x.increment();
  19. });
  20. }

下个代码展示了可使用std::execution::par_unseq的代码段。这种情况下,内部元素互斥量替换为整个容器的互斥量。

代码10.2 无内部同步并行算法的类

  1. class Y{
  2. int data;
  3. public:
  4. Y():data(0){}
  5. int get_value() const{
  6. return data;
  7. }
  8. void increment(){
  9. ++data;
  10. }
  11. };
  12. class ProtectedY{
  13. std::mutex m;
  14. std::vector<Y> v;
  15. public:
  16. void lock(){
  17. m.lock();
  18. }
  19. void unlock(){
  20. m.unlock();
  21. }
  22. std::vector<Y>& get_vec(){
  23. return v;
  24. }
  25. };
  26. void increment_all(ProtectedY& data){
  27. std::lock_guard guard(data);
  28. auto& v=data.get_vec();
  29. std::for_each(std::execution::par_unseq,v.begin(),v.end(),
  30. [](Y& y){
  31. y.increment();
  32. });
  33. }

代码10.2中的元素访问目前没有同步,使用std::execution::par_unseq是安全的。缺点是:并行算法调用之外,其他线程的并发访问现在也必须等待整个操作完成。互斥锁的粒度与代码10.1中不同。

现在让我们来看一个现实的例子,详述如何使用并行算法:记录访问网站的次数。

10.3.2 计数访问

假设有一个运作繁忙的网站,日志有数百万条条目,你希望对这些日志进行处理以便查看相关数据:每页访问多少次、访问来自何处、使用的是哪个浏览器,等等。分析日志有两个部分:处理每一行以提取相关信息,将结果聚合在一起。对于使用并行算法来说,这是一个理想的场景,因为处理每一条单独的行完全独立于其他所有行,并且如果最终的合计是正确的,可以逐个汇总结果。

这种类型的任务适合transform_reduce,下面的代码展示了如何将其用于该任务。

代码10.3 使用transform_reduce来记录网站的页面被访问的次数

  1. #include <vector>
  2. #include <string>
  3. #include <unordered_map>
  4. #include <numeric>
  5. struct log_info {
  6. std::string page;
  7. time_t visit_time;
  8. std::string browser;
  9. // any other fields
  10. };
  11. extern log_info parse_log_line(std::string const &line); // 1
  12. using visit_map_type= std::unordered_map<std::string, unsigned long long>;
  13. visit_map_type
  14. count_visits_per_page(std::vector<std::string> const &log_lines) {
  15. struct combine_visits {
  16. visit_map_type
  17. operator()(visit_map_type lhs, visit_map_type rhs) const { // 3
  18. if(lhs.size() < rhs.size())
  19. std::swap(lhs, rhs);
  20. for(auto const &entry : rhs) {
  21. lhs[entry.first]+= entry.second;
  22. }
  23. return lhs;
  24. }
  25. visit_map_type operator()(log_info log,visit_map_type map) const{ // 4
  26. ++map[log.page];
  27. return map;
  28. }
  29. visit_map_type operator()(visit_map_type map,log_info log) const{ // 5
  30. ++map[log.page];
  31. return map;
  32. }
  33. visit_map_type operator()(log_info log1,log_info log2) const{ // 6
  34. visit_map_type map;
  35. ++map[log1.page];
  36. ++map[log2.page];
  37. return map;
  38. }
  39. };
  40. return std::transform_reduce( // 2
  41. std::execution::par, log_lines.begin(), log_lines.end(),
  42. visit_map_type(), combine_visits(), parse_log_line);
  43. }

假设函数parse_log_line的功能是从日志条目中提取相关信息①,count_visits_per_page 函数是一个简单的包装器,将对std::transform_reduce的调用进行包装②。复杂度来源于规约操作:需要组合两个log_info结构体来生成一个映射,一个log_info结构体和一个映射(无论是哪种方式),或两个映射。这意味着combine_visits函数对象需要重载4个函数运算符,③④⑤和⑥。虽然这些重载很简单,但这里没有用Lambda表达式来实现。

因此,std::transform_reduce将使用硬件并行执行此计算(因为传了std::execution::par)。人工编写这个算法是非常重要的,正如在上一章中看到的,因此这允许将实现并行性的艰苦工作委托给标准库实现者,这样开发者就可以专注于期望的结果了。