CAS 和 AQS
这两者都是Java并发编程的基础。
CAS
什么是CAS
CAS,compare and swap的缩写,即:比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。
如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
CAS “我认为位置 V 此刻的值是 A;如果确实如此,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”
CAS通过这种方法保证在 “我” 操作位置V的这个时间段内,位置V的值A没有被 “其他人” 修改,从而保证操作的原子性。
CAS存在的问题
CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。
- ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 - 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
- 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
concurrent包中 CAS 的实现
由于java的CAS同时具有 volatile 读和volatile写的内存语义,因此Java线程之间的通信现在有了下面四种方式:
- A线程写volatile变量,随后B线程读这个volatile变量。
- A线程写volatile变量,随后B线程用CAS更新这个volatile变量。
- A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。
- A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。
Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:
- 首先,声明共享变量为volatile;
- 然后,使用CAS的原子条件更新来实现线程之间的同步;
- 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。
Atomic 原子类(CAS + volatile)
并发包 java.util.concurrent
的原子类都存放在java.util.concurrent.atomic
AQS
AQS是并发编程中非常重要的概念,它是juc包下的许多并发工具类,如CountdownLatch,CyclicBarrier,Semaphore 和锁, 如ReentrantLock, ReaderWriterLock的实现基础,提供了一个基于int状态码和队列来实现的并发框架。
AQS基本概念
AQS(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示状态,通过内置的FIFO(first in,first out)队列来完成资源获取线程的排队工作。
同步状态
AQS中维持一个全局的int状态码(state),线程通过修改(加/减指定的数量)是否成功来决定当前线程是否成功获取到同步状态。
独占or共享模式
AQS支持两种获取同步状态的模式既独占式和共享式。
独占式模式同一时刻只允许一个线程获取同步状态,而共享模式则允许多个线程同时获取。
同步队列
同步队列(一个FIFO双向队列)是AQS的核心,用来完成同步状态的管理,当线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成一个节点并加入到同步队列,同时会阻塞当前线程。
独占模式获取与释放状态
独占模式既同一时间只能由一个线程持有同步状态。当多个线程竞争时(acquire),获取到同步状态的线程会将当前线程赋值给Thread exclusiveOwnerThread
属性(AQS父类中)来标记当前状态被线程独占。其他线程将被构造成Node加入到同步队列中。当线程l
获取同步状态
Copy/**
* 获取同步状态
*/
public final void acquire(int arg) {
/**
* 1. tryAcquire 尝试获取同步状态;
* 2.1 addWaiter 如果尝试获取到同步状态失败,则加入到同步队列中;
* 2.2 acquireQueued 在队列中尝试获取同步状态.
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 多线程并发获取(修改)同步状态, 修改同步状态成功的线程标记为拥有同步状态
Copy/**
* 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实
* 现框架,具体的实现逻辑由子类决定
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// a. 尝试修改状态值操作执行成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// b. 修改状态值成功,记录当前持有同步状态的线程信息
setExclusiveOwnerThread(current);
return true;
}
// 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理,不理解可以先忽略】
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
获取失败的线程,加入到同步队列的队尾;加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态(下文代码:p == head && tryAcquire(arg))。
```java Copy/**- 没有获取到同步状态的线程加入到队尾部
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试用最快的方式入队,如果入队失败,再走完整的入队方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 将当前线程设置到队尾
if (compareAndSetTail(pred, node)) {
} } // 正常的入队方法 enq(node); return node; }pred.next = node;
return node;
- 没有获取到同步状态的线程加入到队尾部
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试用最快的方式入队,如果入队失败,再走完整的入队方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 将当前线程设置到队尾
if (compareAndSetTail(pred, node)) {
/**
同步队列中节点,尝试获取同步状态 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {
boolean interrupted = false;
// 自旋(死循环)
for (;;) {
// 只有当前节点的前驱节点是头节点时才会尝试执行获取同步状态操作
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);// 注意: 此处重点, 当前节点设置为头节点,相当于头节点出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取失败后是否进入wait
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
} } ```
- 如果头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;其他节点则继续自旋。
Copy// 伪代码
final boolean acquireQueued(final Node node, int arg) {
for (;;) {
// -------获取同步状态失败-------
// 获取失败后是否进入wait
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
}
/**
* 当获取同步状态失败后是否进入park状态
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点为唤醒状态,返回true【后面代码暂时可以忽略】
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 独占模式获取同步状态总结
释放同步状态
当线程执行完相应逻辑后,需要释放同步状态,使后继节点有机会同步状态(让出资源,让排队的线程使用)。这时就需要调用release(int arg)方法。调用该方法后,会唤醒后继节点。
- 释放同步状态,唤醒后继节点
Copy/**
* 释放同步状态
*/
public final boolean release(int arg) {
// 1. 尝试释放同步状态
if (tryRelease(arg)) {
Node h = head;
// 释放成功后,执行unpark,既唤醒操作(暂时可忽略waitStatus,涉及到条件队列)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 尝试释放同步状态,既将同步状态减去指定的值
* 如果state = 0,表示当前线程 获取次数 = 释放次数,既释放成功,此时将持有同步状态线程标志为null
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 状态码=0,表示释放成功了
if (c == 0) {
free = true;
// 独占标志设置为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* 唤醒后继节点操作
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
- 后继节点获取同步状态成功,头节点出队。需要注意的事,出队操作是间接的,有节点获取到同步状态时,会将当前节点设置为head,而原本的head设置为null。
Copy/**
* 同步队列中节点,尝试获取同步状态(伪代码)
* 获取成功后,当前节点设置为头节点,头节点设置为null,既头节点出队
*/
final boolean acquireQueued(final Node node, int arg) {
try {
// 自旋(死循环)
for (;;) {
if (p == head && tryAcquire(arg)) {
// a. 操作:当前节点设置为头节点,当前节点的前驱节点设置为null
setHead(node);
// b. 原始的head的next设置为null,此时原始的head已经被移出队列
p.next = null; // help GC
failed = false;
return interrupted;
}
}
}
}
/**
* a.当前节点设置为头节点,当前节点的前驱节点设置为null
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
其他竞争情况
- 当同步队列中头节点唤醒后继节点时,此时可能有其他线程尝试获取同步状态。
- 假设获取成功,将会被设置为头节点。
- 头节点后续节点获取同步状态失败。
共享模式获取与释放状态
共享模式和独占模式最主要的区别是在支持同一时刻有多个线程同时获取同步状态。为了避免带来额外的负担,在上文中提到的同步队列中都是用独占模式进行讲述,其实同步队列中的节点应该是独占和共享节点并存的。
接下来将针对共享模式状态下获取与释放状态的过程,图文并茂得进行分析。
获取同步状态
- 首先至少要调用一次tryAcquireShared(arg)方法,如果返回值大于等于0表示获取成功。
- 当获取锁失败时,则创建一个共享类型的节点并进入一个同步队列,然后进入队列中进入自旋状态(阻塞,唤醒两种状态来回切换,直到获取到同步状态为止)
- 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,否则继续挂起等待。
当一个同享节点获取到同步状态,并唤醒后面等待的共享状态的结果如下图所示:
Copy/**
* 共享模式获取同步状态;
* 1. 首先至少要调用一次tryAcquireShared(arg)方法,如果返回值大于等于0表示获取成功,直接返回结果即可
* 2. 否则,将会加入到同步队列中,反复阻塞与唤醒,直到获取同步状态成功为止; 获取成功后会唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 2. 自旋模式获取同步状态
*/
private void doAcquireShared(int arg) {
// 2.1 第一次获取失败后,会将此线程加入到同步队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 如果前驱节点是头节点,尝试获取同步状态
final Node p = node.predecessor();
if (p == head) {
// r > 0表示获取同步状态成功,并且还有共享类型节点在同步队列中
// r == 0 表示获取同步状态成功,同步队列中没有其他共享模式节点
int r = tryAcquireShared(arg);
if (r >= 0) {
// !!!! 获取同步状态成功后,将当前node设置为头节点,并向后传播,唤醒共享模式等待的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 设置新的头结点,并设置后面需要唤醒的节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// propagate > 0 表明后面需要唤醒的共享模式节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
// 这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 唤醒所有共享模式节点
*/
private void doReleaseShared() {
for (;;) {
// 唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
// 其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果头结点没有发生变化,表示设置完成,退出循环
// 如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点:
释放同步状态
释放同步状态后,同步队列的变化过程和共享节点获取到同步状态后的变化过程一致,此处不再进行赘述。
Copy/**
* 释放同步状态,如果释放成功,唤醒后面等待的节点
*
*/
public final boolean releaseShared(int arg) {
// 1. 尝试释放同步状态
if (tryReleaseShared(arg)) {
// 2. 释放成功后,唤醒后续等待共享节点
doReleaseShared();
return true;
}
return false;
}
基于AQS的并发工具类
- CountDownLatch
让主线程等待一组事件发生后继续执行,子线程执行countDown方法,使主线程的cnt值-1,减到零时主线程继续执行。
创建子线程时可以传入CountDownLatch - CyclicBarrier
阻塞当前线程,等待其他线程、所有线程到达栅栏处后,才能继续执行
T1.await()栅栏计数器减一,若不为零t1阻塞
创建子线程时可以传入CyclicBarrier - Semaphore
控制某个资源可被多少个线程同时访问、semp.acquire()获取许可、semp.release()释放
创建子线程时可以传入Semaphore - Exchanger
线程之间用于数据交换,两个线程到达同步点之后,相互交换数据,先到的会阻塞(只能用于两个线程)