1. 读写锁介绍

在并发场景下解决线程安全问题,我们一般会使用独占锁(synchronized 或 concurrents 包中实现了 Lock 接口的ReentrantLock),也就是在同一时刻只有一个线程能获取到锁。但在读多写少的场景下,用独占锁会出现性能瓶颈,对此 java 还提供了另外一个实现 Lock 接口的 ReentrantReadWriteLock (读写锁)。

读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞

  1. 公平性选择:支持非公平性(默认)和公平的锁获取方式,吞吐量 还是非公平优于公平;
  2. 重入性:支持重入,读锁获取后能再次获取,写锁获取之后能够再次获取写锁,同时也能够获取读锁;
  3. 锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁


2. 写锁详解

2.1 写锁的获取

同步组件的实现聚合了同步器(AQS),并通过重写重写同步器(AQS)中的方法实现同步组件的同步语义(关于同步组件的实现层级结构可以看这篇文章,AQS的底层实现分析可以看这篇文章)。因此,写锁的实现依然也是采用这种方式。在同一时刻写锁是不能被多个线程所获取,很显然写锁是独占式锁,而实现写锁的同步语义是通过重写 AQS 中的 tryAcquire 方法实现的。源码为:

  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1. If read count nonzero or write count nonzero
  5. * and owner is a different thread, fail.
  6. * 2. If count would saturate, fail. (This can only
  7. * happen if count is already nonzero.)
  8. * 3. Otherwise, this thread is eligible for lock if
  9. * it is either a reentrant acquire or
  10. * queue policy allows it. If so, update state
  11. * and set owner.
  12. */
  13. Thread current = Thread.currentThread();
  14. // 1. 获取写锁当前的同步状态
  15. int c = getState();
  16. // 2. 获取写锁获取的次数
  17. int w = exclusiveCount(c);
  18. if (c != 0) {
  19. // (Note: if c != 0 and w == 0 then shared count != 0)
  20. // 3.1 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话
  21. // 当前线程获取写锁失败
  22. if (w == 0 || current != getExclusiveOwnerThread())
  23. return false;
  24. if (w + exclusiveCount(acquires) > MAX_COUNT)
  25. throw new Error("Maximum lock count exceeded");
  26. // Reentrant acquire
  27. // 3.2 当前线程获取写锁,支持可重复加锁
  28. setState(c + acquires);
  29. return true;
  30. }
  31. // 3.3 写锁未被任何线程获取,当前线程可获取写锁
  32. if (writerShouldBlock() ||
  33. !compareAndSetState(c, c + acquires))
  34. return false;
  35. setExclusiveOwnerThread(current);
  36. return true;
  37. }

这段代码的逻辑请看注释,这里有一个地方需要重点关注,exclusiveCount(c)方法,该方法源码为:

  1. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

其中 EXCLUSIVE_MASK 为:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE _MASK 为 1 左移 16 位然后减 1,即为 0x0000FFFF。而 exclusiveCount 方法是将同步状态(state 为int类型)与 0x0000FFFF 相 ,即取同步状态的低16位。那么低16位代表什么呢?根据 exclusiveCount 方法的注释为独占式获取的次数即写锁被获取的次数,现在就可以得出来一个结论:同步状态的低16位用来表示写锁的获取次数。同时还有一个方法值得我们注意:

  1. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

该方法是获取读锁被获取的次数,是将同步状态(int c)右移16次,即取同步状态的高16位,现在我们可以得出另外一个结论同步状态的高16位用来表示读锁被获取的次数。现在还记得我们开篇说的需要弄懂的第一个问题吗?读写锁是怎样实现分别记录读锁和写锁的状态的,现在这个问题的答案就已经被我们弄清楚了,其示意图如下图所示:

读写锁 ReentrantReadWriteLock - 图1

现在我们回过头来看写锁获取方法 tryAcquire,其主要逻辑为:当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增加写状态。

2.2.写锁的释放

写锁释放通过重写 AQS 的 tryRelease 方法,源码为:

  1. protected final boolean tryRelease(int releases) {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. //1. 同步状态减去写状态
  5. int nextc = getState() - releases;
  6. //2. 当前写状态是否为0,为0则释放写锁
  7. boolean free = exclusiveCount(nextc) == 0;
  8. if (free)
  9. setExclusiveOwnerThread(null);
  10. //3. 不为0则更新同步状态
  11. setState(nextc);
  12. return free;
  13. }

源码的实现逻辑请看注释,不难理解与 ReentrantLock 基本一致,这里需要注意的是,减少写状态
int nextc = getState() - releases;
只需要用当前同步状态直接减去写状态的原因正是我们刚才所说的写状态是由同步状态的**低16位**表示的

3.读锁详解

3.1.读锁的获取

看完了写锁,现在来看看读锁,读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取也就是一种共享式锁。按照之前对AQS 介绍,实现共享式同步组件的同步语义需要通过重写AQS的 tryAcquireShared 方法和tryReleaseShared 方法。读锁的获取实现方法为:

  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. * Walkthrough:
  4. * 1. If write lock held by another thread, fail.
  5. * 2. Otherwise, this thread is eligible for
  6. * lock wrt state, so ask if it should block
  7. * because of queue policy. If not, try
  8. * to grant by CASing state and updating count.
  9. * Note that step does not check for reentrant
  10. * acquires, which is postponed to full version
  11. * to avoid having to check hold count in
  12. * the more typical non-reentrant case.
  13. * 3. If step 2 fails either because thread
  14. * apparently not eligible or CAS fails or count
  15. * saturated, chain to version with full retry loop.
  16. */
  17. Thread current = Thread.currentThread();
  18. int c = getState();
  19. //1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前
  20. // 线程获取读锁失败返回-1
  21. if (exclusiveCount(c) != 0 &&
  22. getExclusiveOwnerThread() != current)
  23. return -1;
  24. int r = sharedCount(c);
  25. if (!readerShouldBlock() &&
  26. r < MAX_COUNT &&
  27. //2. 当前线程获取读锁
  28. compareAndSetState(c, c + SHARED_UNIT)) {
  29. //3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法
  30. //返回当前获取读锁的次数
  31. if (r == 0) {
  32. firstReader = current;
  33. firstReaderHoldCount = 1;
  34. } else if (firstReader == current) {
  35. firstReaderHoldCount++;
  36. } else {
  37. HoldCounter rh = cachedHoldCounter;
  38. if (rh == null || rh.tid != getThreadId(current))
  39. cachedHoldCounter = rh = readHolds.get();
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;
  43. }
  44. return 1;
  45. }
  46. //4. 处理在第二步中CAS操作失败的自旋已经实现重入性
  47. return fullTryAcquireShared(current);
  48. }

代码的逻辑请看注释,需要注意的是 当写锁被其他线程获取后,读锁获取失败,否则获取成功利用 CAS 更新同步状态。另外,当前同步状态需要加上 SHARED_UNIT((1 << SHARED_SHIFT)即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的,这段代码就不展开说了,有兴趣可以看看。

3.2.读锁的释放

读锁释放的实现主要通过方法tryReleaseShared,源码如下,主要逻辑请看注释:

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. // 前面还是为了实现getReadHoldCount等新功能
  4. if (firstReader == current) {
  5. // assert firstReaderHoldCount > 0;
  6. if (firstReaderHoldCount == 1)
  7. firstReader = null;
  8. else
  9. firstReaderHoldCount--;
  10. } else {
  11. HoldCounter rh = cachedHoldCounter;
  12. if (rh == null || rh.tid != getThreadId(current))
  13. rh = readHolds.get();
  14. int count = rh.count;
  15. if (count <= 1) {
  16. readHolds.remove();
  17. if (count <= 0)
  18. throw unmatchedUnlockException();
  19. }
  20. --rh.count;
  21. }
  22. for (;;) {
  23. int c = getState();
  24. // 读锁释放 将同步状态减去读状态即可
  25. int nextc = c - SHARED_UNIT;
  26. if (compareAndSetState(c, nextc))
  27. // Releasing the read lock has no effect on readers,
  28. // but it may allow waiting writers to proceed if
  29. // both read and write locks are now free.
  30. return nextc == 0;
  31. }
  32. }

4.锁降级

读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级,关于锁降级下面的示例代码摘自 ReentrantWriteReadLock 源码中:

  1. void processCachedData() {
  2. rwl.readLock().lock();
  3. if (!cacheValid) {
  4. // Must release read lock before acquiring write lock
  5. rwl.readLock().unlock();
  6. rwl.writeLock().lock();
  7. try {
  8. // Recheck state because another thread might have
  9. // acquired write lock and changed state before we did.
  10. if (!cacheValid) {
  11. data = ...
  12. cacheValid = true;
  13. }
  14. // Downgrade by acquiring read lock before releasing write lock
  15. rwl.readLock().lock();
  16. } finally {
  17. rwl.writeLock().unlock(); // Unlock write, still hold read
  18. }
  19. }
  20. try {
  21. use(data);
  22. } finally {
  23. rwl.readLock().unlock();
  24. }
  25. }
  26. }