有界队列

有固定大小的队列

ArrayBlockingQueue

数组实现的有界阻塞队列 按照先进先出的原则 对元素进行排序

  1. //capacity = 队列长度 fair = 公平非公平
  2. public ArrayBlockingQueue(int capacity, boolean fair) {
  3. if (capacity <= 0)
  4. throw new IllegalArgumentException();
  5. this.items = new Object[capacity];
  6. lock = new ReentrantLock(fair);
  7. notEmpty = lock.newCondition();
  8. notFull = lock.newCondition();
  9. }
  1. public ArrayBlockingQueue(int capacity, boolean fair,
  2. Collection<? extends E> c) {
  3. this(capacity, fair);
  4. final ReentrantLock lock = this.lock;
  5. lock.lock(); // Lock only for visibility, not mutual exclusion
  6. try {
  7. int i = 0;
  8. try {
  9. for (E e : c) {
  10. checkNotNull(e);
  11. items[i++] = e;
  12. }
  13. } catch (ArrayIndexOutOfBoundsException ex) {
  14. throw new IllegalArgumentException();
  15. }
  16. count = i;
  17. putIndex = (i == capacity) ? 0 : i;
  18. } finally {
  19. lock.unlock();
  20. }
  21. }

LinkedBlockingQueue

链表实现的有界队列 默认最大长度为Integer.MAX_VALUE 按照先进先出的原则对元素进行排序

无界队列

没有固定大小的队列

PriorityBlockingQueue

支持优先级的无界阻塞队列 默认情况下元素按照自然顺序升序排序 也可以通过自定义类实现compareTo方法来指定元素排序规则 或者在初始化的时候进行排序

DelayQueue(延时队列)

优先级队列实现的无界阻塞队列

SynchronousQueue

不存储元素的阻塞队列 每一个put操作必须等待一个take操作 否则不能继续添加元素

LinkedTransferQueue

由链表实现的无界阻塞队列 TransferQueue 相对其他阻塞队列 多了 tryTransfer和transfer方法

LinkedBlockingQueue

由链表实现的双向阻塞队列 双向阻塞队列的好处是多线程入队时 可以减少竞争

Dingtalk_20211101144011.jpg

Put()

队列满了 阻塞当前添加元素的线程

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. // Note: convention in all put/take/etc is to preset local var
  4. // holding count negative to indicate failure unless set.
  5. int c = -1;
  6. //根据添加的元素构建一个node
  7. Node<E> node = new Node<E>(e);
  8. final ReentrantLock putLock = this.putLock;
  9. final AtomicInteger count = this.count;
  10. //加锁
  11. putLock.lockInterruptibly();
  12. try {
  13. /*
  14. * Note that count is used in wait guard even though it is
  15. * not protected by lock. This works because count can
  16. * only decrease at this point (all other puts are shut
  17. * out by lock), and we (or some other waiting put) are
  18. * signalled if it ever changes from capacity. Similarly
  19. * for all other uses of count in other wait guards.
  20. */
  21. //如果队列元素满了 阻塞
  22. while (count.get() == capacity) {
  23. notFull.await();
  24. }
  25. //将node 添加到链表中
  26. enqueue(node);
  27. //元素数递增
  28. c = count.getAndIncrement();
  29. //当队列中元素小于最大容量的时候唤醒阻塞的生产者线程
  30. if (c + 1 < capacity)
  31. notFull.signal();
  32. } finally {
  33. putLock.unlock();
  34. }
  35. if (c == 0)
  36. signalNotEmpty();
  37. }

take()

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. //拿到队列长度
  5. final AtomicInteger count = this.count;
  6. final ReentrantLock takeLock = this.takeLock;
  7. //加锁
  8. takeLock.lockInterruptibly();
  9. try {
  10. //如果队列长度== 0 阻塞
  11. while (count.get() == 0) {
  12. notEmpty.await();
  13. }
  14. //从队列中获取元素
  15. x = dequeue();
  16. //元素数递减
  17. c = count.getAndDecrement();
  18. //如果队列长度>1 唤醒处于阻塞状态下的消费者线程
  19. if (c > 1)
  20. notEmpty.signal();
  21. } finally {
  22. takeLock.unlock();
  23. }
  24. if (c == capacity)
  25. signalNotFull();
  26. return x;
  27. }