



  1. public interface ReadWriteLock {
  2. /**
  3. * Returns the lock used for reading.
  4. *
  5. * @return the lock used for reading
  6. */
  7. Lock readLock();
  8. /**
  9. * Returns the lock used for writing.
  10. *
  11. * @return the lock used for writing
  12. */
  13. Lock writeLock();
  14. }


  1. public class ReentrantReadWriteLock
  2. implements ReadWriteLock, java.io.Serializable {
  3. private static final long serialVersionUID = -6992448646407690164L;
  4. /** Inner class providing readlock */
  5. private final ReentrantReadWriteLock.ReadLock readerLock;
  6. /** Inner class providing writelock */
  7. private final ReentrantReadWriteLock.WriteLock writerLock;
  8. /** Performs all synchronization mechanics */
  9. final Sync sync;


  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. private static final long serialVersionUID = -5992448646407690164L;
  3. private final Sync sync;
  4. }
  5. public static class WriteLock implements Lock, java.io.Serializable {
  6. private static final long serialVersionUID = -4992448646407690164L;
  7. private final Sync sync;
  8. }


  1. abstract static class Sync extends AbstractQueuedSynchronizer {


  1. static final class FairSync extends Sync {}
  2. static final class NonfairSync extends Sync {}



  1. /**
  2. * Creates a new {@code ReentrantReadWriteLock} with
  3. * default (nonfair) ordering properties.
  4. */
  5. public ReentrantReadWriteLock() {
  6. this(false);
  7. }
  8. /**
  9. * Creates a new {@code ReentrantReadWriteLock} with
  10. * the given fairness policy.
  11. *
  12. * @param fair {@code true} if this lock should use a fair ordering policy
  13. */
  14. public ReentrantReadWriteLock(boolean fair) {
  15. sync = fair ? new FairSync() : new NonfairSync();
  16. readerLock = new ReadLock(this);
  17. writerLock = new WriteLock(this);
  18. }


  1. protected ReadLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }
  1. protected WriteLock(ReentrantReadWriteLock lock) {
  2. sync = lock.sync;
  3. }





  1. public void lock() {
  2. sync.acquireShared(1);
  3. }


  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }


  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. * Walkthrough:
  4. * 1. If write lock held by another thread, fail.
  5. * 2. Otherwise, this thread is eligible for
  6. * lock wrt state, so ask if it should block
  7. * because of queue policy. If not, try
  8. * to grant by CASing state and updating count.
  9. * Note that step does not check for reentrant
  10. * acquires, which is postponed to full version
  11. * to avoid having to check hold count in
  12. * the more typical non-reentrant case.
  13. * 3. If step 2 fails either because thread
  14. * apparently not eligible or CAS fails or count
  15. * saturated, chain to version with full retry loop.
  16. */
  17. Thread current = Thread.currentThread();
  18. int c = getState();
  19. //如果写锁被持有的数量不等于0并且同步器的所有者线程不是当前线程
  20. //意味着有其他线程正在进行写操作 返回-1
  21. if (exclusiveCount(c) != 0 &&
  22. getExclusiveOwnerThread() != current)
  23. return -1;
  24. //获取读锁被持有的数量
  25. int r = sharedCount(c);
  26. if (!readerShouldBlock() &&
  27. r < MAX_COUNT &&
  28. compareAndSetState(c, c + SHARED_UNIT)) {
  29. //如果当前等待队列中的第一个线程不是处于独占请求状态并且当前读锁被持有的数量小于最大数量
  30. //并且原子性的设置读状态成功 则继续执行
  31. if (r == 0) {
  32. firstReader = current;
  33. firstReaderHoldCount = 1;
  34. } else if (firstReader == current) {
  35. firstReaderHoldCount++;
  36. } else {
  37. HoldCounter rh = cachedHoldCounter;
  38. if (rh == null || rh.tid != getThreadId(current))
  39. cachedHoldCounter = rh = readHolds.get();
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;
  43. }
  44. return 1;
  45. }
  46. return fullTryAcquireShared(current);
  47. }


  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }




  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }


  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. if (firstReader == current) {
  4. // assert firstReaderHoldCount > 0;
  5. if (firstReaderHoldCount == 1)
  6. firstReader = null;
  7. else
  8. firstReaderHoldCount--;
  9. } else {
  10. HoldCounter rh = cachedHoldCounter;
  11. if (rh == null || rh.tid != getThreadId(current))
  12. rh = readHolds.get();
  13. int count = rh.count;
  14. if (count <= 1) {
  15. readHolds.remove();
  16. if (count <= 0)
  17. throw unmatchedUnlockException();
  18. }
  19. --rh.count;
  20. }
  21. for (;;) {
  22. int c = getState();
  23. int nextc = c - SHARED_UNIT;
  24. if (compareAndSetState(c, nextc))
  25. // Releasing the read lock has no effect on readers,
  26. // but it may allow waiting writers to proceed if
  27. // both read and write locks are now free.
  28. return nextc == 0;
  29. }
  30. }





  1. public void lock() {
  2. sync.acquire(1);
  3. }


  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }


  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1. If read count nonzero or write count nonzero
  5. * and owner is a different thread, fail.
  6. * 2. If count would saturate, fail. (This can only
  7. * happen if count is already nonzero.)
  8. * 3. Otherwise, this thread is eligible for lock if
  9. * it is either a reentrant acquire or
  10. * queue policy allows it. If so, update state
  11. * and set owner.
  12. */
  13. Thread current = Thread.currentThread();
  14. //获取锁状态
  15. int c = getState();
  16. //获取独占锁也就是写锁被持有的数量
  17. int w = exclusiveCount(c);
  18. if (c != 0) {
  19. //锁状态不为0 也就是读锁或者写锁被持有
  20. // (Note: if c != 0 and w == 0 then shared count != 0)
  21. //如果写锁被持有的次数是0 也就是读锁被持有 或者 写锁被其他线程持有 此时不能获取锁
  22. if (w == 0 || current != getExclusiveOwnerThread())
  23. return false;
  24. if (w + exclusiveCount(acquires) > MAX_COUNT)
  25. throw new Error("Maximum lock count exceeded");
  26. // Reentrant acquire
  27. //走到这一步说明写锁被当前线程持有 重入
  28. setState(c + acquires);
  29. return true;
  30. }
  31. if (writerShouldBlock() ||
  32. !compareAndSetState(c, c + acquires))
  33. return false;
  34. //获取锁成功 将同步器的所有者线程设置为当前线程
  35. setExclusiveOwnerThread(current);
  36. return true;
  37. }

如果tryAcquire没有成功,调用的是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,这个方法与前面讲到的ReentrantLock中独占锁的获取逻辑是一样的,这里不再赘述。



  1. public void unlock() {
  2. sync.release(1);
  3. }


  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }


  1. protected final boolean tryRelease(int releases) {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. int nextc = getState() - releases;
  5. boolean free = exclusiveCount(nextc) == 0;
  6. if (free)
  7. setExclusiveOwnerThread(null);
  8. setState(nextc);
  9. return free;
  10. }

