queue使用

  1. public static void main(String[] args) {
  2. LinkedBlockingQueue<String> linkedBlockingQueue =
  3. new LinkedBlockingQueue<>(Integer.MAX_VALUE);
  4. /** throw new Exception
  5. * add实际用的 offer(e) add会抛出异常
  6. * offer 会拒绝
  7. * put 会阻塞
  8. * private final ReentrantLock putLock = new ReentrantLock()来保证线程安全
  9. * private final Condition notEmpty = takeLock.newCondition();保证阻塞线程的唤醒
  10. */
  11. linkedBlockingQueue.add("1");
  12. linkedBlockingQueue.remove();
  13. /** return false */
  14. linkedBlockingQueue.offer("3");
  15. linkedBlockingQueue.poll();
  16. /** throws InterruptedException 阻塞 */
  17. try {
  18. linkedBlockingQueue.put("2");
  19. linkedBlockingQueue.take();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }

init queue

/**
* 001 init queue node
*/
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

offer方法

/**
 * 002 offer
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    //putLock
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //判断队列是否已满
        if (count.get() < capacity) {
            // last = last.next = node;
            enqueue(node);
            c = count.getAndIncrement();
            // 判断队列是否有可用空间,如果有唤醒下一个阻塞线程进行添加操作
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // 如果添加成功,唤醒其他阻塞任务
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

enqueue 尾加


/**
 * 003 enqueue
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

poll方法

/**
 * 004 poll
 */
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // takeLock 用于移除数据时的lock
    // 移除数据有多种方法故统一用takeLock,保证不会相互冲突
    final ReentrantLock takeLock = this.takeLock;
    //加锁
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            //cas减操作
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue头删

/**
 * 005 dequeue 头删
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}