读写锁接口:

  1. public interface ReadWriteLock {
  2. Lock readLock();//读锁
  3. Lock writeLock();//写锁
  4. }

读写锁维护两把锁,一把是读锁,一把是写锁

构造器

  1. public ReentrantReadWriteLock() {
  2. this(false);
  3. }
  4. public ReentrantReadWriteLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. readerLock = new ReadLock(this);
  7. writerLock = new WriteLock(this);
  8. }

构造器可以得知,读写锁也分为公平锁和非公平锁
读锁和写锁都实现了Lock接口
并持有同个AQS同步器

读锁的实现

  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. private static final long serialVersionUID = -5992448646407690164L;
  3. private final Sync sync;
  4. protected ReadLock(ReentrantReadWriteLock lock) {
  5. sync = lock.sync;
  6. }
  7. ....
  8. }

获取读锁

  1. public void lock() {
  2. sync.acquireShared(1);//获取共享锁
  3. }

读锁获取的是个共享锁

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

同步器共享方法中,返回>=0获取成功,<0获取失败

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();//获取当前线程
  3. int c = getState();//获取state
  4. if (exclusiveCount(c) != 0 &&//独占计数!=0,说明有线程占用读锁
  5. getExclusiveOwnerThread() != current)//这个线程非当前线程
  6. return -1;//返回-1 抢占共享锁失败
  7. int r = sharedCount(c);//获取共享计数
  8. if (!readerShouldBlock() &&//下个aqs下个被唤醒的不是写线程
  9. r < MAX_COUNT &&//允许最大的读线程
  10. compareAndSetState(c, c + SHARED_UNIT)) {//cas占有读锁
  11. if (r == 0) {//第一个占用锁的线程
  12. firstReader = current;//第一读的线程
  13. firstReaderHoldCount = 1;//第一个读的线程保持计数
  14. } else if (firstReader == current) {//第一个读的线程重入
  15. firstReaderHoldCount++;//计数++
  16. } else {
  17. HoldCounter rh = cachedHoldCounter;//获取最后一个读锁的线程计数器
  18. if (rh == null || rh.tid != getThreadId(current))//缓存为空或者 如果最后一个线程计数器不是当前线程
  19. cachedHoldCounter = rh = readHolds.get();//创建新的线程计数器
  20. else if (rh.count == 0)//缓存计数器无值
  21. readHolds.set(rh);
  22. rh.count++;
  23. }
  24. return 1;
  25. }
  26. //cas失败,继续获取
  27. return fullTryAcquireShared(current);
  28. }

exclusiveCount(c)判断独占锁

  1. static final int SHARED_SHIFT = 16;
  2. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  3. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

读写锁的状态都维护在state上。这是种按位切割使用,将变量切分成两部分,高16位表示读,低16位表示写。

未命名文件.jpg
当前同步状态表示一个线程已经获取了写锁,且重入两次,同时也连续获取了两次读锁。

如果没有线程占有独占锁,那么线程会判断读是否应该阻塞当前线程

readerShouldBlock()

非公平锁实现

  1. final boolean readerShouldBlock() {
  2. return apparentlyFirstQueuedIsExclusive();//队列第一是是否是独占
  3. }
  4. final boolean apparentlyFirstQueuedIsExclusive() {
  5. Node h, s;
  6. return (h = head) != null &&//获取队列头 不为空
  7. (s = h.next) != null &&//头的next节点不为空,及排在队列的一个节点
  8. !s.isShared() &&//状态不是共享状态。
  9. s.thread != null; //有线程
  10. }

非公平锁判断aqs排在第一的线程是写锁的的话则进行阻塞
公平锁实现

  1. final boolean readerShouldBlock() {
  2. return hasQueuedPredecessors();
  3. }
  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail; // Read fields in reverse initialization order 以相反的初始化顺序读取字段
  3. Node h = head;
  4. Node s;
  5. return h != t &&
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }

公平锁只要是aqs里有值就直接阻塞,让队列里的线程先访问资源

如果判断不需要阻塞,则进行一次cas,stete高位+1.成功的话获得锁,并记录是否是firstRead,或者holdCounter
需要阻塞或者cas失败的话,进入fullTryAcquireShared(current)

fullTryAcquireShared(current)自旋获取锁

  1. final int fullTryAcquireShared(Thread current) {
  2. HoldCounter rh = null;
  3. for (;;) {//通过自旋方式
  4. int c = getState();
  5. if (exclusiveCount(c) != 0) {//独占锁
  6. if (getExclusiveOwnerThread() != current)//非当前线程
  7. return -1;//获取失败
  8. // else we hold the exclusive lock; blocking here
  9. // would cause deadlock.
  10. } else if (readerShouldBlock()) {//读被阻塞。aqs队列排在第一的线程为写线程
  11. // Make sure we're not acquiring read lock reentrantly
  12. if (firstReader == current) {//当前线程是第一个获取写的线程
  13. // assert firstReaderHoldCount > 0;
  14. } else {
  15. if (rh == null) {
  16. rh = cachedHoldCounter;//代表的是最后一个获取读锁的线程的计数器。
  17. if (rh == null || rh.tid != getThreadId(current)) {//如果最后一个线程计数器是 null 或者不是当前线程
  18. rh = readHolds.get();
  19. if (rh.count == 0)
  20. readHolds.remove();
  21. }
  22. }
  23. if (rh.count == 0)
  24. return -1;
  25. }
  26. }
  27. if (sharedCount(c) == MAX_COUNT)//读锁满了
  28. throw new Error("Maximum lock count exceeded");//抛出异常
  29. if (compareAndSetState(c, c + SHARED_UNIT)) {//cas去获取线程
  30. if (sharedCount(c) == 0) {//第一个获取锁的线程
  31. firstReader = current;//记录第一线程
  32. firstReaderHoldCount = 1;
  33. } else if (firstReader == current) {
  34. firstReaderHoldCount++;
  35. } else {
  36. if (rh == null)
  37. rh = cachedHoldCounter;//最后一个线程计数器
  38. if (rh == null || rh.tid != getThreadId(current))//为空或者不是当前线程。
  39. rh = readHolds.get();//创建一个
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;//计算+1
  43. cachedHoldCounter = rh; // cache for release
  44. }
  45. return 1;
  46. }
  47. }
  48. }

自旋直到,写锁获取锁,或者需要被阻塞,或者获取当前线程获取锁,或者锁满。

doAcquireShared(arg)加入aqs等待队列,挂起

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);//传递唤醒
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }

挂起的逻辑与独占锁的逻辑一致,被唤醒后,的差异是会传递唤醒下一个也是share状态的锁

setHeadAndPropagate(node, r)传递唤醒

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node);//当前被唤醒的线程设为head
  4. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  5. (h = head) == null || h.waitStatus < 0) {
  6. Node s = node.next;
  7. if (s == null || s.isShared())
  8. doReleaseShared();//传递唤醒线程
  9. }
  10. }

读锁释放

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {//释放共享锁
  3. doReleaseShared();//唤醒aqs队列中等待的队列
  4. return true;
  5. }
  6. return false;
  7. }
  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. if (firstReader == current) {//当前线程是头线程
  4. // assert firstReaderHoldCount > 0;
  5. if (firstReaderHoldCount == 1)
  6. firstReader = null;
  7. else
  8. firstReaderHoldCount--;
  9. } else {
  10. HoldCounter rh = cachedHoldCounter;//最后一个线程
  11. if (rh == null || rh.tid != getThreadId(current))
  12. rh = readHolds.get();
  13. int count = rh.count;
  14. if (count <= 1) {
  15. readHolds.remove();//移除
  16. if (count <= 0)
  17. throw unmatchedUnlockException();
  18. }
  19. --rh.count;
  20. }
  21. for (;;) {
  22. int c = getState();
  23. int nextc = c - SHARED_UNIT;//高位减1
  24. if (compareAndSetState(c, nextc))//cas去减state
  25. return nextc == 0;//等与0 的话,读锁释放
  26. }
  27. }
  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue; // loop to recheck cases
  9. unparkSuccessor(h);//唤醒排在第一的线程
  10. }
  11. else if (ws == 0 &&
  12. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//PROPAGATE:-3表示下一次共享式同步状态获取将会无条件地被传播下去
  13. continue; // loop on failed CAS
  14. }
  15. if (h == head) // loop if head changed
  16. break;
  17. }
  18. }

写锁的实现

获取写锁

  1. public void lock() {
  2. sync.acquire(1);
  3. }
  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();//再次中断
  5. }
  1. protected final boolean tryAcquire(int acquires) {
  2. Thread current = Thread.currentThread();//获取当前线程
  3. int c = getState();
  4. int w = exclusiveCount(c);//独占线程次数
  5. if (c != 0) {//有线程占有锁
  6. // (Note: if c != 0 and w == 0 then shared count != 0)
  7. if (w == 0 || current != getExclusiveOwnerThread()) //不是独占锁,或者不是当前线程占用独占锁
  8. return false;//抢占失败
  9. if (w + exclusiveCount(acquires) > MAX_COUNT)
  10. throw new Error("Maximum lock count exceeded");
  11. // Reentrant acquire
  12. setState(c + acquires);//重入次数+1 ,由于此时该线程获取了锁,不需要cas
  13. return true;
  14. }
  15. if (writerShouldBlock() || //判断写线程是否需要阻塞,非公平锁永远不阻塞,公平锁判断aqs队列里是否有值
  16. !compareAndSetState(c, c + acquires))//cas去获取锁
  17. return false;//阻塞,或者获取失败直接返回 false
  18. setExclusiveOwnerThread(current);//设置当前线程获取独占锁
  19. return true;

获取失败的话加入aqs队列,等待被唤醒,并判断是否需要恢复中断标识。该逻辑与重入锁的逻辑一致,这里就不再次详细描述

释放写锁

  1. public void unlock() {
  2. sync.release(1);
  3. }
  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) { // 释放锁
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0) //有等待的线程
  5. unparkSuccessor(h);//唤醒头节点的线程
  6. return true;
  7. }
  8. return false;
  9. }
  1. protected final boolean tryRelease(int releases) {
  2. if (!isHeldExclusively())//非独占锁,异常
  3. throw new IllegalMonitorStateException();
  4. int nextc = getState() - releases;
  5. boolean free = exclusiveCount(nextc) == 0;//等与0的时候释放,不等于0,减去重入次数
  6. if (free)
  7. setExclusiveOwnerThread(null);//释放锁的线程
  8. setState(nextc);
  9. return free;
  10. }

因为线程独占锁所以释放的时候不需要cas
释放完成后会唤醒aqs的排在第一的线程。该唤醒与重入锁逻辑一致