1. 读写锁详解

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:
线程进入读锁的前提条件:
没有其他线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

2. 源码解读

2.1 整体结构

  1. public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
  2. /** 读锁 */
  3. private final ReentrantReadWriteLock.ReadLock readerLock;
  4. /** 写锁 */
  5. private final ReentrantReadWriteLock.WriteLock writerLock;
  6. final Sync sync;
  7. /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
  8. public ReentrantReadWriteLock() {
  9. this(false);
  10. }
  11. /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
  12. public ReentrantReadWriteLock(boolean fair) {
  13. sync = fair ? new FairSync() : new NonfairSync();
  14. readerLock = new ReadLock(this);
  15. writerLock = new WriteLock(this);
  16. }
  17. /** 返回用于写入操作的锁 */
  18. public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
  19. /** 返回用于读取操作的锁 */
  20. public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  21. abstract static class Sync extends AbstractQueuedSynchronizer {}
  22. static final class NonfairSync extends Sync {}
  23. static final class FairSync extends Sync {}
  24. public static class ReadLock implements Lock, java.io.Serializable {}
  25. public static class WriteLock implements Lock, java.io.Serializable {}
  26. }

2.2 写锁的获取与释放

看下WriteLock类中的lock和unlock方法:

  1. public void lock() {
  2. sync.acquire(1);
  3. }
  4. public void unlock() {
  5. sync.release(1);
  6. }

2.2.1 写锁的获取

  1. protected final boolean tryAcquire(int acquires) {
  2. //当前线程
  3. Thread current = Thread.currentThread();
  4. //获取状态
  5. int c = getState();
  6. //写线程数量(即获取独占锁的重入数)
  7. int w = exclusiveCount(c);
  8. //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
  9. if (c != 0) {
  10. // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
  11. // 如果写锁状态不为0且写锁没有被当前线程持有返回false
  12. if (w == 0 || current != getExclusiveOwnerThread())
  13. return false;
  14. //判断同一线程获取写锁是否超过最大次数(65535),支持可重入
  15. if (w + exclusiveCount(acquires) > MAX_COUNT)
  16. throw new Error("Maximum lock count exceeded");
  17. //更新状态
  18. //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
  19. setState(c + acquires);
  20. return true;
  21. }
  22. //到这里说明此时c=0,读锁和写锁都没有被获取
  23. //writerShouldBlock表示是否阻塞
  24. if (writerShouldBlock() ||
  25. !compareAndSetState(c, c + acquires))
  26. return false;
  27. //设置锁为当前线程所有
  28. setExclusiveOwnerThread(current);
  29. return true;
  30. }

从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程如下:
ReentrantReadWriteLock原理详解 - 图1

2.2.2 写锁的释放

  1. protected final boolean tryRelease(int releases) {
  2. //若锁的持有者不是当前线程,抛出异常
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. //写锁的新线程数
  6. int nextc = getState() - releases;
  7. //如果独占模式重入数为0了,说明独占模式被释放
  8. boolean free = exclusiveCount(nextc) == 0;
  9. if (free)
  10. //若写锁的新线程数为0,则将锁的持有者设置为null
  11. setExclusiveOwnerThread(null);
  12. //设置写锁的新线程数
  13. //不管独占模式是否被释放,更新独占重入数
  14. setState(nextc);
  15. return free;
  16. }

方法流程如下:
ReentrantReadWriteLock原理详解 - 图2

2.3 读锁的获取和释放

2.3.1 读锁的获取

  1. protected final int tryAcquireShared(int unused) {
  2. // 获取当前线程
  3. Thread current = Thread.currentThread();
  4. // 获取状态
  5. int c = getState();
  6. //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级
  7. if (exclusiveCount(c) != 0 &&
  8. getExclusiveOwnerThread() != current)
  9. return -1;
  10. // 读锁数量
  11. int r = sharedCount(c);
  12. /*
  13. * readerShouldBlock():读锁是否需要等待(公平锁原则)
  14. * r < MAX_COUNT:持有线程小于最大数(65535)
  15. * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
  16. */
  17. // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
  18. if (!readerShouldBlock() &&
  19. r < MAX_COUNT &&
  20. compareAndSetState(c, c + SHARED_UNIT)) {
  21. //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
  22. if (r == 0) { // 读锁数量为0
  23. // 设置第一个读线程
  24. firstReader = current;
  25. // 读线程占用的资源数为1
  26. firstReaderHoldCount = 1;
  27. } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
  28. // 占用资源数加1
  29. firstReaderHoldCount++;
  30. } else { // 读锁数量不为0并且不为当前线程
  31. // 获取计数器
  32. HoldCounter rh = cachedHoldCounter;
  33. // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
  34. if (rh == null || rh.tid != getThreadId(current))
  35. // 获取当前线程对应的计数器
  36. cachedHoldCounter = rh = readHolds.get();
  37. else if (rh.count == 0) // 计数为0
  38. //加入到readHolds中
  39. readHolds.set(rh);
  40. //计数+1
  41. rh.count++;
  42. }
  43. return 1;
  44. }
  45. return fullTryAcquireShared(current);
  46. }

ReentrantReadWriteLock原理详解 - 图3
fullTryAcquireShared方法:

  1. final int fullTryAcquireShared(Thread current) {
  2. HoldCounter rh = null;
  3. for (;;) { // 无限循环
  4. // 获取状态
  5. int c = getState();
  6. if (exclusiveCount(c) != 0) { // 写线程数量不为0
  7. if (getExclusiveOwnerThread() != current) // 不为当前线程
  8. return -1;
  9. } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞
  10. // Make sure we're not acquiring read lock reentrantly
  11. if (firstReader == current) { // 当前线程为第一个读线程
  12. // assert firstReaderHoldCount > 0;
  13. } else { // 当前线程不为第一个读线程
  14. if (rh == null) { // 计数器不为空
  15. //
  16. rh = cachedHoldCounter;
  17. if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
  18. rh = readHolds.get();
  19. if (rh.count == 0)
  20. readHolds.remove();
  21. }
  22. }
  23. if (rh.count == 0)
  24. return -1;
  25. }
  26. }
  27. if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
  28. throw new Error("Maximum lock count exceeded");
  29. if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功
  30. if (sharedCount(c) == 0) { // 读线程数量为0
  31. // 设置第一个读线程
  32. firstReader = current;
  33. //
  34. firstReaderHoldCount = 1;
  35. } else if (firstReader == current) {
  36. firstReaderHoldCount++;
  37. } else {
  38. if (rh == null)
  39. rh = cachedHoldCounter;
  40. if (rh == null || rh.tid != getThreadId(current))
  41. rh = readHolds.get();
  42. else if (rh.count == 0)
  43. readHolds.set(rh);
  44. rh.count++;
  45. cachedHoldCounter = rh; // cache for release
  46. }
  47. return 1;
  48. }
  49. }
  50. }

在tryAcquireShared函数中,如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。其逻辑与tryAcquireShared逻辑类似,不再累赘。

2.3.2 读锁的释放

  1. protected final boolean tryReleaseShared(int unused) {
  2. // 获取当前线程
  3. Thread current = Thread.currentThread();
  4. if (firstReader == current) { // 当前线程为第一个读线程
  5. // assert firstReaderHoldCount > 0;
  6. if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
  7. firstReader = null;
  8. else // 减少占用的资源
  9. firstReaderHoldCount--;
  10. } else { // 当前线程不为第一个读线程
  11. // 获取缓存的计数器
  12. HoldCounter rh = cachedHoldCounter;
  13. if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
  14. // 获取当前线程对应的计数器
  15. rh = readHolds.get();
  16. // 获取计数
  17. int count = rh.count;
  18. if (count <= 1) { // 计数小于等于1
  19. // 移除
  20. readHolds.remove();
  21. if (count <= 0) // 计数小于等于0,抛出异常
  22. throw unmatchedUnlockException();
  23. }
  24. // 减少计数
  25. --rh.count;
  26. }
  27. for (;;) { // 无限循环
  28. // 获取状态
  29. int c = getState();
  30. // 获取状态
  31. int nextc = c - SHARED_UNIT;
  32. if (compareAndSetState(c, nextc)) // 比较并进行设置
  33. // Releasing the read lock has no effect on readers,
  34. // but it may allow waiting writers to proceed if
  35. // both read and write locks are now free.
  36. return nextc == 0;
  37. }
  38. }

此方法表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,如果计数器的计数count小于等于1,则移除当前线程对应的计数器,如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。无论何种情况,都会进入无限循环,该循环可以确保成功设置状态state。

在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。

3. 总结

通过上面的源码分析,我们可以发现一个现象:
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
综上:
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。