一.使用
public class Test {public static void main(String[] args) {LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();PutThread putThread = new PutThread(linkedBlockingQueue);GetThread getThread = new GetThread(linkedBlockingQueue);putThread.start();getThread.start();}}class PutThread extends Thread {private LinkedBlockingQueue<Integer> linkedBlockingQueue;public PutThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {this.linkedBlockingQueue = linkedBlockingQueue;}@SneakyThrows@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println("put" + i);linkedBlockingQueue.put(i);Thread.sleep(100);}}}class GetThread extends Thread {private LinkedBlockingQueue<Integer> linkedBlockingQueue;public GetThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {this.linkedBlockingQueue = linkedBlockingQueue;}@SneakyThrows@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println("take" + linkedBlockingQueue.take());Thread.sleep(100);}}}输出:put0take0put1take1put2take2put3take3put4take4put5take5put6take6put7take7put8take8put9take9
二.源码
构造方法
private final AtomicInteger count = new AtomicInteger();private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {this.capacity = capacity;last = head = new Node<E>(null);}static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}
put
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}private void enqueue(Node<E> node) {last = last.next = node;}
take
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}
