1、原子类概述
JDK 1.5引进原子类,在java.util.concurrent.atomic包下,atomic包里面一共提供了13个类,分为4种类型,分别是:原子更新基本类型,原子更新数组,原子更新引用,原子更新属性。详细分类如下:
- 基本类型:
- AtomicBoolean:布尔型
- AtomicInteger:整型
- AtomicLong:长整型
- 数组:
- AtomicIntegerArray:数组里的整型
- AtomicLongArray:数组里的长整型
- AtomicReferenceArray:数组里的引用类型
- 引用类型:
- AtomicReference:引用类型
- AtomicStampedReference:带有版本号的引用类型
- AtomicMarkableReference:带有标记位的引用类型
- 对象的属性:
- AtomicIntegerFieldUpdater:对象的属性是整型
- AtomicLongFieldUpdater:对象的属性是长整型
- AtomicReferenceFieldUpdater:对象的属性是引用类型
JDK8新增DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder是对AtomicLong等类的改进,比如LongAccumulator与LongAdder在高并发环境下比AtomicLong更高效。
上面13个原子类,实际开发中用的较多的是基本类型和引用类型,在统计指标数据时原子类用的会比较多,比如我们需要的仅仅是一个简单的、高效的、线程安全的递增或者递减方案,这个方案一般需要满足以下要求:
- 简单:操作简单,底层实现简单;
- 高效:占用资源少,操作速度快;
- 安全:在高并发和多线程环境下要保证数据的正确性。
对于是需要简单的递增或者递减的需求场景,使用synchronized关键字和lock固然可以实现,但代码写的会略显冗余,且性能会有影响,此时如果线程竞争情况不是很激烈,采用原子类更加方便。
面试的时候可以举meterService添加监控项的例子,walGet、walAcl,起一个周期线程池调用sdk往CMC提交数据,参考举周写的。
2、原子类使用
2.1 AtomicInteger
针对1中介绍的只是简单的递增或者递减场景,存在线程竞争情况但竞争不激烈的,可以使用AtomicInteger或者AtomicLong,这里主要介绍AtomicInteger,AtomicLong同理。
2.1.1 常用API
// 有参构造方法
public AtomicInteger(int initialValue);
// 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回更新的值)。
public int addAndGet(int delta)
// 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回以前的值)
public int getAndAdd(int delta)
// 以原子方式将当前值加 1(++i)
public int incrementAndGet()
// 以原子方式将当前值加 1(i++)
public int getAndIncrement()
// 以原子方式设置为给定值(返回旧值)
public int getAndSet(int newValue)
// 以原子方式将当前值减 1(--i)
public int decrementAndGet() :
// 以原子方式将当前值减 1(i--)
public int getAndDecrement()
// 获取当前值
public int get()
2.1.2 举例
还是举那个同步问题的经典例子,定义一个临界变量val,起10个异步线程,每个线程都是对这个临界变量进行1000次自增操作,如下:
package Atomic;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName AtomicWrongDemo
* @Description TODO
* @Auther Jerry
* @Date 2020/3/22 - 22:40
* @Version 1.0
*/
public class AtomicWrongDemo {
private int val = 0;
public static void main(String[] args) {
// 初始化实例
AtomicWrongDemo atomicWrongDemo = new AtomicWrongDemo();
for (int i = 0; i < 10; ++i)
{
new Thread(atomicWrongDemo::increase).start();
}
// 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
try {
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(atomicWrongDemo.getVal());
}
private void increase()
{
for (int i = 0; i < 1000; ++i)
{
++this.val;
}
}
private int getVal()
{
return this.val;
}
}
运行结果有时为我们期望的10000,有时候比10000少,比如9408,出现比10000少的结果是因为自增操作++i不是原子操作,出现了竞争,需要对临界变量做同步处理。使用synchronized关键字和lock固然可以实现,但这里只是对临界变量val++时做同步处理,有种高射炮打蚊子的感觉,且加锁后势必会对性能有所印象,这种场景正是我们使用Atomic类的场景,如下:
package Atomic;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName AtomicDemo
* @Description TODO
* @Auther Jerry
* @Date 2020/3/22 - 22:31
* @Version 1.0
*/
public class AtomicDemo {
private AtomicInteger val = new AtomicInteger();
public static void main(String[] args) {
// 初始化实例
AtomicDemo atomicDemo = new AtomicDemo();
for (int i = 0; i < 10; ++i)
{
new Thread(atomicDemo::increase).start();
}
// 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
try {
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(atomicDemo.getVal().toString());
}
private void increase()
{
for (int i = 0; i < 1000; ++i)
{
this.val.incrementAndGet();
}
}
private AtomicInteger getVal()
{
return this.val;
}
}
这里我们使用了AtomicInterger类的increamentAndGet方法,以原子方式将当前值加 1(返回更新的值),结果自然是每次运行都打印10000,可以看到代码写起来很简洁,很轻量级。
2.2 AtomicStampedReference
AtomicStampedReference 是原子类的引用类型,CAS中的经典问题“ABA问题”的一个解决思路就是添加版本号,AtomicStampedReference 类正是这一思路的实现。关于CAS可以参考我这篇文章:https://www.yuque.com/docs/share/7b32bf8b-9128-482f-a15d-ab9873d77b30?# 《乐观锁和悲观锁》
2.2.1 常用API
// 有参构造方法,initialRef代表要操作的值,initialStamp代表值对应的版本号
public AtomicStampedReference(V initialRef, int initialStamp);
// 获取值
public V getReference();
// 获取值对应的版本号
public int getStamp();
// cas对应的方法,入参含义通过参数名可以知道
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
// 设置值的方法,包括设置了版本号
public void set(V newReference, int newStamp);
2.2.2 举例
首先复现一下ABA问题:
public class ABAMain {
private static AtomicInteger val = new AtomicInteger(10);
public static void main(String[] args) {
new Thread(() -> {
// 模拟ABA问题,10 -> 11 -> 10
val.compareAndSet(10, 11);
val.compareAndSet(11, 10);
}).start();
new Thread(() -> {
try {
// 让上面线程的ABA现象完成。休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 这里依然能CAS成功,返回true
boolean res = val.compareAndSet(10, 12);
System.out.println(res);
}).start();
}
}
采用AtomicStampedReference 类解决ABA问题:
public class ASRMain {
private static AtomicStampedReference val = new AtomicStampedReference(10, 0);
public static void main(String[] args) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
// 模拟ABA问题,10 -> 11 -> 10
val.compareAndSet(10, 11, val.getStamp(), val.getStamp() + 1);
val.compareAndSet(11, 10, val.getStamp(), val.getStamp() + 1);
}).start();
new Thread(() -> {
// 在线程休眠前获取stamp
int stamp = val.getStamp();
try {
// 让上面线程的ABA现象完成。休眠2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 这里CAS失败,返回false,因为stamp已不是最新的版本号了
boolean res = val.compareAndSet(10, 11, stamp, stamp + 1);
System.out.println(res);
}).start();
}
}
3、源码浅析
3.1 AtomicInteger
AtomicInteger类我个人认为使用较多的接口应该是原子性的自增/自减,这里主要从自增方法之一的incrementAndGet方法为入口:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
这里的unsafe实例是通过Unsafe类的静态方法获得的一个Unsafe类对象,atomic底层的CAS实现也是由这个Unsafe类提供的,如下:
private static final Unsafe unsafe = Unsafe.getUnsafe();
进去incrementAndGet里的getAndAddInt方法,这个方法是在Unsafe类里提供的,如下:
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
先说明一下getAndAddInt方法的入参:
- Object o:具体的内存地址;
- long offset:临界变量在内存地址里的偏移量(index)
- int delta:要add的值。
这里的o和offset结合起来就是临界变量的内存里的值V。
里面的do-while循环就是对应的CAS机制里的自旋操作。getIntVolatile方法获得临界变量在内存里的值,方法名有个volatile猜想底层的实现应该类似volatile机制保证从内存里拿到的值是当前最新的值;while语句判断,表达式里有个weakCompareAndSetInt方法,源码如下:
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
int expected,
int x) {
return compareAndSetInt(o, offset, expected, x);
}
又是一环套一环,点进去这个compareAndSetInt方法,源码如下:
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
int expected,
int x);
终于看到了native关键字,代表这个方法可以不用再看底层实现了,该方法做的是根据Object o和long offset拿到临界变量的在内存里的值,跟expected(就是前面讲的预期值A)比较,如果两个值相等,把临界变量内存里的值更新为x并返回true,否则什么也不改变返回false。
weakCompareAndSetInt方法是干什么也知道了,入参里v对应前面讲CAS机制时的预期值A, v + delta对应更新值B。
再回到getAndAddInt方法里,注意当while里的表达式为true跳出循环后,return的是执行更新操作前最后一次获取到的临界变量内存里的值,并不是更新后的值,即getAndAddInt方法返回的是更新前最后一次从内存地址里获取到的值。因此AtomicInteger的incrementAndGet方法里,调用getAndAddInt之后,还要再加1,如下:
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
3.2 AtomicStampedReference
首先看AtomicStampedReference 类里的一个静态内部类,数据和对应的版本号都在这个类里面体现:
private static class Pair<T> {
// 数据
final T reference;
// 版本号
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
// 静态的泛型方法,AtomicStampedReference类的有参构造函数里会调用of方法进行初始化
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
AtomicStampedReference 类里的有参构造函数也是调用Pair类的静态方法of完成初始化的:
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
再看一下set方法:
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
// 二者只要有一个不是当前的值就set成功,将pair引用指向最新的newReference和newStamp构成的对象
// 这也是解决CAS的ABA问题的一条准则:值 + 版本号
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}
接下来看一下最重要的compareAndSet方法:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
// 如果要更新成的pair跟当前的pair相同(值 + 版本号)也ok
((newReference == current.reference && newStamp == current.stamp)
// 调用casPair方法进行CAS操作
|| casPair(current, Pair.of(newReference, newStamp)));
}
最后看一下这个casPair方法:
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
// UNSAFE类的compareAndSwapObject是个native方法,字面意思可以看出来是C/C++实现的CAS操作
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言(或者理解成操作系统)实现的,并且被编译成了DLL,由java去调用。这些函数的实现体在DLL中,JDK的源代码中并不包含。