AQS基本介绍
AQS(AbstractQueuedSynchronizer)是抽象队列同步器。它定义了多线程访问共享资源的同步框架,依赖队列同步器状态和线程队列,实现资源的同步访问。诸如ReentrantLock、CountDownLatch、Semaphore等的实现都依赖了AQS。AQS支持设置成独占模式、共享模式或者两种模式兼而有之。
如果要定义一个队列同步器,需要重新定义下面的方法:
//互斥模式
tryAcquire()
tryRelease()
//共享模式
tryAcquireShared()
tryReleaseShared()
isHeldExclusively()
可以通过下面的方法查看、修改同步状态。
- getState();
- setState();
- compareAndSetState
首先看一下AQS定义的变量:
//等待队列的头
private transient volatile Node head;
//等待队列的尾
private transient volatile Node tail;
//同步状态
private volatile int state;
//当前占有锁的线程
private transient Thread exclusiveOwnerThread;
ReentrantLock
公平锁
基本流程
ReentrantLock的公平锁为例,state初始状态为0,当调用ReentrantLock的lock方法时。首先会调用AQS的acquire()方法。它会以独占的模式获取锁,如果成功,返回true,否则加入等待队列。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//1.调用AQS的acquire方法
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//1 获取当前线程和同步状态
final Thread current = Thread.currentThread();
int c = getState();
//2 如果状态为0
if (c == 0) {
//如果没有没有前继节点且设置状态成功,将当前线程设置成拥有排它锁的线程。此处判断是否有前继节点,是因为公平锁要按照FIFO的规则去获取锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前当前线程拥有排它锁,可重入,状态位+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//2.如果获取资源成功,返回true;否则的话,加入到队列当中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
队列
加锁如果不存在竞争,那么当前线程可以获取到锁,此时不需要排队。当有多个线程竞争锁时,此时需要排队。排队分为两步操作:
调用addWaiter(Node.EXCLUSIVE), arg)方法,创建队列节点并加入到队列当中,等待队列的数据结构如下:
调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
当前线程加入队列后,此时获取到锁的线程有可能恰好释放锁。此处会再次尝试获取锁,没有获取到锁的话,会将上一个节点的waitStatus从0改成-1,表示上一个节点处于睡眠状态。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断上一个节点是否是head节点,如果是的话,尝试获取锁。获取到锁的话,
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//将上一个节点的waitStatus设置成阻塞状态,然后将自己park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//当前线程阻塞在这里,等待被唤醒。
return Thread.interrupted();
}
锁释放
当执行完锁释放后,如果队列中有等待的线程,那么唤醒该线程。唤醒后会继续继续自悬,尝试获取锁。如果获取到锁之后,将当前节点设置成头节点。也就是下面的setHead方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}