AQS是什么
AbstractQueuedSynchronizer 抽象队列同步器,是JAVA JUC包的核心,并发编程的核心,是ReentrantLock的底层实现。
AQS队列
AQS队列中,队列中的第一个结点是占有锁的线程对应的结点。
六个操作(思想)
- 抢锁
- 释放锁
- 入队
- 出队
- 阻塞
- 唤醒
生活实例类比
银行办业务:
- 看办理的人多不多,不多就直接去柜台,多就排队;
- 等待区
- 如果超级VIP来办业务,不需要排队,哪个柜台空了,就直接去柜台去办; => 对应非公平锁
- 过号了,重新排队。
对应:
if (failed)
cancelAcquire(node);
公平锁和非公平锁
只有一个临界点的不同,就是如果占有锁的线程释放锁的一瞬间,刚好来了个线程,那么该线程无需等待,直接获得该锁。
// 公平锁,需要先去判断等待区有没有其他线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// 非公平锁,不管等待区有没有线程,就直接上锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
抢锁
看锁标志位,对应的就是AQS中的state属性,0就是没有锁。
private volatile int state;
线程刚进来,此时没有线程上锁,state状态为0,那么就上锁,将state改成1。在这个过程中,所有线程都会进来抢锁,为了保证只有一个线程可以抢到,采用CAS机制。
假设此时有线程占有了锁,这时候有线程来抢锁:
- 如果抢锁的线程和占有锁的线程是同一个,state++,在释放的时候要释放相同的次数。
- 如果不是同一个锁,抢锁失败。
- 这里有一个优化的点,就是可以先去判断等待区有没有线程在等待,如果有,说明此时锁是被占的。
public final void acquire(int arg) {
// 如果抢到锁了,!tryAcquire就为false,后面的就不执行了,短路操作
// 如果没抢到锁,就进行后面的操作,比如入队。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
// 获取状态,如果等于0,则此时锁没有被占用
if (c == 0) {
// hasQueuedPredecessors判断等待池中是否有线程在等待
// 如果没有线程在等待,才说明可以加锁,虽然前面判断了 c == 0
// 但是有可能其他线程已经进入并且拿到锁了。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 否则的话,判断当前线程是否为之前已经拿到锁的线程
// 如果是的话,state++;
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 否则,return false, 表示抢不到锁,抢不到锁就要进行入队操作了。
return false;
}
入队
- 首先判断队列中有没有结点
- 如果没有结点:AQS队列会生成两个结点
```java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
} } // 为空,则走到这里 enq(node); return node; }pred.next = node; return node;
- 如果没有结点:AQS队列会生成两个结点
```java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
以上代码没有理解很透彻,但是做的事情如下:
1. 刚开始时进if (t == null) :
![image.png](https://cdn.nlark.com/yuque/0/2021/png/12689050/1631252167386-53534583-3216-4b55-ab60-87b652f5fca3.png#clientId=uac838385-1eb4-4&from=paste&height=649&id=u92cd5432&margin=%5Bobject%20Object%5D&name=image.png&originHeight=248&originWidth=389&originalType=url&ratio=1&size=12377&status=done&style=none&taskId=u8698a04f-22f8-49ec-acfe-cdb2e935eaa&width=1018)
2. 然后进else:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/12689050/1631252187840-dc9a1d89-a7d5-4d3d-996d-162ed05a749b.png#clientId=uac838385-1eb4-4&from=paste&height=356&id=u2ac571c9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=291&originWidth=640&originalType=url&ratio=1&size=20391&status=done&style=none&taskId=ud339ed35-e0d5-4997-9e67-11eceb6452e&width=784)
[图片来源](https://blog.csdn.net/dataiyangu/article/details/104881208)
因此为空时生成了两个结点。
2. 如果有结点:直接在尾部插入
```java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 不为空,插入到尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 为空,则走到这里
enq(node);
return node;
}
- 入队完以后,要抢锁或者上闹钟,阻塞。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 拿到当前结点的上一个结点, final Node p = node.predecessor(); // 如果为head,说明当前结点为第二个结点。 if (p == head && tryAcquire(arg)) { // 抢锁成功,则将该结点设置为新的头结点 setHead(node); // 有利于GC p.next = null; // help GC failed = false; return interrupted; } // 抢锁失败,阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
阻塞
抢锁失败的话,上闹钟并阻塞:第一次循环将它前面的那个结点的waitStatus设置为-1。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 第二次进到这里,return true
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 第一次走到这里,将前一个的结点改成-1,并return false;
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 经过两次循环以后,往后走,执行parkAndCheckInterrupt。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
释放锁
因为是可重入锁, state的值可以 > 1, 例如state == 2,表示重入了两次。
因此我们要减去相同的次数,才能释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 等于0的时候,free才为true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
出队
出队这个事情谁来做?
由被唤醒的线程来做。在acquireQueued里的setHead, 然后删除掉引用。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 设置新的头结点, 并且删除掉第二个结点指向第一个结点的引用。
setHead(node);
// 删除掉第一个结点向后的引用
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
// 将head指向第二个结点,使它成为新的第一个结点
head = node;
// 将thread属性设置为空
node.thread = null;
// 删除掉向前的引用
node.prev = null;
}
唤醒
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}