CAS原子类

20200212180741122.png
lock cmpxchg 指令 : cmpxchg指令 是X86比较交换指令本身是非原子性的,加上lock指令以后其它CPU不允许对当前值做修改
java.util.concurrent.atomic 包下的原子类 AtomicInteger 中的

incrementAndGet 方法

  1. /**
  2. * Atomically increments by one the current value.
  3. *
  4. * @return the updated value
  5. */
  6. public final int incrementAndGet() {
  7. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  8. }
  9. public final class Unsafe {
  10. public final int getAndAddInt(Object var1, long var2, int var4) {
  11. int var5;
  12. do {
  13. var5 = this.getIntVolatile(var1, var2);
  14. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  15. return var5;
  16. }
  17. }

AtomicInteger有一个incrementAndGet的自增方法,在一个循环里,每次去获取当前的值current,然后将当前值current+1赋值给next,然后将current和next放到compareAndSet中进行比较,如果返回true那么就return next的值,如果失败,那么继续进行上述操作

compareAndSet 方法进行分析,相关分析如下:

  1. // setup to use Unsafe.compareAndSwapInt for updates
  2. public class AtomicInteger extends Number implements java.io.Serializable {
  3. private static final Unsafe unsafe = Unsafe.getUnsafe();
  4. private static final long valueOffset;
  5. static {
  6. try { // 计算变量 value 在类对象中的偏移
  7. valueOffset = unsafe.objectFieldOffset
  8. (AtomicInteger.class.getDeclaredField("value"));
  9. } catch (Exception ex) { throw new Error(ex); }
  10. } private volatile int value;
  11. public final boolean compareAndSet(int expect, int update) {
  12. /*
  13. * compareAndSet 实际上只是一个壳子,主要的逻辑封装在 Unsafe 的
  14. * compareAndSwapInt 方法中
  15. */
  16. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  17. }
  18. // ......}public final class Unsafe { // compareAndSwapInt 是 native 类型的方法,继续往下看
  19. public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
  20. // ......
  21. }
  1. // unsafe.cpp/*
  2. * 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY UNSAFE_END 都是宏,
  3. * 在预编译期间会被替换成真正的代码。下面的 jbooleanjlong jint 等是一些类型定义(typedef):
  4. *
  5. * jni.h
  6. * typedef unsigned char jboolean;
  7. * typedef unsigned short jchar;
  8. * typedef short jshort;
  9. * typedef float jfloat;
  10. * typedef double jdouble;
  11. *
  12. * jni_md.h
  13. * typedef int jint;
  14. * #ifdef _LP64 // 64-bit
  15. * typedef long jlong;
  16. * #else
  17. * typedef long long jlong;
  18. * #endif
  19. * typedef signed char jbyte;
  20. */UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  21. UnsafeWrapper("Unsafe_CompareAndSwapInt");
  22. oop p = JNIHandles::resolve(obj); // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffset
  23. jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中
  24. return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
  25. UNSAFE_END// atomic.cppunsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
  26. assert(sizeof(unsigned int) == sizeof(jint), "more work to do"); /*
  27. * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载
  28. * 函数。相关的预编译逻辑如下:
  29. *
  30. * atomic.inline.hpp:
  31. * #include "runtime/atomic.hpp"
  32. *
  33. * // Linux
  34. * #ifdef TARGET_OS_ARCH_linux_x86
  35. * # include "atomic_linux_x86.inline.hpp"
  36. * #endif
  37. *
  38. * // 省略部分代码
  39. *
  40. * // Windows
  41. * #ifdef TARGET_OS_ARCH_windows_x86
  42. * # include "atomic_windows_x86.inline.hpp"
  43. * #endif
  44. *
  45. * // BSD
  46. * #ifdef TARGET_OS_ARCH_bsd_x86
  47. * # include "atomic_bsd_x86.inline.hpp"
  48. * #endif
  49. *
  50. * 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现
  51. */
  52. return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
  53. (jint)compare_value);
  54. }

Atomic::cmpxchg 函数

  1. // atomic_windows_x86.inline.hpp#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
  2. __asm je L0 \
  3. __asm _emit 0xF0 \
  4. __asm L0:
  5. inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange
  6. int mp = os::is_MP();
  7. __asm {
  8. mov edx, dest
  9. mov ecx, exchange_value
  10. mov eax, compare_value LOCK_IF_MP(mp)
  11. cmpxchg dword ptr [edx], ecx
  12. }
  13. }

上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下

  1. inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // 判断是否是多核 CPU
  2. int mp = os::is_MP();
  3. __asm { // 将参数值放入寄存器中
  4. mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中
  5. mov ecx, exchange_value
  6. mov eax, compare_value
  7. // LOCK_IF_MP
  8. cmp mp, 0
  9. /*
  10. * 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,
  11. * 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
  12. * 前加 lock 前缀。
  13. */
  14. je L0 /*
  15. * 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的
  16. * 原因可以参考知乎的一个回答:
  17. * https://www.zhihu.com/question/50878124/answer/123099923
  18. */
  19. _emit 0xF0L0: /*
  20. * 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:
  21. * cmpxchg: 即“比较并交换”指令
  22. * dword: 全称是 double word,在 x86/x64 体系中,一个
  23. * word = 2 byte,dword = 4 byte = 32 bit
  24. * ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
  25. * [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。
  26. * 那么 [edx] 表示内存地址为 dest 的内存单元
  27. *
  28. * 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值
  29. * 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。
  30. */
  31. cmpxchg dword ptr [edx], ecx
  32. }
  33. }

到这里 CAS 的实现过程就讲完了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带lock 前缀的 cmpxchg 指令,即lock cmpxchg dword ptr [edx], ecx

ABA问题

1.给当前值 加版本号 AtomicStampedReference
2.加Boolean值 AtomicMarkableReference
谈到 CAS,基本上都要谈一下 CAS 的 ABA 问题。CAS 由三个步骤组成,分别是“读取->比较->写回”。考虑这样一种情况,线程1和线程2同时执行 CAS 逻辑,两个线程的执行顺序如下:

  1. 时刻1:线程1执行读取操作,获取原值 A,然后线程被切换走
  2. 时刻2:线程2执行完成 CAS 操作将原值由 A 修改为 B
  3. 时刻3:线程2再次执行 CAS 操作,并将原值由 B 修改为 A
  4. 时刻4:线程1恢复运行,将比较值(compareValue)与原值(oldValue)进行比较,发现两个值相等。
    然后用新值(newValue)写入内存中,完成 CAS 操作

如上流程,线程1并不知道原值已经被修改过了,在它看来并没什么变化,所以它会继续往下执行流程。对于 ABA 问题,通常的处理措施是对每一次 CAS 操作设置版本号。java.util.concurrent.atomic 包下提供了一个可处理 ABA 问题的原子类 AtomicStampedReference,具体的实现

AtomicStampedReference

内部类Pair

  1. private static class Pair<T> {
  2. final T reference;
  3. final int stamp;
  4. private Pair(T reference, int stamp) {
  5. this.reference = reference;
  6. this.stamp = stamp;
  7. }
  8. static <T> Pair<T> of(T reference, int stamp) {
  9. return new Pair<T>(reference, stamp);
  10. }
  11. }
  12. private volatile Pair<V> pair;

我们可以发现,AtomicStampedReference 对比 AtomicReference,全局维护的不是 T reference,而是 Pair。Pair 对象里多维护了一个 stamp 标识。

compareAndSet方法

  1. public boolean compareAndSet(V expectedReference,
  2. V newReference,
  3. int expectedStamp,
  4. int newStamp) {
  5. Pair<V> current = pair;
  6. return
  7. // 1 引用没变
  8. expectedReference == current.reference &&
  9. // 2 版本号没变
  10. expectedStamp == current.stamp &&
  11. // 3 新引用等于旧引用
  12. ((newReference == current.reference &&
  13. // 4 新版本号等于旧版本号
  14. newStamp == current.stamp) ||
  15. // 5 构造 Pair 然后 cas
  16. casPair(current, Pair.of(newReference, newStamp)));
  17. }

可以看到,最后的 return 的逻辑很复杂,我们可以看到多了版本号的校验。

AtomicStampedReference源码

  1. /**
  2. * AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
  3. *
  4. * 实现注意事项。通过创建表示“已装箱”的 [reference, integer] 对的内部对象,此实现维持带标志的引用。
  5. * @param <V> 此引用引用的对象的类型
  6. */
  7. public class AtomicStampedReference<V> {
  8. private static class Pair<T> {
  9. final T reference;
  10. final int stamp;
  11. /**
  12. *
  13. * @param reference 引用
  14. * @param stamp 标志
  15. */
  16. private Pair(T reference, int stamp) {
  17. this.reference = reference;
  18. this.stamp = stamp;
  19. }
  20. /**
  21. * 静态工厂
  22. * @param reference 引用
  23. * @param stamp 标志
  24. * @param <T> 此引用引用的对象的类型
  25. * @return
  26. */
  27. static <T> Pair<T> of(T reference, int stamp) {
  28. return new Pair<T>(reference, stamp);
  29. }
  30. }
  31. private volatile Pair<V> pair;
  32. /**
  33. * 创建具有给定初始值的新 AtomicStampedReference。
  34. * @param initialRef 初始引用
  35. * @param initialStamp 初始标志
  36. */
  37. public AtomicStampedReference(V initialRef, int initialStamp) {
  38. pair = Pair.of(initialRef, initialStamp);
  39. }
  40. /**
  41. * 返回该引用的当前值。
  42. * @return 该引用的当前值
  43. */
  44. public V getReference() {
  45. return pair.reference;
  46. }
  47. /**
  48. * 返回该标志的当前值。
  49. * @return 该标志的当前值
  50. */
  51. public int getStamp() {
  52. return pair.stamp;
  53. }
  54. /**
  55. * 返回该引用和该标志的当前值。典型的用法为 int[1] holder; ref = v.get(holder); 。
  56. * @param stampHolder 大小至少为 1 的数组。返回时,stampholder[0] 将保存该标志的值。
  57. * @return 该引用的当前值
  58. */
  59. public V get(int[] stampHolder) {
  60. Pair<V> pair = this.pair;
  61. stampHolder[0] = pair.stamp;
  62. return pair.reference;
  63. }
  64. /**
  65. * 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。
  66. * 可能意外失败并且不提供排序保证,所以只有在很少的情况下才对 compareAndSet 进行适当的选择。
  67. * @param expectedReference 该引用的预期值
  68. * @param newReference 该引用的新值
  69. * @param expectedStamp 该标志的预期值
  70. * @param newStamp 该标志的新值
  71. * @return 如果成功,则返回 true
  72. */
  73. public boolean weakCompareAndSet(V expectedReference,
  74. V newReference,
  75. int expectedStamp,
  76. int newStamp) {
  77. return compareAndSet(expectedReference, newReference,
  78. expectedStamp, newStamp);
  79. }
  80. /**
  81. * 如果当前引用 == 预期引用,并且当前标志等于预期标志,则以原子方式将该引用和该标志的值设置为给定的更新值。
  82. * @param expectedReference 该引用的预期值
  83. * @param newReference 该引用的新值
  84. * @param expectedStamp 该标志的预期值
  85. * @param newStamp 该标志的新值
  86. * @return 如果成功,则返回 true
  87. */
  88. public boolean compareAndSet(V expectedReference,
  89. V newReference,
  90. int expectedStamp,
  91. int newStamp) {
  92. Pair<V> current = pair;
  93. return
  94. expectedReference == current.reference &&
  95. expectedStamp == current.stamp &&
  96. ((newReference == current.reference &&
  97. newStamp == current.stamp) ||
  98. casPair(current, Pair.of(newReference, newStamp)));
  99. }
  100. /**
  101. * 无条件地同时设置该引用和标志的值。
  102. * @param newReference 该引用的新值
  103. * @param newStamp 该标志的新值
  104. */
  105. public void set(V newReference, int newStamp) {
  106. Pair<V> current = pair;
  107. if (newReference != current.reference || newStamp != current.stamp)
  108. this.pair = Pair.of(newReference, newStamp);
  109. }
  110. /**
  111. * 如果当前引用 == 预期引用,则以原子方式将该标志的值设置为给定的更新值。此操作的任何给定调用都可能会意外失败(返回 false),
  112. * 但是在当前值保持预期值而且没有其他线程也在尝试设置该值时,重复调用将最终获得成功。
  113. * @param expectedReference 该引用的预期值
  114. * @param newStamp 该标志的新值
  115. * @return 如果成功,则返回 true
  116. */
  117. public boolean attemptStamp(V expectedReference, int newStamp) {
  118. Pair<V> current = pair;
  119. return
  120. expectedReference == current.reference &&
  121. (newStamp == current.stamp ||
  122. casPair(current, Pair.of(expectedReference, newStamp)));
  123. }
  124. // 不安全机制
  125. private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
  126. private static final long pairOffset =
  127. objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
  128. /**
  129. * pair比对
  130. * @param cmp
  131. * @param val
  132. * @return
  133. */
  134. private boolean casPair(Pair<V> cmp, Pair<V> val) {
  135. return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
  136. }
  137. /**
  138. * 获取偏移量
  139. * @param UNSAFE
  140. * @param field 字段名
  141. * @param klazz 类型
  142. * @return
  143. */
  144. static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
  145. String field, Class<?> klazz) {
  146. try {
  147. return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
  148. } catch (NoSuchFieldException e) {
  149. // 将异常转换为相应的错误
  150. NoSuchFieldError error = new NoSuchFieldError(field);
  151. error.initCause(e);
  152. throw error;
  153. }
  154. }
  155. }

AtomicMarkableReference 源码

  1. /*
  2. * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3. *
  4. * Written by Doug Lea with assistance from members of JCP JSR-166
  5. * Expert Group and released to the public domain, as explained at
  6. * http://creativecommons.org/publicdomain/zero/1.0/
  7. */
  8. package java.util.concurrent.atomic;
  9. /**
  10. * An {@code AtomicMarkableReference} maintains an object reference
  11. * along with a mark bit, that can be updated atomically.
  12. *
  13. * <p>Implementation note: This implementation maintains markable
  14. * references by creating internal objects representing "boxed"
  15. * [reference, boolean] pairs.
  16. *
  17. * @since 1.5
  18. * @author Doug Lea
  19. * @param <V> The type of object referred to by this reference
  20. */
  21. public class AtomicMarkableReference<V> {
  22. private static class Pair<T> {
  23. final T reference;
  24. final boolean mark;
  25. private Pair(T reference, boolean mark) {
  26. this.reference = reference;
  27. this.mark = mark;
  28. }
  29. static <T> Pair<T> of(T reference, boolean mark) {
  30. return new Pair<T>(reference, mark);
  31. }
  32. }
  33. // 用volatile的内存语义保证可见性
  34. // 保存引用值和标记值
  35. private volatile Pair<V> pair;
  36. // 构造函数,根据指定的引用值和标记值,构造一个Pair对象,并将该对象赋值给成员变量pair。
  37. // 由于成员变量pair被volatile修饰,并且这里只有一个单操作的赋值语句,因此是可以保证原子性的。
  38. public AtomicMarkableReference(V initialRef, boolean initialMark) {
  39. pair = Pair.of(initialRef, initialMark);
  40. }
  41. // 以原子方式获取当前引用值
  42. public V getReference() {
  43. return pair.reference;
  44. }
  45. // 以原子方式获取当前标记值
  46. public boolean isMarked() {
  47. return pair.mark;
  48. }
  49. // 同时获取引用值和标记值。由于Java程序只能有一个返回值,
  50. // 该函数通过一个数组参数int[] markHolder来返回标记值,而通过return语句返回引用值。
  51. public V get(boolean[] markHolder) {
  52. Pair<V> pair = this.pair;
  53. markHolder[0] = pair.mark;
  54. return pair.reference;
  55. }
  56. // 以原子的方式同时更新引用值和标记值。该是通过调用CompareAndSet实现的。
  57. // JDK文档中说,weakCompareAndSet在更新变量时并不创建任何happens-before顺序,
  58. // 因此即使要修改的值是volatile的,也不保证对该变量的读写操作的顺序
  59. // (一般来讲,volatile的内存语义保证happens-before顺序)。
  60. public boolean weakCompareAndSet(V expectedReference,
  61. V newReference,
  62. boolean expectedMark,
  63. boolean newMark) {
  64. return compareAndSet(expectedReference, newReference,
  65. expectedMark, newMark);
  66. }
  67. // 以原子的方式同时更新引用值和标记值。
  68. // 当期望引用值不等于当前引用值时,操作失败,返回false。
  69. // 当期望标记值不等于当前标记值时,操作失败,返回false。
  70. // 在期望引用值和期望标记值同时等于当前值的前提下,当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true。
  71. // 由于要修改的内容与原内容完全一致,这种处理可以避免一次内存操作,效率较高。
  72. // 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回true
  73. public boolean compareAndSet(V expectedReference,
  74. V newReference,
  75. boolean expectedMark,
  76. boolean newMark) {
  77. Pair<V> current = pair;
  78. return
  79. expectedReference == current.reference &&
  80. expectedMark == current.mark &&
  81. ((newReference == current.reference &&
  82. newMark == current.mark) ||
  83. casPair(current, Pair.of(newReference, newMark)));
  84. }
  85. // 只要新的引用值和新的标记值,有一个与当前值不一样的,就同时修改引用值和标记值。
  86. public void set(V newReference, boolean newMark) {
  87. Pair<V> current = pair;
  88. if (newReference != current.reference || newMark != current.mark)
  89. this.pair = Pair.of(newReference, newMark);
  90. }
  91. // 修改指定引用值的标记值。
  92. // 当期望的引用值与当前引用值不相同时,操作失败,返回fasle。
  93. // 当期望的引用值与当前引用值相同时,操作成功,返回true。
  94. public boolean attemptMark(V expectedReference, boolean newMark) {
  95. Pair<V> current = pair;
  96. return
  97. expectedReference == current.reference &&
  98. (newMark == current.mark ||
  99. // 使用sun.misc.Unsafe类原子地交换两个对象。
  100. casPair(current, Pair.of(expectedReference, newMark)));
  101. }
  102. // Unsafe mechanics
  103. // 成员变量unsafe是原子变量相关操作的基础
  104. // 原子变量的修改操作最终有sun.misc.Unsafe类的CAS操作实现
  105. private static final sun.misc.Unsafe UNSAFE = Unsafe.getUnsafe();
  106. // 成员变量value的内存偏移值
  107. private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);
  108. // 使用sun.misc.Unsafe类原子地交换两个对象。
  109. private boolean casPair(Pair<V> cmp, Pair<V> val) {
  110. return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
  111. }
  112. // 获取指定域的内存偏移量
  113. static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
  114. String field, Class<?> klazz) {
  115. try {
  116. return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
  117. } catch (NoSuchFieldException e) {
  118. // Convert Exception to corresponding Error
  119. NoSuchFieldError error = new NoSuchFieldError(field);
  120. error.initCause(e);
  121. throw error;
  122. }
  123. }
  124. }