类图结构

J.U.C 的锁组件中 类相对较少,从 JDK 相应的包中也能看出来,下图标记了其中最主要的几个接口和类,也是本文要分析的重点。

avatar

下图 将这几个接口和类 以类图的方式展现出来,其中包含了它们所声明的主要方法。

avatar

Lock 组件

Lock 组件的结构很简单,只有一个接口和一个实现类,源码如下。

  1. public interface Lock {
  2. /**
  3. * 获取锁
  4. */
  5. void lock();
  6. /**
  7. * 获取锁,除非当前线程中断
  8. */
  9. void lockInterruptibly() throws InterruptedException;
  10. /**
  11. * 只有当调用时 锁是空闲的情况下,才获取锁
  12. */
  13. boolean tryLock();
  14. /**
  15. * 如果锁在给定的等待时间内空闲且当前线程未被中断,则获取该锁
  16. */
  17. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  18. /**
  19. * 释放锁
  20. */
  21. void unlock();
  22. }
  23. public class ReentrantLock implements Lock, java.io.Serializable {
  24. /** 提供所有实现机制的同步器,ReentrantLock 的主要方法都依赖于该对象进行实现 */
  25. private final Sync sync;
  26. /**
  27. * ReentrantLock锁 的同步控制基础。它的两个子类分别实现了公平锁和非公平锁,如下。
  28. */
  29. abstract static class Sync extends AbstractQueuedSynchronizer {
  30. private static final long serialVersionUID = -5179523762034025860L;
  31. abstract void lock();
  32. /**
  33. * Performs non-fair tryLock. tryAcquire is implemented in
  34. * subclasses, but both need nonfair try for trylock method.
  35. */
  36. final boolean nonfairTryAcquire(int acquires) {
  37. final Thread current = Thread.currentThread();
  38. int c = getState();
  39. if (c == 0) {
  40. if (compareAndSetState(0, acquires)) {
  41. setExclusiveOwnerThread(current);
  42. return true;
  43. }
  44. }
  45. else if (current == getExclusiveOwnerThread()) {
  46. int nextc = c + acquires;
  47. if (nextc < 0) // overflow
  48. throw new Error("Maximum lock count exceeded");
  49. setState(nextc);
  50. return true;
  51. }
  52. return false;
  53. }
  54. protected final boolean tryRelease(int releases) {
  55. int c = getState() - releases;
  56. if (Thread.currentThread() != getExclusiveOwnerThread())
  57. throw new IllegalMonitorStateException();
  58. boolean free = false;
  59. if (c == 0) {
  60. free = true;
  61. setExclusiveOwnerThread(null);
  62. }
  63. setState(c);
  64. return free;
  65. }
  66. final boolean isLocked() {
  67. return getState() != 0;
  68. }
  69. }
  70. /**
  71. * 非公平锁,基于上面的 Sync类
  72. */
  73. static final class NonfairSync extends Sync {
  74. private static final long serialVersionUID = 7316153563782823691L;
  75. final void lock() {
  76. if (compareAndSetState(0, 1))
  77. setExclusiveOwnerThread(Thread.currentThread());
  78. else
  79. acquire(1);
  80. }
  81. protected final boolean tryAcquire(int acquires) {
  82. return nonfairTryAcquire(acquires);
  83. }
  84. }
  85. /**
  86. * 公平锁,基于上面的 Sync类
  87. */
  88. static final class FairSync extends Sync {
  89. private static final long serialVersionUID = -3000897897090466540L;
  90. final void lock() {
  91. acquire(1);
  92. }
  93. protected final boolean tryAcquire(int acquires) {
  94. final Thread current = Thread.currentThread();
  95. int c = getState();
  96. if (c == 0) {
  97. if (!hasQueuedPredecessors() &&
  98. compareAndSetState(0, acquires)) {
  99. setExclusiveOwnerThread(current);
  100. return true;
  101. }
  102. }
  103. else if (current == getExclusiveOwnerThread()) {
  104. int nextc = c + acquires;
  105. if (nextc < 0)
  106. throw new Error("Maximum lock count exceeded");
  107. setState(nextc);
  108. return true;
  109. }
  110. return false;
  111. }
  112. }
  113. /**
  114. * 无参初始化时,默认实例化 非公平锁
  115. */
  116. public ReentrantLock() {
  117. sync = new NonfairSync();
  118. }
  119. /**
  120. * 可通过参数fair 控制实例化的是 公平锁还是非公平锁
  121. */
  122. public ReentrantLock(boolean fair) {
  123. sync = fair ? new FairSync() : new NonfairSync();
  124. }
  125. public void lock() {
  126. sync.lock();
  127. }
  128. public boolean tryLock() {
  129. return sync.nonfairTryAcquire(1);
  130. }
  131. public boolean tryLock(long timeout, TimeUnit unit)
  132. throws InterruptedException {
  133. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  134. }
  135. public void unlock() {
  136. sync.release(1);
  137. }
  138. public boolean isLocked() {
  139. return sync.isLocked();
  140. }
  141. public final boolean isFair() {
  142. return sync instanceof FairSync;
  143. }
  144. }

ReadWriteLock 组件

ReadWriteLock 组件的结构也很简单,与上面的 Lock 组件 不同的是,它提供了 公平的读锁写锁,以及非公平的读锁写锁。

  1. public interface ReadWriteLock {
  2. /**
  3. * 获取一个 读锁
  4. */
  5. Lock readLock();
  6. /**
  7. * 获取一个 写锁
  8. */
  9. Lock writeLock();
  10. }
  11. public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
  12. /** 由内部类提供的读锁 */
  13. private final ReentrantReadWriteLock.ReadLock readerLock;
  14. /** 由内部类提供的写锁 */
  15. private final ReentrantReadWriteLock.WriteLock writerLock;
  16. /** 提供所有实现机制的同步器 */
  17. final Sync sync;
  18. /**
  19. * 默认创建 非公平的读锁写锁
  20. */
  21. public ReentrantReadWriteLock() {
  22. this(false);
  23. }
  24. /**
  25. * 由参数 fair 指定读锁写锁是公平的还是非公平的
  26. */
  27. public ReentrantReadWriteLock(boolean fair) {
  28. sync = fair ? new FairSync() : new NonfairSync();
  29. readerLock = new ReadLock(this);
  30. writerLock = new WriteLock(this);
  31. }
  32. /**
  33. * 获取写锁
  34. * 获取读锁
  35. */
  36. public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
  37. public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  38. abstract static class Sync extends AbstractQueuedSynchronizer {
  39. protected final boolean tryRelease(int releases) {
  40. if (!isHeldExclusively())
  41. throw new IllegalMonitorStateException();
  42. int nextc = getState() - releases;
  43. boolean free = exclusiveCount(nextc) == 0;
  44. if (free)
  45. setExclusiveOwnerThread(null);
  46. setState(nextc);
  47. return free;
  48. }
  49. protected final boolean tryAcquire(int acquires) {
  50. /*
  51. * Walkthrough:
  52. * 1. If read count nonzero or write count nonzero
  53. * and owner is a different thread, fail.
  54. * 2. If count would saturate, fail. (This can only
  55. * happen if count is already nonzero.)
  56. * 3. Otherwise, this thread is eligible for lock if
  57. * it is either a reentrant acquire or
  58. * queue policy allows it. If so, update state
  59. * and set owner.
  60. */
  61. Thread current = Thread.currentThread();
  62. int c = getState();
  63. int w = exclusiveCount(c);
  64. if (c != 0) {
  65. // (Note: if c != 0 and w == 0 then shared count != 0)
  66. if (w == 0 || current != getExclusiveOwnerThread())
  67. return false;
  68. if (w + exclusiveCount(acquires) > MAX_COUNT)
  69. throw new Error("Maximum lock count exceeded");
  70. // Reentrant acquire
  71. setState(c + acquires);
  72. return true;
  73. }
  74. if (writerShouldBlock() ||
  75. !compareAndSetState(c, c + acquires))
  76. return false;
  77. setExclusiveOwnerThread(current);
  78. return true;
  79. }
  80. protected final boolean tryReleaseShared(int unused) {
  81. Thread current = Thread.currentThread();
  82. if (firstReader == current) {
  83. // assert firstReaderHoldCount > 0;
  84. if (firstReaderHoldCount == 1)
  85. firstReader = null;
  86. else
  87. firstReaderHoldCount--;
  88. } else {
  89. HoldCounter rh = cachedHoldCounter;
  90. if (rh == null || rh.tid != getThreadId(current))
  91. rh = readHolds.get();
  92. int count = rh.count;
  93. if (count <= 1) {
  94. readHolds.remove();
  95. if (count <= 0)
  96. throw unmatchedUnlockException();
  97. }
  98. --rh.count;
  99. }
  100. for (;;) {
  101. int c = getState();
  102. int nextc = c - SHARED_UNIT;
  103. if (compareAndSetState(c, nextc))
  104. // Releasing the read lock has no effect on readers,
  105. // but it may allow waiting writers to proceed if
  106. // both read and write locks are now free.
  107. return nextc == 0;
  108. }
  109. }
  110. protected final int tryAcquireShared(int unused) {
  111. /*
  112. * Walkthrough:
  113. * 1. If write lock held by another thread, fail.
  114. * 2. Otherwise, this thread is eligible for
  115. * lock wrt state, so ask if it should block
  116. * because of queue policy. If not, try
  117. * to grant by CASing state and updating count.
  118. * Note that step does not check for reentrant
  119. * acquires, which is postponed to full version
  120. * to avoid having to check hold count in
  121. * the more typical non-reentrant case.
  122. * 3. If step 2 fails either because thread
  123. * apparently not eligible or CAS fails or count
  124. * saturated, chain to version with full retry loop.
  125. */
  126. Thread current = Thread.currentThread();
  127. int c = getState();
  128. if (exclusiveCount(c) != 0 &&
  129. getExclusiveOwnerThread() != current)
  130. return -1;
  131. int r = sharedCount(c);
  132. if (!readerShouldBlock() &&
  133. r < MAX_COUNT &&
  134. compareAndSetState(c, c + SHARED_UNIT)) {
  135. if (r == 0) {
  136. firstReader = current;
  137. firstReaderHoldCount = 1;
  138. } else if (firstReader == current) {
  139. firstReaderHoldCount++;
  140. } else {
  141. HoldCounter rh = cachedHoldCounter;
  142. if (rh == null || rh.tid != getThreadId(current))
  143. cachedHoldCounter = rh = readHolds.get();
  144. else if (rh.count == 0)
  145. readHolds.set(rh);
  146. rh.count++;
  147. }
  148. return 1;
  149. }
  150. return fullTryAcquireShared(current);
  151. }
  152. /**
  153. * Performs tryLock for write, enabling barging in both modes.
  154. * This is identical in effect to tryAcquire except for lack
  155. * of calls to writerShouldBlock.
  156. */
  157. final boolean tryWriteLock() {
  158. Thread current = Thread.currentThread();
  159. int c = getState();
  160. if (c != 0) {
  161. int w = exclusiveCount(c);
  162. if (w == 0 || current != getExclusiveOwnerThread())
  163. return false;
  164. if (w == MAX_COUNT)
  165. throw new Error("Maximum lock count exceeded");
  166. }
  167. if (!compareAndSetState(c, c + 1))
  168. return false;
  169. setExclusiveOwnerThread(current);
  170. return true;
  171. }
  172. /**
  173. * Performs tryLock for read, enabling barging in both modes.
  174. * This is identical in effect to tryAcquireShared except for
  175. * lack of calls to readerShouldBlock.
  176. */
  177. final boolean tryReadLock() {
  178. Thread current = Thread.currentThread();
  179. for (;;) {
  180. int c = getState();
  181. if (exclusiveCount(c) != 0 &&
  182. getExclusiveOwnerThread() != current)
  183. return false;
  184. int r = sharedCount(c);
  185. if (r == MAX_COUNT)
  186. throw new Error("Maximum lock count exceeded");
  187. if (compareAndSetState(c, c + SHARED_UNIT)) {
  188. if (r == 0) {
  189. firstReader = current;
  190. firstReaderHoldCount = 1;
  191. } else if (firstReader == current) {
  192. firstReaderHoldCount++;
  193. } else {
  194. HoldCounter rh = cachedHoldCounter;
  195. if (rh == null || rh.tid != getThreadId(current))
  196. cachedHoldCounter = rh = readHolds.get();
  197. else if (rh.count == 0)
  198. readHolds.set(rh);
  199. rh.count++;
  200. }
  201. return true;
  202. }
  203. }
  204. }
  205. final boolean isWriteLocked() {
  206. return exclusiveCount(getState()) != 0;
  207. }
  208. }
  209. /**
  210. * 非公平锁
  211. */
  212. static final class NonfairSync extends Sync {
  213. final boolean writerShouldBlock() {
  214. return false; // writers can always barge
  215. }
  216. final boolean readerShouldBlock() {
  217. /* As a heuristic to avoid indefinite writer starvation,
  218. * block if the thread that momentarily appears to be head
  219. * of queue, if one exists, is a waiting writer. This is
  220. * only a probabilistic effect since a new reader will not
  221. * block if there is a waiting writer behind other enabled
  222. * readers that have not yet drained from the queue.
  223. */
  224. return apparentlyFirstQueuedIsExclusive();
  225. }
  226. }
  227. /**
  228. * 公平锁
  229. */
  230. static final class FairSync extends Sync {
  231. final boolean writerShouldBlock() {
  232. return hasQueuedPredecessors();
  233. }
  234. final boolean readerShouldBlock() {
  235. return hasQueuedPredecessors();
  236. }
  237. }
  238. /**
  239. * 读锁
  240. */
  241. public static class ReadLock implements Lock, java.io.Serializable {
  242. private final Sync sync;
  243. protected ReadLock(ReentrantReadWriteLock lock) {
  244. sync = lock.sync;
  245. }
  246. public void lock() {
  247. sync.acquireShared(1);
  248. }
  249. public void lockInterruptibly() throws InterruptedException {
  250. sync.acquireSharedInterruptibly(1);
  251. }
  252. public boolean tryLock() {
  253. return sync.tryReadLock();
  254. }
  255. public boolean tryLock(long timeout, TimeUnit unit)
  256. throws InterruptedException {
  257. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  258. }
  259. public void unlock() {
  260. sync.releaseShared(1);
  261. }
  262. }
  263. /**
  264. * 写锁
  265. */
  266. public static class WriteLock implements Lock, java.io.Serializable {
  267. private final Sync sync;
  268. protected WriteLock(ReentrantReadWriteLock lock) {
  269. sync = lock.sync;
  270. }
  271. public void lock() {
  272. sync.acquire(1);
  273. }
  274. public void lockInterruptibly() throws InterruptedException {
  275. sync.acquireInterruptibly(1);
  276. }
  277. public boolean tryLock( ) {
  278. return sync.tryWriteLock();
  279. }
  280. public boolean tryLock(long timeout, TimeUnit unit)
  281. throws InterruptedException {
  282. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  283. }
  284. public void unlock() {
  285. sync.release(1);
  286. }
  287. }
  288. public final boolean isFair() {
  289. return sync instanceof FairSync;
  290. }
  291. public boolean isWriteLocked() {
  292. return sync.isWriteLocked();
  293. }
  294. }

AbstractQueuedSynchronizer

最后看一下抽象类 AbstractQueuedSynchronizer,在同步组件的实现中,AQS 是核心部分,同步组件的实现者通过使用 AQS 提供的模板方法实现同步组件语义,AQS 则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS 的核心包括:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是 AQS 提供出来的模板方法。源码如下。

  1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
  2. implements java.io.Serializable {
  3. /**
  4. * 当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。
  5. * 就数据结构而言,队列的实现方式无外乎两者一是通过数组的形式,另外一种则是链表的形式。
  6. * AQS中的同步队列则是通过链式方式进行实现,下面的内部类Node便是其实现的载体
  7. */
  8. static final class Node {
  9. /** Marker to indicate a node is waiting in shared mode */
  10. static final Node SHARED = new Node();
  11. /** Marker to indicate a node is waiting in exclusive mode */
  12. static final Node EXCLUSIVE = null;
  13. // 节点从同步队列中取消
  14. static final int CANCELLED = 1;
  15. // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,
  16. // 使得后继节点的线程能够运行;
  17. static final int SIGNAL = -1;
  18. // 当前节点进入等待队列中
  19. static final int CONDITION = -2;
  20. // 表示下一次共享式同步状态获取将会无条件传播下去
  21. static final int PROPAGATE = -3;
  22. // 节点状态
  23. volatile int waitStatus;
  24. // 当前节点/线程的前驱节点
  25. volatile Node prev;
  26. // 当前节点/线程的后驱节点
  27. volatile Node next;
  28. // 加入同步队列的线程引用
  29. volatile Thread thread;
  30. // 等待队列中的下一个节点
  31. Node nextWaiter;
  32. final boolean isShared() {
  33. return nextWaiter == SHARED;
  34. }
  35. final Node predecessor() throws NullPointerException {
  36. Node p = prev;
  37. if (p == null)
  38. throw new NullPointerException();
  39. else
  40. return p;
  41. }
  42. Node() { // Used to establish initial head or SHARED marker
  43. }
  44. Node(Thread thread, Node mode) { // Used by addWaiter
  45. this.nextWaiter = mode;
  46. this.thread = thread;
  47. }
  48. Node(Thread thread, int waitStatus) { // Used by Condition
  49. this.waitStatus = waitStatus;
  50. this.thread = thread;
  51. }
  52. }
  53. /**
  54. * AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,
  55. * 释放锁时对同步队列中的线程进行通知等核心方法。
  56. */
  57. private transient volatile Node head;
  58. private transient volatile Node tail;
  59. /**
  60. * 获取独占式锁
  61. */
  62. public final void acquire(int arg) {
  63. if (!tryAcquire(arg) &&
  64. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  65. selfInterrupt();
  66. }
  67. /**
  68. * 释放独占式锁
  69. */
  70. public final boolean release(int arg) {
  71. if (tryRelease(arg)) {
  72. Node h = head;
  73. if (h != null && h.waitStatus != 0)
  74. unparkSuccessor(h);
  75. return true;
  76. }
  77. return false;
  78. }
  79. /**
  80. * 获取可中断式锁
  81. */
  82. public final void acquireInterruptibly(int arg)
  83. throws InterruptedException {
  84. if (Thread.interrupted())
  85. throw new InterruptedException();
  86. if (!tryAcquire(arg))
  87. doAcquireInterruptibly(arg);
  88. }
  89. /**
  90. * 获取共享锁
  91. */
  92. public final void acquireShared(int arg) {
  93. if (tryAcquireShared(arg) < 0)
  94. doAcquireShared(arg);
  95. }
  96. /**
  97. * 释放共享锁
  98. */
  99. public final boolean releaseShared(int arg) {
  100. if (tryReleaseShared(arg)) {
  101. doReleaseShared();
  102. return true;
  103. }
  104. return false;
  105. }
  106. }