1、AtomicLong的缺点

AtomicLong底层使用CAS实现,在高并发场景下,CAS自旋时间长,性能较差

image.png

  1. @HotSpotIntrinsicCandidate
  2. public final long getAndSetLong(Object o, long offset, long newValue) {
  3. long v;
  4. do {
  5. v = getLongVolatile(o, offset);
  6. } while (!weakCompareAndSetLong(o, offset, v, newValue));
  7. return v;
  8. }

2、LongAddr基本原理

LongAdder的基本思路就是分散热点,将value值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个value值进行CAS操作,减少冲突的概率

image.png

LongAdder有一个全局变量volatile long base值,当并发不高的情况下都是通过CAS来直接操作base值,如果CAS失败,则针对LongAdder中的Cell[]数组中的Cell进行CAS操作,最后使用base值加cell中的和即为最终值

image.png

[

](https://blog.csdn.net/u012881584/article/details/106133349)

3、LongAddr源码分析

3.1、伪共享问题

LongAddr继承了Striped64,其中定义了base和cell数组

  1. abstract class Striped64 extends Number {
  2. @jdk.internal.vm.annotation.Contended static final class Cell {
  3. volatile long value;
  4. Cell(long x) { value = x; }
  5. final boolean cas(long cmp, long val) {
  6. return VALUE.compareAndSet(this, cmp, val);
  7. }
  8. final void reset() {
  9. VALUE.setVolatile(this, 0L);
  10. }
  11. final void reset(long identity) {
  12. VALUE.setVolatile(this, identity);
  13. }
  14. final long getAndSet(long val) {
  15. return (long)VALUE.getAndSet(this, val);
  16. }
  17. private static final VarHandle VALUE;
  18. static {
  19. try {
  20. MethodHandles.Lookup l = MethodHandles.lookup();
  21. VALUE = l.findVarHandle(Cell.class, "value", long.class);
  22. } catch (ReflectiveOperationException e) {
  23. throw new ExceptionInInitializerError(e);
  24. }
  25. }
  26. }
  27. static final int NCPU = Runtime.getRuntime().availableProcessors();
  28. transient volatile Cell[] cells;
  29. transient volatile long base;
  30. transient volatile int cellsBusy;
  31. }

Striped64中Cell使用@Contended注解进行修饰,而 @Contended 注解可以进行缓存行填充,从而解决伪共享问题


伪共享问题 :
cpu和主存之间引入了多级的cache缓存,CPU缓存中的数据是以缓存行为单位处理的,缓存行通常是 64 字节(常用处理器的缓存行是 64 字节的,比较旧的处理器缓存行是 32 字节),并且它有效地引用主内存中的一块地址。在程序运行的过程中,缓存每次更新都从主内存中加载连续的 64 个字节(如果访问一个 long 类型的数组时,当数组中的一个值被加载到缓存中时,另外 7 个元素也会被加载到缓存中),所有cpu对内存连续的数据结构更为友好。但是当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能 ,如cpu核心1的线程a和cpu核心2的线程b操作了同一个缓存行的两个独立变量,当a更新目标变量后cpu核心2的对应缓存行失效,需要从主从重新读取。

伪共享解决方案:

进行缓存行填充(Padding),让不同线程操作的对象处于不同的缓存行
如果一条缓存行有 64 字节,而 Java 程序的对象头固定占 8 字节(32位系统)或 12 字节( 64 位系统默认开启压缩, 不开压缩为 16 字节),所以我们只需要填 6 个无用的长整型补上6*8=48字节,让不同的 VolatileLong 对象处于不同的缓存行,就避免了伪共享


3.2、add方法

供外部调用的api

  1. public void add(long x) {
  2. /**
  3. * cs: cells引用
  4. * b: base值
  5. * v: 期望值
  6. * m: cell数组的长度
  7. * c: 当前线程命中的cell单元格
  8. *
  9. */
  10. Cell[] cs; long b, v; int m; Cell c;
  11. /**
  12. * 条件1: (cs = cells) != null
  13. * true: cells已经初始化了 当前线程应该将新数据写到cell中
  14. * false: cells还未被初始化 当前线程应该将数据写到base中
  15. * 条件2: !casBase(b = base, b + x)
  16. * 只有条件1为false 即cells还未被初始化时会执行
  17. * true: 当前线程cas修改base值失败
  18. * false: 当前线程cas修改base值成功
  19. */
  20. if ((cs = cells) != null || !casBase(b = base, b + x)) {
  21. /**
  22. * 进入条件:
  23. * 1、cells已经初始化了 当前线程应该将新数据写到cell中
  24. * 2、cells还未被初始化,cas发生竞争,当前线程cas操作失败
  25. */
  26. boolean uncontended = true;
  27. /**
  28. *条件1: cs == null || (m = cs.length - 1) < 0
  29. * true: cells还未被初始化,是cas发生竞争操作失败进入代码块的
  30. * false: cells已经初始化了 当前线程应该将新数据写到cell中
  31. *条件2:c = cs[getProbe() & m]) == null
  32. * getProbe(): 相当于获取当前线程的hash值
  33. * m: cell的数量 -1 ,m为2的n次方,getProbe() & m 相当于 使用hash值对数组长度取余,但是位运算操作更快
  34. *
  35. * 前提条件:cells已经初始化了
  36. * true: 说明当前线程对应下标的cell为空,需要创建longAccumulate支持
  37. * false:说明当前线程对应下标的cell不为空,下一步需要将x值添加到cell中
  38. *
  39. *条件3: !(uncontended = c.cas(v = c.value, v + x))
  40. * 前提条件:cells已经初始化了,且当前线程对应下标的cell不为空
  41. * 对目标cell进行一次cas尝试操作
  42. * true: cas操作失败 当前线程对应的cell有竞争
  43. * false: cas成功
  44. */
  45. if (cs == null || (m = cs.length - 1) < 0 ||
  46. (c = cs[getProbe() & m]) == null ||
  47. !(uncontended = c.cas(v = c.value, v + x))){
  48. /**
  49. * 进入条件:
  50. * 1、cells还未被初始化,是cas发生竞争操作失败进入代码块的
  51. * 2、cells已未被初始化,但是当前线程对应下标的cell为空
  52. * 3、cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败 当前线程对应的cell有竞争
  53. */
  54. longAccumulate(x, null, uncontended);
  55. }
  56. }
  57. }


3.3、longAccumulate方法

LongAddr统计实现核心方法

image.png

  1. /**
  2. * 进入条件:
  3. * 1、cells还未被初始化,是cas发生竞争操作失败进入代码块的
  4. * 2、cells已未被初始化,但是当前线程对应下标的cell为空
  5. * 3、cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败 当前线程对应的cell有竞争
  6. *
  7. *
  8. * @param x 要累加的值
  9. * @param fn 操作算法接口 默认为null
  10. * @param wasUncontended 当前cell是否发生过竞争 true 未发生竞争 false 发送竞争
  11. * 当为第3种条件进入时 wasUncontended为false 第1,2种条件为true
  12. */
  13. final void longAccumulate(long x, LongBinaryOperator fn,
  14. boolean wasUncontended) {
  15. //当前线程的hash值
  16. int h;
  17. if ((h = getProbe()) == 0) {
  18. //当前线程还未分片hash值
  19. ThreadLocalRandom.current();
  20. h = getProbe();
  21. /**
  22. * 强制将竞争标识设置为true
  23. *
  24. * 默认情况下:
  25. * 新线程的对应hash值为0,首先会写入cells[0]的位置 不将其算作一次竞争
  26. */
  27. wasUncontended = true;
  28. }
  29. //扩容意向 false 一定不会扩容 true 可能会扩容
  30. boolean collide = false;
  31. done: for (;;) {
  32. //cs : cells引用
  33. //c :当前线程命中cell
  34. //n: cells数组长度
  35. //v: 期望值
  36. Cell[] cs; Cell c; int n; long v;
  37. /**
  38. *CASE1: (cs = cells) != null && (n = cs.length) > 0 表示cells已经初始化了,当前线程应该将数据写到对应的cell中
  39. */
  40. if ((cs = cells) != null && (n = cs.length) > 0) {
  41. /**
  42. * CASE1.1 :(c = cs[(n - 1) & h]) == null
  43. * true: 当前线程对应的cell为空,需要创建 (n - 1) & h相当于 h % n
  44. * false: 当前线程对应的cell不为空
  45. */
  46. if ((c = cs[(n - 1) & h]) == null) {
  47. //cellsBusy == 0:cells没有在扩容或者创建
  48. if (cellsBusy == 0) {
  49. //以累加值x 新建一个cell
  50. Cell r = new Cell(x);
  51. /**
  52. * 如果cells没有在扩容或者创建,尝试使用cas获取锁
  53. * 条件1: cellsBusy == 0
  54. * true: 当前处于无锁状态
  55. * false: 锁被占用
  56. * 条件2:casCellsBusy()
  57. * true: 当前线程获取锁成功 可以进行cell创建
  58. * false: 当前线程获取锁失败
  59. */
  60. if (cellsBusy == 0 && casCellsBusy()) {
  61. try {
  62. /**
  63. * cas获取cells操作锁成功 将cells数组对应下标的元素设置为刚刚新建的cell
  64. * rs cells引用
  65. *
  66. */
  67. Cell[] rs; int m, j;
  68. /**
  69. * 条件1:(rs = cells) != null 恒成立
  70. * 条件2:m = rs.length) > 0 恒成立
  71. * 条件3:rs[j = (m - 1) & h] == null
  72. * 类似单例双重锁检查 防止其它线程初始化位置,当前线程再次初始化 导致数据丢失
  73. * true: 没有其它线程对目标下标创建cell
  74. * false: 其它线程对目标下标创建cell
  75. */
  76. if ((rs = cells) != null &&
  77. (m = rs.length) > 0 &&
  78. rs[j = (m - 1) & h] == null) {
  79. rs[j] = r;
  80. break done;
  81. }
  82. } finally {
  83. //释放锁
  84. cellsBusy = 0;
  85. }
  86. //此时当前线程对应的cell已经初始化了 重试操作
  87. continue;
  88. }
  89. }
  90. //扩容意向设置为false
  91. collide = false;
  92. }
  93. //CASE1.2: cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败
  94. else if (!wasUncontended){
  95. //把发送竞争标识设置为true 表示当前cell未发生竞争 后面将线程hash值重置
  96. wasUncontended = true;
  97. }
  98. //CASE1.3: 对当前cell进行cas操作
  99. // 前置条件:当前线程重置过hash值,新命中的cell不为空
  100. else if (c.cas(v = c.value,
  101. (fn == null) ? v + x : fn.applyAsLong(v, x))){
  102. //true: 当前cell进行cas操作成功 退出
  103. // false: 重置hash值后,新命中的cell也有竞争,CAS失败
  104. break;
  105. }
  106. /**
  107. * CASE1.4:
  108. * 条件1:n >= NCPU
  109. * true: 当前cells数量大于cpu个数, 不再扩容
  110. * false: 可以扩容
  111. * 条件2:cells != cs
  112. * true: 其它线程已经扩容过了,当前线程重置hash值后重试即可
  113. * false: 其它线程未扩容cells
  114. *
  115. */
  116. else if (n >= NCPU || cells != cs){
  117. collide = false;
  118. }
  119. /**
  120. * CASE1.5: 设置扩容意向为true 但是不一定真的发生扩容,此时再去重试一次,如果还发生竞争就走到CASE1.6
  121. * 前置条件:当前线程重置过hash值,新命中的cell不为空,cas发生竞争,操作失败,并且cells还可以扩容
  122. *
  123. */
  124. else if (!collide){
  125. collide = true;
  126. }
  127. /**
  128. * CASE1.6: 真正扩容逻辑
  129. * 条件1:cellsBusy == 0
  130. * true: 当前为无锁状态 当前线程可以去竞争锁
  131. * false: 当前为有锁状态 cells在进行扩容
  132. * 条件2:casCellsBusy()
  133. * true 当前线程获取锁成功 可以执行扩容逻辑
  134. * false: 当前时刻有其它线程在进行扩容操作
  135. *
  136. */
  137. else if (cellsBusy == 0 && casCellsBusy()) {
  138. try {
  139. if (cells == cs){
  140. //和上面的一样 类似单例双重锁检查 防止其它线程扩容,当前线程再次扩容
  141. cells = Arrays.copyOf(cs, n << 1);
  142. }
  143. } finally {
  144. cellsBusy = 0;
  145. }
  146. collide = false;
  147. continue; // Retry with expanded table
  148. }
  149. //重置线程hash值
  150. h = advanceProbe(h);
  151. }
  152. /**
  153. * CASE2:
  154. * 前置条件:cells还未被初始化
  155. * 条件1:cellsBusy == 0 当前未加锁,cell没有在扩容或者新建
  156. * 条件2:cells == cs,cells为空 因为有可能其它线程修改了cells
  157. * 条件3:casCellsBusy() 加锁成功 获取cells操作锁 cellsBusy改为1
  158. */
  159. else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
  160. try {
  161. /**
  162. * 类似单例双重锁检查 防止 a,b线程同时执行到cellsBusy == 0 && cells == cs 位置,然后a拿到锁,创建cells,再释放锁,此时b再次拿到锁,
  163. * 如果不进行判断cells是否为null 则会覆盖掉cells
  164. */
  165. if (cells == cs) {
  166. Cell[] rs = new Cell[2];
  167. rs[h & 1] = new Cell(x);
  168. cells = rs;
  169. break done;
  170. }
  171. } finally {
  172. cellsBusy = 0;
  173. }
  174. }
  175. /**
  176. * CASE3:
  177. * 前置条件:
  178. * 1、cells还未被初始化,cellbusy为加锁状态 需要当前线程将数据累加到base
  179. * 2、cells被其他线程初始化了 当前线程需要将数据累加到base
  180. */
  181. else if (casBase(v = base,
  182. (fn == null) ? v + x : fn.applyAsLong(v, x)))
  183. break done;
  184. }
  185. }

3.4、sum方法

LongAdder 的自增是没有返回值的,要获取当前值的时候,只能调用 sum 方法,将所有cell中的数据和base求和返回。
先自增,再获取值,不是原子操作,当多线程并发调用的时候,sum 方法返回的值必定不是一个准确的值

  1. public long sum() {
  2. Cell[] cs = cells;
  3. long sum = base;
  4. if (cs != null) {
  5. for (Cell c : cs)
  6. if (c != null)
  7. sum += c.value;
  8. }
  9. return sum;
  10. }