// 自定义阻塞队列 (Custom blocking queue)
package multiThread;import org.omg.PortableInterceptor.INACTIVE;import java.util.ArrayList;import java.util.List;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class CustomBlockingQueue<E> { private final Object[] items; private ReentrantLock lock; private Condition isFull; private Condition isEmpty; private int takeIndex; private int putIndex; private int count; public static void main(String[] args) { CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(5); new Thread(()-> { for (int i = 0; i < 10; i++) { try { queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(()-> { for (int i = 5; i < 10; i++) { try { Thread.sleep(1); queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); } public CustomBlockingQueue(int capacity) { if(capacity < 0) { throw new IllegalArgumentException("param is illegal"); } items = new Object[capacity]; lock = new ReentrantLock(); isFull = lock.newCondition(); isEmpty = lock.newCondition(); } public void put(E e) throws InterruptedException { lock.lock(); try { while (count == items.length) { isFull.await(); } items[putIndex] = e; if(++putIndex == items.length) { putIndex = 0; } count++; isEmpty.signal(); System.out.println(Thread.currentThread().getName() + ":放入元素,当前队列长度:" + count); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) { isEmpty.await(); } E item = (E) items[takeIndex]; items[takeIndex] = null; count--; if(++takeIndex == items.length) { takeIndex = 0; } // if(takeIndex > 0) takeIndex--; isFull.signal(); System.out.println(Thread.currentThread().getName() + ":取出元素,当前队列长度:" + count); return item; } finally { lock.unlock(); } }}