Java 队列


| queue | 阻塞与否 | 是否有界 | 线程安全保障 | 适用场景 | 注意事项 |
| --- | --- | --- | --- | --- | --- |
| ArrayBlockingQueue | 阻塞 | 有界 | 一把全局锁 | 生产消费模型,平衡两边处理速度 | 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间) |
| LinkedBlockingQueue | 阻塞 | 可配置 | 存取采用2把锁 | 生产消费模型,平衡两边处理速度 | 无界的时候注意内存溢出问题;用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担 |
| ConcurrentLinkedQueue | 非阻塞 | 无界 | CAS | 对全局的集合进行操作的场景 | size() 需要遍历一遍集合,慎用 |


  • ArrayBlockingQueue用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
  • LinkedBlockingQueue用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。


  • ArrayBlockingQueue有界,适合已知最大存储容量的场景

  • LinkedBlockingQueue可以有界,也可以无界(不指定容量时默认容量为Integer.MAX_VALUE),无界使用时需注意内存溢出



    Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.



  • ArrayBlockingQueue采用一把锁,两个condition

```java
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
```


  1. * Inserts element at current put position, advances, and signals.
  2. * Call only when holding lock.
  3. */

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }


  1. * Extracts element at current take position, advances, and signals.
  2. * Call only when holding lock.
  3. */

private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings(“unchecked”) E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count—; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

  • 此外,ArrayBlockingQueue还支持公平锁(通过构造函数的 fair 参数指定)
  2. ```java
  /**
   * Creates an {@code ArrayBlockingQueue} with the given (fixed)
   * capacity and the specified access policy.
   *
   * @param capacity the capacity of this queue
   * @param fair if {@code true} then queue accesses for threads blocked
   *        on insertion or removal, are processed in FIFO order;
   *        if {@code false} the access order is unspecified.
   * @throws IllegalArgumentException if {@code capacity < 1}
   */
  public ArrayBlockingQueue(int capacity, boolean fair) {
      if (capacity <= 0)
          throw new IllegalArgumentException();
      // The backing array is allocated once, at its full size.
      this.items = new Object[capacity];
      // A single lock is created with the requested fairness; both
      // conditions below are created from that same lock.
      lock = new ReentrantLock(fair);
      notEmpty = lock.newCondition();
      notFull = lock.newCondition();
  }
  • LinkedBlockingQueue头尾各1把锁

```java
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
```


  1. * Inserts the specified element at the tail of this queue if it is
  2. * possible to do so immediately without exceeding the queue's capacity,
  3. * returning {@code true} upon success and {@code false} if this queue
  4. * is full.
  5. * When using a capacity-restricted queue, this method is generally
  6. * preferable to method {@link BlockingQueue#add add}, which can fail to
  7. * insert an element only by throwing an exception.
  8. *
  9. * @throws NullPointerException if the specified element is null
  10. */

public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }

/**
 * Removes and returns the head of the queue, waiting up to the given time
 * for an element while the count is zero.
 *
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the dequeued element, or {@code null} if the wait time ran out
 *         while the count was still zero
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // No time left: report "nothing available" with null.
            if (nanos <= 0) return null;
            // awaitNanos returns the remaining wait time, which feeds the next check.
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        // c is the count before the removal; more than one means elements remain,
        // so pass the signal on to another waiting consumer.
        if (c > 1) notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // The queue was at capacity before this removal, so notify producers.
    if (c == capacity) signalNotFull();
    return x;
}

<a name="fBnvg"></a>
### 应用实例
<a name="QBcQh"></a>
#### `Executors`用了`LinkedBlockingQueue`
  5. ```java
  6. public static ExecutorService newFixedThreadPool(int nThreads) {
  7. return new ThreadPoolExecutor(nThreads, nThreads,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>());
  10. }
  11. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  12. return new ThreadPoolExecutor(nThreads, nThreads,
  13. 0L, TimeUnit.MILLISECONDS,
  14. new LinkedBlockingQueue<Runnable>(),
  15. threadFactory);
  16. }
  17. public static ExecutorService newSingleThreadExecutor() {
  18. return new FinalizableDelegatedExecutorService
  19. (new ThreadPoolExecutor(1, 1,
  20. 0L, TimeUnit.MILLISECONDS,
  21. new LinkedBlockingQueue<Runnable>()));
  22. }
  23. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  24. return new FinalizableDelegatedExecutorService
  25. (new ThreadPoolExecutor(1, 1,
  26. 0L, TimeUnit.MILLISECONDS,
  27. new LinkedBlockingQueue<Runnable>(),
  28. threadFactory));
  29. }


  1. public class BungeeLogger extends Logger {
  2. private final ColouredWriter writer;
  3. private final Formatter formatter = new ConciseFormatter();
  4. // private final LogDispatcher dispatcher = new LogDispatcher(this);
  5. private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();
  6. volatile boolean running = true;
  7. Thread recvThread = new Thread(){
  8. @Override
  9. public void run() {
  10. while (!isInterrupted() && running) {
  11. LogRecord record;
  12. try {
  13. record = queue.take();
  14. } catch (InterruptedException ex) {
  15. continue;
  16. }
  17. doLog(record);
  18. }
  19. for (LogRecord record : queue) {
  20. doLog(record);
  21. }
  22. }
  23. };
  24. public BungeeLogger() throws IOException {
  25. super("BungeeCord", null);
  26. this.writer = new ColouredWriter(new ConsoleReader());
  27. try {
  28. FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);
  29. handler.setFormatter(formatter);
  30. addHandler(handler);
  31. } catch (IOException ex) {
  32. System.err.println("Could not register logger!");
  33. ex.printStackTrace();
  34. }
  35. recvThread.start();
  36. Runtime.getRuntime().addShutdownHook(new Thread(){
  37. @Override
  38. public void run() {
  39. running = false;
  40. }
  41. });
  42. }
  43. @Override
  44. public void log(LogRecord record) {
  45. if (running) {
  46. queue.add(record);
  47. }
  48. }
  49. void doLog(LogRecord record) {
  50. super.log(record);
  51. writer.print(formatter.format(record));
  52. }
  53. }