Condition

Object 模型上,一个对象拥有一个同步队列和一个等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列

调用方法 tryAcquire 尝试获取锁,获取失败后,会先自旋几次,在执行 park:

  1. public final void acquire(int arg) {
  2. // 先尝试获取一次锁
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }
  7. // 将当前节点添加到同步队列的末尾
  8. private Node addWaiter(Node mode) {
  9. // 保存了当前线程和模式 Node,排他为 null,共享为 new Node()
  10. Node node = new Node(Thread.currentThread(), mode);
  11. // Try the fast path of enq; backup to full enq on failure
  12. Node pred = tail;
  13. if (pred != null) {
  14. node.prev = pred;
  15. if (compareAndSetTail(pred, node)) {
  16. pred.next = node;
  17. return node;
  18. }
  19. }
  20. // 如果没有前置节点,就创建一个新的节点作为前置节点
  21. enq(node);
  22. return node;
  23. }
  24. final boolean acquireQueued(final Node node, int arg) {
  25. boolean failed = true;
  26. try {
  27. boolean interrupted = false;
  28. for (;;) {
  29. // 获得前置节点
  30. final Node p = node.predecessor();
  31. // 如果前置节点为头节点,并且成功获取锁
  32. if (p == head && tryAcquire(arg)) {
  33. // 获取锁后,将头结点设为自己
  34. setHead(node);
  35. p.next = null; // help GC
  36. failed = false;
  37. return interrupted;
  38. }
  39. // 判断是否要挂起线程,短路与后,执行 park,并检查中断状态
  40. // 如果有其他线程释 unpark 了当前线程,或者中断了当前线程
  41. // 当前线程就会退出 park,重新进入循环尝试获取锁
  42. if (shouldParkAfterFailedAcquire(p, node) &&
  43. parkAndCheckInterrupt())
  44. interrupted = true;
  45. }
  46. } finally {
  47. if (failed)
  48. cancelAcquire(node);
  49. }
  50. }
  51. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  52. int ws = pred.waitStatus;
  53. if (ws == Node.SIGNAL)
  54. /*
  55. * This node has already set status asking a release
  56. * to signal it, so it can safely park.
  57. */
  58. // 当前节点已经准备等待一个 release 信号来通知它,所以可以安全的挂起
  59. return true;
  60. if (ws > 0) {
  61. /*
  62. * Predecessor was cancelled. Skip over predecessors and
  63. * indicate retry.
  64. */
  65. // 前置节点被取消,
  66. do {
  67. node.prev = pred = pred.prev;
  68. } while (pred.waitStatus > 0);
  69. pred.next = node;
  70. } else {
  71. /*
  72. * waitStatus must be 0 or PROPAGATE. Indicate that we
  73. * need a signal, but don't park yet. Caller will need to
  74. * retry to make sure it cannot acquire before parking.
  75. */
  76. // 指示说,我们需要一个信号,但不挂起。
  77. // 调用者需要重新获取锁,来保证在挂起前确实获取不到锁
  78. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  79. }
  80. return false;
  81. }
  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. // 唤醒下一个节点
  6. unparkSuccessor(h);
  7. return true;
  8. }
  9. return false;
  10. }
  11. /**
  12. * Wakes up node's successor, if one exists.
  13. *
  14. * @param node the node
  15. */
  16. private void unparkSuccessor(Node node) {
  17. /*
  18. * If status is negative (i.e., possibly needing signal) try
  19. * to clear in anticipation of signalling. It is OK if this
  20. * fails or if status is changed by waiting thread.
  21. */
  22. int ws = node.waitStatus;
  23. if (ws < 0)
  24. compareAndSetWaitStatus(node, ws, 0);
  25. /*
  26. * Thread to unpark is held in successor, which is normally
  27. * just the next node. But if cancelled or apparently null,
  28. * traverse backwards from tail to find the actual
  29. * non-cancelled successor.
  30. */
  31. Node s = node.next;
  32. if (s == null || s.waitStatus > 0) {
  33. s = null;
  34. // 从后往前找,找到最前面的没有被 cancelled 的节点
  35. for (Node t = tail; t != null && t != node; t = t.prev)
  36. if (t.waitStatus <= 0)
  37. s = t;
  38. }
  39. // 唤醒下一个节点
  40. if (s != null)
  41. LockSupport.unpark(s.thread);
  42. }
  1. /** waitStatus value to indicate thread has cancelled */
  2. static final int CANCELLED = 1;
  3. /** waitStatus value to indicate successor's thread needs unparking */
  4. static final int SIGNAL = -1;
  5. /** waitStatus value to indicate thread is waiting on condition */
  6. static final int CONDITION = -2;
  7. /**
  8. * waitStatus value to indicate the next acquireShared should
  9. * unconditionally propagate
  10. */
  11. static final int PROPAGATE = -3;