一个没有数据缓冲的BlockingQueue,生产者线程的插入操作(put),必须等待消费者的移除操作(take)。
容量为 0,不存储数据,只作为数据传递的渠道。导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

先工作的一方,将被阻塞
先启动消费者,则消费者被阻塞
先启动生产者,则生产者被阻塞.

应用场景
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

一、原理解析

构造函数

无法初始化队列长度,并且长度总是返回0

  1. public SynchronousQueue() {
  2. this(false);
  3. }
  4. public SynchronousQueue(boolean fair) {
  5. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  6. }
  7. public int size() {
  8. return 0;
  9. }

put方法

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. if (transferer.transfer(e, false, 0) == null) {
  4. Thread.interrupted();
  5. throw new InterruptedException();
  6. }
  7. }

transfer方法队列实现方式

  1. //两种实现:队列实现;栈实现
  2. E transfer(E e, boolean timed, long nanos) {
  3. /* Basic algorithm is to loop trying to take either of
  4. * two actions:
  5. *
  6. * 1. If queue apparently empty or holding same-mode nodes,
  7. * try to add node to queue of waiters, wait to be
  8. * fulfilled (or cancelled) and return matching item.
  9. *
  10. * 2. If queue apparently contains waiting items, and this
  11. * call is of complementary mode, try to fulfill by CAS'ing
  12. * item field of waiting node and dequeuing it, and then
  13. * returning matching item.
  14. *
  15. * In each case, along the way, check for and try to help
  16. * advance head and tail on behalf of other stalled/slow
  17. * threads.
  18. *
  19. * The loop starts off with a null check guarding against
  20. * seeing uninitialized head or tail values. This never
  21. * happens in current SynchronousQueue, but could if
  22. * callers held non-volatile/final ref to the
  23. * transferer. The check is here anyway because it places
  24. * null checks at top of loop, which is usually faster
  25. * than having them implicitly interspersed.
  26. */
  27. //put操作:e不为空,take操作:e为空
  28. QNode s = null;
  29. boolean isData = (e != null);//可以标志:是否为put操作
  30. for (;;) {
  31. QNode t = tail;
  32. QNode h = head;
  33. //初始化队列是,在构造函数时,tail、head已经初始化(QNode h = new QNode(null, false);)
  34. if (t == null || h == null) // saw uninitialized value
  35. continue; // spin
  36. //h==t,表示队列为空。默认情况下,QNode中isData为false。
  37. if (h == t || t.isData == isData) { // empty or same-mode
  38. //执行到此处 可能是put线程、也可能是take线程
  39. QNode tn = t.next;
  40. if (t != tail) // inconsistent read
  41. //如果成立,表示其他线程执行完成put操作
  42. continue;
  43. if (tn != null) { // lagging tail
  44. //如果成立,表示其他线程执行完成put操作
  45. advanceTail(t, tn);
  46. continue;
  47. }
  48. if (timed && nanos <= 0) // can't wait
  49. return null;
  50. //构建队列节点,并插入队列尾中
  51. if (s == null)
  52. s = new QNode(e, isData);
  53. if (!t.casNext(null, s)) // failed to link in
  54. continue;
  55. advanceTail(t, s); // swing tail and wait
  56. //【进行阻塞操作】
  57. Object x = awaitFulfill(s, e, timed, nanos);
  58. //唤醒后的操作
  59. if (x == s) { // wait was cancelled
  60. clean(t, s);
  61. return null;
  62. }
  63. if (!s.isOffList()) { // not already unlinked
  64. advanceHead(t, s); // unlink if head
  65. if (x != null) // and forget fields
  66. s.item = s;
  67. s.waiter = null;
  68. }
  69. return (x != null) ? (E)x : e;
  70. //队列不为null。即已经存在元素
  71. } else { // complementary-mode
  72. //执行到此处 可能是put线程、也可能是take线程
  73. QNode m = h.next; // node to fulfill
  74. if (t != tail || m == null || h != head)
  75. continue; // inconsistent read
  76. Object x = m.item;
  77. if (isData == (x != null) || // m already fulfilled
  78. x == m || // m cancelled
  79. !m.casItem(x, e)) { // lost CAS
  80. advanceHead(h, m); // dequeue and retry
  81. continue;
  82. }
  83. //执行唤醒操作。如果当前为put操作,则唤醒阻塞的take线程
  84. //如果当前为take操作,则唤醒阻塞的put操作
  85. advanceHead(h, m); // successfully fulfilled
  86. LockSupport.unpark(m.waiter);
  87. return (x != null) ? (E)x : e;
  88. }
  89. }
  90. }

awaitFulfill方法

阻塞

  1. Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
  2. /* Same idea as TransferStack.awaitFulfill */
  3. //timed==true,则表示阻塞指定时间,否则一直阻塞,直到被唤醒
  4. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  5. 获取当前线程,等待阻塞操作
  6. Thread w = Thread.currentThread();
  7. //获取自旋次数
  8. //不是队列中的节点,则spins=0。
  9. int spins = ((head.next == s) ?
  10. (timed ? maxTimedSpins : maxUntimedSpins) : 0);
  11. for (;;) {
  12. //如果当前线程w,执行中断操作。则移除节点
  13. if (w.isInterrupted())
  14. s.tryCancel(e);
  15. Object x = s.item;
  16. if (x != e)
  17. return x;
  18. if (timed) {
  19. nanos = deadline - System.nanoTime();
  20. if (nanos <= 0L) {
  21. s.tryCancel(e);
  22. continue;
  23. }
  24. }
  25. if (spins > 0)
  26. --spins;
  27. else if (s.waiter == null)
  28. s.waiter = w;
  29. else if (!timed)
  30. //执行阻塞。需要等待被唤醒
  31. LockSupport.park(this);
  32. else if (nanos > spinForTimeoutThreshold)
  33. //阻塞指定时间
  34. LockSupport.parkNanos(this, nanos);
  35. }
  36. }

take方法

  1. public E take() throws InterruptedException {
  2. E e = transferer.transfer(null, false, 0);
  3. if (e != null)
  4. return e;
  5. Thread.interrupted();
  6. throw new InterruptedException();
  7. }