什么是AQS?
AQS全称AbstractQueuedSynchronizer(队列同步器),用于多线程之间的同步操作,JUC中的ReentrantLock、CountDownLatch、Semaphore都是基于该工具实现的。
基本原理?
AQS持有一个stat属性,用于表述同步状态
- stat>0,锁被获取
- stat=0,锁空闲
针对stat的操作,AQS提供了三个方法
- getStat
- setStat
- compareAndSetStat
AQS内部持有一个队列(CLH同步队列),用于处理线程排队。类似于monitorObject里面的_entry_set。
队列元素Node封装了线程信息和等待信息。Node结构如下:
当锁释放stat=0的时候,会唤醒队列中的线程进行再次争抢。
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;
/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
Node nextWaiter;
}
提供的方法概览
- getState():返回同步状态的当前值;
- setState(int newState):设置当前同步状态;
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
- tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
- tryRelease(int arg):独占式释放同步状态;
- tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
- tryReleaseShared(int arg):共享式释放同步状态;
- isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
- acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
- acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
- tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
- acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
- acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
- release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
- releaseShared(int arg):共享式释放同步状态;
公平锁和非公平锁的区别
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 第二次机会,直接CAS操作state,成功就先于很多前辈线程了。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 第一次机会,直接CAS操作state,成功就先于很多前辈线程了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 需要先判断队列里是否有前辈节点,有的话就不能直接CAS
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 公平和非公平的逻辑是在Sync层实现的,不是AQS层。
公平和非公平体现在线程首次进入临界区的锁竞争上,也就是刚来有机会直接参与竞争(2次机会),如果一旦没抢到,那么后续的加入阻塞队列后的唤醒机制,就和其他老节点一样了,需要等待逐个唤醒。
为什么是从尾部开始唤醒?
```java // 先来看看从后向前遍历的逻辑 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 如果head节点当前waitStatus<0, 将其修改为0 if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
} if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
// 原因在于构造队列的时候存在线程安全问题 // addWaiter和enq在入列过程中都下述问题,这里以enq为例。 // 顺便说下addWaiter和enq的关系,就是addWaiter进行compareAndSetTail失败,就会进入enq // enq的逻辑就是无限自旋去compareAndSetTail。 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 把当前节点的前驱指向尾节点 node.prev = t; // 尝试将当前节点设定为队列尾节点 if (compareAndSetTail(t, node)) { // 设定成功后,将原尾节点的后驱指向当前节点(也就是新尾节点) // 但是如果此时发生线程切换,开始执行唤醒节点的逻辑,如果是从前遍历, // 到这里的原尾节点的next就是null,认为没有后续节点了 // 再考虑后续时间片分配回来,这个next又连上了,就会很奇怪。 // 从后向前则没有这个问题,根因就是前驱节点的赋值是在CAS操作之前的。 // 再深入考虑其实就是因为阻塞队列是双向的,需要两步操作导致的。 t.next = node; return t; } // 没有尝试成功自旋继续 } } }
<a name="Vzkp2"></a>
### 为什么是双向队列?
首先队列的好处是增删方便,双向队列的好处是从任意一个位置的增删方便。<br />在单向前提下,如果要删除一个node,必须从头开始遍历到其前驱节点,再将其前驱节点的next指向其next节点。<br />而双向的话,如果要删除一个node,只需要将prev.next=node.next,next.prev=node.prev,很显然少了遍历的过程。<br />回到AQS,首先AQS不会发生需要在中间增加节点的情况,因为增加都是在尾部增加,这点上单向、双向都能实现。但是是否会发生在中间节点删除节点的情况呢?会!这就是AQS用双向链表的核心原因。<br />什么场景下会,waitStatus>0的时候,也是CANCELLED状态,AQS支持阻塞中断,应用上也就是ReentrantLock的tryLock(long millisecond),允许定时中断,node在中断后,就需要从队列中删除(GC回收)。
```java
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*
* 指向前驱节点,用于当前节点检查其waitStatus。
* 入列时赋值,出列时置null(为了尽快被GC)
* 此外,当前驱节点取消后,我们会一直向前寻找一个未取消的节点。
* 而且一定会存在因为至少有头节点(已经获取到锁的节点不会被取消)兜底。
* 一个取消后的节点不可能拿到锁,一个节点只有自己取消自己,其他节点不能取消
*/
volatile Node prev;