接口
public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);//插入方法,如果队列不可用时抛出异常boolean offer(E e);//插入方法,队列不可用时返回falsevoid put(E e) throws InterruptedException;//插入方法,队列不可用时会一直阻塞boolean offer(E e, long timeout, TimeUnit unit)//插入方法,队列不可用时,超时退出throws InterruptedException;E take() throws InterruptedException;//获取,一直阻塞直到有值E poll(long timeout, TimeUnit unit)//获取,超时退出throws InterruptedException;int remainingCapacity();boolean remove(Object o);//移除public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);}
java中的阻塞队列
- ArrayBlockingQueue 数组组成的有界阻塞队列。 默认不保证公平性,可以选择公平锁,FIFO
- LinkedBlockingQueue 链表结构组成的有界阻塞队列 默认Integer.MAX_VALUE
- PriorityBlockingQueue 支持优先级排序的无界阻塞队列 默认自然顺序升序,可以用compareTo()指定元素排序规则
- DelayBlockingQueue 使用优先级队列实现的无界阻塞队列 支持延迟获取。创建元素时可以指定多久才能重队列中获取当前元素
- SynchronousQueue 不存储元素的阻塞队列 默认非公平,支持公平
- LinkedTransferQueue 由链表结构组成的无界阻塞队列 多了transfer/tryTransfer来返回消费结果
LinkedBlockingDeque 由链表结构组成的双向阻塞队列 双向队列可以两端插入和移除元素
ArrayBlockingQueue源码分析
构造器
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)//throw new IllegalArgumentException();this.items = new Object[capacity];//初始化一个传进来长度的Object数组lock = new ReentrantLock(fair);//创建重入锁notEmpty = lock.newCondition();//notEmpty阻塞ConditionnotFull = lock.newCondition();//notFull阻塞Condition}
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);//初始化数组final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusion 锁只用于可见性,而不是互斥try {int i = 0;try {for (E e : c) {//遍历初始值checkNotNull(e);items[i++] = e;//放入队列}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}
带初始化值的队列。先初始化组织,然后通过加锁的方式,把初始化值加入队列中。这里加锁,是为了保证后续往队列里添加值得数组可见到初始化得值。及解决可见性问题。
重入锁之所以能解决可见性问题,是因为,他在获取锁和释放锁得时候修改了volatile修饰的state。volatile是由mesi协议实现的。volatile修饰的变量的时候,编译成指令的时候会在前后插入内存屏障来保证可见性问题。put阻塞式方法添加数据
public void put(E e) throws InterruptedException {checkNotNull(e);//元素为空final ReentrantLock lock = this.lock;//重入锁lock.lockInterruptibly();//加锁try {while (count == items.length)//队列满了notFull.await();//阻塞在这里enqueue(e);//真正放入队列的方法} finally {lock.unlock();//解锁}}
队列会通过加重入锁的方式去添加数据,如果队列满了。用一个notFull阻塞在加入队列之前
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//获取数组items[putIndex] = x;//往队列里放数据if (++putIndex == items.length)//放入指针到数组指到数组尽头putIndex = 0;//重数组0继续开始count++;//长度+1notEmpty.signal();//唤醒notEmpty阻塞的condition}
数据添加到队列后,唤醒notEmpty阻塞的condition。这样因为空被阻塞的notEmpty队列就会被唤醒
take()阻塞式获取数据
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//同一把锁lock.lockInterruptibly();//锁住保证线程安全try {while (count == 0)//队列里无值notEmpty.await();//notEmpty阻塞return dequeue();//真的获取值的地方} finally {lock.unlock();}}
获取的时候会跟放入的时候获取同一把锁
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];//从获取值指针处取值items[takeIndex] = null;//获取值之后将位置清空,方便gc回收if (++takeIndex == items.length)//获取到尾部,指针重O开始takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//同时更新迭代器中的元素数据notFull.signal();//唤醒notFull的阻塞return x;}
