阻塞队列模型
同步流程
条件队列
Condition队列模型
公共方法(封装好的API)
acquire
- 独占式获取(Acquires in exclusive mode, ignoring interrupts)
/*** 先后调用tryAcquire()和acquireQueued()方法* tryAcquire()是模板方法,用户自定义扩展,自定义获取状态的实现* acquireQueued()是在状态获取失败情况下调用的,addWaiter()将节点追加到独占式队列尾部* 将获取失败的节点添加到等待队列中*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
acquireShared
- 共享式获取(Acquires in shared mode, ignoring interrupts.)
/*** 同样是自定义的模板方法tryAcquireShared()**/public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
hasQueuedPredecessors
- 判断是否有前驱节点
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
getQueueLength
- 获取同步队列长度
- 从尾结点开始遍历,仅节点存在线程才有效
public final int getQueueLength() {int n = 0;for (Node p = tail; p != null; p = p.prev) {if (p.thread != null)++n;}return n;}
getQueuedThreads
- 获取同步线程队列
- 从尾结点开始遍历
public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list;}
getExclusiveQueuedThreads
- 获取独占式同步线程队列
public final Collection<Thread> getExclusiveQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {if (!p.isShared()) {Thread t = p.thread;if (t != null)list.add(t);}}return list;}
getSharedQueuedThreads
- 获取共享式同步线程队列
public final Collection<Thread> getSharedQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {if (p.isShared()) {Thread t = p.thread;if (t != null)list.add(t);}}return list;}
release
- 独占式释放
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
模板方法(自定义实现)
tryAcquire
- 尝试独占式获取
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
tryRelease
- 尝试独占式释放
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
tryAcquireShared
- 尝试共享获取
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
tryReleaseShared
- 尝试共享释放
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}
isHeldExclusively
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
私有方法(同步队列核心实现)
acquireQueued
- 在队列中尝试获取状态(独占队列)
- 成功获取状态的前提是前驱节点是头结点,然后执行自定义的tryAcquire方法去尝试获取状态
- 注:在acquire方法中会优先调用tryAcquire方法在调用此方法的逻辑,非公平锁在这点上可以提前忽略p == head的条件而进行一次抢占式获取。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
doAcquireShared
- 在队列中尝试获取状态(共享队列)
- 在循环中,有个setHeadAndPropagate的方法,目的在于设置新的头结点,并传播给后续的Node.SHARED模式的节点
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
enq
- 添加到同步队列
/*** 循环CAS*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 以下代码段同enq()一样,这里只是尝试快速添加到尾结点,如若失败再通过enq循环CASNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
setHead
- 替换头结点
- 一般用在抢占队列中,抢夺锁成功的节点将成为新的头结点
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}
unparkSuccessor
- 唤醒后继节点,如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历,直到找到合适的节点并将其唤醒。
private void unparkSuccessor(Node node) {/** 将当前节点的等待状态设置为0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** 如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历* 直到找到合适的节点并将其唤醒。*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
doReleaseShared
- 发出释放信号并确保传播
- 用于共享队列(非独占式队列)
- 应用于共享队列的操作中
- @see setHeadAndPropagate(Node node, int propagate)
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases//唤醒后继节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
setHeadAndPropagate
- 设置头结点并传播
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** 如果propagate大于0(表示传播)或者头结点(此时为node)是空或等待状态小于0,* 且后继节点不存在或是共享队列* 那么进行释放信号的传播*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
shouldParkAfterFailedAcquire
- 在将线程(失败获取状态的node节点对应的线程)park前需要判断前驱节点是否为SIGNAL状态
- pred.waitStatus == Node.SIGNAL表明前驱节点的后继节点(也就是当前节点)是阻塞的,在前驱节点释放的时候可以根据waitStatus判断是否有阻塞节点,如果前驱节点的状态不是Node.SIGNAL,表明当前节点的位置不是安全的(下一次抢夺状态的时候无法被唤醒),同时如果当前节点的前驱节点的状态是CANCELLED(中断或超时),应该循环判断将前驱节点前移,直到大于0(CANCELLED是大于0的一种)
- 要么就尝试将前驱节点状态更改为Node.SIGNAL,然后才可以”safely park”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
Condition
一个同步器可以构造多个ConditionObject,用于不同场景下的条件阻塞。
await
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
signal
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
节点属性的操作
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}/*** CAS head field. Used only by enq.*/private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}/*** CAS tail field. Used only by enq.*/private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}/*** CAS waitStatus field of a node.*/private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);}/*** CAS next field of a node.*/private static final boolean compareAndSetNext(Node node,Node expect,Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);}
