AQS
简介
AQS全称为AbstractQueuedSynchronizer
,这个类在concurrent包下,AQS是一个用来构建锁和同步器的框架,像ReentrantLock,Semaphore,FutureTask
都是基于AQS,我们也可以使用AQS构造出符合我们需求的同步器。
原理
AQS的思想是如果被请求的共享资源空闲,则将请求资源的线程设置诶有效工作线程,把当前资源设置为Exclusive状态。
但如果被请求资源被占用,则需要将该线程放入阻塞队列并阻塞该线程,当锁释放时同时唤醒阻塞队列并进行锁分配。
这个队列是由CLH虚拟双向队列,之所以是虚拟表示不存在队列的实例,而仅仅是同过节点之间互相关联来实现队列。队列中的节点是由线程封装而来。
AQS模板方法
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方法为:
- 使用者继承AbstractQueuedSynchronizer并重写指定方法。
- 将AQS组合在自定义同步组件实现,并调用方法
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
AQS 常用组件:
Semaphore(信号量)
Semaphore用于线程获取某个资源的线程数量,也就是互斥量。执行acquire方法若semaphore里没有permit则阻塞等待,直到调用release方法增加一个permit这是会唤醒释放一个正在阻塞的线程。
Semaphore有两种模式,公平模式和非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
CountDownLatch (倒计时器)
CountDownLatch 可以让一个线程等待,直至所有线程都执行了countdown方法,再全部往下执行。
CountDownLatch原理
CountDownLatch是共享锁的一种实现,它的构造AQS的state值为count个也就是我们期望计数的个数。当线程使用Countdown方法其实调用了内部的tryReleaseShared
方法以CAS的操作使state减一。直到计数器为0,然后被阻塞的进程会继续执行。
CountDownLatch的两种典型用法
- 在启动一个服务时,主线程需要等待多个组件加载完毕,之后再运行。
- 多个线程共享一个countdownLatch对象,并且都调用await()方法,这样把多个线程放在起点,然后主线程再使用countdown让他们在同一时刻开始,以此来执行任务最大并发数。
CountDownLatch常见面试题
- CountDownLatch简介以及原理
- CountDownLatch和CyclicBarrier不同之处
- CountDownLatch使用的场景
- CountDownLatch类中的方法
CyclicBarrier(循环栅栏)
CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。
CyclicBarrier构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中,parties就代表了有拦截的线程的数量,当拦截的线程数量达到这个值就打开栅栏,让所有线程通过。
CyclicBarrier使用实例
public class SemaphoreExample1 {
// 请求的数量
private static final int threadCount = 550;
public static CyclicBarrier barrierAction = new CyclicBarrier(5, () -> {
System.out.println("barrierAction");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(100);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
Thread.sleep(100);
threadPool.execute(() -> {
try {
test(threadnum);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println(threadnum+"ready");
barrierAction.await();
System.out.println(threadnum+"finish");
}
}
运行结果:
0ready
1ready
2ready
3ready
4ready
barrierAction
4finish
3finish
2finish
1finish
0finish
5ready
6ready
7ready
8ready
9ready
barrierAction
5finish
6finish
7finish
8finish
9finish
....
CyclicBarrier等待的原理
CyclicBarrier
内部通过一个count变量作为计数器,count的初始值为parties属性的初始化值,每当一个线程到了栅栏就将计数器减一,如果count值为0表示这是这一轮最后一个线程到达栅栏,就尝试执行构造方法中输入的任务。
CyclicBarrier和CountdownLatch的区别
- CyclicBarrier能够循环使用,并且提供reset功能,而CountDownLatch只能使用一次。
- 对于CountDownLatch来说,重点是一个或多个等待,而其他N个线程负责计数,等待和计数是可以分开的,完成计数的线程可以根据需要继续运行也可以终止运行。而对于CyclicBarrier重点是多个线程一边计数一边等待,在满足计数条件后,多个线程继续执行。
ReentrantReadWriteLock
ReentrantReadWriteLock可以保证多个线程可以同时读,两个读操作之间不互斥,在读操作远大于写操作时,读写锁能发挥明显的作用。
AQS FairSync:公平可重入锁逻辑
- 进入直接调这条语句
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
先tryAcquire(),如果直接成功了,也就不需要进队列了```java protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//虽然是try一下但还是不能插队的,因为是公平锁吗
//要先用hasQueuedPredecessors看一下有没有先来的
if (!hasQueuedPredecessors() &&
//没有先来的就用CAS改变一下状态
compareAndSetState(0, acquires)) {
//成功就申明一下互斥
setExclusiveOwnerThread(current);
return true;
}
}
//可重入锁情况,需要操作:state=state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```
若try一下失败了,则进入
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,先看addWaiter(Node.EXCLUSIVE)```java private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//先获取一下尾部的node,看看存不存在,注意tail和head均属于成员变量
Node pred = tail;
//尾部存在的情况就直接插入返回就行了
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//走到这里说明两种情况,1.队列为空,尾部不存在2.之前的CAS插入失败了(有竞争)
//此时调用enq初始化队列,或自旋入队。
enq(node);
return node;
} ```
调用enq函数自旋入队,或者初始化队列```java private Node enq(final Node node) {
for (;;) { Node t = tail; //针对之前队列为空的情况,初始化头部 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; //这里是自旋尝试插入尾部 //之前的情况可能为1.插入尾部有竞争2.刚初始化完 } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
} ```
返回节点后,进入acquireQueued```java final boolean acquireQueued(final Node node, long arg) {
boolean failed = true; try { boolean interrupted = false; for (;;) { //predecessor方法获取前驱节点 final Node p = node.predecessor(); //前驱为head的话,说明是队列的第一个,再试一下tryAcquire if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //走到这里说明要么不是队列的第一个,要么tryAcquire的时候没有抢赢 //失败了就要被挂起了 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }
} ```
节点被挂起```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //Node.SIGNAL为-1 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. * 前端点为-1,为正常,可以挂起 */ return true; //前端点取消了排队,说明可以插在前端点前面了 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 仔细想想,如果进入到这个分支意味着什么 // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0 // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0 // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1) // 改好之后走一次for循环,再进来此方法,会进入第一个分支 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;
} ```
开始挂起进程了```java private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); return Thread.interrupted();
} ```
公平重入锁解锁
解锁操作
最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this);
挂起停止,等待被唤醒。
// 唤醒的代码还是比较简单的,你如果上面加锁的都看懂了,下面都不需要看就知道怎么回事了
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 往后看吧
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁
boolean free = false;
// 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果head节点当前waitStatus<0, 将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
// 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了