引言

上一篇文章,我们学习了ReentrantLock和AQS,ReentrantLock能够实现排他重入锁的逻辑。排他锁意味着任意时刻只有一个线程能够获取锁。但是在读多写少的场景下,排他锁的性能可能不太理想,因为一个读线程也会阻塞其他的读线程和写线程,而数据的读取实际上是不需要对其他读线程加锁的。这篇文章,我们来看juc中提供的读写锁ReentrantReadWriteLock。

类的体系结构

ReetrantReadWriteLock类是ReadWriteLock接口的实现类,ReadWriteLock只定义了两个方法:

  1. public interface ReadWriteLock {
  2. /**
  3. * Returns the lock used for reading.
  4. *
  5. * @return the lock used for reading
  6. */
  7. Lock readLock();
  8. /**
  9. * Returns the lock used for writing.
  10. *
  11. * @return the lock used for writing
  12. */
  13. Lock writeLock();
  14. }

这两个方法分别用来返回读写锁中的读锁和写锁。也就是说,读写锁中是有两个锁的,一个用来读,一个用来写。
我们再来看ReentrantReadWriteLock,它有三个重要的字段,一个是读锁,一个是写锁,还有一个同步器,如下所示:

  1. public class ReentrantReadWriteLock
  2. implements ReadWriteLock, java.io.Serializable {
  3. private static final long serialVersionUID = -6992448646407690164L;
  4. /** Inner class providing readlock */
  5. private final ReentrantReadWriteLock.ReadLock readerLock;
  6. /** Inner class providing writelock */
  7. private final ReentrantReadWriteLock.WriteLock writerLock;
  8. /** Performs all synchronization mechanics */
  9. final Sync sync;

其中ReadLock和WriteLock都是ReentrantReadWriteLock的内部类,它俩都实现了lock接口:

  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. private static final long serialVersionUID = -5992448646407690164L;
  3. private final Sync sync;
  4. }
  5. public static class WriteLock implements Lock, java.io.Serializable {
  6. private static final long serialVersionUID = -4992448646407690164L;
  7. private final Sync sync;
  8. }

读锁和写锁也分别有一个同步器。
同步器Sync与ReentrantLock中的Sync一样,都继承了AbstractQueuedSynchronizer:

  1. abstract static class Sync extends AbstractQueuedSynchronizer {

并且ReentrantReadWriteLock也提供了同步器的非公平版本和公平版本,分别是NonfairSync和FairSync。

  1. static final class FairSync extends Sync {}
  2. static final class NonfairSync extends Sync {}

读写锁和同步器的初始化

从类的体系结构我们可以看出,ReentrantReadWriteLock有两个锁,一个读锁一个写锁,还有一个同步器,那么它是怎么实例化的呢?我们来看它的构造方法:

  1. /**
  2. * Creates a new {@code ReentrantReadWriteLock} with
  3. * default (nonfair) ordering properties.
  4. */
  5. public ReentrantReadWriteLock() {
  6. this(false);
  7. }
  8. /**
  9. * Creates a new {@code ReentrantReadWriteLock} with
  10. * the given fairness policy.
  11. *
  12. * @param fair {@code true} if this lock should use a fair ordering policy
  13. */
  14. public ReentrantReadWriteLock(boolean fair) {
  15. sync = fair ? new FairSync() : new NonfairSync();
  16. readerLock = new ReadLock(this);
  17. writerLock = new WriteLock(this);
  18. }

它有两个构造方法,但都是通过第二个来实现的。传入的参数指定是公平锁还是非公平锁,公平锁和非公平锁对应的同步器分别是FairSync和NonFairSync。然后会使用这个同步器来初始化读锁和写锁,下面两个是读锁和写锁的构造方法:

  1. protected ReadLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }
  1. protected WriteLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }

也就是,读锁和写锁的同步器就是ReetrantReadWriteLock的同步器。
由于ReetrantReadWriteLock的读锁和写锁是分离的,所以ReetrantReadWriteLock本身不会提供lock和unlock这类的方法,加锁和解锁都是分别由ReadLock和WriteLock提供的。下面我们分别分析这两种锁的加锁和解锁。

读锁的加锁和解锁

加锁

读锁的加锁调用的是同步器的acquireShared方法:

  1. public void lock() {
  2. sync.acquireShared(1);
  3. }

该方法是AQS提供的:

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

它首先调用的是tryAcquireShared方法:

  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. * Walkthrough:
  4. * 1. If write lock held by another thread, fail.
  5. * 2. Otherwise, this thread is eligible for
  6. * lock wrt state, so ask if it should block
  7. * because of queue policy. If not, try
  8. * to grant by CASing state and updating count.
  9. * Note that step does not check for reentrant
  10. * acquires, which is postponed to full version
  11. * to avoid having to check hold count in
  12. * the more typical non-reentrant case.
  13. * 3. If step 2 fails either because thread
  14. * apparently not eligible or CAS fails or count
  15. * saturated, chain to version with full retry loop.
  16. */
  17. Thread current = Thread.currentThread();
  18. int c = getState();
  19. //如果写锁被持有的数量不等于0并且同步器的所有者线程不是当前线程
  20. //意味着有其他线程正在进行写操作 返回-1
  21. if (exclusiveCount(c) != 0 &&
  22. getExclusiveOwnerThread() != current)
  23. return -1;
  24. //获取读锁被持有的数量
  25. int r = sharedCount(c);
  26. if (!readerShouldBlock() &&
  27. r < MAX_COUNT &&
  28. compareAndSetState(c, c + SHARED_UNIT)) {
  29. //如果当前等待队列中的第一个线程不是处于独占请求状态并且当前读锁被持有的数量小于最大数量
  30. //并且原子性的设置读状态成功 则继续执行
  31. if (r == 0) {
  32. firstReader = current;
  33. firstReaderHoldCount = 1;
  34. } else if (firstReader == current) {
  35. firstReaderHoldCount++;
  36. } else {
  37. HoldCounter rh = cachedHoldCounter;
  38. if (rh == null || rh.tid != getThreadId(current))
  39. cachedHoldCounter = rh = readHolds.get();
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;
  43. }
  44. return 1;
  45. }
  46. return fullTryAcquireShared(current);
  47. }

如果tryAcquireShared方法返回-1,也就是没有取到读锁,会执行doAcquireShared方法:

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }

这个方法是将线程构造成Node并加入等待队列,然后不断的自旋中去继续获取锁。与上一篇文章的独占锁的获取类似,如果没有获取到锁,当前线程会将自己挂起,等待其他线程唤醒或者中断。

解锁

读锁的释放需要调用readLock的unlock方法,unlock方法调用的是releaseShared方法:

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

来看tryReleaseShared方法:

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. if (firstReader == current) {
  4. // assert firstReaderHoldCount > 0;
  5. if (firstReaderHoldCount == 1)
  6. firstReader = null;
  7. else
  8. firstReaderHoldCount--;
  9. } else {
  10. HoldCounter rh = cachedHoldCounter;
  11. if (rh == null || rh.tid != getThreadId(current))
  12. rh = readHolds.get();
  13. int count = rh.count;
  14. if (count <= 1) {
  15. readHolds.remove();
  16. if (count <= 0)
  17. throw unmatchedUnlockException();
  18. }
  19. --rh.count;
  20. }
  21. for (;;) {
  22. int c = getState();
  23. int nextc = c - SHARED_UNIT;
  24. if (compareAndSetState(c, nextc))
  25. // Releasing the read lock has no effect on readers,
  26. // but it may allow waiting writers to proceed if
  27. // both read and write locks are now free.
  28. return nextc == 0;
  29. }
  30. }

读锁的每次释放都会减少读锁被持有的次数,减少的值是1<<16。

写锁的加锁和解锁

加锁

写锁的加锁调用的是WriterLock的lock方法:

  1. public void lock() {
  2. sync.acquire(1);
  3. }

acquire方法是AQS提供的:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

首先会调用tryAcquire来获取锁,tryAcquire是Sync给出的:

  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1. If read count nonzero or write count nonzero
  5. * and owner is a different thread, fail.
  6. * 2. If count would saturate, fail. (This can only
  7. * happen if count is already nonzero.)
  8. * 3. Otherwise, this thread is eligible for lock if
  9. * it is either a reentrant acquire or
  10. * queue policy allows it. If so, update state
  11. * and set owner.
  12. */
  13. Thread current = Thread.currentThread();
  14. //获取锁状态
  15. int c = getState();
  16. //获取独占锁也就是写锁被持有的数量
  17. int w = exclusiveCount(c);
  18. if (c != 0) {
  19. //锁状态不为0 也就是读锁或者写锁被持有
  20. // (Note: if c != 0 and w == 0 then shared count != 0)
  21. //如果写锁被持有的次数是0 也就是读锁被持有 或者 写锁被其他线程持有 此时不能获取锁
  22. if (w == 0 || current != getExclusiveOwnerThread())
  23. return false;
  24. if (w + exclusiveCount(acquires) > MAX_COUNT)
  25. throw new Error("Maximum lock count exceeded");
  26. // Reentrant acquire
  27. //走到这一步说明写锁被当前线程持有 重入
  28. setState(c + acquires);
  29. return true;
  30. }
  31. if (writerShouldBlock() ||
  32. !compareAndSetState(c, c + acquires))
  33. return false;
  34. //获取锁成功 将同步器的所有者线程设置为当前线程
  35. setExclusiveOwnerThread(current);
  36. return true;
  37. }

如果tryAcquire没有成功,调用的是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,这个方法与前面讲到的ReentrantLock中独占锁的获取逻辑是一样的,这里不再赘述。

解锁

写锁的解锁调用的是writeLock的unlock方法:

  1. public void unlock() {
  2. sync.release(1);
  3. }

unlock调用的是release方法:

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

tryRelease方法是Sync提供的:

  1. protected final boolean tryRelease(int releases) {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. int nextc = getState() - releases;
  5. boolean free = exclusiveCount(nextc) == 0;
  6. if (free)
  7. setExclusiveOwnerThread(null);
  8. setState(nextc);
  9. return free;
  10. }

首先判断锁是否被当前线程持有。之后的逻辑就是减少写锁被重入的数量,如果写锁被当前线程持有并且没有重入,那么写锁的数量就会是零,也就是锁会被释放,否则,将锁被持有的数量减1,锁仍然被当前线程持有。

锁降级