// Custom blocking queue (自定义阻塞队列)
package multiThread;
import org.omg.PortableInterceptor.INACTIVE;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded FIFO blocking queue backed by a fixed-size circular array.
 *
 * <p>All mutable state ({@code takeIndex}, {@code putIndex}, {@code count} and the
 * array slots) is read and written only while holding {@code lock}. Producers that
 * find the queue full wait on {@code notFull}; consumers that find it empty wait on
 * {@code notEmpty}.
 *
 * <p>{@code null} elements are not rejected; they are stored and returned as-is.
 *
 * @param <E> the type of elements held in this queue
 */
public class CustomBlockingQueue<E> {

    /** Circular buffer holding the queued elements; free slots are {@code null}. */
    private final Object[] items;

    /** Single lock guarding every access to the buffer and the indices below. */
    private final ReentrantLock lock;

    /** Signalled after a {@code take}, i.e. when a slot has become free. */
    private final Condition notFull;

    /** Signalled after a {@code put}, i.e. when an element has become available. */
    private final Condition notEmpty;

    /** Index of the next element to be returned by {@link #take()}. */
    private int takeIndex;

    /** Index of the next slot to be written by {@link #put(Object)}. */
    private int putIndex;

    /** Number of elements currently in the queue. */
    private int count;

    /**
     * Small demo: thread "A" puts ten integers into a queue of capacity five while
     * thread "B" takes five of them, sleeping 1 ms before each take.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(5);

        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of looping on after an interrupt.
                Thread.currentThread().interrupt();
            }
        }, "A").start();

        new Thread(() -> {
            try {
                for (int i = 5; i < 10; i++) {
                    Thread.sleep(1);
                    queue.take();
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of looping on after an interrupt.
                Thread.currentThread().interrupt();
            }
        }, "B").start();
    }

    /**
     * Creates an empty queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements the queue can hold; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    public CustomBlockingQueue(int capacity) {
        // A zero-capacity buffer would make every put() wait forever, so reject it
        // together with negative values.
        if (capacity <= 0) {
            throw new IllegalArgumentException("param is illegal");
        }
        items = new Object[capacity];
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }

    /**
     * Appends the element at the tail of the queue, waiting while the queue is full.
     *
     * @param e the element to add
     * @throws InterruptedException if the thread is interrupted while waiting for space
     */
    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // Loop (not "if"): the condition must be re-checked after every wake-up.
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = e;
            // Wrap around to the start of the circular buffer.
            if (++putIndex == items.length) {
                putIndex = 0;
            }
            count++;
            notEmpty.signal();
            System.out.println(Thread.currentThread().getName() + ":放入元素,当前队列长度:" + count);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while the
     * queue is empty.
     *
     * @return the oldest element in the queue
     * @throws InterruptedException if the thread is interrupted while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // Loop (not "if"): the condition must be re-checked after every wake-up.
            while (count == 0) {
                notEmpty.await();
            }
            // Safe: put(E) is the only writer of non-null slots in this array.
            @SuppressWarnings("unchecked")
            E item = (E) items[takeIndex];
            // Clear the slot so the queue does not keep the element reachable.
            items[takeIndex] = null;
            count--;
            // Wrap around to the start of the circular buffer.
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            notFull.signal();
            System.out.println(Thread.currentThread().getName() + ":取出元素,当前队列长度:" + count);
            return item;
        } finally {
            lock.unlock();
        }
    }
}