LongAdder 类是 JDK1.8 新增的一个原子性操作类,其出现是为了解决高并发场景下AtomicLong的自旋瓶颈问题。AtomicLong 通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下, N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。

LongAdder设计思路

AtomicLong 中有个内部变量 value 保存着实际的 long 值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回即可。
微信截图_20211117000552.png

LongAdder 继承 Striped64 类,内部有一个 base 变量和一个 Cell[] 数组:

  • base 变量:非竞态条件下,直接累加到该变量上
  • Cell[] 数组:竞态条件下,累加个各个线程自己的槽Cell[i]中 ```java // CPU逻辑核数,用来决定槽数组的大小 static final int NCPU = Runtime.getRuntime().availableProcessors();

// 数组槽,大小为2的次幂 transient volatile Cell[] cells;

// 基数,在两种情况下会使用: //1. 没有遇到并发竞争时,直接使用base累加数值 //2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),其他竞争失败的线程会讲数值累加到base上 transient volatile long base;

transient volatile int cellsBusy;

  1. 我们在来看一下 Cell 这个内部类:
  2. ```java
  3. @sun.misc.Contended static final class Cell {
  4. //保存用于累加的值
  5. volatile long value;
  6. Cell(long x) { value = x; }
  7. //使用UNSAFE类的cas来更新value值
  8. final boolean cas(long cmp, long val) {
  9. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  10. }
  11. // Unsafe mechanics
  12. private static final sun.misc.Unsafe UNSAFE;
  13. private static final long valueOffset;
  14. static {
  15. try {
  16. UNSAFE = sun.misc.Unsafe.getUnsafe();
  17. Class<?> ak = Cell.class;
  18. valueOffset = UNSAFE.objectFieldOffset
  19. (ak.getDeclaredField("value"));
  20. } catch (Exception e) {
  21. throw new Error(e);
  22. }
  23. }
  24. }

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的 volatile 变量。

LongAdder实现原理

我们先来看一下 LongAdder 的 add() 方法:

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. //如果当前cells数组不为空,或修改base值失败(并发修改导致),会进入下面逻辑
  4. if ((as = cells) != null || !casBase(b = base, b + x)) {
  5. boolean uncontended = true;
  6. //再次判断是否创建了cells数组/数组长度小于0
  7. if (as == null || (m = as.length - 1) < 0 ||
  8. //或者已经创建了数组,但数组内的cell元素还未创建
  9. (a = as[getProbe() & m]) == null ||
  10. //或者对指定的cell执行累加操作失败
  11. !(uncontended = a.cas(v = a.value, v + x)))
  12. //满足以上条件中的一种,执行longAccumulate()方法
  13. longAccumulate(x, null, uncontended);
  14. }
  15. }

首先判断 cells 数组是否初始化,如果还未初始化则可以对 base 进行 cas 修改操作,若修改失败了则继续走下面的逻辑。接着再一次进行 if 判断,cells 数组是否为空/数组长度小于0,如果不为空,则判断该数组下标对应的 cell 是否已经创建,如果已经创建了,则可以对该 cell 进行 cas 修改操作。若修改失败,则直接进入 longAccumulate() 方法。
微信截图_20211117230042.png
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对 Cell[] 数组中的单元 Cell。如果 Cell[] 数组未初始化,会调用父类的longAccumelate 去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的 longAccumelate,此时可能就需要对 Cell[] 扩容了。这也是 LongAdder 设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟

longAccumulate()

下面我们来详细说明一下 longAccumulate() 方法到底干了什么,在 longAccumulate 中有几个标记位,我们也先理解一下

  • cellsBusy:cells 的操作标记位,如果正在修改、新建、操作 cells 数组中的元素,会将其 cas 为 1,否则为0。
  • wasUncontended:表示 cas 是否失败,如果失败则考虑操作升级。
  • collide:是否冲突,如果冲突,则考虑扩容 cells 的长度。

    1. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    2. int h;
    3. if ((h = getProbe()) == 0) {
    4. ThreadLocalRandom.current(); // force initialization
    5. h = getProbe();
    6. wasUncontended = true;
    7. }
    8. boolean collide = false;
    9. //开始自旋
    10. for (;;) {
    11. Cell[] as; Cell a; int n; long v;
    12. //cells数组不为空/数组长度大于0
    13. if ((as = cells) != null && (n = as.length) > 0) {
    14. //cells数组中被定位下标所在元素还未创建
    15. if ((a = as[(n - 1) & h]) == null) {
    16. if (cellsBusy == 0) {
    17. //创建Cell,在创建过程中初始化其value为我们要加的值
    18. Cell r = new Cell(x);
    19. //判断cellBusy锁标志是否0,如果为0则调用casCellsBusy进行cas修改为1,获取锁成功
    20. if (cellsBusy == 0 && casCellsBusy()) {
    21. boolean created = false;
    22. try { // Recheck under lock
    23. Cell[] rs; int m, j;
    24. if ((rs = cells) != null &&
    25. (m = rs.length) > 0 &&
    26. rs[j = (m - 1) & h] == null) {
    27. //往数组中添加Cell
    28. rs[j] = r;
    29. created = true;
    30. }
    31. } finally {
    32. cellsBusy = 0;
    33. }
    34. if (created)
    35. break;
    36. continue; // Slot is now non-empty
    37. }
    38. }
    39. collide = false;
    40. }
    41. //wasUncontended为我们传进来的值,默认为true
    42. else if (!wasUncontended) // CAS already known to fail
    43. wasUncontended = true; // Continue after rehash
    44. //若Cell元素已经创建,则对该cell进行更新
    45. else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    46. break;
    47. //如果cell更新失败,且当前cell数量达到最大值(默认为cpu核数)
    48. else if (n >= NCPU || cells != as)
    49. //collide标记为false
    50. collide = false;
    51. else if (!collide)
    52. collide = true;
    53. //获取cellsBusy锁进行扩容
    54. else if (cellsBusy == 0 && casCellsBusy()) {
    55. try {
    56. if (cells == as) {
    57. //扩容为原来的两倍
    58. Cell[] rs = new Cell[n << 1];
    59. for (int i = 0; i < n; ++i)
    60. rs[i] = as[i];
    61. cells = rs;
    62. }
    63. } finally {
    64. cellsBusy = 0;
    65. }
    66. collide = false;
    67. continue; // Retry with expanded table
    68. }
    69. h = advanceProbe(h);
    70. }
    71. //当cells数组没创建完成
    72. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    73. boolean init = false;
    74. try { // Initialize table
    75. if (cells == as) {
    76. //初始化cells数组,长度为2
    77. Cell[] rs = new Cell[2];
    78. //在相应位置创建对应cell
    79. rs[h & 1] = new Cell(x);
    80. cells = rs;
    81. init = true;
    82. }
    83. } finally {
    84. cellsBusy = 0;
    85. }
    86. if (init)
    87. break;
    88. }
    89. //创建数组失败(有竞争),则对base进行操作
    90. else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    91. break; // Fall back on using base
    92. }
    93. }

    整个代码的逻辑入如下:

  1. cells 不为空
    1. 如果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
    2. 如果 cas 失败,继续循环
    3. 如果 cell 不为空,且 cell cas 成功,退出
    4. 如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试
    5. 设置 collide 为 true。
    6. 获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
  2. cells 为空且获取到 cellsBusy ,init cells 数组,然后赋值退出。
  3. cellsBusy 获取失败,则进行 baseCas ,操作成功退出,不成功则重试。

微信截图_20211117235537.png

sum()

相比于上面的计数操作,longAdder 的求和操作则简单多了。

  1. public long sum() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. if (as != null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) != null)
  7. sum += a.value;
  8. }
  9. }
  10. return sum;
  11. }

由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中可能有其他线程对 Cell 中的值进行了修改而当前循环中还未遍历到,也有可能对数组进行了扩容,所以 sum 返回的值并不是非常精确的,其返回值并不是一个调用 sum 方法时的原子快照值。