数据结构的选择是解决方案的重要组成部分,当然并行程序也不例外。如果一种数据结构可以被多个线程所访问,要不就是绝对不变(其值不会发生变化,并且不需同步),要不程序就要对数据结构进行正确的设计,以确保其能在多线程环境下能够(正确的)同步。
- 一种选择是使用独立的互斥量,其可以锁住需要保护的数据。这种方式是线程轮流访问被保护的数据。这是对数据进行串行的访问,而非并发。
- 另一种选择是设计一种能够并发访问的数据结构。减少保护区域,减少序列化操作,就能提升并发访问的能力。
设计并发数据结构意味着,多个线程可以并发的访问这个数据结构,线程可对这个数据结构做相同或不同的操作,并且每一个线程都能在自己域中看到该数据结构。多线程环境下,无数据丢失和损毁,所有的数据需要维持原样,且无条件竞争。这样的数据结构,称之为“线程安全”的数据结构。
1 设计建议
如何保证数据结构是线程安全的:
- 确保无线程能够看到修改数据结构的“不变量”时的状态。
- 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤。
- 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态。
- 将死锁的概率降到最低。使用数据结构时,需要限制锁的范围,避免嵌套锁的存在。
作为一个数据结构的设计者,设计数据结构时考虑以下问题:
- 在锁范围中进行操作,是否允许在锁外执行?
- 数据结构中不同的区域能是否被不同的互斥量所保护?
- 所有操作都需要同级互斥量保护吗?
- 能否对数据结构进行简单的修改,以增加并发访问的概率,且不影响操作语义?
2 基于锁的并发数据结构
基于锁的并发数据结构,需要确保访问线程持有锁的时间最短,对于只有一个互斥量的数据结构来说,这十分困难。需要保证数据不被锁之外的操作所访问,并且还要保证不会在结构上产生条件竞争。使用多个互斥量来保护数据结构中不同的区域时,问题会暴露的更加明显,当操作需要获取多个互斥锁时,就有可能产生死锁。所以在设计时,使用多个互斥量时需要格外小心。
2.1 线程安全栈——互斥锁
/*
基于锁的线程安全栈
*/
#include <exception>
//自定义异常
struct empty_stack : std::exception
{
const char *what() const throw();
};
template <typename T>
class threadsafe_stack
{
private:
std::stack<T> data; //封装的标准库stack
//互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据
mutable std::mutex m; //mutable表示可变的,const的反义词
public:
threadsafe_stack() {}
//拷贝构造函数
threadsafe_stack(const threadsafe_stack &other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
//不允许自动生成赋值构造函数,标记为delete
threadsafe_stack &operator=(const threadsafe_stack &) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // 1
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack(); // 2
std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top()))); // 3
data.pop(); // 4
return res;
}
void pop(T &value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack();
value = std::move(data.top()); // 5
data.pop(); // 6
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
所有成员函数都使用std::lock_guard<>
保护数据,所以栈成员函数才是“线程安全”的。当然,构造与析构函数不是“线程安全”的,不过也没关系,因为构造与析构只有一次。即使在多线程下,并发调用的成员函数也是安全的(因为使用锁)。
序列化线程会隐性的限制程序性能,这就是栈争议声最大的地方:当一个线程在等待锁时,就会无所事事。这样的实现会限制栈的实现方式,线程等待时会浪费宝贵的资源去检查数据,或要求用户编写外部等待和提示的代码。
2.2 线程安全队列
锁和条件变量
/*
基于锁和条件变量的线程安全队列
*/
template <typename T>
class threadsafe_queue
{
private:
//互斥量m用于保证线程安全,对每个成员函数进行加锁保护,保证在同一时间只有一个线程可以访问数据
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue; //使用std::shared_ptr实例,而不是直接使用数据的值
std::condition_variable data_cond; //条件变量
public:
threadsafe_queue() {}
void push(T new_value)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value))); // 5
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one(); //通知一个等待条件变量的线程
}
void wait_and_pop(T &value)
{
std::unique_lock<std::mutex> lk(mut);
//使用条件变量等待条件达成,比循环调用empty()要好很多
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = std::move(*data_queue.front()); // 1
data_queue.pop();
}
bool try_pop(T &value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(*data_queue.front()); // 2
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
std::shared_ptr<T> res = data_queue.front(); // 3
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res = data_queue.front(); // 4
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
std::shared_ptr<>
持有数据的好处:新实例分配结束时,不会被锁在push()⑤当中。因为内存分配需要在性能上付出很高的代价(性能较低),所以使用std::shared_ptr<>
对队列的性能有很大的提升,其减少了互斥量持有的时间,允许其他线程在分配内存的同时,对队列进行其他的操作。
如同栈的例子,使用互斥量保护整个数据结构,不过会限制队列对并发的支持;虽然,多线程可能被队列中的各种成员函数所阻塞,但仍有一个线程能在任意时间内进行工作。不过,这种限制是因为在实现中使用了std::queue<>
;因为使用标准容器的原因,数据处于保护中。要对数据结构实现进行具体的控制,需要提供更多细粒度锁,来完成更高级的并发。
细粒度锁和条件变量
为了使用细粒度锁,需要看一下队列内部的组成结构,并且将一个互斥量与每个数据相关联。
//基于细粒度锁和条件变量的自定义线程安全队列
#ifndef THREADSAFE_QUEUE2
#define THREADSAFE_QUEUE2
#include <memory>
#include <mutex>
#include <condition_variable>
//类定义
template <typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node *tail;
std::condition_variable data_cond;
private:
node *get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head() // 1
{
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data() // 2
{
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
return std::move(head_lock); // 3
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> head_lock(wait_for_data()); // 4
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T &value)
{
std::unique_lock<std::mutex> head_lock(wait_for_data()); // 5
value = std::move(*head->data);
return pop_head();
}
std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return std::unique_ptr<node>();
}
return pop_head();
}
std::unique_ptr<node> try_pop_head(T &value)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return std::unique_ptr<node>();
}
value = std::move(*head->data);
return pop_head();
}
public:
threadsafe_queue() : head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue &other) = delete;
threadsafe_queue &operator=(const threadsafe_queue &other) = delete;
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
node *const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
data_cond.notify_one();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_ptr<node> const old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(T &value)
{
std::unique_ptr<node> const old_head = wait_pop_head(value);
}
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<T>();
}
bool try_pop(T &value)
{
std::unique_ptr<node> const old_head = try_pop_head(value);
return old_head;
}
bool empty()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
return (head.get() == get_tail());
}
};
#endif
3 更复杂的数据结构
3.1 线程安全的链表
用细粒度锁最初的想法,是为了让链表每个节点都拥有一个互斥量。当链表很长时,就会使用有很多的互斥量!
这样的好处是对于链表中每一个独立的部分,都能实现真实的并发:其真正感兴趣的是对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放。
/*线程安全链表——支持迭代器
*/
#ifndef THREADSAFE_LIST
#define THREADSAFE_LIST
#include <memory>
#include <mutex>
template <typename T>
class threadsafe_list
{
struct node // 1
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node() : // 2
next()
{
}
node(T const &value) : // 3
data(std::make_shared<T>(value))
{
}
};
node head;
public:
threadsafe_list()
{
}
~threadsafe_list()
{
remove_if([](node const &) { return true; });
}
threadsafe_list(threadsafe_list const &other) = delete;
threadsafe_list &operator=(threadsafe_list const &other) = delete;
void push_front(T const &value)
{
std::unique_ptr<node> new_node(new node(value)); // 4
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next); // 5
head.next = std::move(new_node); // 6
}
template <typename Function>
void for_each(Function f) // 7
{
node *current = &head;
std::unique_lock<std::mutex> lk(head.m); // 8
while (node *const next = current->next.get()) // 9
{
std::unique_lock<std::mutex> next_lk(next->m); // 10
lk.unlock(); // 11
f(*next->data); // 12
current = next;
lk = std::move(next_lk); // 13
}
}
template <typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p) // 14
{
node *current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node *const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data)) // 15
{
return next->data; // 16
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}
template <typename Predicate>
void remove_if(Predicate p) // 17
{
node *current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node *const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data)) // 18
{
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock();
} // 20
else
{
lk.unlock(); // 21
current = next;
lk = std::move(next_lk);
}
}
}
};
#endif
3.2 线程安全map
假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。
对于键值的操作,需要有合适的函数。C++标准库提供std::hash<>
模板。
/*
线程安全hashmap
在构造函数中指定构造桶的数量。默认为19个(哈希表在有质数个桶时,工作效率最高)。
每一个桶都会被一个std::shared_mutex①实例锁保护,来允许并发读取,或对每一个桶,只有一个线程对其进行修改。
*/
#ifndef THREADSAFE_MAP
#define THREADSAFE_MAP
#include <shared_mutex>
#include <list>
#include <vector>
#include <map>
#include <algorithm>
template <typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table
{
private:
class bucket_type
{
private:
typedef std::pair<Key, Value> bucket_value;
typedef std::list<bucket_value> bucket_data;
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable std::shared_mutex mutex; // 1
bucket_iterator find_entry_for(Key const &key) const // 2
{
return std::find_if(data.begin(), data.end(),
[&](bucket_value const &item) { return item.first == key; });
}
public:
Value value_for(Key const &key, Value const &default_value) const
{
std::shared_lock<std::shared_mutex> lock(mutex); // 3
bucket_iterator const found_entry = find_entry_for(key);
return (found_entry == data.end()) ? default_value : found_entry->second;
}
void add_or_update_mapping(Key const &key, Value const &value)
{
std::unique_lock<std::shared_mutex> lock(mutex); // 4
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry == data.end())
{
data.push_back(bucket_value(key, value));
}
else
{
found_entry->second = value;
}
}
void remove_mapping(Key const &key)
{
std::unique_lock<std::shared_mutex> lock(mutex); // 5
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry != data.end())
{
data.erase(found_entry);
}
}
};
std::vector<std::unique_ptr<bucket_type>> buckets; // 6
Hash hasher;
bucket_type &get_bucket(Key const &key) const // 7
{
std::size_t const bucket_index = hasher(key) % buckets.size();
return *buckets[bucket_index];
}
public:
typedef Key key_type;
typedef Value mapped_type;
typedef Hash hash_type;
threadsafe_lookup_table(unsigned num_buckets = 19, Hash const &hasher_ = Hash())
: buckets(num_buckets), hasher(hasher_)
{
for (unsigned i = 0; i < num_buckets; ++i)
{
buckets[i].reset(new bucket_type);
}
}
threadsafe_lookup_table(threadsafe_lookup_table const &other) = delete;
threadsafe_lookup_table &operator=(
threadsafe_lookup_table const &other) = delete;
Value value_for(Key const &key,
Value const &default_value = Value()) const
{
return get_bucket(key).value_for(key, default_value); // 8
}
void add_or_update_mapping(Key const &key, Value const &value)
{
get_bucket(key).add_or_update_mapping(key, value); // 9
}
void remove_mapping(Key const &key)
{
get_bucket(key).remove_mapping(key); // 10
}
std::map<Key, Value> get_map() const
{
std::vector<std::unique_lock<std::shared_mutex>> locks;
for (unsigned i = 0; i < buckets.size(); ++i)
{
locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i].mutex));
}
std::map<Key, Value> res;
for (unsigned i = 0; i < buckets.size(); ++i)
{
for (auto it = buckets[i].data.begin();
it != buckets[i].data.end();
++it)
{
res.insert(*it);
}
}
return res;
}
};
#endif