一,为什么要用LongAdder

ali-longadder.jpg

【参考】volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
说明:如果是 count++ 操作,使用如下类实现:AtomicInteger count = new AtomicInteger(); count.addAndGet(1); 如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观 锁的重试次数)。

以上内容来自阿里《Java开发手册》。

里面提到了多线程读写环境下,LongAdder相对于Atomic原子类拥有更好的效率。

  1. public class LongAdderAndAtomicTest {
  2. private static AtomicInteger a = new AtomicInteger(0);
  3. private static LongAdder b = new LongAdder();
  4. public static void main(String[] args) throws Exception {
  5. test(1, 10000000);
  6. test(10, 10000000);
  7. test(20, 10000000);
  8. test(50, 10000000);
  9. test(100, 10000000);
  10. }
  11. /**
  12. * 测试LongAdder和Atomic的效率
  13. *
  14. * @param threadNum 线程数
  15. * @param times 执行时间
  16. */
  17. public static void test(Integer threadNum, Integer times) throws Exception {
  18. System.out.println("线程数为:" + threadNum);
  19. testAtomic(threadNum, times);
  20. testLongAdder(threadNum, times);
  21. }
  22. /**
  23. * 测试Atomic的效率
  24. *
  25. * @param threadNum
  26. * @param times
  27. */
  28. public static void testAtomic(Integer threadNum, Integer times) throws InterruptedException {
  29. //开始时间
  30. long start = System.currentTimeMillis();
  31. CountDownLatch countDownLatch = new CountDownLatch(threadNum);
  32. for (int i = 0; i < threadNum; i++) {
  33. new Thread(() -> {
  34. for (int j = 0; j < times; j++) {
  35. a.incrementAndGet();
  36. }
  37. countDownLatch.countDown();
  38. }).start();
  39. }
  40. countDownLatch.await();
  41. //结束时间
  42. long end = System.currentTimeMillis();
  43. System.out.println("Atomic 消耗时间:" + (end - start));
  44. }
  45. /**
  46. * 测试LongAdder的效率
  47. *
  48. * @param threadNum
  49. * @param times
  50. */
  51. public static void testLongAdder(Integer threadNum, Integer times) throws InterruptedException {
  52. //开始时间
  53. long start = System.currentTimeMillis();
  54. CountDownLatch countDownLatch = new CountDownLatch(threadNum);
  55. for (int i = 0; i < threadNum; i++) {
  56. new Thread(() -> {
  57. for (int j = 0; j < times; j++) {
  58. b.increment();
  59. }
  60. countDownLatch.countDown();
  61. }).start();
  62. }
  63. countDownLatch.await();
  64. //结束时间
  65. long end = System.currentTimeMillis();
  66. System.out.println("LongAdder 消耗时间:" + (end - start));
  67. }
  68. }

LongAdderAndAtomicTest.png
由图可以看到,线程数越多,LongAdder的效率型对于Atomic越高,由此可以看出,LongAdder更适合于高并发情况下。

二,LongAdder源码阅读

看LongAdder类的继承关系:

  1. public class LongAdder extends Striped64 implements Serializable

LongAdder这个类继承自Striped64Striped64里面声明了一个内部类Cell

  1. @sun.misc.Contended static final class Cell {
  2. //拥有内存可见性的value
  3. volatile long value;
  4. //带参数的构造器
  5. Cell(long x) { value = x; }
  6. //调用unsafe类的cas对cmp和bal进行比较交换,返回是否成功
  7. final boolean cas(long cmp, long val) {
  8. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  9. }
  10. //声明unsafe类
  11. private static final sun.misc.Unsafe UNSAFE;
  12. //声明cell成员属性的内存偏移量
  13. private static final long valueOffset;
  14. //初始化
  15. static {
  16. try {
  17. UNSAFE = sun.misc.Unsafe.getUnsafe();
  18. Class<?> ak = Cell.class;
  19. valueOffset = UNSAFE.objectFieldOffset
  20. (ak.getDeclaredField("value"));
  21. } catch (Exception e) {
  22. throw new Error(e);
  23. }
  24. }
  25. }
  1. //获取当前系统的cpu数 控制cells数组长度的一个关键条件
  2. static final int NCPU = Runtime.getRuntime().availableProcessors();
  3. //数组的长度,只要数组不为空,一定是2的倍数,这样-1转化为二进制的时候一定是一大堆1
  4. transient volatile Cell[] cells;
  5. //没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中
  6. transient volatile long base;
  7. //初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了
  8. transient volatile int cellsBusy;

LongAdder里面使用的这两个属性实际上是继承自他的父类的。

  1. public void add(long x) {
  2. /**
  3. * as: 表示cells数组的引用
  4. * b: 表示获取的base值
  5. * v: 表示期望值
  6. * m: 表示cells数组的长度
  7. * a: 表示当前线程命中的cell单元格
  8. */
  9. Cell[] as; long b, v; int m; Cell a;
  10. /**
  11. * 条件一:true->表示cells已经初始化过,当前线程应该将数据写入到对应的cell中
  12. * false->表示cells未初始化,当前所有线程应该将数据写入到base中
  13. * 条件二:false->表示当前线程cas替换数据成功
  14. * true->表示发生竞争了,可能需要重试或者扩容
  15. * 进入if的条件:数组已经初始化 或者 cas交换数据失败,表示有竞争
  16. *
  17. */
  18. if ((as = cells) != null || !casBase(b = base, b + x)) {
  19. /**
  20. * uncontended: true -> 未竞争 false->发生竞争
  21. * 条件一:true->数组没有初始化
  22. * false->数组已经初始化
  23. * 条件二:true->数组没有初始化
  24. * false->数组已经初始化
  25. *
  26. * getProbe():获取当前线程的hash值
  27. * 条件三:true-> 当前线程对应的cell并没有初始化
  28. * false->当前线程对应的cell已经初始化
  29. * 条件四:true->cas交换失败,表示有竞争
  30. * false->cas交换成功
  31. * 进入if的条件:
  32. * cells未初始化, 或者 当前线程对应的cell未初始化, 或者 cas交换失败
  33. */
  34. boolean uncontended = true;
  35. if (as == null || (m = as.length - 1) < 0 ||
  36. (a = as[getProbe() & m]) == null ||
  37. !(uncontended = a.cas(v = a.value, v + x)))
  38. longAccumulate(x, null, uncontended);
  39. }
  40. }

接下来看longAccumulate()

  1. /**
  2. * 首先:哪些情况会进入当前方法?
  3. * cells未初始化, 或者 当前线程对应的cell未初始化, 或者 cas交换失败
  4. * @param x 新值
  5. * @param fn 没用上。一个扩展接口
  6. * @param wasUncontended 只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
  7. */
  8. final void longAccumulate(long x, LongBinaryOperator fn,
  9. boolean wasUncontended) {
  10. //h:代表当前线程hash值
  11. int h;
  12. /**
  13. * 如果当前线程hash值等于0,条件成立
  14. * 给当前线程分配hash值
  15. * 将当前线程的hash值重新赋值给h
  16. * 设置为未竞争或者竞争修改成功状态。
  17. * 为什么?
  18. * 因为默认所有线程进来操做的都是cells[0]的位置,所以不把它当作一次真正的竞争。
  19. */
  20. if ((h = getProbe()) == 0) {
  21. //给当前线程分配hash值
  22. ThreadLocalRandom.current(); // force initialization
  23. //将当前线程的hash值重新赋值给h
  24. h = getProbe();
  25. wasUncontended = true;
  26. }
  27. //表示扩容意向 false 一定不会扩容,true 可能会扩容。
  28. boolean collide = false;
  29. //自旋
  30. for (;;) {
  31. /**
  32. * as 代表cells的引用
  33. * a 当前线程对应的cell
  34. * n cells的长度
  35. * v 期望值
  36. */
  37. Cell[] as; Cell a; int n; long v;
  38. /**
  39. * case1:
  40. * cells已经初始化
  41. *
  42. * case1.1:
  43. * if(当前线程对应的cell还没有初始化 && 当前处于无锁状态){
  44. * 创建一个新的cell对象 r
  45. *
  46. * if(当前锁状态未0并且获取到了锁){
  47. * created : 标记是否创建成功
  48. * if(cells已经被初始化 && 当前线程对应的cell为空){
  49. * 将当前线程对应位置的cell初始化为新创建的cell r
  50. * create=true 表示创建成功,最终在释放锁。
  51. * }
  52. * }
  53. * 将扩容意向改成false
  54. * }
  55. *
  56. * case1.2:
  57. * if(如果当前线程竞争修改失败){
  58. * 状态改为true;
  59. * //默认所有线程一开始都在cell[0]的位置,所以一定会发生竞争,
  60. * //这次竞争就不当作一次真正的竞争。
  61. * }
  62. *
  63. * case1.3:
  64. * if(当前线程rehash过hash值 && 新命中的cell不为空){
  65. * 尝试cas一次
  66. * }
  67. *
  68. * case1.4:
  69. * if(如果cells的长度>cpu数 || cells和as不一致){
  70. * //cells和as不一致 说明其他线程已经扩容过了,当前线程只需要rehash重试即可
  71. * 扩容意向强制改为false。
  72. * }
  73. *
  74. * case1.5:
  75. * //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
  76. *
  77. * case1.6:
  78. * if(锁状态为0 && 获取到了锁){
  79. * //第二层判断为了防止当前线程在对第一层id的条件判断一半的时候,又进来一个线程,将所有业务已经执行一遍了。
  80. * //只有当cells==as才能说明,当前线程在第一层if执行条件的过程中,没有其他线程进来破坏。
  81. * if(cells==as){
  82. * 扩容为原来的二倍
  83. * 重置当前线程Hash值
  84. * }
  85. * }
  86. */
  87. if ((as = cells) != null && (n = as.length) > 0) {
  88. if ((a = as[(n - 1) & h]) == null) {
  89. if (cellsBusy == 0) { // Try to attach new Cell
  90. Cell r = new Cell(x); // Optimistically create
  91. if (cellsBusy == 0 && casCellsBusy()) {
  92. //标记是否创建成功
  93. boolean created = false;
  94. try {
  95. /**
  96. * rs:cells的引用
  97. * m:cells的长度
  98. * j:当前线程对应的cells下标
  99. */
  100. Cell[] rs; int m, j;
  101. if ((rs = cells) != null &&
  102. (m = rs.length) > 0 &&
  103. rs[j = (m - 1) & h] == null) {
  104. rs[j] = r;
  105. created = true;
  106. }
  107. } finally {
  108. cellsBusy = 0;
  109. }
  110. if (created)
  111. break;
  112. continue; // Slot is now non-empty
  113. }
  114. }
  115. collide = false;
  116. }
  117. else if (!wasUncontended) // CAS already known to fail
  118. wasUncontended = true; // Continue after rehash
  119. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  120. fn.applyAsLong(v, x))))
  121. break;
  122. else if (n >= NCPU || cells != as)
  123. collide = false; // At max size or stale
  124. else if (!collide)
  125. collide = true;
  126. else if (cellsBusy == 0 && casCellsBusy()) {
  127. try {
  128. if (cells == as) { // Expand table unless stale
  129. Cell[] rs = new Cell[n << 1];
  130. for (int i = 0; i < n; ++i)
  131. rs[i] = as[i];
  132. cells = rs;
  133. }
  134. } finally {
  135. cellsBusy = 0;
  136. }
  137. collide = false;
  138. continue; // Retry with expanded table
  139. }
  140. //重置当前线程Hash值
  141. h = advanceProbe(h);
  142. }
  143. /**
  144. * case2:
  145. * cells并未初始化
  146. * 锁状态为0
  147. * cells==as ? 因为其它线程可能会在你给as赋值之后修改了 cells
  148. * 获取锁成功
  149. * 里面再次判断cells==as是因为防止其他线程在判断第一层if的中间被其他线程先进来修改了一次
  150. * 初始化cells
  151. */
  152. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  153. boolean init = false;
  154. try { // Initialize table
  155. if (cells == as) {
  156. Cell[] rs = new Cell[2];
  157. rs[h & 1] = new Cell(x);
  158. cells = rs;
  159. init = true;
  160. }
  161. } finally {
  162. cellsBusy = 0;
  163. }
  164. if (init)
  165. break;
  166. }
  167. /**
  168. * case3:
  169. * cellsBusy处于加锁状态,表示其他线程正在初始化cells,
  170. * 那么当前线程就应该将数据累加到base
  171. */
  172. else if (casBase(v = base, ((fn == null) ? v + x :
  173. fn.applyAsLong(v, x))))
  174. break; // Fall back on using base
  175. }
  176. }

getProbe()获取当前线程hash值。

  1. static final int getProbe() {
  2. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  3. }

casCellsBusy()cas的方式获取锁。

  1. final boolean casCellsBusy() {
  2. return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
  3. }

advanceProbe(h)重置当前线程hash值。

  1. static final int advanceProbe(int probe) {
  2. probe ^= probe << 13; // xorshift
  3. probe ^= probe >>> 17;
  4. probe ^= probe << 5;
  5. UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
  6. return probe;
  7. }

casBase()cas的方式改变base值。

  1. final boolean casBase(long cmp, long val) {
  2. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
  3. }

三,LongAdder执行流程

LongAdder的执行流程实际上就是:

  1. 当没有线程竞争的时候,线程会直接操做base里面的值。
  2. 当有线程竞争的时候,会将base的值拷贝成一个cells数组,每个线程都来操作一个cell数组中的桶位,最终将cells各个桶位和base求和,就可以得到LongAdder的最终值。
    image.png
  1. public void add(long x) {
  2. if(数组未初始化||cas的方式把值写入base失败){
  3. if(数组未初始化||当前线程对应的单元格未初始化||或cas的方式在当前线程对应的单元格交换数据失败,说明有竞争){
  4. longAccumulate(x, null, uncontended);
  5. }
  6. }
  7. }
  8. final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {
  9. if(当前线程hash值==0){
  10. 重置当前线程hash
  11. 是否发生竞争改为true
  12. }
  13. 声明扩容意向=false
  14. for(;;){
  15. if(数组已经初始化过){
  16. if(当前线程命中的单元格未初始化){
  17. if(如果当前数组不是正在扩容){
  18. 创建一个新的Cell(x)
  19. if(当前并未有其他线程对数组进行扩容&&且当前线程竞争扩容数组的权利成功){
  20. 声明 创建完成 =false
  21. if(数组不为空&&当前没有线程正在操作数组&&且当前线程命中的单元格为空){
  22. 将当前线程对应的单元格初始化为刚才创建的cell
  23. 创建完成 =true
  24. 释放 数组扩容的权利
  25. }
  26. if(数组已经创建完成){
  27. break
  28. }
  29. continue;
  30. }
  31. }
  32. 扩容意向 =false
  33. }
  34. else if(如果当前线程竞争修改失败){
  35. 状态设置为true
  36. //默认所有线程1开始都在cell[0]的位置,所以一定会发生竞争,这次竞争就不当做一次真正的竞争
  37. }
  38. else if(cas的方式修改当前线程命中的单元格成功){
  39. break
  40. }
  41. else if(数组长度大于cpu数||cells长度 和当前数组长度不一致){
  42. 扩容意向 =false
  43. //cells长度 和当前数组长度不一致:说明已经有其他线程扩容了,当前线程只要rehash重试即可。
  44. }
  45. else if(扩容意向==false){
  46. 扩容意向 = true //但是不一定真的发生扩容
  47. }
  48. else if(当前没有线程正在在扩容数组 && 并且当前线程竞争到了扩容数组的资格){
  49. if(在这个过程中没有线程把数组扩容了,也就是啥也没发生){
  50. 数组扩容一倍,将原数组的值进行一个拷贝,将新数组的地址指向原数组
  51. }
  52. 释放数组扩容的权利
  53. 扩容意向改成false
  54. }
  55. 把当前线程的hashrehash
  56. }
  57. //当前数组没有被初始化
  58. else if(当前没有线程正在初始化数组,也没有线程已经初始化完了,并且当前线程竞争到了初始化的权利){
  59. 声明 初始化 == false
  60. if(当前数组还未初始化,说明没有线程已经初始化完了){
  61. 创建一个长度为2 的数组
  62. 并把当前线程命中的单元格初始化
  63. 初始化 == true
  64. }
  65. 释放数组扩容权利
  66. if(初始化==true){
  67. break
  68. }
  69. }
  70. //如果有线程正在初始化数组,那么当前线程就应该将数据累加到base
  71. else if(cas的方式修改base值成功){
  72. break
  73. }
  74. }
  75. }

1.为什么比cas的效率高?

  1. cas是多线程竞争,只有拿到锁的线程才能去操做资源,其他线程不断的自旋重试,相当于线程排队。
  2. LongAdder是多线程竞争的时候,他会将共享资源拷贝多份,采用分支合并的思想,提升效率。