1、原子类概述

JDK 1.5引进原子类,在java.util.concurrent.atomic包下,atomic包里面一共提供了13个类,分为4种类型,分别是:原子更新基本类型,原子更新数组,原子更新引用,原子更新属性。详细分类如下:

  • 基本类型:
    • AtomicBoolean:布尔型
    • AtomicInteger:整型
    • AtomicLong:长整型
  • 数组:
    • AtomicIntegerArray:数组里的整型
    • AtomicLongArray:数组里的长整型
    • AtomicReferenceArray:数组里的引用类型
  • 引用类型:
    • AtomicReference:引用类型
    • AtomicStampedReference:带有版本号的引用类型
    • AtomicMarkableReference:带有标记位的引用类型
  • 对象的属性:
    • AtomicIntegerFieldUpdater:对象的属性是整型
    • AtomicLongFieldUpdater:对象的属性是长整型
    • AtomicReferenceFieldUpdater:对象的属性是引用类型

JDK8新增DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder是对AtomicLong等类的改进,比如LongAccumulator与LongAdder在高并发环境下比AtomicLong更高效。

上面13个原子类,实际开发中用的较多的是基本类型和引用类型,在统计指标数据时原子类用的会比较多,比如我们需要的仅仅是一个简单的、高效的、线程安全的递增或者递减方案,这个方案一般需要满足以下要求:

  • 简单:操作简单,底层实现简单;
  • 高效:占用资源少,操作速度快;
  • 安全:在高并发和多线程环境下要保证数据的正确性。

对于是需要简单的递增或者递减的需求场景,使用synchronized关键字和lock固然可以实现,但代码写的会略显冗余,且性能会有影响,此时如果线程竞争情况不是很激烈,采用原子类更加方便。

面试的时候可以举meterService添加监控项的例子,walGet、walAcl,起一个周期线程池调用sdk往CMC提交数据,参考举周写的。

2、原子类使用

2.1 AtomicInteger

针对1中介绍的只是简单的递增或者递减场景,存在线程竞争情况但竞争不激烈的,可以使用AtomicInteger或者AtomicLong,这里主要介绍AtomicInteger,AtomicLong同理。

2.1.1 常用API

  1. // 有参构造方法
  2. public AtomicInteger(int initialValue);
  3. // 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回更新的值)。
  4. public int addAndGetint delta
  5. // 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回以前的值)
  6. public int getAndAddint delta
  7. // 以原子方式将当前值加 1(++i)
  8. public int incrementAndGet()
  9. // 以原子方式将当前值加 1(i++)
  10. public int getAndIncrement()
  11. // 以原子方式设置为给定值(返回旧值)
  12. public int getAndSet(int newValue)
  13. // 以原子方式将当前值减 1(--i)
  14. public int decrementAndGet()
  15. // 以原子方式将当前值减 1(i--)
  16. public int getAndDecrement()
  17. // 获取当前值
  18. public int get()

2.1.2 举例

还是举那个同步问题的经典例子,定义一个临界变量val,起10个异步线程,每个线程都是对这个临界变量进行1000次自增操作,如下:

  1. package Atomic;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * @ClassName AtomicWrongDemo
  5. * @Description TODO
  6. * @Auther Jerry
  7. * @Date 2020/3/22 - 22:40
  8. * @Version 1.0
  9. */
  10. public class AtomicWrongDemo {
  11. private int val = 0;
  12. public static void main(String[] args) {
  13. // 初始化实例
  14. AtomicWrongDemo atomicWrongDemo = new AtomicWrongDemo();
  15. for (int i = 0; i < 10; ++i)
  16. {
  17. new Thread(atomicWrongDemo::increase).start();
  18. }
  19. // 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
  20. try {
  21. Thread.sleep(3000);
  22. } catch (InterruptedException e)
  23. {
  24. e.printStackTrace();
  25. }
  26. System.out.println(atomicWrongDemo.getVal());
  27. }
  28. private void increase()
  29. {
  30. for (int i = 0; i < 1000; ++i)
  31. {
  32. ++this.val;
  33. }
  34. }
  35. private int getVal()
  36. {
  37. return this.val;
  38. }
  39. }

运行结果有时为我们期望的10000,有时候比10000少,比如9408,出现比10000少的结果是因为自增操作++i不是原子操作,出现了竞争,需要对临界变量做同步处理。使用synchronized关键字和lock固然可以实现,但这里只是对临界变量val++时做同步处理,有种高射炮打蚊子的感觉,且加锁后势必会对性能有所印象,这种场景正是我们使用Atomic类的场景,如下:

  1. package Atomic;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * @ClassName AtomicDemo
  5. * @Description TODO
  6. * @Auther Jerry
  7. * @Date 2020/3/22 - 22:31
  8. * @Version 1.0
  9. */
  10. public class AtomicDemo {
  11. private AtomicInteger val = new AtomicInteger();
  12. public static void main(String[] args) {
  13. // 初始化实例
  14. AtomicDemo atomicDemo = new AtomicDemo();
  15. for (int i = 0; i < 10; ++i)
  16. {
  17. new Thread(atomicDemo::increase).start();
  18. }
  19. // 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
  20. try {
  21. Thread.sleep(3000);
  22. } catch (InterruptedException e)
  23. {
  24. e.printStackTrace();
  25. }
  26. System.out.println(atomicDemo.getVal().toString());
  27. }
  28. private void increase()
  29. {
  30. for (int i = 0; i < 1000; ++i)
  31. {
  32. this.val.incrementAndGet();
  33. }
  34. }
  35. private AtomicInteger getVal()
  36. {
  37. return this.val;
  38. }
  39. }

这里我们使用了AtomicInterger类的increamentAndGet方法,以原子方式将当前值加 1(返回更新的值),结果自然是每次运行都打印10000,可以看到代码写起来很简洁,很轻量级。

2.2 AtomicStampedReference

AtomicStampedReference 是原子类的引用类型,CAS中的经典问题“ABA问题”的一个解决思路就是添加版本号,AtomicStampedReference 类正是这一思路的实现。关于CAS可以参考我这篇文章:https://www.yuque.com/docs/share/7b32bf8b-9128-482f-a15d-ab9873d77b30?# 《乐观锁和悲观锁》

2.2.1 常用API

  1. // 有参构造方法,initialRef代表要操作的值,initialStamp代表值对应的版本号
  2. public AtomicStampedReference(V initialRef, int initialStamp);
  3. // 获取值
  4. public V getReference();
  5. // 获取值对应的版本号
  6. public int getStamp();
  7. // cas对应的方法,入参含义通过参数名可以知道
  8. public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
  9. // 设置值的方法,包括设置了版本号
  10. public void set(V newReference, int newStamp);

2.2.2 举例

首先复现一下ABA问题:

  1. public class ABAMain {
  2. private static AtomicInteger val = new AtomicInteger(10);
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. // 模拟ABA问题,10 -> 11 -> 10
  6. val.compareAndSet(10, 11);
  7. val.compareAndSet(11, 10);
  8. }).start();
  9. new Thread(() -> {
  10. try {
  11. // 让上面线程的ABA现象完成。休眠2秒
  12. Thread.sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. // 这里依然能CAS成功,返回true
  17. boolean res = val.compareAndSet(10, 12);
  18. System.out.println(res);
  19. }).start();
  20. }
  21. }

采用AtomicStampedReference 类解决ABA问题:

  1. public class ASRMain {
  2. private static AtomicStampedReference val = new AtomicStampedReference(10, 0);
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. try {
  6. TimeUnit.SECONDS.sleep(1);
  7. } catch (InterruptedException e) {
  8. }
  9. // 模拟ABA问题,10 -> 11 -> 10
  10. val.compareAndSet(10, 11, val.getStamp(), val.getStamp() + 1);
  11. val.compareAndSet(11, 10, val.getStamp(), val.getStamp() + 1);
  12. }).start();
  13. new Thread(() -> {
  14. // 在线程休眠前获取stamp
  15. int stamp = val.getStamp();
  16. try {
  17. // 让上面线程的ABA现象完成。休眠2秒
  18. TimeUnit.SECONDS.sleep(2);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. // 这里CAS失败,返回false,因为stamp已不是最新的版本号了
  23. boolean res = val.compareAndSet(10, 11, stamp, stamp + 1);
  24. System.out.println(res);
  25. }).start();
  26. }
  27. }

3、源码浅析

3.1 AtomicInteger

AtomicInteger类我个人认为使用较多的接口应该是原子性的自增/自减,这里主要从自增方法之一的incrementAndGet方法为入口:

  1. public final int incrementAndGet() {
  2. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  3. }

这里的unsafe实例是通过Unsafe类的静态方法获得的一个Unsafe类对象,atomic底层的CAS实现也是由这个Unsafe类提供的,如下:

  1. private static final Unsafe unsafe = Unsafe.getUnsafe();

进去incrementAndGet里的getAndAddInt方法,这个方法是在Unsafe类里提供的,如下:

  1. @HotSpotIntrinsicCandidate
  2. public final int getAndAddInt(Object o, long offset, int delta) {
  3. int v;
  4. do {
  5. v = getIntVolatile(o, offset);
  6. } while (!weakCompareAndSetInt(o, offset, v, v + delta));
  7. return v;
  8. }

先说明一下getAndAddInt方法的入参:

  • Object o:具体的内存地址;
  • long offset:临界变量在内存地址里的偏移量(index)
  • int delta:要add的值。

这里的o和offset结合起来就是临界变量的内存里的值V。
里面的do-while循环就是对应的CAS机制里的自旋操作。getIntVolatile方法获得临界变量在内存里的值,方法名有个volatile猜想底层的实现应该类似volatile机制保证从内存里拿到的值是当前最新的值;while语句判断,表达式里有个weakCompareAndSetInt方法,源码如下:

  1. @HotSpotIntrinsicCandidate
  2. public final boolean weakCompareAndSetInt(Object o, long offset,
  3. int expected,
  4. int x) {
  5. return compareAndSetInt(o, offset, expected, x);
  6. }

又是一环套一环,点进去这个compareAndSetInt方法,源码如下:

  1. @HotSpotIntrinsicCandidate
  2. public final native boolean compareAndSetInt(Object o, long offset,
  3. int expected,
  4. int x);

终于看到了native关键字,代表这个方法可以不用再看底层实现了,该方法做的是根据Object o和long offset拿到临界变量的在内存里的值,跟expected(就是前面讲的预期值A)比较,如果两个值相等,把临界变量内存里的值更新为x并返回true,否则什么也不改变返回false。
weakCompareAndSetInt方法是干什么也知道了,入参里v对应前面讲CAS机制时的预期值A, v + delta对应更新值B。
再回到getAndAddInt方法里,注意当while里的表达式为true跳出循环后,return的是执行更新操作前最后一次获取到的临界变量内存里的值,并不是更新后的值,即getAndAddInt方法返回的是更新前最后一次从内存地址里获取到的值。因此AtomicInteger的incrementAndGet方法里,调用getAndAddInt之后,还要再加1,如下:

  1. public final int incrementAndGet() {
  2. return U.getAndAddInt(this, VALUE, 1) + 1;
  3. }

3.2 AtomicStampedReference

首先看AtomicStampedReference 类里的一个静态内部类,数据和对应的版本号都在这个类里面体现:

  1. private static class Pair<T> {
  2. // 数据
  3. final T reference;
  4. // 版本号
  5. final int stamp;
  6. private Pair(T reference, int stamp) {
  7. this.reference = reference;
  8. this.stamp = stamp;
  9. }
  10. // 静态的泛型方法,AtomicStampedReference类的有参构造函数里会调用of方法进行初始化
  11. static <T> Pair<T> of(T reference, int stamp) {
  12. return new Pair<T>(reference, stamp);
  13. }
  14. }

AtomicStampedReference 类里的有参构造函数也是调用Pair类的静态方法of完成初始化的:

  1. public AtomicStampedReference(V initialRef, int initialStamp) {
  2. pair = Pair.of(initialRef, initialStamp);
  3. }

再看一下set方法:

  1. public void set(V newReference, int newStamp) {
  2. Pair<V> current = pair;
  3. // 二者只要有一个不是当前的值就set成功,将pair引用指向最新的newReference和newStamp构成的对象
  4. // 这也是解决CAS的ABA问题的一条准则:值 + 版本号
  5. if (newReference != current.reference || newStamp != current.stamp)
  6. this.pair = Pair.of(newReference, newStamp);
  7. }

接下来看一下最重要的compareAndSet方法:

  1. public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
  2. Pair<V> current = pair;
  3. return
  4. expectedReference == current.reference &&
  5. expectedStamp == current.stamp &&
  6. // 如果要更新成的pair跟当前的pair相同(值 + 版本号)也ok
  7. ((newReference == current.reference && newStamp == current.stamp)
  8. // 调用casPair方法进行CAS操作
  9. || casPair(current, Pair.of(newReference, newStamp)));
  10. }

最后看一下这个casPair方法:

  1. private boolean casPair(Pair<V> cmp, Pair<V> val) {
  2. return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
  3. }
  4. // UNSAFE类的compareAndSwapObject是个native方法,字面意思可以看出来是C/C++实现的CAS操作
  5. public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言(或者理解成操作系统)实现的,并且被编译成了DLL,由java去调用。这些函数的实现体在DLL中,JDK的源代码中并不包含。