在介绍 AtomicInteger 时,已经说明在高并发下大量线程去竞争更新同一个原子变量时,因为只有一个线程能够更新成功,其他的线程在竞争失败后,只能一直循环,不断的进行 CAS 尝试,从而浪费了 CPU 资源。而在 JDK 8 中新增了 LongAdder 用来解决高并发下变量的原子操作。下面同样通过阅读源码来了解 LongAdder 。

介绍

一个或多个变量共同维持初值为 0 总和。 当跨线程竞争更新时,变量集可以动态增长以减少竞争。 方法 sum 返回当前变量集的总和。
当多个线程更新时,这个类是通常优选 AtomicLong ,比如用于收集统计信息,不用于细粒度同步控制的共同总和。 在低更新竞争,这两个类具有相似的特征。 但在高更新竞争时,使用 LongAdder 性能要高于 AtomicLong,同样要消耗更高的空间为代价。
LongAddr - 图1
LongAdder 继承了 Striped64,内部维护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 里面都有一个初始值为 0 的 long 型变量。

源码分析

Cell 类

Cell 类 是 Striped64 的静态内部类。

  1. @sun.misc.Contended static final class Cell {
  2. volatile long value;
  3. Cell(long x) { value = x; }
  4. final boolean cas(long cmp, long val) {
  5. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  6. }
  7. // Unsafe mechanics
  8. private static final sun.misc.Unsafe UNSAFE;
  9. private static final long valueOffset;
  10. static {
  11. try {
  12. UNSAFE = sun.misc.Unsafe.getUnsafe();
  13. Class<?> ak = Cell.class;
  14. valueOffset = UNSAFE.objectFieldOffset
  15. (ak.getDeclaredField("value"));
  16. } catch (Exception e) {
  17. throw new Error(e);
  18. }
  19. }
  20. }
  1. Cell 使用 _@_sun.misc.Contended 注解。
  2. 内部维护一个被 volatile 修饰的 long 型 value 。
  3. 提供 cas 方法,更新value。

其中 _@_sun.misc.Contended 注解作用是为了减少缓存争用。什么是缓存争用,这里只做下简要介绍。

伪共享 CPU 存在多级缓存,其中最小存储单元是 Cache Line,每个 Cache Line 能存储 64 个字节的数据。 在多线程场景下,A B 两个线程数据如果被存储到同一个 Cache Line 上,此时 A B 更新各自的数据,就会发生缓存争用,导致多个线程之间相互牵制,变成了串行程序,降低了并发。 _@_sun.misc.Contended 注解,则可以保证该变量独占一个 Cache Line。 详细可参考:http://openjdk.java.net/jeps/142

Striped64 核心属性

  1. abstract class Striped64 extends Number {
  2. /** CPU 的数量,以限制表大小 */
  3. static final int NCPU = Runtime.getRuntime().availableProcessors();
  4. /**
  5. * cell 数组,当非空时,大小是 2 的幂。
  6. */
  7. transient volatile Cell[] cells;
  8. /**
  9. * Base 值,在无争用时使用,表初始化竞赛期间的后备。使用 CAS 更新
  10. */
  11. transient volatile long base;
  12. /**
  13. * 调整大小和创建Cells时自旋锁(通过CAS锁定)使用。
  14. */
  15. transient volatile int cellsBusy;
  16. }

Striped64 类主要提供以下几个属性:

  1. NCPU:CPU 的数量,以限制表大小。
  2. cells:Cell[] cell 数组,当非空时,大小是 2 的幂。
  3. base:long 型,Base 值,在无争用时使用,表初始化竞赛期间的后备。使用 CAS 更新。
  4. cellsBusy:调整大小和创建Cells时自旋锁(通过CAS锁定)使用。

下面看是进入核心逻辑:
LongAddr - 图2

LongAdder#add

  1. public class LongAdder extends Striped64 implements Serializable {
  2. public void add(long x) {
  3. Cell[] as; long b, v; int m; Cell a;
  4. // cells 是 数组,base 是基础值
  5. if ((as = cells) != null || !casBase(b = base, b + x)) {
  6. boolean uncontended = true;
  7. if (as == null || (m = as.length - 1) < 0 ||
  8. (a = as[getProbe() & m]) == null ||
  9. !(uncontended = a.cas(v = a.value, v + x)))
  10. longAccumulate(x, null, uncontended);
  11. }
  12. }
  13. }
  1. abstract class Striped64 extends Number {
  2. // 使用 CAS 更新 BASE 的值
  3. final boolean casBase(long cmp, long val) {
  4. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
  5. }
  6. // 返回当前线程的探测值。 由于包装限制,从ThreadLocalRandom复制
  7. static final int getProbe() {
  8. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  9. }
  10. }

LongAddr - 图3

  1. 首先会对 Base 值进行 CAS 更新,当 Base 发生竞争时, 会更新数组内的 Cell 。
  2. 数组未初始化,Cell 未初始化, Cell 更新失败,即 Cell 也发生竞争时,会调用 Striped64 的 longAccumulate 方法。

    Striped64#longAccumulate

    1. abstract class Striped64 extends Number {
    2. /**
    3. * x 要增加的值
    4. * wasUncontended 有没有发生竞争
    5. */
    6. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    7. int h;
    8. // 当前线程有无初始化线程探测值, 给当前线程生成一个 非 0 探测值
    9. if ((h = getProbe()) == 0) {
    10. ThreadLocalRandom.current(); // force initialization
    11. h = getProbe();
    12. wasUncontended = true;
    13. }
    14. boolean collide = false; // True if last slot nonempty
    15. // 循环
    16. for (;;) {
    17. Cell[] as; Cell a; int n; long v;
    18. // 数组不为空切数组长度大于 0
    19. if ((as = cells) != null && (n = as.length) > 0) {
    20. // (n - 1) & h 获取到索引,索引处 cell 是否为 null, cell未初始化
    21. if ((a = as[(n - 1) & h]) == null) {
    22. // 判断 cellsBusy 是否为 0
    23. if (cellsBusy == 0) { // Try to attach new Cell
    24. Cell r = new Cell(x); // Optimistically create
    25. // cellsBusy == 0 且 使用 casCellsBusy 方法将其更新为 1,失败会继续循环
    26. if (cellsBusy == 0 && casCellsBusy()) {
    27. boolean created = false;
    28. try { // Recheck under lock
    29. Cell[] rs; int m, j;
    30. // 重新检查状态 并创建
    31. if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
    32. rs[j] = r;
    33. created = true;
    34. }
    35. } finally {
    36. // 创建完成之后, 改回 cellsBusy 值
    37. cellsBusy = 0;
    38. }
    39. if (created)
    40. break;
    41. // 未创建继续循环
    42. continue; // Slot is now non-empty
    43. }
    44. }
    45. collide = false;
    46. }
    47. // 传入的 wasUncontended 为 false 即发生碰撞了, 修改为未碰撞, 此处会继续循环,走到下一步,相当于会一直循环这个 cell
    48. else if (!wasUncontended) // CAS already known to fail
    49. wasUncontended = true; // Continue after rehash
    50. // cas 更新 cell 的 value, 成功则返回
    51. else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    52. break;
    53. // 数组到最大长度 即大于等于 CPU 数量, 或者 cells 数组被改变,
    54. else if (n >= NCPU || cells != as)
    55. collide = false; // At max size or stale
    56. else if (!collide)
    57. collide = true;
    58. // 乐观锁 进行扩容
    59. else if (cellsBusy == 0 && casCellsBusy()) {
    60. try {
    61. if (cells == as) { // Expand table unless stale
    62. Cell[] rs = new Cell[n << 1];
    63. for (int i = 0; i < n; ++i)
    64. rs[i] = as[i];
    65. cells = rs;
    66. }
    67. } finally {
    68. cellsBusy = 0;
    69. }
    70. collide = false;
    71. continue; // Retry with expanded table
    72. }
    73. // 当前探针值不能操作成功,则重新设置一个进行尝试
    74. h = advanceProbe(h);
    75. }
    76. // 没有加 cellsBusy 乐观锁 且 没有初始化,且获得锁成功(此时 cellsBusy == 1)
    77. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    78. boolean init = false;
    79. try { // Initialize table
    80. if (cells == as) {
    81. Cell[] rs = new Cell[2];
    82. rs[h & 1] = new Cell(x);
    83. cells = rs;
    84. init = true;
    85. }
    86. } finally {
    87. cellsBusy = 0;
    88. }
    89. if (init)
    90. break;
    91. }
    92. // 尝试在base上累加
    93. else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    94. break; // Fall back on using base
    95. }
    96. }
    97. }

    longAccumulate 方法一共有三种情况

  3. (as = cells) != null && (n = as.length) > 0 数组不为空且长度大于 0 。

    1. 获取索引处的 cell , cell 为空则进行初始化。
    2. cell 不为空,使用 cas 更新, 成功 break; 跳出循环, 失败则还在循环内,会一直尝试。
    3. collide 指是否发生冲突,冲突后会进行重试。
    4. 冲突后会尝试获得锁并进行扩容,扩容长度为原来的 2 倍,然后继续重试。
    5. 获得锁失败(说明其他线程在扩容)会重新进行计算探针值。
  4. cellsBusy == 0 && cells == as && casCellsBusy() 数组为空,获得乐观锁成功。
    1. 直接初始化数组。
    2. 初始数组长度为 2 。
  5. casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))) 获得乐观锁失败。

    1. 说明有其他线程在初始化数组,直接 CAS 更新 base 。

      LongAdder#sum

      1. public class LongAdder extends Striped64 implements Serializable {
      2. public long sum() {
      3. Cell[] as = cells; Cell a;
      4. long sum = base;
      5. if (as != null) {
      6. for (int i = 0; i < as.length; ++i) {
      7. if ((a = as[i]) != null)
      8. sum += a.value;
      9. }
      10. }
      11. return sum;
      12. }
      13. }
  6. 数组为空,说明没有发生竞争,直接返回 base 。

  7. 数组不为空,说明发生竞争,累加 cell 的 value 和 base 的和进行返回。

    总结

    基本流程

  8. LongAdder 继承了 Striped64,内部维护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 里面都有一个初始值为 0 的 long 型变量。

  9. 未发生竞争时(Cells 数组未初始化),是对 base 变量进行原子操作。
  10. 发生竞争时,每个线程对自己的 Cell 变量的 value 进行原子操作。

    如何确定哪个线程操作哪个 cell?

    通过 getProbe() 方法获取该线程的探测值,然后和数组长度 n - 1& 操作 (n - 1) & h 。
    1. static final int getProbe() {
    2. return UNSAFE.getInt(Thread.currentThread(), PROBE);
    3. }

    Cells 数组初始化及扩容?

    初始化扩容时会判断 cellsBusy, cellsBusy 使用 volatile 修饰,保证线程见可见性,同时使用 CAS 进行更新。 0 表示空闲,1 表示正在初始化或扩容。
    初始化时会创建长度为 2 的 Cell 数组。扩容是创建一个长度是原数组长度 2 倍的新数组,并循环赋值。
    如果线程访问分配的 Cell 元素有冲突后,会使用 advanceProbe() 方法重新获取探测值,再次进行尝试。

    使用场景

    在高并发情况下,需要相对高的性能,同时数据准确性要求不高,可以考虑使用 LongAdder。
    当要保证线程安全,并允许一定的性能损耗时,并对数据准确性要求较高,优先使用 AtomicLong。