CAS(compare and swap, 比较并交换),是原子操作的一种,可用于在并发环境下实现对变量的同步操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。
悲观锁与乐观锁
悲观锁,顾名思义,就是很悲观,认为会发生并发冲突,屏蔽一切可能违反数据完整性的操作,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会被阻塞直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。
乐观锁,顾名思义,就是很乐观,认为不会发生并发冲突,只在提交操作时检查是否违反数据完整性。乐观锁不能解决脏读的问题。每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。而我们所讨论的 CAS 就是乐观锁的一种实现。
硬件同步原语
锁来保护共享资源,大多数情况下是有一定性能损失的,并且,如果发生了过多的锁等待,将会非常影响程序的性能。在一些特定的情况下,我们可以使用硬件同步原语来替代锁,可以保证和锁一样的数据安全性,同时具有更好的性能。
硬件同步原语(Atomic Hardware Primitives)是由计算机硬件提供的一组原子操作,我们比较常用的原语主要是 CAS 和 FAA 这两种。
CAS(Compare and Swap),它的字面意思是:先比较,再交换。我们看一下 CAS 实现的伪代码:
<< atomic >>function cas(p : pointer to int, old : int, new : int) returns bool {if *p ≠ old {return false}*p ← newreturn true}
该方法输入参数一共有三个,分别是:
- p: 要修改的变量的指针。
- old: 旧值。
- new: 新值。
返回的是一个布尔值,标识是否赋值成功。通过这个伪代码,你就可以看出 CAS 原语的逻辑,非常简单,就是先比较一下变量 p 当前的值是不是等于 old,如果等于,那就把变量 p 赋值为 new,并返回 true,否则就不改变变量 p,并返回 false。
接下来我们看一下 FAA 原语(Fetch and Add):
<< atomic >>function faa(p : pointer to int, inc : int) returns int {int value <- *location*p <- value + increturn value}
FAA 原语的语义是,先获取变量 p 当前的值 value,然后给变量 p 增加 inc,最后返回变量 p 之前的值 value。
上面的这两段伪代码,如果我们用编程语言来实现,肯定是无法保证原子性的。而原语的特殊之处就是,它们都是由计算机硬件,具体说就是 CPU 提供的实现,可以保证操作的原子性。CAS 和 FAA 在各种编程语言中,都有相应的实现,可以来直接使用,无论你是使用哪种编程语言,它们底层的实现是一样的,效果也是一样的。
接下来我们就来认识一下 CAS 在Java中到底如何实现。
Unsafe 的 CAS 底层实现
以 AtomicInteger 为例,AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式对该整数进行运算操作。
public class AtomicInteger extends Number implements java.io.Serializable {private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}//存储当前AtomicInteger的值private volatile int value;.....}
- Unsafe:由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。
- valueOffset:表示该变量值在类中的偏移地址,因为 Unsafe 就是根据内存偏移地址获取数据的,在加载类时通过静态代码块计算得到。
- value:存放当前 AtomicInteger 的值,用volatile修饰。
其中 AtomicInteger 类提供了给一个整数原子自增的方法
public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);}
上面代码类似于 “i++”,不过可以保证并发环境下计算结果的正确性,原因是调用了 Unsafe 类的 compareAndSwapInt() 方法,通过 CAS 保证了原子性
public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {//根据变量在对象var1中的偏移地址var2,获得该变量的值var5 = this.getIntVolatile(var1, var2);//通过CAS为变量赋值,如果赋值失败则} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}
假设 线程A 和 线程B 同时执行 getAndIncrement() 操作:
- 假设 AtomicInteger 里面的 value原始值为2,即主内存中 AtomicInteger 的 value 为2,根据Java内存模型,线程A和线程B各自持有一份 value 的副本,值为2
- 此时线程A通过 getIntVolatile() 拿到value值2,这时线程A被挂起。
- 线程B也通过getIntVolatile() 方法获取到value值2,线程B没有被挂起,可以继续执行compareAndSwapInt() 方法比较内存值也为3,成功修改内存值1
- 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值2和内存的值1不一致,说明该值已经被其它线程提前修改过了,那只能重新来一遍了。
- 重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功
CAS底层实现
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
该方法实现了对 Integer 类型的原子修改,该方法有四个参数:
- 第一个参数:是一个对象,即前面传入的 AtomicInteger 对象
- 第二个参数:是一个 long 类型,是某个成员变量在对应对象中的偏移量offset
- 第三个参数:变量旧的值
- 第四个参数:修改变量重新赋的值
该方法是一个本地方法,我们一起来看一下其底层 hotspot 的实现,通过上面方法最终会调到下面的 C++ 方法。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))UnsafeWrapper("Unsafe_CompareAndSwapInt");oop p = JNIHandles::resolve(obj);// 1. 根据偏移量,获得变量value在内存中的地址jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);// 2. 通过Atomic::cmpxchg实现比较替换// 如果cas成功,返回期望值e,等于e,返回true// 如果cas失败,返回内存中的value,不等于e,返回falsereturn (jint)(Atomic::cmpxchg(x, addr, e)) == e;UNSAFE_END
上面代码中最核心的就是 Atomic::cmpxchg(x, addr, e),该方法在不同的硬件上有不同的实现方式,下面是 linuxX86 的实现:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {//第一步先判断当前计算机否是多处理器int mp = os::is_MP();//cmpxchgl 指令是包含在 x86 架构及 IA‐64 架构中的一个原子条件指令//它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等//如果相等,则双向交换 dest 中的值与 exchange_value,否则就单方面地将 dest 指向的内存值交 给exchange_value//这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgq %1,(%3)": "=a" (exchange_value): "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp): "cc", "memory");return exchange_value;}//如果是多处理器则返回一个Lock前缀#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
上面的代码其实很大一部分是汇编指令,这段汇编指令的执行过程如下:
“r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp)
表示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄存器。
嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左到右从上到下以“%0”开始,分别记为%0、%1∙∙∙%9。也就是说,输出的eax是%0,输入的exchange_value、compare_value、dest、mp分别是%1、%2、%3、%4。因此,cmpxchg %1,(%3)实际上表示cmpxchg exchange_value,(dest)
需要注意的是 cmpxchg 有个隐含操作数 eax,其实际过程是先比较 eax 的值(也就是
compare_value)和 dest 地址所存的值是否相等,输出是”=a” (exchange_value),表示把eax 中存的值写入 exchange_value 变量中。
Atomic::cmpxchg 这个函数最终返回值是 exchange_value,也就是说:
- 如果 cmpxchgl 执行时 compare_value 和 dest 指针指向内存值相等,则会使得 dest 指针指向内存值变成 exchange_value,最终 eax 存的 compare_value 赋值给了exchange_value 变量,即函数最终返回的值是原先的 compare_value。此时 Unsafe_CompareAndSwapInt 的返回值(jint) (Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。
- 如果cmpxchgl执行时 compare_value 和 dest 指针指向内存值不等,则会把当前 dest 指针指向内存的值写入eax,最终输出时赋值给 exchange_value 变量作为返回值,导致 (jint)(Atomic::cmpxchg(x, addr, e)) == e 得到 false,表明CAS失败。
值得注意的是,在该汇编代码前面还加了一个指令:
LOCK_IF_MP(%4)#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
LOCK_IF_MP 根据当前系统是否为多核处理器决定是否为 cmpxchg 指令添加 lock前缀,如果是多处理器,为 cmpxchg 指令添加 lock 前缀;反之,就省略 lock 前缀。在修改内存操作时,使用 LOCK 前缀去调用加锁的读-修改-写操作,这种机制用于多处理器系统中处理器之间进行可靠的通讯,具体描述如下:在 Pentium 和早期的 IA-32 处理器中,LOCK 前缀会使处理器执行当前指令时产生一个 LOCK#信号,这总是引起显式总线锁定的出现。后续的优化中其实并不采用这种方式,而是引入了缓存一致性协议。
在单处理的情况下,单个CPU可以保证 cmpxchg 指令的执行是原子性的,但是在多处理器情况下,多个CPU之间同时执行该指令,没办法保证原子性,所以需要引入 Lock 前缀来保证。
ABA问题
比如线程A要修改变量i,初始值为0,线程A把变量的值读取到自己的工作内存中,正当要修改该变量时,突然切换到了另一个线程B。线程B同样需要修改i的值,因为线程A还没有对该包变量进行修改,所以线程B是可以修改的。但是线程B先把i的值从0修改为1,再从1修改为0,此时在切换回线程A,由于此时i的值和线程A之前读取到的值是一样的,所以线程A也可以修改成功。
这样就出现了一个问题,一个值在修改之前,别的线程同时拿到这个变量的值,原来是A,变成了B,又变成了A,那么使用CAS进行检查时会认为它的值没有发生变化,但是实际上却变化了。
ABA问题通常的解决方法是利用版本号进行区分,每次对变量进行修改时也要对版本号进行修改。CAS进行修改某个变量时,不仅要对初始值进行判断,同时需要对版本号进行判断,其中任意一个不满足要求则修改失败。
Java中的 AtomicStampedReference 类就提供了该功能
public class AtomicStampedReference<V> {private static class Pair<T> {//泛型为我们指定的原子引用类型final T reference;//版本号final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}private volatile Pair<V> pair;}
除了ABA问题外,在高并发的情况下,如果有很多个线程并发,CAS自旋可能会长时间不成功,会增大CPU的执行开销。
