前序

这里有个知识点容易混,那就是 阻塞、唤醒 和 锁

  • 阻塞 这里是调用的LockSupport.part(); 当前线程 阻塞了,cpu不会去抢占他, 不一定获取到锁
  • 唤醒 这里指的是LockSupport.unpart(thread) ;当前线程处于就绪状态,有资格被cpu抢占了
  • 锁 这里指的是aqs中的 state(不同场景 意义不同)

    锁实现的基本原理

  • Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该

类的父类是AbstractOwnableSynchronizer

  • 此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,

需要几个核心要素

    1. 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作, 使用CAS保证线程安全
      • 在 AbstractQueuedSynchronizer中 持有state
      • 当state=0时,没有线程持有锁,exclusiveOwnerThread=null
      • 当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
      • 当state > 1时,说明该线程重入了该锁。
      • 在ReetrantReadWriteLock中 state需要看场景 偏移16位后再具体分析
  • 2.需要记录当前是哪个线程持有锁
    • 由aqs 的父亲 AbstractOwnableSynchronizer持有
  • 3.需要底层支持对一个线程进行阻塞唤醒操作
    • Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark
    • LockSupport工具类对线程进程阻塞或唤醒
  • 4.需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用 CAS
    • 在AQS中利用双向链表和CAS实现了一个阻塞队列
      • 阻塞队列是整个AQS核心中的核心
      • image.png

        Lock

  • 继承体系 image.png

    互斥锁 (ReentrantLock)

  • 类图 Lock 和Condition - 图3

  • ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中

    1. public class ReentrantLock implements Lock, java.io.Serializable {
    2. private final Sync sync;
    3. public ReentrantLock() {
    4. sync = new NonfairSync();
    5. }
    6. public ReentrantLock(boolean fair) {
    7. sync = fair ? new FairSync() : new NonfairSync();
    8. }
    9. public void lock() {
    10. // 具体由子类 公平和非公平2中lock方式
    11. sync.lock();
    12. }
    13. public void unlock() {
    14. // 在aqs 中 LockSupport.unpark(s.thread);
    15. sync.release(1);
    16. }
    17. abstract static class Sync extends AbstractQueuedSynchronizer {
    18. abstract void lock();
    19. }
    20. static final class FairSync extends Sync {
    21. final void lock() {
    22. // 获取锁 LockSupport.park();
    23. acquire(1);
    24. }
    25. }
    26. static final class NonfairSync extends Sync {
    27. final void lock() {
    28. if (compareAndSetState(0, 1))
    29. setExclusiveOwnerThread(Thread.currentThread());
    30. else
    31. acquire(1);
    32. }
    33. }
    34. }

    lock() 分析

  • AbstractQueueSychronizer ```java public final void acquire(int arg) {

    1. if (!tryAcquire(arg) &&
    2. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    3. selfInterrupt();

    } // 就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注 // 意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。 private Node addWaiter(Node mode) {

    1. Node node = new Node(Thread.currentThread(), mode);
    2. // Try the fast path of enq; backup to full enq on failure
    3. Node pred = tail;
    4. if (pred != null) {
    5. node.prev = pred;
    6. if (compareAndSetTail(pred, node)) {
    7. pred.next = node;
    8. return node;
    9. }
    10. }
    11. enq(node);
    12. return node;

    }

// 线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤 // 醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(…)返回 // acquireQueued(…)方法有一个返回值,表示什么意思呢?虽然该方法不会中断响应,但它会 // 记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true;否则,返回 // false。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

// 当 acquireQueued(…)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自 // 己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没 // 有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方 // 法,就可以抛出异常,响应该中断信号。 static void selfInterrupt() { Thread.currentThread().interrupt(); }

// 阻塞就发生在下面这个方法中: private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

/**

  • 线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。
  • park()方法返回有两种情况。
    1. 其他线程调用了unpark(Thread t)。
    2. 其他线程调用了t.interrupt()。 这里要注意的是,lock()不能响应中断,但LockSupport.park() 会响应中断。
  • 也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环。唤
  • 醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过
  • 程,直到拿到锁。
  • 被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。 如果是情况1,会返回false; 如果是情况2,则返回true。 **/
  1. > tryAcquirearg)在aqs中定义 但是具体实现由子类的Sync或者Sync的子类 NonfairSync/FairSync ,在互斥锁中这里 Sync的子类 NonfairSync/FairSy的实现的
  2. > addWaiter(Node.EXCLUSIVE), arg) : 将当前线程放到aqs维护的队列尾部
  3. > acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) aqs 中实现 :如果 当前线程是在头节点(空节点)指向的节点,且获取锁成功,则 返回中断值,如果不是,则在阻塞中间如果被中断过,刷新局部变量,
  4. > selfInterrupt(): 中断自己
  5. - 阻塞就发生在下面这个方法中:
  6. - 注意: 公平非公平的区别在于
  7. - ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1286216/1637374620448-96fe9b56-7c61-4cfc-a203-4c6a7ac64630.png#clientId=u44699f69-2598-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=430&id=u0ada6cd7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=430&originWidth=1259&originalType=binary&ratio=1&rotation=0&showTitle=false&size=91350&status=done&style=none&taskId=u0fd75bab-6ccb-462a-bbbf-b4710947199&title=&width=1259)
  8. - 公平获取锁的前提是 判断自己是否是有前驱节点
  9. - 非公平锁 在尝试获取锁前会先尝试自旋,获取不到 在尝试获取锁的时候不会考虑队列 再尝试自旋
  10. <a name="qBHNs"></a>
  11. ### unlock()分析
  12. - AbstractQueueSynchronizer
  13. ```java
  14. public final boolean release(int arg) {
  15. if (tryRelease(arg)) {
  16. Node h = head;
  17. if (h != null && h.waitStatus != 0)
  18. unparkSuccessor(h);
  19. return true;
  20. }
  21. return false;
  22. }
  23. private void unparkSuccessor(Node node) {
  24. int ws = node.waitStatus;
  25. if (ws < 0)
  26. compareAndSetWaitStatus(node, ws, 0);
  27. Node s = node.next;
  28. if (s == null || s.waitStatus > 0) {
  29. s = null;
  30. for (Node t = tail; t != null && t != node; t = t.prev)
  31. if (t.waitStatus <= 0)
  32. s = t;
  33. }
  34. // 重点在这里 让队列的第一个线释放锁
  35. if (s != null)
  36. LockSupport.unpark(s.thread);
  37. }
  • ReentrantLock

    1. // 主要2个动作
    2. // 1、 state -1
    3. // 2、 如果 state = 0 则 setExclusiveOwnerThread(null)
    4. protected final boolean tryRelease(int releases) {
    5. int c = getState() - releases;
    6. if (Thread.currentThread() != getExclusiveOwnerThread())
    7. throw new IllegalMonitorStateException();
    8. boolean free = false;
    9. if (c == 0) {
    10. free = true;
    11. setExclusiveOwnerThread(null);
    12. }
    13. setState(c);
    14. return free;
    15. }

    如果 当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head指向的节点获取锁。

  • 小结:

    • release()里面做了两件事:
      • tryRelease(…)方法释放锁;
      • unparkSuccessor(…)方法唤醒队列中的继者。

        lockInterruptibly

  • ReentrantLock

    1. public void lockInterruptibly() throws InterruptedException {
    2. sync.acquireInterruptibly(1);
    3. }
  • AbstractQueueSynchronizer ```java public final void acquireInterruptibly(int arg)

    1. throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. if (!tryAcquire(arg))
    5. doAcquireInterruptibly(arg);

    }

private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 重点在这里,如果阻塞期间被中断 说明 // 当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出 // InterruptedException,跳出for循环,整个方法返回。 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

  1. <a name="EOReN"></a>
  2. ### tryLock
  3. - ReentrantLock
  4. ```java
  5. public boolean tryLock() {
  6. return sync.nonfairTryAcquire(1);
  7. }
  • tryLock()实现基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;

如果操作不成功则直接返回false,也不阻塞。

读写锁 (ReadWriteLock)

和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间不互斥。 读读不互斥,读写互斥,写写互斥

ReadWriteLock是一个接口,内部由两个Lock接口组成。 Lock 和Condition - 图4note : 说明 ReadWriteLock 持有 lock 而不是实现或者继承

  • ReentrantReadWriteLock ```java public interface ReadWriteLock {

    Lock readLock(); Lock writeLock(); }

public class ReentrantReadWriteLock implements ReadWriteLock{

  1. private final ReentrantReadWriteLock.ReadLock readerLock;
  2. private final ReentrantReadWriteLock.WriteLock writerLock;
  3. final Sync sync;
  4. public ReentrantReadWriteLock() {
  5. this(false);
  6. }
  7. public ReentrantReadWriteLock(boolean fair) {
  8. sync = fair ? new FairSync() : new NonfairSync();
  9. readerLock = new ReadLock(this);
  10. writerLock = new WriteLock(this);
  11. }

public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync;

  1. protected ReadLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }
  4. public void lock() {
  5. sync.acquireShared(1);
  6. }
  7. public void unlock() {
  8. sync.releaseShared(1);
  9. }

} public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync;

  1. protected WriteLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }
  4. // 写锁 的实现类似 互斥锁
  5. public void lock() {
  6. sync.acquire(1);
  7. }
  8. public void unlock() {
  9. sync.release(1);
  10. }

} /* 同互斥锁一样,读写锁也是用state变量来表示锁状态的。只是state变量在这里的含义和互斥锁完全 不同。在内部类Sync中,对state变量进行了重新定义 也就是把 state 变量拆成两半,低16位,用来记录写锁。但同一时间既然只能有一个线程写,为什 么还需要16位呢?这是因为一个写线程可能多次重入。例如,低16位的值等于5,表示一个写线程重入 了5次。 高16位,用来“读”锁。例如,高16位的值等于5,既可以表示5个读线程都拿到了该锁;也可以表示 一个读线程重入了5次 / abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  1. // 读锁偏移量
  2. static int sharedCount(int state) { return state >>> SHARED_SHIFT; }
  3. // 写锁偏移量
  4. static int exclusiveCount(int state) { return state & EXCLUSIVE_MASK; }
  5. }

}

  1. - 也就是说,当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后
  2. 分别调用lock/unlock
  3. - 为什么要把一个int类型变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态呢?
  4. 这是因为无法用一次CAS 同时操作两个int变量,所以用了一个int型的高16位和低16位分别表示读 <br />锁和写锁的状态。
  5. - state=0时,说明既没有线程持有读锁,也没有线程持有写锁;
  6. - state != 0时,要么有线程持有 读锁,要么有线程持有写锁,两者不能同时成立,因为读和写互斥。这时再进一步通过 sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁
  7. <a name="jwUc1"></a>
  8. ### 读写锁实现的基本原理
  9. > 从表面来看,ReadLockWriteLock是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两
  10. > 个视图呢?可以理解为是一把锁,线程分成两类:读线程和写线程。读线程和写线程之间不互斥(可以
  11. > 同时拿到这把锁),读线程之间不互斥,写线程之间互斥。
  12. > 从下面的构造方法也可以看出,readerLockwriterLock实际共用同一个sync对象。sync对象同互
  13. > 斥锁一样,分为非公平和公平两种策略,并继承自AQS
  14. <a name="sgHq0"></a>
  15. ### AQS的两对模板方法
  16. - ReentrantReadWriteLock
  17. ```java
  18. public static class ReadLock implements Lock, java.io.Serializable {
  19. private final Sync sync;
  20. protected ReadLock(ReentrantReadWriteLock lock) {
  21. sync = lock.sync;
  22. }
  23. public void lock() {
  24. sync.acquireShared(1);
  25. }
  26. public void unlock() {
  27. sync.releaseShared(1);
  28. }
  29. }
  30. public static class WriteLock implements Lock, java.io.Serializable {
  31. private final Sync sync;
  32. protected WriteLock(ReentrantReadWriteLock lock) {
  33. sync = lock.sync;
  34. }
  35. // 写锁 的实现类似 互斥锁
  36. public void lock() {
  37. sync.acquire(1);
  38. }
  39. public void unlock() {
  40. sync.release(1);
  41. }
  42. }
  • AbstractQueueSynchronizer ```java public final void acquire(int arg) {
    1. if (!tryAcquire(arg) && // tryAcquire方法由多个Sync子 类实现
    2. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    3. selfInterrupt();
    } public final void acquireShared(int arg) {
    1. if (tryAcquireShared(arg) < 0) // tryAcquireShared 方法由多个Sync子 类实现
    2. doAcquireShared(arg);
    }

public final boolean release(int arg) { if (tryRelease(arg)) { // tryRelease方法由多个Sync子 类实现 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // tryReleaseShared 类实现 doReleaseShared(); return true; } return false; }

  1. 将读/写、公平/非公平进行排列组合,就有4种组合。如下图所示,上面的两个方法都是在Sync中实现的。Sync中的两个方法又是模板方法,在NonfairSyncFairSync中分别有实现。最终的对应关系如下:
  2. 1. 读锁的公平实现:Sync.tryAccquireShared()+FairSync中的两个重写的子方法。
  3. 1. 读锁的非公平实现:Sync.tryAccquireShared()+NonfairSync中的两个重写的子方法。
  4. 1. 写锁的公平实现:Sync.tryAccquire()+FairSync中的两个重写的子方法。
  5. 1. 写锁的非公平实现:Sync.tryAccquire()+NonfairSync中的两个重写的子方法
  6. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1286216/1637334015792-114745a5-9b8f-4767-8217-265e396e4850.png#clientId=uafe0765c-471b-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=521&id=ufe2ad3cf&margin=%5Bobject%20Object%5D&name=image.png&originHeight=521&originWidth=885&originalType=binary&ratio=1&rotation=0&showTitle=false&size=53863&status=done&style=none&taskId=u89eeed63-5725-406f-afe6-92ed5887649&title=&width=885)
  7. - ReentrantReadWriteLock.NonfairSync
  8. ```java
  9. static final class NonfairSync extends Sync {
  10. // 写线程抢锁的时候是否应该阻塞
  11. final boolean writerShouldBlock() {
  12. return false; // 写线程在抢锁之前永远不被阻塞,非公平锁
  13. }
  14. // 读线程抢锁的时候是否应该阻塞
  15. final boolean readerShouldBlock() {
  16. /* 读线程抢锁的时候,当队列中第一个元素是写线程的时候要阻塞
  17. */
  18. return apparentlyFirstQueuedIsExclusive();
  19. }
  20. }
  • ReentrantReadWriteLock.FairSync

    1. static final class FairSync extends Sync {
    2. // 写线程抢锁的时候是否应该阻塞
    3. final boolean writerShouldBlock() {
    4. // 写线程在抢锁之前,如果队列中有其他线程在排队,则阻塞。公平锁
    5. return hasQueuedPredecessors();
    6. }
    7. // 读线程抢锁的时候是否应该阻塞
    8. final boolean readerShouldBlock() {
    9. // 读线程在抢锁之前,如果队列中有其他线程在排队,阻塞。公平锁
    10. return hasQueuedPredecessors();
    11. }
    12. }
    • 对于公平,比较容易理解,不论是读锁,还是写锁,只要队列中有其他线程在排队(排队等锁, 或者排队等写锁),就不能直接去抢锁,要排在队列尾部。
    • 对于非公平,读锁和写锁的实现策略略有差异。
      • 写线程能抢锁,前提是state=0,只有在没有其他线程持有读锁或写锁的情况下,它才有机会去抢 锁。或者state != 0,但那个持有写锁的线程是它自己,再次重入。写线程是非公平的,即 writerShouldBlock()方法一直返回false。
      • 对于读线程,假设当前线程被读线程持有,然后其他读线程还非公平地一直去抢,可能导致写线程 永远拿不到锁,所以对于读线程的非公平,要做一些“约束”。当发现队列的第1个元素是写线程的时候, 读线程也要阻塞,不能直接去抢。即偏向写线程。

        WriteLock 的lock 和unlock

  • 类似于互斥锁 不再重复分析

    ReadLock 的lock和 unlock

  • 读锁是共享锁,其实现策略和排他锁有很大的差异。

    • 读的时候 如果有线程在写 则阻塞;如果有别的线程在读或者 自己自己重复读 都可以

      lock

  • image.png

  • ReentrantReadWriteLock.Sync

    1. protected final int tryAcquireShared(int unused) {
    2. Thread current = Thread.currentThread();
    3. int c = getState();
    4. // 偏移量计算 当前有锁 且为写锁,同时 被别的线程占有,获取锁失败
    5. if (exclusiveCount(c) != 0 &&
    6. getExclusiveOwnerThread() != current)
    7. return -1;
    8. // 偏移量计算 当前读锁
    9. int r = sharedCount(c);
    10. // 分是否公平决定是否应该阻塞 上面已经分析过了
    11. if (!readerShouldBlock()
    12. // 读锁没有超过范围
    13. && r < MAX_COUNT
    14. // 则尝试 自增 1个单位的 偏移量
    15. && compareAndSetState(c, c + SHARED_UNIT)) {
    16. if (r == 0) { // 如果r=0,则当前线程就是第一个读线程
    17. firstReader = current;
    18. firstReaderHoldCount = 1; // 读线程个数为1
    19. } else if (firstReader == current) { // 如果写线程是当前线程
    20. firstReaderHoldCount++; // 如果第一个读线程就是当前线程,表示读线程重入读锁
    21. } else {
    22. // 如果firstReader不是当前线程,则从ThreadLocal中获取当前线程的读锁 个数,并设置当前线程持有的读锁个数
    23. HoldCounter rh = cachedHoldCounter;
    24. if (rh == null || rh.tid != getThreadId(current))
    25. cachedHoldCounter = rh = readHolds.get();
    26. else if (rh.count == 0)
    27. readHolds.set(rh);
    28. rh.count++;
    29. }
    30. return 1;
    31. }
    32. return fullTryAcquireShared(current);
    33. }
  • AbstractQueueSynchronizer

    1. private void doAcquireShared(int arg) {
    2. final Node node = addWaiter(Node.SHARED);
    3. boolean failed = true;
    4. try {
    5. boolean interrupted = false;
    6. for (;;) {
    7. final Node p = node.predecessor();
    8. if (p == head) {
    9. int r = tryAcquireShared(arg);
    10. if (r >= 0) {
    11. setHeadAndPropagate(node, r);
    12. p.next = null; // help GC
    13. if (interrupted)
    14. selfInterrupt();
    15. failed = false;
    16. return;
    17. }
    18. }
    19. if (shouldParkAfterFailedAcquire(p, node) &&
    20. parkAndCheckInterrupt())
    21. interrupted = true;
    22. }
    23. } finally {
    24. if (failed)
    25. cancelAcquire(node);
    26. }
    27. }

    unlock

    image.png

  • AQS

    1. protected final boolean tryReleaseShared(int unused) {
    2. Thread current = Thread.currentThread();
    3. if (firstReader == current) {
    4. // assert firstReaderHoldCount > 0;
    5. if (firstReaderHoldCount == 1)
    6. firstReader = null;
    7. else
    8. firstReaderHoldCount--;
    9. } else {
    10. HoldCounter rh = cachedHoldCounter;
    11. if (rh == null || rh.tid != getThreadId(current))
    12. rh = readHolds.get();
    13. int count = rh.count;
    14. if (count <= 1) {
    15. readHolds.remove();
    16. if (count <= 0)
    17. throw unmatchedUnlockException();
    18. }
    19. --rh.count;
    20. }
    21. for (;;) {
    22. int c = getState();
    23. int nextc = c - SHARED_UNIT;
    24. // 关键在这里
    25. // 因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个
    26. // for循环+CAS操作不断重试。这是tryReleaseShared和tryRelease的根本差异所在。
    27. if (compareAndSetState(c, nextc))
    28. // Releasing the read lock has no effect on readers,
    29. // but it may allow waiting writers to proceed if
    30. // both read and write locks are now free.
    31. return nextc == 0;
    32. }
    33. }

    Condition

    image.pngimage.png

    wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口

和Synchronized 中 wait/notify的阻塞区别

  • Syncnronized 同时唤醒被阻塞的生产者和消费者
  • Condition 的await/signal 可以只唤醒对方

    场景

    以ArrayBlockingQueue为例。如下所示为一个用数组实现的阻塞队列,执行put(…)操作的时候,队 列满了,生产者线程被阻塞;执行take()操作的时候,队列为空,消费者线程被阻塞。

  • ArrayBlockingQueue

    1. public classs ArrayBlockingQueue {
    2. final Object[] items;
    3. int takeIndex;
    4. /** items index for next put, offer, or add */
    5. int putIndex;
    6. /** Number of elements in the queue */
    7. int count;
    8. /** Main lock guarding all access */
    9. final ReentrantLock lock;
    10. /** Condition for waiting takes */
    11. private final Condition notEmpty;
    12. /** Condition for waiting puts */
    13. private final Condition notFull;
    14. }

    image.png

    原理

    每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表 组成的队列,此队列仅阻塞 唤醒作用,是否拥有该锁 依赖aqs的state 注意: 1、 condition的await 作用 :释放锁、 将线程放到Condition维护的队列中,阻塞该线程 2、 condition的signal 作用: 将线程放到aqs的队列中,唤醒该线程 3、 对于 aqs和condition 节点 数据结构一样都是aqs的node对象,condition的定义本身就在aqs中

  • ReentrantLock

    1. public Condition newCondition() {
    2. return sync.newCondition();
    3. }
  • ReentrantReadWriteLock

    1. public static class WriteLock implements Lock, java.io.Serializable {
    2. public Condition newCondition() {
    3. return sync.newCondition();
    4. }
    5. }
    6. public static class ReadLock implements Lock, java.io.Serializable {
    7. // 读锁可以并行读 不需要condition
    8. public Condition newCondition() {
    9. throw new UnsupportedOperationException();
    10. }
    11. }
  • 首先,读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition:

    1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    2. public class ConditionObject implements Condition, java.io.Serializable {
    3. // Condition的所有实现,都在ConditionObject类中
    4. }
    5. abstract static class Sync extends AbstractQueuedSynchronizer {
    6. final ConditionObject newCondition() {
    7. return new ConditionObject();
    8. }
    9. }
    10. }

    下面来看一下在await()/notify()方法中,是如何使用这个队列的。

    await() 分析

  • AQS

    1. /**
    2. * Implements interruptible condition wait.
    3. * <ol>
    4. * <li> If current thread is interrupted, throw InterruptedException.
    5. * <li> Save lock state returned by {@link #getState}.
    6. * <li> Invoke {@link #release} with saved state as argument,
    7. * throwing IllegalMonitorStateException if it fails.
    8. * <li> Block until signalled or interrupted.
    9. * <li> Reacquire by invoking specialized version of
    10. * {@link #acquire} with saved state as argument.
    11. * <li> If interrupted while blocked in step 4, throw InterruptedException.
    12. * </ol>
    13. */
    14. public final void await() throws InterruptedException {
    15. if (Thread.interrupted())
    16. throw new InterruptedException();
    17. // 加入Condition的等待队列
    18. Node node = addConditionWaiter();
    19. // 阻塞在Condition之前必须先释放锁,否则会死锁
    20. int savedState = fullyRelease(node);
    21. int interruptMode = 0;
    22. // 是否在aqs队列中
    23. while (!isOnSyncQueue(node)) {
    24. // 阻塞当前线程
    25. LockSupport.park(this);
    26. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    27. break;
    28. }
    29. // 重新获取锁
    30. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    31. interruptMode = REINTERRUPT;
    32. if (node.nextWaiter != null) // clean up if cancelled
    33. unlinkCancelledWaiters();
    34. if (interruptMode != 0)
    35. // 被中断唤醒,抛中断异常
    36. reportInterruptAfterWait(interruptMode);
    37. }
  • 线程调用 await()的时候,肯定已经先拿到了锁。所以,在 addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的

  • 在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
  • 线程从wait中被唤醒后,必须用acquireQueued(node, savedState)方法重新拿锁。
  • checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否 收到过中断信号。当线程从park中醒来时,有两种可能:
    • 一种是其他线程调用了unpark,
    • 另 一种是收到中断信号。这里的await()方法是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()方法也会返回。
  • isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只 在Condition的队列里,而不在AQS的队列里。但执行signal()操作的时候,会放进AQS的同步队 列

    awaitUninterruptibly()实现分析

  • aqs 中

    1. public final void awaitUninterruptibly() {
    2. Node node = addConditionWaiter();
    3. int savedState = fullyRelease(node);
    4. boolean interrupted = false;
    5. while (!isOnSyncQueue(node)) {
    6. LockSupport.park(this);
    7. if (Thread.interrupted())
    8. // 和await的区别在于 阻塞期间 如果有中断,仅记录 不处理继续在while中
    9. interrupted = true;
    10. }
    11. if (acquireQueued(node, savedState) || interrupted)
    12. selfInterrupt();
    13. }

    signal() 分析

  • aqs中 ```java public final void signal() { // 只有持有锁的线程,才有资格调用signal()方法

    1. if (!isHeldExclusively())
    2. throw new IllegalMonitorStateException();
    3. Node first = firstWaiter;
    4. if (first != null)
    5. doSignal(first);
    6. }

// 唤醒队列中的第1个线程 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /*

  1. * If cannot change waitStatus, the node has been cancelled.
  2. */
  3. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  4. return false;

// 先把Node放入互斥锁的同步队列中,再调用unpark方法 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒动作 LockSupport.unpark(node.thread);

  1. return true;
  2. }
  3. private Node enq(final Node node) {
  4. for (;;) {
  5. Node t = tail;
  6. if (t == null) { // Must initialize
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. node.prev = t;
  11. if (compareAndSetTail(t, node)) {
  12. t.next = node;
  13. return t;
  14. }
  15. }
  16. }
  17. }

```

  • 同 await()一样,在调用 signal()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面 执行await()的时候,把锁释放了。
  • 然后,从队列中取出firstWaiter,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await()方法里面的判断条件:while( ! isOnSyncQueue(node)) 这个判断条件满足,说明await线程不是被中断,而是被unpark唤醒的。 notifyAll()与此类似。

    扩展 - StampedLock

    image.png

    首先,StampedLock是一个读写锁,因此也会像读写锁那样,把一个state变量分成两半,分别表示 读锁和写锁的状态。同时,它还需要一个数据的version。但是,一次CAS没有办法操作两个变量,所以 这个state变量本身同时也表示了数据的version。