现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了,这就是经典的 “读者-写者问题”。

读写锁介绍

针对这种读写共存的场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock ,在其内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁。
微信截图_20211122002851.png

线程进入读锁的前提条件:

  • 没有其他线程的写锁
  • 没有写请求或者,有写请求但调用线程和持有锁的线程是同一个。

线程进入写锁的前提条件:

  • 没有其他线程的读锁
  • 没有其他线程的写锁

而读写锁有以下三个重要的特性:

  • 公平选择性:支持非公平和公平的锁获取方式,吞吐量还是非公平优于公平
  • 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
  • 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁

锁升级与锁降级

下面我们再来看一下锁的升降级,首先我们看一下这段代码,这段代码演示了在更新缓存的时候,如何利用锁的降级功能。

  1. public class CachedData {
  2. public Object data;
  3. public volatile boolean cacheValid;
  4. public final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5. public void processCachedData() {
  6. rwl.readLock().lock();
  7. if (!cacheValid) {
  8. //在获取写锁之前,必须首先释放读锁。
  9. rwl.readLock().unlock();
  10. rwl.writeLock().lock();
  11. try {
  12. //这里需要再次判断数据的有效性,因为在我们释放读锁和获取写锁的空隙之内,可能有其他线程修改了数据。
  13. if (!cacheValid) {
  14. data = new Object();
  15. cacheValid = true;
  16. }
  17. //在不释放写锁的情况下,直接获取读锁,这就是读写锁的降级。
  18. rwl.readLock().lock();
  19. } finally {
  20. //释放了写锁,但是依然持有读锁
  21. rwl.writeLock().unlock();
  22. }
  23. }
  24. try {
  25. System.out.println(data);
  26. } finally {
  27. //释放读锁
  28. rwl.readLock().unlock();
  29. }
  30. }
  31. }

锁降级指的是写锁降级成为读锁。特别说明,如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级可以帮助我们拿到当前线程修改后的结果而不被其他线程所破坏,防止更新丢失。

为什么需要锁降级?
如果我们在刚才的方法中,一直使用写锁,最后才释放写锁的话,虽然确实是线程安全的,但是也是没有必要的,因为我们只有一处修改数据的代码:

data = new Object();

后面我们对于 data 仅仅是读取。如果还一直使用写锁的话,就不能让多个线程同时来读取了,持有写锁是浪费资源的,降低了整体的效率,所以这个时候利用锁的降级是很好的办法,可以提高整体性能。

锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

为什么不支持锁升级?
我们知道读写锁的特点是如果线程都申请读锁,是可以多个线程同时持有的,可是如果是写锁,只能有一个线程持有,并且不可能存在读锁和写锁同时持有的情况。正是因为不可能有读锁和写锁同时持有的情况,所以升级写锁的过程中,需要等到所有的读锁都释放,此时才能进行升级。

假设有 A,B 和 C 三个线程,它们都已持有读锁。假设线程 A 尝试从读锁升级到写锁。那么它必须等待 B 和 C 释放掉已经获取到的读锁。如果随着时间推移,B 和 C 逐渐释放了它们的读锁,此时线程 A 确实是可以成功升级并获取写锁。但是我们考虑一种特殊情况。假设线程 A 和 B 都想升级到写锁,那么对于线程 A 而言,它需要等待其他所有线程,包括线程 B 在内释放读锁。而线程 B 也需要等待所有的线程,包括线程 A 释放读锁。这就是一种非常典型的死锁的情况,都愿不愿意率先释放掉自己手中的锁。

但是读写锁的升级并不是不可能的,也有可以实现的方案,如果我们保证每次只有一个线程可以升级,那么就可以保证线程安全。只不过最常见的 ReentrantReadWriteLock 对此并不支持。

RentrantReadWriteLock源码解析

在解析源码之前,我们先思考如下几个问题:

  1. 读写锁是怎样实现分别记录读写状态的?
  2. 写锁是怎样获取和释放的?
  3. 读锁是怎样获取和释放的?

读写状态设计

在 ReentrantLock 中,使用 AQS 中的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:高16为表示读,低16为表示写,这也是 RentrantReadWriteLock
设计的精髓。
微信截图_20211122004746.png

  • 写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去) 当写状态加1,等于S+1.
  • 读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。当读状态加1,等于S+(1<<16),也就是 S+0x00010000

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

  1. static final int SHARED_SHIFT = 16;
  2. //读状态 0x00010000
  3. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  4. //最大可读数 0x0000FFFF
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

获取持有读锁线程数方法:

  1. static int sharedCount(int c) {
  2. //获取高16位的值
  3. return c >>> SHARED_SHIFT;
  4. }

获取持有写锁的状态:

  1. static int exclusiveCount(int c) {
  2. // c & 0x0000FFFF
  3. return c & EXCLUSIVE_MASK;
  4. }

RentrantReadWriteLock 是支持锁重入的,对于写锁来说重入是很容易实现,只需要往锁变量的写状态位自增就好了。但是对于读锁的重入,往锁变量的读状态位上增加就不那么合适,因为如果所有获取写锁的线程都往读状态位上写,就无法区分哪个线程重入了多少次。在 RentrantReadWriteLock 中利用 ThreadLocal 解决了这一问题。

  1. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  2. public HoldCounter initialValue() {
  3. return new HoldCounter();
  4. }
  5. }
  6. static final class HoldCounter {
  7. int count = 0;
  8. final long tid = getThreadId(Thread.currentThread());
  9. }

通过 ThreadLocalHoldCounter 类,HoldCounter 类与线程绑定,HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。
HoldCounter 是用来记录读锁重入数的对象,ThreadLocalHoldCounter 是 ThreadLocal 变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象。

读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对 HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。

写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

  1. public final void acquire(int arg) {
  2. //如果获取锁失败则进入CHL队列阻塞等待
  3. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }
  6. protected final boolean tryAcquire(int acquires) {
  7. Thread current = Thread.currentThread();
  8. int c = getState();
  9. //获取写锁状态
  10. int w = exclusiveCount(c);
  11. //如果当锁不为0,已有线程获取读锁或写锁
  12. if (c != 0) {
  13. // 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
  14. if (w == 0 || current != getExclusiveOwnerThread())
  15. return false;
  16. //超出可重入次数抛异常
  17. if (w + exclusiveCount(acquires) > MAX_COUNT)
  18. throw new Error("Maximum lock count exceeded");
  19. //同步state,重入成功
  20. setState(c + acquires);
  21. return true;
  22. }
  23. //writerShouldBlock 有公平和非公平实现
  24. //公平实现需要判断当前CHL队列中有无等待线程,有则需要排队
  25. //非公平实现直接返回false,可以直接修改state
  26. //c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
  27. if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
  28. return false;
  29. //获取写锁成功,设置当前获取写锁线程
  30. etExclusiveOwnerThread(current);
  31. return true;
  32. }

微信截图_20211123001747.png
写阻塞的公平与非公平实现代码如下:

  1. //公平实现
  2. final boolean writerShouldBlock() {
  3. //判断CHL队列中是否有阻塞线程,有则需要排队
  4. return hasQueuedPredecessors();
  5. }
  6. //非公平实现
  7. final boolean writerShouldBlock() {
  8. //非公平实现不用管CHL队列中有无阻塞线程,可以直接写
  9. return false; // writers can always barge
  10. }

写锁的释放源码如下,释放写锁后不需要去唤醒 CHL 队列中线程

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. //唤醒CHL中的阻塞线程,(只唤醒第一个节点)
  6. unparkSuccessor(h);
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected final boolean tryRelease(int releases) {
  12. //若锁的持有者不是当前线程,抛出异常
  13. if (!isHeldExclusively())
  14. throw new IllegalMonitorStateException();
  15. //当前写状态是否为0,为0则释放写锁
  16. int nextc = getState() - releases;
  17. boolean free = exclusiveCount(nextc) == 0;
  18. //设置获取写锁线程为null
  19. if (free)
  20. setExclusiveOwnerThread(null);
  21. setState(nextc);
  22. return free;
  23. }

读锁的获取与释放

实现共享式同步组件的同步语义需要通过重写 AQS 的 tryAcquireShared() 方法和
tryReleaseShared() 方法。读锁的获取实现方法为:

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. // 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回‐1
  5. if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
  6. return -1;
  7. //计算读锁的数量
  8. int r = sharedCount(c);
  9. //判断是否需要读阻塞(公平实现与非公平)
  10. //判断当前是否小于最大可读线程数
  11. // cas设置获取读锁线程的数量
  12. //上述三个条件都成功,则获取读锁成功
  13. if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
  14. if (r == 0) {
  15. //如果是第一个获取读锁线程,需要特殊记录
  16. firstReader = current;
  17. firstReaderHoldCount = 1;
  18. } else if (firstReader == current) {
  19. //如果是第一个读线程重入,则firstReaderHoldCount自增
  20. firstReaderHoldCount++;
  21. } else {
  22. //如果不是第一个获取读锁线程,则在ThreadLocal记录其次数
  23. HoldCounter rh = cachedHoldCounter;
  24. if (rh == null || rh.tid != getThreadId(current))
  25. cachedHoldCounter = rh = readHolds.get();
  26. else if (rh.count == 0)
  27. readHolds.set(rh);
  28. rh.count++;
  29. }
  30. return 1;
  31. }
  32. //如果获取读锁失败则重试
  33. return fullTryAcquireShared(current);
  34. }
  • 读锁共享,读读不互斥
  • 读锁可重入,每个获取读锁的线程都会记录对应的重入数
  • 读写互斥,锁降级场景除外
  • 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
  • readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync 和 NonfairSync)

微信截图_20211123215522.png

读阻塞的公平与非公平实现代码如下:

  1. //公平实现
  2. final boolean readerShouldBlock() {
  3. //判断CHL队列中是否有阻塞线程,有则需要排队
  4. return hasQueuedPredecessors();
  5. }
  6. //非公平实现
  7. final boolean readerShouldBlock() {
  8. //避免写饥饿
  9. return apparentlyFirstQueuedIsExclusive();
  10. }

避免写饥饿
在 RentrantReadWriteLock 维护的 CHL 队列中,如果是读线程则封装成 SHARED 节点放入 CHL 队列中;如果是写线程则封装成 EXCLUSIVE 节点放入 CHL 队列中。在读阻塞的非公平实现,如果 CHL 队列中头节点(header节点的下一个节点)如果是 EXCLUSIVE 节点在阻塞排队,则该线程不能获取读锁,优先让写线程获取写锁。

  1. final boolean apparentlyFirstQueuedIsExclusive() {
  2. Node h, s;
  3. //CHL队列头节点不为null(CHL队列已经被初始化,因为head节点是哨兵节点)
  4. //CHL队列中头结点后是否有节点(是否有线程阻塞)
  5. //该节点是否是SHARED节点(读线程为SHARED,写线程为EXCLUSIVE)
  6. //该节点是否绑定线程
  7. //同时满足以上条件则返回true,需要读阻塞
  8. return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
  9. }

写锁的释放代码如下:

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. //以传播的方式去唤醒
  4. doReleaseShared();
  5. return true;
  6. }
  7. return false;
  8. }
  9. protected final boolean tryReleaseShared(int unused) {
  10. Thread current = Thread.currentThread();
  11. //如果当前线程是第一个获取读锁的线程
  12. if (firstReader == current) {
  13. if (firstReaderHoldCount == 1)
  14. firstReader = null;
  15. else
  16. //记录数自减
  17. firstReaderHoldCount--;
  18. //如果不是第一个获取读锁的线程
  19. } else {
  20. HoldCounter rh = cachedHoldCounter;
  21. if (rh == null || rh.tid != getThreadId(current))
  22. rh = readHolds.get();
  23. int count = rh.count;
  24. if (count <= 1) {
  25. readHolds.remove();
  26. if (count <= 0)
  27. throw unmatchedUnlockException();
  28. }
  29. --rh.count;
  30. }
  31. //修改state值
  32. for (;;) {
  33. int c = getState();
  34. int nextc = c - SHARED_UNIT;
  35. if (compareAndSetState(c, nextc))
  36. return nextc == 0;
  37. }
  38. }

总结
当分析 ReentranctReadWriteLock 时,或者说分析内部使用AQS实现的工具类时,需要明白的就是 AQS 的 state 代表的是什么。ReentrantLockReadWriteLock 中的 state 同时表示写锁和读锁的个数。为了实现这种功能,state 的高16位表示读锁的个数,低16位表示写锁的个数。AQS 有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;另外一点需要记住的即使,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。