queue使用
public static void main(String[] args) { LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE); /** throw new Exception * add实际用的 offer(e) add会抛出异常 * offer 会拒绝 * put 会阻塞 * private final ReentrantLock putLock = new ReentrantLock()来保证线程安全 * private final Condition notEmpty = takeLock.newCondition();保证阻塞线程的唤醒 */ linkedBlockingQueue.add("1"); linkedBlockingQueue.remove(); /** return false */ linkedBlockingQueue.offer("3"); linkedBlockingQueue.poll(); /** throws InterruptedException 阻塞 */ try { linkedBlockingQueue.put("2"); linkedBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); }}
init queue
/**
* 001 init queue node
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
offer方法
/**
* 002 offer
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//putLock
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//判断队列是否已满
if (count.get() < capacity) {
// last = last.next = node;
enqueue(node);
c = count.getAndIncrement();
// 判断队列是否有可用空间,如果有唤醒下一个阻塞线程进行添加操作
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果添加成功,唤醒其他阻塞任务
if (c == 0)
signalNotEmpty();
return c >= 0;
}
enqueue 尾加
/**
* 003 enqueue
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
poll方法
/**
* 004 poll
*/
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// takeLock 用于移除数据时的lock
// 移除数据有多种方法故统一用takeLock,保证不会相互冲突
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
//cas减操作
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue头删
/**
* 005 dequeue 头删
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}