使用最多的场景就是线程池
ArrayBlockingQueue
LinkedBlockingQueue
SynchonousQueue
jdk提供的四种线程池
定长 fix LinkedBlockingQueue
单一 single LinkedBlockingQueue
变长 cache SynchonousQueue
数组实现:维护一个数组
链表实现:维护一个内部类Node
ArrayBlockingQueue
底层基于数组,具有阻塞的功能,FIFO,生产消费者模式
Queue提供的api
add,offer 插入元素
remove,poll 取元素
element,peek 查询元素
抛异常 阻塞 超时 返回特殊值(一般情况都是boolean,null)
插入元素(遇到慢的情况) add put offer offer
取元素(删除取出的元素) remove take poll poll
查询元素(对队列没有影响) element peek
ArrayBlockingQueue结构
items
存放元素
takeIndex
取元素的指针,记录下一次出队的位置
putIndex
存元素的指针,记录下一次入队的位置 (指向下一个空位置)
两个指针可以形成一个环形数组,take追着put跑,当put到数组的最大值的时候会将putIndex指向下标为0的节点,takeIndex也是从下标为0的节点开始处理,这样会重复利用数组的节点,当takeIndex==putIndex说明元素被取空了或者满了
count
计算元素个数
ReentrantLook
ArrayBlockingQueue中维护了一把锁,入队和出队是相互阻塞的
Condition
有两个Condition是通过ReentrantLook来获取的,这里使用到await和signal,lock.newCondition返回的对象await必须要用同一个对象signal唤醒
notEmpty 出队的对象,说明队列不为空,唤醒的是take
notFull 入队的对象,说明队列
LinkedBlockingQueue(FIFO)
容量:默认取得是Integer.MAX_VALUE
内部使用两把锁,所以使用AtomicInteger来计数保证原子性,出队入队用的不同的锁,互相不阻塞
属性:
AtomicInteger count 用来计数
Node head 头节点,初始化的时候 last = head = new Node
Node last :尾节点,入队的时候 last = last.next = node
出队的时候从头节点出队,把当前head=head.next, 吧head.item返回
出队代码:
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
入队代码:
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
SynchronousQueue
无锁的队列,使用UNSAFE保证线程安全,主要就是cas
无缓冲队列,不存储元素,用来阻塞线程,必须匹配到有一个入队一个出队两个线程才会同时返回
一个线程put,一个线程take才会执行,否则就是阻塞
阻塞的是一组线程,把线程维护起来,有公平和非公平的概念
通过isData来判断是出队还是入队,要是和阻塞的线程的isData不匹配或者队列为空,那就阻塞
ConcurrentLinkedQueue
单向链表
无锁的,通过CAS来实现线程安全,不用切换线程上下文,但是再并发大的时候会导致线程一直自旋使CPU飙高
内部维护了一个head和一个tail两个node节点,其中再添加元素的时候tail节点不是每次添加玩都会去移动,是每入队两次tail才移动一次,这样减少了tail的移动次数,tail.next == null 那么tail就是真正的尾节点(如果tail直接指向尾节点的话因为设置tail是CAS操作,性能上不如两次移动一次)
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
// 这里设置移动tail节点,第一次p == t 就不会设置tail节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) // p.next = p 并发下可能会p.next = p
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
优先级队列
PriorityBlockingQueue(线程安全)/priorityQueue(非线程安全)
内部维护了一个数组,初始容量是11
小顶堆:
入队的时候向上浮(入队的时候先把元素放到最后,然后向上浮)
出队向下沉(出队的时候吧最后一个元素放到出队元素的地方然后向下沉)
数组的第一个元素为空目的就是快速根据子节点找到父节点的位置 index/2 向下取整 = 父节点下标
可以进行排序
@Test
public void test5(){
int[] arr = {3,2,5,8,1,9};
PriorityQueue<Integer> priorityQueue = new PriorityQueue(arr.length);
for(int i : arr){
priorityQueue.add(i);
}
Iterator iterator = priorityQueue.iterator();
while (iterator.hasNext()){
System.out.print(iterator.next() + ",");
}
// 输出:1,2,5,8,3,9, 迭代器没有顺序
int size = priorityQueue.size();
for (int i = 0; i < size; i++) {
System.out.print(priorityQueue.poll() + ",");
}
// 输出:1,2,3,5,8,9, poll是有顺序的
}
入队:
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
// 这里执行扩容的方法,使用的cas,因为已进入方法就吧锁释放了
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
// 这里释放锁的原因是不阻塞读
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 只需要一个线程做扩容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// 让扩容后的线程优先拿到资源
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
DelayQueue 延迟队列
具有阻塞的功能,线程安全
// 创建一个延迟线程池里面是通过它的内部类DelayedWorkQueue来做的延迟功能,但是原理和DelayQueue差不多
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
private final transient ReentrantLock lock = new ReentrantLock();
// 内部调用的就是优先级队列的方法
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 当前是否有线程再排队,用于取元素的时候,当取得时候发现有线程在排队那就不用取了,直接排 //队就好了
private Thread leader = null;
private final Condition available = lock.newCondition();
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队直接调用PriorityQueue的offer方法
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
扩展:禁止指令重拍的方式
volatile,final,sync,lock