LongAdder 类是 JDK1.8 新增的一个原子性操作类,其出现是为了解决高并发场景下AtomicLong的自旋瓶颈问题。AtomicLong 通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下, N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。
LongAdder设计思路
AtomicLong 中有个内部变量 value 保存着实际的 long 值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回即可。 
LongAdder 继承 Striped64 类,内部有一个 base 变量和一个 Cell[] 数组:
- base 变量:非竞态条件下,直接累加到该变量上
- Cell[] 数组:竞态条件下,累加个各个线程自己的槽Cell[i]中 ```java // CPU逻辑核数,用来决定槽数组的大小 static final int NCPU = Runtime.getRuntime().availableProcessors();
// 数组槽,大小为2的次幂 transient volatile Cell[] cells;
// 基数,在两种情况下会使用: //1. 没有遇到并发竞争时,直接使用base累加数值 //2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),其他竞争失败的线程会讲数值累加到base上 transient volatile long base;
transient volatile int cellsBusy;
我们在来看一下 Cell 这个内部类:```java@sun.misc.Contended static final class Cell {//保存用于累加的值volatile long value;Cell(long x) { value = x; }//使用UNSAFE类的cas来更新value值final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}
Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的 volatile 变量。
LongAdder实现原理
我们先来看一下 LongAdder 的 add() 方法:
public void add(long x) {Cell[] as; long b, v; int m; Cell a;//如果当前cells数组不为空,或修改base值失败(并发修改导致),会进入下面逻辑if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;//再次判断是否创建了cells数组/数组长度小于0if (as == null || (m = as.length - 1) < 0 ||//或者已经创建了数组,但数组内的cell元素还未创建(a = as[getProbe() & m]) == null ||//或者对指定的cell执行累加操作失败!(uncontended = a.cas(v = a.value, v + x)))//满足以上条件中的一种,执行longAccumulate()方法longAccumulate(x, null, uncontended);}}
首先判断 cells 数组是否初始化,如果还未初始化则可以对 base 进行 cas 修改操作,若修改失败了则继续走下面的逻辑。接着再一次进行 if 判断,cells 数组是否为空/数组长度小于0,如果不为空,则判断该数组下标对应的 cell 是否已经创建,如果已经创建了,则可以对该 cell 进行 cas 修改操作。若修改失败,则直接进入 longAccumulate() 方法。
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对 Cell[] 数组中的单元 Cell。如果 Cell[] 数组未初始化,会调用父类的longAccumelate 去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的 longAccumelate,此时可能就需要对 Cell[] 扩容了。这也是 LongAdder 设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
longAccumulate()
下面我们来详细说明一下 longAccumulate() 方法到底干了什么,在 longAccumulate 中有几个标记位,我们也先理解一下
- cellsBusy:cells 的操作标记位,如果正在修改、新建、操作 cells 数组中的元素,会将其 cas 为 1,否则为0。
- wasUncontended:表示 cas 是否失败,如果失败则考虑操作升级。
collide:是否冲突,如果冲突,则考虑扩容 cells 的长度。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false;//开始自旋for (;;) {Cell[] as; Cell a; int n; long v;//cells数组不为空/数组长度大于0if ((as = cells) != null && (n = as.length) > 0) {//cells数组中被定位下标所在元素还未创建if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {//创建Cell,在创建过程中初始化其value为我们要加的值Cell r = new Cell(x);//判断cellBusy锁标志是否0,如果为0则调用casCellsBusy进行cas修改为1,获取锁成功if (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {//往数组中添加Cellrs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}//wasUncontended为我们传进来的值,默认为trueelse if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//若Cell元素已经创建,则对该cell进行更新else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;//如果cell更新失败,且当前cell数量达到最大值(默认为cpu核数)else if (n >= NCPU || cells != as)//collide标记为falsecollide = false;else if (!collide)collide = true;//获取cellsBusy锁进行扩容else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) {//扩容为原来的两倍Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}//当cells数组没创建完成else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {//初始化cells数组,长度为2Cell[] rs = new Cell[2];//在相应位置创建对应cellrs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}//创建数组失败(有竞争),则对base进行操作else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break; // Fall back on using base}}
整个代码的逻辑入如下:
- cells 不为空
- 如果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
- 如果 cas 失败,继续循环
- 如果 cell 不为空,且 cell cas 成功,退出
- 如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试
- 设置 collide 为 true。
- 获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
- cells 为空且获取到 cellsBusy ,init cells 数组,然后赋值退出。
- cellsBusy 获取失败,则进行 baseCas ,操作成功退出,不成功则重试。

sum()
相比于上面的计数操作,longAdder 的求和操作则简单多了。
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中可能有其他线程对 Cell 中的值进行了修改而当前循环中还未遍历到,也有可能对数组进行了扩容,所以 sum 返回的值并不是非常精确的,其返回值并不是一个调用 sum 方法时的原子快照值。
