1 无锁并发结构分类
无锁结构依赖于原子操作和内存序,以确保多线程以正确的顺序访问数据结构。
1.1无锁数据结构
无锁结构就意味着线程可以并发的访问数据结构,线程不能做相同的操作;
- 一个无锁队列可能允许一个线程进行压入数据,另一个线程弹出数据,当有两个线程同时尝试添加元素时,这个数据结构将被破坏。
- 当其中一个访问线程被调度器中途挂起时,其他线程必须能够继续完成自己的工作,而无需等待挂起线程。
具有“比较/交换”操作的数据结构,通常在“比较/交换”操作实现中都有一个循环。使用“比较/交换”操作的原因:当有其他线程同时对指定的数据进行修改时,代码将尝试恢复数据。当其他线程被挂起时,“比较/交换”操作执行成功,这样的代码就是无锁的。当执行失败时,就需要一个自旋锁,且这个结构就是“无阻塞-有锁”的结构。
1.2 无等待数据结构
无锁算法中的循环会让一些线程处于“饥饿”状态。如有线程在“错误”时间执行,那么第一个线程将会不停的尝试所要完成的操作(其他程序继续执行)。由于可能会和其他线程的行为冲突,从而算法会进行了若干次尝试,因此无法做到无等待。“无锁-无等待”数据结构的出现,就为了避免这种问题。
无等待数据结构:对compare_exchange_weak或compare_exchange_strong操作进行循环,并且循环次数没有上限。操作系统对线程进行进行管理,有些线程的循环次数非常多,有些线程的循环次数就非常少。因此,这些操作是无等待的。
正确实现一个无等待结构十分困难的,要保证每个线程都能在有限的步骤内完成操作,你必须确保每次执行的操作都是一次性的,并且当前线程中的操作不会影响其他线程的操作。这就会让算法中所使用到的操作变的相当复杂。
1.3 优缺点
优点:
- 将并发最大化,减少单个线程的等待时间
- 线程在无锁数据结构上执行操作,在执行到一半终止时,数据结构上的数据没有丢失(除了线程本身的数据),其他线程依旧可以正常执行
- 没有任何锁(有可能存在活锁),死锁问题不会困扰无锁数据结构。(活锁的产生是两个线程同时尝试修改数据结构,但每个线程所做的修改操作都会让另一个线程重启,所以两个线程就会陷入循环,多次的尝试完成自己的操作)
缺点:
“无锁-无等待”代码的缺点:可能会将整体性能拉低。
- 原子操作的无锁代码要慢于无原子操作的代码,原子操作就相当于无锁数据结构中的锁。
- 硬件必须通过同一个原子变量对线程间的数据进行同步,可能会形成一个明显的性能瓶颈。
提交代码之前,无论是基于锁的数据结构,还是无锁的数据结构,对性能的检查很重要(最坏的等待时间,平均等待时间,整体执行时间或者其他指标)。
2 设计示例
2.1 内存泄漏的无锁线程安全栈
首先利用原子类型和“比较-交换”构造一个线程安全栈,当然下面代码有内存泄漏,pop执行后,node并没有释放,我们会在后面处理内存问题,当前只关注push和pop的实现:
//无锁的线程安全栈
#include <atomic>
#include <memory>
template <typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data; //使用共享指针保存数据
node *next;
node(const T &data) : data(std::make_shared(data)), next(nullptr) {}
};
std::atomic<node *> head; //原子操作类
public:
/*
push操作:
1. 创建一个新的节点node
2. 让node->next指向head->next
3. head->next指向node
*/
void push(T const &data)
{
node *const new_node = new node(data); // 1
// 2 node->next先指向head,这里会有多线程等待,如果别的线程在操作head的话
new_node->next = head.load(); //使用默认内存序
/*
3 使用“比较-交换”更新head:
判断当前head是不是new_node->next(用来判断是否其他线程修改了head):
返回false,则当前head不是new_node->next,则更新new_node->next为新的head,然后继续循环直到返回true
返回true,则表示没有其他线程修改head,则替换head和new_node位置(new_node变成head之后的第一个了),跳出循环
*/
while (!head.compare_exchange_weak(new_node->next, new_node))
;
}
/*
pop操作:
1. 获取head。
2. 读取head->next指向的结点node。
3. 设置head->next指向node->next。
4. 通过node返回携带的数据data。
5. 删除node节点。
*/
std::shared_ptr<T> pop()
{
node *old_head = head.load(); //先获得head,多线程等待,防止别的线程正在获取head
/*
使用“比较-交换”更新head:
判断当前head是否等于old_head,即head有无发生变化
返回false,即head发生变化,old_head更新为新的head,继续循环
返回true,即head没有变化,则替换head和head->next的位置,跳出循环
*/
while (old_head && !head.compare_exchange_weak(old_head, old_head->next)) //old_head不能为空指针
;
return old_head ? old_head->data : std::shared_ptr<T>();//返回数据或空指针
//注意,old_head并没有从内存中删除,有内存泄漏
}
};
2.2 pop时的内存泄漏问题
当要释放一个节点时,需要确认其他线程没有持有这个节点。另一方面,栈同时处理多线程对pop()的调用时,就知道节点什么时候被删除。实际上就需要写一个专用的垃圾收集器。这听起来相当棘手,不过也没多糟糕:需要检查节点,并且检查哪些节点可被pop()访问。
这里改写一下pop函数:
threads_in_pop原子变量用来记录有多少线程试图弹出栈中的元素。调用pop()函数时,计数器加一;调用try_reclaim()时,计数器减一;这个函数被节点调用时,说明节点已被删除。因为暂时不需要将节点删除,可以通过swap()函数来删除节点上的数据(而非只是拷贝指针),当不再需要这些数据的时候,这些数据会自动删除,而不是持续存在着(因为还有对未删除节点的引用)
std::shared_ptr<T> pop()
{
++threads_in_pop; //新线程调用pop,增加计数1
node *old_head = head.load(); //先获得head,多线程等待,防止别的线程正在获取head
/*
使用“比较-交换”更新head:
判断当前head是否等于old_head,即head有无发生变化
返回false,即head发生变化,old_head更新为新的head,继续循环
返回true,即head没有变化,则替换head和head->next的位置,跳出循环
*/
while (old_head && !head.compare_exchange_weak(old_head, old_head->next)) //old_head不能为空指针
;
//此时的old_head为第一个有数据的节点,真正的head是它的next
std::shared_ptr<T> retVal;
if (old_head)
{
retVal.swap(old_head->data); //交换内容,将old_head的data传递给retVal
}
try_reclaim(old_head); //回收old_head节点,计数器减一
return retVal;
}
下面是等待删除列表的删除操作函数:
private:
std::atomic<node *> to_be_deleted; //将要删除的节点列表,暂存,因为不能立刻删除,其他线程可能还在用
//释放node
static void delete_nodes(node *nodes)
{
while (nodes)
{
node *next = nodes->next;
delete nodes;
nodes = next;
}
}
void chain_pending_nodes(node *nodes)
{
node *last = nodes;
while (node *const next = last->next) // 9 让next指针指向链表的末尾
{
last = next;
}
chain_pending_nodes(nodes, last);
}
void chain_pending_nodes(node *first, node *last)
{
last->next = to_be_deleted;
while (!to_be_deleted.compare_exchange_weak(last->next, first)) // 用循环来保证last->next的正确性
;
}
void chain_pending_node(node *n)
{
chain_pending_nodes(n, n);
}
void try_reclaim(node *old_head)
{
if (threads_in_pop == 1) // 当计数为1,表示只有当前线程正在调用pop函数,可以删除节点
{
// 返回to_be_deleted的第一个值值给左边,同时to_be_deleted赋值nullptr,因为to_be_deleted中可能有多个等待中的节点
node *nodes_to_delete = to_be_deleted.exchange(nullptr);
if (!--threads_in_pop) // 计数器减一,再次确认是否只有一个线程调用pop()
{
delete_nodes(nodes_to_delete); // 循环删除等待列表中的节点
}
else if (nodes_to_delete)
{
chain_pending_nodes(nodes_to_delete); // 6
}
delete old_head; // 删除新传入的节点
}
else
{
chain_pending_node(old_head); // 计数不是1,将节点添加到等待删除列表
--threads_in_pop; //计数减一
}
}
低负荷状态下,上面的代码可以正常运行。高负荷的情况,因为其他线程在初始化之后都能进入pop(),to_ne_deleted链表将会无界的增加,并且会再次泄露。
2.3 基于引用计数和松散内存序的栈
//基于引用计数和松散原子操作的无锁栈
//通过使用更多的松散内存序操作,不影响并发性的同时提高性能
#include <atomic>
#include <memory>
template <typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node *ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const &data_) : data(std::make_shared<T>(data_)),
internal_count(0)
{
}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr &old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
} while (!head.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
public:
~lock_free_stack()
{
while (pop())
;
}
void push(T const &data)
{
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
;
}
std::shared_ptr<T> pop()
{
counted_node_ptr old_head =
head.load(std::memory_order_relaxed);
for (;;)
{
increase_head_count(old_head);
node *const ptr = old_head.ptr;
if (!ptr)
{
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count - 2;
if (ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) == -count_increase)
{
delete ptr;
}
return res;
}
else if (ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed) == 1)
{
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};
3 设计无锁数据结构的指导建议
3.1 使用std::memory_order_seq_cst的原型
std::memory_order_seq_cst
比起其他内存序要简单的多,因为所有操作都将其作为总序。默认原子操作都是从std::memory_order_seq_cst
开始,只有当基本操作正常工作的时候,才放宽内存序的选择。这种情况下,使用其他内存序就是优化(早期可以不用这样做)。
通常,当了解整套代码对数据结构的操作后,才能决定是否要放宽内存序的选择。所以,尝试放宽选择,可能会让你轻松一些。测试通过后,工作的代码可能会很复杂(不过,不能完全保证内存序正确)。除非你有一个算法检查器,可以系统的测试,线程能看到的所有可能性组合,这样就能保证指定内存序的正确性(这样的测试的确存在),仅是执行实现代码是远远不够的。
3.2 对无锁内存的回收策略
与有锁代码最大的区别就是内存管理。当其他线程对节点进行访问的时候,节点无法被任一线程删除;为避免过多的内存使用,还是希望这个节点能在删除的时候尽快删除。下面三种技术来保证内存可以被安全的回收:
- 等待无线程对数据结构进行访问时,删除所有等待删除的对象(例子2.2)。
- 使用风险指针来标识正在被线程访问的对象(IBM专利)。
- 对对象进行引用计数,当没有线程对对象进行引用时将其删除(例子2.3)。
所有例子的想法都是使用一种方式去跟踪指定对象上的线程访问数量,当没有现成对对象进行引用时将对象删除。当然,无锁数据结构中,还有很多方式可以用来回收内存,例如:理想情况下使用一个垃圾收集器,比起算法来说更容易实现一些。只需要让回收器知道,当节点没被引用的时候回收节点,就可以了。
3.3 小心ABA问题
其他替代方案就是循环使用节点,只在数据结构被销毁时才将节点完全删除。因为节点能被复用,这样就不会有非法的内存,所以就能避免未定义行为的发生。这种方式的缺点,会产生“ABA问题”。
基于“比较/交换”的算法中要格外小心“ABA问题”。其流程是:
- 线程1读取原子变量x,并且发现其值是A。
- 线程1对这个值进行一些操作,比如,解引用(当其是一个指针的时候),或做查询,或其他操作。
- 操作系统将线程1挂起。
- 其他线程对x执行一些操作,并且将其值改为B。
- 另一个线程对A相关的数据进行修改(线程1持有),让其不再合法。可能会在释放指针指向的内存时,代码产生剧烈的反应(大问题);或者只是修改了相关值而已(小问题)。
- 再来一个线程将x的值改回为A。如果A是一个指针,那么其可能指向一个新的对象,只是与旧对象共享同一个地址而已。
- 线程1继续运行,并且对x执行“比较/交换”操作,将A进行对比。这里,“比较/交换”成功(因为其值还是A),不过这是一个错误的A(the wrong A value)。从第2步中读取的数据不再合法,但是线程1无法言明这个问题,并且之后的操作将会损坏数据结构。
“ABA问题”在使用释放链表和循环使用节点的算法中很是普遍,而将节点返回给分配器,则不会引起这个问题。