1. 引入

前面所介绍的解决线程同步的方案中,不管是作为重量级锁的synchronized,或是优化后的synchronized,以及同样支持可重入且可以更好的操作加锁和释放锁的ReentranLock,它们都属于排它锁。当多线程中的某一个线程竞争到了锁,那么不管其他线程执行的是读操作还是写操作,在没有获得锁的前提下都无法进行,这显然有些不太合理。按照常理来说,当不同的线程同时对共享变量执行读操作时,它们不应该彼此之间是互斥的,但是如果有线程想要执行写操作是应该阻塞的。另外,如果某个线程对于共享变量执行写操作,那么其他的线程不管是读还是写都应该阻塞。

如果想要实现上述的功能,那么就需要一个读写锁,它维护了一个读锁和一个写锁,这通过分隔读锁和写锁,使得并发性能大幅提高。下面即将介绍的ReentrantWriteReadLock就是这样的一种机制。

深入体会优于synchronized的ReentrantLock的实现原理

synchronized实现原理和底层优化解读


2. ReentrantWriteReadLock

2.1 概念

ReentrantWriteReadLockt通过读写锁实现:写锁被获取时,后续的读写操作都被阻塞;读锁被获取时,写操作被阻塞,读操作不受影响。

ReentrantWriteReadLock相比于之前的ReentrantLock具有如下特性:

  • 公平性选择:支持公平锁和非公平锁两种模式,当要求吞吐量时推荐使用非公平锁
  • 重入:支持锁重入,当线程获得写锁后,可再次获取写锁或读锁;当线程获取读锁后,能够再次获取读锁
  • 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁可以降级为读锁

ReentrantWriteReadLock的源码定义如下:

  1. public class ReentrantReadWriteLock
  2. implements ReadWriteLock, java.io.Serializable {
  3. private static final long serialVersionUID = -6992448646407690164L;
  4. /** Inner class providing readlock */
  5. private final ReentrantReadWriteLock.ReadLock readerLock;
  6. /** Inner class providing writelock */
  7. private final ReentrantReadWriteLock.WriteLock writerLock;
  8. /** Performs all synchronization mechanics */
  9. final Sync sync;
  10. }

可以看到它内部维护了一个读写锁,并且同样使用了自定义的同步器。其中,ReadWriteLock接口中只定义了如下的两个方法用于获取写锁和读锁。

  1. public interface ReadWriteLock {
  2. Lock readLock();
  3. Lock writeLock();
  4. }

构造函数同样有无参和带参两种形式,其中无参默认使用非公平锁,带参可以设置使用公平锁。

  1. public ReentrantReadWriteLock() {
  2. this(false);
  3. }
  4. public ReentrantReadWriteLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. readerLock = new ReadLock(this);
  7. writerLock = new WriteLock(this);
  8. }

另外,除了和ReentrantLock中相同的方法外,ReentrantWriteReadLock还提供了一些展示内部工作状态的方法:

  • getReadLOckCount():获取当前读锁别所有线程获取的总次数
  • getReadHoldCount():获取当前线程获取读锁的次数
  • isWriteLocked():判断读锁是否被获取
  • getWriteHoldCount():获取当前线程获取写锁的次数

2.2 核心

ReentrantLock中需要使用AQS的同步状态来维护线程之间的同步操作,根据上面的源码定义可知,ReentrantWriteReadLock同样需要AQS的同步状态来维护读进程和写进程之间的同步操作。

假设线程同步状态使用32bit的变量来表示,那么可以使用高16位表示读,低16位表示写,整体上就可以用来同时维护读写状态。如下所示:
ReentrantWriteReadLock原理解读 - 图1

如果想要获取写状态,即获取低16位的值,那么可以使用与操作,如下所示:

ReentrantWriteReadLock原理解读 - 图2

如果想要获取读状态,那么只需要获取高16位,直接左移16位即可。如下所示:

ReentrantWriteReadLock原理解读 - 图3

2.3 图解流程

在了解了如何使用一个同步状态的高低位来实现读写锁的控制,结合前面对于ReentrantLock的分析,下面通过例子看一下ReentrantWriteReadLock是如何控制写锁和读锁的,这里只以非公平锁为例说明。

当没有线程加读锁,也没有线程加写锁时,读写锁如下所示,state高位和低位计数都为0:
ReentrantWriteReadLock原理解读 - 图4

如果此时线程1想要加写锁,由于此时并没有线程加锁。因此加锁成功,修改state为0_1

  1. public static class WriteLock implements Lock, java.io.Serializable {
  2. public void lock() {
  3. sync.acquire(1);
  4. }
  5. }

acqurie()为AQS中的方法,如下所示:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

tryAcquire()定义如下:

  1. protected final boolean tryAcquire(int acquires) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. // 获取写锁被所有线程获取的次数
  5. int w = exclusiveCount(c);
  6. // 如果c!=0,表示写锁已经被线程获取
  7. if (c != 0) {
  8. // 如果读锁也被获取,而且持锁线程不是当前线程
  9. if ( w == 0 || current != getExclusiveOwnerThread()
  10. ) {
  11. // 那么当前线程获取锁失败
  12. return false;
  13. }
  14. // 写锁计数超过低 16 位, 报异常
  15. if (w + exclusiveCount(acquires) > MAX_COUNT)
  16. throw new Error("Maximum lock count exceeded");
  17. // 写锁重入, 获得写锁成功
  18. setState(c + acquires);
  19. return true;
  20. }
  21. // 判断写锁是否该阻塞,或尝试更改写锁计数值
  22. if ( writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
  23. // 获得锁失败
  24. return false;
  25. }
  26. // 获得锁成功
  27. setExclusiveOwnerThread(current);
  28. return true;
  29. }
  30. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

如图所示:
ReentrantWriteReadLock原理解读 - 图5

接着线程2想要加读锁,根据加锁规则,此时线程2应该被阻塞。它首先进入读锁的acquireShared(1)流程,接着进入tryAqcquire()尝试获取锁。因为t1已经加了写锁,方法返回-1表示加锁失败。

方法的返回值有三种情况:

  • -1:失败
  • 0:成功,但是不会继续唤醒后继节点
  • 整数:成功,数值表示后续需要唤醒的节点个数
  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. public void lock() {
  3. sync.acquireShared(1);
  4. }
  5. }
  6. public final void acquireShared(int arg) {
  7. if (tryAcquireShared(arg) < 0)
  8. doAcquireShared(arg);
  9. }

其中尝试获取读锁的方法tryAcquireShared()定义如下:

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. // 如果此时已有其他的线程持有写锁,并且持锁线程不是当前线程,则读锁获取失败
  5. if (exclusiveCount(c) != 0 &&
  6. getExclusiveOwnerThread() != current)
  7. return -1;
  8. // 获取读状态位
  9. int r = sharedCount(c);
  10. if (
  11. // 读锁不该阻塞
  12. !readerShouldBlock() &&
  13. // 读锁计数小于最大值
  14. r < MAX_COUNT &&
  15. // 使用CAS尝试增加state中读锁计数
  16. compareAndSetState(c, c + SHARED_UNIT)) {
  17. if (r == 0) {
  18. firstReader = current;
  19. firstReaderHoldCount = 1;
  20. } else if (firstReader == current) {
  21. firstReaderHoldCount++;
  22. } else {
  23. HoldCounter rh = cachedHoldCounter;
  24. if (rh == null || rh.tid != getThreadId(current))
  25. cachedHoldCounter = rh = readHolds.get();
  26. else if (rh.count == 0)
  27. readHolds.set(rh);
  28. rh.count++;
  29. }
  30. return 1;
  31. }
  32. return fullTryAcquireShared(current);
  33. }
  34. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  1. // 不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
  2. final int fullTryAcquireShared(Thread current) {
  3. HoldCounter rh = null;
  4. for (;;) {
  5. int c = getState();
  6. if (exclusiveCount(c) != 0) {
  7. if (getExclusiveOwnerThread() != current)
  8. return -1;
  9. } else if (readerShouldBlock()) {
  10. // ... 省略不重要的代码
  11. }
  12. if (sharedCount(c) == MAX_COUNT)
  13. throw new Error("Maximum lock count exceeded");
  14. if (compareAndSetState(c, c + SHARED_UNIT)) {
  15. // ... 省略不重要的代码
  16. return 1;
  17. }
  18. }
  19. }

如图所示:
ReentrantWriteReadLock原理解读 - 图6

由于线程2加锁被阻塞,此时会进入doAcquireShared(1)流程,也是调用addWaiter()添加节点,此时的节点被设置为Node.SHARED模式,而且线程2此时仍处于活跃状态。
ReentrantWriteReadLock原理解读 - 图7

  1. private void doAcquireShared(int arg) {
  2. // 将当前线程关联到一个 Node 对象上, 模式为共享模式
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. boolean interrupted = false;
  7. for (;;) {
  8. final Node p = node.predecessor();
  9. if (p == head) {
  10. // 再一次尝试获取读锁
  11. int r = tryAcquireShared(arg);
  12. // 成功
  13. if (r >= 0) {
  14. // r 表示可用资源数, 在这里总是1,允许链式唤醒,继续唤醒下一个SHARED节点
  15. setHeadAndPropagate(node, r);
  16. p.next = null; // help GC
  17. if (interrupted)
  18. selfInterrupt();
  19. failed = false;
  20. return;
  21. }
  22. }
  23. if (
  24. // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
  25. shouldParkAfterFailedAcquire(p, node) &&
  26. // park 当前线程
  27. parkAndCheckInterrupt()
  28. ) {
  29. interrupted = true;
  30. }
  31. }
  32. } finally {
  33. if (failed)
  34. cancelAcquire(node);
  35. }
  36. }

此时,线程2会看他是不是哨兵节点后的第一个节点,如果是,继续调用tryAcquireShared(1)尝试加锁。但由于线程1还没有释放写锁,加锁依然会失败。线程2会在doAcquireShared()内的死循环中继续循环一次,将它的前驱节点的waitStatus改为-1。然后再次调用tryAcquireShared(1)尝试加锁,如果还是失败,则在parkAndCheckInterrupt()处被阻塞,不再尝试加锁。
ReentrantWriteReadLock原理解读 - 图8

线程2加锁失败后,假设又有线程3想要加读锁,线程4想要加写锁。但是由于线程1仍然持有写锁,它们加锁也会失败,进入等待队列。
ReentrantWriteReadLock原理解读 - 图9

  1. private void doReleaseShared() {
  2. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  3. for (;;) {
  4. Node h = head;
  5. // 队列还有节点
  6. if (h != null && h != tail) {
  7. int ws = h.waitStatus;
  8. if (ws == Node.SIGNAL) {
  9. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  10. continue; // loop to recheck cases
  11. // 下一个节点 unpark 如果成功获取读锁
  12. // 并且下下个节点还是 shared, 继续 doReleaseShared
  13. unparkSuccessor(h);
  14. }
  15. else if (ws == 0 &&
  16. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  17. continue; // loop on failed CAS
  18. }
  19. if (h == head) // loop if head changed
  20. break;
  21. }
  22. }
  23. }

千辛万苦,终于等于线程1释放锁,修改exclusiveOwnerThread为null,如下所示:
ReentrantWriteReadLock原理解读 - 图10

释放锁成功后调用unparkSuccessor()唤醒等待队列中可能的阻塞线程,此时,线程2在doAcquireShared()parkAndCheckInterrupt()处恢复运行。接着再执行一次循环,调用tryAcquireShared()让读状态计数加一。

  1. static final class NonfairSync extends Sync {
  2. public void unlock() {
  3. sync.release(1);
  4. }
  5. public final boolean release(int arg) {
  6. // 尝试释放写锁成功
  7. if (tryRelease(arg)) {
  8. // 唤醒等待队列中的线程
  9. Node h = head;
  10. if (h != null && h.waitStatus != 0)
  11. unparkSuccessor(h);
  12. return true;
  13. }
  14. return false;
  15. }
  16. protected final boolean tryRelease(int releases) {
  17. if (!isHeldExclusively())
  18. throw new IllegalMonitorStateException();
  19. int nextc = getState() - releases;
  20. // 因为可重入的原因, 写锁计数为 0, 才算释放成功
  21. boolean free = exclusiveCount(nextc) == 0;
  22. if (free) {
  23. setExclusiveOwnerThread(null);
  24. }
  25. setState(nextc);
  26. return free;
  27. }
  28. }

因此,此时state值为1_0
ReentrantWriteReadLock原理解读 - 图11

线程2加锁成功后,调用setHeadAndPropagate()将本来所在的节点设置为头节点。然后在方法内检查下一个节点的状态是否是SHARED,如果是,则调用doReleaseShared()将头节点的waitStatus修改为-1,并唤醒后继线程3,它会在parkAndCheckInterrupt()处恢复运行。执行上面相同的操作,修改state为2_0,并将原本的节点设置为头节点,waitStatus修改为-1
ReentrantWriteReadLock原理解读 - 图12

不久之后,线程2和线程3相继执行结束,调用releaseShared(1)修改读状态计数值为0。

  1. static final class NonfairSync extends Sync {
  2. public void unlock() {
  3. sync.releaseShared(1);
  4. }
  5. public final boolean releaseShared(int arg) {
  6. if (tryReleaseShared(arg)) {
  7. doReleaseShared();
  8. return true;
  9. }
  10. return false;
  11. }
  12. protected final boolean tryReleaseShared(int unused) {
  13. // ... 省略不重要的代码
  14. for (;;) {
  15. int c = getState();
  16. int nextc = c - SHARED_UNIT;
  17. if (compareAndSetState(c, nextc)) {
  18. // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
  19. // 计数为 0 才是真正释放
  20. return nextc == 0;
  21. }
  22. }
  23. }
  24. private void doReleaseShared() {
  25. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  26. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  27. for (;;) {
  28. Node h = head;
  29. if (h != null && h != tail) {
  30. int ws = h.waitStatus;
  31. // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
  32. // 防止 unparkSuccessor 被多次执行
  33. if (ws == Node.SIGNAL) {
  34. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  35. continue; // loop to recheck cases
  36. unparkSuccessor(h);
  37. }
  38. // 如果已经是 0 了,改为 -3,用来解决传播性
  39. else if (ws == 0 &&
  40. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  41. continue; // loop on failed CAS
  42. }
  43. if (h == head) // loop if head changed
  44. break;
  45. }
  46. }
  47. }

然后调用doReleaseShared()将头节点waitStatus修改为0,唤醒线程4
ReentrantWriteReadLock原理解读 - 图13

线程4同样在parkAndCheckInterrupt()处恢复运行,并且发现自己是哨兵节点的后继,而且此时没有其他的线程竞争锁,则加锁成功,修改state为0_1
ReentrantWriteReadLock原理解读 - 图14

2.4 锁降级

最后一个点就是前面特性中提高的锁降级,它是指把持住写锁,再获取读锁,随后释放写锁的过程。经过锁降级之后,写锁就会被降级为读锁。之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程对于数据的更新对当前线程不可见。如果当前线程先获取读锁,那么想要获取写锁的线程就都会被阻塞,只有当前线程成功释放了写锁,其他竞争写锁的线程才能成功获取到。


3. 使用

假设demo代码如下所示:

  1. /**
  2. * @Author dyliang
  3. * @Date 2020/9/6 15:06
  4. * @Version 1.0
  5. */
  6. public class Test {
  7. static ReentrantWriteReadLockDemo d = new ReentrantWriteReadLockDemo();
  8. public static void main(String[] args) {
  9. // ReadRead();
  10. // ReadWrite();
  11. // WriteWrite();
  12. }
  13. public static void ReadRead() {
  14. new Thread(() -> {
  15. d.read();
  16. }, "t1").start();
  17. new Thread(() -> {
  18. d.read();
  19. }, "t2").start();
  20. }
  21. public static void ReadWrite(){
  22. new Thread(() -> {
  23. d.read();
  24. }, "t2").start();
  25. new Thread(() -> {
  26. d.write();
  27. }, "t3").start();
  28. }
  29. public static void WriteWrite(){
  30. new Thread(() -> {
  31. d.write();
  32. }, "t3").start();
  33. new Thread(() -> {
  34. d.write();
  35. }, "t4").start();
  36. }
  37. }
  38. class ReentrantWriteReadLockDemo{
  39. ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  40. ReadLock readLock = lock.readLock();
  41. WriteLock writeLock = lock.writeLock();
  42. public void read(){
  43. try {
  44. readLock.lock();
  45. System.out.println("Thread-" + Thread.currentThread().getName() + " enter...");
  46. Thread.sleep(3000);
  47. System.out.println("Thread-" + Thread.currentThread().getName() + " leave...");
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }finally{
  51. readLock.unlock();
  52. }
  53. }
  54. public void write(){
  55. try {
  56. writeLock.lock();
  57. System.out.println("Thread-" + Thread.currentThread().getName() + " enter...");
  58. Thread.sleep(3000);
  59. System.out.println("Thread-" + Thread.currentThread().getName() + " leave...");
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }finally{
  63. writeLock.unlock();
  64. }
  65. }
  66. }

如果运行ReadRead(),可以看到线程1和线程2同时进入,证明一个线程持有读锁不会影响其他线程加读锁。

  1. Thread-t1 enter...
  2. Thread-t2 enter...
  3. Thread-t2 leave...
  4. Thread-t1 leave...

如果运行ReadWrite(),控制台输出:

  1. Thread-t2 enter...
  2. Thread-t2 leave...
  3. Thread-t3 enter...
  4. Thread-t3 leave...

可以看到,当线程2先加读锁后,线程3只有等到读锁被释放才能加写锁。如果运行WriteWrite(),控制台输出:

  1. Thread-t3 enter...
  2. Thread-t3 leave...
  3. Thread-t4 enter...
  4. Thread-t4 leave...

可以看到只有线程3释放了写锁后,线程4才能加写锁。