- 一、概述
- 二、深入源码
- 2.1 核心方法
ReentrantLock#lock()AbstractQueuedSynchronizer#acquire()FairSync#tryAcquire()AbstractQueuedSynchronizer#addWaiterAbstractQueuedSynchronizer#acquireQueuedAbstractQueuedSynchronizer#shouldParkAfterFailedAcquireAbstractQueuedSynchronizer#parkAndCheckInterruptReentrantLock#unlockReentrantLock.Sync#tryReleaseAbstractQueuedSynchronizer#unparkSuccessorSemaphore#acquireAbstractQueuedSynchronizer#acquireSharedInterruptibly- Semaphore.FairSync#tryAcquireShared
AbstractQueuedSynchronizer#doAcquireSharedInterruptiblySemaphore#releaseAbstractQueuedSynchronizer#releaseSharedSemaphore.Sync#tryReleaseSharedAbstractQueuedSynchronizer#doReleaseShared
- 2.1 核心方法
- 总结
https://zhuanlan.zhihu.com/p/178991617
一、概述
AQS是AbstractQueuedSynchronizer类的简称,虽然我们不会直接使用这个类,但是这个类是Java很多并发工具的功能实现提供支撑。
从下图可看出,我们常用的ReentrantLock, Semaphore, CountDownLatch都依赖这个类实现。
二、深入源码
/*** 可以看到AbstractQueuedSynchronizer是一个抽象类* 实现了Serializable 接口* @since 1.5* @author Doug Lea*/public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {/*** The synchronization state.* state变量表示锁的状态* 0 表示未锁定* 大于0表示已锁定* 需要注意的是,这个值可以用来实现锁的【可重入性】,例如 state=3 就表示锁被同一个线程获取了3次,想要完全解锁,必须要对应的解锁3次* 同时这个变量还是用volatile关键字修饰的,保证可见性*/private volatile int state;/*** 等待队列的头节点,只能通过setHead方法修改* 如果head存在,能保证waitStatus状态不为CANCELLED*/private transient volatile Node head;/*** 等待队列的尾结点,只能通过enq方法来添加新的等待节点*/private transient volatile Node tail;}
AbstractQueuedSynchronizer从名字上就可看出本质是一个队列(Queue),其内部维护着FIFO的双向队列,也就是CLH(Craig, Landin, and Hagersten)队列。
这个队列中的每一个元素都是一个Node,所以接下来了解一下其内部类Node,内部类Node的定义如下
static final class Node {// 节点正在共享模式下等待的标记static final Node SHARED = new Node();// 节点正在以独占模式等待的标记static final Node EXCLUSIVE = null;// waitStatus变量的可选值,因为超时或者或者被中断,节点会被设置成取消状态。被取消的节点不会参与锁竞争,状态也不会再改变static final int CANCELLED = 1;// waitStatus变量的可选值,表示后继节点处于等待状态,如果当前节点释放了锁或者被取消,会通知后继节点去运行static final int SIGNAL = -1;// waitStatus变量的可选值,表示节点处于condition队列中,正在等待被唤醒static final int CONDITION = -2;// waitStatus变量的可选值,下一次acquireShared应该无条件传播static final int PROPAGATE = -3;// 节点的等待状态volatile int waitStatus;// 前驱节点volatile Node prev;// 后继节点volatile Node next;// 获取同步状态的线程volatile Thread thread;// 下一个condition队列等待节点Node nextWaiter;// 是否是共享模式final boolean isShared() {return nextWaiter == SHARED;}// 返回前驱节点或者抛出异常final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
2.1 核心方法
我们都知道CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock这些工具类中,有的只支持独占,如ReentrantLock#lock(),有的支持共享,多个线程同时执行,如Semaphore。并且,从前文Node类的定义也可以看到
// 节点正在共享模式下等待的标记static final Node SHARED = new Node();// 节点正在以独占模式等待的标记static final Node EXCLUSIVE = null;
AQS实现了两套加锁解锁的方式,那就是独占式和共享式。我们先看下独占式的实现,独占式的实现,就从ReentrantLock#lock()方法开始。
ReentrantLock#lock()
该方法体如下
public void lock() {sync.lock();}
其中sync是AbstractQueuedSynchronizer的实现,我们知道,ReentrantLock支持公平锁和非公平锁,其实现类分别是FairSync和NonfairSync,我们看看公平锁和非公平锁分别是怎么实现的。
// FairSync 公平锁的实现final void lock() {acquire(1);}// NonfairSync 非公平锁的实现final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
可以看到,非公平锁的实现仅仅是多了一个步骤:通过CAS的方式(compareAndSetState)尝试改变state的状态,修改成功后设置当前线程以独占的方式获取了锁,不会去判断等待队列中在当前线程之前是否还有等待获取锁的线程,所以是“不公平”的,修改失败执行的逻辑和公平锁一样。
这就是公平锁和非公平锁的本质区别
从这段代码中可以看到,独占锁加锁的核心逻辑就是acquire方法,接下来就看看这个方法
AbstractQueuedSynchronizer#acquire()
方法体如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
该方法主要调用tryAcquire方法尝试获取锁,成功返回true,失败就将线程封装成Node对象,放入队列。
FairSync#tryAcquire()
tryAcquire方法在AQS中并没有直接实现,而是采用模板方法的设计模式,交给子类去实现。我们来看公平锁的实现。
protected final boolean tryAcquire(int acquires) {// 当前线程final Thread current = Thread.currentThread();// 获取state状态,0表示未锁定,大于1表示重入int c = getState();if (c == 0) {// 表示没有线程获取锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {// 没有比当前线程等待更久的线程了,通过CAS的方式修改state// 成功之后,设置当前拥有独占访问权的线程setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {// 独占访问权的线程就是当前线程,重入// 此处就是【可重入性】的实现int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");// 直接修改statesetState(nextc);return true;}return false;}
可以看到该方法就是以独占的方式获取锁,获取成功后返回true。从这个方法可以看出state变量是实现可重入性的关键。
非公平锁的实现方式大同小异,感兴趣的同学可以自行阅读源码。
acquire方法除了调用tryAcquire,还调用了acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里分为两步,先看下addWaiter方法。
AbstractQueuedSynchronizer#addWaiter
/*** Creates and enqueues node for current thread and given mode.* 为当前线程和给定模式创建并排队节点,给的的模式分为:* 1、Node.EXCLUSIVE:独占模式* 2、Node.SHARED:共享模式** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared*/private Node addWaiter(Node mode) {// 创建Node节点Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// 尝试快速添加尾结点,失败就执行enq方法Node pred = tail;if (pred != null) {node.prev = pred;// CAS的方式设置尾结点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 快速添加失败,执行该方法enq(node);return node;}
如果前面添加失败,则enq()方法保证添加成功
/*** Inserts node into queue, initializing if necessary. See picture above.* 将节点插入队列,必要时进行初始化** @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {// 自旋Node t = tail;if (t == null) { // Must initialize// 尾结点为空,队列还没有进行初始化// 设置头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// CAS的方式设置尾结点,失败就进入下次循环// 也就是【自旋 + CAS】的方式保证设置成功if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
可以看到该方法就是用来往队列尾部插入一个新的节点,通过自旋 + CAS的方式保证线程安全和插入成功。
需要注意的是,该方法返回的Node节点不是新插入的节点,而是新插入节点的前驱节点。
AbstractQueuedSynchronizer#acquireQueued
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.**/final boolean acquireQueued(final Node node, int arg) {// 操作是否成功boolean failed = true;try {boolean interrupted = false;for (;;) {// 自旋// 获取当前节点的前驱节点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {// 前驱节点是头节点,并且已经获取了锁(tryAcquire方法在前文中详细讲解过)// 就把当前节点设置成头节点(因为前驱节点已经获取了锁,所以前驱节点不用再留在队列)setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果前驱节点不是头节点或者没有获取锁// shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞// parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断// 没抢到锁的线程需要被阻塞,避免一直去争抢锁,浪费CPU资源interrupted = true;}} finally {if (failed)// 自旋异常退出,取消正在进行锁争抢cancelAcquire(node);}}
AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
用于判断当前线程获取锁失败后是否需要被阻塞
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前驱节点的等待状态int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** SIGNAL表示后继节点处于等待状态,如果当前节点释放了锁或者被取消,会通知后继节点去运行* 所以作为后继节点,node直接返回true,表示需要被阻塞*/return true;if (ws > 0) {/** 前驱节点被取消了,需要从队列中移除,并且循环找到下一个不是取消状态的节点*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** 通过CAS将前驱节点的status设置成SIGNAL*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {// 阻塞当前线程LockSupport.park(this);// 检测当前线程是否被中断(该方法会清除中断标识位)return Thread.interrupted();}
至此,独占锁的整个加锁过程就已经完成。再来回顾下整个流程
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
首先执行tryAcquire方法用于尝试获取锁,成功后就直接返回,失败后就通过addWaiter方法把当前线程封装成一个Node,加到队列的尾部,再通过acquireQueued方法尝试获取同步锁,成功获取锁的线程的Node节点会被移出队列。
如果以上条件都满足,会执行selfInterrupt方法中断当前线程。
看完了独占锁的加锁,再来看看独占锁的解锁。同样从ReentrantLock入手
ReentrantLock#unlock
public void unlock() {sync.release(1);}
我们已经知道了sync是AQS的实现,所以直接查看AQS中的release方法
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {// 尝试释放锁Node h = head;if (h != null && h.waitStatus != 0)// 头节点已经释放,唤醒后继节点unparkSuccessor(h);return true;}return false;}
ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {// 计算剩余的重入次数int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// 是否完全的释放了锁(针对可重入性)boolean free = false;if (c == 0) {// 表示完全释放了锁free = true;// 设置独占锁的持有者为nullsetExclusiveOwnerThread(null);}// 设置AQS的statesetState(c);return free;}
AbstractQueuedSynchronizer#unparkSuccessor
unparkSuccessor方法用于唤醒后继节点,其定义如下
/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {// 获取当前节点的状态int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {// 当前节点的后继节点为null,或者被取消了s = null;for (Node t = tail; t != null && t != node; t = t.prev)// 从尾结点查找状态不为取消的可用节点if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒后继节点LockSupport.unpark(s.thread);}
前文说过AQS实现了两套同步逻辑,也就是独占式和共享式。看完了独占式锁的实现,再来看一下共享式。这里以Semaphore为例。
Semaphore#acquire
该方法是作用是请求一个许可,如果暂时没有可用的许可,则被阻塞,等待将来的某个时间被唤醒。因为Semaphore可以允许多个线程同时执行,所以可以看成是共享锁的实现。该方法定义如下
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
sync是AQS的实现,可以看到acquire方法底层调用的是acquireSharedInterruptibly方法。
在JDK中,与锁相关的方法,Interruptibly表示可中断,也就是可中断锁。可中断锁的意思是线程在等待获取锁的过程中可以被中断,换言之,线程在等待锁的过程中可以响应中断。
接下来看看acquireSharedInterruptibly方法的实现
AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())// 检测线程的中断中断状态,如果已经被中断了,就响应中断// 该方法会清除线程的中断标识位throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
Semaphore.FairSync#tryAcquireShared
tryAcquireShared方法,相信大家已经能看出来,这里使用了模板方法模式,具体实现由子类去实现。Semaphore也实现了公平模式和非公平模式。公平的方式和非公平的方式实现逻辑大同小异。所以具体看下公平模式下的实现方式
protected int tryAcquireShared(int acquires) {for (;;) {// 自旋if (hasQueuedPredecessors())// 如果有线程排在自己的前面(公平锁排队),直接返回return -1;// 获取同步状态的值int available = getState();// 可用的(许可)减去申请的,等于剩余的int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))// 如果剩余的小于0,或者设置状态成功,就返回,如果设置失败,则进入下一次循环// 如果剩余小于0,返回负数,表示失败// 如果设置状态成功,表示申请许可成功,返回正数return remaining;}}
AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
方法定义如下
/*** Acquires in shared interruptible mode.* 在共享可中断模式下请求(许可)*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 为当前线程和给定模式创建节点并插入队列尾部,addWaiter方法前文讲解过final Node node = addWaiter(Node.SHARED);// 操作是否失败boolean failed = true;try {for (;;) {// 自旋// 获取当前节点的前驱节点final Node p = node.predecessor();if (p == head) {// 如果前驱节点是头节点,以共享的方式请求获取锁,tryAcquireShared方法前文讲解过int r = tryAcquireShared(arg);if (r >= 0) {// 成功获取锁,设置头节点和共享模式传播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果前驱节点不是头节点或者没有获取锁// shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞,该方法前文讲解过// parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断,该方法前文讲解过// 没抢到锁的线程需要被阻塞,避免一直去争抢锁,浪费CPU资源throw new InterruptedException();}} finally {if (failed)// 自旋异常退出,取消正在进行锁争抢cancelAcquire(node);}}
Semaphore#release
release用于释放许可,其方法定义如下
public void release() {sync.releaseShared(1);}
AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
Semaphore.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int releases) {for (;;) {// 自旋// 获取同步状态的值int current = getState();// 可用的(许可)加上释放的,等于剩余的int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))// CAS的方式设置同步状态return true;}}
AbstractQueuedSynchronizer#doReleaseShared
/*** Release action for shared mode -- signals successor and ensures* propagation. (Note: For exclusive mode, release just amounts* to calling unparkSuccessor of head if it needs signal.)*/private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {// 自旋// 记录头节点Node h = head;if (h != null && h != tail) {// 头节点不为null,且不等于尾结点,说明队列中还有节点// 获取头节点等待状态int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 头节点等待状态是SIGNALif (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 如果修改节点等待状态失败,进入下一次循环continue; // loop to recheck cases// 修改成功后,唤醒后继节点,unparkSuccessor前文讲过unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
总结
AQS可以说是整个并发编程中最难的一个类。但是理解AQS的实现却非常重要,因为它是JDK中锁和其他同步工具实现的基础。
