SynchronousQueue

特性:

SynchronousQueue没有一个地方来暂存元素,即它的容量为 0。
每次取数据都要先阻塞,直到有数据被放入;
同理,每次放数据的时候也会阻塞,直到有消费者来取。
SynchronousQueue的队列中封装的节点更多针对的不是数据,而是要执行的操作,

应用场景:

Executors.newCachedThreadPool()就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则 会重复使用,线程空闲了60秒后会被回收。

数据结构:

链表
先消费后生产的场景:
1:第一个线程Thread0是消费者访问,此次队列为空,则入队(创建Node结点并赋值)
2:第二个线程Thread1也是消费者访问,继续入队
3:第三个线程Thread2是生产者,携带了数据,直接将该线程携带的数据返回给队首的消费者,并唤醒队首线程Thread1出队
先生产后消费的场景,原理一样

锁:

CAS+自旋 ,自旋一定次数后调用LockSupport.park

公平与非公平实现:

非公平模式(默认策略):底层使用TransferStack,栈顶匹配,栈顶出栈,后进先出

  1. volatile SNode next; // 指向下一个节点的指针
  2. volatile SNode match; // 存放和它进行匹配的节点
  3. volatile Thread waiter; // 保存阻塞的线程
  4. Object item;
  5. int mode;
  6. SNode(Object item) {
  7. this.item = item;
  8. }

公平模式:底层使用TransferQueue, 队尾匹配(判断模式),队头出队,先进先出

  1. volatile QNode next; // next node in queue
  2. volatile Object item; // CAS'ed to or from null
  3. volatile Thread waiter; // to control park/unpark
  4. final boolean isData;
  5. QNode(Object item, boolean isData) {
  6. this.item = item;
  7. this.isData = isData;
  8. }

PriorityBlockingQueue

  1. /**
  2. * 数据结构:数组+二叉树
  3. */
  4. private transient Object[] queue;
  5. //大小
  6. private transient int size;
  7. //比较器
  8. private transient Comparator<? super E> comparator;
  9. //一把锁
  10. private final ReentrantLock lock;
  11. //等待条件
  12. private final Condition notEmpty;

1)一个支持优先级排序的无界堵塞队列:
入队:根据比较器进行排序
出队:优先级高的先出队、优先级低的后出队

数据结构:

物理上是用数组去存储,会自动扩容
逻辑上用二叉堆去排序
关于二叉堆知识的补充:
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始 排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不 同,二叉堆又可以分为两个类型: 大顶堆和小顶堆。

  • 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
  • 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。

image.png
image.png
入队的时候小顶堆相关代码:

  1. private static <T> void siftUpComparable(int k, T x, Object[] array) {
  2. Comparable<? super T> key = (Comparable<? super T>) x;
  3. while (k > 0) {
  4. int parent = (k - 1) >>> 1;
  5. Object e = array[parent];
  6. if (key.compareTo((T) e) >= 0)
  7. break;
  8. array[k] = e;
  9. k = parent;
  10. }
  11. array[k] = key;
  12. }

出队

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

2)应用场景:vip用户插队

DelayQueue

image.png
1)DelayQueue 是一个支持延时获取元素的阻塞队列,
入队:元素会根据延迟时间的长短被放到队列中,延迟时间较短的放队列前面
出队:堆顶元素过期时间小于等于0就出队,否则堵塞
2)应用场景:处理延迟任务,具体case

  • 清理设置了过期时间的缓存
  • 订单未支付,超时自动关闭 ```java //存储的数据结构 private final PriorityQueue q = new PriorityQueue();

    //一把锁 private final transient ReentrantLock lock = new ReentrantLock();

    //等待条件 private final Condition available = lock.newCondition();

```

如何选择适合的阻塞队列

image.png
1)考虑功能能否实现,如优先级排序、延迟 执行等。
2)容量
根据任务数量来推算出 合适的容量,从而去选取合适的 BlockingQueue。
固定容量:ArrayBlockingQueue
无限容量:LinkedBlockingQueue,DelayQueue
无容量:SynchronousQueue
自动扩容:PriorityBlockingQueue
3)性能
从支持高并发角度去看,LinkedBlockingQueue比ArrayBlockingQueue高

fox老师对于堵塞队列总结的脑图
https://www.processon.com/view/link/618ce3941e0853689b0818e2#map