前言:

伪共享:

缓存行:

相信大家都知道,线程其实并不是直接从主内存里读取数据,而是经过缓存,并且现在的CPU一般都会有多级缓存,这里假如不考虑多级缓存,那java线程与主内存之间的交互应该可以用下图进行表示:
image.png
从上图我们可以看出,类似于学习mysql时提到的页的概念,缓存行(Cache Line)可以简单的理解为CPU Cache中的最小缓存单位。现今主流的CPU都不再是按字节访问内存,而是以64字节为单位的块(chunk)拿取,称为一个缓存行(cache line)。当你读一个特定的内存地址时,整个缓存行将从主内存读入到缓存。

伪共享的概念

首先明白了缓存行的概念的话,就应该可以想到:
一个缓存行可以存储多个变量(存满当前缓存行的字节数);而CPU对缓存的读取、修改又是以缓存行为最小单位的。
其实这样就产生了一个问题:
image.png
一个缓存行可以存储多个变量(存满当前缓存行的字节数);而CPU对缓存的修改又是以缓存行为最小单位的,在多线程情况下,如果需要修改“共享同一个缓存行的变量”,就会无意中影响彼此的性能,这就是伪共享(False Sharing)。

如何避免伪共享—-数据填充

可以使用数据填充的方式来避免伪共享,即单个数据填充满一个CacheLine。这本质是一种空间换时间的做法。

不使用数据填充

我们可以按照下图进行设计程序:

  • 预先生命数组A;
  • 按理说如果不进行缓存行的填充,数组中A的数据很有可能在一个缓存行,如下图;
  • 那多个线程对数组A中的值进行修改就会发生伪共享问题

image.png

手动进行数据填充

image.png

1. LongAdder与原子包装类的区别

AtomicInteger、AtomicLong使用非阻塞的CAS算法原子性地更新某一个变量,比synchronized这些阻塞算法拥有更好的性能,但是在高并发情况下,大量线程同时去更新一个变量,由于同一时间只有一个线程能够成功,绝大部分的线程在尝试更新失败后,会通过自旋的方式再次进行尝试,严重占用了CPU的时间片。

AtomicLong的实现原理图:
image.png
LongAdder是JDK8新增的原子操作类,它提供了一种新的思路,既然AtomicLong的性能瓶颈是由于大量线程同时更新一个变量造成的,那么能不能把这个变量拆分出来,变成多个变量,然后让线程去竞争这些变量,最后合并即可?LongAdder的设计精髓就在这里,通过将变量拆分成多个元素,降低该变量的并发度,最后进行合并元素,变相的减少了CAS的失败次数。

LongAdder的实现原理图:
image.png

2. 常用方法

  1. public class LongAdder extends Striped64 implements Serializable {
  2. //构造方法
  3. public LongAdder() {
  4. }
  5. //加1操作
  6. public void increment();
  7. //减1操作
  8. public void decrement();
  9. //获取原子变量的值
  10. public long longValue();
  11. }

3. 原理解析

image.png
LongAdder内部维护了一个Cell类型的数组,其中Cell是Striped64中的一个静态内部类。

3.1 Cell类

  1. abstract class Striped64 extends Number {
  2. @sun.misc.Contended static final class Cell {
  3. volatile long value;
  4. Cell(long x) { value = x; }
  5. final boolean cas(long cmp, long val) {
  6. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  7. }
  8. }
  9. }

Cell用来封装被拆分出来的元素,内部用一个value字段保存当前元素的值,等到需要合并时,则累加所有Cell数组中的value。Cell内部使用CAS操作来更新value值。
可以注意到,Cell类被 @sun.misc.Contended注解修饰了,这个注解是为了解决伪共享问题的。

而LongAdder采用的是Cell数组,而数组元素是连续的,因此多个Cell对象共享一个缓存行的情况非常普遍,因此这里@sun.misc.Contended注解对单个Cell元素进行字节填充,确保一个Cell对象占据一个缓存行,即填充至64字节。

3.2 longValue()

longValue()返回累加后的值

  1. public long longValue() {
  2. return sum();
  3. }
  4. public long sum() {
  5. Cell[] as = cells; Cell a;
  6. long sum = base;
  7. //当Cell数组不为null时,进行累加后返回,否则直接返回基准数base
  8. if (as != null) {
  9. for (int i = 0; i < as.length; ++i) {
  10. if ((a = as[i]) != null)
  11. sum += a.value;
  12. }
  13. }
  14. return sum;
  15. }

3.3 increment()

  1. public void increment() {
  2. add(1L);
  3. }
  4. public void add(long x) {
  5. Cell[] as; long b, v; int m; Cell a;
  6. /**
  7. * 如果一下两种条件则继续执行if内的语句
  8. * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,
  9. * 才会初始化cells数组)
  10. * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败
  11. * (casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
  12. * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
  13. * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是
  14. * 类似于AtomticInteger处理方式,使用cas做累加。
  15. */
  16. if ((as = cells) != null || !casBase(b = base, b + x)) {
  17. //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,
  18. //如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
  19. boolean uncontended = true;
  20. /**
  21. *1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
  22. *2. (m = as.length - 1) < 0: cells数组的长度为0
  23. *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
  24. *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,
  25. * 则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,
  26. * 然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;
  27. * 如果cells[threadLocalRandomProbe%cells.length]的位置为null,
  28. * 这说明这个位置从来没有线程做过累加,
  29. * 需要进入if继续执行,在这个位置创建一个新的Cell对象;
  30. *4. !(uncontended = a.cas(v = a.value, v + x)):
  31. * 尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,
  32. * 并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
  33. 如果进入if语句执行longAccumulate方法,有三种情况
  34. 1. 前两个条件代表cells没有初始化,
  35. 2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
  36. 3. 第四个条件代表产生了冲突,uncontended=false
  37. **/
  38. if (as == null || (m = as.length - 1) < 0 ||
  39. (a = as[getProbe() & m]) == null ||
  40. !(uncontended = a.cas(v = a.value, v + x)))
  41. longAccumulate(x, null, uncontended);
  42. }
  43. }

其中longAccmulate的解析如下:

  1. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
  2. //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,
  3. // 说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员
  4. // 静态私有变量probeGenerator的值,后面会详细讲hash值的生成;
  5. //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
  6. //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组
  7. //后争用的),
  8. // 是第一次执行base的cas累加操作失败;
  9. //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,
  10. // 那么这里会将其重新置为true;第一次执行操作失败;
  11. //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
  12. int h;
  13. if ((h = getProbe()) == 0) {
  14. //初始化ThreadLocalRandom;
  15. ThreadLocalRandom.current(); // force initialization
  16. //将h设置为0x9e3779b9
  17. h = getProbe();
  18. //设置未竞争标记为true
  19. wasUncontended = true;
  20. }
  21. //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;
  22. // collide=true代表有冲突,collide=false代表无冲突
  23. boolean collide = false;
  24. for (;;) {
  25. Cell[] as; Cell a; int n; long v;
  26. //这个主干if有三个分支
  27. //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
  28. //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
  29. //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,
  30. // 及cellbusy=1;
  31. // 则尝试将累加值通过cas累加到base上
  32. //先看主分支一
  33. if ((as = cells) != null && (n = as.length) > 0) {
  34. /**
  35. *内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,
  36. * 说明没有线程在这个位置设置过值,
  37. * 没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,
  38. * 对cells数组使用cellsBusy加锁,
  39. * 然后将这个Cell对象放到cells[m%cells.length]位置上
  40. */
  41. if ((a = as[(n - 1) & h]) == null) {
  42. //cellsBusy == 0 代表当前没有线程cells数组做修改
  43. if (cellsBusy == 0) {
  44. //将要累加的x值作为初始值创建一个新的Cell对象,
  45. Cell r = new Cell(x);
  46. //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
  47. if (cellsBusy == 0 && casCellsBusy()) {
  48. //标记Cell是否创建成功并放入到cells数组被hash的位置上
  49. boolean created = false;
  50. try {
  51. Cell[] rs; int m, j;
  52. //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
  53. if ((rs = cells) != null &&
  54. (m = rs.length) > 0 &&
  55. rs[j = (m - 1) & h] == null) {
  56. //将新的cell设置到该位置
  57. rs[j] = r;
  58. created = true;
  59. }
  60. } finally {
  61. //去掉锁
  62. cellsBusy = 0;
  63. }
  64. //生成成功,跳出循环
  65. if (created)
  66. break;
  67. //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]
  68. // 已经有其它线程设置了cell了,
  69. // 继续执行循环。
  70. continue;
  71. }
  72. }
  73. //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
  74. collide = false;
  75. /**
  76. *内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的
  77. * value值设置为v+x失败,
  78. * 说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,
  79. * 最后重新计算一个新的probe,然后重新执行循环;
  80. */
  81. } else if (!wasUncontended)
  82. //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。
  83. wasUncontended = true;
  84. /**
  85. *内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,
  86. * 也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]
  87. * 的value ,如果成功直接退出
  88. */
  89. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  90. fn.applyAsLong(v, x))))
  91. break;
  92. /**
  93. *内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值
  94. * (大于等于cup数量),
  95. * 或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值*/
  96. else if (n >= NCPU || cells != as)
  97. collide = false;
  98. /**
  99. *内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,
  100. * 进入下一次for循环
  101. */
  102. else if (!collide)
  103. //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。
  104. // 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
  105. collide = true;
  106. /**
  107. *内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
  108. */
  109. else if (cellsBusy == 0 && casCellsBusy()) {
  110. try {
  111. //检查cells是否已经被扩容
  112. if (cells == as) { // Expand table unless stale
  113. Cell[] rs = new Cell[n << 1];
  114. for (int i = 0; i < n; ++i)
  115. rs[i] = as[i];
  116. cells = rs;
  117. }
  118. } finally {
  119. cellsBusy = 0;
  120. }
  121. collide = false;
  122. continue; // Retry with expanded table
  123. }
  124. //为当前线程重新计算hash值
  125. h = advanceProbe(h);
  126. //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,
  127. // 先尝试获取cellsBusy锁。
  128. }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  129. boolean init = false;
  130. try { // Initialize table
  131. //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
  132. if (cells == as) {
  133. Cell[] rs = new Cell[2];
  134. rs[h & 1] = new Cell(x);
  135. cells = rs;
  136. init = true;
  137. }
  138. } finally {
  139. //解锁
  140. cellsBusy = 0;
  141. }
  142. //如果init为true说明初始化成功,跳出循环
  143. if (init)
  144. break;
  145. }
  146. /**
  147. *如果以上操作都失败了,则尝试将值累加到base上;
  148. */
  149. else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
  150. // Fall back on using base
  151. break;
  152. }
  153. }

4. 总结

LongAdder在没有线程竞争的时候,只使用base值,此时的情况就类似与AtomicLong。但LongAdder的高明之处在于,发生线程竞争时,便会使用到Cell数组,所以该数组是惰性加载的。

Cell数组初始值为2,每次扩容(当线程竞争异常激烈时,发生扩容)为上次长度的2倍,因此数组长度一直是2的次幂,但是当数组长度≥CPU的核心数时,就不再进行扩容。为什么?我的理解是在一台电脑中,最多能有CPU核心数个线程能够并行,因此同时也就这么多个线程操作Cell数组,每个线程更新一个位置上的元素,且又因为数组中每个元素由于字节填充机制,十分的占据内存。考虑到这两个因素,Cell数组在长度≥CPU核心数时,停止扩容。

确实,LongAdder花了很多心思提高了高并发下程序运行的效率,每一步都是在没有更好的办法下才会去选择开销更大的操作。在低并发下,LongAdder和AtomicLong效率上差不多,但LongAdder更加耗费内存。不过在高并发下,LongAdder将更加高效。