AQS基本介绍
AQS(AbstractQueuedSynchronizer)是抽象队列同步器。它定义了多线程访问共享资源的同步框架,依赖队列同步器状态和线程队列,实现资源的同步访问。诸如ReentrantLock、CountDownLatch、Semaphore等的实现都依赖了AQS。AQS支持设置成独占模式、共享模式或者两种模式兼而有之。
如果要定义一个队列同步器,需要重新定义下面的方法:
//互斥模式tryAcquire()tryRelease()//共享模式tryAcquireShared()tryReleaseShared()isHeldExclusively()
可以通过下面的方法查看、修改同步状态。
- getState();
- setState();
- compareAndSetState
首先看一下AQS定义的变量:
//等待队列的头private transient volatile Node head;//等待队列的尾private transient volatile Node tail;//同步状态private volatile int state;//当前占有锁的线程private transient Thread exclusiveOwnerThread;
ReentrantLock
公平锁
基本流程
ReentrantLock的公平锁为例,state初始状态为0,当调用ReentrantLock的lock方法时。首先会调用AQS的acquire()方法。它会以独占的模式获取锁,如果成功,返回true,否则加入等待队列。
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {//1.调用AQS的acquire方法acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {//1 获取当前线程和同步状态final Thread current = Thread.currentThread();int c = getState();//2 如果状态为0if (c == 0) {//如果没有没有前继节点且设置状态成功,将当前线程设置成拥有排它锁的线程。此处判断是否有前继节点,是因为公平锁要按照FIFO的规则去获取锁。if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前当前线程拥有排它锁,可重入,状态位+1else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
//2.如果获取资源成功,返回true;否则的话,加入到队列当中public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
队列
加锁如果不存在竞争,那么当前线程可以获取到锁,此时不需要排队。当有多个线程竞争锁时,此时需要排队。排队分为两步操作:
调用addWaiter(Node.EXCLUSIVE), arg)方法,创建队列节点并加入到队列当中,等待队列的数据结构如下:
调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
当前线程加入队列后,此时获取到锁的线程有可能恰好释放锁。此处会再次尝试获取锁,没有获取到锁的话,会将上一个节点的waitStatus从0改成-1,表示上一个节点处于睡眠状态。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();//判断上一个节点是否是head节点,如果是的话,尝试获取锁。获取到锁的话,if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//将上一个节点的waitStatus设置成阻塞状态,然后将自己parkif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);//当前线程阻塞在这里,等待被唤醒。return Thread.interrupted();}
锁释放
当执行完锁释放后,如果队列中有等待的线程,那么唤醒该线程。唤醒后会继续继续自悬,尝试获取锁。如果获取到锁之后,将当前节点设置成头节点。也就是下面的setHead方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}
