背景知识介绍

Lock接口

Lock类是所有锁的父类。提供了如下几个接口方法:

  1. /** 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。 */
  2. void lock();
  3. /**
  4. * 获取锁。如果锁不可用,那么当前线程会阻塞直至获取锁成功。
  5. * 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。
  6. */
  7. void lockInterruptibly() throws InterruptedException;
  8. /** 同步获取锁。会立即返回获取锁的结果 */
  9. boolean tryLock();
  10. /**
  11. * 同步获取锁。在超时时间内,如果获取不到会一直请求。
  12. * 如果当前线程在获取锁的过程中中断,会抛出异常并停止获取锁。
  13. */
  14. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  15. /** 请求释放锁 */
  16. void unlock();

而Lock接口的实现类有如下几个:

  • ReentrantLock:重入锁。它实现Lock接口,支持公平锁/非公平锁、可重入的功能。
  • ReentrantReadWriteLock:重入读写锁。它实现了ReadWriteLock接口,是一种悲观锁。
    而内部的ReadLock/WriteLock又分别实现了Lock接口。也支持公平、非公平锁的实现。
  • StampedLock:Jdk1.8版本引入的邮戳锁。是对上面两种锁的优化。提供了:悲观写锁、悲观读锁、乐观读锁。
    其内部的ReadLockView/WriteLockView实现了Lock接口。

AQS(AbstractQueuedSynchronizer)抽象类

AQS是一个抽象类。主要的业务逻辑由它的实现类重写。它本身没有实现任何同步锁的功能。接口实现类有如下几个:

  • ReentrantLock:重入锁。
  • ReentrantReadWriteLock:重入读写锁。
  • CountDownLatch:倒计时器。初始化一个值,然后调用await()方法阻塞至值减为0再执行。
  • Semaphore:信号量计数器。主要用来做流控。(可以结合令牌桶和漏桶算法一起学习。)

我们查看AQS的源码会发现,有两个关键的内部类:Node、ConditionObject。
而且,AQS的几个重要参数如head、tail、status都是由volatile修饰,保证多线程间可见

Node有几个重要的参数:

  • SHARED/EXCLUSIVE:表明节点是共享还是独占;
  • waitStatus:表明当前节点线程的状态,分别有CANCELLED/SIGNAL/CONDITION三种状态;
  • prev/next:链表结构的前驱和后继;
  • thread:封装的当前线程对象;
  • nextWaiter:下一个处于等待状态的或共享模式的节点;

AQS的实现依赖于内部的同步队列,也就是FIFO的双向队列。如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息封装称Node加入到同步队列中,同时阻塞该线程。当和获取锁的线程释放锁后,会从队列中唤醒一个阻塞的节点继续持有锁。

在这里,我们要将Node节点的waitStatus(线程等待状态)和AQS的status(锁状态)区分清楚:

  • waitStatus:当前节点的等待状态;

    1. /** 在取消抢锁或释放锁失败时,会将线程状态置为已取消 */
    2. static final int CANCELLED = 1;
    3. /** 标识当前节点的next节点需要被唤醒 */
    4. static final int SIGNAL = -1;
    5. /** 标识当前节点处于等待条件队列中 */
    6. static final int CONDITION = -2;
    7. /** 标识下一个获取锁的线程无条件的传播? */
    8. static final int PROPAGATE = -3;
    9. /** None of the above */
    10. 0:None of the above
  • status:获取锁的线程的状态。0是未有线程获取锁,因为支持重入,所以重入多少次,就需要释放多少次锁。

    AQS加锁

    1. public final void acquire(int arg) {
    2. // tryAcquire是aqs提供的接口方法。由子类进行实现并扩展。
    3. if (!tryAcquire(arg) &&
    4. // 看下面代码块
    5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    6. // Thread.currentThread().interrupt();
    7. selfInterrupt();
    8. }
    1. private Node addWaiter(Node mode) {
    2. // 将当前节点封装称Node节点(独占)。
    3. Node node = new Node(Thread.currentThread(), mode);
    4. Node pred = tail;
    5. // 尾节点不为空的情况下,将当前节点放置在最后并和原尾节点进行CAS链接。
    6. if (pred != null) {
    7. node.prev = pred;
    8. if (compareAndSetTail(pred, node)) {
    9. pred.next = node;
    10. return node;
    11. }
    12. }
    13. enq(node);
    14. return node;
    15. }
    1. private Node enq(final Node node) {
    2. // 尾节点为空的情况下,死循环进行争抢锁。
    3. for (;;) {
    4. Node t = tail;
    5. /**
    6. * 从这里,我们可以看见这个CLH队列锁的结构了。
    7. * 头节点,是一个空节点。里面不包含线程信息,有用的仅是前驱和后继两个引用。
    8. * 当CLH队列为空时,先创建个空节点,然后将其next指向当前节点,
    9. * 并将当前节点设置为tail节点。
    10. */
    11. if (t == null) {
    12. if (compareAndSetHead(new Node()))
    13. tail = head;
    14. } else {
    15. node.prev = t;
    16. if (compareAndSetTail(t, node)) {
    17. t.next = node;
    18. return t;
    19. }
    20. }
    21. }
    22. }

    到现在,当前节点已经封装称了Node,并且加入到了CLH队列称为尾部tail。在这个阶段我们做了如下操作:

  • 修改了当前Node节点的前驱和后继;

  • 修改了CLH队列的head和tail。

下一步,我们开始执行acquireQueued方法。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. // 死循环抢锁;
  6. for (;;) {
  7. // 获取当前节点的前驱。
  8. final Node p = node.predecessor();
  9. // tryAcquire是AQS提供的接口类。由子类自己实现。
  10. // 进入循环的条件就是,当前节点就是头节点,不需要被中断,直接返回。
  11. if (p == head && tryAcquire(arg)) {
  12. setHead(node);
  13. p.next = null; // help GC
  14. failed = false;
  15. return interrupted;
  16. }
  17. // 执行下面方法。注意入参
  18. // p:当前节点的前驱节点;node:当前节点
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. //
  21. parkAndCheckInterrupt())
  22. interrupted = true;
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. // 如果前驱节点是等待被唤醒状态,直接返回true;
  4. if (ws == Node.SIGNAL)
  5. return true;
  6. // 大于0的情况,就是该线程已经取消获取锁了。
  7. // 进行死循环,直到找到0<=状态的节点,并将它设置为当前节点的前驱。
  8. if (ws > 0) {
  9. do {
  10. node.prev = pred = pred.prev;
  11. } while (pred.waitStatus > 0);
  12. pred.next = node;
  13. } else {
  14. // 前驱节点的状态是0或-3。(-2呢??文档只写了0或-3).
  15. // 将前驱节点的状态更新为-1(signal),
  16. /*
  17. * waitStatus must be 0 or PROPAGATE. Indicate that we
  18. * need a signal, but don't park yet. Caller will need to
  19. * retry to make sure it cannot acquire before parking.
  20. */
  21. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  22. }
  23. return false;
  24. }

在shouldParkAfterFailedAcquire方法执行完成后,我们回到它的父方法acquireQueued,去查看失败(false)后的处理逻辑:

  1. private void cancelAcquire(Node node) {
  2. if (node == null)
  3. return;
  4. node.thread = null;
  5. // Skip cancelled predecessors
  6. // 查询当前Node节点的前继非取消状态的节点。
  7. Node pred = node.prev;
  8. while (pred.waitStatus > 0)
  9. node.prev = pred = pred.prev;
  10. // predNext is the apparent node to unsplice. CASes below will
  11. // fail if not, in which case, we lost race vs another cancel
  12. // or signal, so no further action is necessary.
  13. Node predNext = pred.next;
  14. // Can use unconditional write instead of CAS here.
  15. // After this atomic step, other Nodes can skip past us.
  16. // Before, we are free of interference from other threads.
  17. node.waitStatus = Node.CANCELLED;
  18. // If we are the tail, remove ourselves.
  19. // 如果当前节点是尾部节点,就替换当前节点为前置节点
  20. if (node == tail && compareAndSetTail(node, pred)) {
  21. compareAndSetNext(pred, predNext, null);
  22. } else {
  23. // If successor needs signal, try to set pred's next-link
  24. // so it will get one. Otherwise wake it up to propagate.
  25. int ws;
  26. // 前置节点不是head头节点,并且前置节点的状态为-1,并且pred的线程不为空
  27. if (pred != head &&
  28. ((ws = pred.waitStatus) == Node.SIGNAL ||
  29. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  30. pred.thread != null) {
  31. Node next = node.next;
  32. // 如果node的后继节点不为null,且状态<=0,则将pred的next指针指向node的后继节点
  33. if (next != null && next.waitStatus <= 0)
  34. compareAndSetNext(pred, predNext, next);
  35. } else {
  36. // 前继节点不是head节点,但是状态不为SIGNAL(-1)或者前继节点为head节点
  37. // 也就是前置节点没有“叫醒服务”,则node节点直接叫醒后继的线程
  38. unparkSuccessor(node);
  39. }
  40. // 将node的next指针指向自己,方便被GC
  41. node.next = node; // help GC
  42. }
  43. }

最后总结**(link:**

用个通俗的例子来概括整个过程:

相当于,我在队伍中间因为某种原因不想排队了,但是我有1个任务,就是需要叫醒排在我后面睡觉的人。 如果我已经是队尾了,没有排在我后面人需要被唤醒,那我直接离队就可以。 如果排在我前面的人已经是队头了,则直接叫醒排在我后面的人,让他去获取锁。 如果排在我前面的人不是队头,那么我会先尝试把这个任务交给排在我前面的人,如果前面的人同意,我就会把排在我后面的人告诉他(把他的后继指针指向排在我后面的人)。 如果不同意,那我直接叫醒排在我后面的人,让他自己去求排在我前面的人,直到前面的人同意叫醒他,他再次睡去。 自此,AQS独占锁的获取过程我们就讲解完了,代码不多,但其设计真的很精巧,值得反复咀嚼体会。

AQS释放锁

释放节点代码入下:

  1. public final boolean release(int arg) {
  2. // 子类重写该方法,指定释放的条件。
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. // 头结点不为空即等待队列不为空,并且存在等待状态不为0的数据;
  6. if (h != null && h.waitStatus != 0)
  7. unparkSuccessor(h);
  8. return true;
  9. }
  10. return false;
  11. }
  12. private void unparkSuccessor(Node node) {
  13. int ws = node.waitStatus;
  14. // 小于0的数据直接将状态CAS自旋改为0
  15. if (ws < 0) {
  16. compareAndSetWaitStatus(node, ws, 0);
  17. }
  18. // 获取当前节点的继任节点
  19. Node s = node.next;
  20. if (s == null || s.waitStatus > 0) {
  21. s = null;
  22. // 如果当前节点的继任节点为空,那么从后往前遍历,找到可以唤醒的节点并将其唤醒。
  23. for (Node t = tail; t != null && t != node; t = t.prev)
  24. if (t.waitStatus <= 0)
  25. s = t;
  26. }
  27. if (s != null)
  28. LockSupport.unpark(s.thread);
  29. }

总结:
1、如果节点的状态<0,那么直接更新当前节点状态为0,
2、通过LockSupport.unpart()方法,唤醒下一个状态<0的线程;

ReeranceLock

加锁

  1. public void lock() {
  2. sync.lock();
  3. }

而Sync的实现类有两个。分别是FairSync公平锁、NonfairSync非公平锁。它们分别提供了两个方法:lock()、tryAcquire()。

lock()

  • 公平锁的lock()方法是直接调用acquire()方法;
  • 非公平锁的lock()方法时先自旋抢锁,抢锁失败再调用acquire()方法。

    1. // FairSync
    2. final void lock() {
    3. acquire(1);
    4. }
    5. // NonfairSync
    6. final void lock() {
    7. if (compareAndSetState(0, 1))
    8. setExclusiveOwnerThread(Thread.currentThread());
    9. else
    10. acquire(1);
    11. }

    tryAcquire()

    1. // FairSync
    2. protected final boolean tryAcquire(int acquires) {
    3. final Thread current = Thread.currentThread();
    4. // c是锁的状态。0为未加锁,因为支持重入,所以>0即为加锁次数。
    5. int c = getState();
    6. if (c == 0) {
    7. // aqs的clh队列为空,直接加锁。
    8. if (!hasQueuedPredecessors() &&
    9. compareAndSetState(0, acquires)) {
    10. setExclusiveOwnerThread(current);
    11. return true;
    12. }
    13. }
    14. // 这块儿是重入逻辑
    15. else if (current == getExclusiveOwnerThread()) {
    16. int nextc = c + acquires;
    17. if (nextc < 0)
    18. throw new Error("Maximum lock count exceeded");
    19. setState(nextc);
    20. return true;
    21. }
    22. return false;
    23. }
    24. // NonfairSync
    25. protected final boolean tryAcquire(int acquires) {
    26. return nonfairTryAcquire(acquires);
    27. }
    28. final boolean nonfairTryAcquire(int acquires) {
    29. final Thread current = Thread.currentThread();
    30. int c = getState();
    31. // 跟公平锁有区别在于,不去判断clh队列有没有等待的线程,直接抢锁。
    32. if (c == 0) {
    33. if (compareAndSetState(0, acquires)) {
    34. setExclusiveOwnerThread(current);
    35. return true;
    36. }
    37. }
    38. // 更公平所完全一样
    39. else if (current == getExclusiveOwnerThread()) {
    40. int nextc = c + acquires;
    41. if (nextc < 0) // overflow
    42. throw new Error("Maximum lock count exceeded");
    43. setState(nextc);
    44. return true;
    45. }
    46. return false;
    47. }

    ReentrantReadWriteLock

    公平、非公平锁的实现

    ```java // 非公平锁 static final class NonfairSync extends Sync {

    1. final boolean writerShouldBlock() {
    2. return false;
    3. }
    4. final boolean readerShouldBlock() {
    5. // clh不为空,header的子节点不为空并且是独占锁
    6. // 我们知道,创建clh队列时,head节点就是一个空节点,里面并未封装线程。而header的后继节点才是真正的需要执行任务的线程节点。
    7. return apparentlyFirstQueuedIsExclusive();
    8. }

    } final boolean apparentlyFirstQueuedIsExclusive() {

    1. Node h, s;
    2. return (h = head) != null &&
    3. (s = h.next) != null &&
    4. !s.isShared() &&
    5. s.thread != null;

    }

    // 公平锁 static final class FairSync extends Sync {

    1. final boolean writerShouldBlock() {
    2. return hasQueuedPredecessors();
    3. }
    4. final boolean readerShouldBlock() {
    5. return hasQueuedPredecessors();
    6. }

    } public final boolean hasQueuedPredecessors() {

    1. Node t = tail;
    2. Node h = head;
    3. Node s;
    4. // clh队列不为空,并且第一个任务节点不是当前线程的节点
    5. return h != t &&
    6. ((s = h.next) == null || s.thread != Thread.currentThread());

    }

  1. 我们总结下:
  2. - 非公平锁
  3. - 写:false
  4. - 读:clh队列不为空时为true;为空时为false
  5. - 公平锁:
  6. - 写:clh队列不为空时为true;为空时为false
  7. - 读:clh队列不为空时为true;为空时为false
  8. > **readerShouldBlockwriterShouldBlock两个方法,在需要加锁时返回true,不需要加锁时返回false。**
  9. <a name="AHBQF"></a>
  10. ### 读、写锁的实现
  11. <a name="RMitK"></a>
  12. #### 读锁
  13. ```java
  14. public void lock() {
  15. sync.acquireShared(1);
  16. }
  17. // AQS实现
  18. public final void acquireShared(int arg) {
  19. if (tryAcquireShared(arg) < 0)
  20. doAcquireShared(arg);
  21. }
  22. protected final int tryAcquireShared(int unused) {
  23. Thread current = Thread.currentThread();
  24. int c = getState();
  25. // c & (1 << 16) - 1:c和15位1做与运算;结果=c。
  26. // 这个判断为true的条件即为:c>0(已加锁状态)
  27. if (exclusiveCount(c) != 0 &&
  28. // 持有锁的线程非当前线程。
  29. getExclusiveOwnerThread() != current)
  30. return -1;
  31. // c >>> 16
  32. int r = sharedCount(c);
  33. if (!readerShouldBlock() &&
  34. // MAX_COUNT = (1 << 16) - 1 = 2的15次方-1
  35. r < MAX_COUNT &&
  36. // 将c的状态改为10000..c
  37. compareAndSetState(c, c + SHARED_UNIT)) {
  38. if (r == 0) {
  39. firstReader = current;
  40. firstReaderHoldCount = 1;
  41. } else if (firstReader == current) {
  42. firstReaderHoldCount++;
  43. } else {
  44. HoldCounter rh = cachedHoldCounter;
  45. if (rh == null || rh.tid != getThreadId(current))
  46. cachedHoldCounter = rh = readHolds.get();
  47. else if (rh.count == 0)
  48. readHolds.set(rh);
  49. rh.count++;
  50. }
  51. return 1;
  52. }
  53. return fullTryAcquireShared(current);
  54. }
  55. // AQS实现
  56. private void doAcquireShared(int arg) {
  57. final Node node = addWaiter(Node.SHARED);
  58. boolean failed = true;
  59. try {
  60. boolean interrupted = false;
  61. for (;;) {
  62. final Node p = node.predecessor();
  63. if (p == head) {
  64. int r = tryAcquireShared(arg);
  65. if (r >= 0) {
  66. setHeadAndPropagate(node, r);
  67. p.next = null; // help GC
  68. if (interrupted)
  69. selfInterrupt();
  70. failed = false;
  71. return;
  72. }
  73. }
  74. if (shouldParkAfterFailedAcquire(p, node) &&
  75. parkAndCheckInterrupt())
  76. interrupted = true;
  77. }
  78. } finally {
  79. if (failed)
  80. cancelAcquire(node);
  81. }
  82. }

参考文献:

1、深入分析AQS实现原理(此文章为本文的框架)
2、