CAS原子类
lock cmpxchg 指令 : cmpxchg指令 是X86比较交换指令本身是非原子性的,加上lock指令以后其它CPU不允许对当前值做修改
java.util.concurrent.atomic 包下的原子类 AtomicInteger 中的
incrementAndGet 方法
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final class Unsafe {
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
}
AtomicInteger有一个incrementAndGet的自增方法,在一个循环里,每次去获取当前的值current,然后将当前值current+1赋值给next,然后将current和next放到compareAndSet中进行比较,如果返回true那么就return next的值,如果失败,那么继续进行上述操作
compareAndSet 方法进行分析,相关分析如下:
// setup to use Unsafe.compareAndSwapInt for updates
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try { // 计算变量 value 在类对象中的偏移
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
} private volatile int value;
public final boolean compareAndSet(int expect, int update) {
/*
* compareAndSet 实际上只是一个壳子,主要的逻辑封装在 Unsafe 的
* compareAndSwapInt 方法中
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// ......}public final class Unsafe { // compareAndSwapInt 是 native 类型的方法,继续往下看
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
// ......
}
// unsafe.cpp/*
* 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY 和 UNSAFE_END 都是宏,
* 在预编译期间会被替换成真正的代码。下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef):
*
* jni.h
* typedef unsigned char jboolean;
* typedef unsigned short jchar;
* typedef short jshort;
* typedef float jfloat;
* typedef double jdouble;
*
* jni_md.h
* typedef int jint;
* #ifdef _LP64 // 64-bit
* typedef long jlong;
* #else
* typedef long long jlong;
* #endif
* typedef signed char jbyte;
*/UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj); // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffset
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END// atomic.cppunsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do"); /*
* 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载
* 函数。相关的预编译逻辑如下:
*
* atomic.inline.hpp:
* #include "runtime/atomic.hpp"
*
* // Linux
* #ifdef TARGET_OS_ARCH_linux_x86
* # include "atomic_linux_x86.inline.hpp"
* #endif
*
* // 省略部分代码
*
* // Windows
* #ifdef TARGET_OS_ARCH_windows_x86
* # include "atomic_windows_x86.inline.hpp"
* #endif
*
* // BSD
* #ifdef TARGET_OS_ARCH_bsd_x86
* # include "atomic_bsd_x86.inline.hpp"
* #endif
*
* 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现
*/
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
(jint)compare_value);
}
Atomic::cmpxchg 函数
// atomic_windows_x86.inline.hpp#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // 判断是否是多核 CPU
int mp = os::is_MP();
__asm { // 将参数值放入寄存器中
mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中
mov ecx, exchange_value
mov eax, compare_value
// LOCK_IF_MP
cmp mp, 0
/*
* 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,
* 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
* 前加 lock 前缀。
*/
je L0 /*
* 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的
* 原因可以参考知乎的一个回答:
* https://www.zhihu.com/question/50878124/answer/123099923
*/
_emit 0xF0L0: /*
* 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:
* cmpxchg: 即“比较并交换”指令
* dword: 全称是 double word,在 x86/x64 体系中,一个
* word = 2 byte,dword = 4 byte = 32 bit
* ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
* [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。
* 那么 [edx] 表示内存地址为 dest 的内存单元
*
* 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值
* 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。
*/
cmpxchg dword ptr [edx], ecx
}
}
到这里 CAS 的实现过程就讲完了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带lock 前缀的 cmpxchg 指令,即lock cmpxchg dword ptr [edx], ecx
。
ABA问题
1.给当前值 加版本号 AtomicStampedReference
2.加Boolean值 AtomicMarkableReference
谈到 CAS,基本上都要谈一下 CAS 的 ABA 问题。CAS 由三个步骤组成,分别是“读取->比较->写回”。考虑这样一种情况,线程1和线程2同时执行 CAS 逻辑,两个线程的执行顺序如下:
- 时刻1:线程1执行读取操作,获取原值 A,然后线程被切换走
- 时刻2:线程2执行完成 CAS 操作将原值由 A 修改为 B
- 时刻3:线程2再次执行 CAS 操作,并将原值由 B 修改为 A
- 时刻4:线程1恢复运行,将比较值(compareValue)与原值(oldValue)进行比较,发现两个值相等。
然后用新值(newValue)写入内存中,完成 CAS 操作
如上流程,线程1并不知道原值已经被修改过了,在它看来并没什么变化,所以它会继续往下执行流程。对于 ABA 问题,通常的处理措施是对每一次 CAS 操作设置版本号。java.util.concurrent.atomic 包下提供了一个可处理 ABA 问题的原子类 AtomicStampedReference,具体的实现
AtomicStampedReference
内部类Pair
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
我们可以发现,AtomicStampedReference 对比 AtomicReference,全局维护的不是 T reference,而是 Pair。Pair 对象里多维护了一个 stamp 标识。
compareAndSet方法
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
// 1 引用没变
expectedReference == current.reference &&
// 2 版本号没变
expectedStamp == current.stamp &&
// 3 新引用等于旧引用
((newReference == current.reference &&
// 4 新版本号等于旧版本号
newStamp == current.stamp) ||
// 5 构造 Pair 然后 cas
casPair(current, Pair.of(newReference, newStamp)));
}
可以看到,最后的 return 的逻辑很复杂,我们可以看到多了版本号的校验。
AtomicStampedReference源码
/**
* AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
*
* 实现注意事项。通过创建表示“已装箱”的 [reference, integer] 对的内部对象,此实现维持带标志的引用。
* @param <V> 此引用引用的对象的类型
*/
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
/**
*
* @param reference 引用
* @param stamp 标志
*/
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
/**
* 静态工厂
* @param reference 引用
* @param stamp 标志
* @param <T> 此引用引用的对象的类型
* @return
*/
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* 创建具有给定初始值的新 AtomicStampedReference。
* @param initialRef 初始引用
* @param initialStamp 初始标志
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
/**
* 返回该引用的当前值。
* @return 该引用的当前值
*/
public V getReference() {
return pair.reference;
}
/**
* 返回该标志的当前值。
* @return 该标志的当前值
*/
public int getStamp() {
return pair.stamp;
}
/**
* 返回该引用和该标志的当前值。典型的用法为 int[1] holder; ref = v.get(holder); 。
* @param stampHolder 大小至少为 1 的数组。返回时,stampholder[0] 将保存该标志的值。
* @return 该引用的当前值
*/
public V get(int[] stampHolder) {
Pair<V> pair = this.pair;
stampHolder[0] = pair.stamp;
return pair.reference;
}
/**
* 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。
* 可能意外失败并且不提供排序保证,所以只有在很少的情况下才对 compareAndSet 进行适当的选择。
* @param expectedReference 该引用的预期值
* @param newReference 该引用的新值
* @param expectedStamp 该标志的预期值
* @param newStamp 该标志的新值
* @return 如果成功,则返回 true
*/
public boolean weakCompareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
return compareAndSet(expectedReference, newReference,
expectedStamp, newStamp);
}
/**
* 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。
* @param expectedReference 该引用的预期值
* @param newReference 该引用的新值
* @param expectedStamp 该标志的预期值
* @param newStamp 该标志的新值
* @return 如果成功,则返回 true
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
/**
* 无条件地同时设置该引用和标志的值。
* @param newReference 该引用的新值
* @param newStamp 该标志的新值
*/
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}
/**
* 如果当前引用 == 预期引用,则以原子方式将该标志的值设置为给定的更新值。此操作的任何给定调用都可能会意外失败(返回 false),
* 但是在当前值保持预期值而且没有其他线程也在尝试设置该值时,重复调用将最终获得成功。
* @param expectedReference 该引用的预期值
* @param newStamp 该标志的新值
* @return 如果成功,则返回 true
*/
public boolean attemptStamp(V expectedReference, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newStamp == current.stamp ||
casPair(current, Pair.of(expectedReference, newStamp)));
}
// 不安全机制
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
/**
* pair比对
* @param cmp
* @param val
* @return
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
/**
* 获取偏移量
* @param UNSAFE
* @param field 字段名
* @param klazz 类型
* @return
*/
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// 将异常转换为相应的错误
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}
AtomicMarkableReference 源码
/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
/**
* An {@code AtomicMarkableReference} maintains an object reference
* along with a mark bit, that can be updated atomically.
*
* <p>Implementation note: This implementation maintains markable
* references by creating internal objects representing "boxed"
* [reference, boolean] pairs.
*
* @since 1.5
* @author Doug Lea
* @param <V> The type of object referred to by this reference
*/
public class AtomicMarkableReference<V> {
private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
// 用volatile的内存语义保证可见性
// 保存引用值和标记值
private volatile Pair<V> pair;
// 构造函数,根据指定的引用值和标记值,构造一个Pair对象,并将该对象赋值给成员变量pair。
// 由于成员变量pair被volatile修饰,并且这里只有一个单操作的赋值语句,因此是可以保证原子性的。
public AtomicMarkableReference(V initialRef, boolean initialMark) {
pair = Pair.of(initialRef, initialMark);
}
// 以原子方式获取当前引用值
public V getReference() {
return pair.reference;
}
// 以原子方式获取当前标记值
public boolean isMarked() {
return pair.mark;
}
// 同时获取引用值和标记值。由于Java程序只能有一个返回值,
// 该函数通过一个数组参数int[] markHolder来返回标记值,而通过return语句返回引用值。
public V get(boolean[] markHolder) {
Pair<V> pair = this.pair;
markHolder[0] = pair.mark;
return pair.reference;
}
// 以原子的方式同时更新引用值和标记值。该是通过调用CompareAndSet实现的。
// JDK文档中说,weakCompareAndSet在更新变量时并不创建任何happens-before顺序,
// 因此即使要修改的值是volatile的,也不保证对该变量的读写操作的顺序
// (一般来讲,volatile的内存语义保证happens-before顺序)。
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
return compareAndSet(expectedReference, newReference,
expectedMark, newMark);
}
// 以原子的方式同时更新引用值和标记值。
// 当期望引用值不等于当前引用值时,操作失败,返回false。
// 当期望标记值不等于当前标记值时,操作失败,返回false。
// 在期望引用值和期望标记值同时等于当前值的前提下,当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true。
// 由于要修改的内容与原内容完全一致,这种处理可以避免一次内存操作,效率较高。
// 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回true
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedMark == current.mark &&
((newReference == current.reference &&
newMark == current.mark) ||
casPair(current, Pair.of(newReference, newMark)));
}
// 只要新的引用值和新的标记值,有一个与当前值不一样的,就同时修改引用值和标记值。
public void set(V newReference, boolean newMark) {
Pair<V> current = pair;
if (newReference != current.reference || newMark != current.mark)
this.pair = Pair.of(newReference, newMark);
}
// 修改指定引用值的标记值。
// 当期望的引用值与当前引用值不相同时,操作失败,返回fasle。
// 当期望的引用值与当前引用值相同时,操作成功,返回true。
public boolean attemptMark(V expectedReference, boolean newMark) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newMark == current.mark ||
// 使用sun.misc.Unsafe类原子地交换两个对象。
casPair(current, Pair.of(expectedReference, newMark)));
}
// Unsafe mechanics
// 成员变量unsafe是原子变量相关操作的基础
// 原子变量的修改操作最终有sun.misc.Unsafe类的CAS操作实现
private static final sun.misc.Unsafe UNSAFE = Unsafe.getUnsafe();
// 成员变量value的内存偏移值
private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);
// 使用sun.misc.Unsafe类原子地交换两个对象。
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
// 获取指定域的内存偏移量
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}