SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取
SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
截屏2022-03-31 22.47.12.png

SynchronousQueue使用

SynchronousQueue有公平和非公平两种模式,公平是用队列,先进先出。非公平用栈结构,先进后出。默认是非公平。

  1. public SynchronousQueue() {
  2. this(false);
  3. }
  4. /**
  5. * Creates a {@code SynchronousQueue} with the specified fairness policy.
  6. *
  7. * @param fair if true, waiting threads contend in FIFO order for
  8. * access; otherwise the order is unspecified.
  9. */
  10. public SynchronousQueue(boolean fair) {
  11. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  12. }
  1. final static BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
  2. public static void main(String[] args) throws InterruptedException {
  3. //TODO 模拟消费者取数据
  4. new Thread(()->{take();},"consumer1").start();
  5. // 控制第一个消费者先调用
  6. Thread.sleep(10);
  7. new Thread(()->{take();},"consumer2").start();
  8. Thread.sleep(100);
  9. //TODO 模拟生产者写数据
  10. new Thread(()->{put(1);},"producer1").start();
  11. // 控制第一个生产者先调用
  12. Thread.sleep(10);
  13. new Thread(()->{put(5);},"producer2").start();
  14. }
  15. public static void take(){
  16. try {
  17. Integer item = blockingQueue.take();
  18. System.out.println(Thread.currentThread().getName() + " Take: " + item);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. public static void put(Integer item){
  24. try {
  25. blockingQueue.put(item);
  26. System.out.println(Thread.currentThread().getName() + " Put: " + item);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }

非公平的模式下:
producer�1的任务被consumer2消费了。而producer2的任务被consumer1消费了。
截屏2022-03-31 23.13.36.png
公平模式的情况下:
producer�1的任务被consumer1消费了。而producer2的任务被consumer2消费了。
截屏2022-03-31 23.17.35.png

03-31SynchronousQueue - 图4
03-31SynchronousQueue - 图5

应用场景

  • SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
  • SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。