一个没有数据缓冲的BlockingQueue,生产者线程的插入操作(put),必须等待消费者的移除操作(take)。
容量为 0,不存储数据,只作为数据传递的渠道。导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
先工作的一方,将被阻塞
先启动消费者,则消费者被阻塞
先启动生产者,则生产者被阻塞.
应用场景
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
一、原理解析
构造函数
无法初始化队列长度,并且长度总是返回0
public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}public int size() {return 0;}
put方法
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}
transfer方法队列实现方式
//两种实现:队列实现;栈实现E transfer(E e, boolean timed, long nanos) {/* Basic algorithm is to loop trying to take either of* two actions:** 1. If queue apparently empty or holding same-mode nodes,* try to add node to queue of waiters, wait to be* fulfilled (or cancelled) and return matching item.** 2. If queue apparently contains waiting items, and this* call is of complementary mode, try to fulfill by CAS'ing* item field of waiting node and dequeuing it, and then* returning matching item.** In each case, along the way, check for and try to help* advance head and tail on behalf of other stalled/slow* threads.** The loop starts off with a null check guarding against* seeing uninitialized head or tail values. This never* happens in current SynchronousQueue, but could if* callers held non-volatile/final ref to the* transferer. The check is here anyway because it places* null checks at top of loop, which is usually faster* than having them implicitly interspersed.*///put操作:e不为空,take操作:e为空QNode s = null;boolean isData = (e != null);//可以标志:是否为put操作for (;;) {QNode t = tail;QNode h = head;//初始化队列是,在构造函数时,tail、head已经初始化(QNode h = new QNode(null, false);)if (t == null || h == null) // saw uninitialized valuecontinue; // spin//h==t,表示队列为空。默认情况下,QNode中isData为false。if (h == t || t.isData == isData) { // empty or same-mode//执行到此处 可能是put线程、也可能是take线程QNode tn = t.next;if (t != tail) // inconsistent read//如果成立,表示其他线程执行完成put操作continue;if (tn != null) { // lagging tail//如果成立,表示其他线程执行完成put操作advanceTail(t, tn);continue;}if (timed && nanos <= 0) // can't waitreturn null;//构建队列节点,并插入队列尾中if (s == null)s = new QNode(e, isData);if (!t.casNext(null, s)) // failed to link incontinue;advanceTail(t, s); // swing tail and wait//【进行阻塞操作】Object x = awaitFulfill(s, e, timed, nanos);//唤醒后的操作if (x == s) { // wait was cancelledclean(t, s);return null;}if (!s.isOffList()) { // not already unlinkedadvanceHead(t, s); // unlink if headif (x != null) // and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;//队列不为null。即已经存在元素} else { // complementary-mode//执行到此处 可能是put线程、也可能是take线程QNode m = h.next; // node to fulfillif (t != tail || m == null || h != head)continue; // inconsistent readObject x = m.item;if (isData == (x != null) || // m already fulfilledx == m || // m cancelled!m.casItem(x, e)) { // lost CASadvanceHead(h, m); // dequeue and retrycontinue;}//执行唤醒操作。如果当前为put操作,则唤醒阻塞的take线程//如果当前为take操作,则唤醒阻塞的put操作advanceHead(h, m); // successfully fulfilledLockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}}}
awaitFulfill方法
阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {/* Same idea as TransferStack.awaitFulfill *///timed==true,则表示阻塞指定时间,否则一直阻塞,直到被唤醒final long deadline = timed ? System.nanoTime() + nanos : 0L;获取当前线程,等待阻塞操作Thread w = Thread.currentThread();//获取自旋次数//不是队列中的节点,则spins=0。int spins = ((head.next == s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {//如果当前线程w,执行中断操作。则移除节点if (w.isInterrupted())s.tryCancel(e);Object x = s.item;if (x != e)return x;if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel(e);continue;}}if (spins > 0)--spins;else if (s.waiter == null)s.waiter = w;else if (!timed)//执行阻塞。需要等待被唤醒LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)//阻塞指定时间LockSupport.parkNanos(this, nanos);}}
take方法
public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();}
