Condition
在 Object 模型上,一个对象拥有一个同步队列和一个等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列。
调用方法 tryAcquire 尝试获取锁,获取失败后,会先自旋几次,在执行 park:
public final void acquire(int arg) {// 先尝试获取一次锁if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}// 将当前节点添加到同步队列的末尾private Node addWaiter(Node mode) {// 保存了当前线程和模式 Node,排他为 null,共享为 new Node()Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果没有前置节点,就创建一个新的节点作为前置节点enq(node);return node;}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获得前置节点final Node p = node.predecessor();// 如果前置节点为头节点,并且成功获取锁if (p == head && tryAcquire(arg)) {// 获取锁后,将头结点设为自己setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 判断是否要挂起线程,短路与后,执行 park,并检查中断状态// 如果有其他线程释 unpark 了当前线程,或者中断了当前线程// 当前线程就会退出 park,重新进入循环尝试获取锁if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/// 当前节点已经准备等待一个 release 信号来通知它,所以可以安全的挂起return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/// 前置节点被取消,do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/// 指示说,我们需要一个信号,但不挂起。// 调用者需要重新获取锁,来保证在挂起前确实获取不到锁compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 唤醒下一个节点unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 从后往前找,找到最前面的没有被 cancelled 的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒下一个节点if (s != null)LockSupport.unpark(s.thread);}
/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
