从最简单的ReentrantLock的使用开始分析
public void testLock() {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 业务逻辑
} finally {
lock.unlock();
}
}
Lock lock = new ReentrantLock(); 创建一个锁
默认为传参,创建的是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
也可通过传参的方式,指定创建的为公平锁/非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 通过定义的Sync 内部类,实现AQS 抽象队列同步器
AbstractQueuedSynchronizer结构:
AQS 本质上是一个双向链表,通过内部的Node类型,组成了一个双向链表,Node节点包含如下的属性
static final class Node {
// 定义了节点是共享类型
static final Node SHARED = new Node();
// 定义了是独占类型
static final Node EXCLUSIVE = null;
// 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
static final int CANCELLED = 1;
// 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
static final int SIGNAL = -1;
// 条件,将会等待在condition中,当其他线程对Condition调用signal方法之后,进入到CLH队列中
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会被无条件地传播下去
static final int PROPAGATE = -3;
// 当前这个节点的状态,总共有4种,分别为上面的四种属性
volatile int waitStatus;
// 当前节点的前驱节点
volatile Node prev;
// 当前节点的后继节点
volatile Node next;
// 当前节点的绑定线程
volatile Thread thread;
// 下一个等待节点
Node nextWaiter;
}
AQS 内部还维护着几个属性,用来维护当前队列的情况
// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 当前队列的状态,除了0,其他数值都是被其他线程枷锁了,无法使用
private volatile int state;
先分析公平锁,再看非公平锁
FairSync 公平锁
lock.lock(); 加锁
acquire(1); 分为三步,只有当上一步执行成功之后,才会执行下一步
!tryAcquire(arg) 尝试加锁
- getState(); 获取当前AQS的状态,如果state != 0 说明,当前的同步器已经被其他线程所占用了或当前线程重入
- 如果state = 0,说明当前同步器无其他线程占用
- hasQueuedPredecessors() 判断当前的队列中,是否有其他等待的线程
- compareAndSetState(0, acquires) 如果没有其他等待的线程,通过CAS,把state变量的值 +1
- setExclusiveOwnerThread(current) 并把当前同步器的所有者改成自己
- 如果state !=0,并且判断当前同步器中绑定的所有者是不是当前的线程(current==getExclusiveOwnerThread()) 如果是,说明是同一个线程多次进行lock,将当前的state + 1(因为前面的判断,只会保证一个线程进入到这段逻辑,所以,并没有采用cas进行设置state变量)。否则,就是存在锁竞争,tryAcquire失败,返回false。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 如果第一步尝试加锁失败,则将当前的节点入队,进入CLH中。
addWaiter(Node.EXCLUSIVE) 将加锁失败的线程加入到CLH同步等待队列中。
private Node addWaiter(Node mode) {
// 创建一个独占模式的节点(上一步中传入的模式是EXCLUSIVE)
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通过 CAS,一定要把当前的节点,设置为CLH队列的对尾中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 将当前节点加入CLH队列中
enq(node);
return node;
}
acquireQueued();
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取出当前节点的上一个节点
final Node p = node.predecessor();
// 上一个节点就是头节点,即马上就到它了
// 那么,就可以尝试再次去加锁一次,如果加锁成功,则这个节点对应的线程获得锁,执行逻辑
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否则,上一个节点就不是头节点,这个节点所对应的线程就需要进行等待
// 具体的 shouldParkAfterFailedAcquire 及 parkAndCheckInterrupt 详见下面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果失败了, 取消Acquire
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
- int ws = pred.waitStatus 取出上一个节点的waitStatus(waitStatus 默认值为0)
- waitStatus == -1 如果上一个节点的waitStatus是 SIGNAL(-1),返回 true,执行下一步,阻塞线程
- waitStatus == 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 如果waitStatus是0,将前驱节点的节点状态设置为SIGNAL,下一次进行唤醒的时候,就执行了waitStatus == -1 的逻辑
- waitStatus > 0, 循环往前找,找到一个waitStatus > 0的节点,将当前节点的next指向这个节点
- parkAndCheckInterrupt(); 将当前的线程进行阻塞,并判断其是否有中断的标志
- 如果上一步中,前驱节点的waitStatus == -1,说明当前线程需要进行阻塞一下
- LockSupport.park(this) park住当前的线程
- Thread.interrupted() 返回当前的线程中,是否有中断的标记
- 如果执行了park之后,当前这个抢锁的线程,就进入到了阻塞的状态,当同步器中的线程执行完毕之后,唤醒了队列中的下一个Node所对应的线程,下一个线程就执行了下一次的循环,此时它就是头节点,加锁成功。
- 如果上一步中,前驱节点的waitStatus == -1,说明当前线程需要进行阻塞一下
cancelAcquire(node) 如果failed 变量为 true,则取消
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 从当前节点开始,往前遍历,waitStatus >0(canceled)的节点,通通抛弃
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 设置当前节点是已取消的
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 去唤醒其他线程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
selfInterrupt()
- Thread.**currentThread**().interrupt() 给当前的线程打一个中断的标记
lock.unlock(); 解锁
sync.release(1)
public final boolean release(int arg) {
// 尝试释放
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// unpark唤醒下一个Node
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease 尝试释放
从这就可以看出,ReentrantLock 这个锁,lock几次就必须unlock几次,否则会造成死锁。protected final boolean tryRelease(int releases) {
// 扣减state的value
int c = getState() - releases;
// 判断当前的线程是不是加锁的线程,不是的话,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果state已经为0了,说明解锁已经完成
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 将state设置回去
setState(c);
return free;
}
unparkSuccessor(h) 唤醒下一个节点
private void unparkSuccessor(Node node) {
// 获取当前节点的waitStatus
int ws = node.waitStatus;
// 如果waitStatus < 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果没有下一个节点,或者下一个节点的waitStatus>0(>0说明是canneled已取消的节点)
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点开始,依次往前找,直到找到一个waitStatus不是canneled的节点,将这个节点赋值给s变量
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果有这么一个节点,调用LockSupport.unpart(s.thread)这个节点所对应的线程给他唤醒起来干活
if (s != null)
LockSupport.unpark(s.thread);
}
- Thread.**currentThread**().interrupt() 给当前的线程打一个中断的标记
NonfairSync 非公平锁
lock.lock();
公平锁与非公平锁的区别,就在于lock方法中,在非公平锁中,进行lock的时候,会首先尝试将State 设置为1并绑定当前线程,如果设置失败了,才走acquire方法,去后面排队。
final void lock() {
// 先尝试看能不能把state的值设置为1,如果设置成功了,绑定当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 否则,设置失败的话,那就走公平锁的逻辑,去后面排队去。
else
acquire(1);
}
!tryAcquire(arg)
公平与非公平的另一点差别,还在于 tryAcquire方法中
在公平锁当中,会先判断当前的队列中是否有其他节点正在等待(hasQueuedPredecessors()),没有其他节点等待的话,才进行cas修改state以及绑定当前线程
而对于非公平锁来说,只要state == 0,就说明当前同步器没线程使用,就直接进行cas修改state,绑定当前线程
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}