1.类图结构

ReentrantReadWriteLock.png
image.png

前提

  1. 读写锁内容维护了一个ReadLock 一个WriteLock,他们依赖Sync实现具体的功能,而Sync继承自AQS,并且也提供了非公平和公平的实现
  2. ReentrantReadWriteLock中使用高16位表示读锁,低16位表示写锁

核心参数

  1. static final int SHARED_SHIFT = 16;
  2. //共享锁(读锁)的状态值 65536
  3. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  4. //共享线程的最大个数 65536
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. //排他锁(写锁)掩码,二进制 15个1
  7. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  8. //返回读锁的线程数
  9. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  10. //返回写锁的重入个数
  11. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  12. //第一个获取到读锁的线程
  13. private transient Thread firstReader = null;
  14. //第一个获取到读锁的可重入次数
  15. private transient int firstReaderHoldCount;
  16. //记录最后一个获取锁的线程获取读锁的可重入次数
  17. private transient HoldCounter cachedHoldCounter;
  18. //ThreadLocal类型的变量,用来存储第一个获取读锁线程外其他线程获取读锁的可重入次数
  19. //继承自 ThreadLocal initialValue 方法返回一个 HoldCounter对象
  20. private transient ThreadLocalHoldCounter readHolds;

2.写锁的获取和释放

2.1 void lock()

  1. 写锁是一个独占锁,某个时间只有一个线程能够获取该锁,如果当前没有线程获取到读锁和写锁,则当前线程可以获取写锁然后返回
  2. 如果已经有线程获取到读锁和写锁,则当前线程会被阻塞
  3. 写锁是可重入锁,如果当前线程获取到写锁,再次获取时 可重入次数 +1 即可

代码实现

  1. public void lock() {
  2. sync.acquire(1);
  3. }
  4. //和ReentrantLock一样,调用AQS中的acquire()方法
  5. public final void acquire(int arg) {
  6. if (!tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8. selfInterrupt();
  9. }

查看Sync中重写的 tryAquire()

  1. protected final boolean tryAcquire(int acquires) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. int w = exclusiveCount(c);
  5. //说明写锁或读锁已经被其他线程获取了
  6. if (c != 0) {
  7. // w == 0说明写锁没有被获取,但是 c != 0 可以得出 读锁被其他线程获取了
  8. // w != 0说明写锁被其他线程获取了,且当前线程不是当前线程
  9. //上面两种情况都返回 false
  10. if (w == 0 || current != getExclusiveOwnerThread())
  11. return false;
  12. //能执行到这里说明是当前线程获取到了写锁,则计算可重入的次数,不能超过写锁的最大可重入次数
  13. if (w + exclusiveCount(acquires) > MAX_COUNT)
  14. throw new Error("Maximum lock count exceeded");
  15. // 设置可重入的次数
  16. setState(c + acquires);
  17. return true;
  18. }
  19. //锁没有被人获取,则直接获取锁
  20. if (writerShouldBlock() ||
  21. !compareAndSetState(c, c + acquires))
  22. return false;
  23. setExclusiveOwnerThread(current);
  24. return true;
  25. }

2.2 void unlock()

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. public final boolean release(int arg) {
  5. if (tryRelease(arg)) {
  6. //激活阻塞队列中的一个线程
  7. Node h = head;
  8. if (h != null && h.waitStatus != 0)
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }

Sync自定义的 tryRelease() 方法

  1. protected final boolean tryRelease(int releases) {
  2. //确认当前线程是否是写锁的持有者
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. //计算可重入次数
  6. int nextc = getState() - releases;
  7. //如果可重入的次数为0则返回true
  8. boolean free = exclusiveCount(nextc) == 0;
  9. if (free)
  10. //置空
  11. setExclusiveOwnerThread(null);
  12. //更新状态值
  13. setState(nextc);
  14. return free;
  15. }

3.读锁的获取和释放

3.1 void lock()

获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS的状态值state的高16位会 + 1,然后返回 如果一个线程持有写锁,则当前线程会被阻塞

  1. public void lock() {
  2. sync.acquireShared(1);
  3. }
  4. //调用AQS中的 acquireShared
  5. public final void acquireShared(int arg) {
  6. if (tryAcquireShared(arg) < 0)
  7. //调用AQS中的 doAcquireShared 方法
  8. doAcquireShared(arg);
  9. }

然后Sync中自定义了 tryAcquireShared()方法

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. //判断写锁是否被占用,如果写锁被占用且不是当前线程则返回 -1(false)
  5. //这里也就说明了一个线程既可以获取写锁,也可以获取读锁,但是释放的时候都要释放
  6. if (exclusiveCount(c) != 0 &&
  7. getExclusiveOwnerThread() != current)
  8. return -1;
  9. //获取读锁计数
  10. int r = sharedCount(c);
  11. //尝试获取读锁,多个读线程只有一个会成功,不成功的会进入到 fullTryAcquireShared() 进行重试
  12. //readerShouldBlock() 在非公平的实现中,会去查看AQS队列中是否存在一个元素,判断第一个元素是否正在尝试获取写锁
  13. //如果不是,则判断获取读锁的线程是否达到了最大值
  14. //最后执行CAS操作将高16位的值 + 1
  15. if (!readerShouldBlock() &&
  16. r < MAX_COUNT &&
  17. compareAndSetState(c, c + SHARED_UNIT)) {
  18. //如果还没有线程获取过读锁
  19. //则记录当前线程位读锁的第一个获取线程
  20. if (r == 0) {
  21. firstReader = current;
  22. firstReaderHoldCount = 1;
  23. //如果当前线程是第一个获取读锁的线程,则可重入次数 +1
  24. } else if (firstReader == current) {
  25. firstReaderHoldCount++;
  26. } else {
  27. //记录最后一个获取读锁的线程或记录其他线程读锁的可重入次数
  28. HoldCounter rh = cachedHoldCounter;
  29. if (rh == null || rh.tid != getThreadId(current))
  30. cachedHoldCounter = rh = readHolds.get();
  31. else if (rh.count == 0)
  32. readHolds.set(rh);
  33. rh.count++;
  34. }
  35. return 1;
  36. }
  37. //自旋获取,类似于 tryAquireShared()
  38. return fullTryAcquireShared(current);
  39. }

3.2 void unlock()

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }

同样是Sync自定义的 tryReleaseShared()

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. //如果当前线程是第一个获取读锁的线程
  4. if (firstReader == current) {
  5. //且重入的次数为1,则 firstReader = null
  6. if (firstReaderHoldCount == 1)
  7. firstReader = null;
  8. else
  9. //否则 重入次数-1
  10. firstReaderHoldCount--;
  11. } else {
  12. HoldCounter rh = cachedHoldCounter;
  13. //如果当前线程不是最后一个获取读锁的线程
  14. if (rh == null || rh.tid != getThreadId(current))
  15. rh = readHolds.get();
  16. //则从中间 readHolds 统计的重入次数 -1
  17. int count = rh.count;
  18. if (count <= 1) {
  19. //如果 count = 1 ,则移除 HoldCounter 计数
  20. readHolds.remove();
  21. //如果 == 0则说明,无法进行 -1操作,抛出异常
  22. if (count <= 0)
  23. throw unmatchedUnlockException();
  24. }
  25. //计数-1
  26. --rh.count;
  27. }
  28. //自旋更新读锁的状态值
  29. for (;;) {
  30. int c = getState();
  31. //相当于高16位的值进行了 -1操作
  32. int nextc = c - SHARED_UNIT;
  33. if (compareAndSetState(c, nextc))
  34. //当前线程释放读锁成功,但是可能还有其他线程持有读锁,所以要判断 nextc == 0
  35. //不为0则说明还有其他线程持有读锁,则读锁释放失败
  36. //为0则说明没有其他线程持有读锁了,读锁释放成功
  37. return nextc == 0;
  38. }
  39. }

4.案例

使用ReentrantReadWriteLock完成一个线程安全的List

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. public class ReentrantList {
  6. private List<String> seed = new ArrayList<>();
  7. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  8. private final Lock readLock = lock.readLock();
  9. private final Lock writeLock = lock.writeLock();
  10. public void add(String e){
  11. writeLock.lock();
  12. try {
  13. seed.add(e);
  14. }finally {
  15. writeLock.unlock();
  16. }
  17. }
  18. public void remove(String e){
  19. writeLock.lock();
  20. try {
  21. seed.remove(e);
  22. }finally {
  23. writeLock.unlock();
  24. }
  25. }
  26. public String get(int index){
  27. readLock.lock();
  28. try {
  29. return seed.get(index);
  30. }finally {
  31. readLock.unlock();
  32. }
  33. }
  34. }

5.总结

  1. ReentrantReadWriteLock的底层原理就是使用AQS实现的,ReentrantReadWriteLock使用高16位和低16位分别表示读锁和写锁的个数
  2. 通过CAS对其进行操作实现了读写分离,主要在读多写少的场景下使用