背景知识介绍
Lock接口
Lock类是所有锁的父类。提供了如下几个接口方法:
/** 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。 */void lock();/*** 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。* 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。*/void lockInterruptibly() throws InterruptedException;/** 同步获取锁。会立即返回获取锁的结果 */boolean tryLock();/*** 同步获取锁。在超时时间内,如果获取不到会一直请求。* 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/** 请求释放锁 */void unlock();
而Lock接口的实现类有如下几个:
- ReentrantLock:重入锁。它实现Lock接口,支持公平锁/非公平锁、可重入的功能。
- ReentrantReadWriteLock:重入读写锁。它实现了ReadWriteLock接口,是一种悲观锁。
而内部的ReadLock/WriteLock又分别实现了Lock接口。也支持公平、非公平锁的实现。 - StampedLock:Jdk1.8版本引入的邮戳锁。是对上面两种锁的优化。提供了:悲观写锁、悲观读锁、乐观读锁。
其内部的ReadLockView/WriteLockView实现了Lock接口。
AQS(AbstractQueuedSynchronizer)抽象类
AQS是一个抽象类。主要的业务逻辑由它的实现类重写。它本身没有实现任何同步锁的功能。接口实现类有如下几个:
- ReentrantLock:重入锁。
- ReentrantReadWriteLock:重入读写锁。
- CountDownLatch:倒计时器。初始化一个值,然后调用await()方法阻塞至值减为0再执行。
- Semaphore:信号量计数器。主要用来做流控。(可以结合令牌桶和漏桶算法一起学习。)
我们查看AQS的源码会发现,有两个关键的内部类:Node、ConditionObject。
而且,AQS的几个重要参数如head、tail、status都是由volatile修饰,保证多线程间可见。
Node有几个重要的参数:
- SHARED/EXCLUSIVE:表明节点是共享还是独占;
- waitStatus:表明当前节点线程的状态,分别有CANCELLED/SIGNAL/CONDITION三种状态;
- prev/next:链表结构的前驱和后继;
- thread:封装的当前线程对象;
- nextWaiter:下一个处于等待状态的或共享模式的节点;
AQS的实现依赖于内部的同步队列,也就是FIFO的双向队列。如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息封装称Node加入到同步队列中,同时阻塞该线程。当和获取锁的线程释放锁后,会从队列中唤醒一个阻塞的节点继续持有锁。
在这里,我们要将Node节点的waitStatus(线程等待状态)和AQS的status(锁状态)区分清楚:
waitStatus:当前节点的等待状态;
/** 在取消抢锁或释放锁失败时,会将线程状态置为已取消 */static final int CANCELLED = 1;/** 标识当前节点的next节点需要被唤醒 */static final int SIGNAL = -1;/** 标识当前节点处于等待条件队列中 */static final int CONDITION = -2;/** 标识下一个获取锁的线程无条件的传播? */static final int PROPAGATE = -3;/** None of the above */0:None of the above
status:获取锁的线程的状态。0是未有线程获取锁,因为支持重入,所以重入多少次,就需要释放多少次锁。
AQS加锁
public final void acquire(int arg) {// tryAcquire是aqs提供的接口方法。由子类进行实现并扩展。if (!tryAcquire(arg) &&// 看下面代码块acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// Thread.currentThread().interrupt();selfInterrupt();}
private Node addWaiter(Node mode) {// 将当前节点封装称Node节点(独占)。Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 尾节点不为空的情况下,将当前节点放置在最后并和原尾节点进行CAS链接。if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
private Node enq(final Node node) {// 尾节点为空的情况下,死循环进行争抢锁。for (;;) {Node t = tail;/*** 从这里,我们可以看见这个CLH队列锁的结构了。* 头节点,是一个空节点。里面不包含线程信息,有用的仅是前驱和后继两个引用。* 当CLH队列为空时,先创建个空节点,然后将其next指向当前节点,* 并将当前节点设置为tail节点。*/if (t == null) {if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
到现在,当前节点已经封装称了Node,并且加入到了CLH队列称为尾部tail。在这个阶段我们做了如下操作:
修改了当前Node节点的前驱和后继;
- 修改了CLH队列的head和tail。
下一步,我们开始执行acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 死循环抢锁;for (;;) {// 获取当前节点的前驱。final Node p = node.predecessor();// tryAcquire是AQS提供的接口类。由子类自己实现。// 进入循环的条件就是,当前节点就是头节点,不需要被中断,直接返回。if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 执行下面方法。注意入参// p:当前节点的前驱节点;node:当前节点if (shouldParkAfterFailedAcquire(p, node) &&//parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 如果前驱节点是等待被唤醒状态,直接返回true;if (ws == Node.SIGNAL)return true;// 大于0的情况,就是该线程已经取消获取锁了。// 进行死循环,直到找到0<=状态的节点,并将它设置为当前节点的前驱。if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 前驱节点的状态是0或-3。(-2呢??文档只写了0或-3).// 将前驱节点的状态更新为-1(signal),/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
在shouldParkAfterFailedAcquire方法执行完成后,我们回到它的父方法acquireQueued,去查看失败(false)后的处理逻辑:
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;// Skip cancelled predecessors// 查询当前Node节点的前继非取消状态的节点。Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.// 如果当前节点是尾部节点,就替换当前节点为前置节点if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;// 前置节点不是head头节点,并且前置节点的状态为-1,并且pred的线程不为空if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;// 如果node的后继节点不为null,且状态<=0,则将pred的next指针指向node的后继节点if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 前继节点不是head节点,但是状态不为SIGNAL(-1)或者前继节点为head节点// 也就是前置节点没有“叫醒服务”,则node节点直接叫醒后继的线程unparkSuccessor(node);}// 将node的next指针指向自己,方便被GCnode.next = node; // help GC}}
用个通俗的例子来概括整个过程:
相当于,我在队伍中间因为某种原因不想排队了,但是我有1个任务,就是需要叫醒排在我后面睡觉的人。 如果我已经是队尾了,没有排在我后面人需要被唤醒,那我直接离队就可以。 如果排在我前面的人已经是队头了,则直接叫醒排在我后面的人,让他去获取锁。 如果排在我前面的人不是队头,那么我会先尝试把这个任务交给排在我前面的人,如果前面的人同意,我就会把排在我后面的人告诉他(把他的后继指针指向排在我后面的人)。 如果不同意,那我直接叫醒排在我后面的人,让他自己去求排在我前面的人,直到前面的人同意叫醒他,他再次睡去。 自此,AQS独占锁的获取过程我们就讲解完了,代码不多,但其设计真的很精巧,值得反复咀嚼体会。
AQS释放锁
释放节点代码入下:
public final boolean release(int arg) {// 子类重写该方法,指定释放的条件。if (tryRelease(arg)) {Node h = head;// 头结点不为空即等待队列不为空,并且存在等待状态不为0的数据;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 小于0的数据直接将状态CAS自旋改为0if (ws < 0) {compareAndSetWaitStatus(node, ws, 0);}// 获取当前节点的继任节点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 如果当前节点的继任节点为空,那么从后往前遍历,找到可以唤醒的节点并将其唤醒。for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
总结:
1、如果节点的状态<0,那么直接更新当前节点状态为0,
2、通过LockSupport.unpart()方法,唤醒下一个状态<0的线程;
ReeranceLock
加锁
public void lock() {sync.lock();}
而Sync的实现类有两个。分别是FairSync公平锁、NonfairSync非公平锁。它们分别提供了两个方法:lock()、tryAcquire()。
lock()
- 公平锁的lock()方法是直接调用acquire()方法;
非公平锁的lock()方法时先自旋抢锁,抢锁失败再调用acquire()方法。
// FairSyncfinal void lock() {acquire(1);}// NonfairSyncfinal void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
tryAcquire()
// FairSyncprotected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();// c是锁的状态。0为未加锁,因为支持重入,所以>0即为加锁次数。int c = getState();if (c == 0) {// aqs的clh队列为空,直接加锁。if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 这块儿是重入逻辑else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// NonfairSyncprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 跟公平锁有区别在于,不去判断clh队列有没有等待的线程,直接抢锁。if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 更公平所完全一样else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
ReentrantReadWriteLock
公平、非公平锁的实现
```java // 非公平锁 static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {return false;}final boolean readerShouldBlock() {// clh不为空,header的子节点不为空并且是独占锁// 我们知道,创建clh队列时,head节点就是一个空节点,里面并未封装线程。而header的后继节点才是真正的需要执行任务的线程节点。return apparentlyFirstQueuedIsExclusive();}
} final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;
}
// 公平锁 static final class FairSync extends Sync {
final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}
} public final boolean hasQueuedPredecessors() {
Node t = tail;Node h = head;Node s;// clh队列不为空,并且第一个任务节点不是当前线程的节点return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
我们总结下:- 非公平锁- 写:false- 读:clh队列不为空时为true;为空时为false;- 公平锁:- 写:clh队列不为空时为true;为空时为false;- 读:clh队列不为空时为true;为空时为false;> **readerShouldBlock、writerShouldBlock两个方法,在需要加锁时返回true,不需要加锁时返回false。**<a name="AHBQF"></a>### 读、写锁的实现<a name="RMitK"></a>#### 读锁```javapublic void lock() {sync.acquireShared(1);}// AQS实现public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// c & (1 << 16) - 1:c和15位1做与运算;结果=c。// 这个判断为true的条件即为:c>0(已加锁状态)if (exclusiveCount(c) != 0 &&// 持有锁的线程非当前线程。getExclusiveOwnerThread() != current)return -1;// c >>> 16int r = sharedCount(c);if (!readerShouldBlock() &&// MAX_COUNT = (1 << 16) - 1 = 2的15次方-1r < MAX_COUNT &&// 将c的状态改为10000..ccompareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullTryAcquireShared(current);}// AQS实现private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
参考文献:
1、深入分析AQS实现原理(此文章为本文的框架)
2、
