SynchronousQueue

没有数据缓冲的BlockingQueue
生产者线程对其的插入操作put必须等待消费者的移除操作take
image.png
容量为0
每次取数据都会先阻塞,直到数据被放入,同理:每次放输入也会等消费者来取
所做的就是直接传递(direct handoff)
不需要存储,

应用场景

适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
Executors.newCachedThreadPool()就使用了SynchronousQueue,有根据需求创建新的线程,如果有空闲就会重复使用
线程空闲60秒,会被回收
不确定来自生产者请求数量,但这些请求需要很快处理掉,适合为每一个生产者请求分配一个消费线程的处理高效的方法

SynchronousQueue使用

BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
公平非公平,一个是队列结构,一个是栈结构

image.png

PriorityBlockingQueue使用

  1. //创建优先级阻塞队列 Comparator为null,自然排序
  2. PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(5);
  3. //自定义Comparator
  4. PriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(
  5. 5, new Comparator<Integer>() {
  6. // 重写比较方法
  7. @Override
  8. public int compare(Integer o1, Integer o2) {
  9. return o2-o1;
  10. }
  11. }

如果构建一个有限队列的思想:

底层实现:二叉堆

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,
二叉堆又可以分为两个类型:

  1. 大顶堆:父结点的键值大于或等于任何一个子结点的键值
  2. 小顶堆:父结点的键值小于或等于任何一个子结点的键值

image.png

  1. //Inserts the specified element into this priority queue.
  2. public boolean offer(E e) {
  3. if (e == null)
  4. throw new NullPointerException();
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. int n, cap;
  8. Object[] array;
  9. while ((n = size) >= (cap = (array = queue).length))
  10. //Tries to grow array to accommodate at least one more element
  11. tryGrow(array, cap);
  12. try {
  13. Comparator<? super E> cmp = comparator;
  14. if (cmp == null)
  15. //没有定义comparator 就走默认的排序规定
  16. siftUpComparable(n, e, array);
  17. else
  18. //走自定义的比较规则
  19. siftUpUsingComparator(n, e, array, cmp);
  20. size = n + 1;
  21. notEmpty.signal();
  22. } finally {
  23. lock.unlock();
  24. }
  25. return true;
  26. }
  27. private void tryGrow(Object[] array, int oldCap) {
  28. lock.unlock(); // must release and then re-acquire main lock
  29. Object[] newArray = null;
  30. if (allocationSpinLock == 0 &&
  31. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  32. 0, 1)) {
  33. try {
  34. int newCap = oldCap + ((oldCap < 64) ?
  35. (oldCap + 2) : // grow faster if small
  36. (oldCap >> 1));
  37. if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
  38. int minCap = oldCap + 1;
  39. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
  40. throw new OutOfMemoryError();
  41. newCap = MAX_ARRAY_SIZE;
  42. }
  43. if (newCap > oldCap && queue == array)
  44. newArray = new Object[newCap];
  45. } finally {
  46. allocationSpinLock = 0;
  47. }
  48. }
  49. if (newArray == null) // back off if another thread is allocating
  50. Thread.yield();
  51. lock.lock();
  52. if (newArray != null && queue == array) {
  53. queue = newArray;
  54. System.arraycopy(array, 0, newArray, 0, oldCap);
  55. }
  56. }

image.png

应用场景

根据客户优先级的排队

DelayQueue

支持延时获取元素的阻塞队列, 采用优先队列 PriorityQueue 存储元素,
同时元素必须实现 Delayed 接口。设置延迟时间

延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
无界队列

  1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  2. implements BlockingQueue<E> {
  3. private final transient ReentrantLock lock = new ReentrantLock();\
  4. //采用priorityQueue存储元素
  5. private final PriorityQueue<E> q = new PriorityQueue<E>();
  6. }
  1. public interface Delayed extends Comparable<Delayed> {
  2. //getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
  3. //如果返回 0 或者负数则代表任务已过期。
  4. //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
  5. long getDelay(TimeUnit unit);
  6. }

image.png

DelayQueue使用

  1. DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();
  2. // 对象需要实现delayed 接口
  3. class OrderInfo implements Delayed {
  4. private String name;
  5. private long time; //延时时间
  6. @Override
  7. public long getDelay(TimeUnit unit) {
  8. long diff = time - System.currentTimeMillis();
  9. return unit.convert(diff, TimeUnit.MILLISECONDS);
  10. }
  11. @Override
  12. public int compareTo(Delayed obj) {
  13. if (this.time < ((DelayObject) obj).time) {
  14. return -1;
  15. }
  16. if (this.time > ((DelayObject) obj).time) {
  17. return 1;
  18. }
  19. return 0;
  20. }
  21. }

DelayQueue的原理

数据结构

  1. //用于保证队列操作的线程安全
  2. private final transient ReentrantLock lock = new ReentrantLock();
  3. // 优先级队列,存储元素,用于保证延迟低的优先执行
  4. private final PriorityQueue<E> q = new PriorityQueue<E>();
  5. // 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
  6. private Thread leader = null;
  7. // 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
  8. private final Condition available = lock.newCondition();
  9. public DelayQueue() {}
  10. public DelayQueue(Collection<? extends E> c) {
  11. this.addAll(c);
  12. }

入队put方法

  1. public void put(E e) {
  2. offer(e);
  3. }
  4. public boolean offer(E e) {
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. // 入队
  9. q.offer(e);
  10. if (q.peek() == e) {
  11. // 若入队的元素位于队列头部,说明当前元素延迟最小
  12. // 将 leader 置空
  13. leader = null;
  14. // available条件队列转同步队列,准备唤醒阻塞在available上的线程
  15. available.signal();
  16. }
  17. return true;
  18. } finally {
  19. lock.unlock(); // 解锁,真正唤醒阻塞的线程
  20. }
  21. }入队put方法

出队take方法

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. for (;;) {
  6. E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
  7. if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
  8. available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
  9. else {
  10. long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间
  11. if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
  12. return q.poll();
  13. // 如果delay大于0 ,则下面要阻塞了
  14. // 将first置为空方便gc
  15. first = null;
  16. // 如果有线程争抢的Leader线程,则进行无限期等待。
  17. if (leader != null)
  18. available.await();
  19. else {
  20. // 如果leader为null,把当前线程赋值给它
  21. Thread thisThread = Thread.currentThread();
  22. leader = thisThread;
  23. try {
  24. // 等待剩余等待时间
  25. available.awaitNanos(delay);
  26. } finally {
  27. // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
  28. if (leader == thisThread)
  29. leader = null;
  30. }
  31. }
  32. }
  33. }
  34. } finally {
  35. // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
  36. if (leader == null && q.peek() != null)
  37. // available条件队列转同步队列,准备唤醒阻塞在available上的线程
  38. available.signal();
  39. // 解锁,真正唤醒阻塞的线程
  40. lock.unlock();
  41. }
  42. }
  1. 获取锁
  2. 获取最早过期的元素,先不弹出元素素。
  3. 判断最早是否为空,如果是空就无线等吧,
  4. 如果为不为空,判断剩余的过期时间,
  5. 如果已经过期则直接返回当前元素
  6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,就是说还轮不到当前的线程执行,当前线程需要进行无限期等待,如果Leader为空,就可以只需等待剩余的时间就ok了
  7. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

总结就是:如果获取的对象以及过期就直接出队,如果哦没有就等下剩余的时间

如何选择适合的阻塞队列

线程池对于阻塞队列的选择

  • FixedThreadPool———> LinkedBlockingQueue
  • SingleThreadExecutor ———> LinkedBlockingQueue
  • CachedThreadPool ———> SynchronousQueue
  • ScheduledThreadPool ——->DelayQueue
  • SingleThreadScheduledExecutor ——->DelayQueue
    选择策略
    通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
  1. 功能:
    需要排序?延迟?
  2. 容量:
    考虑并发量,和业务需求是否有大量的任务
    固定容量的:ArrayBlockingQueue
    无界队列:LinkedBlockingQueue,DelayQueue
  3. 是否扩容
    看业务是否稳定类型,还是高并发类型
    不能自动扩容:ArrayBlockingQueue
    会自动扩容:PriorityBlockingQueue
  4. 内存结构:
    考虑内存利用率的性能
    ArrayBlockingQueue:数据实现,没有所谓的结点,利用率高
    LinkedBlockingQueue :链表实现,多了一层结点
  5. 性能:
    ArrayBlockingQueue :一把锁,读写都会
    LinkedBlockingQueue :两把锁,读写分离
    SynchronousQueue :数据直传,没有存储的过程