为什么要用 CAS

在 Java 并发中,我们最先接触的应该是 synchronized 关键字,但是它属于重量级锁,使用不当会严重地影响程序性能。volatile 是一个不错的选择,但它不能保证原子性,所以不能在并发环境中单独使用 volatile
使用锁首先得获取锁,操作完成后再释放锁。CPU 通过上下文切换和调度管理从而进行这个操作。对于一个独占锁而言,线程 A 在持有锁后但没释放锁之前,其它线程必须阻塞。等到线程 A 释放锁后,CPU 会唤醒其它线程,然后其它线程再来争夺这把锁。
深入浅出 CAS - 图1
synchronized 这种独占所属于悲观锁,假设在并发环境下始终会出现冲突,用它来解决资源冲突是最方便的。相对于悲观锁,自然也有乐观锁,假设没有冲突则直接修改资源,假设有冲突,那么这项操作失败时候应该立即返回失败或不断重试,直到成功。乐观锁最常见的就是 CAS。

资源冲突,一般指多个线程同时访问并修改同一个临界区资源。

在并发情况并不高时,多线程会让 CPU 不断切换,导致浪费资源。所以有人就提出了无锁算法,这就是刚刚提到的 CAS。
深入浅出 CAS - 图2

什么是 CAS

先看看 CAS 的定义:

比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。该操作通过将内存中的值与指定数据(旧值,期望操作前的值)进行比较,当数值一样时将内存中的数据替换为新的值。

Java 中的 CAS

如果让你用 Java 的 API 来实现你可能会想到两种方式,一种是加锁(可能是 synchronized 或者其他种类的锁),另一种是使用 atomic 类,如 AtomicInteger,这一系列类是在 JDK1.5 的时候出现的,在我们常用的 java.util.concurrent.atomic 包下,我们来看个例子:

  1. ExecutorService executorService = Executors.newCachedThreadPool();
  2. AtomicInteger atomicInteger = new AtomicInteger(0);
  3. for (int i = 0; i < 5000; i++) {
  4. executorService.execute(atomicInteger::incrementAndGet);
  5. }
  6. System.out.println(atomicInteger.get());
  7. executorService.shutdown();

这个例子开启了 5000 个线程去进行累加操作,不管你执行多少次答案都是 5000。这么神奇的操作是如何实现的呢?就是依靠 CAS 这种技术来完成的,我们来看 getAndIncrement 的内部:

  1. public final int getAndIncrement() {
  2. return unsafe.getAndAddInt(this, valueOffset, 1);
  3. }

再深入到 getAndAddInt()

  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }

其实很简单,先通过 getIntVolatile 获取到内存的当前值,然后进行比较,展开 compareAndSwapInt 方法的几个参数:

  • var1: 当前要操作的对象(其实就是 AtomicInteger 实例)
  • var2: 当前要操作的变量偏移量
  • var4: 更新值
  • var5: 通过 var1 和 var2 获取到的值(可以理解为 CAS 中的内存当前值)

所以 compareAndSwapInt(var1, var2, var5, var5 + var4) 其实换成 compareAndSwapInt(obj, offset, expect, update) 比较清楚,意思就是如果 obj 内的 value 和 expect 相等,就证明没有其他线程改变过这个变量,那么就更新它为 update,如果这一步的 CAS 没有成功,那就采用自旋的方式继续进行 CAS 操作,取出乍一看这也是两个步骤了啊,其实在 JNI 里是借助于一个 CPU 指令完成的,所以还是原子操作。
使用上面的方式就可以很好的解决多线程下的原子性和可见性问题。由于代码里使用了 do while 这种循环结构,所以 CPU 不会被挂起,比较失败后重试,就不存在上下文切换了,实现了无锁并发编程。

CAS 存在的问题

自旋时间过长

你留意上面的代码会发现一个问题,while 循环如果在最坏情况下总是失败怎么办?会导致 CPU 在不断处理。像这种 while(!compareAndSwapInt) 的操作我们称之为自旋,CAS 是乐观的,认为大家来并不都是修改数据的,现实可能出现非常多的线程过来都要修改这个数据,此时随着并发量的增加会导致 CAS 操作长时间不成功,CPU 也会有很大的开销。所以我们要清楚,如果是读多写少的情况也就满足乐观,性能是非常好的。

ABA 问题

CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。这就是 CAS 的 ABA 问题。
常见的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 A-B-A 就会变成 1A-2B-3A。
目前在 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

CAS 源码

CAS 底层使用 JNI 调用 C 代码实现的,如果你有 Hotspot 源码,那么在 Unsafe.cpp 里可以找到它的实现:

  1. static JNINativeMethod methods_15[] = {
  2. //省略一堆代码...
  3. {CC"compareAndSwapInt", CC"("OBJ"J""I""I"")Z", FN_PTR(Unsafe_CompareAndSwapInt)},
  4. {CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z", FN_PTR(Unsafe_CompareAndSwapLong)},
  5. //省略一堆代码...
  6. };

我们可以看到 compareAndSwapInt 实现是在 Unsafe_CompareAndSwapInt 里面,再深入到 Unsafe_CompareAndSwapInt:

  1. UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  2. UnsafeWrapper("Unsafe_CompareAndSwapInt");
  3. oop p = JNIHandles::resolve(obj);
  4. jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  5. return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
  6. UNSAFE_END

p 是取出的对象,addr 是 p 中 offset 处的地址,最后调用了 Atomic::cmpxchg(x, addr, e), 其中参数 x 是即将更新的值,参数 e 是原内存的值。