【参考】volatile能解决多线程内存不可见问题。对于一些多读吗,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。 说明:如果是count++操作,使用如下类实现:AtomicInteger count = new AtomicInteger();count.addAndGet(1)。如果是jdk8,推荐使用LongAdder对象,比AtomicInteger性能更好(减少乐观锁的重试次数)。

以上内容来自阿里《java开发手册》。
从上面说,LongAdder在jdk8中比AtomicInteger性能要好。

一、为什么LongAdder比Atomic原子类性能好?

这里做出一个测试

  1. /**
  2. * @Created by 勺子
  3. * @Description LongAdder Atomic测试
  4. * @Date 2022/3/1 12:59
  5. */
  6. public class LongAdderAndAtomicTest {
  7. private static AtomicInteger a = new AtomicInteger(0);
  8. private static LongAdder b = new LongAdder();
  9. public static void main(String[] args) throws InterruptedException {
  10. testAtomicAndLongAdder(1,10000000);
  11. testAtomicAndLongAdder(10,10000000);
  12. testAtomicAndLongAdder(20,10000000);
  13. testAtomicAndLongAdder(50,10000000);
  14. testAtomicAndLongAdder(100,10000000);
  15. }
  16. static void testAtomicAndLongAdder(int threadSize, int times) throws InterruptedException {
  17. System.out.println(String.format("线程数为: %s",threadSize));
  18. testAtomic(threadSize,times);
  19. testLongAdder(threadSize,times);
  20. }
  21. /**
  22. * 测试atomic效率
  23. * @param threadSize
  24. * @param times
  25. * @throws InterruptedException
  26. */
  27. private static void testAtomic(int threadSize, int times) throws InterruptedException {
  28. long startTime = System.currentTimeMillis();
  29. CountDownLatch latch = new CountDownLatch(threadSize);
  30. for (int i = 0; i < threadSize; i++) {
  31. new Thread(){
  32. @Override
  33. public void run() {
  34. for (int j = 0; j < times; j++) {
  35. a.incrementAndGet();
  36. }
  37. latch.countDown();
  38. }
  39. }.start();
  40. }
  41. latch.await();
  42. long endTime = System.currentTimeMillis();
  43. System.out.println(String.format("atomic耗时时间 :%s",endTime-startTime));
  44. }
  45. /**
  46. * 测试longAdder效率
  47. * @param threadSize
  48. * @param times
  49. * @throws InterruptedException
  50. */
  51. private static void testLongAdder(int threadSize, int times) throws InterruptedException {
  52. long startTime = System.currentTimeMillis();
  53. CountDownLatch latch = new CountDownLatch(threadSize);
  54. for (int i = 0; i < threadSize; i++) {
  55. new Thread(){
  56. @Override
  57. public void run() {
  58. for (int j = 0; j < times; j++) {
  59. b.increment();
  60. }
  61. latch.countDown();
  62. }
  63. }.start();
  64. }
  65. latch.await();
  66. long endTime = System.currentTimeMillis();
  67. System.out.println(String.format("longAdder耗时时间 :%s",endTime-startTime));
  68. }
  69. }

测试结果
1646182787(1).png
由此可见,在线程越多的时候,longAdder的性能是比Atomic原子类更高一些。

二、LongAdder源码阅读

首先先看下LongAdder的继承体系
image.png
LongAdder继承了Striped64 类, Striped64 类有一个静态内部类 Cell类

  1. @sun.misc.Contended static final class Cell {
  2. //保证可见性的value
  3. volatile long value;
  4. //带参数的构造函数,初始化时候传递value值
  5. Cell(long x) { value = x; }
  6. //cell类的cas操作交给unsafe类实现,返回是否成功,true表示cas写入成功,false->表示cas写入失败
  7. final boolean cas(long cmp, long val) {
  8. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  9. }
  10. // Unsafe mechanics
  11. private static final sun.misc.Unsafe UNSAFE;
  12. //指定属性的内存偏移量
  13. private static final long valueOffset;
  14. //静态代码块,类加载时候执行
  15. static {
  16. try {
  17. //先获取unsafe类
  18. UNSAFE = sun.misc.Unsafe.getUnsafe();
  19. Class<?> ak = Cell.class;
  20. //然后获取到对应类 value的内存地址的偏移量
  21. valueOffset = UNSAFE.objectFieldOffset
  22. (ak.getDeclaredField("value"));
  23. } catch (Exception e) {
  24. throw new Error(e);
  25. }
  26. }
  27. }

除此之外,Striped64 还有一些其他属性:

  1. //获取当前系统的cpu数,有什么用呢?控制cells数组长度的一个关键条件,在cells数组扩容时候,cells数组长度最大不能超过当前系统的cpu个数
  2. static final int NCPU = Runtime.getRuntime().availableProcessors();
  3. //cell数组,只要数组不为空,那么长度就是2的倍数
  4. transient volatile Cell[] cells;
  5. //多线程没有发生竞争时候,数据会累加到base上面 | 当cells在扩容时候,会将数据写入到base里
  6. transient volatile long base;
  7. //初始化cells数组或者cells数组扩容时候都需要先获取锁,0表示无锁状态,1表示有锁状态,有其他线程已经持有该锁了
  8. transient volatile int cellsBusy;
  9. //通过cas方式获取锁,调用unsafe类的cas方式实现
  10. final boolean casCellsBusy() {
  11. return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
  12. }
  13. //获取当前线程的hash值
  14. static final int getProbe() {
  15. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  16. }
  17. //重置当前线程的hash值
  18. static final int advanceProbe(int probe) {
  19. probe ^= probe << 13; // xorshift
  20. probe ^= probe >>> 17;
  21. probe ^= probe << 5;
  22. UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
  23. return probe;
  24. }

看完longAdder的结构,我们接下来看他的重要方法,add()方法

  1. public void add(long x) {
  2. //as 表示 cells引用
  3. //b 表示获取的base值
  4. //v 表示期望值
  5. //m 表示cells 数组的长度
  6. //a 表示当前线程命中的cel单元格
  7. Cell[] as; long b, v; int m; Cell a;
  8. //条件一:true=>表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中;
  9. // false=> 当前cells未初始化,当前所有线程应该将数据写入到base中
  10. //条件二:false => 表示当前线程cas替换数据成功
  11. // true=> 表示发生竞争了,可能需要重试 或者扩容
  12. if ((as = cells) != null || !casBase(b = base, b + x)) {
  13. //什么条件会进来?
  14. //1. true=>表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
  15. //2. true=> 表示发生竞争了,可能需要重试 或者扩容
  16. //true=> 未竞争 false=> 发生竞争
  17. boolean uncontended = true;
  18. //条件一:true=> 说明cells未初始化,也就是多线程写base发生竞争了
  19. // fasle=> 说明cells已经初始化了,当前线程应该是 找自己的cell写值
  20. //条件二:getProbe() 方法:获取当前线程的hash值 m表示cells长度减一。cells长度一定是2的次方数,15 => 1111
  21. // true=>说明当前线程对应下标的cell为空,需要创建
  22. // false=> 说明当前线程对应的cell不为空,说明 下一步想要将x值 添加到cell中
  23. //条件三:true=> 表示cas失败,意味着当前线程对应的cell有竞争了
  24. // false=> 表示cas成功
  25. if (as == null || (m = as.length - 1) < 0 ||
  26. (a = as[getProbe() & m]) == null ||
  27. !(uncontended = a.cas(v = a.value, v + x)))
  28. //进入该方法哪有三种情况?
  29. //1.cells未初始化,多线程写base发生竞争了,cas base失败
  30. //2.cell数组已经初始化了,当前线程对应cells数组下标的cell为空,需要创建cell
  31. //3.cell数组初始化了,并且cell存在,意味着当前线程对应的cell有竞争了,多线程在该cell位置发生竞争了【重试|扩容】
  32. //uncontended 默认true,false表示是cells数组已经初始化了,但是竞争cell cas失败了
  33. longAccumulate(x, null, uncontended);
  34. }
  35. }

看重要的longAccumulate方法

  1. //进入该方法有三种情况
  2. //1.cells未初始化,多线程写base发生竞争了 cas失败,会初始化cells
  3. //2.cell数组已经初始化了,说明当前线程对应下标的cell为空,需要创建cell
  4. //3.cell数组初始化了,意味着当前线程对应的cell有竞争了【重试|扩容】
  5. //uncontended 默认true,false表示是cells数组已经初始化了,但是竞争cell cas失败了
  6. final void longAccumulate(long x, LongBinaryOperator fn,
  7. boolean wasUncontended) {
  8. //当前线程的hash值
  9. int h;
  10. //条件成立:说明当前线程还未分配hash值
  11. if ((h = getProbe()) == 0) {
  12. //给当前线程分配hash值
  13. ThreadLocalRandom.current(); // force initialization
  14. //取出当前线程的hash值,赋值给h
  15. h = getProbe();
  16. //为什么?因为默认情况下, 当前线程肯定是写入到了cells[0]位置,不把当作一次真正的竞争
  17. wasUncontended = true;
  18. }
  19. //表示扩容意向 ,默认false一定不会扩容, true 可能会扩容
  20. boolean collide = false;
  21. //自旋
  22. for (;;) {
  23. //as 表示cells数组引用
  24. //a 表示当前线程命中的cell
  25. //n 表示cells数组长度
  26. //v 表示期望值
  27. Cell[] as; Cell a; int n; long v;
  28. //case1:表示cells数组已经初始化了,当前线程应该将数据写入到对应的cell中
  29. if ((as = cells) != null && (n = as.length) > 0) {
  30. //case1.1 处理数组初始化了,但是当前线程对应下标的cell为空,进行创建cell处理
  31. if ((a = as[(n - 1) & h]) == null) {
  32. //ture->表示当前锁未被占用,false 表示锁已被占用
  33. if (cellsBusy == 0) { // Try to attach new Cell
  34. //拿当前x创建cell
  35. Cell r = new Cell(x); // Optimistically create
  36. //条件一:ture->表示当前锁未被占用,false 表示锁已被占用
  37. //条件二:true-> 当前线程获取锁成功,false当前线程获取锁失败
  38. if (cellsBusy == 0 && casCellsBusy()) {
  39. //是否创建成功标记
  40. boolean created = false;
  41. try { // Recheck under lock
  42. //rs cells数组引用
  43. //m cells长度
  44. //j 当前线程命中的下表
  45. Cell[] rs; int m, j;
  46. //多线程double check
  47. //条件一,条件二恒成立
  48. //条件三:rs[j = (m - 1) & h] == null为了防止其他线程初始化了该位置,然后当前线程再初始化该cell
  49. if ((rs = cells) != null &&
  50. (m = rs.length) > 0 &&
  51. rs[j = (m - 1) & h] == null) {
  52. rs[j] = r;
  53. created = true;
  54. }
  55. } finally {
  56. cellsBusy = 0;
  57. }
  58. if (created)
  59. break;
  60. continue; // Slot is now non-empty
  61. }
  62. }
  63. //强制改为false
  64. collide = false;
  65. }
  66. //case1.2 处理第三种情况,竞争cell cas失败 线程竞争cell失败
  67. else if (!wasUncontended) // CAS already known to fail
  68. wasUncontended = true; // Continue after rehash
  69. //case1.3 当前线程rehash值,新命中的cell不为空 指定cell cas
  70. //true->写成功,退出循环
  71. //false-> 表示rehash之后命中的新的cell也有竞争,重试第一次
  72. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  73. fn.applyAsLong(v, x))))
  74. break;
  75. //case1.4 如果cells数组超过cpu核数或者已经cells数组已经扩容了,修改扩容意向为fasle
  76. else if (n >= NCPU || cells != as)
  77. //扩容意向改为false,表示不扩容了
  78. collide = false; // At max size or stale
  79. //case1.5 修改扩容意向为true,但并不代表一定要扩容,只是有可能
  80. else if (!collide)
  81. collide = true;
  82. //case1.6 实际的扩容操作
  83. //条件一:cellsBusy == 0 true=>表示当前无锁状态,当前线程可以竞争这把锁
  84. //条件二:casCellsBusy() true->表示当前线程cas方式获取锁成功,可以执行扩容逻辑;false当前时刻有其他线程正在做扩容相关的操作
  85. else if (cellsBusy == 0 && casCellsBusy()) {
  86. try {
  87. //多线程double check
  88. if (cells == as) { // Expand table unless stale
  89. Cell[] rs = new Cell[n << 1];
  90. for (int i = 0; i < n; ++i)
  91. rs[i] = as[i];
  92. cells = rs;
  93. }
  94. } finally {
  95. //释放锁
  96. cellsBusy = 0;
  97. }
  98. collide = false;
  99. continue; // Retry with expanded table
  100. }
  101. //当前线程的hash值 rehash
  102. h = advanceProbe( h);
  103. }
  104. //case2:前置条件cells数组未初始化,代表竞争base cas失败这种情况,进行初始化cells数组处理
  105. //条件一:true 表示当前未加锁
  106. //条件二:cells == as因为其他线程可能会在你给as赋值之前已经初始化了cells数组
  107. //条件三:true表示获取锁成功,会把cellsBusy设置为1,表示其他线程正持有这把锁
  108. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  109. boolean init = false;
  110. try { // Initialize table
  111. //double check,处理AB线程同时执行该代码块逻辑
  112. //防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
  113. if (cells == as) {
  114. Cell[] rs = new Cell[2];
  115. rs[h & 1] = new Cell(x);
  116. cells = rs;
  117. init = true;
  118. }
  119. } finally {
  120. cellsBusy = 0;
  121. }
  122. if (init)
  123. break;
  124. }
  125. //case3:如果竞争base失败,然后初始化数组被其他线程完成后了,那么直接加到base里面
  126. //1.当前cellsBusy加锁状态,表示其他线程正在初始化cells,所有当前线程将值累加到base
  127. //2.cells被其他线程初始化后,当前线程需要将数据累加到base中
  128. else if (casBase(v = base, ((fn == null) ? v + x :
  129. fn.applyAsLong(v, x))))
  130. break; // Fall back on using base
  131. }
  132. }

总结:三种情况

  1. 线程竞争base失败,会有一个获取到锁的线程执行初始化cells数组,其他线程会把数据累加到base中
  2. 线程cells数组已经初始化,但是cell为空,则会创建cell
  3. 线程cells数组已经初始化了,但是竞争cell cas失败,那么会先case1.2修改竞争失败标识,rehash重试一次, 如果失败,会修改扩容意向为true,然后第二次rehash cas重试,如果失败则最后需要进行cells扩容操作。