背景

本文的目标很清晰,首先就是要实现一个hashmap,提供Insert/Delete/Search等基本功能。非open-addressing的实现,可能会有多个值被hash到同一个bucket,因此需要支持bucket扩容。

接着,这个数据结构的实现需要是True Lock-free的。其实不存在什么True Lock-free,只是现在很多号称Lock free的实现其实压根不是Lock-free。并不是说没有用到mutex库就是Lock-free,这里wiki[1]有详细说明。但是通俗来说,你可以通过这种方法来分析自己的系统是不是Lock-free:假设程序运行的操作系统的任务调度存在大bug,每一个执行线程都可能在任意一步被调度走,然后此后永远也得不到执行。此时,是否会影响其他线程继续完成自己的工作。如果不会影响,这才能算Lock-free。

最后,这是一个c++的实现,没有语言自带的gc功能。因此相比于其他编程语言,在无锁的环境下内存的安全释放也是一个需要仔细设计的问题,需要兼顾安全和性能。

Lock-free链表

我们这里使用有有序单向链表来保存hash到同一个bucket的数据,因此首先需要实现一个无锁的链表。无锁链表在插入和删除的时候需要通过cas来实现:
image.png
除此之外还需要处理一点比较棘手的情况,就是当搜索或者插入遇到被删除的节点,可能无法知道该节点已经被删除,导致在被删除的节点上继续搜索或者插入,从而出错。因此在删除某个节点之前,需要对被删除节点的next指针做上标记。当遍历到一个被打上删除标记的节点后,需要从头开始,进行搜索。做标记的方法很简单,用next指针的最低bit位表示即可。

这里比较核心的代码是寻找一个节点pair的过程。节点pair包含{prev, curr}这两个指针,其中curr指向第一个大于或等于要查找节点的节点指针,而prev->next == curr。因此在插入节点的时候,流程就简化为首先找到节点pair,接着将节点插入{prev, curr}之间。搜索流程则简化为同样首先找到节点pair,接着判断curr是否等于要查找的节点。而寻找节点pair的代码如下:

  1. bool FindPair(LinkedNode* const begin_node,
  2. Node& node,
  3. FindPair& pair) {
  4. assert(begin_node != nullptr);
  5. assert(!IS_DEL(begin_node->next_));
  6. pair.prev = begin_node;
  7. pair.curr = begin_node->next_;
  8. while (pair.curr != nullptr) {
  9. LinkedNode* p = pair.curr->next_;
  10. if (IS_DEL(p)) {
  11. // 判断寻找路径上的该节点是否已被删除,如果已删除
  12. // 则协助释放该节点。并且返回false,让外层从头开始执行
  13. if (ATOMIC_CAS(&pair.prev->next_,
  14. pair.curr,
  15. (LinkedNode*)MARK_NOT_DEL(p))) {
  16. RS.Retire(pair.curr);
  17. FETCH_AND_ADD(&size_, -1);
  18. }
  19. return false;
  20. }
  21. if (pair.curr->data_ < node) {
  22. // 未找到大于或等于node的节点pair,继续查找
  23. pair.prev = pair.curr;
  24. pair.curr = p;
  25. } else {
  26. // 查找成功,退出循环
  27. break;
  28. }
  29. }
  30. return true;
  31. }

因此,Search的实现如下:

  1. bool SearchFrom(LinkedNode* begin_node,
  2. Node& node, Node& ret) {
  3. RetireGuard guard;
  4. FindPair pair;
  5. // 查找节点pair
  6. while (!FindPair(begin_node, node, pair)) {}
  7. // 因为pair中curr是第一个大于或者等于要查找的node的节点,
  8. // 因此判断node是否存在
  9. if (pair.curr == nullptr || pair.curr->data_ != node) {
  10. return false;
  11. } else {
  12. ret = pair.curr->data_;
  13. return true;
  14. }
  15. }

插入的实现如下:

  1. void InsertFrom(LinkedNode* begin_node, Node& node) {
  2. RetireGuard guard;
  3. LinkedNode* newnode = new LinkedNode();
  4. while (true) {
  5. FindPair pair;
  6. // 查找节点pair
  7. while (!FindPair(begin_node, node, pair)) {}
  8. newnode->data_ = node;
  9. newnode->next_ = pair.curr;
  10. // 将新增节点插入到pair之间
  11. if (ATOMIC_CAS(&pair.prev->next_, pair.curr, newnode)) {
  12. FETCH_AND_ADD(&size_, 1);
  13. return;
  14. }
  15. // 如果插入失败,则重新开始查找节点pair
  16. }
  17. }

删除的实现如下:

  1. bool DeleteFrom(LinkedNode* begin_node, Node& node) {
  2. RetireGuard guard;
  3. while (true) {
  4. FindPair pair;
  5. while (!FindLarger(begin_node, node, pair)) {}
  6. if (pair.curr == nullptr || pair.curr->data_ != node) {
  7. // 要删除的节点不存在
  8. return false;
  9. }
  10. LinkedNode* n_node = ATOMIC_LOAD(&pair.curr->next_);
  11. // 这里采用lazy删除的方法,只对要删除的节点打上标记,不做实际删除
  12. if (!ATOMIC_CAS(&pair.curr->next_,
  13. (LinkedNode*)MARK_NOT_DEL(n_node),
  14. (LinkedNode*)MARK_DEL(n_node))) {
  15. continue;
  16. } else {
  17. // lazy delete
  18. return true;
  19. }
  20. }
  21. }

这里采用lazy删除的方式,有一个很重要的考虑就是为了实现lock-free。网络有很多类似实现链表的方法,都宣称自己是lock-free的,做法是在删除流程中先给节点打上删除标记,然后再将节点移除链表。但是这个做法本身就是非lock-free的。因为如果在给节点打上删除标记后,该线程再也得不到调度执行,则该节点将永远无法被移除,导致后续其他线程的动作都会被阻塞,从而违反了lock-free的定义。因此,本实现中会在遍历节点的时候协助删除被打上删除标记的节点。

内存管理

c++需要自动管理内存的回收,但是在无锁的环境,会出现一种情况,当线程在准备释放被删除的节点时,可能该节点正在被其他线程访问,因此内存释放的时机应该是确保所有的线程都不会访问到该节点。我们这里设计了一种类似Quiescent State Based Reclamation的回收机制,方法在于寻找一个quiescent state,此后就可以安全地回收节点。

全局维护一个global version,定期提升global version,每一个线程会存有一个线程局部的thread version,在开始执行之前,设置thread version = global version,在结束后,设置thread version = UINT64_MAX。负责垃圾回收的线程在做清理前会获取本次的quiescent version = global version,并且global version++,当所有线程的最小值min{thread version…} > quiescent version后,说明此时已进入quiescent state,可以安全地释放此前删除的节点内存。

如下图所示,采用双buffer的方式来保存待删除的节点,定期切换删除节点写入的buffer,并且推高global version++,等到quiescent state后,就删除对应的buffer。
image.png
内存回收线程核心代码如下:

  1. while (!stop_) {
  2. usleep(100000);
  3. // 切换buffer index
  4. const uint8_t cur_index = ATOMIC_LOAD(&cur_index_);
  5. const uint8_t next_index = cur_index ^ 1;
  6. ATOMIC_STORE(&cur_index_, next_index);
  7. // 推大global_version
  8. const uint64_t target = FETCH_AND_ADD(&global_version, 1);
  9. // 等待进入quiescent state
  10. while (true) {
  11. uint64_t min_version = UINT64_MAX;
  12. for (int i = 0; i < MAX_THREAD_CNT; i++) {
  13. min_version = std::min(min_version, thread_version[i]);
  14. }
  15. // 进入quiescent state
  16. if (min_version > target) {
  17. // 此时buffer中的节点将不会被访问到,因此可以安全回收
  18. RetireNode* rn = ptr_head_[cur_index].next;
  19. while (rn != nullptr) {
  20. free(rn);
  21. }
  22. break;
  23. }
  24. } // while (true)
  25. }

工作线程开始执行之前,只需获取RetireGuard即可:

  1. class RetireGuard {
  2. public:
  3. RetireGuard() {
  4. tid_ = FETCH_AND_ADD(&thread_id, 1);
  5. thread_version[tid_ & (MAX_THREAD_CNT - 1)] = ATOMIC_LOAD(&global_version);
  6. }
  7. ~RetireGuard() {
  8. thread_version[tid_ & (MAX_THREAD_CNT -1)] = UINT64_MAX;
  9. }
  10. private:
  11. uint32_t tid_;
  12. };
  13. // 工作线程
  14. {
  15. RetireGuard guard;
  16. // 开始执行
  17. // do something
  18. }

采用epoch based的内存回收管理方式,相对于很多编程语言所采用的引用计数的方式来说,能更好地减少cache-line冲突。

ABA问题

无锁实现有一个棘手的问题是ABA的问题,会导致cas的时候,已经被释放的节点以一种幽灵的方式回归,导致cas失效。但是由于Quiescent State Based Reclamation的设计,使得每一次操作都不会遇到一个已经被释放的指针被再次使用的情况,因此本实现中的特殊内存管理释放方式,可以很好地解决ABA问题[2]。

LockFree HashMap

之前已经实现了一个lockfree的单向有序链表,基于此可以实现lockfree的hashmap。在hashmap的实现中,常见的做法是用数组来保存bucket,每个bucket对应一个lockfree的单向有序链表。但是这个实现无法很好地解决lockfree地扩容bucket的问题,因此我们要换一个思路。在上一个思路中,我们采用的是链表跟着bucket走的思路,但是我们可以换一个思路,让bucket跟着链表走,把扩容bucket的动作转换为插入节点的动作:
image.png
需要对插入节点的hash做一下reverse bit反转变换:

  1. bucket_id = reverse(hash_code) % bucket_cnt

在插入节点时,判断当前hashmap的阈值threshold是否达到预设扩容的要求,如果达到要求,执行扩容,扩容的动作就是原子地将bucket_size*2。

  1. void ExpandMap() {
  2. uint64_t size = list_.Size();
  3. uint64_t bucket_size = bucket_size_;
  4. if ((double)size / (double)bucket_size > threshold_
  5. && 2 * bucket_size < MAX_BUCKET_SIZE) {
  6. // 扩容
  7. ATOMIC_CAS(&bucket_size_, bucket_size, 2 * bucket_size);
  8. }
  9. }

但是在扩容后,并不会直接生成对应的bucket,而是采用延迟生成的方法,等地GetBucket的时候再执行生成:

  1. LinkedNode* GetBucket(const uint32_t bucket_id, const uint32_t mark) {
  2. LinkedNode* bn = ATOMIC_LOAD(&bucket_[bucket_id]);
  3. while (nullptr == bn) {
  4. // bucket未生成,开始生成bucket
  5. LinkedNode* nn = new LinkedNode();
  6. nn->data_.hash_code = reverse(bucket_id);
  7. if (ATOMIC_CAS(&bucket_[bucket_id], nullptr, nn)) {
  8. // 生成bucket,并插入链表
  9. list_.InsertLinkedNode(nn);
  10. } else {
  11. delete nn;
  12. }
  13. bn = ATOMIC_LOAD(&bucket_[bucket_id]);
  14. }
  15. const uint32_t next_mark = ((mark + 1) >> 1) - 1;
  16. // 如果bucket对应的节点未插入链表成功,则往前查找前面的bucket
  17. return (bn->next_ == nullptr && bucket_id > 1)
  18. ? GetBucket(bucket_id & next_mark, next_mark) : bn;
  19. }

在这个设计中,之所以能够做到延迟生成bucket,当发现当前bucket未就绪后,可以向前追溯bucket,是因为首先数据都存在同一个链表上,接着是因为特定的hash规则,后续扩容的bucket可以均匀地分布在先前的bucket之间。
image.png
基于此,可以实现一个真正的Lockfree的HashMap。后续我会给出和目前现有的开源库的性能对比。

[1] https://en.wikipedia.org/wiki/Non-blocking_algorithm
[2] https://en.wikipedia.org/wiki/ABA_problem