一个没有数据缓冲的BlockingQueue,生产者线程的插入操作(put),必须等待消费者的移除操作(take)。
容量为 0,不存储数据,只作为数据传递的渠道。导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
先工作的一方,将被阻塞
先启动消费者,则消费者被阻塞
先启动生产者,则生产者被阻塞.
应用场景
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
一、原理解析
构造函数
无法初始化队列长度,并且长度总是返回0
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public int size() {
return 0;
}
put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
transfer方法队列实现方式
//两种实现:队列实现;栈实现
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
//put操作:e不为空,take操作:e为空
QNode s = null;
boolean isData = (e != null);//可以标志:是否为put操作
for (;;) {
QNode t = tail;
QNode h = head;
//初始化队列是,在构造函数时,tail、head已经初始化(QNode h = new QNode(null, false);)
if (t == null || h == null) // saw uninitialized value
continue; // spin
//h==t,表示队列为空。默认情况下,QNode中isData为false。
if (h == t || t.isData == isData) { // empty or same-mode
//执行到此处 可能是put线程、也可能是take线程
QNode tn = t.next;
if (t != tail) // inconsistent read
//如果成立,表示其他线程执行完成put操作
continue;
if (tn != null) { // lagging tail
//如果成立,表示其他线程执行完成put操作
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
//构建队列节点,并插入队列尾中
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
//【进行阻塞操作】
Object x = awaitFulfill(s, e, timed, nanos);
//唤醒后的操作
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
//队列不为null。即已经存在元素
} else { // complementary-mode
//执行到此处 可能是put线程、也可能是take线程
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
//执行唤醒操作。如果当前为put操作,则唤醒阻塞的take线程
//如果当前为take操作,则唤醒阻塞的put操作
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
awaitFulfill方法
阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
//timed==true,则表示阻塞指定时间,否则一直阻塞,直到被唤醒
final long deadline = timed ? System.nanoTime() + nanos : 0L;
获取当前线程,等待阻塞操作
Thread w = Thread.currentThread();
//获取自旋次数
//不是队列中的节点,则spins=0。
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
//如果当前线程w,执行中断操作。则移除节点
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
//执行阻塞。需要等待被唤醒
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//阻塞指定时间
LockSupport.parkNanos(this, nanos);
}
}
take方法
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}