一.使用

  1. public class Test {
  2. public static void main(String[] args) {
  3. ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
  4. PutThread putThread = new PutThread(arrayBlockingQueue);
  5. GetThread getThread = new GetThread(arrayBlockingQueue);
  6. putThread.start();
  7. getThread.start();
  8. }
  9. }
  10. class PutThread extends Thread {
  11. private ArrayBlockingQueue<Integer> arrayBlockingQueue;
  12. public PutThread(ArrayBlockingQueue<Integer> arrayBlockingQueue) {
  13. this.arrayBlockingQueue = arrayBlockingQueue;
  14. }
  15. @SneakyThrows
  16. @Override
  17. public void run() {
  18. for (int i = 0; i < 10; i++) {
  19. System.out.println("put" + i);
  20. arrayBlockingQueue.put(i);
  21. Thread.sleep(50);
  22. }
  23. }
  24. }
  25. class GetThread extends Thread {
  26. private ArrayBlockingQueue<Integer> arrayBlockingQueue;
  27. public GetThread(ArrayBlockingQueue<Integer> arrayBlockingQueue) {
  28. this.arrayBlockingQueue = arrayBlockingQueue;
  29. }
  30. @SneakyThrows
  31. @Override
  32. public void run() {
  33. for (int i = 0; i < 15; i++) {
  34. System.out.println("take" + arrayBlockingQueue.take());
  35. Thread.sleep(50);
  36. }
  37. }
  38. }
  39. 输出:
  40. put0
  41. take0
  42. put1
  43. take1
  44. put2
  45. take2
  46. put3
  47. take3
  48. put4
  49. take4
  50. put5
  51. take5
  52. put6
  53. take6
  54. put7
  55. take7
  56. put8
  57. take8
  58. put9
  59. take9

二.源码

构造方法

  1. public ArrayBlockingQueue(int capacity) {
  2. this(capacity, false);
  3. }
  4. public ArrayBlockingQueue(int capacity, boolean fair) {
  5. // final Object[] items;
  6. this.items = new Object[capacity];
  7. lock = new ReentrantLock(fair);
  8. notEmpty = lock.newCondition();
  9. notFull = lock.newCondition();
  10. }

put

  1. public void put(E e) throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == items.length)
  6. notFull.await();
  7. enqueue(e);
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. private void enqueue(E x) {
  13. final Object[] items = this.items;
  14. items[putIndex] = x;
  15. if (++putIndex == items.length)
  16. putIndex = 0;
  17. count++;
  18. notEmpty.signal();
  19. }

take

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. private E dequeue() {
  13. final Object[] items = this.items;
  14. E x = (E) items[takeIndex];
  15. items[takeIndex] = null;
  16. if (++takeIndex == items.length)
  17. takeIndex = 0;
  18. count--;
  19. notFull.signal();
  20. return x;
  21. }