阿里开发手册建议

image.png

LongAdder由来


LongAdder类是JDK1.8新增的一个原子性操作类。AtomicLong通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,但是JDK开发组并不满足于此,因为非常搞并发的请求下AtomicLong的性能是不能让人接受的。
如下AtomicLong 的incrementAndGet的代码,虽然AtomicLong使用CAS算法,但是CAS失败后还是通过无限循环的自旋锁不多的尝试,浪费CPU资源,这就是高并发下CAS性能低下的原因所在。源码如下:
image.png
并发下N多线程同时去操作一个变量会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性。既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源。

LongAdder实现原理图
image.pngimage.png


LongAdder则是内部维护一个Cells数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。

LongAdder维护了要给延迟初始化的原子性更新数组和一个基值变量base数组的大小保持是2的N次方大小,数组表的下标使用每个线程的hashcode值的掩码表示,数组里面的变量实体是Cell类型。

Cell 类型是Atomic的一个改进,用来减少缓存的争用,对于大多数原子操作字节填充是浪费的,因为原子操作都是无规律的分散在内存中进行的,多个原子性操作彼此之间是没有接触的,但是原子性数组元素彼此相邻存放将能经常共享缓存行,也就是伪共享。所以这在性能上是一个提升。

另外由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候再创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

Longadder类关系图
image.png

Striped64类

  • Striped64是一个高并发累加的工具类。
  • Striped64的设计核心思路就是通过内部的分散计算来避免竞争。
  • Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表。
  • 没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。所以整个Striped64的值为sum=base+∑[0~n]cells。


Striped64核心属性

  • transient volatile Cell[] cells; // 存放cell的hash表,大小为2乘幂
  • transient volatile long base; // 基础值(1.无竞争时更新,2.cells数组初始化过程不可用时,也会通过cas累加到base)
  • transient volatile int cellsBusy; // 自旋锁,通过CAS操作加锁(1.初始化cells数组,2.创建cell单元,3.cells扩容)

成员变量cells

  • cells数组是LongAdder高性能实现的必杀器:
  • AtomicInteger只有一个value,所有线程累加都要通过cas竞争value这一个变量,高并发下线程争用非常严重;
    而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,这时使用base做累加,有了竞争后cells数组就上场了,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器cpu的数量为止就不在扩容(CPU能够并行的CAS操作的最大数量是它的核心数),每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上。

image.png
成员变量cellsBusy

cellsBusy作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁,加锁的状况有三种

  1. cells数组初始化的时候
  2. cells数组扩容的时候
  3. 如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候

成员变量base

它有两个作用:

  1. 在开始没有竞争的情况下,将累加值累加到base
  2. 在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上

Longadder

  1. // 累加方法,参数x为累加的值
  2. public void add(long x) {
  3. Cell[] as; long b, v; int m; Cell a;
  4. /**
  5. * 如果一下两种条件则继续执行if内的语句
  6. * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
  7. * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
  8. * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
  9. * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
  10. */
  11. if ((as = cells) != null || !casBase(b = base, b + x)) {
  12. //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
  13. boolean uncontended = true;
  14. /**
  15. *1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
  16. *2. (m = as.length - 1) < 0: cells数组的长度为0
  17. *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
  18. *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
  19. *4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
  20. 如果进入if语句执行longAccumulate方法,有三种情况
  21. 1. 前两个条件代表cells没有初始化,
  22. 2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
  23. 3. 第四个条件代表产生了冲突,uncontended=false
  24. **/
  25. if (as == null || (m = as.length - 1) < 0 ||
  26. (a = as[getProbe() & m]) == null ||
  27. !(uncontended = a.cas(v = a.value, v + x)))
  28. longAccumulate(x, null, uncontended);
  29. }
  30. }
  31. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
  32. //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;
  33. //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
  34. //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;
  35. //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;
  36. //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
  37. int h;
  38. if ((h = getProbe()) == 0) {
  39. //初始化ThreadLocalRandom;
  40. ThreadLocalRandom.current(); // force initialization
  41. //将h设置为0x9e3779b9
  42. h = getProbe();
  43. //设置未竞争标记为true
  44. wasUncontended = true;
  45. }
  46. //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突
  47. boolean collide = false;
  48. for (;;) {
  49. Cell[] as; Cell a; int n; long v;
  50. //这个主干if有三个分支
  51. //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
  52. //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
  53. //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上
  54. //先看主分支一
  55. if ((as = cells) != null && (n = as.length) > 0) {
  56. /**
  57. *内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上
  58. */
  59. if ((a = as[(n - 1) & h]) == null) {
  60. //cellsBusy == 0 代表当前没有线程cells数组做修改
  61. if (cellsBusy == 0) {
  62. //将要累加的x值作为初始值创建一个新的Cell对象,
  63. Cell r = new Cell(x);
  64. //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
  65. if (cellsBusy == 0 && casCellsBusy()) {
  66. //标记Cell是否创建成功并放入到cells数组被hash的位置上
  67. boolean created = false;
  68. try {
  69. Cell[] rs; int m, j;
  70. //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
  71. if ((rs = cells) != null &&
  72. (m = rs.length) > 0 &&
  73. rs[j = (m - 1) & h] == null) {
  74. //将新的cell设置到该位置
  75. rs[j] = r;
  76. created = true;
  77. }
  78. } finally {
  79. //去掉锁
  80. cellsBusy = 0;
  81. }
  82. //生成成功,跳出循环
  83. if (created)
  84. break;
  85. //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。
  86. continue;
  87. }
  88. }
  89. //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
  90. collide = false;
  91. /**
  92. *内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;
  93. */
  94. } else if (!wasUncontended)
  95. //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。
  96. wasUncontended = true;
  97. /**
  98. *内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出
  99. */
  100. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  101. fn.applyAsLong(v, x))))
  102. break;
  103. /**
  104. *内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值*/
  105. else if (n >= NCPU || cells != as)
  106. collide = false;
  107. /**
  108. *内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
  109. */
  110. else if (!collide)
  111. //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
  112. collide = true;
  113. /**
  114. *内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
  115. */
  116. else if (cellsBusy == 0 && casCellsBusy()) {
  117. try {
  118. //检查cells是否已经被扩容
  119. if (cells == as) { // Expand table unless stale
  120. Cell[] rs = new Cell[n << 1];
  121. for (int i = 0; i < n; ++i)
  122. rs[i] = as[i];
  123. cells = rs;
  124. }
  125. } finally {
  126. cellsBusy = 0;
  127. }
  128. collide = false;
  129. continue; // Retry with expanded table
  130. }
  131. //为当前线程重新计算hash值
  132. h = advanceProbe(h);
  133. //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。
  134. }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  135. boolean init = false;
  136. try { // Initialize table
  137. //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
  138. if (cells == as) {
  139. Cell[] rs = new Cell[2];
  140. rs[h & 1] = new Cell(x);
  141. cells = rs;
  142. init = true;
  143. }
  144. } finally {
  145. //解锁
  146. cellsBusy = 0;
  147. }
  148. //如果init为true说明初始化成功,跳出循环
  149. if (init)
  150. break;
  151. }
  152. /**
  153. *如果以上操作都失败了,则尝试将值累加到base上;
  154. */
  155. else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // Fall back on using base
  156. break;
  157. }
  158. }

hash生成策略

hash决定了当前线程将累加值定位到哪个cell中,hash算法尤其重要。
hash就是java的Thread类里面有一个成员变量,初始值为0。

  1. /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
  2. @sun.misc.Contended("tlr")
  3. int threadLocalRandomProbe;
  4. // LongAdder的父类Striped64里通过getProbe方法获取当前线程threadLocalRandomProbe
  5. static final int getProbe() {
  6. // PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量
  7. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  8. }
  9. // threadLocalRandomProbe初始化
  10. // 线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9
  11. int h;
  12. if ((h = getProbe()) == 0) {
  13. ThreadLocalRandom.current();
  14. h = getProbe();
  15. //设置未竞争标记为true
  16. wasUncontended = true;
  17. }
  18. static final void localInit() {
  19. // private static final AtomicInteger probeGenerator = new AtomicInteger();
  20. // private static final int PROBE_INCREMENT = 0x9e3779b9;
  21. int p = probeGenerator.addAndGet(PROBE_INCREMENT);
  22. int probe = (p == 0) ? 1 : p; // skip 0
  23. long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
  24. Thread t = Thread.currentThread();
  25. UNSAFE.putLong(t, SEED, seed);
  26. UNSAFE.putInt(t, PROBE, probe);
  27. }
  28. threadLocalRandomProbe重新生成
  29. static final int advanceProbe(int probe) {
  30. probe ^= probe << 13; // xorshift
  31. probe ^= probe >>> 17;
  32. probe ^= probe << 5;
  33. UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
  34. return probe;
  35. }