CAS原子类

lock cmpxchg 指令 : cmpxchg指令 是X86比较交换指令本身是非原子性的,加上lock指令以后其它CPU不允许对当前值做修改
java.util.concurrent.atomic 包下的原子类 AtomicInteger 中的
incrementAndGet 方法
/*** Atomically increments by one the current value.** @return the updated value*/public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}public final class Unsafe {public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}}
AtomicInteger有一个incrementAndGet的自增方法,在一个循环里,每次去获取当前的值current,然后将当前值current+1赋值给next,然后将current和next放到compareAndSet中进行比较,如果返回true那么就return next的值,如果失败,那么继续进行上述操作
compareAndSet 方法进行分析,相关分析如下:
// setup to use Unsafe.compareAndSwapInt for updatespublic class AtomicInteger extends Number implements java.io.Serializable {private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try { // 计算变量 value 在类对象中的偏移valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }} private volatile int value;public final boolean compareAndSet(int expect, int update) {/** compareAndSet 实际上只是一个壳子,主要的逻辑封装在 Unsafe 的* compareAndSwapInt 方法中*/return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}// ......}public final class Unsafe { // compareAndSwapInt 是 native 类型的方法,继续往下看public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);// ......}
// unsafe.cpp/** 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY 和 UNSAFE_END 都是宏,* 在预编译期间会被替换成真正的代码。下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef):** jni.h* typedef unsigned char jboolean;* typedef unsigned short jchar;* typedef short jshort;* typedef float jfloat;* typedef double jdouble;** jni_md.h* typedef int jint;* #ifdef _LP64 // 64-bit* typedef long jlong;* #else* typedef long long jlong;* #endif* typedef signed char jbyte;*/UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))UnsafeWrapper("Unsafe_CompareAndSwapInt");oop p = JNIHandles::resolve(obj); // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffsetjint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中return (jint)(Atomic::cmpxchg(x, addr, e)) == e;UNSAFE_END// atomic.cppunsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {assert(sizeof(unsigned int) == sizeof(jint), "more work to do"); /** 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载* 函数。相关的预编译逻辑如下:** atomic.inline.hpp:* #include "runtime/atomic.hpp"** // Linux* #ifdef TARGET_OS_ARCH_linux_x86* # include "atomic_linux_x86.inline.hpp"* #endif** // 省略部分代码** // Windows* #ifdef TARGET_OS_ARCH_windows_x86* # include "atomic_windows_x86.inline.hpp"* #endif** // BSD* #ifdef TARGET_OS_ARCH_bsd_x86* # include "atomic_bsd_x86.inline.hpp"* #endif** 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现*/return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,(jint)compare_value);}
Atomic::cmpxchg 函数
// atomic_windows_x86.inline.hpp#define LOCK_IF_MP(mp) __asm cmp mp, 0 \__asm je L0 \__asm _emit 0xF0 \__asm L0:inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchangeint mp = os::is_MP();__asm {mov edx, destmov ecx, exchange_valuemov eax, compare_value LOCK_IF_MP(mp)cmpxchg dword ptr [edx], ecx}}
上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // 判断是否是多核 CPUint mp = os::is_MP();__asm { // 将参数值放入寄存器中mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中mov ecx, exchange_valuemov eax, compare_value// LOCK_IF_MPcmp mp, 0/** 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,* 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令* 前加 lock 前缀。*/je L0 /** 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的* 原因可以参考知乎的一个回答:* https://www.zhihu.com/question/50878124/answer/123099923*/_emit 0xF0L0: /** 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:* cmpxchg: 即“比较并交换”指令* dword: 全称是 double word,在 x86/x64 体系中,一个* word = 2 byte,dword = 4 byte = 32 bit* ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元* [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。* 那么 [edx] 表示内存地址为 dest 的内存单元** 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值* 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。*/cmpxchg dword ptr [edx], ecx}}
到这里 CAS 的实现过程就讲完了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带lock 前缀的 cmpxchg 指令,即lock cmpxchg dword ptr [edx], ecx。
ABA问题
1.给当前值 加版本号 AtomicStampedReference
2.加Boolean值 AtomicMarkableReference
谈到 CAS,基本上都要谈一下 CAS 的 ABA 问题。CAS 由三个步骤组成,分别是“读取->比较->写回”。考虑这样一种情况,线程1和线程2同时执行 CAS 逻辑,两个线程的执行顺序如下:
- 时刻1:线程1执行读取操作,获取原值 A,然后线程被切换走
- 时刻2:线程2执行完成 CAS 操作将原值由 A 修改为 B
- 时刻3:线程2再次执行 CAS 操作,并将原值由 B 修改为 A
- 时刻4:线程1恢复运行,将比较值(compareValue)与原值(oldValue)进行比较,发现两个值相等。
然后用新值(newValue)写入内存中,完成 CAS 操作
如上流程,线程1并不知道原值已经被修改过了,在它看来并没什么变化,所以它会继续往下执行流程。对于 ABA 问题,通常的处理措施是对每一次 CAS 操作设置版本号。java.util.concurrent.atomic 包下提供了一个可处理 ABA 问题的原子类 AtomicStampedReference,具体的实现
AtomicStampedReference
内部类Pair
private static class Pair<T> {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}private volatile Pair<V> pair;
我们可以发现,AtomicStampedReference 对比 AtomicReference,全局维护的不是 T reference,而是 Pair。Pair 对象里多维护了一个 stamp 标识。
compareAndSet方法
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;return// 1 引用没变expectedReference == current.reference &&// 2 版本号没变expectedStamp == current.stamp &&// 3 新引用等于旧引用((newReference == current.reference &&// 4 新版本号等于旧版本号newStamp == current.stamp) ||// 5 构造 Pair 然后 cascasPair(current, Pair.of(newReference, newStamp)));}
可以看到,最后的 return 的逻辑很复杂,我们可以看到多了版本号的校验。
AtomicStampedReference源码
/*** AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。** 实现注意事项。通过创建表示“已装箱”的 [reference, integer] 对的内部对象,此实现维持带标志的引用。* @param <V> 此引用引用的对象的类型*/public class AtomicStampedReference<V> {private static class Pair<T> {final T reference;final int stamp;/**** @param reference 引用* @param stamp 标志*/private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}/*** 静态工厂* @param reference 引用* @param stamp 标志* @param <T> 此引用引用的对象的类型* @return*/static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}private volatile Pair<V> pair;/*** 创建具有给定初始值的新 AtomicStampedReference。* @param initialRef 初始引用* @param initialStamp 初始标志*/public AtomicStampedReference(V initialRef, int initialStamp) {pair = Pair.of(initialRef, initialStamp);}/*** 返回该引用的当前值。* @return 该引用的当前值*/public V getReference() {return pair.reference;}/*** 返回该标志的当前值。* @return 该标志的当前值*/public int getStamp() {return pair.stamp;}/*** 返回该引用和该标志的当前值。典型的用法为 int[1] holder; ref = v.get(holder); 。* @param stampHolder 大小至少为 1 的数组。返回时,stampholder[0] 将保存该标志的值。* @return 该引用的当前值*/public V get(int[] stampHolder) {Pair<V> pair = this.pair;stampHolder[0] = pair.stamp;return pair.reference;}/*** 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。* 可能意外失败并且不提供排序保证,所以只有在很少的情况下才对 compareAndSet 进行适当的选择。* @param expectedReference 该引用的预期值* @param newReference 该引用的新值* @param expectedStamp 该标志的预期值* @param newStamp 该标志的新值* @return 如果成功,则返回 true*/public boolean weakCompareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {return compareAndSet(expectedReference, newReference,expectedStamp, newStamp);}/*** 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。* @param expectedReference 该引用的预期值* @param newReference 该引用的新值* @param expectedStamp 该标志的预期值* @param newStamp 该标志的新值* @return 如果成功,则返回 true*/public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedStamp == current.stamp &&((newReference == current.reference &&newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));}/*** 无条件地同时设置该引用和标志的值。* @param newReference 该引用的新值* @param newStamp 该标志的新值*/public void set(V newReference, int newStamp) {Pair<V> current = pair;if (newReference != current.reference || newStamp != current.stamp)this.pair = Pair.of(newReference, newStamp);}/*** 如果当前引用 == 预期引用,则以原子方式将该标志的值设置为给定的更新值。此操作的任何给定调用都可能会意外失败(返回 false),* 但是在当前值保持预期值而且没有其他线程也在尝试设置该值时,重复调用将最终获得成功。* @param expectedReference 该引用的预期值* @param newStamp 该标志的新值* @return 如果成功,则返回 true*/public boolean attemptStamp(V expectedReference, int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&(newStamp == current.stamp ||casPair(current, Pair.of(expectedReference, newStamp)));}// 不安全机制private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();private static final long pairOffset =objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);/*** pair比对* @param cmp* @param val* @return*/private boolean casPair(Pair<V> cmp, Pair<V> val) {return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);}/*** 获取偏移量* @param UNSAFE* @param field 字段名* @param klazz 类型* @return*/static long objectFieldOffset(sun.misc.Unsafe UNSAFE,String field, Class<?> klazz) {try {return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));} catch (NoSuchFieldException e) {// 将异常转换为相应的错误NoSuchFieldError error = new NoSuchFieldError(field);error.initCause(e);throw error;}}}
AtomicMarkableReference 源码
/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.** Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent.atomic;/*** An {@code AtomicMarkableReference} maintains an object reference* along with a mark bit, that can be updated atomically.** <p>Implementation note: This implementation maintains markable* references by creating internal objects representing "boxed"* [reference, boolean] pairs.** @since 1.5* @author Doug Lea* @param <V> The type of object referred to by this reference*/public class AtomicMarkableReference<V> {private static class Pair<T> {final T reference;final boolean mark;private Pair(T reference, boolean mark) {this.reference = reference;this.mark = mark;}static <T> Pair<T> of(T reference, boolean mark) {return new Pair<T>(reference, mark);}}// 用volatile的内存语义保证可见性// 保存引用值和标记值private volatile Pair<V> pair;// 构造函数,根据指定的引用值和标记值,构造一个Pair对象,并将该对象赋值给成员变量pair。// 由于成员变量pair被volatile修饰,并且这里只有一个单操作的赋值语句,因此是可以保证原子性的。public AtomicMarkableReference(V initialRef, boolean initialMark) {pair = Pair.of(initialRef, initialMark);}// 以原子方式获取当前引用值public V getReference() {return pair.reference;}// 以原子方式获取当前标记值public boolean isMarked() {return pair.mark;}// 同时获取引用值和标记值。由于Java程序只能有一个返回值,// 该函数通过一个数组参数int[] markHolder来返回标记值,而通过return语句返回引用值。public V get(boolean[] markHolder) {Pair<V> pair = this.pair;markHolder[0] = pair.mark;return pair.reference;}// 以原子的方式同时更新引用值和标记值。该是通过调用CompareAndSet实现的。// JDK文档中说,weakCompareAndSet在更新变量时并不创建任何happens-before顺序,// 因此即使要修改的值是volatile的,也不保证对该变量的读写操作的顺序// (一般来讲,volatile的内存语义保证happens-before顺序)。public boolean weakCompareAndSet(V expectedReference,V newReference,boolean expectedMark,boolean newMark) {return compareAndSet(expectedReference, newReference,expectedMark, newMark);}// 以原子的方式同时更新引用值和标记值。// 当期望引用值不等于当前引用值时,操作失败,返回false。// 当期望标记值不等于当前标记值时,操作失败,返回false。// 在期望引用值和期望标记值同时等于当前值的前提下,当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true。// 由于要修改的内容与原内容完全一致,这种处理可以避免一次内存操作,效率较高。// 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回truepublic boolean compareAndSet(V expectedReference,V newReference,boolean expectedMark,boolean newMark) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedMark == current.mark &&((newReference == current.reference &&newMark == current.mark) ||casPair(current, Pair.of(newReference, newMark)));}// 只要新的引用值和新的标记值,有一个与当前值不一样的,就同时修改引用值和标记值。public void set(V newReference, boolean newMark) {Pair<V> current = pair;if (newReference != current.reference || newMark != current.mark)this.pair = Pair.of(newReference, newMark);}// 修改指定引用值的标记值。// 当期望的引用值与当前引用值不相同时,操作失败,返回fasle。// 当期望的引用值与当前引用值相同时,操作成功,返回true。public boolean attemptMark(V expectedReference, boolean newMark) {Pair<V> current = pair;returnexpectedReference == current.reference &&(newMark == current.mark ||// 使用sun.misc.Unsafe类原子地交换两个对象。casPair(current, Pair.of(expectedReference, newMark)));}// Unsafe mechanics// 成员变量unsafe是原子变量相关操作的基础// 原子变量的修改操作最终有sun.misc.Unsafe类的CAS操作实现private static final sun.misc.Unsafe UNSAFE = Unsafe.getUnsafe();// 成员变量value的内存偏移值private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);// 使用sun.misc.Unsafe类原子地交换两个对象。private boolean casPair(Pair<V> cmp, Pair<V> val) {return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);}// 获取指定域的内存偏移量static long objectFieldOffset(sun.misc.Unsafe UNSAFE,String field, Class<?> klazz) {try {return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));} catch (NoSuchFieldException e) {// Convert Exception to corresponding ErrorNoSuchFieldError error = new NoSuchFieldError(field);error.initCause(e);throw error;}}}
