6.3设计更加复杂的数据结构

栈和队列都很简单:接口相对固定,并且应用于比较特殊的情况。并不是所有数据结构都这样简单,大多数数据结构支持更加多样化的操作。这将增大并行的可能性,但也让数据保护变得更加困难。为了并发访问对数据结构进行设计时,一些原有的操作就变得越发需要重点关注。

先来看看,在设计查询表时遇到的一些问题。

6.3.1 使用锁的线程安全查询表

查询表或字典是一种类型的值(键值)和另一种类型的值进行关联(映射)的数据结构。一般情况下,这样的结构允许代码通过键值对相关的数据值进行查询。C++标准库中相关工具:std::map<>, std::multimap<>, std::unordered_map<>std::unordered_multimap<>

查询表的使用方式与栈和队列不同。栈和队列上几乎每个操作都会对数据结构进行修改,不是添加一个元素,就是删除一个,而对于查询表来说,几乎不需要什么修改。代码3.13中有个例子,一个简单的域名系统(DNS)缓存,相较于std::map<>削减了很多的接口。和队列和栈一样,标准容器的接口不适合多线程进行并发访问,因为这些接口都存在固有的条件竞争,所以有些接口需要砍掉或重新修订。

并发访问时,std::map<>最大的问题在于——迭代器。要想正确的处理迭代器,可能会碰到下面的问题:迭代器引用的元素被其他线程删除时,迭代器就会出问题。线程安全的查询表的第一次接口削减,需要绕过迭代器。std::map<>(以及标准库中其他相关容器)给定的接口对于迭代器的依赖很严重,其中有些接口需要先放在一边,先对一些简单接口进行设计。

查询表的基本操作有:

  • 添加一对“键值-数据”

  • 修改指定键值所对应的数据

  • 删除一组值

  • 通过给定键值,获取对应数据

容器的一些操作也非常有用,比如:查询容器是否为空,键值列表的完整快照和“键值-数据”的完整快照。

如果坚持线程安全指导意见,例如:不要返回一个引用,并且用一个简单的互斥锁对每一个成员函数进行上锁,以确保每一个函数线程安全。最有可能的条件竞争在于,当一对“键值-数据”加入时。当两个线程都添加一个数据,那么肯定一先一后。一种方式合并了“添加”和“修改”操作,为一个成员函数。

从接口角度看,有一个很有趣的问题,就是任意(if any)部分获取相关数据。一种选择是在键值没有对应值的时候进行返回时,允许用户提供一个默认值:

  1. mapped_type get_value(key_type const& key, mapped_type default_value);

这种情况下,当default_value没有明确的给出时,默认构造出的mapped_type实例将使用,可以扩展成返回一个std::pair<mapped_type, bool>来代替mapped_type实例,其中bool代表返回值是否是当前键对应的值。另一个选择是返回指向数据的智能指针,当指针的值是NULL时,这个键值就没有对应的数据。

当接口确定时,(假设没有接口间的条件竞争)需要保证线程安全了,可以通过对每一个成员函数使用互斥量和锁来保护底层数据。不过,当独立函数对数据结构进行读取和修改时,就会降低并发的可能性。一个选择是使用一个互斥量去面对多个读者线程或一个作者线程。虽然会提高并发访问,但是同时只有一个线程能对数据结构进行修改。理想很美好,现实很骨感?我们其实可以做的更好!

为细粒度锁设计映射结构

对队列的讨论中(6.2.3节),为了允许细粒度锁能正常工作,需要对数据结构的细节进行仔细考究,而非直接使用已知容器。列出三个常见关联容器的方式:

  • 二叉树,比如:红黑树

  • 有序数组

  • 哈希表

二叉树的方式,不会提高并发访问的能力。每一个查找或者修改操作都需要访问根节点,所以根节点需要上锁。虽然访问线程在向下移动时,锁可以进行释放,但相比横跨整个数据结构的单锁,并没有什么优势。

有序数组是最坏的选择,因为无法提前言明数组中哪段是有序的,所以需要用一个锁将整个数组锁起来。

最后就剩哈希表了。假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。当然,缺点也是有的:对于键值的操作,需要有合适的函数。C++标准库提供std::hash<>模板,可以直接使用,用户还可以简单的对键值类型进行特化。如果去效仿标准无序容器,并且获取函数对象的类型作为哈希表的模板参数,用户可以选择特化std::hash<>的键值类型,或者直接提供哈希函数。

怎样才能完成一个线程安全的查询表?下面提供一种方式。

代码6.11 线程安全的查询表

  1. template<typename Key,typename Value,typename Hash=std::hash<Key> >
  2. class threadsafe_lookup_table
  3. {
  4. private:
  5. class bucket_type
  6. {
  7. private:
  8. typedef std::pair<Key,Value> bucket_value;
  9. typedef std::list<bucket_value> bucket_data;
  10. typedef typename bucket_data::iterator bucket_iterator;
  11. bucket_data data;
  12. mutable std::shared_mutex mutex; // 1
  13. bucket_iterator find_entry_for(Key const& key) const // 2
  14. {
  15. return std::find_if(data.begin(),data.end(),
  16. [&](bucket_value const& item)
  17. {return item.first==key;});
  18. }
  19. public:
  20. Value value_for(Key const& key,Value const& default_value) const
  21. {
  22. std::shared_lock<std::shared_mutex> lock(mutex); // 3
  23. bucket_iterator const found_entry=find_entry_for(key);
  24. return (found_entry==data.end())?
  25. default_value:found_entry->second;
  26. }
  27. void add_or_update_mapping(Key const& key,Value const& value)
  28. {
  29. std::unique_lock<std::shared_mutex> lock(mutex); // 4
  30. bucket_iterator const found_entry=find_entry_for(key);
  31. if(found_entry==data.end())
  32. {
  33. data.push_back(bucket_value(key,value));
  34. }
  35. else
  36. {
  37. found_entry->second=value;
  38. }
  39. }
  40. void remove_mapping(Key const& key)
  41. {
  42. std::unique_lock<std::shared_mutex> lock(mutex); // 5
  43. bucket_iterator const found_entry=find_entry_for(key);
  44. if(found_entry!=data.end())
  45. {
  46. data.erase(found_entry);
  47. }
  48. }
  49. };
  50. std::vector<std::unique_ptr<bucket_type> > buckets; // 6
  51. Hash hasher;
  52. bucket_type& get_bucket(Key const& key) const // 7
  53. {
  54. std::size_t const bucket_index=hasher(key)%buckets.size();
  55. return *buckets[bucket_index];
  56. }
  57. public:
  58. typedef Key key_type;
  59. typedef Value mapped_type;
  60. typedef Hash hash_type;
  61. threadsafe_lookup_table(
  62. unsigned num_buckets=19,Hash const& hasher_=Hash()):
  63. buckets(num_buckets),hasher(hasher_)
  64. {
  65. for(unsigned i=0;i<num_buckets;++i)
  66. {
  67. buckets[i].reset(new bucket_type);
  68. }
  69. }
  70. threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
  71. threadsafe_lookup_table& operator=(
  72. threadsafe_lookup_table const& other)=delete;
  73. Value value_for(Key const& key,
  74. Value const& default_value=Value()) const
  75. {
  76. return get_bucket(key).value_for(key,default_value); // 8
  77. }
  78. void add_or_update_mapping(Key const& key,Value const& value)
  79. {
  80. get_bucket(key).add_or_update_mapping(key,value); // 9
  81. }
  82. void remove_mapping(Key const& key)
  83. {
  84. get_bucket(key).remove_mapping(key); // 10
  85. }
  86. };

实现中使用了std::vector<std::unique_ptr<bucket_type>>⑥来保存桶,其允许在构造函数中指定构造桶的数量。默认为19个,这个值可以是一个任意的质数。哈希表在有质数个桶时,工作效率最高。每一个桶都会被一个std::shared_mutex①实例锁保护,对于每一个桶只有一个线程能对其进行修改。

因为桶的数量固定,所以get_bucket()⑦可以无锁调用,⑧⑨⑩也都一样。并且对桶的互斥量上锁,要不就是共享(只读)所有权时③,要不就是在获取唯一(读/写)权时④⑤。这里的互斥量,适用于每个成员函数。

这三个函数都使用到了find_entry_for()成员函数②,用来确定数据是否在桶中。每一个桶都包含一个“键值-数据”的std::list<>列表,所以添加和删除数据就会很简单。

从并发的角度考虑,互斥锁保护所有成员,这样的实现是“异常安全”的吗?value_for是不能修改任何值的,所以其不会有问题。如果value_for抛出异常,也不会对影响任何数据结构。remove_mapping修改链表时,会调用erase,不过这能保证没有异常抛出。那么就剩add_or_update_mapping了,可能会在其两个if分支上抛出异常。push_back是异常安全的,如果有异常抛出,也会将链表恢复成原始状态。唯一的问题就在赋值阶段(将替换已有的数据),当赋值阶段抛出异常,用于依赖的原始状态没有改变,所以不会影响数据结构的整体,以及用户提供类型的属性,这样就可以放心的将问题交给用户处理。

本节开始时,提到查询表的一个可有可无(nice-to-have)的特性,会将选择当前状态的快照,例如:一个std::map<>。这要求锁住整个容器,保证拷贝副本的状态是可以索引的,这将锁住所有的桶。因为对于查询表的“普通”的操作,需要在同一时间获取桶上的锁,而这个操作将要求查询表将所有桶都锁住。因此,只要每次以相同的顺序进行上锁(例如,递增桶的索引值),就不会产生死锁。实现如下所示:

代码6.12 获取整个threadsafe_lookup_table作为一个std::map<>

  1. std::map<Key,Value> threadsafe_lookup_table::get_map() const
  2. {
  3. std::vector<std::unique_lock<std::shared_mutex> > locks;
  4. for(unsigned i=0;i<buckets.size();++i)
  5. {
  6. locks.push_back(
  7. std::unique_lock<std::shared_mutex>(buckets[i].mutex));
  8. }
  9. std::map<Key,Value> res;
  10. for(unsigned i=0;i<buckets.size();++i)
  11. {
  12. for(bucket_iterator it=buckets[i].data.begin();
  13. it!=buckets[i].data.end();
  14. ++it)
  15. {
  16. res.insert(*it);
  17. }
  18. }
  19. return res;
  20. }

代码6.11中的查询表实现,就增大的并发访问的能力,这个查询表作为一个整体,通过单独的操作,对每一个桶进行锁定,并且通过使用std::shared_mutex允许读者线程对每一个桶并发访问。如果细粒度锁和哈希表结合起来,会增加并发的可能性吗?

下一节中,将使用到线程安全列表(支持迭代器)。

6.3.2 编写使用锁的线程安全链表

链表类型是数据结构中的基本类型,所以好修改成线程安全的,对么?不好说,这取决于要添加什么样的功能,并且需要提供迭代器的支持。为了简化基本数据类型的代码,我去掉了一些功能。迭代器的问题在于,STL类的迭代器需要持有容器内部引用。当容器可被其他线程修改时,这个引用还是有效的。实际上就需要迭代器持有锁,对指定的结构中的部分进行上锁。在给定STL类迭代器的生命周期中,让其完全脱离容器的控制是很糟糕的做法。

替代方案就是提供迭代函数,例如:将for_each作为容器本身的一部分。这就能让容器对迭代的部分进行负责和锁定,不过这将违反第3章的指导意见。为了让for_each在任何情况下都有效,持有内部锁的时,必须调用用户提供的代码。不仅如此,需要传递一个对容器中元素的引用到用户代码中,就是让用户代码对容器中的元素进行操作。为了避免传递引用,需要传出一个拷贝到用户代码中。不过当数据很大时,拷贝要付出的代价也很大。

所以,可以将避免死锁的工作(因为用户提供的操作需要获取内部锁),还有避免对引用(不被锁保护)进行存储时的条件竞争交给用户去做。因为清楚这里的实现不会有任何问题,查询表就可以“安全的”使用链表了。

剩下的问题就是哪些操作需要列表所提供。如果愿意再花点时间看一下代码6.11和6.12,要注意下下面的操作:

  • 向列表添加一个元素

  • 当某个条件满足时,从链表中删除某个元素

  • 当某个条件满足时,从链表中查找某个元素

  • 当某个条件满足时,更新链表中的某个元素

  • 将容器中链表中的每个元素,复制到另一个容器中

提供了这些操作,链表才能为通用容器,这将帮助我们添加更多功能,比如:指定位置上插入元素,不过这对于查询表来说就没有必要了,所以算是给读者们留的一个作业吧。

使用细粒度锁最初的想法,是为了让链表每个节点都拥有一个互斥量。当链表很长时,会使用有很多的互斥量!这样的好处是对于链表中每一个独立的部分,都能实现真实的并发:真正感兴趣的是对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放。

代码6.13 线程安全链表——支持迭代器

  1. template<typename T>
  2. class threadsafe_list
  3. {
  4. struct node // 1
  5. {
  6. std::mutex m;
  7. std::shared_ptr<T> data;
  8. std::unique_ptr<node> next;
  9. node(): // 2
  10. next()
  11. {}
  12. node(T const& value): // 3
  13. data(std::make_shared<T>(value))
  14. {}
  15. };
  16. node head;
  17. public:
  18. threadsafe_list()
  19. {}
  20. ~threadsafe_list()
  21. {
  22. remove_if([](node const&){return true;});
  23. }
  24. threadsafe_list(threadsafe_list const& other)=delete;
  25. threadsafe_list& operator=(threadsafe_list const& other)=delete;
  26. void push_front(T const& value)
  27. {
  28. std::unique_ptr<node> new_node(new node(value)); // 4
  29. std::lock_guard<std::mutex> lk(head.m);
  30. new_node->next=std::move(head.next); // 5
  31. head.next=std::move(new_node); // 6
  32. }
  33. template<typename Function>
  34. void for_each(Function f) // 7
  35. {
  36. node* current=&head;
  37. std::unique_lock<std::mutex> lk(head.m); // 8
  38. while(node* const next=current->next.get()) // 9
  39. {
  40. std::unique_lock<std::mutex> next_lk(next->m); // 10
  41. lk.unlock(); // 11
  42. f(*next->data); // 12
  43. current=next;
  44. lk=std::move(next_lk); // 13
  45. }
  46. }
  47. template<typename Predicate>
  48. std::shared_ptr<T> find_first_if(Predicate p) // 14
  49. {
  50. node* current=&head;
  51. std::unique_lock<std::mutex> lk(head.m);
  52. while(node* const next=current->next.get())
  53. {
  54. std::unique_lock<std::mutex> next_lk(next->m);
  55. lk.unlock();
  56. if(p(*next->data)) // 15
  57. {
  58. return next->data; // 16
  59. }
  60. current=next;
  61. lk=std::move(next_lk);
  62. }
  63. return std::shared_ptr<T>();
  64. }
  65. template<typename Predicate>
  66. void remove_if(Predicate p) // 17
  67. {
  68. node* current=&head;
  69. std::unique_lock<std::mutex> lk(head.m);
  70. while(node* const next=current->next.get())
  71. {
  72. std::unique_lock<std::mutex> next_lk(next->m);
  73. if(p(*next->data)) // 18
  74. {
  75. std::unique_ptr<node> old_next=std::move(current->next);
  76. current->next=std::move(next->next);
  77. next_lk.unlock();
  78. } // 20
  79. else
  80. {
  81. lk.unlock(); // 21
  82. current=next;
  83. lk=std::move(next_lk);
  84. }
  85. }
  86. }
  87. };

代码6.13中的threadsafe_list<>是一个单链表,可从node的结构①中看出。一个默认构造的node作为链表的head,其next指针②指向的是NULL。新节点都通过push_front()函数添加,构造第一个新节点④,其将会在堆上分配内存③来对数据进行存储,同时将next指针置为NULL。然后,为了设置next的值⑤,需要获取head节点的互斥锁,也就是插入节点到列表的头部,让头节点的head.next指向这个新节点⑥。目前,只需要锁住一个互斥量,就能将新的数据添加进入链表,所以不存在死锁的问题。同样,(缓慢的)内存分配操作在锁的范围外,所以锁能保护需要更新的一对指针。那么,再来看一下迭代功能。

首先,for_each()⑦这个操作对队列中的每个元素执行Function(函数指针)。大多数标准算法库中,都会通过传值方式来执行这个函数,要不就传入一个通用的函数,要不就传入一个有函数操作的类型对象。这种情况下,函数必须接受类型为T的值作为参数。链表中会有“手递手”的上锁过程,这个过程开始时,需要锁住head及节点⑧的互斥量。然后,安全的获取指向下一个节点的指针(使用get()获取,因为对这个指针没有所有权)。当指针不为NULL⑨,为了继续对数据进行处理,就需要对指向的节点进行上锁⑩。当锁住了那个节点,就可以对上一个节点进行释放了⑪,并调用指定函数⑫。当函数执行完成时,就可以更新当前指针所指向的节点(刚刚处理过的节点),并将所有权从next_lk移动移动到lk⑬。因为for_each传递的每个数据都是能被Function接受的,所以当需要的时,或需要拷贝到另一个容器的时,或其他情况时,都可以考虑使用这种方式更新每个元素。如果函数的行为没什么问题,这种方式是安全的,因为在获取节点互斥锁时,函数正在处理已经获取锁的节点。

find_first_if()⑭和for_each()很相似,最大的区别在于find_first_if支持函数(谓词)在匹配的时候返回true,不匹配的时候返回false⑮。条件匹配时,只需要返回找到的数据⑯,而非继续查找。可以使用for_each()来做这件事,不过在找到后,继续做查找就没意义了。

remove_if()⑰就有些不同了,因为函数会改变链表。所以,就不能使用for_each()实现这个功能。当函数(谓词)返回true⑱,对应元素将会移除,并且更新current->next⑲。当这些都做完,就可以释放next指向节点的锁。当std::unique_ptr<node>的移动超出链表范围⑳,节点将被删除。这种情况下,就不需要更新当前节点了,因为只需要修改next所指向的下一个节点就可以。当函数(谓词)返回false,移动的操作就和之前一样了(21)。

那么,所有的互斥量中会有死锁或条件竞争吗?答案无疑是“否”,要看提供的函数(谓词)是否有良好的行为。迭代通常都使用一种方式,从head节点开始,并且在释放当前节点锁之前,将下一个节点的互斥量锁住,所以就不可能会有不同线程有不同的上锁顺序。唯一可能出现条件竞争的地方就在remove_if()⑳中删除已有节点的时候。操作在解锁互斥量后进行(其导致的未定义行为,可对已上锁的互斥量进行破坏),所以可以确定这是安全的。因为现在还持有前一个节点(当前节点)的互斥锁,所以不会有新的线程尝试去获取正在删除节点的互斥锁。

并发概率有多大呢?细粒度锁要比单锁的并发概率大很多,那我们已经获得了吗?是的,已经获得了:同一时间内,不同线程在不同节点上工作,无论是使用for_each()对每一个节点进行处理,还是使用find_first_if()对数据进行查找,或是使用remove_if()删除一些元素。不过,因为互斥量必须按顺序上锁,线程就不能交叉进行工作。当线程耗费大量的时间对一个特殊节点进行处理,其他线程就必须等待这个处理完成。完成后,其他线程才能到达这个节点。