一.使用
public class Test {
public static void main(String[] args) {
LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();
PutThread putThread = new PutThread(linkedBlockingQueue);
GetThread getThread = new GetThread(linkedBlockingQueue);
putThread.start();
getThread.start();
}
}
class PutThread extends Thread {
private LinkedBlockingQueue<Integer> linkedBlockingQueue;
public PutThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {
this.linkedBlockingQueue = linkedBlockingQueue;
}
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("put" + i);
linkedBlockingQueue.put(i);
Thread.sleep(100);
}
}
}
class GetThread extends Thread {
private LinkedBlockingQueue<Integer> linkedBlockingQueue;
public GetThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {
this.linkedBlockingQueue = linkedBlockingQueue;
}
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("take" + linkedBlockingQueue.take());
Thread.sleep(100);
}
}
}
输出:
put0
take0
put1
take1
put2
take2
put3
take3
put4
take4
put5
take5
put6
take6
put7
take7
put8
take8
put9
take9
二.源码
构造方法
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
this.capacity = capacity;
last = head = new Node<E>(null);
}
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}