一、概述
AQS,即AbstractQueuedSynchronizer, 队列同步器,它是Java并发用来构建锁和其他同步组件的基础框架。来看下同步组件对AQS的使用(如图),可以看到经常使用的CountDownLatch、ReentrantReadWriteLock等都有对AQS的使用。<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526026143428-b49b7f66-b5f7-4334-a612-47b23a90f172.png#width=520)<br /> <br /> AQS是一个抽象类,主要是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。从图中可以看出,在java的同步组件中,AQS的子类(Sync等)一般是同步组件的静态内部类,即通过组合的方式使用。
二、AQS类介绍
1、AQS类声明的成员变量如下:
其中state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁;Node head、tail用来实现FIFO同步队列,这个后面介绍。
2、AQS主要提供了如下一些方法:
getState():返回同步状态的当前值;
setState(int newState):设置当前同步状态;
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
tryRelease(int arg):独占式释放同步状态;
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
tryReleaseShared(int arg):共享式释放同步状态;
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
releaseShared(int arg):共享式释放同步状态
AQS采用模板方法设计模式,提供给同步组件实现者,为其屏蔽了同步状态的管理,线程排队等底层操作,实现者只需要通过AQS提供的模板方法实现同步组件的语义即可。其中,自定义同步器实现时主要实现标注红色的方法。
以CountDownLatch类,我们看看它如何实现对AQS的继承:
3、实现原理简介
从AQS类的成员变量可以看出,它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列),当state=0表示同步状态可用(如果用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用);如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。不同资源共享方式只要实现各自的方法就行。
三、了解AQS的实现过程
我们通过查看AQS源码来了解它具体实现过程,依照acquire-release、acquireShared-releaseShared的次序来。接下来通过ReentrantReadWriteLock类的源码入手深入了解一下AQS的实现。<br />首先,看一下ReentrantReadWriteLock类的结构:<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526033723493-345d9ffc-9007-4244-8518-0b0de4878a78.png#width=392)<br />可以看到ReentrantReadWriteLock内部维护了两个锁,一个用于读操作,一个用于写操作,支持公平和非公平的获取锁的方式。写锁采用独占式,读锁采用共享式。接下来,先看看写锁的实现源码。<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526260733152-14b4c2aa-e481-4a1d-a9db-3621864b1711.png#width=747)
1、acquire(int)
这是AQS提供的独占模式下获取共享资源的方法。如果获取资源成功,则直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。下面是acquire()的源码:<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526261127868-64a0b05e-2ce4-4bcf-9731-5aa4ef634a8e.png#width=661)
//代码很短,不太好了理解,转换下写法
public final void acquire(int arg){
//尝试直接去获取资源,如果成功则直接返回
boolean hasAcquired = tryAcquire(arg);
if (!hasAcquired) {
//将该线程加入等待队列的尾部,并标记为独占模式
Node currentThreadNode = addWaiter(Node.EXCLUSIVE);
//使线程在等待队列中获取资源,一直获取到资源后才返回。
//如果在整个等待过程中被中断过,则返回true,否则返回false
boolean interrupted = acquireQueued(currentThreadNode, arg);
if (interrupted) {
//如果线程在等待过程中被中断过,它是不响应的。
//只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
selfInterrupt();
}
}
}
1.1 tryAcquire(int)
提供尝试获取资源,如果获取成功,直接返回true,否则返回false;在AQS源码如下:<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526262123929-a365bd40-f75b-4dfe-a63a-aed68847fb02.png#width=747)<br /> 为什么只有抛出一个异常,功能实现昵?上文提过AQS采用模板方式模式,每个同步器获取资源的方式不同,具体资源的获取需要自定义的同步器去实现,所以我们看看ReentrantReadWriteLock内部肯定会有对应的实现方式,具体源码如下:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//如果锁的state不为0,说明有写锁,或读锁,或两种锁持有
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//如果写锁为0,再加上c!=0,说明此时有读锁,自然返回false,表示只能排队去获取写锁
//如果写锁不为0,如果持有写锁的线程不为当前线程,自然返回false,排队去获取写锁。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//表示,当前线程持有写锁,现在是重入,所以只需要修改锁的额数量即可
setState(c + acquires);
return true;
}
//表示通过一次CAS去获取锁的时候失败,说明被别的线程抢去了,也返回false,排队去重试获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//成获取写锁后,将当前线程设置为占有写锁的线程。尝试获取锁方法结束。
setExclusiveOwnerThread(current);
return true;
}
1.2 addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。<br />
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队
enq(node);
return node;
}
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
// 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.3 acquireQueued(Node, int)
如果获取资源失败,并且将线程放到等待队列的队尾,之后进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。该函数的具体流程:
结点进入队尾后,检查状态,找到安全休息点;
调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
final boolean acquireQueued(final Node node, int arg) {
//判断获取资源是否成功标志
boolean failed = true;
try {
//判断当前线程是否被中断过
boolean interrupted = false;
//“自旋”
for (;;) {
//拿到前驱
final Node p = node.predecessor();
//如果前驱是head,即该结点就是第二结点,
//那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,
//当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点
//所以head所指的标杆结点,就是当前获取到资源的那个结点或null
setHead(node);
//此处再将head.next置为null,就是为了方便GC回收以前的head结点。
//也就意味着之前拿完资源的结点出队了!
p.next = null; // help GC
failed = false;
//返回等待过程中是否线程有被中断标识
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
* 获取资源失败后检查是否需要线程进行阻塞等待
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 前驱已经设置释放信号,通知自己,所以可以安全等待
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*如果前驱都已经失效,就要逃过这些结点,知道找到找到最近一个正常等待的状态,并排在它的后边
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驱正常,那就把前驱的状态设置成SIGNAL,
//告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//执行waiting状态并且检查中断标志
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//返回线程是否被中断
}
1.4 小结
接下来,我们总结一下acquire(int)的整个流程。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
实现的流程如下:
1、调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2、没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3、acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4、如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
至此,对acquire()的源码解读到此结束,我们可以发现ReentrantReadWriteLock、ReentrantLock等进行加锁都是调用acquire(1)进行处理。
2、acquireShared(int)
上一节我们了解了ReentrantReadWriteLock中WriteLock的lock方法,调用AQS的acquire(int)进行独占模式下线程获取共享资源,这一节我们看看ReentrantReadWriteLock的ReadLock的lock方法,了解AQS的acquireShared(int)如何进行共享模式下线程获取共享资源。<br />![](https://cdn.yuque.com/yuque/0/2018/png/109893/1526281443905-ac0588cf-22e0-4ddc-901f-b138f4001315.png#width=747)<br />首先,看看acquireShared(int)源码如下:
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
这里tryAcquireShared()依然需要自定义同步器去实现,但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
1、tryAcquireShared()尝试获取资源,成功则直接返回;
2、失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
以ReentrantReadWriteLock为例,看看ReentrantReadWriteLock如何实现tryAcquireShared(),具体源码如下:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}