Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,
比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
1.AQS原理
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。
这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
它维护了一个**volatile int state**(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列), 通过CAS完成对State值的修改
compareAndSetState()
1.1 AQS数据结构
AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点 
方法和属性值的含义 :
| 方法和属性值 | 含义 |
|---|---|
| waitStatus | 当前节点在队列中的状态 |
| thread | 表示处于该节点的线程 |
| prev | 前驱指针 |
| predecessor | 返回前驱节点,没有的话抛出npe |
| nextWaiter | 指向下一个处于CONDITION状态的节点 |
| next | 后继指针 |
线程两种锁的模式:
| 模式 | 含义 |
|---|---|
| SHARED | 共享,多个线程可同时执行,如Semaphore/CountDownLatch |
| EXCLUSIVE | 独占,只有一个线程能执行,如ReentrantLock |
waitStatus的枚举值 :
| 枚举 | 含义 |
|---|---|
| 0 | 当一个Node被初始化的时候的默认值 |
| CANCELLED | 为1,表示线程获取锁的请求已经取消了 |
| CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
| PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 |
| SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
1.2 同步状态State
AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况 
state的访问方式有三种:
getState()setState()compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
| 方法名 | 描述 |
|---|---|
| protected boolean isHeldExclusively() | 该线程是否正在独占资源。 只有用到Condition才需要去实现它。 |
| protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
| protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
| protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源, 如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
2.AQS源码
2.1 加锁
2.1.1 acquire
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响. 下面是acquire()的源码:
/*** 获取独占锁*/public final void acquire(int arg) {//尝试获取锁if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式selfInterrupt();}
函数流程如下:
tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待;addWaiter()如果获取锁失败, 将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt(,将中断补上。
2.1.1 tryAcquire(int)
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
ReentrantLock中公平锁tryAcquire的实现
/*** 公平锁*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** 重写aqs中的方法逻辑* 尝试加锁,被AQS的acquire()方法调用*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {/*** 与非公平锁中的区别,需要先判断队列当中是否有等待的节点* 如果没有则可以尝试CAS获取锁*/if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {//独占线程指向当前线程setExclusiveOwnerThread(current);return true;}} //判断同一个线程多次获取,体现可重入性else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
2.1.3 addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
private Node addWaiter(Node mode) {// 1. 将当前线程构建成Node类型。mode有两种:EXCLUSIVE(独占)和SHARED(共享)Node node = new Node(Thread.currentThread(), mode);//尝试快速方式直接放到队尾。Node pred = tail;if (pred != null) // 2. 1当前尾节点是否为null?node.prev = pred; // 2.2 将当前节点尾插入的方式if (compareAndSetTail(pred, node)) { // 2.3 CAS将节点插入同步队列的尾部pred.next = node;return node;}}//上一步失败则通过enq入队。enq(node);return node;}
2.1.4 enq(Node)
此方法用于将node加入队尾
private Node enq(final Node node) {//CAS"自旋",直到成功加入队尾for (;;) {Node t = tail;if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。if (compareAndSetHead(new Node()))tail = head;} else {//正常流程,放入队尾node.prev = t;if (compareAndSetTail(t, node)) { //当前节点置为尾部t.next = node; //前驱节点的next指针指向当前节点return t;}}}}
2.1.5 acquireQueued(Node, int)
进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回
acquireQueued源码
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;//标记是否成功拿到资源try {boolean interrupted = false;//标记等待过程中是否被中断过//又是一个“自旋”!for (;;) {final Node p = node.predecessor();//拿到前驱//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。if (p == head && tryAcquire(arg)) {setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!failed = false; // 成功获取资源return interrupted;//返回等待过程中是否被中断过}//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。//如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true}} finally {if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。cancelAcquire(node);}}
到这里了,我们先不急着总结acquireQueued()的函数流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具体干些什么。
2.1.6 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2.1.7 parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark();
2)被interrupt();
需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
2.1.8 小结
acquireQueued()分析完之后,我们接下来再回到acquire()!再贴上它的源码吧
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用自定义同步器的
tryAcquire()尝试直接去获取资源,如果成功则直接返回; - 没成功,则
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式; acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt(),将中断补上。
2.2 解锁
2.2.1 release(int)
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0)
它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。
下面是release()的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,**
2.2.2 tryRelease(int)
此方法尝试去释放指定量的资源。下面是tryRelease()的源码:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
2.2.3 unparkSuccessor(Node)
此方法用于唤醒等待队列中下一个线程。下面是源码:
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,
这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),
然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!
2.2.4 小结
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?
答案是YES!!!这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!!!但是我们再回头想想,获取锁的线程在什么情形下会release抛出异常呢??
- 线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
- 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
- release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了。
3.ASQ应用
3.1 ReentrantLock的可重入应用
ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //可重入,
int nextc = c + acquires; //同一个线程多次获得锁, 就会多次+1,这里就是可重入的概念
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
3.2 JUC中的应用场景
AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,
大体介绍一下AQS的应用场景:
| 同步工具 | 同步工具与AQS的关联 |
|---|---|
| ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
| Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 |
| CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。 |
| ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 |
| ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 |
3.3 自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具
public class LeeLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
通过我们自己定义的Lock完成一定的同步功能。
public class LeeMain {
static int count = 0;
static LeeLock leeLock = new LeeLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
3.4 Mutex(互斥锁)
Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。
下边是Mutex的核心源码:
class Mutex implements Lock, java.io.Serializable {
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取资源,立即返回。成功则返回true,否则false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}
// 尝试释放资源,立即返回。成功则为true,否则false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定为1个量
if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//释放资源,放弃占有状态
return true;
}
}
// 真正同步类的实现都依赖继承于AQS的自定义同步器!
private final Sync sync = new Sync();
//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}
//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
public boolean tryLock() {
return sync.tryAcquire(1);
}
//unlock<-->release。两者语文一样:释放资源。
public void unlock() {
sync.release(1);
}
//锁是否占有状态
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
