从最简单的ReentrantLock的使用开始分析

  1. public void testLock() {
  2. ReentrantLock lock = new ReentrantLock();
  3. lock.lock();
  4. try {
  5. // 业务逻辑
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

Lock lock = new ReentrantLock(); 创建一个锁

默认为传参,创建的是非公平锁

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }

也可通过传参的方式,指定创建的为公平锁/非公平锁

  1. public ReentrantLock(boolean fair) {
  2. sync = fair ? new FairSync() : new NonfairSync();
  3. }

ReentrantLock 通过定义的Sync 内部类,实现AQS 抽象队列同步器

AbstractQueuedSynchronizer结构:

AQS 本质上是一个双向链表,通过内部的Node类型,组成了一个双向链表,Node节点包含如下的属性

  1. static final class Node {
  2. // 定义了节点是共享类型
  3. static final Node SHARED = new Node();
  4. // 定义了是独占类型
  5. static final Node EXCLUSIVE = null;
  6. // 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
  7. static final int CANCELLED = 1;
  8. // 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
  9. static final int SIGNAL = -1;
  10. // 条件,将会等待在condition中,当其他线程对Condition调用signal方法之后,进入到CLH队列中
  11. static final int CONDITION = -2;
  12. // 表示下一次共享式同步状态获取将会被无条件地传播下去
  13. static final int PROPAGATE = -3;
  14. // 当前这个节点的状态,总共有4种,分别为上面的四种属性
  15. volatile int waitStatus;
  16. // 当前节点的前驱节点
  17. volatile Node prev;
  18. // 当前节点的后继节点
  19. volatile Node next;
  20. // 当前节点的绑定线程
  21. volatile Thread thread;
  22. // 下一个等待节点
  23. Node nextWaiter;
  24. }

AQS 内部还维护着几个属性,用来维护当前队列的情况

  1. // 队列的头节点
  2. private transient volatile Node head;
  3. // 队列的尾节点
  4. private transient volatile Node tail;
  5. // 当前队列的状态,除了0,其他数值都是被其他线程枷锁了,无法使用
  6. private volatile int state;


先分析公平锁,再看非公平锁

FairSync 公平锁

lock.lock(); 加锁

acquire(1); 分为三步,只有当上一步执行成功之后,才会执行下一步

  • !tryAcquire(arg) 尝试加锁

    • getState(); 获取当前AQS的状态,如果state != 0 说明,当前的同步器已经被其他线程所占用了或当前线程重入
    • 如果state = 0,说明当前同步器无其他线程占用
      • hasQueuedPredecessors() 判断当前的队列中,是否有其他等待的线程
      • compareAndSetState(0, acquires) 如果没有其他等待的线程,通过CAS,把state变量的值 +1
      • setExclusiveOwnerThread(current) 并把当前同步器的所有者改成自己
    • 如果state !=0,并且判断当前同步器中绑定的所有者是不是当前的线程(current==getExclusiveOwnerThread()) 如果是,说明是同一个线程多次进行lock,将当前的state + 1(因为前面的判断,只会保证一个线程进入到这段逻辑,所以,并没有采用cas进行设置state变量)。否则,就是存在锁竞争,tryAcquire失败,返回false。
      1. protected final boolean tryAcquire(int acquires) {
      2. final Thread current = Thread.currentThread();
      3. int c = getState();
      4. if (c == 0) {
      5. if (!hasQueuedPredecessors() &&
      6. compareAndSetState(0, acquires)) {
      7. setExclusiveOwnerThread(current);
      8. return true;
      9. }
      10. }
      11. else if (current == getExclusiveOwnerThread()) {
      12. int nextc = c + acquires;
      13. if (nextc < 0)
      14. throw new Error("Maximum lock count exceeded");
      15. setState(nextc);
      16. return true;
      17. }
      18. return false;
      19. }
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 如果第一步尝试加锁失败,则将当前的节点入队,进入CLH中。

    • addWaiter(Node.EXCLUSIVE) 将加锁失败的线程加入到CLH同步等待队列中。

      1. private Node addWaiter(Node mode) {
      2. // 创建一个独占模式的节点(上一步中传入的模式是EXCLUSIVE)
      3. Node node = new Node(Thread.currentThread(), mode);
      4. Node pred = tail;
      5. if (pred != null) {
      6. node.prev = pred;
      7. // 通过 CAS,一定要把当前的节点,设置为CLH队列的对尾中
      8. if (compareAndSetTail(pred, node)) {
      9. pred.next = node;
      10. return node;
      11. }
      12. }
      13. // 将当前节点加入CLH队列中
      14. enq(node);
      15. return node;
      16. }
    • acquireQueued();

      1. final boolean acquireQueued(final Node node, int arg) {
      2. boolean failed = true;
      3. try {
      4. boolean interrupted = false;
      5. for (;;) {
      6. // 取出当前节点的上一个节点
      7. final Node p = node.predecessor();
      8. // 上一个节点就是头节点,即马上就到它了
      9. // 那么,就可以尝试再次去加锁一次,如果加锁成功,则这个节点对应的线程获得锁,执行逻辑
      10. if (p == head && tryAcquire(arg)) {
      11. setHead(node);
      12. p.next = null; // help GC
      13. failed = false;
      14. return interrupted;
      15. }
      16. // 否则,上一个节点就不是头节点,这个节点所对应的线程就需要进行等待
      17. // 具体的 shouldParkAfterFailedAcquire 及 parkAndCheckInterrupt 详见下面
      18. if (shouldParkAfterFailedAcquire(p, node) &&
      19. parkAndCheckInterrupt())
      20. interrupted = true;
      21. }
      22. } finally {
      23. // 如果失败了, 取消Acquire
      24. if (failed)
      25. cancelAcquire(node);
      26. }
      27. }
    • shouldParkAfterFailedAcquire()

      • int ws = pred.waitStatus 取出上一个节点的waitStatus(waitStatus 默认值为0)
      • waitStatus == -1 如果上一个节点的waitStatus是 SIGNAL(-1),返回 true,执行下一步,阻塞线程
      • waitStatus == 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 如果waitStatus是0,将前驱节点的节点状态设置为SIGNAL,下一次进行唤醒的时候,就执行了waitStatus == -1 的逻辑
      • waitStatus > 0, 循环往前找,找到一个waitStatus > 0的节点,将当前节点的next指向这个节点
    • parkAndCheckInterrupt(); 将当前的线程进行阻塞,并判断其是否有中断的标志
      • 如果上一步中,前驱节点的waitStatus == -1,说明当前线程需要进行阻塞一下
        • LockSupport.park(this) park住当前的线程
        • Thread.interrupted() 返回当前的线程中,是否有中断的标记
      • 如果执行了park之后,当前这个抢锁的线程,就进入到了阻塞的状态,当同步器中的线程执行完毕之后,唤醒了队列中的下一个Node所对应的线程,下一个线程就执行了下一次的循环,此时它就是头节点,加锁成功。
    • cancelAcquire(node) 如果failed 变量为 true,则取消

      1. private void cancelAcquire(Node node) {
      2. if (node == null)
      3. return;
      4. node.thread = null;
      5. // 从当前节点开始,往前遍历,waitStatus >0(canceled)的节点,通通抛弃
      6. Node pred = node.prev;
      7. while (pred.waitStatus > 0)
      8. node.prev = pred = pred.prev;
      9. Node predNext = pred.next;
      10. // 设置当前节点是已取消的
      11. node.waitStatus = Node.CANCELLED;
      12. if (node == tail && compareAndSetTail(node, pred)) {
      13. compareAndSetNext(pred, predNext, null);
      14. } else {
      15. int ws;
      16. if (pred != head &&
      17. ((ws = pred.waitStatus) == Node.SIGNAL ||
      18. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
      19. pred.thread != null) {
      20. Node next = node.next;
      21. if (next != null && next.waitStatus <= 0)
      22. compareAndSetNext(pred, predNext, next);
      23. } else {
      24. // 去唤醒其他线程
      25. unparkSuccessor(node);
      26. }
      27. node.next = node; // help GC
      28. }
      29. }
  • selfInterrupt()

    • Thread.**currentThread**().interrupt() 给当前的线程打一个中断的标记

      lock.unlock(); 解锁

      sync.release(1)

      1. public final boolean release(int arg) {
      2. // 尝试释放
      3. if (tryRelease(arg)) {
      4. Node h = head;
      5. if (h != null && h.waitStatus != 0)
      6. // unpark唤醒下一个Node
      7. unparkSuccessor(h);
      8. return true;
      9. }
      10. return false;
      11. }

      tryRelease 尝试释放

      从这就可以看出,ReentrantLock 这个锁,lock几次就必须unlock几次,否则会造成死锁。
      1. protected final boolean tryRelease(int releases) {
      2. // 扣减state的value
      3. int c = getState() - releases;
      4. // 判断当前的线程是不是加锁的线程,不是的话,抛出异常
      5. if (Thread.currentThread() != getExclusiveOwnerThread())
      6. throw new IllegalMonitorStateException();
      7. boolean free = false;
      8. // 如果state已经为0了,说明解锁已经完成
      9. if (c == 0) {
      10. free = true;
      11. setExclusiveOwnerThread(null);
      12. }
      13. // 将state设置回去
      14. setState(c);
      15. return free;
      16. }

      unparkSuccessor(h) 唤醒下一个节点

      1. private void unparkSuccessor(Node node) {
      2. // 获取当前节点的waitStatus
      3. int ws = node.waitStatus;
      4. // 如果waitStatus < 0
      5. if (ws < 0)
      6. compareAndSetWaitStatus(node, ws, 0);
      7. // 获取当前节点的下一个节点
      8. Node s = node.next;
      9. // 如果没有下一个节点,或者下一个节点的waitStatus>0(>0说明是canneled已取消的节点)
      10. if (s == null || s.waitStatus > 0) {
      11. s = null;
      12. // 从尾节点开始,依次往前找,直到找到一个waitStatus不是canneled的节点,将这个节点赋值给s变量
      13. for (Node t = tail; t != null && t != node; t = t.prev)
      14. if (t.waitStatus <= 0)
      15. s = t;
      16. }
      17. // 如果有这么一个节点,调用LockSupport.unpart(s.thread)这个节点所对应的线程给他唤醒起来干活
      18. if (s != null)
      19. LockSupport.unpark(s.thread);
      20. }


NonfairSync 非公平锁

lock.lock();

公平锁与非公平锁的区别,就在于lock方法中,在非公平锁中,进行lock的时候,会首先尝试将State 设置为1并绑定当前线程,如果设置失败了,才走acquire方法,去后面排队。

  1. final void lock() {
  2. // 先尝试看能不能把state的值设置为1,如果设置成功了,绑定当前线程
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. // 否则,设置失败的话,那就走公平锁的逻辑,去后面排队去。
  6. else
  7. acquire(1);
  8. }

!tryAcquire(arg)

公平与非公平的另一点差别,还在于 tryAcquire方法中
在公平锁当中,会先判断当前的队列中是否有其他节点正在等待(hasQueuedPredecessors()),没有其他节点等待的话,才进行cas修改state以及绑定当前线程
而对于非公平锁来说,只要state == 0,就说明当前同步器没线程使用,就直接进行cas修改state,绑定当前线程

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }
  17. return false;
  18. }

其余的逻辑,与公平锁相同,不在赘述!

以上为本人的拙见,如有错误,欢迎在评论区留言,不胜感激