自定义阻塞队列

  1. package multiThread;
  2. import org.omg.PortableInterceptor.INACTIVE;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.locks.Condition;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. public class CustomBlockingQueue<E> {
  8. private final Object[] items;
  9. private ReentrantLock lock;
  10. private Condition isFull;
  11. private Condition isEmpty;
  12. private int takeIndex;
  13. private int putIndex;
  14. private int count;
  15. public static void main(String[] args) {
  16. CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(5);
  17. new Thread(()-> {
  18. for (int i = 0; i < 10; i++) {
  19. try {
  20. queue.put(i);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }, "A").start();
  26. new Thread(()-> {
  27. for (int i = 5; i < 10; i++) {
  28. try {
  29. Thread.sleep(1);
  30. queue.take();
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }, "B").start();
  36. }
  37. public CustomBlockingQueue(int capacity) {
  38. if(capacity < 0) {
  39. throw new IllegalArgumentException("param is illegal");
  40. }
  41. items = new Object[capacity];
  42. lock = new ReentrantLock();
  43. isFull = lock.newCondition();
  44. isEmpty = lock.newCondition();
  45. }
  46. public void put(E e) throws InterruptedException {
  47. lock.lock();
  48. try {
  49. while (count == items.length) {
  50. isFull.await();
  51. }
  52. items[putIndex] = e;
  53. if(++putIndex == items.length) {
  54. putIndex = 0;
  55. }
  56. count++;
  57. isEmpty.signal();
  58. System.out.println(Thread.currentThread().getName() + ":放入元素,当前队列长度:" + count);
  59. } finally {
  60. lock.unlock();
  61. }
  62. }
  63. public E take() throws InterruptedException {
  64. lock.lock();
  65. try {
  66. while (count == 0) {
  67. isEmpty.await();
  68. }
  69. E item = (E) items[takeIndex];
  70. items[takeIndex] = null;
  71. count--;
  72. if(++takeIndex == items.length) {
  73. takeIndex = 0;
  74. }
  75. // if(takeIndex > 0) takeIndex--;
  76. isFull.signal();
  77. System.out.println(Thread.currentThread().getName() + ":取出元素,当前队列长度:" + count);
  78. return item;
  79. } finally {
  80. lock.unlock();
  81. }
  82. }
  83. }