1.AQS—锁的底层支持
AbstractQueuedSynchronizer 抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中的锁的底层就是使用的AQS实现的,另外大多数的开发者可能永远都不会去使用AQS

由上图可以看得出 AQS 是一个先进先出的双向队列,其中内部通过节点 head 和 tail 记录队尾和队首的元素
Node节点:
- 队列的元素的类型为 Node ,其中Node的thread 变量用来存储进入到AQS中的线程
- Node节点内部的 SHARED 用来标识该线程是获取共享资源时被阻塞挂起后放入到AQS队列中的
- Node节点内部的 EXCLUSIVE 用来标识线程获取独占资源时被挂起后放入AQS队列中的
- waitStatus 记录当前线程的状态,可以为 CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在队列里面等待),PROPAGATE(释放共享资源后需要通知其他节点)
- pre 记录当前节点的前驱节点
- next 记录当前节点的后继节点
解读:
**
- 在AQS内部维持了一个单一的状态信息 State ,可以通过getState 和 setState 以及 compareAndSetState 函数修改其值,对于 ReentrantLock 的实现来说, state 可以用来表示当前线程获取锁的可重入次数,对于读写锁 ReentrantReadWriteLock 来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数,对于 semaphore 来说,state用来表示当前可用信号的个数,对于 CountDownlatch 来说,state用来表示计数器的当前的值。
- AQS内部有一个类 ConditionObject ,用来结合锁实现线程的同步,ConditionObject可以直接访问AQS对象内部的变量,比如 state 状态值和AQS队列,ConditionObject是条件变量,每个变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后阻塞的线程的,如上图所示,这个条件队列的头尾元素分别位 firstWaiter 和 lastWaiter
对于AQS来说,线程同步的关机按就是对 状态值 state 进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式,在独占方式下获取和释放资源使用的方法位:
void acquire(int arg);void acquireInterruptibly(int arg);boolean relase(int arg);
在共享方式下获取和释放资源的方法为:
void acquireShared(int arg);void acquireSharedInterruptibly(int arg);boolean relaseShared(int arg);
使用独占的方式获取的资源是和具体的线程绑定的,也就是说如果一个线程获取到资源,就会标记这个线程获取到了,其他线程在尝试操作 state 获取资源的适合就会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁 ReentrantLock 的实现,当一个线程获取到 lock 锁以后,在 AQS 内部会首先使用 CAS 操作把 state的 状态值从 0 变为 1,然后设置当前锁的持有者线程为当前线程,当线程再次获取锁时发现它就是锁的持有者,则会把状态值从 1 变成 2,也就是设置可重入的次数,而当另外一个线程获取锁时发现自己不是该锁的持有者就会被放入 AQS阻塞队列后挂起
- 对应共享方式的资源和具体线程是不相关的,当多个线程去请求资源时通过CAS方法竞争获取资源,当一个线程获取到资源后,另外一个线程再去获取时如果当前资源还能满足它的需求,则当前线程只需要使用CAS方法进行获取即可。比如 Semaphore 信号量,当一个线程通过 acquire() 方法获取信号量时,首先会去看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列中,如果满足则通过自旋CAS获取信号量
1.1独占方式下获取和释放资源的流程
1.当一个线程调用 acquire(int arg)方法获取独占资源的时候,会首先使用 tryAcquire() 方法尝试获取资源,具体是设置状态变量 state 的值,成功后则直接返回,失败则将当前线程封装为 Node.EXCLUSIVE 的Node 节点后插入到 AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己
// ReentrantLock lock.lock()public void lock() {sync.lock();}//继续向下,查看非公平锁final void lock() {//尝试获取锁,将state的状态值设置为1if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else//尝试失败则acquire(1);}public final void acquire(int arg) {//再次尝试获取锁,如果失败返回 false并将自己封装为 Node.EXCLUSIVE 的 Node 加入到 AQS队列尾部if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//继续 tryAcquire(1)protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}//nonfairTryAcquirefinal boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//如果当前线程第一次获取锁,且锁没有被其他线程获取,则设置state 的值 为 0if (c == 0) {if (compareAndSetState(0, acquires)) {//设置当前线程为锁的持有者//也就是将 AbstractOwnableSynchronizer 类中的 private transient Thread exclusiveOwnerThread; 进行赋值setExclusiveOwnerThread(current);return true;}}//如果发现是 c!= 0 锁被持有,且是当前线程持有的,则说明当前线程再次获取锁,设置可重入的次数(也就是当前线程多少次获取了该锁)else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");//设置可重入的次数setState(nextc);return true;}return false;}
2.当一个线程调用 relase(int arg)方法时会尝试使用 tryRelase 操作释放资源,这里是设置状态变量 state的值,然后调用 LockSupport.unpark(thread) 方法激活AQS队列里面阻塞的一个线程(thread),被激活的线程则使用 tryAcquire 尝试,看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被机会,然后继续向下运行,否则还是会被加入AQS队列并被挂起
public void unlock() {sync.release(1);}//继续向下,查看非公平锁的具体实现public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}//在 Sync 类中 可以看到protected final boolean tryRelease(int releases) {//这里就说明获取几次锁就需要释放几次锁//当一个线程可重入的获取锁的时候,状态值 会被 + 1,每次获取一次锁就会执行 int nextc = c + acquires; 这个操作//同样在释放锁的时候 int c = getState() - releases; 执行 - 1 操作//只有当 c == 0 的时候才说明当前线程释放锁成功,不然会返回 false,继续持有锁int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
需要注意的是,AQS类并没有提供可用的 tryAcquire 和 tryRelase 方法,正如 AQS 是锁阻塞赛和同步器的基础框架一样,tryAcquire 和 tryRelase 需要由具体的子来实现。子类在实现 tryAcquire 和 reyRelase 时要根据具体的场景使用 CAS 算法尝试修改 state的状态值,成功则返回 true,否则则返回 false。子类还需要定义,在调用acquire 和 relase方法时state状态值的增减代表什么含义
比如几次自 AQS 实现的独占锁 ReentrantLock,定义status 为0的时候表示锁空闲,为1表示锁以及被占用,在重写tryAcquire时,在内部需要使用 CAS 算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false
比如继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前 state的值从 1修改为0,并设置当前锁的持有这为 null ,然后返回true,如果CAS失败则返回false
1.2共享方式下获取和释放资源
获取:
1.当线程调用 acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量 state的值,成功则直接返回,失败将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己
//查看 ReentrantReadWriteLock 中读锁就是操作共享资源的public void lock() {sync.acquireShared(1);}public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}//读写锁中的 tryAcquireSharedprotected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();//exclusiveCount:state&0x0000FFFF 会将高16位全部置0,计算结果就是获取的写锁的数量//if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;//将 state 右移16位//获取state的高16位,然后CAS进行递增state,高16位味获取读锁的次数int r = sharedCount(c);/*** 在下面的代码中进行了三个判断:* 1、读锁是否应该排队。如果没有人排队,就进行if后面的判断。有人排队,就不会进行if后面的判断,而是最终调用fullTryAcquireShared()方法* 2、读锁数量是否超过最大值。(最大数量为2的16次方-1)* 3、尝试修改同步变量的值*/if (!readerShouldBlock() &&r < MAX_COUNT &&//c + SHARED_UNIT;将高16位得到的结果 + 1compareAndSetState(c, c + SHARED_UNIT)) {//说明第一次获取读锁if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {//读锁数量不为0且firstReader(第一次获取读的线程)为当前线程,就将firstReaderHoldCount累加firstReaderHoldCount++;} else {// 读锁数量不为0,且第一个获取到读锁的线程不是当前线程// 下面这一段逻辑就是保存当前线程获取读锁的次数,如何保存的呢?// 通过ThreadLocal来实现的,readHolds就是一个ThreadLocal的实例HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//最后去尝试获取锁return fullTryAcquireShared(current);}
解读:
在tryAcquireShared()方法中,会先通过exclusiveCount()方法来计算写锁的数量,如果写锁存在,再判断持有写锁的线程是不是当前线程,如果不是当前线程,就表示写锁被其他线程给占用,此时当前线程不能获取读锁。tryAcquireShared()方法返回-1,表示获取读锁失败。如果写锁不存在或者持有写锁的线程是当前线程,那么就表示当前线程有机会获取到读锁。接下里会判断当前线程获取读锁是否不需要排队,读锁数量是否会超过最大值,以及通过CAS修改读锁的状态是否成功(将state的值加 1<<16)。如果这三个条件成立,就进入if语句块中,这一块的代码比较繁琐,但是功能比较单一,就是统计读锁的数量以及当前线程对读锁的重入次数,底层原理就是ThreadLocal。因为在读写锁中提供了getReadLockCount()、getReadHoldCount()等方法,这几个方法的数据就来自这儿。如果上面的三个条件有一个不成立,就不会进入if语句块,那么就会调用fullTryAcquireShared()方法。该方法的作用就是让线程不停的获取锁,其源码如下。
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {int c = getState();// 锁的状态为写锁时,持有锁的线程不等于当期那线程,就说明当前线程获取锁失败,返回-1if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;} else if (readerShouldBlock()) {// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 尝试设置同步变量的值,只要设置成功了,就表示当前线程获取到了锁,然后就设置锁的获取次数等相关信息if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}
当获取到读锁成功以后,tryAcquireShared()方法会返回1,这样当回到AQS的acquireShared()方法时,就会直接结束了。如果获取锁失败,tryAcquireShared()方法会返回-1,那么在AQS中,就会接着执行doAcquireShared()方法。doAcquireShared()方法的作用就是将自己加入到同步队列中,等待获取锁,直到获取锁成功。该方法不响应中断。
释放:
**
2.当一个线程调用 releaseShared(int arg)时会尝试使用 tryReleaseShared 操作释放资源,这里是设置状态变量 state的值,然后使用LockSupport.unpark(thread)激活AQS队列中被阻塞一个线程,被激活的线程则使用tryReleaseShared 查看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下允许,否则还是会被放入到AQS队列中配挂起
public void unlock() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}
在tryReleaseShared()方法中,会先修改和读锁计数有关的数据,然后在for的死循环中,通过CAS操作将state的值减去1<<16。如果CAS操作成功,才会从for循环中退出。当读锁数量为0时,tryReleaseShared()返回true,表示锁被完全释放。当tryReleaseShared()方法返回后,接下来的步骤和共享锁的释放逻辑完全一样的。
同样需要注意的是,AQS类中并没有提供可用的 tryAcquireShared 和 tryReleaseShared 方法,正如 AQS 是锁阻塞赛和同步器的基础框架一样,tryAcquireShared 和 tryRelaseShared 需要由具体的子来实现。子类在实现 tryAcquireShared 和 tryRelaseShared 时要根据具体的场景使用 CAS 算法尝试修改 state的状态值,成功则返回 true,否则则返回 false。
比如继承自AQS实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写tryAcquireShared 时,首先去查看写锁是否被其他线程持有,如果时则直接返回 false,否则使用CAS递增 state的高16位(在ReentrantReadWriteLock 中,state的高16位为获取读锁的次数)
比如继承自AQS实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryReleaseShared 时,在内部需要使用 CAS 算法把当前的 state的值的高16位 - 1然后返回true,如果CAS 失败则直接返回 false
