读写锁在同一时刻允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁,一个写锁,通过分离读锁和写锁,使得并发性相比一般的排它锁有了很大提升。

特性 说明
公平性选择 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
重进入 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁后能够再次获取写锁,也可以获取读锁。
锁降级 遵循获取写锁、获取读锁再释放写锁的顺序,写锁能够降级为读锁
特性 说明
int getReadLockCount() 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数。
int getReadHoldCount() 返回当前线程获取读锁的次数
Boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

读写锁的实现

1、读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。
读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。通过位运算来得出读锁和写锁获取的次数。

  1. /**这个非公平策略的同步器是写锁优先的,申请写锁时总是不阻塞。
  2. */
  3. static final class NonfairSync extends Sync {
  4. private static final long serialVersionUID = -8159625535654395037L;
  5. final boolean writerShouldBlock() {
  6. return false; // 写线程总是可以突入
  7. }
  8. final boolean readerShouldBlock() {
  9. /* 作为一个启发用于避免写线程饥饿,如果线程临时出现在等待队列的头部则阻塞,
  10. * 如果存在这样的,则是写线程。
  11. */
  12. return apparentlyFirstQueuedIsExclusive();
  13. }
  14. }
  1. /*公平的 Sync,它的策略是:如果线程准备获取锁时,
  2. * 同步队列里有等待线程,则阻塞获取锁,不管是否是重入
  3. * 这也就需要tryAcqire、tryAcquireShared方法进行处理。
  4. */
  5. static final class FairSync extends Sync {
  6. private static final long serialVersionUID = -2274990926593161451L;
  7. final boolean writerShouldBlock() {
  8. return hasQueuedPredecessors();
  9. }
  10. final boolean readerShouldBlock() {
  11. return hasQueuedPredecessors();
  12. }
  13. }

2、写锁的获取与释放

写锁是一个支持重入的排它锁。如果当前线程已经已经获取了写锁,则增加写状态。如果当前线程在获取锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在允许的其他读线程就无法感知到当前线程的操作。
lock

  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1.如果读取计数非零或写入计数非零且所有者是不同的线程,则失败。
  5. 2.如果计数会饱和,那就失败了。 (只有在计数已经非零时才会发生这种情况。)
  6. 3.否则,如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁定。
  7. 如果是,请更新状态并设置所有者。
  8. */
  9. Thread current = Thread.currentThread();
  10. int c = getState();
  11. int w = exclusiveCount(c);
  12. if (c != 0) {
  13. // (Note: if c != 0 and w == 0 then shared count != 0)
  14. //写锁获取次数等于0,或者当前线程不是排它锁拥有线程
  15. if (w == 0 || current != getExclusiveOwnerThread())
  16. return false;
  17. if (w + exclusiveCount(acquires) > MAX_COUNT)
  18. throw new Error("Maximum lock count exceeded");
  19. // Reentrant acquire
  20. setState(c + acquires);
  21. return true;
  22. }
  23. //写锁是否阻塞
  24. if (writerShouldBlock() ||
  25. //更新同步状态
  26. !compareAndSetState(c, c + acquires))
  27. return false;
  28. //设置排它锁拥有线程
  29. setExclusiveOwnerThread(current);
  30. return true;
  31. }

tryLock,尝试获取写锁,立即返回,不阻塞

  1. final boolean tryWriteLock() {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c != 0) {
  5. int w = exclusiveCount(c);
  6. if (w == 0 || current != getExclusiveOwnerThread())
  7. return false;
  8. if (w == MAX_COUNT)
  9. throw new Error("Maximum lock count exceeded");
  10. }
  11. if (!compareAndSetState(c, c + 1))
  12. return false;
  13. setExclusiveOwnerThread(current);
  14. return true;
  15. }

写锁的释放,因为只允许一个线程获取写锁,所以锁的释放不需要考虑线程安全问题

  1. protected final boolean tryRelease(int releases) {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. int nextc = getState() - releases;
  5. boolean free = exclusiveCount(nextc) == 0;
  6. if (free)
  7. setExclusiveOwnerThread(null);
  8. setState(nextc);
  9. return free;
  10. }

3、读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总会被成功的获取,而所做的只是增加读状态
读锁的获取

  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. *1.如果另一个线程持有写锁定,则失败。
  4. 2.否则,此线程符合锁定wrt状态,因此请询问是否应该因为队列策略而阻塞。 如果没有,请尝试通过CASing状态授予并更新count.Note该步骤不检查可重入获取,这被推迟到完整版本以避免在更典型的非重入情况下检查保持计数。
  5. 3.如果步骤2因为线程显然不符合条件或CAS失败或计数饱和而失败,则链接到具有完全重试循环的版本。
  6. */
  7. Thread current = Thread.currentThread();
  8. int c = getState();
  9. //排它锁获取次数不为0,并且排它锁拥有线程不是当线程,返回失败
  10. if (exclusiveCount(c) != 0 &&
  11. getExclusiveOwnerThread() != current)
  12. return -1;
  13. //共享锁获取次数
  14. int r = sharedCount(c);
  15. //不阻塞、没有达到最大值并且同步状态修改成功
  16. if (!readerShouldBlock() &&
  17. r < MAX_COUNT &&
  18. compareAndSetState(c, c + SHARED_UNIT)) {
  19. //第一次获取读锁
  20. if (r == 0) {
  21. firstReader = current;
  22. firstReaderHoldCount = 1;
  23. //再次获取读锁,并且读锁拥有线程是当前线程
  24. } else if (firstReader == current) {
  25. firstReaderHoldCount++;
  26. //再次获取读锁,当前线程不是读锁拥有线程
  27. } else {
  28. HoldCounter rh = cachedHoldCounter;
  29. if (rh == null || rh.tid != getThreadId(current))
  30. cachedHoldCounter = rh = readHolds.get();
  31. else if (rh.count == 0)
  32. readHolds.set(rh);
  33. rh.count++;
  34. }
  35. return 1;
  36. }
  37. return fullTryAcquireShared(current);
  38. }
  1. final int fullTryAcquireShared(Thread current) {
  2. /*
  3. * 此代码与tryAcquireShared中的代码部分冗余,
  4. 但总体上更简单,因为不会使tryAcquireShared与重试之间的交互和懒惰读取保持计数复杂化。
  5. */
  6. HoldCounter rh = null;
  7. //自旋
  8. for (;;) {
  9. int c = getState();
  10. //排它锁获取次数
  11. if (exclusiveCount(c) != 0) {
  12. //排它锁拥有线程不是当前线程,返回
  13. if (getExclusiveOwnerThread() != current)
  14. return -1;
  15. // else we hold the exclusive lock; blocking here
  16. // would cause deadlock.
  17. //是否阻塞读锁
  18. } else if (readerShouldBlock()) {
  19. // Make sure we're not acquiring read lock reentrantly
  20. if (firstReader == current) {
  21. // assert firstReaderHoldCount > 0;
  22. } else {
  23. if (rh == null) {
  24. rh = cachedHoldCounter;
  25. if (rh == null || rh.tid != getThreadId(current)) {
  26. rh = readHolds.get();
  27. if (rh.count == 0)
  28. readHolds.remove();
  29. }
  30. }
  31. if (rh.count == 0)
  32. return -1;
  33. }
  34. }
  35. //共享锁获取次数
  36. if (sharedCount(c) == MAX_COUNT)
  37. throw new Error("Maximum lock count exceeded");
  38. //共享状态修改成功
  39. if (compareAndSetState(c, c + SHARED_UNIT)) {
  40. //第一次获取
  41. if (sharedCount(c) == 0) {
  42. firstReader = current;
  43. firstReaderHoldCount = 1;
  44. } else if (firstReader == current) {
  45. firstReaderHoldCount++;
  46. } else {
  47. if (rh == null)
  48. rh = cachedHoldCounter;
  49. if (rh == null || rh.tid != getThreadId(current))
  50. rh = readHolds.get();
  51. else if (rh.count == 0)
  52. readHolds.set(rh);
  53. rh.count++;
  54. cachedHoldCounter = rh; // cache for release
  55. }
  56. return 1;
  57. }
  58. }
  59. }
  1. final boolean tryReadLock() {
  2. Thread current = Thread.currentThread();
  3. for (;;) {
  4. int c = getState();
  5. if (exclusiveCount(c) != 0 &&
  6. getExclusiveOwnerThread() != current)
  7. return false;
  8. int r = sharedCount(c);
  9. if (r == MAX_COUNT)
  10. throw new Error("Maximum lock count exceeded");
  11. if (compareAndSetState(c, c + SHARED_UNIT)) {
  12. if (r == 0) {
  13. firstReader = current;
  14. firstReaderHoldCount = 1;
  15. } else if (firstReader == current) {
  16. firstReaderHoldCount++;
  17. } else {
  18. HoldCounter rh = cachedHoldCounter;
  19. if (rh == null || rh.tid != getThreadId(current))
  20. cachedHoldCounter = rh = readHolds.get();
  21. else if (rh.count == 0)
  22. readHolds.set(rh);
  23. rh.count++;
  24. }
  25. return true;
  26. }
  27. }
  28. }

读锁的释放

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. if (firstReader == current) {
  4. // assert firstReaderHoldCount > 0;
  5. if (firstReaderHoldCount == 1)
  6. firstReader = null;
  7. else
  8. firstReaderHoldCount--;
  9. } else {
  10. HoldCounter rh = cachedHoldCounter;
  11. if (rh == null || rh.tid != getThreadId(current))
  12. rh = readHolds.get();
  13. int count = rh.count;
  14. if (count <= 1) {
  15. readHolds.remove();
  16. if (count <= 0)
  17. throw unmatchedUnlockException();
  18. }
  19. --rh.count;
  20. }
  21. //自旋更新同步状态
  22. for (;;) {
  23. int c = getState();
  24. int nextc = c - SHARED_UNIT;
  25. if (compareAndSetState(c, nextc))
  26. // Releasing the read lock has no effect on readers,
  27. // but it may allow waiting writers to proceed if
  28. // both read and write locks are now free.
  29. return nextc == 0;
  30. }
  31. }

4、锁降级

锁降级是指写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再次获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住写锁,再获取到读锁,随后释放写锁的过程。