从最简单的ReentrantLock的使用开始分析
public void testLock() {ReentrantLock lock = new ReentrantLock();lock.lock();try {// 业务逻辑} finally {lock.unlock();}}
Lock lock = new ReentrantLock(); 创建一个锁
默认为传参,创建的是非公平锁
public ReentrantLock() {sync = new NonfairSync();}
也可通过传参的方式,指定创建的为公平锁/非公平锁
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
ReentrantLock 通过定义的Sync 内部类,实现AQS 抽象队列同步器
AbstractQueuedSynchronizer结构:
AQS 本质上是一个双向链表,通过内部的Node类型,组成了一个双向链表,Node节点包含如下的属性
static final class Node {// 定义了节点是共享类型static final Node SHARED = new Node();// 定义了是独占类型static final Node EXCLUSIVE = null;// 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待static final int CANCELLED = 1;// 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行static final int SIGNAL = -1;// 条件,将会等待在condition中,当其他线程对Condition调用signal方法之后,进入到CLH队列中static final int CONDITION = -2;// 表示下一次共享式同步状态获取将会被无条件地传播下去static final int PROPAGATE = -3;// 当前这个节点的状态,总共有4种,分别为上面的四种属性volatile int waitStatus;// 当前节点的前驱节点volatile Node prev;// 当前节点的后继节点volatile Node next;// 当前节点的绑定线程volatile Thread thread;// 下一个等待节点Node nextWaiter;}
AQS 内部还维护着几个属性,用来维护当前队列的情况
// 队列的头节点private transient volatile Node head;// 队列的尾节点private transient volatile Node tail;// 当前队列的状态,除了0,其他数值都是被其他线程枷锁了,无法使用private volatile int state;
先分析公平锁,再看非公平锁
FairSync 公平锁
lock.lock(); 加锁
acquire(1); 分为三步,只有当上一步执行成功之后,才会执行下一步
!tryAcquire(arg) 尝试加锁
- getState(); 获取当前AQS的状态,如果state != 0 说明,当前的同步器已经被其他线程所占用了或当前线程重入
- 如果state = 0,说明当前同步器无其他线程占用
- hasQueuedPredecessors() 判断当前的队列中,是否有其他等待的线程
- compareAndSetState(0, acquires) 如果没有其他等待的线程,通过CAS,把state变量的值 +1
- setExclusiveOwnerThread(current) 并把当前同步器的所有者改成自己
- 如果state !=0,并且判断当前同步器中绑定的所有者是不是当前的线程(current==getExclusiveOwnerThread()) 如果是,说明是同一个线程多次进行lock,将当前的state + 1(因为前面的判断,只会保证一个线程进入到这段逻辑,所以,并没有采用cas进行设置state变量)。否则,就是存在锁竞争,tryAcquire失败,返回false。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 如果第一步尝试加锁失败,则将当前的节点入队,进入CLH中。
addWaiter(Node.EXCLUSIVE) 将加锁失败的线程加入到CLH同步等待队列中。
private Node addWaiter(Node mode) {// 创建一个独占模式的节点(上一步中传入的模式是EXCLUSIVE)Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;// 通过 CAS,一定要把当前的节点,设置为CLH队列的对尾中if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 将当前节点加入CLH队列中enq(node);return node;}
acquireQueued();
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 取出当前节点的上一个节点final Node p = node.predecessor();// 上一个节点就是头节点,即马上就到它了// 那么,就可以尝试再次去加锁一次,如果加锁成功,则这个节点对应的线程获得锁,执行逻辑if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 否则,上一个节点就不是头节点,这个节点所对应的线程就需要进行等待// 具体的 shouldParkAfterFailedAcquire 及 parkAndCheckInterrupt 详见下面if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 如果失败了, 取消Acquireif (failed)cancelAcquire(node);}}
shouldParkAfterFailedAcquire()
- int ws = pred.waitStatus 取出上一个节点的waitStatus(waitStatus 默认值为0)
- waitStatus == -1 如果上一个节点的waitStatus是 SIGNAL(-1),返回 true,执行下一步,阻塞线程
- waitStatus == 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 如果waitStatus是0,将前驱节点的节点状态设置为SIGNAL,下一次进行唤醒的时候,就执行了waitStatus == -1 的逻辑
- waitStatus > 0, 循环往前找,找到一个waitStatus > 0的节点,将当前节点的next指向这个节点
- parkAndCheckInterrupt(); 将当前的线程进行阻塞,并判断其是否有中断的标志
- 如果上一步中,前驱节点的waitStatus == -1,说明当前线程需要进行阻塞一下
- LockSupport.park(this) park住当前的线程
- Thread.interrupted() 返回当前的线程中,是否有中断的标记
- 如果执行了park之后,当前这个抢锁的线程,就进入到了阻塞的状态,当同步器中的线程执行完毕之后,唤醒了队列中的下一个Node所对应的线程,下一个线程就执行了下一次的循环,此时它就是头节点,加锁成功。
- 如果上一步中,前驱节点的waitStatus == -1,说明当前线程需要进行阻塞一下
cancelAcquire(node) 如果failed 变量为 true,则取消
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;// 从当前节点开始,往前遍历,waitStatus >0(canceled)的节点,通通抛弃Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;// 设置当前节点是已取消的node.waitStatus = Node.CANCELLED;if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 去唤醒其他线程unparkSuccessor(node);}node.next = node; // help GC}}
selfInterrupt()
- Thread.**currentThread**().interrupt() 给当前的线程打一个中断的标记
lock.unlock(); 解锁
sync.release(1)
public final boolean release(int arg) {// 尝试释放if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// unpark唤醒下一个NodeunparkSuccessor(h);return true;}return false;}
tryRelease 尝试释放
从这就可以看出,ReentrantLock 这个锁,lock几次就必须unlock几次,否则会造成死锁。protected final boolean tryRelease(int releases) {// 扣减state的valueint c = getState() - releases;// 判断当前的线程是不是加锁的线程,不是的话,抛出异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 如果state已经为0了,说明解锁已经完成if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 将state设置回去setState(c);return free;}
unparkSuccessor(h) 唤醒下一个节点
private void unparkSuccessor(Node node) {// 获取当前节点的waitStatusint ws = node.waitStatus;// 如果waitStatus < 0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取当前节点的下一个节点Node s = node.next;// 如果没有下一个节点,或者下一个节点的waitStatus>0(>0说明是canneled已取消的节点)if (s == null || s.waitStatus > 0) {s = null;// 从尾节点开始,依次往前找,直到找到一个waitStatus不是canneled的节点,将这个节点赋值给s变量for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果有这么一个节点,调用LockSupport.unpart(s.thread)这个节点所对应的线程给他唤醒起来干活if (s != null)LockSupport.unpark(s.thread);}
- Thread.**currentThread**().interrupt() 给当前的线程打一个中断的标记
NonfairSync 非公平锁
lock.lock();
公平锁与非公平锁的区别,就在于lock方法中,在非公平锁中,进行lock的时候,会首先尝试将State 设置为1并绑定当前线程,如果设置失败了,才走acquire方法,去后面排队。
final void lock() {// 先尝试看能不能把state的值设置为1,如果设置成功了,绑定当前线程if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());// 否则,设置失败的话,那就走公平锁的逻辑,去后面排队去。elseacquire(1);}
!tryAcquire(arg)
公平与非公平的另一点差别,还在于 tryAcquire方法中
在公平锁当中,会先判断当前的队列中是否有其他节点正在等待(hasQueuedPredecessors()),没有其他节点等待的话,才进行cas修改state以及绑定当前线程
而对于非公平锁来说,只要state == 0,就说明当前同步器没线程使用,就直接进行cas修改state,绑定当前线程
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
