1. J.U.C下的原子类

1.1 原子类介绍

  • 原子类,对象的数据操作不可分割的。具有原子性
  • 作用:原子类的作用和锁类似,是为了保证并发情况下线程的安全问题,不过原子类相比于锁,有一定优势:
    1. 粒度更细:原子类把竞争范围缩小到了变量级别
    2. 效率较高:通常情况下更高,但是高度竞争的情况下效率更低
  • J.U.C下的原子类,大都由CAS实现(最后会有源码分析)

    1.2 Atomic*基本类型原子类

  • 以AtomicInteger为例

  1. 常用方法
    • int get() 获取当前值
    • int getAndSet(int) 获取当前值并设置新的值
    • int getAndIncrement() 获取当前值并自增
    • int incrementAndGet() 获取自增后的值 (同一个方法相比,get在前就获取自增前的值,get在后就获取自增后的值)
    • int getAndDecrement() 获取当前值并自减
    • int getAndAdd(int) 获取当前值并加上一个值
    • boolean compareAndSet(int expect, int update) 判断当前值是否符合预期值expect,如果符合就设置更新值update
  2. 使用示例

    1. /**
    2. * 使用AtomicInteger 对比 非原子类,演示线程安全问题
    3. *
    4. * @author yiren
    5. */
    6. public class AtomicIntegerExample01 {
    7. private static AtomicInteger atomicInteger = new AtomicInteger();
    8. private static volatile Integer count = 0;
    9. public static void main(String[] args) throws InterruptedException {
    10. Runnable runnable = () -> {
    11. for (int i = 0; i < 10000; i++) {
    12. atomicInteger.getAndIncrement();
    13. count++;
    14. }
    15. };
    16. Thread thread1 = new Thread(runnable);
    17. Thread thread2 = new Thread(runnable);
    18. thread1.start();
    19. thread2.start();
    20. thread1.join();
    21. thread2.join();
    22. System.out.println("atomicInteger=" + atomicInteger);
    23. System.out.println("count=" + count);
    24. }
    25. }
  1. atomicInteger=20000
  2. count=13409
  3. Process finished with exit code 0
  • 我们可以看到此时AtomicInteger结果是正确的
  • Integer的结果则不正确,如果要保证线程安全,则需要加锁
  • 只要在线程中不多次调用,都可以保证线程安全,单个方法原子类都是保证线程安全的
  • 注意:原子操作+原子操作!= 原子操作

    1.3 Atomic*Array数组类型分析

  • AtomicIntegerArray为例

  • 数组类型的时候,它会保证每个元素的的操作都是线程安全的
  • AtomicIntegerArray的方法和AtomicInteger的方法都类似,不过AtomicIntegerArray的方法需要指定数组的index
  • 代码演示:
  1. /**
  2. * @author yiren
  3. */
  4. public class AtomicIntegerArrayExample {
  5. private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
  6. public static void main(String[] args) throws InterruptedException {
  7. Runnable incrRunnable = () -> {
  8. for (int i = 0; i < atomicIntegerArray.length(); i++) {
  9. atomicIntegerArray.incrementAndGet(i);
  10. }
  11. };
  12. Runnable decrRunnable = () -> {
  13. for (int i = 0; i < atomicIntegerArray.length(); i++) {
  14. atomicIntegerArray.decrementAndGet(i);
  15. }
  16. };
  17. ExecutorService executorService = Executors.newFixedThreadPool(100);
  18. for (int i = 0; i < 1000; i++) {
  19. executorService.execute(incrRunnable);
  20. }
  21. for (int i = 0; i < 1000; i++) {
  22. executorService.execute(decrRunnable);
  23. }
  24. TimeUnit.SECONDS.sleep(5);
  25. for (int i = 0; i < atomicIntegerArray.length(); i++) {
  26. System.out.print(atomicIntegerArray.get(i) + " ");
  27. }
  28. }
  29. }
  1. 0 0 0 0 0 0 0 0 0 0

1.4 AtomicReference应用类型分析

  • AtomicReference类的作用,和AtomicInteger并没有太多区别,只是作用对象变了,AtomicInteger是保证一个整数的原子性,而AtomicReference是让一个对象保证原子性
  • AtomicReference会比AtomicInteger更强大,因为对象中会包含很多属性,用法是类似的
  • 类对象的方法

并发编程之AtomicX类与CAS - 图1

  • 案例

    1. /**
    2. * @author yiren
    3. */
    4. public class SpinLock {
    5. private static AtomicReference<Thread> sign = new AtomicReference<>();
    6. private static void lock() {
    7. Thread current = Thread.currentThread();
    8. while (!sign.compareAndSet(null, current)) {
    9. System.out.println("fail to set!");
    10. }
    11. }
    12. private static void unlock() {
    13. Thread thread = Thread.currentThread();
    14. sign.compareAndSet(thread, null);
    15. }
    16. public static void main(String[] args) {
    17. Runnable runnable = () -> {
    18. System.out.println("start to get lock");
    19. SpinLock.lock();
    20. System.out.println("got lock successfully!");
    21. try {
    22. TimeUnit.SECONDS.sleep(1);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }finally {
    26. SpinLock.unlock();
    27. }
    28. };
    29. Thread thread = new Thread(runnable);
    30. Thread thread1 = new Thread(runnable);
    31. thread.start();
    32. thread1.start();
    33. }
    34. }
  • 我们利用AtomicReference来实现一个自旋锁,通过compareAndSet方法去先比较然后赋值来避免使用锁

1.5 封装普通类型成原子类

  • AtomicIntegerFieldUpdater为例
  • AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");我们创建对象时候,需要指定目标类,以及属性。
  • 并且在操作的时候,需要传入操作的对象
  • 为什么会需要用这样的方式呢?而不直接在原有对象上修改?
    • 如果我们在编码中,仅有极少时候才会用到原子性操作,如果在原有对象直接使用原子类就十分浪费性能了
    • 此外,我们在使用别人定义的类的时候,有这样的需求,但是别人没有这样的需求,我们也不能破坏人家的定义,这个时候AtomicIntegerFieldUpdater的使用,就不会对原有类进行侵入式破坏了。
  • 注意:这个类不支持static修饰的变量
  • 案例如下:
  1. /**
  2. * @author yiren
  3. */
  4. public class AtomicFieldUpdaterExample {
  5. private static Counter one = new Counter();
  6. private static Counter two = new Counter();
  7. private static AtomicIntegerFieldUpdater<Counter> updater = AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
  8. public static void main(String[] args) throws InterruptedException {
  9. Runnable runnable = () -> {
  10. for (int i = 0; i < 10000; i++) {
  11. one.count++;
  12. updater.getAndIncrement(two);
  13. }
  14. };
  15. Thread thread1 = new Thread(runnable);
  16. Thread thread2 = new Thread(runnable);
  17. thread1.start();
  18. thread2.start();
  19. thread1.join();
  20. thread2.join();
  21. System.out.println("one.count = " + one.count);
  22. System.out.println("two.count = " + two.count);
  23. }
  24. private static class Counter {
  25. volatile int count;
  26. }
  27. }
  1. one.count = 18417
  2. two.count = 20000
  3. Process finished with exit code 0
  • 可以看出,升级过后,原有的数据操作线程安全了

1.6 Adder累加器

  • 以LongAdder为例
  • LongAdder是Java8引入的新类,高并发下LongAdderAtomicLong的效率高,其本事是利用了空间换时间
  • 它其实是利用了分段锁技术,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,提高了并发性能。
  1. 代码演示:对比AtomicLongLongAdder

    1. /**
    2. * @author yiren
    3. */
    4. public class AtomicLongExample {
    5. public static void main(String[] args) {
    6. AtomicLong counter = new AtomicLong();
    7. ExecutorService executorService = Executors.newFixedThreadPool(16);
    8. Runnable task = () -> {
    9. for (int i = 0; i < 10000; i++) {
    10. counter.incrementAndGet();
    11. }
    12. };
    13. long start = System.currentTimeMillis();
    14. for (int i = 0; i < 10000; i++) {
    15. executorService.execute(task);
    16. }
    17. executorService.shutdown();
    18. while (!executorService.isTerminated()) {
    19. }
    20. long end = System.currentTimeMillis();
    21. System.out.println("end-start=" + (end - start)+ "ms");
    22. }
    23. }
  1. end-start=2140ms
  2. Process finished with exit code 0
  1. /**
  2. * @author yiren
  3. */
  4. public class LongAdderExample {
  5. public static void main(String[] args) {
  6. LongAdder counter = new LongAdder();
  7. ExecutorService executorService = Executors.newFixedThreadPool(16);
  8. Runnable task = () -> {
  9. for (int i = 0; i < 10000; i++) {
  10. counter.increment();
  11. }
  12. };
  13. long start = System.currentTimeMillis();
  14. for (int i = 0; i < 10000; i++) {
  15. executorService.execute(task);
  16. }
  17. executorService.shutdown();
  18. while (!executorService.isTerminated()) {
  19. }
  20. long end = System.currentTimeMillis();
  21. System.out.println("end-start=" + (end - start)+ "ms");
  22. }
  23. }
  1. end-start=157ms
  2. Process finished with exit code 0
  • 我们从上面可以看出,我本地机器是i7处理器,两个程序唯一差别就是使用的原子类相差了10多倍。LongAdder明显比AtomicLong更快
  1. 为什么差距这么大?
  • AtomicLong的操作,首先每个线程操作完后,会需要把线程本地内存的数据刷到主内存,然后另外一个线程还得从主内存中刷新新的数据。
  • LongAdder不需要这样做,LongAdder在每个线程都会有自己的一个计数器,仅仅用来在自己的线程内计数,这样一来就不会和其他线程的计数器干扰。
  • LongAdder引入的是分段累加的概念,内部有一个base变量和一个Cell[] cells数组共同参与计算
    • base:竞争不激烈就直接累加到该变量
    • cells: 竞争激烈的时候,各个线程就分散累加到自己的cells[i]
  1. 适用场景
  • AtomicLong:在竞争低的情况加和LongAdder相似,但是它具有CAS方法,可以提供更多的功能
  • LongAdder:在并发高的情况下有明显优势,但是只适用于统计求和计数的场景,有一定的局限性

1.7 Accumulator累加器

  • LongAccumulator为例
  • 基本用法

    1. /**
    2. * @author yiren
    3. */
    4. public class AccumulatorExample {
    5. public static void main(String[] args) {
    6. // 累加 :此处的(left, right) -> left + right 可以替换成 Long::sum
    7. // left=3
    8. LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left + right, 3);
    9. // left=3+right=3+2=5
    10. longAccumulator.accumulate(2);
    11. // left=5+right=5+3=8
    12. longAccumulator.accumulate(3);
    13. System.out.println(longAccumulator.getThenReset());
    14. // left=3
    15. LongAccumulator longAccumulator1 = new LongAccumulator((left, right) -> left - right, 3);
    16. // left=3-right=3-2=1
    17. longAccumulator1.accumulate(2);
    18. // left=1-right=1-3=-2
    19. longAccumulator1.accumulate(3);
    20. System.out.println(longAccumulator1.getThenReset());
    21. // 求最大值
    22. LongAccumulator longAccumulator2 = new LongAccumulator(Math::max, -1);
    23. longAccumulator2.accumulate(14);
    24. longAccumulator2.accumulate(3);
    25. System.out.println(longAccumulator2.getThenReset());
    26. }
    27. }
  1. 8
  2. -2
  3. 14
  4. Process finished with exit code 0
  • 详细过程可以看注释
  • 有人或许会觉得这个麻烦。这个只是单线程,原子类是保证多线程操作的。也就是说我们可以在不同的线程直接调用

    1. /**
    2. * @author yiren
    3. */
    4. public class AccumulatorExample01 {
    5. public static void main(String[] args) {
    6. LongAccumulator accumulator = new LongAccumulator((left, right) -> {
    7. long y = left;
    8. long x = right;
    9. return x + y;
    10. }, 0);
    11. ExecutorService executorService = Executors.newFixedThreadPool(100);
    12. IntStream.range(1, 100).forEach(item -> executorService.execute(() -> accumulator.accumulate(item)));
    13. executorService.shutdown();
    14. while (!executorService.isTerminated()) {
    15. }
    16. System.out.println(accumulator.get());
    17. }
    18. }
  1. 4950
  2. Process finished with exit code 0
  • 使用场景:
    • 需要并行计算,数据量大的
    • 没有顺序要求的

2. CAS原理

2.1 CAS是什么?

  • CAS,全称是compare and swap
  • CAS有三个值。内存值Value、预期值Expect、要修改的值Target,当且仅当Expect==Value时,才能将内存值改为Target,否则什么都不做。最后返回当前的Value
  • CAS在现代处理器中,是有特殊指令可以实现的,而JVM在实现的也会利用汇编指令: cmpxchg

    2.2 案例演示

  • CAS的等价代码:

    1. /**
    2. * @author yiren
    3. */
    4. public class CasExample {
    5. private static volatile int value;
    6. public static synchronized int compareAndSwap(int expect, int target) {
    7. int oldValue = value;
    8. if (expect == oldValue) {
    9. value = target;
    10. }
    11. return value;
    12. }
    13. public static void main(String[] args) throws InterruptedException {
    14. value = 0;
    15. Runnable runnable = () -> {
    16. compareAndSwap(0, 1);
    17. };
    18. Thread thread1 = new Thread(runnable);
    19. Thread thread2 = new Thread(runnable);
    20. thread1.start();
    21. thread2.start();
    22. thread1.join();
    23. thread2.join();
    24. System.out.println(value);
    25. }
    26. }

2.3 源码分析CAS

  • 以原子类AtomicInteger的源码为案例
  • 以下是AtomicInteger中的属性定义:

    1. // setup to use Unsafe.compareAndSwapInt for updates
    2. private static final Unsafe unsafe = Unsafe.getUnsafe();
    3. private static final long valueOffset;
    4. static {
    5. try {
    6. valueOffset = unsafe.objectFieldOffset
    7. (AtomicInteger.class.getDeclaredField("value"));
    8. } catch (Exception ex) { throw new Error(ex); }
    9. }
    10. private volatile int value;
  • 其中value是我们主要保存数据的属性;而valueOffset则表示变量value值在内存地址中当前对象的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了

  • 我们看一下AtomicInteger的具体的操作方法
  1. public final boolean compareAndSet(int expect, int update) {
  2. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  3. }
  4. public final int getAndDecrement() {
  5. return unsafe.getAndAddInt(this, valueOffset, -1);
  6. }
  7. public final int getAndAdd(int delta) {
  8. return unsafe.getAndAddInt(this, valueOffset, delta);
  9. }
  • 可以看出里面主要相关的方法就是unsafecompareAndSwapIntgetAndAddInt且每个方法调用都传入了当前对象、value的偏移地址、和操作数
  • Unsafe类:它是CAS实现的核心类,Java无法直接访问底层操作系统,而是通过本地native方法来访问,不过JVM还是提供了一个途径,JDK中的Unsafe类,它就提供了硬件级别的原子操作.
  • 我们看下int getAndAddInt(Object var1, long var2, int var4)方法
  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }
  • var1是当前AtomicInteger的对象,var2是value的偏移地址,通过偏移地址用UnsafegetIntVolatile获取到当前AtomicInteger对象的value值,然后调用UnsafecompareAndSwapInt方法来做CAS。
  • 看下compareAndSwapInt的定义

    1. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  • 它是一个native方法,实际调用的JVM的C实现的方法,而在C代码中又调用的是Atomic::cmpxchg,而在现代处理器中,实际是可以对应汇编指令集中的 比较并交换指令CMPXCHG

2.5 CAS的缺点

  1. ABA问题

比如

  • 初始值为0,线程1将它改成1,然后又将它改回0
  • 线程2在线程1修改成1之前拿到了0,然后又在线程1改回0后去比较。
  • 此时线程2就会修改成功,但是线程2并不知道线程1对里面的数进行过修改
  • 对于此,可以使用版本号来解决 比如1A->2B->3A-4B这样,每个操作都有一个版本号作为记录。在比较的时候就是用版本号去比较
  • Java中有一个AtomicStampedReference可用于解决ABA问题
  1. 并发度高的情况下,效率很低,可能需要比较很多次