简介
CAS,即 Compare And Swap,比较再交换,是一种无锁算法,基于硬件原语实现,能够在不使用锁的情况下实现多线程之间的变量同步。jdk 中的 java.util.concurrent.atomic 包中的原子类就是通过 CAS 来实现了乐观锁。
算法过程
算法涉及到三个操作数:
- 需要读写的内存位置 V
- 需要进行比较的预期值 A
- 需要写入的新值 U
算法解析:
CAS的具体算法解释是:V 是需要进行操作的值,A 是当前期待的预期值,当且仅当 V 符合预期值 A 的时候,才用新值 U 替换掉旧值,并写入到内存地址 V 中,否则不做更新。
用 count++ 的例子来简单的解释就是:我现在需要修改的值是 3(V),那么我希望我操作的时候它还是 3(A),当我要对它进行自增操作的时候,我发现期望值 A 它变成 4 了,此时 V 不等于 A,说明有其他线程进来修改了这个值 V,那这时候我肯定是不能进行操作的。然后我再把需要修改的值变成 4,那此时期望值就是 4,当我操作的时候发现 V == A,即没有其他线程修改了这个值,这样就可以放心操作了,把自增后的值 U 5 赋值给 V,自增完成。
几个问题
- 如果当你判断的时候,发现是我期望的值,但是还没有进行新值设定的时候值发现了改变,即其他线程进来改变了这个值,这时候怎么办?
答:CAS 是 CPU 的原语支持,也就是说 CAS 操作是 CPU 指令级别上的支持,是保证了原子性的,中间不能被打断,也就不会出现上述的问题。
- ABA 问题:假如你有一个值,我拿到这个值是1,想把它变成2,我拿到1用 CAS 操作,期望值是1,准备变成2,这个对象Object,再这个过程中,没有一个线程改过的时候我肯定是可以进行修改的,但是如果有一个线程先把这个1变成了2后来又变回来1,中间值更改过,这时候怎么办?
答:如果是int类型,最终值是你期望的,中间值被修改过你可以不去管这个问题,最终结果仍然正确。但是如果是引用类型,它就可能有其他的一些问题。处理这个问题的方法之一就是你可以加一个版本号,做任何一个值的修改的时候,版本号自增,后面检查的时候连带版本号一起检查,就可以解决上述的 ABA 问题。
Java 中的 CAS
在原子类变量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时都直接或间接的使用了这些原子变量类。
举个例子,如果你需要对一个 count++ 保证其线程安全,那么你可能会想到使用 synchronized 加锁。但是进入了 atomic 包后,你可与直接使用 AtomicInteger 类就可以了。
private AtomicInteger count = new AtomicInteger(0);void main(){for(int i = 0; i < 1000; i++){//相当于count++count.incrementAndGet();}}
AtomicInteger 类的 API 看着就会用,很简单,其原理就是使用了 CAS 操作:
public class AtomicInteger extends Number implements java.io.Serializable {private static final long serialVersionUID = 6214790243416807050L;// setup to use Unsafe.compareAndSwapInt for updatesprivate static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;private volatile int value;public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}}
在 incrementAndGet() 方法中,使用了 unsafe.getAndAddInt(this, valueOffset, 1) 这也是一个 CAS 操作的方法,传入了当前的 object 对象,即当前值,valueOffset 期望值,以及需要增加的数值 1。
public final class Unsafe {public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}}
Unsafe 类里的这个方法就是我们的 CAS 操作,不断地循环判断旧值和期望值,直到旧值等于期望值的时候,就进行新值的操作。
关于 Unsafe 类,里面有很多方法,而且很多是本地方法,所以这里不再深入探究,只需要知道个原理就好了。所有的 Atomic 操作内部下面都是 CompareAndSet 这样的操作,都是在 Unsafe 里面完成的。
LongAdder
LongAdder 在Java8中的提出其实是对 AtomicLong 的CAS机制的一种优化方案,其采用了 Cell 机制。其核心思想是:将热点数据分离。它可以将 AtomicLong 内部核心数据 Value 分离称一个数组,每个线程访问时,通过 hash 等算法映射到其中一个数组下标进行计算,然后将数组所有结果求和累加作为最终结果。
这种机制将热点数据value分离成多个单元的 Cell,每个 Cell 独自维护内部的值,提高了并行度,对原本单点更新的压力分摊在各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。
缺点:
如果在统计的时候,如果有并发更新,可能会有统计数据有误差。
实际使用中在处理高并发计数的时候优先使用LongAdder,而不是AtomicLong在线程竞争不激烈的时候,使用AtomicLong会简单效率更高一些。比如序列号生成(准确性)
直接上源码!
几个问题:
- Cell[] 是何时被初始化的?
- 如果没有竞争,只会对base进行操作,这是从哪里看出来的?
- 初始化Cell[] 的规则是什么?
- Cell[] 扩容的时机是什么?
- 初始化Cell[]和扩容Cell[]是如何保证线程安全性的?
看源码前先了解几个属性:
/***操作数,所有操作都是基于base上进行累加的*/transient volatile long base;/*** 相当于mutex,用于控制并发加锁*/transient volatile int cellsBusy;/*** 核心分段Cell数组,用于分离数据*/transient volatile Cell[] cells;
public void add(long x) {Cell[] as; long b, v; int m; Cell a;//1.先判断cells 是否不为空 或者 casBase 是否不成功(即是否有竞争)if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;//2.判断cells是否已经被初始化if (as == null || (m = as.length - 1) < 0 ||//3.如果cells已经被初始化,通过getProbe() & m 算法得到一个数字,判断as[数字]是否null(a = as[getProbe() & m]) == null ||//4.如果不为空,对这个cell进行cas操作,判断是否不成功!(uncontended = a.cas(v = a.value, v + x)))//5.如果上面判断条件都不通过,即cas操作都失败了,进入该方法longAccumulate(x, null, uncontended);}}/***casBase 操作就是一个 cas 操作,如果操作成功即没有竞争,如果不成功则有竞争*/final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);}static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);}
在以上代码中,第一个if的解释是:如果cell[] 已经被初始化了,或者有竞争,才会进入这个if,如果没有竞争,也没有初始化,就不会进入第二行代码。因此从这里可以得到第二个问题(如果没有竞争,只会对base进行操作,这是从哪里看出来的?)的答案。
/***传入的参数是上一步add需要添加的值x,fn是null,*wasUncontended是true时,cell没被初始化或者cells[]内指定位置为空*wasUncontended是false时,cells[]指定位置不为空且cas操作失败,即有竞争*/final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;//若通过 getProbe() 算法仍是0,通过ThreadLocal操作强制初始化if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonempty//无限循环for (;;) {Cell[] as; Cell a; int n; long v;//1.判断cells是否被初始化if ((as = cells) != null && (n = as.length) > 0) {//1.1.n是cells数组的长度if ((a = as[(n - 1) & h]) == null) {//1.2.cellsBusy == 0,代表现在 “不忙”,可以进入这个ifif (cellsBusy == 0) { // Try to attach new Cell//1.3.创新一个新cellCell r = new Cell(x); // Optimistically create//1.4.再次判断cellsBusy,casCellsBusy()加锁,这样只有一个线程可以进入这个ifif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;//1.5.把创建出来的cell元素加入到cell[]里try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {//1.6.释放锁cellsBusy = 0;}if (created)//创建成功,退出循环break;continue; // Slot is now non-empty}}collide = false;}//2.判断 cas 是否失败,如果是则重新进行else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//3.对cell进行cas操作else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;//4.判断cell[]的长度是否大于CPU核心数,如果小于核心数,collide变为false,进入第五个ifelse if (n >= NCPU || cells != as)collide = false; // At max size or stale//5.重置collide为true,代表有冲突,然后跳到advanceProbe方法生成新的THREAD_PROBE,继续循环else if (!collide)collide = true;//6.判断是否不忙,且加锁else if (cellsBusy == 0 && casCellsBusy()) {//6.1.扩容处理try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}
上面的代码很复杂,我们以第一次调用add()方法为例子走一遍流程:
- 调用
add()方法,判断 cells 是否不为空,且casBase()操作是否成功- 若 cells 是空的,执行 casBase() 操作,即进行 CAS 赋值来操作 Base(Base即记录的旧值)
- 若 CAS 操作成功,表示没有线程竞争,add() 结束
- 若 CAS 操作失败,表示存在线程竞争,跳转第2步
- 若 cells 不是空的,即前面的操作存在过线程竞争,跳转第2步
- 若 cells 是空的,执行 casBase() 操作,即进行 CAS 赋值来操作 Base(Base即记录的旧值)
- 判断 as(即cells)是否为未被初始化
- as 未被初始化,第一个判断条件成立,跳转第3步
- as 被初始化了,但是根据 getProbe() & m 得到 as[] 中某一元素为空,跳转第3步
- as 被初始化了,且根据算法得到某一元素不为空,则执行对该元素(cell)进行 CAS 操作
- CAS 成功,没有竞争,add() 结束
- CAS 失败,存在竞争,跳转第3步
- 进入 longAccumulate() 方法,先根据 getProbe() 算法计算出需要操作的 cells 内的元素下标
- 进入循环,判断 cells 是否已被初始化
- 若 cells 已被初始化,通过
cellsBusy==0两次判断和casCellsBusy()操作进行 CAS 操作,给指定下标元素初始化一个 Cell,若成功,break 退出循环,若失败,则循环继续,下一次还会回到第一个 if(即第四步) - 若未被初始化,跳转第10步。
- 若 cells 已被初始化,通过
- 判断 wasUncontended 是否是 false(只有在add() 方法层面调用 CAS 操作失败时,这个才是false)
- 若为 false,把 wasUncontended 重置为 true,继续循环
- 若为 true,无操作,继续循环
- 对 a(即根据算法计算出 Cells[] 的某一元素)进行 CAS 操作
- 如果成功,即无竞争,退出循环
- 如果失败,循环继续,会继续回到第6步
- 判断 n(cells 的数组长度)是否大于核心CPU数
- n 大于核心CPU数,collide 置为 false,继续循环
- n 不大于核心CPU数,判断 cells 是否不等于当前操作之后的 as
- 若不等于,collide 置为 false,继续循环
- 若等于,无操作,继续循环
- 判断 collide 是否是 false(代表有冲突,在判断需要扩容和初始化 cell 的时候会变成false)
- 若是,则重置为 true,继续循环
- 若否,无操作,继续循环
- 判断** `cellsBusy==0` 且进行**
**casCellsBusy()****操作加锁,进行 Cells[] 的扩容,若成功,collide 置为 false** - 进行
**cellsBusy==0****操作,且判断 `cells==as` ,尝试加锁**- 若成功,对 Cell[] 进行初始化,且将需要插入的值 x 放入新创建的 Cell 中,并按照算法计算的下标存入 Cells 中,若操作成功,退出循环,否则继续循环操作这一步
- 对 base 进行 CAS 操作,若成功,退出循环。
回答几个问题:
- 答:当出现竞争,且Cell[]还没有被初始化的时候,会初始化Cell[]
- 答:在add方法里第一个if语句,可以看出,若 cell[] 未被初始化,即没有出现过竞争的时候,只会对 Base 进行 CAS 操作
- 答:初始化的规则从 longAccumulate() 方法里最外层的第一个 else if 语句可以看出,初始化是创建长度为 2 的数组,但是只会初始化其中一个元素,另外一个元素 为 null
- 答:当Cell[]的长度小于CPU核心数,并且已经两次Cell CAS失败了。
- 答:初始化 Cell[] 和 扩充 Cell[] 都是采用了 CAS 加锁来保证线程安全的。
