一.使用

  1. public class Test {
  2. public static void main(String[] args) {
  3. LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();
  4. PutThread putThread = new PutThread(linkedBlockingQueue);
  5. GetThread getThread = new GetThread(linkedBlockingQueue);
  6. putThread.start();
  7. getThread.start();
  8. }
  9. }
  /**
   * Producer thread for the demo: prints and puts the integers 0..9 into the
   * shared queue, pausing 100 ms after each insertion.
   */
  class PutThread extends Thread {
      /** Queue shared with the consumer thread. */
      private final LinkedBlockingQueue<Integer> linkedBlockingQueue;

      /**
       * @param linkedBlockingQueue queue that this thread inserts into
       */
      public PutThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {
          this.linkedBlockingQueue = linkedBlockingQueue;
      }

      /**
       * Puts 0..9 into the queue. If the thread is interrupted while blocked in
       * {@code put} or {@code sleep}, the loop stops and the interrupt status is
       * restored instead of being rethrown out of {@code run()}.
       */
      @Override
      public void run() {
          try {
              for (int i = 0; i < 10; i++) {
                  System.out.println("put" + i);
                  linkedBlockingQueue.put(i);
                  Thread.sleep(100);
              }
          } catch (InterruptedException e) {
              // Restore the flag so code observing this thread can still see the interrupt.
              Thread.currentThread().interrupt();
          }
      }
  }
  /**
   * Consumer thread for the demo: takes ten elements from the shared queue,
   * printing each one and pausing 100 ms after each removal.
   */
  class GetThread extends Thread {
      /** Queue shared with the producer thread. */
      private final LinkedBlockingQueue<Integer> linkedBlockingQueue;

      /**
       * @param linkedBlockingQueue queue that this thread removes from
       */
      public GetThread(LinkedBlockingQueue<Integer> linkedBlockingQueue) {
          this.linkedBlockingQueue = linkedBlockingQueue;
      }

      /**
       * Takes and prints ten elements. If the thread is interrupted while blocked
       * in {@code take} or {@code sleep}, the loop stops and the interrupt status
       * is restored instead of being rethrown out of {@code run()}.
       */
      @Override
      public void run() {
          try {
              for (int i = 0; i < 10; i++) {
                  System.out.println("take" + linkedBlockingQueue.take());
                  Thread.sleep(100);
              }
          } catch (InterruptedException e) {
              // Restore the flag so code observing this thread can still see the interrupt.
              Thread.currentThread().interrupt();
          }
      }
  }
  39. 输出:
  40. put0
  41. take0
  42. put1
  43. take1
  44. put2
  45. take2
  46. put3
  47. take3
  48. put4
  49. take4
  50. put5
  51. take5
  52. put6
  53. take6
  54. put7
  55. take7
  56. put8
  57. take8
  58. put9
  59. take9

二.源码

构造方法

  // Shared state used by put() and take() in the listings that follow.
  1. private final AtomicInteger count = new AtomicInteger(); // number of elements; incremented in put(), decremented in take()
  2. private final ReentrantLock takeLock = new ReentrantLock(); // acquired by take() and signalNotEmpty()
  3. private final Condition notEmpty = takeLock.newCondition(); // take() awaits this while count is 0
  4. private final ReentrantLock putLock = new ReentrantLock(); // acquired by put() and signalNotFull()
  5. private final Condition notFull = putLock.newCondition(); // put() awaits this while count equals capacity
  // No-argument constructor: delegates to the capacity constructor with Integer.MAX_VALUE.
  6. public LinkedBlockingQueue() {
  7. this(Integer.MAX_VALUE);
  8. }
  // Capacity constructor: stores the bound and creates one node with a null item
  // that both head and last refer to.
  // NOTE(review): this excerpt shows no validation of capacity. With capacity 0 the
  // wait loop in put() (count.get() == capacity) would wait on an empty queue. The JDK
  // class is documented to throw IllegalArgumentException when capacity is not
  // greater than zero — confirm against the JDK source.
  9. public LinkedBlockingQueue(int capacity) {
  10. this.capacity = capacity;
  11. last = head = new Node<E>(null); // placeholder node: item is null, head and last both point to it
  12. }
  // Singly linked list node: one item plus a reference to the next node.
  13. static class Node<E> {
  14. E item; // null in the placeholder node built by the constructor and in a node emptied by dequeue()
  15. Node<E> next; // assigned by enqueue() when a node is appended after this one
  16. Node(E x) { item = x; }
  17. }

put

  // Appends e at the tail of the queue, waiting on notFull while count equals capacity.
  // Throws NullPointerException when e is null; declares InterruptedException, which
  // lockInterruptibly() and await() can raise.
  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. int c = -1; // will receive the element count as it was before this insertion
  4. Node<E> node = new Node<E>(e);
  5. final ReentrantLock putLock = this.putLock;
  6. final AtomicInteger count = this.count;
  7. putLock.lockInterruptibly();
  8. try {
  9. while (count.get() == capacity) { // full: wait, and re-check the condition after every wakeup
  10. notFull.await();
  11. }
  12. enqueue(node);
  13. c = count.getAndIncrement(); // c is the count before the increment
  14. if (c + 1 < capacity) // still below capacity after this insert: signal another thread waiting in put()
  15. notFull.signal();
  16. } finally {
  17. putLock.unlock();
  18. }
  19. if (c == 0) // count was 0 before this insert: signal a thread waiting in take()
  20. signalNotEmpty();
  21. }
  // Signals notEmpty. The condition was created from takeLock, so takeLock is acquired
  // around the signal. Called from put() after putLock has been released.
  22. private void signalNotEmpty() {
  23. final ReentrantLock takeLock = this.takeLock;
  24. takeLock.lock();
  25. try {
  26. notEmpty.signal();
  27. } finally {
  28. takeLock.unlock();
  29. }
  30. }
  // Links node after the current last node and makes it the new last.
  // Called from put() while putLock is held.
  31. private void enqueue(Node<E> node) {
  32. last = last.next = node; // first sets the old last node's next to node, then moves last to node
  33. }

take

  // Removes and returns the element at the head of the queue, waiting on notEmpty
  // while count is 0. Declares InterruptedException, which lockInterruptibly() and
  // await() can raise.
  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1; // will receive the element count as it was before this removal
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. while (count.get() == 0) { // empty: wait, and re-check the condition after every wakeup
  9. notEmpty.await();
  10. }
  11. x = dequeue();
  12. c = count.getAndDecrement(); // c is the count before the decrement
  13. if (c > 1) // elements remain after this removal: signal another thread waiting in take()
  14. notEmpty.signal();
  15. } finally {
  16. takeLock.unlock();
  17. }
  18. if (c == capacity) // count equalled capacity before this removal: signal a thread waiting in put()
  19. signalNotFull();
  20. return x;
  21. }
  // Signals notFull. The condition was created from putLock, so putLock is acquired
  // around the signal. Called from take() after takeLock has been released.
  22. private void signalNotFull() {
  23. final ReentrantLock putLock = this.putLock;
  24. putLock.lock();
  25. try {
  26. notFull.signal();
  27. } finally {
  28. putLock.unlock();
  29. }
  30. }
  // Detaches the current head node and returns the item of the node that followed it;
  // that following node becomes the new head with its item cleared.
  // Called from take() while takeLock is held.
  31. private E dequeue() {
  32. Node<E> h = head;
  33. Node<E> first = h.next; // node holding the oldest element (enqueue() appends at last)
  34. h.next = h; // help GC: the old head now points only at itself
  35. head = first;
  36. E x = first.item;
  37. first.item = null; // first now plays the item-less head role
  38. return x;
  39. }