可重入的读写锁.
线程进入读锁的前提条件:

  • 没有其他的线程在写,如果是同一个线程在写,可以获得读锁

线程进入写锁的前提条件:

  • 没有线程在写
  • 没有线程在读
先读再写(不释放读锁) 先写再读(不释放写锁)
同一个线程 阻塞,获取不到写锁 可以
不同线程 阻塞,获取不到写锁 阻塞,获取不到读锁

源码解析

ReentrantReadWriteLock里有5个内部类,分别是Sync(同步类,继承了AQS),NonfairSync(非公平同步类,继承Sync),FairSync(公平同步类,继承Sync),ReadLock(读锁),WriteLock(写锁)。其中Sync中又包含两个内部类:HoldCounter(和读锁配套使用,记录某个读线程的重入次数和线程id);ThreadLocalHoldCounter(本地线程计数器,继承了ThreadLocal,将线程和HoldCounter对象关联

写锁(独占锁)

  1. protected final boolean tryAcquire(int acquires) {
  2. //获取当前线程
  3. Thread current = Thread.currentThread();
  4. //获得state,持有锁的线程数量
  5. int c = getState();
  6. //独占线程的数量
  7. int w = exclusiveCount(c);
  8. if (c != 0) {
  9. // (Note: if c != 0 and w == 0 then shared count != 0)
  10. if (w == 0 || current != getExclusiveOwnerThread())
  11. return false;
  12. //原有独占线程的数量+本次占有的线程数>最大限制?
  13. if (w + exclusiveCount(acquires) > MAX_COUNT)
  14. throw new Error("Maximum lock count exceeded");
  15. // Reentrant acquire
  16. //设置状态
  17. setState(c + acquires);
  18. return true;
  19. }
  20. //写锁是否阻塞 CAS是否成功
  21. if (writerShouldBlock() ||
  22. !compareAndSetState(c, c + acquires))
  23. return false;
  24. //设置独占线程
  25. setExclusiveOwnerThread(current);
  26. return true;
  27. }

image.png

读锁(共享锁)

  1. protected final int tryAcquireShared(int unused) {
  2. // 获取当前线程
  3. Thread current = Thread.currentThread();
  4. // 获取状态
  5. int c = getState();
  6. //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级
  7. if (exclusiveCount(c) != 0 &&
  8. getExclusiveOwnerThread() != current)
  9. return -1;
  10. // 读锁数量
  11. int r = sharedCount(c);
  12. /*
  13. * readerShouldBlock():读锁是否需要等待(公平锁原则)
  14. * r < MAX_COUNT:持有线程小于最大数(65535)
  15. * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
  16. */
  17. // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
  18. if (!readerShouldBlock() &&
  19. r < MAX_COUNT &&
  20. compareAndSetState(c, c + SHARED_UNIT)) {
  21. //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
  22. if (r == 0) { // 读锁数量为0
  23. // 设置第一个读线程
  24. firstReader = current;
  25. // 读线程占用的资源数为1
  26. firstReaderHoldCount = 1;
  27. } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
  28. // 占用资源数加1
  29. firstReaderHoldCount++;
  30. } else { // 读锁数量不为0并且不为当前线程
  31. // 获取计数器
  32. HoldCounter rh = cachedHoldCounter;
  33. // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
  34. if (rh == null || rh.tid != getThreadId(current))
  35. // 获取当前线程对应的计数器
  36. cachedHoldCounter = rh = readHolds.get();
  37. else if (rh.count == 0) // 计数为0
  38. //加入到readHolds中
  39. readHolds.set(rh);
  40. //计数+1
  41. rh.count++;
  42. }
  43. return 1;
  44. }
  45. return fullTryAcquireShared(current);
  46. }
  1. final int fullTryAcquireShared(Thread current) {
  2. HoldCounter rh = null;
  3. for (;;) { // 无限循环
  4. // 获取状态
  5. int c = getState();
  6. if (exclusiveCount(c) != 0) { // 写线程数量不为0
  7. if (getExclusiveOwnerThread() != current) // 不为当前线程
  8. return -1;
  9. } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞
  10. // Make sure we're not acquiring read lock reentrantly
  11. if (firstReader == current) { // 当前线程为第一个读线程
  12. // assert firstReaderHoldCount > 0;
  13. } else { // 当前线程不为第一个读线程
  14. if (rh == null) { // 计数器不为空
  15. //
  16. rh = cachedHoldCounter;
  17. if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
  18. rh = readHolds.get();
  19. if (rh.count == 0)
  20. readHolds.remove();
  21. }
  22. }
  23. if (rh.count == 0)
  24. return -1;
  25. }
  26. }
  27. if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
  28. throw new Error("Maximum lock count exceeded");
  29. if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功
  30. if (sharedCount(c) == 0) { // 读线程数量为0
  31. // 设置第一个读线程
  32. firstReader = current;
  33. //
  34. firstReaderHoldCount = 1;
  35. } else if (firstReader == current) {
  36. firstReaderHoldCount++;
  37. } else {
  38. if (rh == null)
  39. rh = cachedHoldCounter;
  40. if (rh == null || rh.tid != getThreadId(current))
  41. rh = readHolds.get();
  42. else if (rh.count == 0)
  43. readHolds.set(rh);
  44. rh.count++;
  45. cachedHoldCounter = rh; // cache for release
  46. }
  47. return 1;
  48. }
  49. }
  50. }

image.png

image.png

使用场景

  1. ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  2. ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
  3. ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
  4. int QUEUE_SIZE = 5;
  5. LinkedBlockingQueue queue = new LinkedBlockingQueue(QUEUE_SIZE);
  6. public static void main(String[] args) throws InterruptedException {
  7. ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
  8. Thread readThread = new Thread(() -> {
  9. try {
  10. demo.readLock.lock();
  11. // TimeUnit.SECONDS.sleep(1);
  12. while (demo.queue.size() == 0) {
  13. }
  14. int currQueueSize = demo.queue.size();
  15. for (int i = 0; i < currQueueSize; i++) {
  16. System.out.println(Thread.currentThread().getName() + "读取到数据:" + demo.queue.poll());
  17. }
  18. } catch (Exception e) {
  19. } finally {
  20. demo.readLock.unlock();
  21. }
  22. }, "readThread");
  23. Thread writeThread = new Thread(() -> {
  24. try {
  25. demo.writeLock.lock();
  26. while (demo.queue.size() < demo.QUEUE_SIZE) {
  27. int i = new Random().nextInt(20);
  28. demo.queue.offer(i);
  29. System.out.println(Thread.currentThread().getName() + "写入数据:" + i);
  30. TimeUnit.SECONDS.sleep(1);
  31. }
  32. } catch (Exception e) {
  33. } finally {
  34. demo.writeLock.unlock();
  35. }
  36. }, "writeThread");
  37. writeThread.start();readThread.start();
  38. //同一个线程
  39. //先获取写锁,然后可以获取读锁
  40. //可以不用释放写锁,就能获取读锁
  41. new Thread(() -> {
  42. try{
  43. demo.writeLock.lock();
  44. System.out.println("写入数据");
  45. demo.queue.offer(new Random().nextInt(10));
  46. demo.readLock.lock();
  47. System.out.println("读取到数据:" + demo.queue.poll());
  48. }catch(Exception e){
  49. }finally {
  50. demo.readLock.unlock();
  51. demo.writeLock.unlock();
  52. }
  53. }).start();
  54. //同一个线程
  55. // 先获取读锁,再获取写锁
  56. //必须得先把读锁释放,才可获得写锁
  57. new Thread(() -> {
  58. try{
  59. demo.readLock.lock();
  60. System.out.println("读取到数据:" + demo.queue.poll());
  61. demo.writeLock.lock();
  62. demo.queue.offer(new Random().nextInt(10));
  63. System.out.println("写入数据");
  64. }catch(Exception e){
  65. }finally {
  66. demo.writeLock.unlock();
  67. demo.readLock.unlock();
  68. }
  69. }).start();
  70. new Thread(() -> {
  71. try{
  72. demo.readLock.lock();
  73. System.out.println("读取到数据:" + demo.queue.poll());
  74. }catch(Exception e){
  75. }finally {
  76. demo.readLock.unlock();
  77. }
  78. }).start();
  79. new Thread(() -> {
  80. try{
  81. demo.writeLock.lock();
  82. System.out.println("写入数据");
  83. demo.queue.offer(new Random().nextInt(10));
  84. demo.readLock.lock();
  85. }catch(Exception e){
  86. }finally {
  87. demo.writeLock.unlock();
  88. }
  89. }).start();
  90. new Thread(() -> {
  91. try{
  92. demo.writeLock.lock();
  93. System.out.println("写入数据");
  94. demo.queue.offer(new Random().nextInt(10));
  95. }catch(Exception e){
  96. }finally {
  97. demo.writeLock.unlock();
  98. }
  99. }).start();
  100. TimeUnit.SECONDS.sleep(10);
  101. new Thread(() -> {
  102. try{
  103. demo.readLock.lock();
  104. System.out.println("读取到数据:" + demo.queue.poll());
  105. }catch(Exception e){
  106. }finally {
  107. demo.readLock.unlock();
  108. }
  109. }).start();
  110. }