数据结构的选择是解决方案的重要组成部分,当然并行程序也不例外。如果一种数据结构可以被多个线程所访问,要不就是绝对不变(其值不会发生变化,并且不需同步),要不程序就要对数据结构进行正确的设计,以确保其能在多线程环境下能够(正确的)同步。
- 一种选择是使用独立的互斥量,其可以锁住需要保护的数据。这种方式是线程轮流访问被保护的数据。这是对数据进行串行的访问,而非并发。
- 另一种选择是设计一种能够并发访问的数据结构。减少保护区域,减少序列化操作,就能提升并发访问的能力。
设计并发数据结构意味着,多个线程可以并发的访问这个数据结构,线程可对这个数据结构做相同或不同的操作,并且每一个线程都能在自己域中看到该数据结构。多线程环境下,无数据丢失和损毁,所有的数据需要维持原样,且无条件竞争。这样的数据结构,称之为“线程安全”的数据结构。
1 设计建议
如何保证数据结构是线程安全的:
- 确保无线程能够看到修改数据结构的“不变量”时的状态。
- 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤。
- 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态。
- 将死锁的概率降到最低。使用数据结构时,需要限制锁的范围,避免嵌套锁的存在。
作为一个数据结构的设计者,设计数据结构时考虑以下问题:
- 在锁范围中进行操作,是否允许在锁外执行?
- 数据结构中不同的区域能是否被不同的互斥量所保护?
- 所有操作都需要同级互斥量保护吗?
- 能否对数据结构进行简单的修改,以增加并发访问的概率,且不影响操作语义?
2 基于锁的并发数据结构
基于锁的并发数据结构,需要确保访问线程持有锁的时间最短,对于只有一个互斥量的数据结构来说,这十分困难。需要保证数据不被锁之外的操作所访问,并且还要保证不会在结构上产生条件竞争。使用多个互斥量来保护数据结构中不同的区域时,问题会暴露的更加明显,当操作需要获取多个互斥锁时,就有可能产生死锁。所以在设计时,使用多个互斥量时需要格外小心。
2.1 线程安全栈——互斥锁
/*基于锁的线程安全栈*/#include <exception>//自定义异常struct empty_stack : std::exception{const char *what() const throw();};template <typename T>class threadsafe_stack{private:std::stack<T> data; //封装的标准库stack//互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据mutable std::mutex m; //mutable表示可变的,const的反义词public:threadsafe_stack() {}//拷贝构造函数threadsafe_stack(const threadsafe_stack &other){std::lock_guard<std::mutex> lock(other.m);data = other.data;}//不允许自动生成赋值构造函数,标记为deletethreadsafe_stack &operator=(const threadsafe_stack &) = delete;void push(T new_value){std::lock_guard<std::mutex> lock(m);data.push(std::move(new_value)); // 1}std::shared_ptr<T> pop(){std::lock_guard<std::mutex> lock(m);if (data.empty())throw empty_stack(); // 2std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top()))); // 3data.pop(); // 4return res;}void pop(T &value){std::lock_guard<std::mutex> lock(m);if (data.empty())throw empty_stack();value = std::move(data.top()); // 5data.pop(); // 6}bool empty() const{std::lock_guard<std::mutex> lock(m);return data.empty();}};
所有成员函数都使用std::lock_guard<>保护数据,所以栈成员函数才是“线程安全”的。当然,构造与析构函数不是“线程安全”的,不过也没关系,因为构造与析构只有一次。即使在多线程下,并发调用的成员函数也是安全的(因为使用锁)。
序列化线程会隐性的限制程序性能,这就是栈争议声最大的地方:当一个线程在等待锁时,就会无所事事。这样的实现会限制栈的实现方式,线程等待时会浪费宝贵的资源去检查数据,或要求用户编写外部等待和提示的代码。
2.2 线程安全队列
锁和条件变量
/*基于锁和条件变量的线程安全队列*/template <typename T>class threadsafe_queue{private://互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据mutable std::mutex mut;std::queue<std::shared_ptr<T>> data_queue; //使用std::shared_ptr实例,而不是直接使用数据的值std::condition_variable data_cond; //条件变量public:threadsafe_queue() {}void push(T new_value){std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value))); // 5std::lock_guard<std::mutex> lk(mut);data_queue.push(data);data_cond.notify_one(); //通知一个等待条件变量的线程}void wait_and_pop(T &value){std::unique_lock<std::mutex> lk(mut);//使用条件变量等待条件达成,比循环调用empty()要好很多data_cond.wait(lk, [this] { return !data_queue.empty(); });value = std::move(*data_queue.front()); // 1data_queue.pop();}bool try_pop(T &value){std::lock_guard<std::mutex> lk(mut);if (data_queue.empty())return false;value = std::move(*data_queue.front()); // 2data_queue.pop();return true;}std::shared_ptr<T> wait_and_pop(){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk, [this] { return !data_queue.empty(); });std::shared_ptr<T> res = data_queue.front(); // 3data_queue.pop();return res;}std::shared_ptr<T> try_pop(){std::lock_guard<std::mutex> lk(mut);if (data_queue.empty())return std::shared_ptr<T>();std::shared_ptr<T> res = data_queue.front(); // 4data_queue.pop();return res;}bool empty() const{std::lock_guard<std::mutex> lk(mut);return data_queue.empty();}};
std::shared_ptr<>持有数据的好处:新实例分配结束时,不会被锁在push()⑤当中。因为内存分配需要在性能上付出很高的代价(性能较低),所以使用std::shared_ptr<>对队列的性能有很大的提升,其减少了互斥量持有的时间,允许其他线程在分配内存的同时,对队列进行其他的操作。
如同栈的例子,使用互斥量保护整个数据结构,不过会限制队列对并发的支持;虽然,多线程可能被队列中的各种成员函数所阻塞,但仍有一个线程能在任意时间内进行工作。不过,这种限制是因为在实现中使用了std::queue<>;因为使用标准容器的原因,数据处于保护中。要对数据结构实现进行具体的控制,需要提供更多细粒度锁,来完成更高级的并发。
细粒度锁和条件变量
为了使用细粒度锁,需要看一下队列内部的组成结构,并且将一个互斥量与每个数据相关联。
//基于细粒度锁和条件变量的自定义线程安全队列#ifndef THREADSAFE_QUEUE2#define THREADSAFE_QUEUE2#include <memory>#include <mutex>#include <condition_variable>//类定义template <typename T>class threadsafe_queue{private:struct node{std::shared_ptr<T> data;std::unique_ptr<node> next;};std::mutex head_mutex;std::unique_ptr<node> head;std::mutex tail_mutex;node *tail;std::condition_variable data_cond;private:node *get_tail(){std::lock_guard<std::mutex> tail_lock(tail_mutex);return tail;}std::unique_ptr<node> pop_head() // 1{std::unique_ptr<node> old_head = std::move(head);head = std::move(old_head->next);return old_head;}std::unique_lock<std::mutex> wait_for_data() // 2{std::unique_lock<std::mutex> head_lock(head_mutex);data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });return std::move(head_lock); // 3}std::unique_ptr<node> wait_pop_head(){std::unique_lock<std::mutex> head_lock(wait_for_data()); // 4return pop_head();}std::unique_ptr<node> wait_pop_head(T &value){std::unique_lock<std::mutex> head_lock(wait_for_data()); // 5value = std::move(*head->data);return pop_head();}std::unique_ptr<node> try_pop_head(){std::lock_guard<std::mutex> head_lock(head_mutex);if (head.get() == get_tail()){return std::unique_ptr<node>();}return pop_head();}std::unique_ptr<node> try_pop_head(T &value){std::lock_guard<std::mutex> head_lock(head_mutex);if (head.get() == get_tail()){return std::unique_ptr<node>();}value = std::move(*head->data);return pop_head();}public:threadsafe_queue() : head(new node), tail(head.get()) {}threadsafe_queue(const threadsafe_queue &other) = delete;threadsafe_queue &operator=(const threadsafe_queue &other) = delete;void push(T new_value){std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));std::unique_ptr<node> p(new node);{std::lock_guard<std::mutex> tail_lock(tail_mutex);tail->data = new_data;node *const new_tail = p.get();tail->next = std::move(p);tail = new_tail;}data_cond.notify_one();}std::shared_ptr<T> wait_and_pop(){std::unique_ptr<node> const old_head = wait_pop_head();return old_head->data;}void wait_and_pop(T &value){std::unique_ptr<node> const old_head = wait_pop_head(value);}std::shared_ptr<T> try_pop(){std::unique_ptr<node> old_head = try_pop_head();return old_head ? old_head->data : std::shared_ptr<T>();}bool try_pop(T &value){std::unique_ptr<node> const old_head = try_pop_head(value);return old_head;}bool empty(){std::lock_guard<std::mutex> head_lock(head_mutex);return (head.get() == get_tail());}};#endif
3 更复杂的数据结构
3.1 线程安全的链表
用细粒度锁最初的想法,是为了让链表每个节点都拥有一个互斥量。当链表很长时,就会使用有很多的互斥量!
这样的好处是对于链表中每一个独立的部分,都能实现真实的并发:其真正感兴趣的是对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放。
/*线程安全链表——支持迭代器*/#ifndef THREADSAFE_LIST#define THREADSAFE_LIST#include <memory>#include <mutex>template <typename T>class threadsafe_list{struct node // 1{std::mutex m;std::shared_ptr<T> data;std::unique_ptr<node> next;node() : // 2next(){}node(T const &value) : // 3data(std::make_shared<T>(value)){}};node head;public:threadsafe_list(){}~threadsafe_list(){remove_if([](node const &) { return true; });}threadsafe_list(threadsafe_list const &other) = delete;threadsafe_list &operator=(threadsafe_list const &other) = delete;void push_front(T const &value){std::unique_ptr<node> new_node(new node(value)); // 4std::lock_guard<std::mutex> lk(head.m);new_node->next = std::move(head.next); // 5head.next = std::move(new_node); // 6}template <typename Function>void for_each(Function f) // 7{node *current = &head;std::unique_lock<std::mutex> lk(head.m); // 8while (node *const next = current->next.get()) // 9{std::unique_lock<std::mutex> next_lk(next->m); // 10lk.unlock(); // 11f(*next->data); // 12current = next;lk = std::move(next_lk); // 13}}template <typename Predicate>std::shared_ptr<T> find_first_if(Predicate p) // 14{node *current = &head;std::unique_lock<std::mutex> lk(head.m);while (node *const next = current->next.get()){std::unique_lock<std::mutex> next_lk(next->m);lk.unlock();if (p(*next->data)) // 15{return next->data; // 16}current = next;lk = std::move(next_lk);}return std::shared_ptr<T>();}template <typename Predicate>void remove_if(Predicate p) // 17{node *current = &head;std::unique_lock<std::mutex> lk(head.m);while (node *const next = current->next.get()){std::unique_lock<std::mutex> next_lk(next->m);if (p(*next->data)) // 18{std::unique_ptr<node> old_next = std::move(current->next);current->next = std::move(next->next);next_lk.unlock();} // 20else{lk.unlock(); // 21current = next;lk = std::move(next_lk);}}}};#endif
3.2 线程安全map
假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。
对于键值的操作,需要有合适的函数。C++标准库提供std::hash<>模板。
/*线程安全hashmap在构造函数中指定构造桶的数量。默认为19个(哈希表在有质数个桶时,工作效率最高)。每一个桶都会被一个std::shared_mutex①实例锁保护,来允许并发读取,或对每一个桶,只有一个线程对其进行修改。*/#ifndef THREADSAFE_MAP#define THREADSAFE_MAP#include <shared_mutex>#include <list>#include <vector>#include <map>#include <algorithm>template <typename Key, typename Value, typename Hash = std::hash<Key>>class threadsafe_lookup_table{private:class bucket_type{private:typedef std::pair<Key, Value> bucket_value;typedef std::list<bucket_value> bucket_data;typedef typename bucket_data::iterator bucket_iterator;bucket_data data;mutable std::shared_mutex mutex; // 1bucket_iterator find_entry_for(Key const &key) const // 2{return std::find_if(data.begin(), data.end(),[&](bucket_value const &item) { return item.first == key; });}public:Value value_for(Key const &key, Value const &default_value) const{std::shared_lock<std::shared_mutex> lock(mutex); // 3bucket_iterator const found_entry = find_entry_for(key);return (found_entry == data.end()) ? default_value : found_entry->second;}void add_or_update_mapping(Key const &key, Value const &value){std::unique_lock<std::shared_mutex> lock(mutex); // 4bucket_iterator const found_entry = find_entry_for(key);if (found_entry == data.end()){data.push_back(bucket_value(key, value));}else{found_entry->second = value;}}void remove_mapping(Key const &key){std::unique_lock<std::shared_mutex> lock(mutex); // 5bucket_iterator const found_entry = find_entry_for(key);if (found_entry != data.end()){data.erase(found_entry);}}};std::vector<std::unique_ptr<bucket_type>> buckets; // 6Hash hasher;bucket_type &get_bucket(Key const &key) const // 7{std::size_t const bucket_index = hasher(key) % buckets.size();return *buckets[bucket_index];}public:typedef Key key_type;typedef Value mapped_type;typedef Hash hash_type;threadsafe_lookup_table(unsigned num_buckets = 19, Hash const &hasher_ = Hash()): buckets(num_buckets), hasher(hasher_){for (unsigned i = 0; i < num_buckets; ++i){buckets[i].reset(new bucket_type);}}threadsafe_lookup_table(threadsafe_lookup_table const &other) = delete;threadsafe_lookup_table &operator=(threadsafe_lookup_table const &other) = delete;Value value_for(Key const &key,Value const &default_value = Value()) const{return get_bucket(key).value_for(key, default_value); // 8}void add_or_update_mapping(Key const &key, Value const &value){get_bucket(key).add_or_update_mapping(key, value); // 9}void remove_mapping(Key const &key){get_bucket(key).remove_mapping(key); // 10}std::map<Key, Value> get_map() const{std::vector<std::unique_lock<std::shared_mutex>> locks;for (unsigned i = 0; i < buckets.size(); ++i){locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i].mutex));}std::map<Key, Value> res;for (unsigned i = 0; i < buckets.size(); ++i){for (auto it = buckets[i].data.begin();it != buckets[i].data.end();++it){res.insert(*it);}}return res;}};#endif
