Lock接口

Lock接口提供的synchronized关键字不具备的主要特性

特性 描述
尝试非阻塞的获取锁 当前线程尝试获取锁,如果这一时刻没有被其他线程获取到,则成功获取并持有锁
能被中断的获取锁 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回

队列同步器(AQS)

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。其中内部状态state,等待队列的头节点head和尾节点head,都是通过volatile修饰,保证了多线程之间的可见性。

  1. package java.util.concurrent.locks;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.ArrayList;
  4. import java.util.Collection;
  5. import java.util.Date;
  6. import sun.misc.Unsafe;
  7. public abstract class AbstractQueuedSynchronizer
  8. extends AbstractOwnableSynchronizer
  9. implements java.io.Serializable {
  10. private static final long serialVersionUID = 7373984972572414691L;
  11. /**
  12. * 创建一个初始同步状态为零的新实例
  13. */
  14. protected AbstractQueuedSynchronizer() { }
  15. /**
  16. * 等待队列节点类。
  17. *
  18. * 等待队列是“CLH”(Craig、Landin 和 * Hagersten)锁定队列的变体。 CLH 锁通常用于自旋锁。
  19. * 相反,我们将它们用于阻塞同步器,但使用相同的基本策略即在其节点的前身中保存有关线程
  20. * 的一些控制信息。每个节点中的“状态”字段跟踪线程是否应该阻塞。
  21. */
  22. static final class Node {
  23. /** Marker to indicate a node is waiting in shared mode */
  24. static final Node SHARED = new Node();
  25. /** Marker to indicate a node is waiting in exclusive mode */
  26. static final Node EXCLUSIVE = null;
  27. /** waitStatus value to indicate thread has cancelled */
  28. static final int CANCELLED = 1;
  29. /** waitStatus value to indicate successor's thread needs unparking */
  30. static final int SIGNAL = -1;
  31. /** waitStatus value to indicate thread is waiting on condition */
  32. static final int CONDITION = -2;
  33. /**
  34. * waitStatus value to indicate the next acquireShared should
  35. * unconditionally propagate
  36. */
  37. static final int PROPAGATE = -3;
  38. 1. waitStatus
  39. a. CANCELLED
  40. 1waitStatus value to indicate thread has cancelled
  41. (表示当前节点的线程由于超时或中断而被取消。节点永远不会离开此状态。
  42. 特别是,具有取消节点的线程永远不会再次阻塞。)
  43. b. SIGNAL
  44. -1waitStatus value to indicate successor's thread needs unparking
  45. (当前节点的线程如果释放了同步状态或者被取消,需要通知后继节点)
  46. c. CONDITION
  47. -2waitStatus value to indicate thread is waiting on condition
  48. (当前节点等待在Condition上,当其他线程对Condition调用了singal()方法时,
  49. 这个节点会加入到对同步状态的获取中)
  50. d. PROPAGATE
  51. -3waitStatus value to indicate the next acquireShared should unconditionally
  52. propagate(当前节点属于共享类型,下一次共享式同步状态获取将会无条件的被传播下去)
  53. volatile int waitStatus;
  54. /**
  55. * 前驱节点
  56. */
  57. volatile Node prev;
  58. /**
  59. * 后继几点
  60. */
  61. volatile Node next;
  62. /**
  63. * 当前节点所在的线程
  64. */
  65. volatile Thread thread;
  66. /**
  67. * 如果当前节点是共享的,则值为SHARED常量,如果当前节点是独占的,
  68. 则表示等待条件的下一个节点。
  69. */
  70. Node nextWaiter;
  71. /**
  72. * Returns true if node is waiting in shared mode.
  73. */
  74. final boolean isShared() {
  75. return nextWaiter == SHARED;
  76. }
  77. /**
  78. * Returns previous node, or throws NullPointerException if null.
  79. * Use when predecessor cannot be null. The null check could
  80. * be elided, but is present to help the VM.
  81. *
  82. * @return the predecessor of this node
  83. */
  84. final Node predecessor() throws NullPointerException {
  85. Node p = prev;
  86. if (p == null)
  87. throw new NullPointerException();
  88. else
  89. return p;
  90. }
  91. Node() { // Used to establish initial head or SHARED marker
  92. }
  93. Node(Thread thread, Node mode) { // Used by addWaiter
  94. this.nextWaiter = mode;
  95. this.thread = thread;
  96. }
  97. Node(Thread thread, int waitStatus) { // Used by Condition
  98. this.waitStatus = waitStatus;
  99. this.thread = thread;
  100. }
  101. }
  102. /**
  103. * 等待队列的头部,延迟初始化。除初始化外,只能通过 setHead 方法进行修改。
  104. * 注意:如果 head 存在,它的 waitStatus 保证不会是 CANCELLED
  105. */
  106. private transient volatile Node head;
  107. /**
  108. * 等待队列的尾部,延迟初始化。仅通过方法 enq 修改以添加新的等待节点。
  109. */
  110. private transient volatile Node tail;
  111. /**
  112. * 同步状态。
  113. */
  114. private volatile int state;
  115. /**
  116. * 返回同步状态的当前值。 此操作具有volatile读取的内存语义。
  117. */
  118. protected final int getState() {
  119. return state;
  120. }
  121. /**
  122. * 设置同步状态的值。 此操作具有volatile写入的内存语义
  123. */
  124. protected final void setState(int newState) {
  125. state = newState;
  126. }
  127. /**
  128. * 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
  129. * 此操作具有volatile读取和写入的内存语义。
  130. */
  131. protected final boolean compareAndSetState(int expect, int update) {
  132. // See below for intrinsics setup to support this
  133. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  134. }
  135. // Queuing utilities
  136. /**
  137. * 旋转比使用定时停车更快的纳秒数。粗略的估计就足以以非常短的超时提高响应能力。
  138. */
  139. static final long spinForTimeoutThreshold = 1000L;
  140. /**
  141. * 将节点插入队列,必要时进行初始化
  142. */
  143. private Node enq(final Node node) {
  144. for (;;) {
  145. Node t = tail;
  146. if (t == null) { // Must initialize
  147. if (compareAndSetHead(new Node()))
  148. tail = head;
  149. } else {
  150. node.prev = t;
  151. if (compareAndSetTail(t, node)) {
  152. t.next = node;
  153. return t;
  154. }
  155. }
  156. }
  157. }
  158. /**
  159. * 为当前线程和给定模式创建和排队节点
  160. *
  161. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  162. * @return the new node
  163. */
  164. private Node addWaiter(Node mode) {
  165. Node node = new Node(Thread.currentThread(), mode);
  166. // Try the fast path of enq; backup to full enq on failure
  167. Node pred = tail;
  168. if (pred != null) {
  169. node.prev = pred;
  170. if (compareAndSetTail(pred, node)) {
  171. pred.next = node;
  172. return node;
  173. }
  174. }
  175. enq(node);
  176. return node;
  177. }
  178. /**
  179. * 将队列头设置为节点,从而出队。仅由获取方法调用。
  180. * 为了 GC 并抑制不必要的信号和遍历,还清空未使用的字段。
  181. *
  182. * @param node the node
  183. */
  184. private void setHead(Node node) {
  185. head = node;
  186. node.thread = null;
  187. node.prev = null;
  188. }
  189. /**
  190. * 唤醒节点的后继者(如果存在)
  191. *
  192. * @param node the node
  193. */
  194. private void unparkSuccessor(Node node) {
  195. /*
  196. * If status is negative (i.e., possibly needing signal) try
  197. * to clear in anticipation of signalling. It is OK if this
  198. * fails or if status is changed by waiting thread.
  199. */
  200. int ws = node.waitStatus;
  201. if (ws < 0)
  202. compareAndSetWaitStatus(node, ws, 0);
  203. /*
  204. * Thread to unpark is held in successor, which is normally
  205. * just the next node. But if cancelled or apparently null,
  206. * traverse backwards from tail to find the actual
  207. * non-cancelled successor.
  208. */
  209. Node s = node.next;
  210. if (s == null || s.waitStatus > 0) {
  211. s = null;
  212. for (Node t = tail; t != null && t != node; t = t.prev)
  213. if (t.waitStatus <= 0)
  214. s = t;
  215. }
  216. if (s != null)
  217. LockSupport.unpark(s.thread);
  218. }
  219. /**
  220. * 共享模式的释放操作——发出后继信号并确保传播。
  221. * 注:独占模式下,如果需要信号,释放就相当于调用head的unparkSuccessor。
  222. */
  223. private void doReleaseShared() {
  224. for (;;) {
  225. Node h = head;
  226. if (h != null && h != tail) {
  227. int ws = h.waitStatus;
  228. if (ws == Node.SIGNAL) {
  229. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  230. continue; // loop to recheck cases
  231. unparkSuccessor(h);
  232. }
  233. else if (ws == 0 &&
  234. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  235. continue; // loop on failed CAS
  236. }
  237. if (h == head) // loop if head changed
  238. break;
  239. }
  240. }
  241. /**
  242. * 设置队列头,并检查后继者是否可能在共享模式下等待
  243. * 如果设置了传播 > 0 或PROPAGATE 状态,则传播。
  244. *
  245. * @param node the node
  246. * @param propagate the return value from a tryAcquireShared
  247. */
  248. private void setHeadAndPropagate(Node node, int propagate) {
  249. Node h = head; // Record old head for check below
  250. setHead(node);
  251. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  252. (h = head) == null || h.waitStatus < 0) {
  253. Node s = node.next;
  254. if (s == null || s.isShared())
  255. doReleaseShared();
  256. }
  257. }
  258. // Utilities for various versions of acquire
  259. /**
  260. * 取消正在进行的获取尝试。
  261. *
  262. * @param node the node
  263. */
  264. private void cancelAcquire(Node node) {
  265. // Ignore if node doesn't exist
  266. if (node == null)
  267. return;
  268. node.thread = null;
  269. Node pred = node.prev;
  270. while (pred.waitStatus > 0)
  271. node.prev = pred = pred.prev;
  272. Node predNext = pred.next;
  273. node.waitStatus = Node.CANCELLED;
  274. if (node == tail && compareAndSetTail(node, pred)) {
  275. compareAndSetNext(pred, predNext, null);
  276. } else {
  277. int ws;
  278. if (pred != head &&
  279. ((ws = pred.waitStatus) == Node.SIGNAL ||
  280. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  281. pred.thread != null) {
  282. Node next = node.next;
  283. if (next != null && next.waitStatus <= 0)
  284. compareAndSetNext(pred, predNext, next);
  285. } else {
  286. unparkSuccessor(node);
  287. }
  288. node.next = node; // help GC
  289. }
  290. }
  291. /**
  292. * 检查并更新未能获取的节点的状态。如果线程应该阻塞,则返回 true。
  293. * 这是所有采集循环中的主要信号控制。要求 pred == node.prev。
  294. */
  295. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  296. int ws = pred.waitStatus;
  297. if (ws == Node.SIGNAL)
  298. return true;
  299. if (ws > 0) {
  300. do {
  301. node.prev = pred = pred.prev;
  302. } while (pred.waitStatus > 0);
  303. pred.next = node;
  304. } else {
  305. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  306. }
  307. return false;
  308. }
  309. /**
  310. * 中断当前线程的便捷方法。
  311. */
  312. static void selfInterrupt() {
  313. Thread.currentThread().interrupt();
  314. }
  315. /**
  316. * 停车后检查是否中断的便捷方法
  317. */
  318. private final boolean parkAndCheckInterrupt() {
  319. LockSupport.park(this);
  320. return Thread.interrupted();
  321. }
  322. /**
  323. * 以独占不间断模式获取已在队列中的线程。由条件等待方法和获取使用。
  324. */
  325. final boolean acquireQueued(final Node node, int arg) {
  326. boolean failed = true;
  327. try {
  328. boolean interrupted = false;
  329. for (;;) {
  330. final Node p = node.predecessor();
  331. if (p == head && tryAcquire(arg)) {
  332. setHead(node);
  333. p.next = null; // help GC
  334. failed = false;
  335. return interrupted;
  336. }
  337. if (shouldParkAfterFailedAcquire(p, node) &&
  338. parkAndCheckInterrupt())
  339. interrupted = true;
  340. }
  341. } finally {
  342. if (failed)
  343. cancelAcquire(node);
  344. }
  345. }
  346. /**
  347. * 以独占可中断模式获取。
  348. */
  349. private void doAcquireInterruptibly(int arg)
  350. throws InterruptedException {
  351. final Node node = addWaiter(Node.EXCLUSIVE);
  352. boolean failed = true;
  353. try {
  354. for (;;) {
  355. final Node p = node.predecessor();
  356. if (p == head && tryAcquire(arg)) {
  357. setHead(node);
  358. p.next = null; // help GC
  359. failed = false;
  360. return;
  361. }
  362. if (shouldParkAfterFailedAcquire(p, node) &&
  363. parkAndCheckInterrupt())
  364. throw new InterruptedException();
  365. }
  366. } finally {
  367. if (failed)
  368. cancelAcquire(node);
  369. }
  370. }
  371. /**
  372. * 以独占定时模式获取。
  373. */
  374. private boolean doAcquireNanos(int arg, long nanosTimeout)
  375. throws InterruptedException {
  376. if (nanosTimeout <= 0L)
  377. return false;
  378. final long deadline = System.nanoTime() + nanosTimeout;
  379. final Node node = addWaiter(Node.EXCLUSIVE);
  380. boolean failed = true;
  381. try {
  382. for (;;) {
  383. final Node p = node.predecessor();
  384. if (p == head && tryAcquire(arg)) {
  385. setHead(node);
  386. p.next = null; // help GC
  387. failed = false;
  388. return true;
  389. }
  390. nanosTimeout = deadline - System.nanoTime();
  391. if (nanosTimeout <= 0L)
  392. return false;
  393. if (shouldParkAfterFailedAcquire(p, node) &&
  394. nanosTimeout > spinForTimeoutThreshold)
  395. LockSupport.parkNanos(this, nanosTimeout);
  396. if (Thread.interrupted())
  397. throw new InterruptedException();
  398. }
  399. } finally {
  400. if (failed)
  401. cancelAcquire(node);
  402. }
  403. }
  404. /**
  405. * 以共享不间断模式获取。
  406. * @param arg the acquire argument
  407. */
  408. private void doAcquireShared(int arg) {
  409. final Node node = addWaiter(Node.SHARED);
  410. boolean failed = true;
  411. try {
  412. boolean interrupted = false;
  413. for (;;) {
  414. final Node p = node.predecessor();
  415. if (p == head) {
  416. int r = tryAcquireShared(arg);
  417. if (r >= 0) {
  418. setHeadAndPropagate(node, r);
  419. p.next = null; // help GC
  420. if (interrupted)
  421. selfInterrupt();
  422. failed = false;
  423. return;
  424. }
  425. }
  426. if (shouldParkAfterFailedAcquire(p, node) &&
  427. parkAndCheckInterrupt())
  428. interrupted = true;
  429. }
  430. } finally {
  431. if (failed)
  432. cancelAcquire(node);
  433. }
  434. }
  435. /**
  436. * 以共享可中断模式获取。
  437. */
  438. private void doAcquireSharedInterruptibly(int arg)
  439. throws InterruptedException {
  440. final Node node = addWaiter(Node.SHARED);
  441. boolean failed = true;
  442. try {
  443. for (;;) {
  444. final Node p = node.predecessor();
  445. if (p == head) {
  446. int r = tryAcquireShared(arg);
  447. if (r >= 0) {
  448. setHeadAndPropagate(node, r);
  449. p.next = null; // help GC
  450. failed = false;
  451. return;
  452. }
  453. }
  454. if (shouldParkAfterFailedAcquire(p, node) &&
  455. parkAndCheckInterrupt())
  456. throw new InterruptedException();
  457. }
  458. } finally {
  459. if (failed)
  460. cancelAcquire(node);
  461. }
  462. }
  463. /**
  464. * 以共享定时模式获取。
  465. */
  466. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  467. throws InterruptedException {
  468. if (nanosTimeout <= 0L)
  469. return false;
  470. final long deadline = System.nanoTime() + nanosTimeout;
  471. final Node node = addWaiter(Node.SHARED);
  472. boolean failed = true;
  473. try {
  474. for (;;) {
  475. final Node p = node.predecessor();
  476. if (p == head) {
  477. int r = tryAcquireShared(arg);
  478. if (r >= 0) {
  479. setHeadAndPropagate(node, r);
  480. p.next = null; // help GC
  481. failed = false;
  482. return true;
  483. }
  484. }
  485. nanosTimeout = deadline - System.nanoTime();
  486. if (nanosTimeout <= 0L)
  487. return false;
  488. if (shouldParkAfterFailedAcquire(p, node) &&
  489. nanosTimeout > spinForTimeoutThreshold)
  490. LockSupport.parkNanos(this, nanosTimeout);
  491. if (Thread.interrupted())
  492. throw new InterruptedException();
  493. }
  494. } finally {
  495. if (failed)
  496. cancelAcquire(node);
  497. }
  498. }
  499. // Main exported methods
  500. /**
  501. * 尝试以独占模式获取。此方法应查询对象的状态是否允许以独占模式获取它,如果是则获取它。
  502. * <p>此方法总是由执行获取的线程调用。如果此方法报告失败,
  503. * 则获取方法可以将线程排入队列,如果它尚未排入队列,
  504. * 直到由某个其他线程的释放发出信号。这可以用于实现方法 {@link Lock#tryLock()}。
  505. */
  506. protected boolean tryAcquire(int arg) {
  507. throw new UnsupportedOperationException();
  508. }
  509. /**
  510. * 尝试设置状态以反映独占模式下的发布
  511. */
  512. protected boolean tryRelease(int arg) {
  513. throw new UnsupportedOperationException();
  514. }
  515. /**
  516. * Attempts to acquire in shared mode. This method should query if
  517. * the state of the object permits it to be acquired in the shared
  518. * mode, and if so to acquire it.
  519. *
  520. * <p>This method is always invoked by the thread performing
  521. * acquire. If this method reports failure, the acquire method
  522. * may queue the thread, if it is not already queued, until it is
  523. * signalled by a release from some other thread.
  524. *
  525. * <p>The default implementation throws {@link
  526. * UnsupportedOperationException}.
  527. *
  528. * @param arg the acquire argument. This value is always the one
  529. * passed to an acquire method, or is the value saved on entry
  530. * to a condition wait. The value is otherwise uninterpreted
  531. * and can represent anything you like.
  532. * @return a negative value on failure; zero if acquisition in shared
  533. * mode succeeded but no subsequent shared-mode acquire can
  534. * succeed; and a positive value if acquisition in shared
  535. * mode succeeded and subsequent shared-mode acquires might
  536. * also succeed, in which case a subsequent waiting thread
  537. * must check availability. (Support for three different
  538. * return values enables this method to be used in contexts
  539. * where acquires only sometimes act exclusively.) Upon
  540. * success, this object has been acquired.
  541. * @throws IllegalMonitorStateException if acquiring would place this
  542. * synchronizer in an illegal state. This exception must be
  543. * thrown in a consistent fashion for synchronization to work
  544. * correctly.
  545. * @throws UnsupportedOperationException if shared mode is not supported
  546. */
  547. protected int tryAcquireShared(int arg) {
  548. throw new UnsupportedOperationException();
  549. }
  550. /**
  551. * Attempts to set the state to reflect a release in shared mode.
  552. *
  553. * <p>This method is always invoked by the thread performing release.
  554. *
  555. * <p>The default implementation throws
  556. * {@link UnsupportedOperationException}.
  557. *
  558. * @param arg the release argument. This value is always the one
  559. * passed to a release method, or the current state value upon
  560. * entry to a condition wait. The value is otherwise
  561. * uninterpreted and can represent anything you like.
  562. * @return {@code true} if this release of shared mode may permit a
  563. * waiting acquire (shared or exclusive) to succeed; and
  564. * {@code false} otherwise
  565. * @throws IllegalMonitorStateException if releasing would place this
  566. * synchronizer in an illegal state. This exception must be
  567. * thrown in a consistent fashion for synchronization to work
  568. * correctly.
  569. * @throws UnsupportedOperationException if shared mode is not supported
  570. */
  571. protected boolean tryReleaseShared(int arg) {
  572. throw new UnsupportedOperationException();
  573. }
  574. /**
  575. * Returns {@code true} if synchronization is held exclusively with
  576. * respect to the current (calling) thread. This method is invoked
  577. * upon each call to a non-waiting {@link ConditionObject} method.
  578. * (Waiting methods instead invoke {@link #release}.)
  579. *
  580. * <p>The default implementation throws {@link
  581. * UnsupportedOperationException}. This method is invoked
  582. * internally only within {@link ConditionObject} methods, so need
  583. * not be defined if conditions are not used.
  584. *
  585. * @return {@code true} if synchronization is held exclusively;
  586. * {@code false} otherwise
  587. * @throws UnsupportedOperationException if conditions are not supported
  588. */
  589. protected boolean isHeldExclusively() {
  590. throw new UnsupportedOperationException();
  591. }
  592. /**
  593. * Acquires in exclusive mode, ignoring interrupts. Implemented
  594. * by invoking at least once {@link #tryAcquire},
  595. * returning on success. Otherwise the thread is queued, possibly
  596. * repeatedly blocking and unblocking, invoking {@link
  597. * #tryAcquire} until success. This method can be used
  598. * to implement method {@link Lock#lock}.
  599. *
  600. * @param arg the acquire argument. This value is conveyed to
  601. * {@link #tryAcquire} but is otherwise uninterpreted and
  602. * can represent anything you like.
  603. */
  604. public final void acquire(int arg) {
  605. if (!tryAcquire(arg) &&
  606. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  607. selfInterrupt();
  608. }
  609. /**
  610. * Acquires in exclusive mode, aborting if interrupted.
  611. * Implemented by first checking interrupt status, then invoking
  612. * at least once {@link #tryAcquire}, returning on
  613. * success. Otherwise the thread is queued, possibly repeatedly
  614. * blocking and unblocking, invoking {@link #tryAcquire}
  615. * until success or the thread is interrupted. This method can be
  616. * used to implement method {@link Lock#lockInterruptibly}.
  617. *
  618. * @param arg the acquire argument. This value is conveyed to
  619. * {@link #tryAcquire} but is otherwise uninterpreted and
  620. * can represent anything you like.
  621. * @throws InterruptedException if the current thread is interrupted
  622. */
  623. public final void acquireInterruptibly(int arg)
  624. throws InterruptedException {
  625. if (Thread.interrupted())
  626. throw new InterruptedException();
  627. if (!tryAcquire(arg))
  628. doAcquireInterruptibly(arg);
  629. }
  630. /**
  631. * Attempts to acquire in exclusive mode, aborting if interrupted,
  632. * and failing if the given timeout elapses. Implemented by first
  633. * checking interrupt status, then invoking at least once {@link
  634. * #tryAcquire}, returning on success. Otherwise, the thread is
  635. * queued, possibly repeatedly blocking and unblocking, invoking
  636. * {@link #tryAcquire} until success or the thread is interrupted
  637. * or the timeout elapses. This method can be used to implement
  638. * method {@link Lock#tryLock(long, TimeUnit)}.
  639. *
  640. * @param arg the acquire argument. This value is conveyed to
  641. * {@link #tryAcquire} but is otherwise uninterpreted and
  642. * can represent anything you like.
  643. * @param nanosTimeout the maximum number of nanoseconds to wait
  644. * @return {@code true} if acquired; {@code false} if timed out
  645. * @throws InterruptedException if the current thread is interrupted
  646. */
  647. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  648. throws InterruptedException {
  649. if (Thread.interrupted())
  650. throw new InterruptedException();
  651. return tryAcquire(arg) ||
  652. doAcquireNanos(arg, nanosTimeout);
  653. }
  654. /**
  655. * Releases in exclusive mode. Implemented by unblocking one or
  656. * more threads if {@link #tryRelease} returns true.
  657. * This method can be used to implement method {@link Lock#unlock}.
  658. *
  659. * @param arg the release argument. This value is conveyed to
  660. * {@link #tryRelease} but is otherwise uninterpreted and
  661. * can represent anything you like.
  662. * @return the value returned from {@link #tryRelease}
  663. */
  664. public final boolean release(int arg) {
  665. if (tryRelease(arg)) {
  666. Node h = head;
  667. if (h != null && h.waitStatus != 0)
  668. unparkSuccessor(h);
  669. return true;
  670. }
  671. return false;
  672. }
  673. /**
  674. * Acquires in shared mode, ignoring interrupts. Implemented by
  675. * first invoking at least once {@link #tryAcquireShared},
  676. * returning on success. Otherwise the thread is queued, possibly
  677. * repeatedly blocking and unblocking, invoking {@link
  678. * #tryAcquireShared} until success.
  679. *
  680. * @param arg the acquire argument. This value is conveyed to
  681. * {@link #tryAcquireShared} but is otherwise uninterpreted
  682. * and can represent anything you like.
  683. */
  684. public final void acquireShared(int arg) {
  685. if (tryAcquireShared(arg) < 0)
  686. doAcquireShared(arg);
  687. }
  688. /**
  689. * Acquires in shared mode, aborting if interrupted. Implemented
  690. * by first checking interrupt status, then invoking at least once
  691. * {@link #tryAcquireShared}, returning on success. Otherwise the
  692. * thread is queued, possibly repeatedly blocking and unblocking,
  693. * invoking {@link #tryAcquireShared} until success or the thread
  694. * is interrupted.
  695. * @param arg the acquire argument.
  696. * This value is conveyed to {@link #tryAcquireShared} but is
  697. * otherwise uninterpreted and can represent anything
  698. * you like.
  699. * @throws InterruptedException if the current thread is interrupted
  700. */
  701. public final void acquireSharedInterruptibly(int arg)
  702. throws InterruptedException {
  703. if (Thread.interrupted())
  704. throw new InterruptedException();
  705. if (tryAcquireShared(arg) < 0)
  706. doAcquireSharedInterruptibly(arg);
  707. }
  708. /**
  709. * Attempts to acquire in shared mode, aborting if interrupted, and
  710. * failing if the given timeout elapses. Implemented by first
  711. * checking interrupt status, then invoking at least once {@link
  712. * #tryAcquireShared}, returning on success. Otherwise, the
  713. * thread is queued, possibly repeatedly blocking and unblocking,
  714. * invoking {@link #tryAcquireShared} until success or the thread
  715. * is interrupted or the timeout elapses.
  716. *
  717. * @param arg the acquire argument. This value is conveyed to
  718. * {@link #tryAcquireShared} but is otherwise uninterpreted
  719. * and can represent anything you like.
  720. * @param nanosTimeout the maximum number of nanoseconds to wait
  721. * @return {@code true} if acquired; {@code false} if timed out
  722. * @throws InterruptedException if the current thread is interrupted
  723. */
  724. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  725. throws InterruptedException {
  726. if (Thread.interrupted())
  727. throw new InterruptedException();
  728. return tryAcquireShared(arg) >= 0 ||
  729. doAcquireSharedNanos(arg, nanosTimeout);
  730. }
  731. /**
  732. * Releases in shared mode. Implemented by unblocking one or more
  733. * threads if {@link #tryReleaseShared} returns true.
  734. *
  735. * @param arg the release argument. This value is conveyed to
  736. * {@link #tryReleaseShared} but is otherwise uninterpreted
  737. * and can represent anything you like.
  738. * @return the value returned from {@link #tryReleaseShared}
  739. */
  740. public final boolean releaseShared(int arg) {
  741. if (tryReleaseShared(arg)) {
  742. doReleaseShared();
  743. return true;
  744. }
  745. return false;
  746. }
  747. // Queue inspection methods
  748. /**
  749. * Queries whether any threads are waiting to acquire. Note that
  750. * because cancellations due to interrupts and timeouts may occur
  751. * at any time, a {@code true} return does not guarantee that any
  752. * other thread will ever acquire.
  753. *
  754. * <p>In this implementation, this operation returns in
  755. * constant time.
  756. *
  757. * @return {@code true} if there may be other threads waiting to acquire
  758. */
  759. public final boolean hasQueuedThreads() {
  760. return head != tail;
  761. }
  762. /**
  763. * Queries whether any threads have ever contended to acquire this
  764. * synchronizer; that is if an acquire method has ever blocked.
  765. *
  766. * <p>In this implementation, this operation returns in
  767. * constant time.
  768. *
  769. * @return {@code true} if there has ever been contention
  770. */
  771. public final boolean hasContended() {
  772. return head != null;
  773. }
  774. /**
  775. * Returns the first (longest-waiting) thread in the queue, or
  776. * {@code null} if no threads are currently queued.
  777. *
  778. * <p>In this implementation, this operation normally returns in
  779. * constant time, but may iterate upon contention if other threads are
  780. * concurrently modifying the queue.
  781. *
  782. * @return the first (longest-waiting) thread in the queue, or
  783. * {@code null} if no threads are currently queued
  784. */
  785. public final Thread getFirstQueuedThread() {
  786. // handle only fast path, else relay
  787. return (head == tail) ? null : fullGetFirstQueuedThread();
  788. }
  789. /**
  790. * Version of getFirstQueuedThread called when fastpath fails
  791. */
  792. private Thread fullGetFirstQueuedThread() {
  793. /*
  794. * The first node is normally head.next. Try to get its
  795. * thread field, ensuring consistent reads: If thread
  796. * field is nulled out or s.prev is no longer head, then
  797. * some other thread(s) concurrently performed setHead in
  798. * between some of our reads. We try this twice before
  799. * resorting to traversal.
  800. */
  801. Node h, s;
  802. Thread st;
  803. if (((h = head) != null && (s = h.next) != null &&
  804. s.prev == head && (st = s.thread) != null) ||
  805. ((h = head) != null && (s = h.next) != null &&
  806. s.prev == head && (st = s.thread) != null))
  807. return st;
  808. /*
  809. * Head's next field might not have been set yet, or may have
  810. * been unset after setHead. So we must check to see if tail
  811. * is actually first node. If not, we continue on, safely
  812. * traversing from tail back to head to find first,
  813. * guaranteeing termination.
  814. */
  815. Node t = tail;
  816. Thread firstThread = null;
  817. while (t != null && t != head) {
  818. Thread tt = t.thread;
  819. if (tt != null)
  820. firstThread = tt;
  821. t = t.prev;
  822. }
  823. return firstThread;
  824. }
  825. /**
  826. * Returns true if the given thread is currently queued.
  827. *
  828. * <p>This implementation traverses the queue to determine
  829. * presence of the given thread.
  830. *
  831. * @param thread the thread
  832. * @return {@code true} if the given thread is on the queue
  833. * @throws NullPointerException if the thread is null
  834. */
  835. public final boolean isQueued(Thread thread) {
  836. if (thread == null)
  837. throw new NullPointerException();
  838. for (Node p = tail; p != null; p = p.prev)
  839. if (p.thread == thread)
  840. return true;
  841. return false;
  842. }
  843. /**
  844. * Returns {@code true} if the apparent first queued thread, if one
  845. * exists, is waiting in exclusive mode. If this method returns
  846. * {@code true}, and the current thread is attempting to acquire in
  847. * shared mode (that is, this method is invoked from {@link
  848. * #tryAcquireShared}) then it is guaranteed that the current thread
  849. * is not the first queued thread. Used only as a heuristic in
  850. * ReentrantReadWriteLock.
  851. */
  852. final boolean apparentlyFirstQueuedIsExclusive() {
  853. Node h, s;
  854. return (h = head) != null &&
  855. (s = h.next) != null &&
  856. !s.isShared() &&
  857. s.thread != null;
  858. }
  859. /**
  860. * Queries whether any threads have been waiting to acquire longer
  861. * than the current thread.
  862. *
  863. * <p>An invocation of this method is equivalent to (but may be
  864. * more efficient than):
  865. * <pre> {@code
  866. * getFirstQueuedThread() != Thread.currentThread() &&
  867. * hasQueuedThreads()}</pre>
  868. *
  869. * <p>Note that because cancellations due to interrupts and
  870. * timeouts may occur at any time, a {@code true} return does not
  871. * guarantee that some other thread will acquire before the current
  872. * thread. Likewise, it is possible for another thread to win a
  873. * race to enqueue after this method has returned {@code false},
  874. * due to the queue being empty.
  875. *
  876. * <p>This method is designed to be used by a fair synchronizer to
  877. * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
  878. * Such a synchronizer's {@link #tryAcquire} method should return
  879. * {@code false}, and its {@link #tryAcquireShared} method should
  880. * return a negative value, if this method returns {@code true}
  881. * (unless this is a reentrant acquire). For example, the {@code
  882. * tryAcquire} method for a fair, reentrant, exclusive mode
  883. * synchronizer might look like this:
  884. *
  885. * <pre> {@code
  886. * protected boolean tryAcquire(int arg) {
  887. * if (isHeldExclusively()) {
  888. * // A reentrant acquire; increment hold count
  889. * return true;
  890. * } else if (hasQueuedPredecessors()) {
  891. * return false;
  892. * } else {
  893. * // try to acquire normally
  894. * }
  895. * }}</pre>
  896. *
  897. * @return {@code true} if there is a queued thread preceding the
  898. * current thread, and {@code false} if the current thread
  899. * is at the head of the queue or the queue is empty
  900. * @since 1.7
  901. */
  902. public final boolean hasQueuedPredecessors() {
  903. // The correctness of this depends on head being initialized
  904. // before tail and on head.next being accurate if the current
  905. // thread is first in queue.
  906. Node t = tail; // Read fields in reverse initialization order
  907. Node h = head;
  908. Node s;
  909. return h != t &&
  910. ((s = h.next) == null || s.thread != Thread.currentThread());
  911. }
  912. // Instrumentation and monitoring methods
  913. /**
  914. * Returns an estimate of the number of threads waiting to
  915. * acquire. The value is only an estimate because the number of
  916. * threads may change dynamically while this method traverses
  917. * internal data structures. This method is designed for use in
  918. * monitoring system state, not for synchronization
  919. * control.
  920. *
  921. * @return the estimated number of threads waiting to acquire
  922. */
  923. public final int getQueueLength() {
  924. int n = 0;
  925. for (Node p = tail; p != null; p = p.prev) {
  926. if (p.thread != null)
  927. ++n;
  928. }
  929. return n;
  930. }
  931. /**
  932. * Returns a collection containing threads that may be waiting to
  933. * acquire. Because the actual set of threads may change
  934. * dynamically while constructing this result, the returned
  935. * collection is only a best-effort estimate. The elements of the
  936. * returned collection are in no particular order. This method is
  937. * designed to facilitate construction of subclasses that provide
  938. * more extensive monitoring facilities.
  939. *
  940. * @return the collection of threads
  941. */
  942. public final Collection<Thread> getQueuedThreads() {
  943. ArrayList<Thread> list = new ArrayList<Thread>();
  944. for (Node p = tail; p != null; p = p.prev) {
  945. Thread t = p.thread;
  946. if (t != null)
  947. list.add(t);
  948. }
  949. return list;
  950. }
  951. /**
  952. * Returns a collection containing threads that may be waiting to
  953. * acquire in exclusive mode. This has the same properties
  954. * as {@link #getQueuedThreads} except that it only returns
  955. * those threads waiting due to an exclusive acquire.
  956. *
  957. * @return the collection of threads
  958. */
  959. public final Collection<Thread> getExclusiveQueuedThreads() {
  960. ArrayList<Thread> list = new ArrayList<Thread>();
  961. for (Node p = tail; p != null; p = p.prev) {
  962. if (!p.isShared()) {
  963. Thread t = p.thread;
  964. if (t != null)
  965. list.add(t);
  966. }
  967. }
  968. return list;
  969. }
  970. /**
  971. * Returns a collection containing threads that may be waiting to
  972. * acquire in shared mode. This has the same properties
  973. * as {@link #getQueuedThreads} except that it only returns
  974. * those threads waiting due to a shared acquire.
  975. *
  976. * @return the collection of threads
  977. */
  978. public final Collection<Thread> getSharedQueuedThreads() {
  979. ArrayList<Thread> list = new ArrayList<Thread>();
  980. for (Node p = tail; p != null; p = p.prev) {
  981. if (p.isShared()) {
  982. Thread t = p.thread;
  983. if (t != null)
  984. list.add(t);
  985. }
  986. }
  987. return list;
  988. }
  989. /**
  990. * Returns a string identifying this synchronizer, as well as its state.
  991. * The state, in brackets, includes the String {@code "State ="}
  992. * followed by the current value of {@link #getState}, and either
  993. * {@code "nonempty"} or {@code "empty"} depending on whether the
  994. * queue is empty.
  995. *
  996. * @return a string identifying this synchronizer, as well as its state
  997. */
  998. public String toString() {
  999. int s = getState();
  1000. String q = hasQueuedThreads() ? "non" : "";
  1001. return super.toString() +
  1002. "[State = " + s + ", " + q + "empty queue]";
  1003. }
  1004. // Internal support methods for Conditions
  1005. /**
  1006. * Returns true if a node, always one that was initially placed on
  1007. * a condition queue, is now waiting to reacquire on sync queue.
  1008. * @param node the node
  1009. * @return true if is reacquiring
  1010. */
  1011. final boolean isOnSyncQueue(Node node) {
  1012. if (node.waitStatus == Node.CONDITION || node.prev == null)
  1013. return false;
  1014. if (node.next != null) // If has successor, it must be on queue
  1015. return true;
  1016. /*
  1017. * node.prev can be non-null, but not yet on queue because
  1018. * the CAS to place it on queue can fail. So we have to
  1019. * traverse from tail to make sure it actually made it. It
  1020. * will always be near the tail in calls to this method, and
  1021. * unless the CAS failed (which is unlikely), it will be
  1022. * there, so we hardly ever traverse much.
  1023. */
  1024. return findNodeFromTail(node);
  1025. }
  1026. /**
  1027. * Returns true if node is on sync queue by searching backwards from tail.
  1028. * Called only when needed by isOnSyncQueue.
  1029. * @return true if present
  1030. */
  1031. private boolean findNodeFromTail(Node node) {
  1032. Node t = tail;
  1033. for (;;) {
  1034. if (t == node)
  1035. return true;
  1036. if (t == null)
  1037. return false;
  1038. t = t.prev;
  1039. }
  1040. }
  1041. /**
  1042. * Transfers a node from a condition queue onto sync queue.
  1043. * Returns true if successful.
  1044. * @param node the node
  1045. * @return true if successfully transferred (else the node was
  1046. * cancelled before signal)
  1047. */
  1048. final boolean transferForSignal(Node node) {
  1049. /*
  1050. * If cannot change waitStatus, the node has been cancelled.
  1051. */
  1052. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  1053. return false;
  1054. /*
  1055. * Splice onto queue and try to set waitStatus of predecessor to
  1056. * indicate that thread is (probably) waiting. If cancelled or
  1057. * attempt to set waitStatus fails, wake up to resync (in which
  1058. * case the waitStatus can be transiently and harmlessly wrong).
  1059. */
  1060. Node p = enq(node);
  1061. int ws = p.waitStatus;
  1062. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  1063. LockSupport.unpark(node.thread);
  1064. return true;
  1065. }
  1066. /**
  1067. * Transfers node, if necessary, to sync queue after a cancelled wait.
  1068. * Returns true if thread was cancelled before being signalled.
  1069. *
  1070. * @param node the node
  1071. * @return true if cancelled before the node was signalled
  1072. */
  1073. final boolean transferAfterCancelledWait(Node node) {
  1074. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  1075. enq(node);
  1076. return true;
  1077. }
  1078. /*
  1079. * If we lost out to a signal(), then we can't proceed
  1080. * until it finishes its enq(). Cancelling during an
  1081. * incomplete transfer is both rare and transient, so just
  1082. * spin.
  1083. */
  1084. while (!isOnSyncQueue(node))
  1085. Thread.yield();
  1086. return false;
  1087. }
  1088. /**
  1089. * Invokes release with current state value; returns saved state.
  1090. * Cancels node and throws exception on failure.
  1091. * @param node the condition node for this wait
  1092. * @return previous sync state
  1093. */
  1094. final int fullyRelease(Node node) {
  1095. boolean failed = true;
  1096. try {
  1097. int savedState = getState();
  1098. if (release(savedState)) {
  1099. failed = false;
  1100. return savedState;
  1101. } else {
  1102. throw new IllegalMonitorStateException();
  1103. }
  1104. } finally {
  1105. if (failed)
  1106. node.waitStatus = Node.CANCELLED;
  1107. }
  1108. }
  1109. // Instrumentation methods for conditions
  1110. /**
  1111. * Queries whether the given ConditionObject
  1112. * uses this synchronizer as its lock.
  1113. *
  1114. * @param condition the condition
  1115. * @return {@code true} if owned
  1116. * @throws NullPointerException if the condition is null
  1117. */
  1118. public final boolean owns(ConditionObject condition) {
  1119. return condition.isOwnedBy(this);
  1120. }
  1121. /**
  1122. * Queries whether any threads are waiting on the given condition
  1123. * associated with this synchronizer. Note that because timeouts
  1124. * and interrupts may occur at any time, a {@code true} return
  1125. * does not guarantee that a future {@code signal} will awaken
  1126. * any threads. This method is designed primarily for use in
  1127. * monitoring of the system state.
  1128. *
  1129. * @param condition the condition
  1130. * @return {@code true} if there are any waiting threads
  1131. * @throws IllegalMonitorStateException if exclusive synchronization
  1132. * is not held
  1133. * @throws IllegalArgumentException if the given condition is
  1134. * not associated with this synchronizer
  1135. * @throws NullPointerException if the condition is null
  1136. */
  1137. public final boolean hasWaiters(ConditionObject condition) {
  1138. if (!owns(condition))
  1139. throw new IllegalArgumentException("Not owner");
  1140. return condition.hasWaiters();
  1141. }
  1142. /**
  1143. * Returns an estimate of the number of threads waiting on the
  1144. * given condition associated with this synchronizer. Note that
  1145. * because timeouts and interrupts may occur at any time, the
  1146. * estimate serves only as an upper bound on the actual number of
  1147. * waiters. This method is designed for use in monitoring of the
  1148. * system state, not for synchronization control.
  1149. *
  1150. * @param condition the condition
  1151. * @return the estimated number of waiting threads
  1152. * @throws IllegalMonitorStateException if exclusive synchronization
  1153. * is not held
  1154. * @throws IllegalArgumentException if the given condition is
  1155. * not associated with this synchronizer
  1156. * @throws NullPointerException if the condition is null
  1157. */
  1158. public final int getWaitQueueLength(ConditionObject condition) {
  1159. if (!owns(condition))
  1160. throw new IllegalArgumentException("Not owner");
  1161. return condition.getWaitQueueLength();
  1162. }
  1163. /**
  1164. * Returns a collection containing those threads that may be
  1165. * waiting on the given condition associated with this
  1166. * synchronizer. Because the actual set of threads may change
  1167. * dynamically while constructing this result, the returned
  1168. * collection is only a best-effort estimate. The elements of the
  1169. * returned collection are in no particular order.
  1170. *
  1171. * @param condition the condition
  1172. * @return the collection of threads
  1173. * @throws IllegalMonitorStateException if exclusive synchronization
  1174. * is not held
  1175. * @throws IllegalArgumentException if the given condition is
  1176. * not associated with this synchronizer
  1177. * @throws NullPointerException if the condition is null
  1178. */
  1179. public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
  1180. if (!owns(condition))
  1181. throw new IllegalArgumentException("Not owner");
  1182. return condition.getWaitingThreads();
  1183. }
  1184. /**
  1185. * Condition implementation for a {@link
  1186. * AbstractQueuedSynchronizer} serving as the basis of a {@link
  1187. * Lock} implementation.
  1188. *
  1189. * <p>Method documentation for this class describes mechanics,
  1190. * not behavioral specifications from the point of view of Lock
  1191. * and Condition users. Exported versions of this class will in
  1192. * general need to be accompanied by documentation describing
  1193. * condition semantics that rely on those of the associated
  1194. * {@code AbstractQueuedSynchronizer}.
  1195. *
  1196. * <p>This class is Serializable, but all fields are transient,
  1197. * so deserialized conditions have no waiters.
  1198. */
  1199. public class ConditionObject implements Condition, java.io.Serializable {
  1200. private static final long serialVersionUID = 1173984872572414699L;
  1201. /** First node of condition queue. */
  1202. private transient Node firstWaiter;
  1203. /** Last node of condition queue. */
  1204. private transient Node lastWaiter;
  1205. /**
  1206. * Creates a new {@code ConditionObject} instance.
  1207. */
  1208. public ConditionObject() { }
  1209. // Internal methods
  1210. /**
  1211. * Adds a new waiter to wait queue.
  1212. * @return its new wait node
  1213. */
  1214. private Node addConditionWaiter() {
  1215. Node t = lastWaiter;
  1216. // If lastWaiter is cancelled, clean out.
  1217. if (t != null && t.waitStatus != Node.CONDITION) {
  1218. unlinkCancelledWaiters();
  1219. t = lastWaiter;
  1220. }
  1221. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  1222. if (t == null)
  1223. firstWaiter = node;
  1224. else
  1225. t.nextWaiter = node;
  1226. lastWaiter = node;
  1227. return node;
  1228. }
  1229. /**
  1230. * Removes and transfers nodes until hit non-cancelled one or
  1231. * null. Split out from signal in part to encourage compilers
  1232. * to inline the case of no waiters.
  1233. * @param first (non-null) the first node on condition queue
  1234. */
  1235. private void doSignal(Node first) {
  1236. do {
  1237. if ( (firstWaiter = first.nextWaiter) == null)
  1238. lastWaiter = null;
  1239. first.nextWaiter = null;
  1240. } while (!transferForSignal(first) &&
  1241. (first = firstWaiter) != null);
  1242. }
  1243. /**
  1244. * Removes and transfers all nodes.
  1245. * @param first (non-null) the first node on condition queue
  1246. */
  1247. private void doSignalAll(Node first) {
  1248. lastWaiter = firstWaiter = null;
  1249. do {
  1250. Node next = first.nextWaiter;
  1251. first.nextWaiter = null;
  1252. transferForSignal(first);
  1253. first = next;
  1254. } while (first != null);
  1255. }
  1256. /**
  1257. * Unlinks cancelled waiter nodes from condition queue.
  1258. * Called only while holding lock. This is called when
  1259. * cancellation occurred during condition wait, and upon
  1260. * insertion of a new waiter when lastWaiter is seen to have
  1261. * been cancelled. This method is needed to avoid garbage
  1262. * retention in the absence of signals. So even though it may
  1263. * require a full traversal, it comes into play only when
  1264. * timeouts or cancellations occur in the absence of
  1265. * signals. It traverses all nodes rather than stopping at a
  1266. * particular target to unlink all pointers to garbage nodes
  1267. * without requiring many re-traversals during cancellation
  1268. * storms.
  1269. */
  1270. private void unlinkCancelledWaiters() {
  1271. Node t = firstWaiter;
  1272. Node trail = null;
  1273. while (t != null) {
  1274. Node next = t.nextWaiter;
  1275. if (t.waitStatus != Node.CONDITION) {
  1276. t.nextWaiter = null;
  1277. if (trail == null)
  1278. firstWaiter = next;
  1279. else
  1280. trail.nextWaiter = next;
  1281. if (next == null)
  1282. lastWaiter = trail;
  1283. }
  1284. else
  1285. trail = t;
  1286. t = next;
  1287. }
  1288. }
  1289. // public methods
  1290. /**
  1291. * Moves the longest-waiting thread, if one exists, from the
  1292. * wait queue for this condition to the wait queue for the
  1293. * owning lock.
  1294. *
  1295. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1296. * returns {@code false}
  1297. */
  1298. public final void signal() {
  1299. if (!isHeldExclusively())
  1300. throw new IllegalMonitorStateException();
  1301. Node first = firstWaiter;
  1302. if (first != null)
  1303. doSignal(first);
  1304. }
  1305. /**
  1306. * Moves all threads from the wait queue for this condition to
  1307. * the wait queue for the owning lock.
  1308. *
  1309. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1310. * returns {@code false}
  1311. */
  1312. public final void signalAll() {
  1313. if (!isHeldExclusively())
  1314. throw new IllegalMonitorStateException();
  1315. Node first = firstWaiter;
  1316. if (first != null)
  1317. doSignalAll(first);
  1318. }
  1319. /**
  1320. * Implements uninterruptible condition wait.
  1321. * <ol>
  1322. * <li> Save lock state returned by {@link #getState}.
  1323. * <li> Invoke {@link #release} with saved state as argument,
  1324. * throwing IllegalMonitorStateException if it fails.
  1325. * <li> Block until signalled.
  1326. * <li> Reacquire by invoking specialized version of
  1327. * {@link #acquire} with saved state as argument.
  1328. * </ol>
  1329. */
  1330. public final void awaitUninterruptibly() {
  1331. Node node = addConditionWaiter();
  1332. int savedState = fullyRelease(node);
  1333. boolean interrupted = false;
  1334. while (!isOnSyncQueue(node)) {
  1335. LockSupport.park(this);
  1336. if (Thread.interrupted())
  1337. interrupted = true;
  1338. }
  1339. if (acquireQueued(node, savedState) || interrupted)
  1340. selfInterrupt();
  1341. }
  1342. /*
  1343. * For interruptible waits, we need to track whether to throw
  1344. * InterruptedException, if interrupted while blocked on
  1345. * condition, versus reinterrupt current thread, if
  1346. * interrupted while blocked waiting to re-acquire.
  1347. */
  1348. /** Mode meaning to reinterrupt on exit from wait */
  1349. private static final int REINTERRUPT = 1;
  1350. /** Mode meaning to throw InterruptedException on exit from wait */
  1351. private static final int THROW_IE = -1;
  1352. /**
  1353. * Checks for interrupt, returning THROW_IE if interrupted
  1354. * before signalled, REINTERRUPT if after signalled, or
  1355. * 0 if not interrupted.
  1356. */
  1357. private int checkInterruptWhileWaiting(Node node) {
  1358. return Thread.interrupted() ?
  1359. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  1360. 0;
  1361. }
  1362. /**
  1363. * Throws InterruptedException, reinterrupts current thread, or
  1364. * does nothing, depending on mode.
  1365. */
  1366. private void reportInterruptAfterWait(int interruptMode)
  1367. throws InterruptedException {
  1368. if (interruptMode == THROW_IE)
  1369. throw new InterruptedException();
  1370. else if (interruptMode == REINTERRUPT)
  1371. selfInterrupt();
  1372. }
  1373. /**
  1374. * Implements interruptible condition wait.
  1375. * <ol>
  1376. * <li> If current thread is interrupted, throw InterruptedException.
  1377. * <li> Save lock state returned by {@link #getState}.
  1378. * <li> Invoke {@link #release} with saved state as argument,
  1379. * throwing IllegalMonitorStateException if it fails.
  1380. * <li> Block until signalled or interrupted.
  1381. * <li> Reacquire by invoking specialized version of
  1382. * {@link #acquire} with saved state as argument.
  1383. * <li> If interrupted while blocked in step 4, throw InterruptedException.
  1384. * </ol>
  1385. */
  1386. public final void await() throws InterruptedException {
  1387. if (Thread.interrupted())
  1388. throw new InterruptedException();
  1389. Node node = addConditionWaiter();
  1390. int savedState = fullyRelease(node);
  1391. int interruptMode = 0;
  1392. while (!isOnSyncQueue(node)) {
  1393. LockSupport.park(this);
  1394. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1395. break;
  1396. }
  1397. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1398. interruptMode = REINTERRUPT;
  1399. if (node.nextWaiter != null) // clean up if cancelled
  1400. unlinkCancelledWaiters();
  1401. if (interruptMode != 0)
  1402. reportInterruptAfterWait(interruptMode);
  1403. }
  1404. /**
  1405. * Implements timed condition wait.
  1406. * <ol>
  1407. * <li> If current thread is interrupted, throw InterruptedException.
  1408. * <li> Save lock state returned by {@link #getState}.
  1409. * <li> Invoke {@link #release} with saved state as argument,
  1410. * throwing IllegalMonitorStateException if it fails.
  1411. * <li> Block until signalled, interrupted, or timed out.
  1412. * <li> Reacquire by invoking specialized version of
  1413. * {@link #acquire} with saved state as argument.
  1414. * <li> If interrupted while blocked in step 4, throw InterruptedException.
  1415. * </ol>
  1416. */
  1417. public final long awaitNanos(long nanosTimeout)
  1418. throws InterruptedException {
  1419. if (Thread.interrupted())
  1420. throw new InterruptedException();
  1421. Node node = addConditionWaiter();
  1422. int savedState = fullyRelease(node);
  1423. final long deadline = System.nanoTime() + nanosTimeout;
  1424. int interruptMode = 0;
  1425. while (!isOnSyncQueue(node)) {
  1426. if (nanosTimeout <= 0L) {
  1427. transferAfterCancelledWait(node);
  1428. break;
  1429. }
  1430. if (nanosTimeout >= spinForTimeoutThreshold)
  1431. LockSupport.parkNanos(this, nanosTimeout);
  1432. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1433. break;
  1434. nanosTimeout = deadline - System.nanoTime();
  1435. }
  1436. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1437. interruptMode = REINTERRUPT;
  1438. if (node.nextWaiter != null)
  1439. unlinkCancelledWaiters();
  1440. if (interruptMode != 0)
  1441. reportInterruptAfterWait(interruptMode);
  1442. return deadline - System.nanoTime();
  1443. }
  1444. /**
  1445. * Implements absolute timed condition wait.
  1446. * <ol>
  1447. * <li> If current thread is interrupted, throw InterruptedException.
  1448. * <li> Save lock state returned by {@link #getState}.
  1449. * <li> Invoke {@link #release} with saved state as argument,
  1450. * throwing IllegalMonitorStateException if it fails.
  1451. * <li> Block until signalled, interrupted, or timed out.
  1452. * <li> Reacquire by invoking specialized version of
  1453. * {@link #acquire} with saved state as argument.
  1454. * <li> If interrupted while blocked in step 4, throw InterruptedException.
  1455. * <li> If timed out while blocked in step 4, return false, else true.
  1456. * </ol>
  1457. */
  1458. public final boolean awaitUntil(Date deadline)
  1459. throws InterruptedException {
  1460. long abstime = deadline.getTime();
  1461. if (Thread.interrupted())
  1462. throw new InterruptedException();
  1463. Node node = addConditionWaiter();
  1464. int savedState = fullyRelease(node);
  1465. boolean timedout = false;
  1466. int interruptMode = 0;
  1467. while (!isOnSyncQueue(node)) {
  1468. if (System.currentTimeMillis() > abstime) {
  1469. timedout = transferAfterCancelledWait(node);
  1470. break;
  1471. }
  1472. LockSupport.parkUntil(this, abstime);
  1473. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1474. break;
  1475. }
  1476. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1477. interruptMode = REINTERRUPT;
  1478. if (node.nextWaiter != null)
  1479. unlinkCancelledWaiters();
  1480. if (interruptMode != 0)
  1481. reportInterruptAfterWait(interruptMode);
  1482. return !timedout;
  1483. }
  1484. /**
  1485. * Implements timed condition wait.
  1486. * <ol>
  1487. * <li> If current thread is interrupted, throw InterruptedException.
  1488. * <li> Save lock state returned by {@link #getState}.
  1489. * <li> Invoke {@link #release} with saved state as argument,
  1490. * throwing IllegalMonitorStateException if it fails.
  1491. * <li> Block until signalled, interrupted, or timed out.
  1492. * <li> Reacquire by invoking specialized version of
  1493. * {@link #acquire} with saved state as argument.
  1494. * <li> If interrupted while blocked in step 4, throw InterruptedException.
  1495. * <li> If timed out while blocked in step 4, return false, else true.
  1496. * </ol>
  1497. */
  1498. public final boolean await(long time, TimeUnit unit)
  1499. throws InterruptedException {
  1500. long nanosTimeout = unit.toNanos(time);
  1501. if (Thread.interrupted())
  1502. throw new InterruptedException();
  1503. Node node = addConditionWaiter();
  1504. int savedState = fullyRelease(node);
  1505. final long deadline = System.nanoTime() + nanosTimeout;
  1506. boolean timedout = false;
  1507. int interruptMode = 0;
  1508. while (!isOnSyncQueue(node)) {
  1509. if (nanosTimeout <= 0L) {
  1510. timedout = transferAfterCancelledWait(node);
  1511. break;
  1512. }
  1513. if (nanosTimeout >= spinForTimeoutThreshold)
  1514. LockSupport.parkNanos(this, nanosTimeout);
  1515. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1516. break;
  1517. nanosTimeout = deadline - System.nanoTime();
  1518. }
  1519. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1520. interruptMode = REINTERRUPT;
  1521. if (node.nextWaiter != null)
  1522. unlinkCancelledWaiters();
  1523. if (interruptMode != 0)
  1524. reportInterruptAfterWait(interruptMode);
  1525. return !timedout;
  1526. }
  1527. // support for instrumentation
  1528. /**
  1529. * Returns true if this condition was created by the given
  1530. * synchronization object.
  1531. *
  1532. * @return {@code true} if owned
  1533. */
  1534. final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
  1535. return sync == AbstractQueuedSynchronizer.this;
  1536. }
  1537. /**
  1538. * Queries whether any threads are waiting on this condition.
  1539. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
  1540. *
  1541. * @return {@code true} if there are any waiting threads
  1542. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1543. * returns {@code false}
  1544. */
  1545. protected final boolean hasWaiters() {
  1546. if (!isHeldExclusively())
  1547. throw new IllegalMonitorStateException();
  1548. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1549. if (w.waitStatus == Node.CONDITION)
  1550. return true;
  1551. }
  1552. return false;
  1553. }
  1554. /**
  1555. * Returns an estimate of the number of threads waiting on
  1556. * this condition.
  1557. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
  1558. *
  1559. * @return the estimated number of waiting threads
  1560. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1561. * returns {@code false}
  1562. */
  1563. protected final int getWaitQueueLength() {
  1564. if (!isHeldExclusively())
  1565. throw new IllegalMonitorStateException();
  1566. int n = 0;
  1567. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1568. if (w.waitStatus == Node.CONDITION)
  1569. ++n;
  1570. }
  1571. return n;
  1572. }
  1573. /**
  1574. * Returns a collection containing those threads that may be
  1575. * waiting on this Condition.
  1576. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
  1577. *
  1578. * @return the collection of threads
  1579. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1580. * returns {@code false}
  1581. */
  1582. protected final Collection<Thread> getWaitingThreads() {
  1583. if (!isHeldExclusively())
  1584. throw new IllegalMonitorStateException();
  1585. ArrayList<Thread> list = new ArrayList<Thread>();
  1586. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1587. if (w.waitStatus == Node.CONDITION) {
  1588. Thread t = w.thread;
  1589. if (t != null)
  1590. list.add(t);
  1591. }
  1592. }
  1593. return list;
  1594. }
  1595. }
  1596. /**
  1597. * Setup to support compareAndSet. We need to natively implement
  1598. * this here: For the sake of permitting future enhancements, we
  1599. * cannot explicitly subclass AtomicInteger, which would be
  1600. * efficient and useful otherwise. So, as the lesser of evils, we
  1601. * natively implement using hotspot intrinsics API. And while we
  1602. * are at it, we do the same for other CASable fields (which could
  1603. * otherwise be done with atomic field updaters).
  1604. */
  1605. private static final Unsafe unsafe = Unsafe.getUnsafe();
  1606. private static final long stateOffset;
  1607. private static final long headOffset;
  1608. private static final long tailOffset;
  1609. private static final long waitStatusOffset;
  1610. private static final long nextOffset;
  1611. static {
  1612. try {
  1613. stateOffset = unsafe.objectFieldOffset
  1614. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  1615. headOffset = unsafe.objectFieldOffset
  1616. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  1617. tailOffset = unsafe.objectFieldOffset
  1618. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  1619. waitStatusOffset = unsafe.objectFieldOffset
  1620. (Node.class.getDeclaredField("waitStatus"));
  1621. nextOffset = unsafe.objectFieldOffset
  1622. (Node.class.getDeclaredField("next"));
  1623. } catch (Exception ex) { throw new Error(ex); }
  1624. }
  1625. /**
  1626. * CAS head field. Used only by enq.
  1627. */
  1628. private final boolean compareAndSetHead(Node update) {
  1629. return unsafe.compareAndSwapObject(this, headOffset, null, update);
  1630. }
  1631. /**
  1632. * CAS tail field. Used only by enq.
  1633. */
  1634. private final boolean compareAndSetTail(Node expect, Node update) {
  1635. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  1636. }
  1637. /**
  1638. * CAS waitStatus field of a node.
  1639. */
  1640. private static final boolean compareAndSetWaitStatus(Node node,
  1641. int expect,
  1642. int update) {
  1643. return unsafe.compareAndSwapInt(node, waitStatusOffset,
  1644. expect, update);
  1645. }
  1646. /**
  1647. * CAS next field of a node.
  1648. */
  1649. private static final boolean compareAndSetNext(Node node,
  1650. Node expect,
  1651. Node update) {
  1652. return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  1653. }
  1654. }

0179d201aa8c3cd5bdd6c9da985d54f.png

独占式同步状态获取与释放
e13b4ffa7a1e1a51cb537e1e1701156.png

分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况。

重入锁

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。 这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。

  1. 实现重进入:重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞

    1. 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
    2. 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该
    3. ReentrantLock中同步器的实现:

      1. abstract static class Sync extends AbstractQueuedSynchronizer {
      2. private static final long serialVersionUID = -5179523762034025860L;
      3. /**
      4. * Performs {@link Lock#lock}. The main reason for subclassing
      5. * is to allow fast path for nonfair version.
      6. */
      7. abstract void lock();
      8. /**
      9. * 通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的
      10. * 线程再次请求,则将同步状态值进行增强并返回true,表示同步状态成功
      11. */
      12. final boolean nonfairTryAcquire(int acquires) {
      13. final Thread current = Thread.currentThread();
      14. int c = getState();
      15. if (c == 0) {
      16. if (compareAndSetState(0, acquires)) {
      17. setExclusiveOwnerThread(current);
      18. return true;
      19. }
      20. }
      21. else if (current == getExclusiveOwnerThread()) {
      22. int nextc = c + acquires;
      23. if (nextc < 0) // overflow
      24. throw new Error("Maximum lock count exceeded");
      25. setState(nextc);
      26. return true;
      27. }
      28. return false;
      29. }
      30. /**
      31. * 该锁被获取了n次,那么释放就需要n次,前n-1次调用tryRelease方法必须返回false
      32. * 当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功
      33. */
      34. protected final boolean tryRelease(int releases) {
      35. int c = getState() - releases;
      36. if (Thread.currentThread() != getExclusiveOwnerThread())
      37. throw new IllegalMonitorStateException();
      38. boolean free = false;
      39. if (c == 0) {
      40. free = true;
      41. setExclusiveOwnerThread(null);
      42. }
      43. setState(c);
      44. return free;
      45. }
      46. }

      公平锁与非公平获取锁的区别

      公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。

      1. protected final boolean tryAcquire(int acquires) {
      2. final Thread current = Thread.currentThread();
      3. int c = getState();
      4. if (c == 0) {
      5. if (!hasQueuedPredecessors() &&
      6. compareAndSetState(0, acquires)) {
      7. setExclusiveOwnerThread(current);
      8. return true;
      9. }
      10. }
      11. else if (current == getExclusiveOwnerThread()) {
      12. int nextc = c + acquires;
      13. if (nextc < 0)
      14. throw new Error("Maximum lock count exceeded");
      15. setState(nextc);
      16. return true;
      17. }
      18. return false;
      19. }
      20. //多了hasQueuedPredecessors方法,即加入了同步队列中当前节点是否有前驱节点的判断,
      21. 如果该 方法返回true,则表示有线程比当前线程更早地请求获取锁,
      22. 因此需要等待前驱线程获取并释 放锁之后才能继续获取锁。

      在测试中公平性锁与非公平性锁相比,总耗时是其94.3倍,总切换次数是其133倍。可以看出,公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

      读写锁

      之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
      一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock

特性 说明
公平性选择 支持非公平和公平的所获取方式。吞吐量还是非公平优于公平
可重入 该锁支持重进入,已读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁
锁降级 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁

ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级

  1. 读写状态的设计:读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
  2. 写锁的获取与释放:该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞
  3. 读锁的获取与释放:读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从Java 5到Java 6变得复杂许多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。
  4. 锁降级:锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

    LockSupport工具

    LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Threadthread)方法来唤醒一个被阻塞的线程。Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开。
    LockSupport提供的阻塞和唤醒方法:
方法名称 描述
void park() 阻塞当前线程,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回
void parkNanos() 阻塞当前线程。最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回
void parkUntil(long deadline) 阻塞当前线程,知道deadline时间
void unpark(Thread thread) 唤醒处于阻塞状态的线程thread

Condition接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。下面将分析Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition如果不加说明均指的是ConditionObject。

  1. 等待队列:等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点 的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部 。在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列加入等待队列d3801788d6d5c81def377465e2eecb0.png
  2. 等待:调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
  3. 通知:调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。c18d46ae8b54c61490c9f44159a5817.png