数据结构的选择是解决方案的重要组成部分,当然并行程序也不例外。如果一种数据结构可以被多个线程所访问,要不就是绝对不变(其值不会发生变化,并且不需同步),要不程序就要对数据结构进行正确的设计,以确保其能在多线程环境下能够(正确的)同步。

  • 一种选择是使用独立的互斥量,其可以锁住需要保护的数据。这种方式是线程轮流访问被保护的数据。这是对数据进行串行的访问,而非并发。
  • 另一种选择是设计一种能够并发访问的数据结构。减少保护区域,减少序列化操作,就能提升并发访问的能力。

设计并发数据结构意味着,多个线程可以并发的访问这个数据结构,线程可对这个数据结构做相同或不同的操作,并且每一个线程都能在自己域中看到该数据结构。多线程环境下,无数据丢失和损毁,所有的数据需要维持原样,且无条件竞争。这样的数据结构,称之为“线程安全”的数据结构

1 设计建议

如何保证数据结构是线程安全的:

  • 确保无线程能够看到修改数据结构的“不变量”时的状态。
  • 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤。
  • 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态。
  • 将死锁的概率降到最低。使用数据结构时,需要限制锁的范围,避免嵌套锁的存在。

作为一个数据结构的设计者,设计数据结构时考虑以下问题:

  • 在锁范围中进行操作,是否允许在锁外执行?
  • 数据结构中不同的区域能是否被不同的互斥量所保护?
  • 所有操作都需要同级互斥量保护吗?
  • 能否对数据结构进行简单的修改,以增加并发访问的概率,且不影响操作语义?

2 基于锁的并发数据结构

基于锁的并发数据结构,需要确保访问线程持有锁的时间最短,对于只有一个互斥量的数据结构来说,这十分困难。需要保证数据不被锁之外的操作所访问,并且还要保证不会在结构上产生条件竞争。使用多个互斥量来保护数据结构中不同的区域时,问题会暴露的更加明显,当操作需要获取多个互斥锁时,就有可能产生死锁。所以在设计时,使用多个互斥量时需要格外小心。

2.1 线程安全栈——互斥锁

  1. /*
  2. 基于锁的线程安全栈
  3. */
  4. #include <exception>
  5. //自定义异常
  6. struct empty_stack : std::exception
  7. {
  8. const char *what() const throw();
  9. };
  10. template <typename T>
  11. class threadsafe_stack
  12. {
  13. private:
  14. std::stack<T> data; //封装的标准库stack
  15. //互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据
  16. mutable std::mutex m; //mutable表示可变的,const的反义词
  17. public:
  18. threadsafe_stack() {}
  19. //拷贝构造函数
  20. threadsafe_stack(const threadsafe_stack &other)
  21. {
  22. std::lock_guard<std::mutex> lock(other.m);
  23. data = other.data;
  24. }
  25. //不允许自动生成赋值构造函数,标记为delete
  26. threadsafe_stack &operator=(const threadsafe_stack &) = delete;
  27. void push(T new_value)
  28. {
  29. std::lock_guard<std::mutex> lock(m);
  30. data.push(std::move(new_value)); // 1
  31. }
  32. std::shared_ptr<T> pop()
  33. {
  34. std::lock_guard<std::mutex> lock(m);
  35. if (data.empty())
  36. throw empty_stack(); // 2
  37. std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top()))); // 3
  38. data.pop(); // 4
  39. return res;
  40. }
  41. void pop(T &value)
  42. {
  43. std::lock_guard<std::mutex> lock(m);
  44. if (data.empty())
  45. throw empty_stack();
  46. value = std::move(data.top()); // 5
  47. data.pop(); // 6
  48. }
  49. bool empty() const
  50. {
  51. std::lock_guard<std::mutex> lock(m);
  52. return data.empty();
  53. }
  54. };

所有成员函数都使用std::lock_guard<>保护数据,所以栈成员函数才是“线程安全”的。当然,构造与析构函数不是“线程安全”的,不过也没关系,因为构造与析构只有一次。即使在多线程下,并发调用的成员函数也是安全的(因为使用锁)。
序列化线程会隐性的限制程序性能,这就是栈争议声最大的地方:当一个线程在等待锁时,就会无所事事。这样的实现会限制栈的实现方式,线程等待时会浪费宝贵的资源去检查数据,或要求用户编写外部等待和提示的代码。

2.2 线程安全队列

锁和条件变量

  1. /*
  2. 基于锁和条件变量的线程安全队列
  3. */
  4. template <typename T>
  5. class threadsafe_queue
  6. {
  7. private:
  8. //互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据
  9. mutable std::mutex mut;
  10. std::queue<std::shared_ptr<T>> data_queue; //使用std::shared_ptr实例,而不是直接使用数据的值
  11. std::condition_variable data_cond; //条件变量
  12. public:
  13. threadsafe_queue() {}
  14. void push(T new_value)
  15. {
  16. std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value))); // 5
  17. std::lock_guard<std::mutex> lk(mut);
  18. data_queue.push(data);
  19. data_cond.notify_one(); //通知一个等待条件变量的线程
  20. }
  21. void wait_and_pop(T &value)
  22. {
  23. std::unique_lock<std::mutex> lk(mut);
  24. //使用条件变量等待条件达成,比循环调用empty()要好很多
  25. data_cond.wait(lk, [this] { return !data_queue.empty(); });
  26. value = std::move(*data_queue.front()); // 1
  27. data_queue.pop();
  28. }
  29. bool try_pop(T &value)
  30. {
  31. std::lock_guard<std::mutex> lk(mut);
  32. if (data_queue.empty())
  33. return false;
  34. value = std::move(*data_queue.front()); // 2
  35. data_queue.pop();
  36. return true;
  37. }
  38. std::shared_ptr<T> wait_and_pop()
  39. {
  40. std::unique_lock<std::mutex> lk(mut);
  41. data_cond.wait(lk, [this] { return !data_queue.empty(); });
  42. std::shared_ptr<T> res = data_queue.front(); // 3
  43. data_queue.pop();
  44. return res;
  45. }
  46. std::shared_ptr<T> try_pop()
  47. {
  48. std::lock_guard<std::mutex> lk(mut);
  49. if (data_queue.empty())
  50. return std::shared_ptr<T>();
  51. std::shared_ptr<T> res = data_queue.front(); // 4
  52. data_queue.pop();
  53. return res;
  54. }
  55. bool empty() const
  56. {
  57. std::lock_guard<std::mutex> lk(mut);
  58. return data_queue.empty();
  59. }
  60. };

std::shared_ptr<>持有数据的好处:新实例分配结束时,不会被锁在push()⑤当中。因为内存分配需要在性能上付出很高的代价(性能较低),所以使用std::shared_ptr<>对队列的性能有很大的提升,其减少了互斥量持有的时间,允许其他线程在分配内存的同时,对队列进行其他的操作。
如同栈的例子,使用互斥量保护整个数据结构,不过会限制队列对并发的支持;虽然,多线程可能被队列中的各种成员函数所阻塞,但仍有一个线程能在任意时间内进行工作。不过,这种限制是因为在实现中使用了std::queue<>;因为使用标准容器的原因,数据处于保护中。要对数据结构实现进行具体的控制,需要提供更多细粒度锁,来完成更高级的并发。

细粒度锁和条件变量

为了使用细粒度锁,需要看一下队列内部的组成结构,并且将一个互斥量与每个数据相关联

  1. //基于细粒度锁和条件变量的自定义线程安全队列
  2. #ifndef THREADSAFE_QUEUE2
  3. #define THREADSAFE_QUEUE2
  4. #include <memory>
  5. #include <mutex>
  6. #include <condition_variable>
  7. //类定义
  8. template <typename T>
  9. class threadsafe_queue
  10. {
  11. private:
  12. struct node
  13. {
  14. std::shared_ptr<T> data;
  15. std::unique_ptr<node> next;
  16. };
  17. std::mutex head_mutex;
  18. std::unique_ptr<node> head;
  19. std::mutex tail_mutex;
  20. node *tail;
  21. std::condition_variable data_cond;
  22. private:
  23. node *get_tail()
  24. {
  25. std::lock_guard<std::mutex> tail_lock(tail_mutex);
  26. return tail;
  27. }
  28. std::unique_ptr<node> pop_head() // 1
  29. {
  30. std::unique_ptr<node> old_head = std::move(head);
  31. head = std::move(old_head->next);
  32. return old_head;
  33. }
  34. std::unique_lock<std::mutex> wait_for_data() // 2
  35. {
  36. std::unique_lock<std::mutex> head_lock(head_mutex);
  37. data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
  38. return std::move(head_lock); // 3
  39. }
  40. std::unique_ptr<node> wait_pop_head()
  41. {
  42. std::unique_lock<std::mutex> head_lock(wait_for_data()); // 4
  43. return pop_head();
  44. }
  45. std::unique_ptr<node> wait_pop_head(T &value)
  46. {
  47. std::unique_lock<std::mutex> head_lock(wait_for_data()); // 5
  48. value = std::move(*head->data);
  49. return pop_head();
  50. }
  51. std::unique_ptr<node> try_pop_head()
  52. {
  53. std::lock_guard<std::mutex> head_lock(head_mutex);
  54. if (head.get() == get_tail())
  55. {
  56. return std::unique_ptr<node>();
  57. }
  58. return pop_head();
  59. }
  60. std::unique_ptr<node> try_pop_head(T &value)
  61. {
  62. std::lock_guard<std::mutex> head_lock(head_mutex);
  63. if (head.get() == get_tail())
  64. {
  65. return std::unique_ptr<node>();
  66. }
  67. value = std::move(*head->data);
  68. return pop_head();
  69. }
  70. public:
  71. threadsafe_queue() : head(new node), tail(head.get()) {}
  72. threadsafe_queue(const threadsafe_queue &other) = delete;
  73. threadsafe_queue &operator=(const threadsafe_queue &other) = delete;
  74. void push(T new_value)
  75. {
  76. std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
  77. std::unique_ptr<node> p(new node);
  78. {
  79. std::lock_guard<std::mutex> tail_lock(tail_mutex);
  80. tail->data = new_data;
  81. node *const new_tail = p.get();
  82. tail->next = std::move(p);
  83. tail = new_tail;
  84. }
  85. data_cond.notify_one();
  86. }
  87. std::shared_ptr<T> wait_and_pop()
  88. {
  89. std::unique_ptr<node> const old_head = wait_pop_head();
  90. return old_head->data;
  91. }
  92. void wait_and_pop(T &value)
  93. {
  94. std::unique_ptr<node> const old_head = wait_pop_head(value);
  95. }
  96. std::shared_ptr<T> try_pop()
  97. {
  98. std::unique_ptr<node> old_head = try_pop_head();
  99. return old_head ? old_head->data : std::shared_ptr<T>();
  100. }
  101. bool try_pop(T &value)
  102. {
  103. std::unique_ptr<node> const old_head = try_pop_head(value);
  104. return old_head;
  105. }
  106. bool empty()
  107. {
  108. std::lock_guard<std::mutex> head_lock(head_mutex);
  109. return (head.get() == get_tail());
  110. }
  111. };
  112. #endif

3 更复杂的数据结构

3.1 线程安全的链表

用细粒度锁最初的想法,是为了让链表每个节点都拥有一个互斥量。当链表很长时,就会使用有很多的互斥量!
这样的好处是对于链表中每一个独立的部分,都能实现真实的并发:其真正感兴趣的是对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放。

  1. /*线程安全链表——支持迭代器
  2. */
  3. #ifndef THREADSAFE_LIST
  4. #define THREADSAFE_LIST
  5. #include <memory>
  6. #include <mutex>
  7. template <typename T>
  8. class threadsafe_list
  9. {
  10. struct node // 1
  11. {
  12. std::mutex m;
  13. std::shared_ptr<T> data;
  14. std::unique_ptr<node> next;
  15. node() : // 2
  16. next()
  17. {
  18. }
  19. node(T const &value) : // 3
  20. data(std::make_shared<T>(value))
  21. {
  22. }
  23. };
  24. node head;
  25. public:
  26. threadsafe_list()
  27. {
  28. }
  29. ~threadsafe_list()
  30. {
  31. remove_if([](node const &) { return true; });
  32. }
  33. threadsafe_list(threadsafe_list const &other) = delete;
  34. threadsafe_list &operator=(threadsafe_list const &other) = delete;
  35. void push_front(T const &value)
  36. {
  37. std::unique_ptr<node> new_node(new node(value)); // 4
  38. std::lock_guard<std::mutex> lk(head.m);
  39. new_node->next = std::move(head.next); // 5
  40. head.next = std::move(new_node); // 6
  41. }
  42. template <typename Function>
  43. void for_each(Function f) // 7
  44. {
  45. node *current = &head;
  46. std::unique_lock<std::mutex> lk(head.m); // 8
  47. while (node *const next = current->next.get()) // 9
  48. {
  49. std::unique_lock<std::mutex> next_lk(next->m); // 10
  50. lk.unlock(); // 11
  51. f(*next->data); // 12
  52. current = next;
  53. lk = std::move(next_lk); // 13
  54. }
  55. }
  56. template <typename Predicate>
  57. std::shared_ptr<T> find_first_if(Predicate p) // 14
  58. {
  59. node *current = &head;
  60. std::unique_lock<std::mutex> lk(head.m);
  61. while (node *const next = current->next.get())
  62. {
  63. std::unique_lock<std::mutex> next_lk(next->m);
  64. lk.unlock();
  65. if (p(*next->data)) // 15
  66. {
  67. return next->data; // 16
  68. }
  69. current = next;
  70. lk = std::move(next_lk);
  71. }
  72. return std::shared_ptr<T>();
  73. }
  74. template <typename Predicate>
  75. void remove_if(Predicate p) // 17
  76. {
  77. node *current = &head;
  78. std::unique_lock<std::mutex> lk(head.m);
  79. while (node *const next = current->next.get())
  80. {
  81. std::unique_lock<std::mutex> next_lk(next->m);
  82. if (p(*next->data)) // 18
  83. {
  84. std::unique_ptr<node> old_next = std::move(current->next);
  85. current->next = std::move(next->next);
  86. next_lk.unlock();
  87. } // 20
  88. else
  89. {
  90. lk.unlock(); // 21
  91. current = next;
  92. lk = std::move(next_lk);
  93. }
  94. }
  95. }
  96. };
  97. #endif

3.2 线程安全map

假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。
对于键值的操作,需要有合适的函数。C++标准库提供std::hash<>模板。

  1. /*
  2. 线程安全hashmap
  3. 在构造函数中指定构造桶的数量。默认为19个(哈希表在有质数个桶时,工作效率最高)。
  4. 每一个桶都会被一个std::shared_mutex①实例锁保护,来允许并发读取,或对每一个桶,只有一个线程对其进行修改。
  5. */
  6. #ifndef THREADSAFE_MAP
  7. #define THREADSAFE_MAP
  8. #include <shared_mutex>
  9. #include <list>
  10. #include <vector>
  11. #include <map>
  12. #include <algorithm>
  13. template <typename Key, typename Value, typename Hash = std::hash<Key>>
  14. class threadsafe_lookup_table
  15. {
  16. private:
  17. class bucket_type
  18. {
  19. private:
  20. typedef std::pair<Key, Value> bucket_value;
  21. typedef std::list<bucket_value> bucket_data;
  22. typedef typename bucket_data::iterator bucket_iterator;
  23. bucket_data data;
  24. mutable std::shared_mutex mutex; // 1
  25. bucket_iterator find_entry_for(Key const &key) const // 2
  26. {
  27. return std::find_if(data.begin(), data.end(),
  28. [&](bucket_value const &item) { return item.first == key; });
  29. }
  30. public:
  31. Value value_for(Key const &key, Value const &default_value) const
  32. {
  33. std::shared_lock<std::shared_mutex> lock(mutex); // 3
  34. bucket_iterator const found_entry = find_entry_for(key);
  35. return (found_entry == data.end()) ? default_value : found_entry->second;
  36. }
  37. void add_or_update_mapping(Key const &key, Value const &value)
  38. {
  39. std::unique_lock<std::shared_mutex> lock(mutex); // 4
  40. bucket_iterator const found_entry = find_entry_for(key);
  41. if (found_entry == data.end())
  42. {
  43. data.push_back(bucket_value(key, value));
  44. }
  45. else
  46. {
  47. found_entry->second = value;
  48. }
  49. }
  50. void remove_mapping(Key const &key)
  51. {
  52. std::unique_lock<std::shared_mutex> lock(mutex); // 5
  53. bucket_iterator const found_entry = find_entry_for(key);
  54. if (found_entry != data.end())
  55. {
  56. data.erase(found_entry);
  57. }
  58. }
  59. };
  60. std::vector<std::unique_ptr<bucket_type>> buckets; // 6
  61. Hash hasher;
  62. bucket_type &get_bucket(Key const &key) const // 7
  63. {
  64. std::size_t const bucket_index = hasher(key) % buckets.size();
  65. return *buckets[bucket_index];
  66. }
  67. public:
  68. typedef Key key_type;
  69. typedef Value mapped_type;
  70. typedef Hash hash_type;
  71. threadsafe_lookup_table(unsigned num_buckets = 19, Hash const &hasher_ = Hash())
  72. : buckets(num_buckets), hasher(hasher_)
  73. {
  74. for (unsigned i = 0; i < num_buckets; ++i)
  75. {
  76. buckets[i].reset(new bucket_type);
  77. }
  78. }
  79. threadsafe_lookup_table(threadsafe_lookup_table const &other) = delete;
  80. threadsafe_lookup_table &operator=(
  81. threadsafe_lookup_table const &other) = delete;
  82. Value value_for(Key const &key,
  83. Value const &default_value = Value()) const
  84. {
  85. return get_bucket(key).value_for(key, default_value); // 8
  86. }
  87. void add_or_update_mapping(Key const &key, Value const &value)
  88. {
  89. get_bucket(key).add_or_update_mapping(key, value); // 9
  90. }
  91. void remove_mapping(Key const &key)
  92. {
  93. get_bucket(key).remove_mapping(key); // 10
  94. }
  95. std::map<Key, Value> get_map() const
  96. {
  97. std::vector<std::unique_lock<std::shared_mutex>> locks;
  98. for (unsigned i = 0; i < buckets.size(); ++i)
  99. {
  100. locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i].mutex));
  101. }
  102. std::map<Key, Value> res;
  103. for (unsigned i = 0; i < buckets.size(); ++i)
  104. {
  105. for (auto it = buckets[i].data.begin();
  106. it != buckets[i].data.end();
  107. ++it)
  108. {
  109. res.insert(*it);
  110. }
  111. }
  112. return res;
  113. }
  114. };
  115. #endif