锁是一种悲观的并发策略,它总是假设每一次的临界区操作都会产生冲突,而无论共享的数据是否真的会出现竞争,都会执行加锁、解锁操作(这里讨论概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁),这将会导致用户态到核心态转换、维护锁计数器和检查是否有被阻塞的线程需要被唤醒等开销,对性能的消耗很大。
随着硬件指令集的发展,我们已经有了另外一个选择:基于冲突检测的乐观并发策略。相比互斥锁方案最大的好处就是性能,它假设对资源的访问是没有冲突的,如果没有其他线程争用共享数据,那操作就直接成功了;如果共享的数据的确被争用,产生了冲突,那再进行其他的补偿措施,最常用的是不断重试直到出现没有竞争的共享数据为止。这种乐观并发策略的实现不再需要把线程阻塞挂起,因此这种同步操作也被称为非阻塞同步。
CAS 实现
在乐观并发策略中,我们必须要求操作和冲突检测这两个步骤具备原子性,但如果这里再使用互斥锁来保证就失去了意义,所以我们只能靠硬件来保证某些从语义上看起来需要多次操作的行为可以只通过一条处理器指令就能完成。因此 CAS 的实现依赖于 CPU 提供的特定指令,具体根据体系结构的不同还存在着明显区别。比如在 x86 指令集中就使用了 cmpxchg 指令完成了 CAS 功能;而在 ARM 架构下,则需要使用一对指令(如 load and reserve 和 store conditional)来实现。不过在大多数处理器上 CAS 都是个非常轻量级的操作,这也是其优势所在。
CAS 指令需要有三个操作数,分别是内存位置(在 Java 中可以简单地理解为变量的内存地址,用 V 表示)、旧的预期值 A 和准备设置的新值 B。CAS 指令执行时,当且仅当 V 符合 A 时,处理器才会用 B 更新 V 的值,否则它就不执行更新。但不管是否更新了 V 的值,都会返回 V 的旧值,上述的处理过程是一个原子操作,执行期间不会被其他线程中断。
作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。当多个线程同时使用 CAS 操作一个变量时,只有一个会成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并允许重新读取该变量的最新值,再次尝试修改,当然也可以放弃本次操作,所以 CAS 算法是非阻塞的。
1. ABA 问题
尽管 CAS 这种无锁方案看起来很美好,既简单又高效,但显然这种操作无法涵盖互斥同步的所有使用场景,并且 CAS 从语义上来说并不是真正完美的,比如经典的 ABA 问题。
假设有两个线程分别对内存中某一变量做 CAS 操作,线程一先从内存中取出值 A,线程二也从内存中取出值 A 并把值从 A 变为 B 写回,然后又把值从 B 变为 A 写回,此时线程一进行 CAS 操作,发现内存中的值还是 A,和预期值一致,操作成功。尽管线程一的 CAS 操作成功,但并不代表这个过程就没有问题。
ABA 问题带来的隐患:
假设有一个用单链表实现的堆栈,栈顶为 A,A.next = B,现有线程一希望用 CAS 把栈顶替换为 B,但在此之前,线程二介入,将 A、B 出栈,再压入 D、C、A,整个过程如下图所示:
但此时 B 处于游离转态,轮到线程一执行 CAS 操作,发现栈顶仍为 A,于是 CAS 成功,栈顶变为 B,但实际上 B.next = null,即堆栈中只有 B 一个元素,C 和 D 并不在堆栈中,平白无故就丢了。
如何解决 ABA 问题:
可以通过版本号的方式来解决 ABA 问题,每次执行数据修改操作时,都会带上一个版本号,如果版本号和数据的版本一致,对数据进行修改操作并对版本号 +1,否则执行失败。因为每次操作的版本号都会随之增加,所以不用担心出现 ABA 问题。
同时 JUC 包为了解决这个问题,也提供了一个带有标记的原子引用类 AtomicStampedReference,可以通过为引用建立类似版本号(stamp)的方式来保证 CAS 的正确性。不过目前来说,大部分情况下 ABA 问题不会影响程序并发的正确性。
2. JDK 提供的 CAS 操作
在 JDK 5 之后,Java 类库才开始使用 CAS 操作,该操作由 Unsafe 类里的几个 CAS 方法包装提供。HotSpot 虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条与平台相关的处理器 CAS 指令,没有方法调用的过程。
Unsafe 提供了三种 CAS 方法
public final native boolean compareAndSwapObject(
Object o, long offset, Object expected, Object newValue);
public final native boolean compareAndSwapInt(
Object o, long offset, int expected, int newValue);
public final native boolean compareAndSwapLong(
Object o, long offset, long expected, long newValue);
Unsafe 类是在 JDK 内部使用的专属类,在设计上就不是提供给用户程序调用的类,因为 Unsafe::getUnsafe() 方法用于获取 Unsafe 类的实例,但该方法限制了只有启动类加载器加载的 Class 才能访问它。所以我们自己的应用程序是无法直接使用 Unsafe 类的。
因此在 JDK 9 之前,只有 Java 类库可以使用 CAS,譬如 J.U.C 包里面的整数原子类,其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 类的 CAS 操作来实现。如果用户程序也有使用 CAS 操作的需求,要么采用反射手段突破 Unsafe 的访问限制,要么就只能通过 Java 类库提供的原子类间接使用。
直到 JDK 9 之后,Java 类库才在 VarHandle 类里开放了面向用户程序使用的 CAS 操作。
private static final VarHandle HANDLE = MethodHandles
.lookup()
.findStaticVarHandle(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)){
// ......
}
}
原子类
原子包 java.util.concurrent.atomic 提供了一组原子类,原子类的操作具有原子性,底层正是基于 CAS 算法来实现的线程安全。JDK 提供了十六个原子类,可以大致分为以下五个类别:基本数据类型、数组、累加器、引用类型和对象属性更新器。
1. 基本数据类型
其相关实现有 AtomicBoolean、AtomicInteger 和 AtomicLong,提供的方法主要有以下这些:
// 以原子方式设置为给定值,返回旧值。
public final int getAndSet(int newValue)
// CAS操作,如果当前值为expect,则以原子方式将该值设置为update, 返回是否成功
public final boolean compareAndSet(int expect, int update);
// 以原子方式将当前值加1(减1),返回自增前的值
public final xxx getAndIncrement();
public final xxx getAndDecrement();
// 以原子方式将当前值加1(减1),返回自增后的值
public final xxx incrementAndGet();
public final xxx decrementAndGet();
// 以原子方式将当前值增加delta,前者返回旧值、后者返回新值
public final int getAndAdd(int delta);
public final int addAndGet(int delta);
// 用func函数的结果以原子方式更新当前值,前者返回旧值、后者返回新值。func函数应具有幂等性
public final int getAndUpdate(IntUnaryOperator updateFunction);
public final int updateAndGet(IntUnaryOperator updateFunction);
public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction);
public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction);
AtomicIntger 是对 int 类型的一个封装,提供原子性的访问和更新操作,而 AtomicBoolean 内部也使用 int 类型的 1 和 0 来表示 true 和 false,其底层也是对 int 类型的原子操作。
从 AtomicInteger 的内部属性可以看出,它依赖于 Unsafe 提供的一些底层能力进行底层操作,通过 volatile 修饰的 value 字段来记录数值,以保证可见性。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
具体的原子操作细节,可以参考任意一个原子更新方法,比如下面的 getAndIncrement 方法。
在 JDK 1.8 中,getAndIncrement() 方法内部会调用 unsafe 实例的 getAndAddInt() 方法。这里 this 和 valueOffset 两个参数可以唯一确定共享变量的内存地址。
unsafe 的 getAndAddInt() 方法源码如下,该方法首先会在内存中读取共享变量的值,并且由于需要返回数值,所以后面会循环调用 compareAndSwapInt 方法来尝试以原子方式设置共享变量的值,直到成功为止。
public final int getAndAddInt(Object o, long offset, long delta) {
long v;
do {
// 读取内存中的值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
而类似 compareAndSet 这种返回 boolean 类型的函数,因为其返回值表现的就是成功与否,所以无需重试。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
2. 对象引用类型
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新,并且后面这两个原子类都可以解决 ABA 问题。
解决 ABA 问题的思路很简单,增加一个版本号维度即可,类似于 StampedLock 中的乐观锁机制,每次执行 CAS 操作时附加更新一个版本号,只要保证版本号是递增的,即便 A 变成 B 之后再变回 A,版本号也不会变回来了。AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,核心逻辑如下:
public boolean compareAndSet(V expectedReference, V newReference,
int expectedStamp, int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference && newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
// 内部不仅维护了对象值,还维护了一个版本号
private static class Pair<T> {
final T reference;
final int stamp;
......
}
当 AtomicStampedReference 对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。在写入时,只有当对象值及时间戳都满足期望值时,写入才会成功。因此,即使对象值被反复读写,只要时间戳发生变化,就能防止不恰当的写入。
使用示例如下:
public class AtomicStampedReferenceDemo {
private final AtomicStampedReference<Integer> count = new AtomicStampedReference<>(0, 0);
public int getCount() {
return count.getReference();
}
public int increment() {
int[] stamp = new int[1];
while(true) {
// 同时获取时间戳和数据,防止获取到数据和版本不是一致的
Integer value = count.get(stamp);
int newValue = value + 1;
boolean writeOk = count.compareAndSet(value, newValue, stamp[0], stamp[0] + 1);
if (writeOk) {
return newValue;
}
}
}
public int decrement() {
int[] stamp = new int[1];
while(true) {
// 同时获取时间戳和数据,防止获取到数据和版本不是一致的
Integer value = count.get(stamp);
int newValue = value - 1;
boolean writeOk = count.compareAndSet(value, newValue, stamp[0], stamp[0] + 1);
if (writeOk) {
return newValue;
}
}
}
}
AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法如下:
public boolean compareAndSet(V expectedReference, V newReference,
boolean expectedMark, boolean newMark) {
Pair<V> current = pair;
return expectedReference == current.reference &&
expectedMark == current.mark &&
((newReference == current.reference && newMark == current.mark) ||
casPair(current, Pair.of(newReference, newMark)));
}
private static class Pair<T> {
final T reference;
final boolean mark;
......
}
3. 数组
相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,利用这些原子类,我们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数。
下面以 AtomicIntegerArray 为例展示其核心方法:
// 获取数组长度
public final int length()
// 获取数组第i个下标的元素
public final int get(int i)
// 将数组第i个下标设置为newValue,并返回旧值
public final int getAndSet(int i, int newValue)
// 进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
4. 对象属性更新器
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新普通对象中的 int 类型字段、long 类型字段、引用类型字段,这三个类都是利用反射机制实现的,我们需要保证传入的类型和字段名称正确。创建更新器的方法如下:
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
需要注意的是,修改的只能是 实例变量,不能是类变量(因为需要使用对象实例中的偏移量),并且对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的话,则 newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。
你会发现 newUpdater() 的方法参数只有类的信息,没有对象的引用,而更新对象的属性,一定需要对象的引用,那这个参数是在哪里传入的呢?是在原子操作的方法参数中传入的。例如 compareAndSet() 这个原子操作,相比原子化的基本数据类型多了一个对象引用 obj。原子化对象属性更新器相关的方法,相比原子化的基本数据类型仅仅是多了对象引用参数,所以这里也不再赘述了。
public abstract boolean compareAndSet(T obj, int expect, int update);
使用示例如下:
public class AtomicIntegerFieldUpdaterDemo {
public static final AtomicIntegerFieldUpdater<Holder> updater =
AtomicIntegerFieldUpdater.newUpdater(Holder.class,"count");
public static void main(String[] args) {
Holder holder = new Holder();
if (updater.compareAndSet(holder,100,120)) {
System.out.println("Update Success! current value is: " + holder.getCount());
}
if (updater.compareAndSet(holder,100,130)) {
System.out.println("Update Success! current value is: " + holder.getCount());
} else {
System.out.println("Update Fail! current value is: " + holder.getCount());
}
}
@Getter
@Setter
private static class Holder {
public volatile int count = 100;
}
}
如果对象字段需要仅用作常规 volatile 字段,不想将其声明为 AtomicXXX 类,但偶尔又需要原子操作,则可以使用 AtomicFieldUpdater。此外,如果需要使用大量原子对象但又不想创建大量 AtomicXXX 对象,则可以创建一个静态的 AtomicFieldUpdater 对象并让它在所有对象之间共享,以节省内存消耗。
5. 累加器
DoubleAdder 和 LongAdder 这两个类仅仅用来执行线程安全的加法操作,并且从 0 开始计算。相比原子化的基本数据类型,速度更快,但不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。常用方法如下:
public void add(long x)
public void increment()
public void decrement()
public long sum()
public void reset()
DoubleAccumulator 和 LongAccumulator 这两个类,需要传入一个对应类型的二元操作用来进行计算各种聚合操作,并且可以指定初始值。
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
/**
* 二元操作函数
*/
public interface LongBinaryOperator {
long applyAsLong(long left, long right);
}
区别:new LongAdder() 等价于 new LongAccumulator((x, y) -> x + y, 0L)。
5.1 LongAdder
在 AtomicInteger 的实现中,使用了 for 死循环不断重试 CAS 操作直到修改成功。如果竞争不激烈,那么修改成功的概率就很高,但如果长时间未成功就会给 CPU 带来很大的执行开销。那么当竞争激烈时,我们应该如何进一步提高系统的性能呢?
在 JDK 1.8 中,Java 提供了一个新的原子类 LongAdder。LongAdder 在高并发场景下会比 AtomicInteger 和 AtomicLong 的性能更好,但代价就是会消耗更多的内存空间。
LongAdder 的原理就是降低操作共享变量的并发数,也就是将对单一共享变量的操作压力分散到多个变量值上,将竞争的每个写线程的 value 值分散到一个数组中,每个线程访问时,不同线程会通过哈希等算法映射到到数组的不同槽中,各个线程只对自己槽中的 value 值进行 CAS 操作,最后在读取值的时候会将原子操作的共享变量与各个分散在数组的 value 值相加,返回一个近似准确的数值。
LongAdder 内部由一个 base 变量和一个 cell 数组组成。当只有一个写线程,即没有竞争时,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;当有多个写线程竞争的情况下,就会初始化 cell 数组,此时除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell 数组中。如果使用 cell 数组更新后,发现在某一个 cell 上的更新依然发生冲突,那么系统就会尝试创建新的 cell 或者将 cell 的数量加倍,以减少冲突的可能。
最终结果可通过以下公式计算得出:。我们可以发现,LongAdder 在操作后的返回值只是一个近似准确的数值,要取得当前的实际值,需要使用 sum() 方法重新计算,但由于计算总和时没有对 Cell 数组加锁,所以在累加过程中可能有其他线程对 Cell 中的值进行了修改,也有可能对数组进行了扩容,所以 sum 返回的值并不是非常精确的,其返回值并不是调用 sum 方法时的原子快照值。
LongAdder 的另一个优化手段是避免了伪共享,通过 JVM 提供的 @sun.misc.Contended 注解来修饰 Cell 对象,这样 JVM 会自动为 Cell 解决伪共享问题。
@sun.misc.Contended static final class Cell {......}
5.2 LongAccumulator
LongAccumulator 是 LongAdder 的亲兄弟,它们有公共的父类 Striped64,因此两者的内部优化方式都是一样的。它们都将一个 long 型整数分割,并存储在不同的变量中,以防止多线程竞争。
两者的主要逻辑是类似的,但是 LongAccumulator 是 LongAdder 的功能扩展。对于 LongAdder 来说,它只是每次对给定的整数执行一次加法操作,而 LongAccumulator 则可以实现一个函数操作。