概述

ReentrantReadWriteLock 结构图.png

类名 描述
Sync ReentrantReadWriteLock 内部类,一个抽象类,继承 AQS 类,实现了 AQS 绝大多数的方法,并暴露两个抽象方法让子类实现:① writeShouldBlock() ② readerShouldBlock()
NonfairSync Sync 实现类,实现非公平锁。writeShouldBlock() 方法直接返回 false。
FairSync Sync 实现类,实现公平锁。无论 read 还是 write,都是调用 hasQueuedPredecessors() 方法
ReadLock Lock 接口的实现类。内部方法都是委托 ReentrantReadWriteLock 对象实现
WrietLock Lock 接口的实现类。内部方法都是委托 ReentrantReadWriteLock 对象实现

ReadLock 和 WriteLock 都是使用同一个 Sync 子类实现,但是为什么一个 AQS 类可以既实现共享模式,又能实现独占模式呢?
共享模式、独占模式.png
AQS 实现共享模式和独占模式.png
AQS 的精髓就在于对 state 的值是如何定义的。

锁的类型 state 值 描述
独占模式 0 线程可以获得锁
1 锁被其它线程占用,当前线程只能插入阻塞队列
共享模式

总的来说,独占模式和共享模式对 state 操作完全不一样,那 ReentrantReadWrite 是如何实现在一个 AQS 实例中实现两种锁模式,答案是将 state 这个 int 类型拆分成两部分,高 16 位用于共享模式,低 16 位用于独占模式。

源码分析

内部类:Sync

我们先瞅瞅 ReentrantReadWrite 内部类 Sync 的重要变量:

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 6317671515068378041L;
  3. // 将 int 的 state 拆分成两部分,低 16 位表示独占模式,即写锁线程持有数量
  4. // 高 16 位表示共享模式,即读锁线程持有数量
  5. static final int SHARED_SHIFT = 16;
  6. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  7. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  8. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  9. // 取高 16 位值,返回共享锁(读锁)的获取次数(包括重入次数)
  10. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  11. // 取低 16 位值,返回独占锁(写锁)的获取次数(包括重入次数)
  12. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  13. // 记录每个线程持有读锁数量(读锁重入),每个线程都有一个对应的实例对象
  14. static final class HoldCounter {
  15. // 持有读锁的次数
  16. int count = 0;
  17. // 线程ID
  18. final long tid = getThreadId(Thread.currentThread());
  19. }
  20. // 使用 ThreadLocal 存储这个变量值
  21. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  22. public HoldCounter initialValue() {
  23. return new HoldCounter();
  24. }
  25. }
  26. // 从这个对象中可以获取当前线程持有读锁的数量
  27. private transient ThreadLocalHoldCounter readHolds;
  28. /**
  29. * The hold count of the last thread to successfully acquire
  30. * readLock. This saves ThreadLocal lookup in the common case
  31. * where the next thread to release is the last one to
  32. * acquire. This is non-volatile since it is just used
  33. * as a heuristic, and would be great for threads to cache.
  34. *
  35. * <p>Can outlive the Thread for which it is caching the read
  36. * hold count, but avoids garbage retention by not retaining a
  37. * reference to the Thread.
  38. *
  39. * <p>Accessed via a benign data race; relies on the memory
  40. * model's final field and out-of-thin-air guarantees.
  41. */
  42. /**
  43. * 缓存最后一个成功获得读锁的线程的读锁重入次数
  44. * 无论哪个线程获得读锁,都会设置这个变量。为什么不直接从ThreadLocal获取呢?
  45. * 基于这么一个事实:实际上,一个线程获得读锁后并很快释放,所以在当前线程获取读锁这段时间,
  46. * 其它线程大概率不会获取读锁,因此,这个缓存可以提高最后一个线程操作的效率,节省从ThreadLocal查找的时间
  47. */
  48. private transient HoldCounter cachedHoldCounter;
  49. /**
  50. * firstReader is the first thread to have acquired the read lock.
  51. * firstReaderHoldCount is firstReader's hold count.
  52. *
  53. * <p>More precisely, firstReader is the unique thread that last
  54. * changed the shared count from 0 to 1, and has not released the
  55. * read lock since then; null if there is no such thread.
  56. *
  57. * <p>Cannot cause garbage retention unless the thread terminated
  58. * without relinquishing its read locks, since tryReleaseShared
  59. * sets it to null.
  60. *
  61. * <p>Accessed via a benign data race; relies on the memory
  62. * model's out-of-thin-air guarantees for references.
  63. *
  64. * <p>This allows tracking of read holds for uncontended read
  65. * locks to be very cheap.
  66. */
  67. // 缓存首个获取读锁的线程(且未释放),以及它所持有的读锁的数量
  68. // 这允许追踪无竞争读锁的读次数非常容易
  69. private transient Thread firstReader = null;
  70. private transient int firstReaderHoldCount;
  71. Sync() {
  72. // 初始化 readHolds
  73. readHolds = new ThreadLocalHoldCounter();
  74. // 为了保证 readHolds 对其它线程可见,相当于做了一道写屏障
  75. setState(getState());
  76. }
  77. // ...
  78. }
  1. int 类型的 state 变量被拆分成两部分
    1. 高 16 位表示读锁的获取次数,包含重入次数。当线程获得读锁则 +1,释放读锁 -1,重入也 +1
    2. 低 16 位表示写锁的获取次数,包含重入次数。因为读锁是独占锁,所以 >1 的值表示线程重入次数。
  2. 每个线程都有 ThreadLocal<HoldCounter> 本地变量,里面存储当前线程获取读锁的次数。因为高 16 位的 state 值包含重入和线程获取读锁的次数,因此使用每个线程都拥有 HoldCounter 变量来区分是锁重入还是线程获取的读锁。
  3. cacheHoldCounter 具有缓存功能。缓存最后一次获取读锁的线程的 HoldCounter,这是基于每个线程获得读锁到释放读锁这一过程是非常短暂的,较少出现并发情况,所以缓存最后一次获取读锁的线程以减少线程查询 ThreadLocal 过程,从而提高性能。
  4. firstReaderfirstReaderHoldCount 其实也是充当缓存作用。记录首个获取读锁且未释放读锁的线程。这两个变量的目的在于使得在读锁不产生竞争的情况下,记录读锁重入次数这一操作非常快捷。

    获取读锁:ReadLock#lock

    尝试获得读锁。当写锁没有被其它线程持有时,这个方法会立即返回。 ```java // java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock public void lock() { sync.acquireShared(1); }

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared // 这个就是一个模板类,子类需要根据实际需求实现自己的tryAcquireShared方法 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }

`tryAcquireShared(arg)` 方法返回值:

1. 如果 `< 0` 表示没有获得共享锁(这里就是读锁),意味此次尝试失败。`> 0` 意味着获得共享锁,意味着此次尝试成功。
1. 如果尝试获得锁操作失败,那么进入 `doAcquireShared(arg)` 方法,这个方法由 `AQS` 实现,它会将当前线程封装为 Node 对象并插入阻塞队列的末尾,阻塞当前线程。然后等待前驱节点唤醒。线程被唤醒后,依旧调用 `tryAcquireShared(arg)` 方法尝试获得锁。如果尝试失败,依旧阻塞,否则退出 `for(;;)` 无限循环,并返回。

对于公平和非公平来说,`tryAcquireShared(arg)` 在 `ReentrantReadWriteLock` 并没有做区分:
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();

    // #1 首先判断是否有其它线程持有「写锁」,如果持有,显然当前线程是无法获得「读锁」的。
    // 当前,还有一种情况:当前线程已经持有「写锁」,它也可以获得「读锁」。
    // 当前线程在持有「写锁」的情况下,是可以允许获得「读锁」的。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        // #2 如果都不满足,直接返回 -1,后续会将当前线程封装为Node节点并插入到「阻塞队列」的末尾
        return -1;

    // #2 获取读锁的获取次数
    int r = sharedCount(c);

    // #3 readerShouldBlock():读锁获取是否需要阻塞。公平和非公平有不同实现方法,
    //    在非公平模式下,如果阻塞队列不为空,会判断首个线程是否准备获取写锁,如果是,则返回false,否则返回true。
    // 如果返回true,会直接调用fullTryAcquireShared获取共享读锁
    if (!readerShouldBlock() &&

        // #4 判断是否会溢出,其实数量很大的,2^16 - 1
        r < MAX_COUNT &&

        // #5 通过CAS更新 state 值:将高16位+1,如果成功,意味着成功获得读锁。
        compareAndSetState(c, c + SHARED_UNIT)) {

        // #6 走到这里,说明当前线程已成功获得「读锁」啦
        // 下面就是更新相关缓存

        // #7 如果 r=0,说明是首个获得「读锁」的线程,需要重置 firstReader
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // #8 首个线程重入了
            firstReaderHoldCount++;
        } else {
            // #9 更新 cachedHoldCounter,这个类是缓存最后一个获得读锁的线程
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                // #10 如果缓存的是其它线程,更新为当前线程的HoldCounter
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // #11 意味着 cacheHolderCounter 为当前线程,但是 count = 0
                // 更新当前线程ThreadLocal的值。
                readHolds.set(rh);

            // #12 count + 1
            rh.count++;
        }

        // #13 返回 1 表示成功获得「读锁」
        return 1;
    }

    // #14 走到这里,说明当前线程由于某些原因无法获得「读锁」,需要
    return fullTryAcquireShared(current);
}

线程走到 #14 代码处,需要满足以下情况:

  1. readerShouldBlock() 返回 false,这里会分公平锁和非公平锁之分。
    1. 公平模式:调用 hasQueuedPredecessors() 判断阻塞队列中是否有其它元素在等待锁。
    2. 非公平模式:调用 apparentlyFirstQueuedIsExclusive 判断阻塞队列中 head 的第一个后继节点是否需要获取写锁。如果是,让这个写锁先来,避免写锁饥饿。这里,读锁对写锁在非公平模式下做了让步。
  2. 或 CAS 设置 state 失败。

    fullTryAcquireShared

    来到这个方法的原因之一可能是 CAS 失败,如果是因为这样就要使得当前线程进入阻塞队列,消耗未免过大。所以这个方法其实是给 CAS 失败的线程一次重生的机会。

  3. 如果有线程持有写锁

    1. 重入线程,CAS 设置 state 的值。
    2. 非重入线程,返回 -1,表示获取读锁失败。
  4. 没有线程持有写锁exclusiveCount(c) == 0

    1. readerShouldBlock() 返回 true

      1. 如果是首个获得读锁的线程,放过。
      2. 非首个获得读锁线程, ```java // java.util.concurrent.locks.ReentrantReadWriteLock.Sync#fullTryAcquireShared final int fullTryAcquireShared(Thread current) { /*
      • This code is in part redundant with that in
      • tryAcquireShared but is simpler overall by not
      • complicating tryAcquireShared with interactions between
      • retries and lazily reading hold counts. */ HoldCounter rh = null;

      // #1 死循环 for (;;) { // #2 获取 state 状态 int c = getState();

      // #3 如果其它线程持有写锁,并且判断当前线程是否为持有写锁的线程 // 如果不是,返回 -1,锁写是独占锁 if (exclusiveCount(c) != 0) {

        if (getExclusiveOwnerThread() != current)
            return -1;
        // else we hold the exclusive lock; blocking here
        // would cause deadlock.
      

      } else if (readerShouldBlock()) {

        // #4 readerShouldBlock() = true 且没有线程持有「写锁」
        // Make sure we're not acquiring read lock reentrantly
        // 重入获得读锁
        // firstReader 线程读锁重入
        // 非公平模式下,readerShouldBlock 返回 true,说明阻塞队列不为空,
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
        } else {
            // 这一段代码目的是处理读锁重入。为了确保读锁重入操作能成功,而非
            // 被重新放入阻塞队列中等待。
            // 还需要注意对ThreadLocal变量的处理,有一个remove()操作
            if (rh == null) {
                   // 获取最近缓存的HoldCounter
                rh = cachedHoldCounter;
                // #5 如果 cachedHoldCounter 缓存的非当前线程或根本没有缓存
                // 从ThreadLocal获取当前线程的HoldCounter并更新
                if (rh == null || rh.tid != getThreadId(current)) {
                    // 从当前线程的ThreadLocal获取HoldCounter对象
                    rh = readHolds.get();
                    // #6 如果 count=0,属于上一行代码初始化造成,执行 remove
                    // 避免内存泄漏
                    if (rh.count == 0)
                        readHolds.remove();
                }
            }
      
            // #7 将当前线程放入阻塞队列中
            if (rh.count == 0)
                return -1;
        }
      

      } if (sharedCount(c) == MAX_COUNT)

        throw new Error("Maximum lock count exceeded");
      

      // #8 CAS 更新 if (compareAndSetState(c, c + SHARED_UNIT)) {

        if (sharedCount(c) == 0) {
            // #9 CAS成功,且发现 state = 0,就把当前线程设置为 firstReader
            // 表示首个获取读锁的线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // #10 将 cachedHoldCounter 更新为当前线程
            if (rh == null)
                rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
            cachedHoldCounter = rh; // cache for release
        }
      
        // #11 返回 >0 的数,表示成功获得读锁
        return 1;
      

      } } } ```

      释放读锁

      ```java // java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock public void unlock() { sync.releaseShared(1); }

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

<a name="SO06d"></a>
### tryReleaseShared

1. 通过缓存来加速相关变量的读取操作。
1. CAS 更新 state 的值。
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();

    // #1 缓存加速:当前线程是否为首个获得读锁的线程
    // 直接更新缓存即可
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // #2 当前线程非首个获得读锁的线程,更新"二级缓存" cachedHoldCounter
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            // cacheHoldCounter不是当前线程的缓存,从ThreadLocal中读取
            rh = readHolds.get();

        // #3 这里就是最新值
        int count = rh.count;
        if (count <= 1) {
            // #4 表示已不再持有读锁,直接remove,避免内存泄漏
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }

        // #5 更新 count 值
        --rh.count;
    }

    // #6 CAS 更新 state
    for (;;) {
        int c = getState();
        // #7 相当于是 state 高 16 位 - 1 
        int nextc = c - SHARED_UNIT;

        // #8 CAS 更新
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.

            // #9 true:读/写锁都没有线程占用,主要是帮助后继线程唤醒获取「写锁」的线程
            // 防止写饥饿
            return nextc == 0;
    }
}

获取写锁

我们知道,当一个线程拥有写锁时,其它线程不能拥有读锁或写锁。它是一把独占锁。
如果有其它线程持有读锁或写锁,那么当前线程需要进入阻塞队列等待前驱节点唤醒。

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 如果获取独占锁失败,则线程被包装为Node对象并插入阻塞队列末尾
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire

  1. 如果 read count 不等于 0 或 write count 不等于,并且非重入线程,本次尝试获得锁失败。
  2. 如果计数已经饱和,本次尝试获取锁失败。
  3. 如果满足线程重入或满足入队策略,那么当前线程有资格获得锁。 ```java // java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire protected final boolean tryAcquire(int acquires) { /*

    • Walkthrough:
      1. If read count nonzero or write count nonzero
    • and owner is a different thread, fail.
      1. If count would saturate, fail. (This can only
    • happen if count is already nonzero.)
      1. Otherwise, this thread is eligible for lock if
    • it is either a reentrant acquire or
    • queue policy allows it. If so, update state
    • and set owner. */ // #1 获取当前线程 Thread current = Thread.currentThread();

      // #2 获取当前AQS的state int c = getState();

      // #3 获取写锁获取次数 int w = exclusiveCount(c);

      // c == 0:意味着读锁和写锁都没被其它线程所持有 // c != 0 意味着至少有一个线程持有写锁或读锁 // c !=0 && w == 0 意味着写锁可用,但是有线程(也包括自己)持有「读锁」。 // c != 0 && w != 0 意味着写锁不可用,其它线程(也包括自己)持有「写锁」。 // 要判断当前是否可拥有锁,在前面状态的基础上,可能还需要判断是否当前线程是否重入 if (c != 0) { // #5 读锁被其它线程占用了,无法获得写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false;

      // #6 溢出 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error(“Maximum lock count exceeded”);

      // #7 当前线程为重入线程,更新 state setState(c + acquires);

      // #8 true:成功获得「写锁」 return true; }

      // #9 c == 0,writerShouldBlock() 返回 false,就可以CAS更新 state 值 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;

      // #10 CAS 更新成功,当前线程成功获得「写锁」,设置独占线程 setExclusiveOwnerThread(current);

      // #11 return true return true; }

// 返回当前独占锁的线程数量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

<a name="hGKVJ"></a>
### writerShouldBlock
公平模式和非公平模式在这里是有区别的:

1. 非公平模式:直接返回 `false`。意味着当前线程直接去争抢**写锁**。
1. 公平模式:需要判断阻塞队列是否有其它线程正在排队。
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
// 非公平模式
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

// 公平模式
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

/**
 * ① 空队列,返回 false
 * ② 首个等待线程不是当前线程
 */ 
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

释放写锁

由于写锁是独占锁,因此,释放过程比较简单。当线程完全释放锁后,才会唤醒后驱节点。

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
    sync.release(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    // #1 当线程完全释放写锁时,才会返回true
    if (tryRelease(arg)) {
        // #2 进入到这里,意味着当前线程完全释放写锁,那么可以将唤醒后驱节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease

// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
/**
 * true:当前线程完全释放「写锁」
 * false:当前线程并未完全释放「写锁」
 */
protected final boolean tryRelease(int releases) {
    // #1 判断当前线程是否独占锁对象,如果不是,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();

    // #2 计算修改后的state值
    int nextc = getState() - releases;

    // #3 判断是否完全释放锁(可重入)
    boolean free = exclusiveCount(nextc) == 0;

    // #4 如果是的话,就将独占线程设置为null
    if (free)
        setExclusiveOwnerThread(null);

    // #5 更新 state 的值
    setState(nextc);

    // #6 返回判断是否已完全释放锁
    return free;
}

// c & 0..0 1...11111(15个1)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

锁降级

在非公平模式下获取读锁,线程会给写锁做一些让步:判断首个等待线程是否获取写锁。如果是,则乖乖排队,不会争抢。
而锁降级是指将持有写锁的线程,去获取读锁的这一行为称为锁降级(Lock downgrading)。此刻,线程既拥有了写锁,又拥有了读锁。
但是,逆向是不可以发生了,即当前线程在拥有读锁的情况下,去获取写锁。这会造成死锁。

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // 看下这里返回 false 的情况:
        //   c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
        //   c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
        //   也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        ...
    }
    ...
}

如果线程 t 先获得读锁,然后调用 tryAcquire() 尝试获得写锁,这个方法会返回 false。这就导致当前线程会加入阻塞队列并被阻塞,自己将自己休眠。但是由于写锁没有释放,其它线程无法获得读锁,也会加入阻塞队列。这就导致线程 t 永远无法被其它线程所唤醒。