VM的Synchronized轻量级锁使用CAS(Compare And Swap,比较并交换)进行自旋抢锁,CAS是CPU指令级的原子操作,并处于用户态下,所以JVM轻量级锁的开销较小。

3.1 什么是CAS

JDK 5所增加的JUC(java.util.concurrent)并发包对操作系统的底层CAS原子操作进行了封装,为上层Java程序提供了CAS操作的API。

3.1.1 Unsafe类中的CAS方法

  1. 获取Unsafe实例 ```java public final class Unsafe {

    1. private static final Unsafe theUnsafe;
    2. public static final int INVALID_FIELD_OFFSET = -1;
    3. private static native void registerNatives();
    4. // 构造函数是private的,不允许外部实例化
    5. private Unsafe() {
    6. }
    7. ...


  1. 因此,我们无法在外部对Unsafe进行实例化,那么怎么获取Unsafe的实例呢?可以通过反射的方式自定义地获取Unsafe实例的辅助方法,代码如下:
  2. ```java
  3. package com.crazymakercircle.util;
  4. // 省略import
  5. public class JvmUtil
  6. {
  7. //自定义地获取Unsafe实例的辅助方法
  8. public static Unsafe getUnsafe()
  9. {
  10. try
  11. {
  12. Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
  13. theUnsafe.setAccessible(true);
  14. return (Unsafe) theUnsafe.get(null);
  15. } catch (Exception e)
  16. {
  17. throw new AssertionError(e);
  18. }
  19. }
  20. // 省略不相干代码
  21. }
  1. 调用Unsafe提供的CAS方法 ```java /**

    1. * 定义在Unsafe类中的三个“比较并交换”原子方法
    2. * @param o 需要操作的字段所在的对象
    3. * @param offset 需要操作的字段的偏移量(相对的,相对于对象头)
    4. * @param expected 期望值(旧的值)
    5. * @param update 更新值(新的值)
    6. * @return true 更新成功 | false 更新失败
    7. */

    public final native boolean compareAndSwapObject(

    1. Object o, long offset, Object expected, Object update);

    public final native boolean compareAndSwapInt(

    1. Object o, long offset, int expected,int update);

    public final native boolean compareAndSwapLong(

    1. Object o, long offset, long expected, long update);
  1. 3. 调用Unsafe提供的偏移量相关
  2. ```java
  3. /**
  4. * 定义在Unsafe类中的几个获取字段偏移量的方法
  5. * @param o 需要操作字段的反射
  6. * @return 字段的偏移量
  7. */
  8. public native long staticFieldOffset(Field field);
  9. public native long objectFieldOffset(Field field);


  1. static
  2. {
  3. try
  4. {
  5. //获取反射的Field对象
  6. OptimisticLockingPlus.class.getDeclaredField("value");
  7. //取得内存偏移
  8. valueOffset = unsafe.objectFieldOffset();
  9. } catch (Exception ex)
  10. {
  11. throw new Error(ex);
  12. }
  13. }

3.1.2 使用CAS进行无锁编程


  1. do
  2. {
  3. 获得字段的期望值(oldValue);
  4. 计算出需要替换的新值(newValue);
  5. } while (!CAS(内存地址,oldValuenewValue))

3.1.3 使用无锁编程实现轻量级安全自增


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class TestCompareAndSwap
  4. {
  5. // 基于CAS无锁实现的安全自增
  6. static class OptimisticLockingPlus
  7. {
  8. //并发数量
  9. private static final int THREAD_COUNT = 10;
  10. //内部值,使用volatile保证线程可见性
  11. private volatile int value;//值
  12. //不安全类
  13. private static final Unsafe unsafe = getUnsafe();;
  14. //value 的内存偏移(相对于对象头部的偏移,不是绝对偏移)
  15. private static final long valueOffset;
  16. //统计失败的次数
  17. private static final AtomicLong failure = new AtomicLong(0);
  18. static
  19. {
  20. try
  21. {
  22. //取得value属性的内存偏移
  23. valueOffset = unsafe.objectFieldOffset(
  24. OptimisticLockingPlus.class.getDeclaredField("value"));
  25. Print.tco("valueOffset:=" + valueOffset);
  26. } catch (Exception ex)
  27. {
  28. throw new Error(ex);
  29. }
  30. }
  31. //通过CAS原子操作,进行“比较并交换”
  32. public final boolean unSafeCompareAndSet(int oldValue, int newValue)
  33. {
  34. //原子操作:使用unsafe的“比较并交换”方法进行value属性的交换
  35. return unsafe.compareAndSwapInt(
  36. this, valueOffset,oldValue ,newValue );
  37. }
  38. //使用无锁编程实现安全的自增方法
  39. public void selfPlus()
  40. {
  41. int oldValue = value;
  42. //通过CAS原子操作,如果操作失败就自旋,一直到操作成功
  43. do
  44. {
  45. // 获取旧值
  46. oldValue = value;
  47. //统计无效的自旋次数
  48. if (i++ > 1)
  49. {
  50. //记录失败的次数
  51. failure.incrementAndGet();
  52. }
  53. } while (!unSafeCompareAndSet(oldValue, oldValue + 1));
  54. }
  55. //测试用例入口方法
  56. public static void main(String[] args) throws InterruptedException
  57. {
  58. final OptimisticLockingPlus cas = new OptimisticLockingPlus();
  59. //倒数闩,需要倒数THREAD_COUNT次
  60. CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
  61. for (int i = 0; i < THREAD_COUNT; i++)
  62. {
  63. // 提交10个任务
  64. ThreadUtil.getMixedTargetThreadPool().submit(() ->
  65. {
  66. //每个任务累加1000次
  67. for (int j = 0; j < 1000; j++)
  68. {
  69. cas.selfPlus();
  70. }
  71. latch.countDown(); // 执行完一个任务,倒数闩减少一次
  72. });
  73. }
  74. latch.await(); //主线程等待倒数闩倒数完毕
  75. Print.tco("累加之和:" + cas.value);
  76. Print.tco("失败次数:" + cas.failure.get());
  77. }
  78. }
  79. }

3.1.4 字段偏移量的计算


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class TestCompareAndSwap
  4. {
  5. @Test
  6. public void printObjectStruct()
  7. {
  8. //创建一个对象
  9. OptimisticLockingPlus object=new OptimisticLockingPlus();
  10. //给成员赋值
  11. object.value=100;
  12. //通过JOL工具输出内存布局
  13. String printable = ClassLayout.parseInstance(object).toPrintable();
  14. Print.fo("object = " + printable);
  15. }
  16. // 省略不相关代码
  17. }

3.2 JUC原子类


3.2.1 JUC中的Atomic原子操作包


  1. 基本原子类
  • AtomicInteger:整型原子类。
  • AtomicLong:长整型原子类。
  • AtomicBoolean:布尔型原子类。
  1. 数组原子类
  • AtomicIntegerArray:整型数组原子类。
  • AtomicLongArray:长整型数组原子类。
  • AtomicReferenceArray:引用类型数组原子类。
  1. 引用原子类
  • AtomicReference:引用类型原子类。
  • AtomicMarkableReference:带有更新标记位的原子引用类型。
  • AtomicStampedReference:带有更新版本号的原子引用类型。
  1. 字段更新原子类
  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型中的字段。

    3.2.2 基础原子类AtomicInteger

    基础原子类AtomicInteger常用的方法如下: ```java public final int get() //获取当前的值 public final int getAndSet(int newValue) //获取当前的值,然后设置新的值 public final int getAndIncrement() //获取当前的值,然后自增 public final int getAndDecrement() //获取当前的值,然后自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 boolean compareAndSet(int expect, int update) //通过CAS方式设置整数值
  1. 下面是一个基础原子类AtomicInteger的使用示例,具体代码如下:
  2. ```java
  3. package com.crazymakercircle.cas;
  4. // 省略import
  5. public class AtomicTest
  6. {
  7. @Test
  8. public void atomicIntegerTest()
  9. {
  10. int tempvalue = 0;
  11. //定义一个整数原子类实例,赋值到变量 i
  12. AtomicInteger i = new AtomicInteger(0);
  13. //取值,然后设置一个新值
  14. tempvalue = i.getAndSet(3);
  15. //输出tempvalue:0; i:3
  16. Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
  17. //取值,然后自增
  18. tempvalue = i.getAndIncrement();
  19. //输出tempvalue:3; i:4
  20. Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
  21. //取值,然后增加5
  22. tempvalue = i.getAndAdd(5);
  23. //输出tempvalue:4; i:9
  24. Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
  25. //CAS交换
  26. boolean flag = i.compareAndSet(9, 100);
  27. //输出flag:true; i:100
  28. Print.fo("flag:" + flag + "; i:" + i.get());
  29. }
  30. }


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class AtomicTest
  4. {
  5. @Test
  6. public static void main(String[] args) throws InterruptedException
  7. {
  8. CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
  9. //定义一个整数原子类实例,赋值到变量 i
  10. AtomicInteger atomicInteger = new AtomicInteger(0);
  11. for (int i = 0; i < THREAD_COUNT; i++)
  12. {
  13. // 创建10个线程,模拟多线程环境
  14. ThreadUtil.getMixedTargetThreadPool().submit(() ->
  15. {
  16. for (int j = 0; j < 1000; j++)
  17. {
  18. atomicInteger.getAndIncrement();
  19. }
  20. latch.countDown();
  21. });
  22. }
  23. latch.await();
  24. Print.tco("累加之和:" + atomicInteger.get());
  25. }
  26. // 省略不相关代码
  27. }

3.2.3 数组原子类AtomicIntegerArray


  1. //获取 index=i 位置元素的值
  2. public final int get(int i)
  3. //返回 index=i 位置当前的值,并将其设置为新值:newValue
  4. public final int getAndSet(int i, int newValue)
  5. //获取 index=i 位置元素的值,并让该位置的元素自增
  6. public final int getAndIncrement(int i)
  7. //获取 index=i 位置元素的值,并让该位置的元素自减
  8. public final int getAndDecrement(int i)
  9. //获取 index=i 位置元素的值,并加上预期的值
  10. public final int getAndAdd(int delta)
  11. //如果输入的数值等于预期值,就以原子方式将位置i的元素值设置为输入值(update)
  12. boolean compareAndSet(int expect, int update)
  13. //最终将位置i的元素设置为newValue
  14. //lazySet()方法可能导致其他线程在之后的一小段时间内还是可以读到旧的值
  15. public final void lazySet(int i, int newValue)


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class AtomicTest
  4. {
  5. @Test
  6. public void testAtomicIntegerArray () {
  7. int tempvalue = 0;
  8. //原始的数组
  9. int[] array = { 1, 2, 3, 4, 5, 6 };
  10. //包装为原子数组
  11. AtomicIntegerArray i = new AtomicIntegerArray(array);
  12. //获取第0个元素,然后设置为2
  13. tempvalue = i.getAndSet(0, 2);
  14. //输出tempvalue:1; i:[2, 2, 3, 4, 5, 6]
  15. Print.fo("tempvalue:" + tempvalue + "; i:" + i);
  16. //获取第0个元素,然后自增
  17. tempvalue = i.getAndIncrement(0);
  18. //输出tempvalue:2; i:[3, 2, 3, 4, 5, 6]
  19. Print.fo("tempvalue:" + tempvalue + "; i:" + i);
  20. //获取第0个元素,然后增加一个delta 5
  21. tempvalue = i.getAndAdd(0, 5);
  22. //输出tempvalue:3; i:[8, 2, 3, 4, 5, 6]
  23. Print.fo("tempvalue:" + tempvalue + "; i:" + i);
  24. }
  25. }

3.2.4 AtomicInteger线程安全原理


  1. public class AtomicInteger extends Number
  2. implements java.io.Serializable {
  3. //Unsafe类实例
  4. private static final Unsafe unsafe = Unsafe.getUnsafe();
  5. //内部value值,使用volatile保证线程可见性
  6. private volatile int value;
  7. //value属性值的地址偏移量
  8. private static final long valueOffset;
  9. static {
  10. try {
  11. //计算value 属性值的地址偏移量
  12. valueOffset = unsafe.objectFieldOffset(
  13. AtomicInteger.class.getDeclaredField("value"));
  14. } catch (Exception ex) { throw new Error(ex); }
  15. }
  16. //初始化
  17. public AtomicInteger(int initialValue) {
  18. value = initialValue;
  19. }
  20. //获取当前value值
  21. public final int get() {
  22. return value;
  23. }
  24. //方法:返回旧值并赋新值
  25. public final int getAndSet(int newValue) {
  26. for (;;) {//自旋
  27. int current = get();//获取旧值
  28. //以CAS方式赋值,直到成功返回
  29. if (compareAndSet(current, newValue)) return current;
  30. }
  31. }
  32. //方法:封装底层的CAS操作,对比expect(期望值)与value,若不同则返回false
  33. //若expect与value相同,则将新值赋给value,并返回true
  34. public final boolean compareAndSet(int expect, int update) {
  35. return unsafe.compareAndSwapInt(
  36. this, valueOffset, expect, update);
  37. }
  38. //方法:安全自增 i++
  39. public final int getAndIncrement() {
  40. for (;;) { //自旋
  41. int current = get();
  42. int next = current + 1;
  43. if (compareAndSet(current, next))
  44. return current;
  45. }
  46. }
  47. //方法:自定义增量数
  48. public final int getAndAdd(int delta) {
  49. for (;;) { //自旋
  50. int current = get();
  51. int next = current + delta;
  52. if (compareAndSet(current, next))
  53. return current;
  54. }
  55. }
  56. //方法:类似++i,返回自增后的值
  57. public final int incrementAndGet() {
  58. for (;;) { //自旋
  59. int current = get();
  60. int next = current + 1;
  61. if (compareAndSet(current, next))
  62. return next;
  63. }
  64. }
  65. //方法:返回加上delta后的值
  66. public final int addAndGet(int delta) {
  67. for (;;) { //自旋
  68. int current = get();
  69. int next = current + delta;
  70. if (compareAndSet(current, next))
  71. return next;
  72. }
  73. }
  74. // 省略其他源码
  75. }
  1. AtomicInteger源码中的主要方法都是通过CAS自旋实现的。CAS自旋的主要操作为:如果一次CAS操作失败,获取最新的value值后,再次进行CAS操作,直到成功。

3.3 对象操作的原子性


3.3.1 引用类型原子类


  1. package com.crazymakercircle.im.common.bean;
  2. // 省略import
  3. public class User implements Serializable
  4. {
  5. String uid; //用户ID
  6. String nickName; //昵称
  7. public volatile int age; //年龄
  8. public User(String uid, String nickName)
  9. {
  10. this.uid = uid;
  11. this.nickName = nickName;
  12. }
  13. @Override
  14. public String toString()
  15. {
  16. return "User{" +
  17. "uid='" + getUid() + '\'' +
  18. ", nickName='" + getNickName() + '\'' +
  19. ", platform=" + getPlatform() +
  20. '}';
  21. }


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class AtomicTest
  4. {
  5. @Test
  6. public void testAtomicReference()
  7. {
  8. //包装的原子对象
  9. AtomicReference<User> userRef = new AtomicReference<User>();
  10. //待包装的User对象
  11. User user = new User("1", "张三");
  12. //为原子对象设置值
  13. userRef.set(user);
  14. Print.tco("userRef is:" + userRef.get());
  15. //要使用CAS替换的User对象
  16. User updateUser = new User("2", "李四");
  17. //使用CAS替换
  18. boolean success = userRef.compareAndSet(user, updateUser);
  19. Print.tco(" cas result is:" + success);
  20. Print.tco(" after cas,userRef is:" + userRef.get());
  21. }
  22. // 省略其他
  23. }

3.3.2 属性更新原子类


  1. @Test
  2. public void testAtomicIntegerFieldUpdater()
  3. {
  4. //调用静态方法newUpdater()创建一个更新器updater
  5. AtomicIntegerFieldUpdater<User> updater=
  6. AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
  7. User user = new User("1", "张三");
  8. //使用属性更新器的getAndIncrement、getAndAdd增加user的age值
  9. Print.tco(updater.getAndIncrement(user));// 1
  10. Print.tco(updater.getAndAdd(user, 100));// 101
  11. //使用属性更新器的get获取user的age值
  12. Print.tco(updater.get(user));// 101
  13. }

3.4 ABA问题

3.4.1 了解ABA问题

3.4.2 ABA问题解决方案


3.4.3 使用AtomicStampedReference解决ABA问题


  1. //构造器,V表示要引用的原始数据,initialStamp表示最初的版本印戳(版本号)
  2. AtomicStampedReference(V initialRef, int initialStamp)


  1. //获取被封装的数据
  2. public V getRerference();
  3. //获取被封装的数据的版本印戳
  4. public int getStamp();


  1. public boolean compareAndSet(
  2. V expectedReference, //预期引用值
  3. V newReference, //更新后的引用值
  4. int expectedStamp, //预期印戳(Stamp)标志值
  5. int newStamp) //更新后的印戳(Stamp)标志值


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class AtomicTest
  4. {
  5. @Test
  6. public void testAtomicStampedReference()
  7. {
  8. CountDownLatch latch = new CountDownLatch(2);
  9. AtomicStampedReference<Integer> atomicStampedRef =
  10. new AtomicStampedReference<Integer>(1, 0);
  11. ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
  12. {
  13. @Override
  14. public void run()
  15. {
  16. boolean success = false;
  17. int stamp = atomicStampedRef.getStamp();
  18. Print.tco("before sleep 500: value="
  19. + atomicStampedRef.getReference()
  20. + " stamp=" + atomicStampedRef.getStamp());
  21. //等待500毫秒
  22. sleepMilliSeconds(500);
  23. success = atomicStampedRef.compareAndSet(1, 10,
  24. stamp, stamp + 1);
  25. Print.tco("after sleep 500 cas 1: success=" + success
  26. + " value=" + atomicStampedRef.getReference()
  27. + " stamp=" + atomicStampedRef.getStamp());
  28. //增加印戳值,然后更新,如果stamp被其他线程改了,就会更新失败
  29. stamp++;
  30. success = atomicStampedRef.compareAndSet(10, 1,
  31. stamp, stamp+1);
  32. Print.tco("after sleep 500 cas 2: success=" + success
  33. + " value=" + atomicStampedRef.getReference()
  34. + " stamp=" + atomicStampedRef.getStamp());
  35. latch.countDown();
  36. }
  37. });
  38. ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
  39. {
  40. @Override
  41. public void run()
  42. {
  43. boolean success = false;
  44. int stamp = atomicStampedRef.getStamp();
  45. // stamp = 0
  46. Print.tco("before sleep 1000: value="
  47. + atomicStampedRef.getReference()
  48. + " stamp=" + atomicStampedRef.getStamp());
  49. //等待1000毫秒
  50. sleepMilliSeconds(1000);
  51. Print.tco("after sleep 1000: stamp = "
  52. + atomicStampedRef.getStamp());
  53. //stamp = 1,这个值实际已经被修改了
  54. success = atomicStampedRef.compareAndSet(
  55. 1, 20, stamp, stamp++);
  56. Print.tco("after cas 3 1000: success=" + success
  57. + " value=" + atomicStampedRef.getReference()
  58. + " stamp=" + atomicStampedRef.getStamp());
  59. latch.countDown();
  60. }
  61. });
  62. latch.await();
  63. }
  64. // 省略其他
  65. }

3.4.4 使用AtomicMarkableReference解决ABA问题


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class AtomicTest
  4. {
  5. @Test
  6. public void testAtomicMarkableReference() throws InterruptedException
  7. {
  8. CountDownLatch latch = new CountDownLatch(2);
  9. AtomicMarkableReference<Integer> atomicRef =
  10. new AtomicMarkableReference<Integer>(1, false);
  11. ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
  12. {
  13. @Override
  14. public void run()
  15. {
  16. boolean success = false;
  17. int value = atomicRef.getReference();
  18. boolean mark = getMark(atomicRef);
  19. Print.tco("before sleep 500: value=" + value
  20. + " mark=" + mark);
  21. //等待500毫秒
  22. sleepMilliSeconds(500);
  23. success = atomicRef.compareAndSet(1, 10, mark, !mark);
  24. Print.tco("after sleep 500 cas 1: success=" + success
  25. + " value=" + atomicRef.getReference()
  26. + " mark=" + getMark(atomicRef));
  27. latch.countDown();
  28. }
  29. });
  30. ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
  31. {
  32. @Override
  33. public void run()
  34. {
  35. boolean success = false;
  36. int value = atomicRef.getReference();
  37. boolean mark = getMark(atomicRef);
  38. Print.tco("before sleep 1000: value="
  39. + atomicRef.getReference()
  40. + " mark=" + mark);
  41. //等待1000毫秒
  42. sleepMilliSeconds(1000);
  43. Print.tco("after sleep 1000: mark = " + getMark(atomicRef));
  44. success = atomicRef.compareAndSet(1, 20, mark,!mark);
  45. Print.tco("after cas 3 1000: success=" + success
  46. + " value=" + atomicRef.getReference()
  47. + " mark=" + getMark(atomicRef));
  48. latch.countDown();
  49. }
  50. });
  51. latch.await();
  52. }
  53. //取得修改标志值
  54. private boolean getMark(AtomicMarkableReference<Integer> atomicRef)
  55. {
  56. boolean[] markHolder = {false};
  57. int value = atomicRef.get(markHolder);
  58. return markHolder[0];
  59. }
  60. // 省略其他
  61. }

3.5 提升高并发场景下CAS操作的性能


3.5.1 以空间换时间:LongAdder


  1. package com.crazymakercircle.cas;
  2. // 省略import
  3. public class LongAdderVSAtomicLongTest
  4. {
  5. //每个线程的执行轮数
  6. final int TURNS = 100000000;
  7. //对比测试用例一:调用AtomicLong完成10个线程累加1000次
  8. @org.junit.Test
  9. public void testAtomicLong() {
  10. // 并发任务数
  11. final int TASK_AMOUNT = 10;
  12. //线程池,获取CPU密集型任务线程池
  13. ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();
  14. //定义一个原子对象
  15. AtomicLong atomicLong = new AtomicLong(0);
  16. // 线程同步倒数闩
  17. CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
  18. long start = System.currentTimeMillis();
  19. for (int i = 0; i < TASK_AMOUNT; i++) {
  20. pool.submit(() ->
  21. {
  22. try {
  23. for (int j = 0; j < TURNS; j++) {
  24. atomicLong.incrementAndGet();
  25. }
  26. // Print.tcfo("本线程累加完成");
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. //倒数闩,倒数一次
  31. countDownLatch.countDown();
  32. });
  33. }
  34. try {
  35. //等待倒数闩完成所有的倒数操作
  36. countDownLatch.await();
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. float time = (System.currentTimeMillis() - start) / 1000F;
  41. //输出统计结果
  42. Print.tcfo("运行的时长为:" + time);
  43. Print.tcfo("累加结果为:" + atomicLong.get());
  44. }
  45. @org.junit.Test
  46. public void testLongAdder() {
  47. // 并发任务数
  48. final int TASK_AMOUNT = 10;
  49. //线程池,获取CPU密集型任务线程池
  50. ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();
  51. //定义一个LongAdder 对象
  52. LongAdder longAdder = new LongAdder();
  53. // 线程同步倒数闩
  54. CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
  55. long start = System.currentTimeMillis();
  56. for (int i = 0; i < TASK_AMOUNT; i++) {
  57. pool.submit(() ->
  58. {
  59. try {
  60. for (int j = 0; j < TURNS; j++) {
  61. longAdder.add(1);
  62. }
  63. // Print.tcfo("本线程累加完成");
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. //倒数闩,倒数一次
  68. countDownLatch.countDown();
  69. });
  70. }
  71. try {
  72. //等待倒数闩完成所有的倒数操作
  73. countDownLatch.await();
  74. } catch (InterruptedException e) {
  75. e.printStackTrace();
  76. }
  77. float time = (System.currentTimeMillis() - start) / 1000F;
  78. //输出统计结果
  79. Print.tcfo("运行的时长为:" + time);
  80. Print.tcfo("累加结果为:" + longAdder.longValue());
  81. }

3.5.2 LongAdder的原理

  1. LongAdder实例的内部结构


  1. 基类Striped64内部三个重要的成员


  1. /**
  2. * 成员一:存放Cell的哈希表,大小为2的幂
  3. */
  4. transient volatile Cell[] cells;
  5. /**
  6. * 成员二:基础值
  7. * 1. 在没有竞争时会更新这个值
  8. * 2. 在cells初始化时,cells不可用,也会尝试通过CAS操作值累加到base
  9. */
  10. transient volatile long base;
  11. /**
  12. * 自旋锁,通过CAS操作加锁,为0表示cells数组没有处于创建、扩容阶段
  13. * 为1表示正在创建或者扩展cells数组,不能进行新Cell元素的设置操作
  14. */
  15. transient volatile int cellsBusy;


  1. public long longValue() {
  2. //longValue()方法调用了sum(),累加所有Cell的值
  3. return sum();
  4. }
  5. /**
  6. * 将多个cells数组中的值加起来的和就类似于AtomicLong中的value
  7. */
  8. public long sum() {
  9. Cell[] as = cells;
  10. Cell a;
  11. long sum = base;
  12. if (as != null) {
  13. //累加所有cell的值
  14. for (int i = 0; i < as.length; ++i) {
  15. if ((a = as[i]) != null)
  16. sum += a.value;
  17. }
  18. }
  19. return sum;
  20. }
  1. LongAdder类的add()方法 ```java /**
  • 自增1 */ public void increment() { add(1L); }


  • 自减1 */ public void decrement() { add(-1L); }

public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || //CASE 1 !casBase(b = base, b + x)) { //CASE 2 if (as == null || (m = as.length - 1) < 0 || //CASE 3 (a = as[getProbe() & m]) == null || //CASE 4 !(uncontended = a.cas(v = a.value, v + x))) //CASE 5 longAccumulate(x, null, uncontended); } }

  1. 4. LongAdder类中的longAccumulate()方法
  2. longAccumulate()是Striped64中重要的方法,实现不同的线程更新各自Cell中的值,其实现逻辑类似于分段锁,具体的代码如下:
  3. ```java
  4. final void longAccumulate(long x, LongBinaryOperator fn,
  5. boolean wasUncontended) {
  6. int h;
  7. if ((h = getProbe()) == 0) {
  8. ThreadLocalRandom.current(); // force initialization
  9. h = getProbe();
  10. wasUncontended = true;
  11. }
  12. //扩容意向,collide=true可以扩容,collide=false不可扩容
  13. boolean collide = false;
  14. //自旋,一直到操作成功
  15. for (;;) {
  16. //as 表示cells引用
  17. //a 表示当前线程命中的Cell
  18. //n 表示cells数组长度
  19. //v 表示期望值
  20. Cell[] as; Cell a; int n; long v;
  21. //CASE1: 表示cells已经初始化了,当前线程应该将数据写入对应的Cell中
  22. //这个大的if分支有三个小分支
  23. if ((as = cells) != null && (n = as.length) > 0) {
  24. //CASE1.1:true表示下标位置的Cell为null,需要创建new Cell
  25. if ((a = as[(n - 1) & h]) == null) {
  26. if (cellsBusy == 0) { // cells数组没有处于创建、扩容阶段
  27. Cell r = new Cell(x); // Optimistically create
  28. if (cellsBusy == 0 && casCellsBusy()) {
  29. boolean created = false;
  30. try { // Recheck under lock
  31. Cell[] rs; int m, j;
  32. if ((rs = cells) != null &&
  33. (m = rs.length) > 0 &&
  34. rs[j = (m - 1) & h] == null) {
  35. rs[j] = r;
  36. created = true;
  37. }
  38. } finally {
  39. cellsBusy = 0;
  40. }
  41. if (created)
  42. break;
  43. continue; // Slot is now non-empty
  44. }
  45. }
  46. collide = false;
  47. }
  48. // CASE1.2:当前线程竞争修改失败,wasUncontended为false
  49. else if (!wasUncontended) // CAS already known to fail
  50. wasUncontended = true; // Continue after rehash
  51. //CASE 1.3:当前线程rehash过哈希值,CAS更新Cell
  52. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  53. fn.applyAsLong(v, x))))
  54. break;
  55. //CASE 1.4:调整扩容意向,然后进入下一轮循环
  56. else if (n >= NCPU || cells != as)
  57. collide = false; // 达到最大值,或者as值过期
  58. //CASE 1.5:设置扩容意向为true,但是不一定真的发生扩容
  59. if (!collide)
  60. collide = true;
  61. //CASE 1.6:真正扩容的逻辑
  62. else if (cellsBusy == 0 && casCellsBusy()) {
  63. try {
  64. if (cells == as) { // Expand table unless stale
  65. Cell[] rs = new Cell[n << 1];
  66. for (int i = 0; i < n; ++i)
  67. rs[i] = as[i];
  68. cells = rs;
  69. }
  70. } finally {
  71. cellsBusy = 0; //释放锁
  72. }
  73. collide = false;
  74. continue; // Retry with expanded table
  75. }
  76. h = advanceProbe(h); //重置(rehash)当前线程Hash值
  77. }
  1. LongAdder类的casCellsBusy()方法


  1. final boolean casCellsBusy() {
  2. return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
  3. }

3.6 CAS在JDK中的广泛应用

3.6.1 CAS操作的弊端和规避措施

  1. ABA问题
  2. 只能保证一个共享变量之间的原子性操作
  3. 开销问题

3.6.2 CAS操作在JDK中的应用