Disruptor 是一款高性能的有界内存队列,目前应用非常广泛,Log4j2、Spring Messaging、HBase、Storm 都用到了 Disruptor,那 Disruptor 的性能为什么这么高呢?Disruptor 项目团队曾经写过一篇论文,详细解释了其原因,可以总结为如下:
- 内存分配更加合理,使用 RingBuffer(环形队列)数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;并且队列对象可以循环利用,避免频繁 GC。
- 能够避免伪共享,提升缓存利用率。
- 采用无锁算法,避免频繁加锁、解锁的性能消耗。
- 支持批量消费,消费者可以无锁方式消费多个消息。
使用示例
在详细介绍这些知识前,我们先来看看 Disruptor 是如何使用的。相较而言,Disruptor 的使用比 Java SDK 提供 BlockingQueue 要复杂一些,但总体思路是一致的,其大致情况如下:
1)创建 EventFactory
在 Disruptor 中,生产者生产的对象(也就是消费者消费的对象)称为 Event,使用 Disruptor 必须要自定义 Event,而 EventFactory 用于在构建 Disruptor 时初始化 Event 实例,这些实例用于填充 RingBuffer,因为 RingBuffer 的设计为避免频繁的垃圾回收,在 RingBuffer 中存储的 Event 会预先创建,后续这些 Event 在发送的时候可以进行复用。
@Gatter
@Setter
public class DataEvent {
private long value;
}
public class DataEventFactory implements EventFactory<DataEvent> {
@Override
public DataEvent newInstance() {
return new DataEvent();
}
}
2)生产者
public class Producer {
private final RingBuffer<DataEvent> ringBuffer;
public Producer(RingBuffer<DataEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishDataEvent() {
// 消费方获取可消费的序号
long sequence = ringBuffer.next();
try {
// 复用DataEvent
DataEvent dataEvent = ringBuffer.get(sequence);
dataEvent.setValue(new Random().nextInt(100));
} finally {
ringBuffer.publish(sequence);
}
}
}
3)消费者
public class Consumer implements WorkHandler<DataEvent> {
@Override
public void onEvent(DataEvent event) throws Exception {
System.out.println(Thread.currentThread().getName() + ": DataEvent value is " + event.getValue());
}
}
4)启动 Disruptor
public static void main(String[] args) {
// 指定RingBuffer大小,必须是2的N次方
int bufferSize = 1024;
// 构建Disruptor
Disruptor<DataEvent> disruptor = new Disruptor<> (new DataEventFactory(),
bufferSize,
DaemonThreadFactory.INSTANCE,
// 多生产者
ProducerType.MULTI,
// 阻塞等待策略
new BlockingWaitStrategy());
// 注册事件处理器,有四个消费者,会创建四个线程
disruptor.handleEventsWithWorkerPool(new Consumer(), new Consumer(), new Consumer(), new Consumer());
// 启动Disruptor
disruptor.start();
// 获取RingBuffer
RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();
// 生产Event
Producer producer = new Producer(ringBuffer);
for (long i = 0; i < 10; i++) {
// 生产者生产消息
producer.publishDataEvent();
}
}
当有新数据在 Disruptor 框架的环形缓冲区中产生时,消费者如何知道这些新产生的数据呢?或者说,消费者如何监控缓冲区中的信息呢?为此,Disruptor 框架提供了几种策略,这些策略由 WaitStrategy 接口进行封装,主要有以下几种实现:
BlockingWaitStrategy:这是默认的策略,类似于 BlockingQueue,它们都使用锁和条件(Condition)进行数据的监控和线程的唤醒。因为涉及线程的切换,BlockingWaitStrategy 策略最节省 CPU,但是在高并发下它是性能表现最糟糕的一种等待策略。
SleepingWaitStrategy:这个策略对 CPU 的消耗与 BlockingWaitStrategy 类似。它会在循环中不断等待数据。它会先进行自旋等待,如果不成功,则使用 Thread.yield 方法方法让出 CPU,并最终使用 LockSupport.parkNanos(1) 进行线程休眠,以确保不占用太多的 CPU 数据。因此,该策略对于数据处理可能会产生比较高的平均延时。适合对延时要求不是特别高的场合,典型场景是异步日志。
YieldingWaitStrategy:这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区的变化,在循环内部,它会使用 Thread.yield 方法让出 CPU 给别的线程执行时间。如果你需要一个高性能的系统,并且对延时有较为严格的要求,则可以考虑这种策略。但最好有多于消费者线程数量的逻辑 CPU 数量。
BusySpinWaitStrategy:这个是最疯狂的等待策略了。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化。因此,它会吃掉所有的 CPU 资源。只有对延迟非常苛刻的场合可以考虑使用。
TimeoutBlockingWaitStrategy:带超时时间的阻塞等待。
PhasedBackoffWaitStrategy:组合策略,可以指定上述策略,然后退化为 yield 自旋。
RingBuffer 如何提升性能
Java SDK 中 ArrayBlockingQueue 使用数组作为底层的数据存储,而 Disruptor 是使用 RingBuffer 作为数据存储。RingBuffer 是一个环形队列,本质上也是数组,对于一般的队列,势必要提供队列同步 head 和尾部 tail 两个指针用于出队和入队,这样无疑增加了线程协作的复杂度。但环形队列只需要提供一个当前位置 cursor,利用这个指针既可以进行入队操作,也可以进行出队操作。
由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队时序列就加 1),Disruptor 框架要求我们必须将数组的大小设置为 2 的整数次方。这样通过 sequence &(queuesize-1) 就能立即定位到实际的元素位置 index,这比取余(%)操作要快得多。因为如果 queuesize 是 2 的整数次幂,则这个数字的二进制表示必然是 10、100、1000、10000 等形式。因此,queuesize-1 的二进制则是一个全 1 的数字。因此它可以将 sequence 限定在 queueSize-1 的范围内,并且不会有任何一位是浪费的。
另外 Disruptor 在 RingBuffer 的基础上还做了很多优化,其中一项优化就是和内存分配有关的。
在介绍这项优化之前,你需要先了解一下程序的局部性原理。简单来讲,程序的局部性原理指的是在一段时间内程序的执行会限定在一个局部范围内。这里的“局部性”可以从两个方面来理解,一个是时间局部性,另一个是空间局部性。时间局部性指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;如果某条数据被访问,不久之后这条数据很可能再次被访问。而空间局部性是指某块内存一旦被访问,不久之后这块内存附近的内存也很可能被访问。
CPU 的缓存就利用了程序的局部性原理:CPU 从内存中加载数据 X 时,会将数据 X 缓存在高速缓存 Cache 中,实际上 CPU 缓存 X 的同时,还缓存了 X 周围的数据,因为根据程序具备局部性原理,X 周围的数据也很有可能被访问。从另外一个角度来看,如果程序能够很好地体现出局部性原理,也就能更好地利用 CPU 的缓存,从而提升程序的性能。Disruptor 在设计 RingBuffer 时就充分考虑了这个问题。
下面我们就对比着 ArrayBlockingQueue 来分析一下。生产者线程向 ArrayBlockingQueue 增加一个元素,每次增加元素 E 之前,都需要创建一个对象 E,如下图所示,ArrayBlockingQueue 内部有 6 个元素,这 6 个元素都是由生产者线程创建的,由于创建这些元素的时间基本上是离散的,所以这些元素的内存地址大概率也不是连续的(虽然数组是连续内存地址,但数组只保存对象的引用地址,这些对象本身不是连续的)。
下面再看 Disruptor 是如何处理的。Disruptor 内部的 RingBuffer 也是用数组实现的,但是这个数组中的所有元素在初始化时是一次性全部创建的,所以这些元素的内存地址大概率是连续的,相关代码如下:
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
// entries[]就是RingBuffer内部的数组,eventFactory就是前面示例代码中传入的LongEvent::new
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
Disruptor 内部 RingBuffer 的结构可以简化成下图,那问题来了,数组中所有元素内存地址连续能提升性能吗?能!为什么呢?因为消费者线程在消费的时候,是遵循空间局部性原理的,消费完第 1 个元素,很快就会消费第 2 个元素;当消费第 1 个元素 E1 的时候,CPU 会把内存中 E1 后面的数据也加载进 Cache,如果 E1 和 E2 在内存中的地址是连续的,那么 E2 也就会被加载进 Cache 中,然后当消费第 2 个元素的时候,由于 E2 已经在 Cache 中了,所以就不需要从内存中加载了,这样就能大大提升性能。
此外,在 Disruptor 中,生产者线程通过 publishEvent() 发布 Event 的时候,并不是创建一个新的 Event,而是通过 event.set() 方法修改 Event,也就是说 RingBuffer 创建的 Event 是可以循环利用的,这样还能避免频繁创建、删除 Event 导致的频繁 GC 问题。
如何避免“伪共享”
高效利用 Cache 能够大大提升性能,所以要努力构建能够高效利用 Cache 的内存结构。而从另一个角度看,努力避免不能高效利用 Cache 的内存结构也同样重要。有一种叫做“伪共享(False sharing)”的内存布局就会使 Cache 失效,那什么是“伪共享”呢?
伪共享和 CPU 内部的 Cache 有关,Cache 内部是按照缓存行(Cache Line)管理的,缓存行的大小通常是 64 个字节;CPU 从内存中加载数据 X,会同时加载 X 后面(64-size(X))个字节的数据。下面的示例代码出自 Java SDK 的 ArrayBlockingQueue,其内部维护了 4 个成员变量,分别是队列数组 items、出队索引 takeIndex、入队索引 putIndex 以及队列中的元素总数 count。
/** 队列数组 */
final Object[] items;
/** 出队索引 */
int takeIndex;
/** 入队索引 */
int putIndex;
/** 队列中元素总数 */
int count;
当 CPU 从内存中加载 takeIndex 时,会同时将 putIndex 以及 count 都加载进 Cache。下图是某个时刻 CPU 中 Cache 的状况,为了简化,缓存行中我们仅列出了 takeIndex 和 putIndex。
假设线程 A 运行在 CPU-1 上,执行入队操作,入队操作会修改 putIndex,而修改 putIndex 会导致其所在的所有核上的缓存行均失效;此时假设运行在 CPU-2 上的线程执行出队操作,出队操作需要读取 takeIndex,由于 takeIndex 所在的缓存行已经失效,所以 CPU-2 必须从内存中重新读取。入队操作本不会修改 takeIndex,但由于 takeIndex 和 putIndex 共享的是一个缓存行,就导致出队操作不能很好地利用 Cache,这就是伪共享。简单来讲,伪共享指的是由于共享缓存行导致缓存无效的场景。
ArrayBlockingQueue 的入队和出队操作是用锁来保证互斥的,所以入队和出队不会同时发生。如果允许入队和出队同时发生,那就会导致线程 A 和线程 B 争用同一个缓存行,这样也会导致性能问题。所以为了更好地利用缓存,我们必须避免伪共享,那如何避免呢?
方案很简单,每个变量独占一个缓存行、不共享缓存行就可以了,具体技术是缓存行填充。比如想让 takeIndex 独占一个缓存行,可以在 takeIndex 的前后各填充 56 个字节,这样就一定能保证 takeIndex 独占一个缓存行。在 Disruptor 中很多对象,如 RingBuffer、RingBuffer 内部的数组都用到了缓存行填充来避免伪共享。
// 前:填充56字节
class LhsPadding{
long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding{
volatile long value;
}
// 后:填充56字节
class RhsPadding extends Value {
long p9, p10, p11, p12, p13, p14, p15;
}
前后各 56 个字节保证了目标字段总是独占一个 cache line,不受周围变量缓存失效的影响。
Disruptor 中的无锁算法
ArrayBlockingQueue 是利用管程实现的,中规中矩,生产、消费操作都需要加锁,实现起来简单,但性能并不十分理想。Disruptor 采用的是无锁算法,很复杂,但是核心无非是生产和消费两个操作。Disruptor 中最复杂的是入队操作,所以我们重点来看看入队操作是如何实现的。
对于入队操作,最关键的要求是不能覆盖没有消费的元素;对于出队操作,最关键的要求是不能读取没有写入的元素,所以 Disruptor 中也一定会维护类似出队索引和入队索引这样两个关键变量。Disruptor 中的 RingBuffer 维护了入队索引,但并没有维护出队索引,这是因为在 Disruptor 中多个消费者可以同时消费,每个消费者都会有一个出队索引,所以 RingBuffer 的出队索引是所有消费者里面最小的那一个。
下面是 Disruptor 生产者入队操作的核心代码,看上去很复杂,其实逻辑很简单:如果没有足够的空余位置,就出让 CPU 使用权,然后重新计算;反之则用 CAS 设置入队索引。
// 生产者获取n个写入位置
do {
// cursor类似于入队索引,指的是上次生产到这里
current = cursor.get();
// 目标是再生产n个
next = current + n;
// 减掉一个循环
long wrapPoint = next - bufferSize;
// 获取上一次的最小消费位置
long cachedGatingSequence = gatingSequenceCache.get();
// 没有足够的空余位置
if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current) {
// 重新计算所有消费者里面的最小值位置
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1);
continue;
}
// 从新设置上一次的最小消费位置
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)) {
// 获取写入位置成功,跳出循环
break;
}
} while (true);
总结
Disruptor 在优化并发性能方面可谓是做到了极致,优化的思路大体是两个方面,一个是利用无锁算法避免锁的争用,另外一个则是将硬件(CPU)的性能发挥到极致。
由于伪共享问题如此重要,所以在 Java 8 中,提供了避免伪共享的注解 @sun.misc.Contended,通过这个注解就能轻松避免伪共享(默认仅开放给 JDK 内部的类,如果要在应用程序中使用则需要设置 JVM 参数 -XX:-RestrictContended)。不过避免伪共享是以牺牲内存为代价的,JDK 8 在 @Contended 注释的字段前后各自填充大小为缓存行宽度的 2 倍的填充空间,所以我们应该只在确认伪共享问题的情况下才考虑使用填充。