背景知识介绍
Lock接口
Lock类是所有锁的父类。提供了如下几个接口方法:
/** 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。 */
void lock();
/**
* 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。
* 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。
*/
void lockInterruptibly() throws InterruptedException;
/** 同步获取锁。会立即返回获取锁的结果 */
boolean tryLock();
/**
* 同步获取锁。在超时时间内,如果获取不到会一直请求。
* 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/** 请求释放锁 */
void unlock();
而Lock接口的实现类有如下几个:
- ReentrantLock:重入锁。它实现Lock接口,支持公平锁/非公平锁、可重入的功能。
- ReentrantReadWriteLock:重入读写锁。它实现了ReadWriteLock接口,是一种悲观锁。
而内部的ReadLock/WriteLock又分别实现了Lock接口。也支持公平、非公平锁的实现。 - StampedLock:Jdk1.8版本引入的邮戳锁。是对上面两种锁的优化。提供了:悲观写锁、悲观读锁、乐观读锁。
其内部的ReadLockView/WriteLockView实现了Lock接口。
AQS(AbstractQueuedSynchronizer)抽象类
AQS是一个抽象类。主要的业务逻辑由它的实现类重写。它本身没有实现任何同步锁的功能。接口实现类有如下几个:
- ReentrantLock:重入锁。
- ReentrantReadWriteLock:重入读写锁。
- CountDownLatch:倒计时器。初始化一个值,然后调用await()方法阻塞至值减为0再执行。
- Semaphore:信号量计数器。主要用来做流控。(可以结合令牌桶和漏桶算法一起学习。)
我们查看AQS的源码会发现,有两个关键的内部类:Node、ConditionObject。
而且,AQS的几个重要参数如head、tail、status都是由volatile修饰,保证多线程间可见。
Node有几个重要的参数:
- SHARED/EXCLUSIVE:表明节点是共享还是独占;
- waitStatus:表明当前节点线程的状态,分别有CANCELLED/SIGNAL/CONDITION三种状态;
- prev/next:链表结构的前驱和后继;
- thread:封装的当前线程对象;
- nextWaiter:下一个处于等待状态的或共享模式的节点;
AQS的实现依赖于内部的同步队列,也就是FIFO的双向队列。如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息封装称Node加入到同步队列中,同时阻塞该线程。当和获取锁的线程释放锁后,会从队列中唤醒一个阻塞的节点继续持有锁。
在这里,我们要将Node节点的waitStatus(线程等待状态)和AQS的status(锁状态)区分清楚:
waitStatus:当前节点的等待状态;
/** 在取消抢锁或释放锁失败时,会将线程状态置为已取消 */
static final int CANCELLED = 1;
/** 标识当前节点的next节点需要被唤醒 */
static final int SIGNAL = -1;
/** 标识当前节点处于等待条件队列中 */
static final int CONDITION = -2;
/** 标识下一个获取锁的线程无条件的传播? */
static final int PROPAGATE = -3;
/** None of the above */
0:None of the above
status:获取锁的线程的状态。0是未有线程获取锁,因为支持重入,所以重入多少次,就需要释放多少次锁。
AQS加锁
public final void acquire(int arg) {
// tryAcquire是aqs提供的接口方法。由子类进行实现并扩展。
if (!tryAcquire(arg) &&
// 看下面代码块
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// Thread.currentThread().interrupt();
selfInterrupt();
}
private Node addWaiter(Node mode) {
// 将当前节点封装称Node节点(独占)。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 尾节点不为空的情况下,将当前节点放置在最后并和原尾节点进行CAS链接。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 尾节点为空的情况下,死循环进行争抢锁。
for (;;) {
Node t = tail;
/**
* 从这里,我们可以看见这个CLH队列锁的结构了。
* 头节点,是一个空节点。里面不包含线程信息,有用的仅是前驱和后继两个引用。
* 当CLH队列为空时,先创建个空节点,然后将其next指向当前节点,
* 并将当前节点设置为tail节点。
*/
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
到现在,当前节点已经封装称了Node,并且加入到了CLH队列称为尾部tail。在这个阶段我们做了如下操作:
修改了当前Node节点的前驱和后继;
- 修改了CLH队列的head和tail。
下一步,我们开始执行acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 死循环抢锁;
for (;;) {
// 获取当前节点的前驱。
final Node p = node.predecessor();
// tryAcquire是AQS提供的接口类。由子类自己实现。
// 进入循环的条件就是,当前节点就是头节点,不需要被中断,直接返回。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 执行下面方法。注意入参
// p:当前节点的前驱节点;node:当前节点
if (shouldParkAfterFailedAcquire(p, node) &&
//
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前驱节点是等待被唤醒状态,直接返回true;
if (ws == Node.SIGNAL)
return true;
// 大于0的情况,就是该线程已经取消获取锁了。
// 进行死循环,直到找到0<=状态的节点,并将它设置为当前节点的前驱。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱节点的状态是0或-3。(-2呢??文档只写了0或-3).
// 将前驱节点的状态更新为-1(signal),
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
在shouldParkAfterFailedAcquire方法执行完成后,我们回到它的父方法acquireQueued,去查看失败(false)后的处理逻辑:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 查询当前Node节点的前继非取消状态的节点。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果当前节点是尾部节点,就替换当前节点为前置节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 前置节点不是head头节点,并且前置节点的状态为-1,并且pred的线程不为空
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 如果node的后继节点不为null,且状态<=0,则将pred的next指针指向node的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 前继节点不是head节点,但是状态不为SIGNAL(-1)或者前继节点为head节点
// 也就是前置节点没有“叫醒服务”,则node节点直接叫醒后继的线程
unparkSuccessor(node);
}
// 将node的next指针指向自己,方便被GC
node.next = node; // help GC
}
}
用个通俗的例子来概括整个过程:
相当于,我在队伍中间因为某种原因不想排队了,但是我有1个任务,就是需要叫醒排在我后面睡觉的人。 如果我已经是队尾了,没有排在我后面人需要被唤醒,那我直接离队就可以。 如果排在我前面的人已经是队头了,则直接叫醒排在我后面的人,让他去获取锁。 如果排在我前面的人不是队头,那么我会先尝试把这个任务交给排在我前面的人,如果前面的人同意,我就会把排在我后面的人告诉他(把他的后继指针指向排在我后面的人)。 如果不同意,那我直接叫醒排在我后面的人,让他自己去求排在我前面的人,直到前面的人同意叫醒他,他再次睡去。 自此,AQS独占锁的获取过程我们就讲解完了,代码不多,但其设计真的很精巧,值得反复咀嚼体会。
AQS释放锁
释放节点代码入下:
public final boolean release(int arg) {
// 子类重写该方法,指定释放的条件。
if (tryRelease(arg)) {
Node h = head;
// 头结点不为空即等待队列不为空,并且存在等待状态不为0的数据;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 小于0的数据直接将状态CAS自旋改为0
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 获取当前节点的继任节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果当前节点的继任节点为空,那么从后往前遍历,找到可以唤醒的节点并将其唤醒。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结:
1、如果节点的状态<0,那么直接更新当前节点状态为0,
2、通过LockSupport.unpart()方法,唤醒下一个状态<0的线程;
ReeranceLock
加锁
public void lock() {
sync.lock();
}
而Sync的实现类有两个。分别是FairSync公平锁、NonfairSync非公平锁。它们分别提供了两个方法:lock()、tryAcquire()。
lock()
- 公平锁的lock()方法是直接调用acquire()方法;
非公平锁的lock()方法时先自旋抢锁,抢锁失败再调用acquire()方法。
// FairSync
final void lock() {
acquire(1);
}
// NonfairSync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
tryAcquire()
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// c是锁的状态。0为未加锁,因为支持重入,所以>0即为加锁次数。
int c = getState();
if (c == 0) {
// aqs的clh队列为空,直接加锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 这块儿是重入逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 跟公平锁有区别在于,不去判断clh队列有没有等待的线程,直接抢锁。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 更公平所完全一样
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWriteLock
公平、非公平锁的实现
```java // 非公平锁 static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
// clh不为空,header的子节点不为空并且是独占锁
// 我们知道,创建clh队列时,head节点就是一个空节点,里面并未封装线程。而header的后继节点才是真正的需要执行任务的线程节点。
return apparentlyFirstQueuedIsExclusive();
}
} final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
// 公平锁 static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
} public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// clh队列不为空,并且第一个任务节点不是当前线程的节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我们总结下:
- 非公平锁
- 写:false
- 读:clh队列不为空时为true;为空时为false;
- 公平锁:
- 写:clh队列不为空时为true;为空时为false;
- 读:clh队列不为空时为true;为空时为false;
> **readerShouldBlock、writerShouldBlock两个方法,在需要加锁时返回true,不需要加锁时返回false。**
<a name="AHBQF"></a>
### 读、写锁的实现
<a name="RMitK"></a>
#### 读锁
```java
public void lock() {
sync.acquireShared(1);
}
// AQS实现
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// c & (1 << 16) - 1:c和15位1做与运算;结果=c。
// 这个判断为true的条件即为:c>0(已加锁状态)
if (exclusiveCount(c) != 0 &&
// 持有锁的线程非当前线程。
getExclusiveOwnerThread() != current)
return -1;
// c >>> 16
int r = sharedCount(c);
if (!readerShouldBlock() &&
// MAX_COUNT = (1 << 16) - 1 = 2的15次方-1
r < MAX_COUNT &&
// 将c的状态改为10000..c
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
// AQS实现
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
参考文献:
1、深入分析AQS实现原理(此文章为本文的框架)
2、