读写锁是怎样实现分别记录读写状态的?

在 ReentrantLock 中,使用 Sync ( 实际是 AQS )的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:高16位表示读,低16位表示写。高位表示读锁被持有的数量,低位表示写锁重入数

分割之后,通过位运算确定读锁和写锁的状态呢 ,假如当前同步状态为S,那么:

  • 写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)。 当写状态加1,等于S+1.
  • 读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。当读状态加1,等于S+(1<<16),也就是S+0x00010000

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。
截屏2022-03-27 22.26.49.png

  1. //切割标志,16为
  2. static final int SHARED_SHIFT = 16;
  3. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  4. //最大线程数量65535
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  7. //获得持有读状态的锁的线程数量
  8. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  9. //获得持有写状态的锁的次数
  10. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

不同于写锁,读锁可以同时被多个线程持有。而每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter 计数器

HoldCounter 计数器

读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
截屏2022-03-27 22.36.23.png
通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。

  • HoldCounter是用来记录读锁重入数的对象
  • ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象

写锁的获取

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程, 则当前线程进入等待状态。
写锁的获取是通过重写AQS中的tryAcquire方法实现的。
截屏2022-03-27 22.43.03.png

  1. protected final boolean tryAcquire(int acquires) {
  2. //当前线程
  3. Thread current = Thread.currentThread();
  4. //获取state状态,存在读锁或者写锁则不为0
  5. int c = getState();
  6. //获取写锁的重入数
  7. int w = exclusiveCount(c);
  8. //c!=0 表示有读锁或者写锁存在
  9. if (c != 0) {
  10. // (Note: if c != 0 and w == 0 then shared count != 0)
  11. // w=0 && c!=0 表示有读锁,读写互斥,当前写锁不允许添加
  12. // w!=0 && current != getExclusiveOwnerThread() 表示有写锁存在,
  13. //而且当前的线程不是占有写锁的线程,直接返回,写写互斥
  14. if (w == 0 || current != getExclusiveOwnerThread())
  15. return false;
  16. // 超出最大范围 65535
  17. if (w + exclusiveCount(acquires) > MAX_COUNT)
  18. throw new Error("Maximum lock count exceeded");
  19. // 同步state状态,写锁重入数+1
  20. setState(c + acquires);
  21. return true;
  22. }
  23. //writerShouldBlock有公平与非公平的实现, 非公平返回false,会尝试通过cas加锁
  24. //c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
  25. if (writerShouldBlock() ||
  26. !compareAndSetState(c, c + acquires))
  27. return false;
  28. //设置写锁为当前线程所有
  29. setExclusiveOwnerThread(current);
  30. return true;
  31. }

截屏2022-03-27 23.03.48.png
通过源码我们可以知道:

  • 读写互斥
  • 写写互斥
  • 写锁支持同一个线程重入
  • writerShouldBlock写锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

截屏2022-03-27 23.04.56.png

写锁的释放

  1. //releases=1
  2. protected final boolean tryRelease(int releases) {
  3. //当前当前线程,不是拿到锁的线程,抛异常
  4. if (!isHeldExclusively())
  5. throw new IllegalMonitorStateException();
  6. //写锁-1,可能有重入
  7. int nextc = getState() - releases;
  8. //当前写状态是否为0,为0则释放写锁
  9. boolean free = exclusiveCount(nextc) == 0;
  10. if (free)
  11. setExclusiveOwnerThread(null);
  12. setState(nextc);
  13. return free;
  14. }

读锁的获取

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. //exclusiveCount(c)表示写锁的次数
  5. //写锁!=0,而且不是持有锁不是当前线程,返回-1(判断锁降级)
  6. if (exclusiveCount(c) != 0 &&
  7. getExclusiveOwnerThread() != current)
  8. return -1;
  9. //计算出读锁的数量
  10. int r = sharedCount(c);
  11. //读锁是否阻塞readerShouldBlock()
  12. //r < MAX_COUNT: 持有读锁的线程小于最大数(65535)
  13. //compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
  14. if (!readerShouldBlock() &&
  15. r < MAX_COUNT &&
  16. compareAndSetState(c, c + SHARED_UNIT)) {
  17. if (r == 0) {//设置第一个获取读锁的线程
  18. firstReader = current;
  19. firstReaderHoldCount = 1;//设置第一个获取读锁线程的重入数
  20. } else if (firstReader == current) {// 表示第一个获取读锁的线程重入
  21. firstReaderHoldCount++;
  22. } else {// 非第一个获取读锁的线程
  23. HoldCounter rh = cachedHoldCounter;
  24. if (rh == null || rh.tid != getThreadId(current))
  25. cachedHoldCounter = rh = readHolds.get();
  26. else if (rh.count == 0)
  27. readHolds.set(rh);
  28. rh.count++;//记录其他获取读锁的线程的重入次数
  29. }
  30. return 1;
  31. }
  32. // 尝试通过自旋的方式获取读锁,实现了重入逻辑
  33. return fullTryAcquireShared(current);
  34. }

截屏2022-03-27 23.23.52.png

  • 读锁共享,读读不互斥
  • 读锁可重入,每个获取读锁的线程都会记录对应的重入数
  • 读写互斥,锁降级场景除外
  • 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
  • readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

为什么要搞一个firstRea,需要记录第一个线程?是为了一个效率问题,firstReader是不会放入到readHolds中的,如果读锁仅有一个的情况下就会避免查找readHolds。

读锁的释放

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. //如果当前线程是第一个获取读锁的线程
  4. if (firstReader == current) {
  5. // assert firstReaderHoldCount > 0;
  6. if (firstReaderHoldCount == 1)
  7. firstReader = null;
  8. else
  9. firstReaderHoldCount--;//重入次数减1
  10. } else {//不是第一个获取读锁的线程
  11. HoldCounter rh = cachedHoldCounter;
  12. if (rh == null || rh.tid != getThreadId(current))
  13. rh = readHolds.get();
  14. int count = rh.count;
  15. if (count <= 1) {
  16. readHolds.remove();
  17. if (count <= 0)
  18. throw unmatchedUnlockException();
  19. }
  20. --rh.count;/重入次数减1
  21. }
  22. for (;;) {//cas更新同步状态
  23. int c = getState();
  24. int nextc = c - SHARED_UNIT;
  25. if (compareAndSetState(c, nextc))
  26. // Releasing the read lock has no effect on readers,
  27. // but it may allow waiting writers to proceed if
  28. // both read and write locks are now free.
  29. return nextc == 0;
  30. }
  31. }