阻塞队列模型
同步流程
条件队列
Condition队列模型
公共方法(封装好的API)
acquire
- 独占式获取(Acquires in exclusive mode, ignoring interrupts)
/**
* 先后调用tryAcquire()和acquireQueued()方法
* tryAcquire()是模板方法,用户自定义扩展,自定义获取状态的实现
* acquireQueued()是在状态获取失败情况下调用的,addWaiter()将节点追加到独占式队列尾部
* 将获取失败的节点添加到等待队列中
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireShared
- 共享式获取(Acquires in shared mode, ignoring interrupts.)
/**
* 同样是自定义的模板方法tryAcquireShared()
*
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
hasQueuedPredecessors
- 判断是否有前驱节点
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
getQueueLength
- 获取同步队列长度
- 从尾结点开始遍历,仅节点存在线程才有效
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
getQueuedThreads
- 获取同步线程队列
- 从尾结点开始遍历
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
getExclusiveQueuedThreads
- 获取独占式同步线程队列
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
getSharedQueuedThreads
- 获取共享式同步线程队列
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
release
- 独占式释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
模板方法(自定义实现)
tryAcquire
- 尝试独占式获取
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryRelease
- 尝试独占式释放
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared
- 尝试共享获取
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared
- 尝试共享释放
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
isHeldExclusively
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
私有方法(同步队列核心实现)
acquireQueued
- 在队列中尝试获取状态(独占队列)
- 成功获取状态的前提是前驱节点是头结点,然后执行自定义的tryAcquire方法去尝试获取状态
- 注:在acquire方法中会优先调用tryAcquire方法在调用此方法的逻辑,非公平锁在这点上可以提前忽略p == head的条件而进行一次抢占式获取。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared
- 在队列中尝试获取状态(共享队列)
- 在循环中,有个setHeadAndPropagate的方法,目的在于设置新的头结点,并传播给后续的Node.SHARED模式的节点
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
enq
- 添加到同步队列
/**
* 循环CAS
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 以下代码段同enq()一样,这里只是尝试快速添加到尾结点,如若失败再通过enq循环CAS
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
setHead
- 替换头结点
- 一般用在抢占队列中,抢夺锁成功的节点将成为新的头结点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
unparkSuccessor
- 唤醒后继节点,如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历,直到找到合适的节点并将其唤醒。
private void unparkSuccessor(Node node) {
/*
* 将当前节点的等待状态设置为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历
* 直到找到合适的节点并将其唤醒。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
doReleaseShared
- 发出释放信号并确保传播
- 用于共享队列(非独占式队列)
- 应用于共享队列的操作中
- @see setHeadAndPropagate(Node node, int propagate)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
setHeadAndPropagate
- 设置头结点并传播
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 如果propagate大于0(表示传播)或者头结点(此时为node)是空或等待状态小于0,
* 且后继节点不存在或是共享队列
* 那么进行释放信号的传播
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
shouldParkAfterFailedAcquire
- 在将线程(失败获取状态的node节点对应的线程)park前需要判断前驱节点是否为SIGNAL状态
- pred.waitStatus == Node.SIGNAL表明前驱节点的后继节点(也就是当前节点)是阻塞的,在前驱节点释放的时候可以根据waitStatus判断是否有阻塞节点,如果前驱节点的状态不是Node.SIGNAL,表明当前节点的位置不是安全的(下一次抢夺状态的时候无法被唤醒),同时如果当前节点的前驱节点的状态是CANCELLED(中断或超时),应该循环判断将前驱节点前移,直到大于0(CANCELLED是大于0的一种)
- 要么就尝试将前驱节点状态更改为Node.SIGNAL,然后才可以”safely park”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Condition
一个同步器可以构造多个ConditionObject,用于不同场景下的条件阻塞。
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
节点属性的操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}