抽象的队列同步器
是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的 FIFO 队列(先进先出)的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个Node节点来实现锁的分配,有一个int
类变量表示 锁的状态(private volatile int state),通过 CAS 完成对status
值的修改(0表示没有,1表示阻塞)。
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSuport.park()
的方式,维护state变量的状态,使并发达到同步的效果。
Node节点
CountDownLatch 和 CyclicBarrier 的区别?
- CountDownLatch简单的说就是一个线程等待,直到他所等待的其他线程都执
行完成并且调用countDown()方法发出通知后,当前线程才可以继续执行。
- CyclicBarrier是所有线程都进行等待,直到所有线程都准备好进入await()方
法之后,所有线程同时开始执行!
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使
用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果
计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获
得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
如果被中断返回true,否则返回false。
AQS为什么要使用双向队列?
- 没有竞争到锁的线程,加入到队列,并且阻塞等待的一个前提是:当前线程所在的节点的前置节点是一个正常状态。这么设计的目的是为了避免在链表里存在异常状态的节点,导致无法唤醒后续线程的一个问题。所以线程在阻塞之前要去判断前置节点的状态,如果没有指针指向前置节点,那么就需要从头部节点往下遍历,他的性能非常低。
- 在 Lock 接口里面有一个可以 中断的 锁的竞争方法
lockInterruptibly()
,这个方法是表示处于锁阻塞的线程是允许被中断的,也就是说没有竞争到锁的线程加入到同步队列等待以后,是允许被外部线程通过interrupt()
方法去触发唤醒并且中断的。这个时候,被中断线程的状态会写改成 CANCELLED 的状态,被标记为 CANCELLED 状态的一个线程是不需要去竞争锁的,但是它仍然会存在于整个双向链表里面,意味着后续的锁的竞争中,需要把这个节点从链表里面去移除,否则会导致锁阻塞的线程无法被正常唤醒。这种情况下,如果是单向链表,就需要从 Head 节点开始往下去逐个遍历,找到并移除异常状态的节点,同样效率也是比较低的,而且还会导致锁的唤醒的操作和遍历的操作之间的一个竞争。 - 为了避免线程阻塞和唤醒的开销,所以加入到链表的线程,他首先会通过自旋的方式去尝试竞争锁,但是实际上按照公平锁的一个设计:只有头节点的下一个节点,才有必要去竞争锁,后续的节点去竞争锁的意义不大,否则会造成一个惊群效应,也就是说大量的线程在阻塞之前尝试竞争锁带来一个比较大的性能开销,所以为了避免这个问题,加入到链表里面的节点在尝试竞争锁的时候需要去判断前置节点是不是头节点,如果不是头节点,就没有必要去触发锁的竞争动作,所以这里会涉及了一个前置节点的查找,如果是单项链表是无法实现这样一个功能的。
lockInterruptibly 方法和 lock 方法类似,当有可用锁时会直接得到锁并立即返回,如果没有可用锁会一直等待直到获取锁,但和 lock 方法不同,lockInterruptibly 方法在等待获取时,如果遇到线程中断会放弃获取锁。
线程要加入阻塞队列,当前线程的节点的前置节点是正常,这样设计是有异常情况导致后面节点无法正常唤醒,所以线程在阻塞之前,需要判断节点状态,如果没有指针指向前置节点,从头部开始判断,效率是非常慢的。
AQS的双向链表,如果只是简单的加入节点与移除节点,其实使用单链表加上head、tail已经足够,与出队,入队没有关系。
在AQS中,还有其它一些功能。比如,线程A此时占有锁,线程B也来lock,因为A占有,B会判断是否需要阻塞,如下代码为判断是否需要阻塞,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 该节点已经设置了要求释放信号的状态,因此它可以安全阻塞。
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry. 前任已被取消。跳过前置任务并指示重试。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
WaitStatus必须为0或传播。表示我们需要信号,但先别停车。呼叫者需要重试,以确保在停车之前无法接听。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
需要阻塞的条件是前驱节点的waitStatus=Node.SIGNAL,在代码块
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
中,如果pred.waitStatus > 0就要不断向前查找,此时就是双向链表的特性,可以循环向前查找。
以上是双向链表在AQS中的一个体现。