接口

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. boolean add(E e);//插入方法,如果队列不可用时抛出异常
  3. boolean offer(E e);//插入方法,队列不可用时返回false
  4. void put(E e) throws InterruptedException;//插入方法,队列不可用时会一直阻塞
  5. boolean offer(E e, long timeout, TimeUnit unit)//插入方法,队列不可用时,超时退出
  6. throws InterruptedException;
  7. E take() throws InterruptedException;//获取,一直阻塞直到有值
  8. E poll(long timeout, TimeUnit unit)//获取,超时退出
  9. throws InterruptedException;
  10. int remainingCapacity();
  11. boolean remove(Object o);//移除
  12. public boolean contains(Object o);
  13. int drainTo(Collection<? super E> c);
  14. int drainTo(Collection<? super E> c, int maxElements);
  15. }

java中的阻塞队列

  • ArrayBlockingQueue 数组组成的有界阻塞队列。 默认不保证公平性,可以选择公平锁,FIFO
  • LinkedBlockingQueue 链表结构组成的有界阻塞队列 默认Integer.MAX_VALUE
  • PriorityBlockingQueue 支持优先级排序的无界阻塞队列 默认自然顺序升序,可以用compareTo()指定元素排序规则
  • DelayBlockingQueue 使用优先级队列实现的无界阻塞队列 支持延迟获取。创建元素时可以指定多久才能重队列中获取当前元素
  • SynchronousQueue 不存储元素的阻塞队列 默认非公平,支持公平
  • LinkedTransferQueue 由链表结构组成的无界阻塞队列 多了transfer/tryTransfer来返回消费结果
  • LinkedBlockingDeque 由链表结构组成的双向阻塞队列 双向队列可以两端插入和移除元素

    ArrayBlockingQueue源码分析

    构造器

    1. public ArrayBlockingQueue(int capacity, boolean fair) {
    2. if (capacity <= 0)//
    3. throw new IllegalArgumentException();
    4. this.items = new Object[capacity];//初始化一个传进来长度的Object数组
    5. lock = new ReentrantLock(fair);//创建重入锁
    6. notEmpty = lock.newCondition();//notEmpty阻塞Condition
    7. notFull = lock.newCondition();//notFull阻塞Condition
    8. }
    1. public ArrayBlockingQueue(int capacity, boolean fair,
    2. Collection<? extends E> c) {
    3. this(capacity, fair);//初始化数组
    4. final ReentrantLock lock = this.lock;
    5. lock.lock(); // Lock only for visibility, not mutual exclusion 锁只用于可见性,而不是互斥
    6. try {
    7. int i = 0;
    8. try {
    9. for (E e : c) {//遍历初始值
    10. checkNotNull(e);
    11. items[i++] = e;//放入队列
    12. }
    13. } catch (ArrayIndexOutOfBoundsException ex) {
    14. throw new IllegalArgumentException();
    15. }
    16. count = i;
    17. putIndex = (i == capacity) ? 0 : i;
    18. } finally {
    19. lock.unlock();
    20. }
    21. }

    带初始化值的队列。先初始化组织,然后通过加锁的方式,把初始化值加入队列中。这里加锁,是为了保证后续往队列里添加值得数组可见到初始化得值。及解决可见性问题。
    重入锁之所以能解决可见性问题,是因为,他在获取锁和释放锁得时候修改了volatile修饰的state。volatile是由mesi协议实现的。volatile修饰的变量的时候,编译成指令的时候会在前后插入内存屏障来保证可见性问题。

    put阻塞式方法添加数据

    1. public void put(E e) throws InterruptedException {
    2. checkNotNull(e);//元素为空
    3. final ReentrantLock lock = this.lock;//重入锁
    4. lock.lockInterruptibly();//加锁
    5. try {
    6. while (count == items.length)//队列满了
    7. notFull.await();//阻塞在这里
    8. enqueue(e);//真正放入队列的方法
    9. } finally {
    10. lock.unlock();//解锁
    11. }
    12. }

    队列会通过加重入锁的方式去添加数据,如果队列满了。用一个notFull阻塞在加入队列之前

    1. private void enqueue(E x) {
    2. // assert lock.getHoldCount() == 1;
    3. // assert items[putIndex] == null;
    4. final Object[] items = this.items;//获取数组
    5. items[putIndex] = x;//往队列里放数据
    6. if (++putIndex == items.length)//放入指针到数组指到数组尽头
    7. putIndex = 0;//重数组0继续开始
    8. count++;//长度+1
    9. notEmpty.signal();//唤醒notEmpty阻塞的condition
    10. }

    数据添加到队列后,唤醒notEmpty阻塞的condition。这样因为空被阻塞的notEmpty队列就会被唤醒

    take()阻塞式获取数据

    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;//同一把锁
    3. lock.lockInterruptibly();//锁住保证线程安全
    4. try {
    5. while (count == 0)//队列里无值
    6. notEmpty.await();//notEmpty阻塞
    7. return dequeue();//真的获取值的地方
    8. } finally {
    9. lock.unlock();
    10. }
    11. }

    获取的时候会跟放入的时候获取同一把锁

    1. private E dequeue() {
    2. // assert lock.getHoldCount() == 1;
    3. // assert items[takeIndex] != null;
    4. final Object[] items = this.items;
    5. @SuppressWarnings("unchecked")
    6. E x = (E) items[takeIndex];//从获取值指针处取值
    7. items[takeIndex] = null;//获取值之后将位置清空,方便gc回收
    8. if (++takeIndex == items.length)//获取到尾部,指针重O开始
    9. takeIndex = 0;
    10. count--;
    11. if (itrs != null)
    12. itrs.elementDequeued();//同时更新迭代器中的元素数据
    13. notFull.signal();//唤醒notFull的阻塞
    14. return x;
    15. }