LongAdder 简单介绍

前面讲过,AtomicLong 通过 CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经很好了,但是 JDK 开发组并不满足于此。使用 AtomicLong 时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的 CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试 CAS 的操作,而这会白白浪费 CPU 资源。

因此 JDK 8 新增了一个原子性递增或者递减类 LongAdder 用来克服在高并发下使用 AtomicLong 的缺点。既然 AtomicLong 的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,是不是就解决了性能问题?是的,LongAdder 就是这个思路。下面通过图来理解两者设计的不同之处,如图 4-1 所示。

JDK 8 新增的原子操作类 LongAdder - 图1

如图 4-1 所示,使用 AtomicLong 时,是多个线程同时竞争同一个原子变量。

JDK 8 新增的原子操作类 LongAdder - 图2

如图 4-2 所示,使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。最后,在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。

LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下 Cell 数组是 null)和一个基值变量 base。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。

当一开始判断 Cell 数组是 null 并且并发线程较少时,所有的累加操作都是对 base 变量进行的。保持 Cell 数组的大小为 2 的N次方,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组里面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用,也就是解决伪共享问题。

对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用@sun.misc.Contended 注解对 Cell 类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。

LongAdder 代码分析

为了解决高并发下多线程对一个变量 CAS 争夺失败后进行自旋而造成的降低并发性能问题,LongAdder 在内部维护多个 Cell 元素(一个动态的 Cell 数组)来分担对单个变量进行争夺的开销。下面围绕以下话题从源码角度来分析 LongAdder 的实现:(1)LongAdder 的结构是怎样的?(2)当前线程应该访问 Cell 数组里面的哪一个 Cell 元素?(3)如何初始化 Cell 数组?(4)Cell 数组如何扩容?(5)线程访问分配的 Cell 元素有冲突后如何处理?(6)如何保证线程操作被分配的 Cell 元素的原子性?

首先看下 LongAdder 的类图结构,如图 4-3 所示。

JDK 8 新增的原子操作类 LongAdder - 图3

由该图可知,LongAdder 类继承自 Striped64 类,在 Striped64 内部维护着三个变量。LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell 元素中的 value 值的累加,base 是个基础值,默认为 0。cellsBusy 用来实现自旋锁,状态值只有 0 和 1,当创建 Cell 元素,扩容 Cell 数组或者初始化 Cell 数组时,使用 CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作。

下面看 Cell 的构造。

  1. @sun.misc.Contended static final class Cell {
  2. volatile long value;
  3. Cell(long x) { value = x; }
  4. final boolean cas(long cmp, long val) {
  5. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  6. }
  7. // Unsafe mechanics
  8. private static final sun.misc.Unsafe UNSAFE;
  9. private static final long valueOffset;
  10. static {
  11. try {
  12. UNSAFE = sun.misc.Unsafe.getUnsafe();
  13. Class<? > ak = Cell.class;
  14. valueOffset = UNSAFE.objectFieldOffset
  15. (ak.getDeclaredField("value"));
  16. } catch (Exception e) {
  17. throw new Error(e);
  18. }
  19. }
  20. }

可以看到,Cell 的构造很简单,其内部维护一个被声明为 volatile 的变量,这里声明为 volatile 是因为线程操作 value 变量时没有使用锁,为了保证变量的内存可见性这里将其声明为 volatile 的。另外 cas 函数通过 CAS 操作,保证了当前线程更新时被分配的 Cell 元素中 value 值的原子性。另外,Cell 类使用@sun.misc.Contended 修饰是为了避免伪共享。到这里我们回答了问题 1 和问题 6。

● long sum()返回当前的值,内部操作是累加所有 Cell 内部的 value 值后再累加 base。例如下面的代码,由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中可能有其他线程对 Cell 中的值进行了修改,也有可能对数组进行了扩容,所以 sum 返回的值并不是非常精确的,其返回值并不是一个调用 sum 方法时的原子快照值。

  1. public long sum() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. if (as ! = null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) ! = null)
  7. sum += a.value;
  8. }
  9. }
  10. return sum;
  11. }

● void reset()为重置操作,如下代码把 base 置为 0,如果 Cell 数组有元素,则元素值被重置为 0。

  1. public void reset() {
  2. Cell[] as = cells; Cell a;
  3. base = 0L;
  4. if (as ! = null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) ! = null)
  7. a.value = 0L;
  8. }
  9. }
  10. }

● long sumThenReset()是 sum 的改造版本,如下代码在使用 sum 累加对应的 Cell 值后,把当前 Cell 的值重置为 0,base 重置为 0。这样,当多线程调用该方法时会有问题,比如考虑第一个调用线程清空 Cell 的值,则后一个线程调用时累加的都是 0 值。

  1. public long sumThenReset() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. base = 0L;
  5. if (as ! = null) {
  6. for (int i = 0; i < as.length; ++i) {
  7. if ((a = as[i]) ! = null) {
  8. sum += a.value;
  9. a.value = 0L;
  10. }
  11. }
  12. }
  13. return sum;
  14. }

● long longValue()等价于 sum()。

下面主要看下 add 方法的实现,从这个方法里面就可以找到其他问题的答案。

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. if ((as = cells) ! = null || ! casBase(b = base, b + x)) {//(1)
  4. boolean uncontended = true;
  5. if (as == null || (m = as.length -1) < 0 ||//(2)
  6. (a = as[getProbe() & m]) == null ||//(3)
  7. !(uncontended = a.cas(v = a.value, v + x)))//(4)
  8. longAccumulate(x, null, uncontended); //(5)
  9. }
  10. }
  11. final boolean casBase(long cmp, long val) {
  12. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
  13. }

代码(1)首先看 cells 是否为 null,如果为 null 则当前在基础变量 base 上进行累加,这时候就类似 AtomicLong 的操作。

如果 cells 不为 null 或者线程执行代码(1)的 CAS 操作失败了,则会去执行代码(2)。代码(2)(3)决定当前线程应该访问 cells 数组里面的哪一个 Cell 元素,如果当前线程映射的元素存在则执行代码(4),使用 CAS 操作去更新分配的 Cell 元素的 value 值,如果当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码(5)。其实将代码(2)(3)(4)合起来看就是获取当前线程应该访问的 cells 数组的 Cell 元素,然后进行 CAS 更新操作,只是在获取期间如果有些条件不满足则会跳转到代码(5)执行。另外当前线程应该访问 cells 数组的哪一个 Cell 元素是通过 getProbe()& m 进行计算的,其中 m 是当前 cells 数组元素个数-1,getProbe()则用于获取当前线程中变量 threadLocalRandomProbe 的值,这个值一开始为 0,在代码(5)里面会对其进行初始化。并且当前线程通过分配的 Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性,到这里我们回答了问题 2 和问题 6。

下面重点研究 longAccumulate 的代码逻辑,这是 cells 数组被初始化和扩容的地方。

  1. final void longAccumulatelong x, LongBinaryOperator fn,
  2. boolean wasUncontended {
  3. //(6) 初始化当前线程的变量 threadLocalRandomProbe 的值
  4. int h
  5. if ((h = getProbe()) == 0 {
  6. ThreadLocalRandom.current(); //
  7. h = getProbe();
  8. wasUncontended = true
  9. }
  10. boolean collide = false
  11. for (; ; {
  12. Cell[] as Cell a int n long v
  13. if ((as = cells) ! = null && (n = as.length) > 0 {//(7)
  14. if ((a = as[(n -1) & h]) == null {//(8)
  15. if cellsBusy == 0 { // Try to attach new Cell
  16. Cell r = new Cellx); // Optimistically create
  17. if cellsBusy == 0 && casCellsBusy()) {
  18. boolean created = false
  19. try { // Recheck under lock
  20. Cell[] rs int m j
  21. if ((rs = cells) ! = null &&
  22. (m = rs.length) > 0 &&
  23. rs[j = (m -1) & h] == null {
  24. rs[j] = r
  25. created = true
  26. }
  27. } finally {
  28. cellsBusy = 0
  29. }
  30. if created
  31. break
  32. continue // Slot is now non-empty
  33. }
  34. }
  35. collide = false
  36. }
  37. else if (! wasUncontended // CAS already known to fail
  38. wasUncontended = true
  39. //当前 Cell 存在,则执行 CAS 设置(9)
  40. else if a.cas(v = a.value, ((fn == null) ? v + x :
  41. fn.applyAsLong(v, x))))
  42. break
  43. //当前 Cell 数组元素个数大于 CPU 个数(10)
  44. else if n >= NCPU || cells = as
  45. collide = false // At max size or stale
  46. //是否有冲突(11)
  47. else if (! collide
  48. collide = true
  49. //如果当前元素个数没有达到 CPU 个数并且有冲突则扩容(12)
  50. else if (cellsBusy == 0 && casCellsBusy()) {
  51. try {
  52. if cells == as { // Expand table unless stale
  53. //12.1
  54. Cell[] rs = new Cell[n << 1];
  55. for int i = 0 i < n ++i
  56. rs[i] = as[i];
  57. cells = rs
  58. }
  59. } finally {
  60. //12.2
  61. cellsBusy = 0
  62. }
  63. //12.3
  64. collide = false
  65. continue // Retry with expanded table
  66. }
  67. //(13)为了能够找到一个空闲的 Cell,重新计算 hash 值,xorshift 算法生成随机数
  68. h = advanceProbeh);
  69. }
  70. //初始化 Cell 数组(14)
  71. else if cellsBusy == 0 && cells == as && casCellsBusy()) {
  72. boolean init = false
  73. try {
  74. if cells == as {
  75. //14.1
  76. Cell[] rs = new Cell[2];
  77. //14.2
  78. rs[h & 1] = new Cellx);
  79. cells = rs
  80. init = true
  81. }
  82. } finally {
  83. //14.3
  84. cellsBusy = 0
  85. }
  86. if init
  87. break
  88. }
  89. else if casBase(v = base, ((fn == null) ? v + x :
  90. fn.applyAsLong(v, x))))
  91. break // Fall back on using base
  92. }
  93. }

上面代码比较复杂,这里我们主要关注问题 3、问题 4 和问题 5。

当每个线程第一次执行到代码(6)时,会初始化当前线程变量 threadLocalRandomProbe 的值,上面也说了,这个变量在计算当前线程应该被分配到 cells 数组的哪一个 Cell 元素时会用到。

cells 数组的初始化是在代码(14)中进行的,其中 cellsBusy 是一个标示,为 0 说明当前 cells 数组没有在被初始化或者扩容,也没有在新建 Cell 元素,为 1 则说明 cells 数组在被初始化或者扩容,或者当前在创建新的 Cell 元素、通过 CAS 操作来进行 0 或 1 状态的切换,这里使用 casCellsBusy 函数。假设当前线程通过 CAS 设置 cellsBusy 为 1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码(14.1)初始化 cells 数组元素个数为 2,然后使用 h&1 计算当前线程应该访问 celll 数组的哪个位置,也就是使用当前线程的 threadLocalRandomProbe 变量值&(cells 数组元素个数-1),然后标示 cells 数组已经被初始化,最后代码(14.3)重置了 cellsBusy 标记。显然这里没有使用 CAS 操作,却是线程安全的,原因是 cellsBusy 是 volatile 类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改 cellsBusy 的值。在这里初始化的 cells 数组里面的两个元素的值目前还是 null。这里回答了问题 3,知道了 cells 数组如何被初始化。

cells 数组的扩容是在代码(12)中进行的,对 cells 扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前 cells 的元素个数小于当前机器 CPU 个数并且当前多个线程访问了 cells 中同一个元素,从而导致冲突使其中一个线程 CAS 失败时才会进行扩容操作。这里为何要涉及 CPU 个数呢?其实在基础篇中已经讲过,只有当每个 CPU 都运行一个线程时才会使多线程的效果最佳,也就是当 cells 数组元素个数与 CPU 个数一致时,每个 Cell 都使用一个 CPU 进行处理,这时性能才是最佳的。代码(12)中的扩容操作也是先通过 CAS 设置 cellsBusy 为 1,然后才能进行扩容。假设 CAS 成功则执行代码(12.1)将容量扩充为之前的 2 倍,并复制 Cell 元素到扩容后数组。另外,扩容后 cells 数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是 null。这里回答了问题 4。

在代码(7)(8)中,当前线程调用 add 方法并根据当前线程的随机数 threadLocalRandomProbe 和 cells 元素个数计算要访问的 Cell 元素下标,然后如果发现对应下标元素的值为 null,则新增一个 Cell 元素到 cells 数组,并且在将其添加到 cells 数组之前要竞争设置 cellsBusy 为 1。

代码(13)对 CAS 失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,以减少下次访问 cells 元素时的冲突机会。这里回答了问题 5。

小结

本节介绍了 JDK 8 中新增的 LongAdder 原子性操作类,该类通过内部 cells 数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对 cells 数组里面的元素进行并行的更新操作。另外,数组元素 Cell 使用@sun.misc.Contended 注解进行修饰,这避免了 cells 数组内多个原子变量被放入同一个缓存行,也就是避免了伪共享,这对性能也是一个提升。