简介

CAS,即 Compare And Swap,比较再交换,是一种无锁算法,基于硬件原语实现,能够在不使用锁的情况下实现多线程之间的变量同步。jdk 中的 java.util.concurrent.atomic 包中的原子类就是通过 CAS 来实现了乐观锁。

算法过程

算法涉及到三个操作数:

  • 需要读写的内存位置 V
  • 需要进行比较的预期值 A
  • 需要写入的新值 U

算法解析:
CAS的具体算法解释是:V 是需要进行操作的值,A 是当前期待的预期值,当且仅当 V 符合预期值 A 的时候,才用新值 U 替换掉旧值,并写入到内存地址 V 中,否则不做更新
用 count++ 的例子来简单的解释就是:我现在需要修改的值是 3(V),那么我希望我操作的时候它还是 3(A),当我要对它进行自增操作的时候,我发现期望值 A 它变成 4 了,此时 V 不等于 A,说明有其他线程进来修改了这个值 V,那这时候我肯定是不能进行操作的。然后我再把需要修改的值变成 4,那此时期望值就是 4,当我操作的时候发现 V == A,即没有其他线程修改了这个值,这样就可以放心操作了,把自增后的值 U 5 赋值给 V,自增完成。

几个问题

  1. 如果当你判断的时候,发现是我期望的值,但是还没有进行新值设定的时候值发现了改变,即其他线程进来改变了这个值,这时候怎么办?

答:CAS 是 CPU 的原语支持,也就是说 CAS 操作是 CPU 指令级别上的支持,是保证了原子性的,中间不能被打断,也就不会出现上述的问题。

  1. ABA 问题:假如你有一个值,我拿到这个值是1,想把它变成2,我拿到1用 CAS 操作,期望值是1,准备变成2,这个对象Object,再这个过程中,没有一个线程改过的时候我肯定是可以进行修改的,但是如果有一个线程先把这个1变成了2后来又变回来1,中间值更改过,这时候怎么办?

答:如果是int类型,最终值是你期望的,中间值被修改过你可以不去管这个问题,最终结果仍然正确。但是如果是引用类型,它就可能有其他的一些问题。处理这个问题的方法之一就是你可以加一个版本号,做任何一个值的修改的时候,版本号自增,后面检查的时候连带版本号一起检查,就可以解决上述的 ABA 问题。

Java 中的 CAS

在原子类变量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时都直接或间接的使用了这些原子变量类。

举个例子,如果你需要对一个 count++ 保证其线程安全,那么你可能会想到使用 synchronized 加锁。但是进入了 atomic 包后,你可与直接使用 AtomicInteger 类就可以了。

  1. private AtomicInteger count = new AtomicInteger(0);
  2. void main(){
  3. for(int i = 0; i < 1000; i++){
  4. //相当于count++
  5. count.incrementAndGet();
  6. }
  7. }

AtomicInteger 类的 API 看着就会用,很简单,其原理就是使用了 CAS 操作:

  1. public class AtomicInteger extends Number implements java.io.Serializable {
  2. private static final long serialVersionUID = 6214790243416807050L;
  3. // setup to use Unsafe.compareAndSwapInt for updates
  4. private static final Unsafe unsafe = Unsafe.getUnsafe();
  5. private static final long valueOffset;
  6. private volatile int value;
  7. public final int incrementAndGet() {
  8. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  9. }
  10. }

incrementAndGet() 方法中,使用了 unsafe.getAndAddInt(this, valueOffset, 1) 这也是一个 CAS 操作的方法,传入了当前的 object 对象,即当前值,valueOffset 期望值,以及需要增加的数值 1。

  1. public final class Unsafe {
  2. public final int getAndAddInt(Object var1, long var2, int var4) {
  3. int var5;
  4. do {
  5. var5 = this.getIntVolatile(var1, var2);
  6. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  7. return var5;
  8. }
  9. }

Unsafe 类里的这个方法就是我们的 CAS 操作,不断地循环判断旧值和期望值,直到旧值等于期望值的时候,就进行新值的操作。

关于 Unsafe 类,里面有很多方法,而且很多是本地方法,所以这里不再深入探究,只需要知道个原理就好了。所有的 Atomic 操作内部下面都是 CompareAndSet 这样的操作,都是在 Unsafe 里面完成的。

LongAdder

LongAdder 在Java8中的提出其实是对 AtomicLong 的CAS机制的一种优化方案,其采用了 Cell 机制。其核心思想是:将热点数据分离。它可以将 AtomicLong 内部核心数据 Value 分离称一个数组,每个线程访问时,通过 hash 等算法映射到其中一个数组下标进行计算,然后将数组所有结果求和累加作为最终结果。

这种机制将热点数据value分离成多个单元的 Cell,每个 Cell 独自维护内部的值,提高了并行度,对原本单点更新的压力分摊在各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。

缺点:
如果在统计的时候,如果有并发更新,可能会有统计数据有误差。
实际使用中在处理高并发计数的时候优先使用LongAdder,而不是AtomicLong在线程竞争不激烈的时候,使用AtomicLong会简单效率更高一些。比如序列号生成(准确性)

直接上源码!

几个问题:

  1. Cell[] 是何时被初始化的?
  2. 如果没有竞争,只会对base进行操作,这是从哪里看出来的?
  3. 初始化Cell[] 的规则是什么?
  4. Cell[] 扩容的时机是什么?
  5. 初始化Cell[]和扩容Cell[]是如何保证线程安全性的?

看源码前先了解几个属性:

  1. /**
  2. *操作数,所有操作都是基于base上进行累加的
  3. */
  4. transient volatile long base;
  5. /**
  6. * 相当于mutex,用于控制并发加锁
  7. */
  8. transient volatile int cellsBusy;
  9. /**
  10. * 核心分段Cell数组,用于分离数据
  11. */
  12. transient volatile Cell[] cells;
  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. //1.先判断cells 是否不为空 或者 casBase 是否不成功(即是否有竞争)
  4. if ((as = cells) != null || !casBase(b = base, b + x)) {
  5. boolean uncontended = true;
  6. //2.判断cells是否已经被初始化
  7. if (as == null || (m = as.length - 1) < 0 ||
  8. //3.如果cells已经被初始化,通过getProbe() & m 算法得到一个数字,判断as[数字]是否null
  9. (a = as[getProbe() & m]) == null ||
  10. //4.如果不为空,对这个cell进行cas操作,判断是否不成功
  11. !(uncontended = a.cas(v = a.value, v + x)))
  12. //5.如果上面判断条件都不通过,即cas操作都失败了,进入该方法
  13. longAccumulate(x, null, uncontended);
  14. }
  15. }
  16. /**
  17. *casBase 操作就是一个 cas 操作,如果操作成功即没有竞争,如果不成功则有竞争
  18. */
  19. final boolean casBase(long cmp, long val) {
  20. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
  21. }
  22. static final int getProbe() {
  23. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  24. }

在以上代码中,第一个if的解释是:如果cell[] 已经被初始化了,或者有竞争,才会进入这个if,如果没有竞争,也没有初始化,就不会进入第二行代码。因此从这里可以得到第二个问题(如果没有竞争,只会对base进行操作,这是从哪里看出来的?)的答案。

  1. /**
  2. *传入的参数是上一步add需要添加的值x,fn是null,
  3. *wasUncontended是true时,cell没被初始化或者cells[]内指定位置为空
  4. *wasUncontended是false时,cells[]指定位置不为空且cas操作失败,即有竞争
  5. */
  6. final void longAccumulate(long x, LongBinaryOperator fn,
  7. boolean wasUncontended) {
  8. int h;
  9. //若通过 getProbe() 算法仍是0,通过ThreadLocal操作强制初始化
  10. if ((h = getProbe()) == 0) {
  11. ThreadLocalRandom.current(); // force initialization
  12. h = getProbe();
  13. wasUncontended = true;
  14. }
  15. boolean collide = false; // True if last slot nonempty
  16. //无限循环
  17. for (;;) {
  18. Cell[] as; Cell a; int n; long v;
  19. //1.判断cells是否被初始化
  20. if ((as = cells) != null && (n = as.length) > 0) {
  21. //1.1.n是cells数组的长度
  22. if ((a = as[(n - 1) & h]) == null) {
  23. //1.2.cellsBusy == 0,代表现在 “不忙”,可以进入这个if
  24. if (cellsBusy == 0) { // Try to attach new Cell
  25. //1.3.创新一个新cell
  26. Cell r = new Cell(x); // Optimistically create
  27. //1.4.再次判断cellsBusy,casCellsBusy()加锁,这样只有一个线程可以进入这个if
  28. if (cellsBusy == 0 && casCellsBusy()) {
  29. boolean created = false;
  30. //1.5.把创建出来的cell元素加入到cell[]里
  31. try { // Recheck under lock
  32. Cell[] rs; int m, j;
  33. if ((rs = cells) != null &&
  34. (m = rs.length) > 0 &&
  35. rs[j = (m - 1) & h] == null) {
  36. rs[j] = r;
  37. created = true;
  38. }
  39. } finally {
  40. //1.6.释放锁
  41. cellsBusy = 0;
  42. }
  43. if (created)
  44. //创建成功,退出循环
  45. break;
  46. continue; // Slot is now non-empty
  47. }
  48. }
  49. collide = false;
  50. }
  51. //2.判断 cas 是否失败,如果是则重新进行
  52. else if (!wasUncontended) // CAS already known to fail
  53. wasUncontended = true; // Continue after rehash
  54. //3.对cell进行cas操作
  55. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  56. fn.applyAsLong(v, x))))
  57. break;
  58. //4.判断cell[]的长度是否大于CPU核心数,如果小于核心数,collide变为false,进入第五个if
  59. else if (n >= NCPU || cells != as)
  60. collide = false; // At max size or stale
  61. //5.重置collide为true,代表有冲突,然后跳到advanceProbe方法生成新的THREAD_PROBE,继续循环
  62. else if (!collide)
  63. collide = true;
  64. //6.判断是否不忙,且加锁
  65. else if (cellsBusy == 0 && casCellsBusy()) {
  66. //6.1.扩容处理
  67. try {
  68. if (cells == as) { // Expand table unless stale
  69. Cell[] rs = new Cell[n << 1];
  70. for (int i = 0; i < n; ++i)
  71. rs[i] = as[i];
  72. cells = rs;
  73. }
  74. } finally {
  75. cellsBusy = 0;
  76. }
  77. collide = false;
  78. continue; // Retry with expanded table
  79. }
  80. h = advanceProbe(h);
  81. }
  82. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  83. boolean init = false;
  84. try { // Initialize table
  85. if (cells == as) {
  86. Cell[] rs = new Cell[2];
  87. rs[h & 1] = new Cell(x);
  88. cells = rs;
  89. init = true;
  90. }
  91. } finally {
  92. cellsBusy = 0;
  93. }
  94. if (init)
  95. break;
  96. }
  97. else if (casBase(v = base, ((fn == null) ? v + x :
  98. fn.applyAsLong(v, x))))
  99. break; // Fall back on using base
  100. }
  101. }

上面的代码很复杂,我们以第一次调用add()方法为例子走一遍流程:

  1. 调用 add() 方法,判断 cells 是否不为空,且 casBase() 操作是否成功
    1. 若 cells 是空的,执行 casBase() 操作,即进行 CAS 赋值来操作 Base(Base即记录的旧值)
      1. 若 CAS 操作成功,表示没有线程竞争,add() 结束
      2. 若 CAS 操作失败,表示存在线程竞争,跳转第2步
    2. 若 cells 不是空的,即前面的操作存在过线程竞争,跳转第2步
  2. 判断 as(即cells)是否为未被初始化
    1. as 未被初始化,第一个判断条件成立,跳转第3步
    2. as 被初始化了,但是根据 getProbe() & m 得到 as[] 中某一元素为空,跳转第3步
    3. as 被初始化了,且根据算法得到某一元素不为空,则执行对该元素(cell)进行 CAS 操作
      1. CAS 成功,没有竞争,add() 结束
      2. CAS 失败,存在竞争,跳转第3步
  3. 进入 longAccumulate() 方法,先根据 getProbe() 算法计算出需要操作的 cells 内的元素下标
  4. 进入循环,判断 cells 是否已被初始化
    1. 若 cells 已被初始化,通过 cellsBusy==0 两次判断和 casCellsBusy() 操作进行 CAS 操作,给指定下标元素初始化一个 Cell,若成功,break 退出循环,若失败,则循环继续,下一次还会回到第一个 if(即第四步)
    2. 若未被初始化,跳转第10步。
  5. 判断 wasUncontended 是否是 false(只有在add() 方法层面调用 CAS 操作失败时,这个才是false)
    1. 若为 false,把 wasUncontended 重置为 true,继续循环
    2. 若为 true,无操作,继续循环
  6. 对 a(即根据算法计算出 Cells[] 的某一元素)进行 CAS 操作
    1. 如果成功,即无竞争,退出循环
    2. 如果失败,循环继续会继续回到第6步
  7. 判断 n(cells 的数组长度)是否大于核心CPU数
    1. n 大于核心CPU数,collide 置为 false,继续循环
    2. n 不大于核心CPU数,判断 cells 是否不等于当前操作之后的 as
      1. 若不等于,collide 置为 false,继续循环
      2. 若等于,无操作,继续循环
  8. 判断 collide 是否是 false(代表有冲突,在判断需要扩容和初始化 cell 的时候会变成false)
    1. 若是,则重置为 true,继续循环
    2. 若否,无操作,继续循环
  9. 判断** `cellsBusy==0` 且进行** **casCellsBusy()** **操作加锁,进行 Cells[] 的扩容,若成功,collide 置为 false**
  10. 进行 **cellsBusy==0** **操作,且判断 `cells==as` ,尝试加锁**
    1. 若成功,对 Cell[] 进行初始化,且将需要插入的值 x 放入新创建的 Cell 中,并按照算法计算的下标存入 Cells 中,若操作成功,退出循环,否则继续循环操作这一步
  11. 对 base 进行 CAS 操作,若成功,退出循环。

回答几个问题:

  1. 答:当出现竞争,且Cell[]还没有被初始化的时候,会初始化Cell[]
  2. 答:在add方法里第一个if语句,可以看出,若 cell[] 未被初始化,即没有出现过竞争的时候,只会对 Base 进行 CAS 操作
  3. 答:初始化的规则从 longAccumulate() 方法里最外层的第一个 else if 语句可以看出,初始化是创建长度为 2 的数组,但是只会初始化其中一个元素,另外一个元素 为 null
  4. 答:当Cell[]的长度小于CPU核心数,并且已经两次Cell CAS失败了。
  5. 答:初始化 Cell[] 和 扩充 Cell[] 都是采用了 CAS 加锁来保证线程安全的。