1. 定义
    要点

    • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
    • 消费队列可以用来平衡生产和消费的线程资源
    • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
    • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
    • JDK 中各种阻塞队列,采用的就是这种模式
    • 阻塞队列有:BlockingQueue

    异步模式之生产者/消费者(Queue原理) - 图1
    BlockingQueue的源码

    1. // 队列项
    2. final Object[] items;
    3. // 保护所有通道的主锁
    4. final ReentrantLock lock;
    5. // 等待条件
    6. private final Condition notEmpty;
    7. // 等待插入锁
    8. private final Condition notFull;
    9. // 容量
    10. int count;
    11. // 下一个 put、offer 或 add 的项目索引
    12. int putIndex;
    13. // 每加入一个的元素,容量加一
    14. private void enqueue(E x) {
    15. final Object[] items = this.items;
    16. items[putIndex] = x;
    17. if (++putIndex == items.length)
    18. putIndex = 0;
    19. count++;
    20. notEmpty.signal();
    21. }
    22. /***
    23. * 构造函数
    24. */
    25. public ArrayBlockingQueue(int capacity, boolean fair) {
    26. if (capacity <= 0)
    27. throw new IllegalArgumentException();
    28. // 将数组的容量赋值
    29. this.items = new Object[capacity];
    30. // fair为 true ,先进先出(FIFO)
    31. lock = new ReentrantLock(fair);
    32. // 创建两个新的锁
    33. notEmpty = lock.newCondition();
    34. notFull = lock.newCondition();
    35. }
    36. public void put(E e) throws InterruptedException {
    37. // 检查对象是否为空
    38. checkNotNull(e);
    39. // 创建lock锁
    40. final ReentrantLock lock = this.lock;
    41. // 等待锁
    42. lock.lockInterruptibly();
    43. try {
    44. // 该队列容量满了
    45. while (count == items.length)
    46. // 睡眠
    47. notFull.await();
    48. enqueue(e);
    49. } finally {
    50. // 唤醒
    51. lock.unlock();
    52. }
    53. }
    54. public E take() throws InterruptedException {
    55. final ReentrantLock lock = this.lock;
    56. lock.lockInterruptibly();
    57. try {
    58. while (count == 0)
    59. notEmpty.await();
    60. return dequeue();
    61. } finally {
    62. lock.unlock();
    63. }
    64. }

    示例

    1. @Slf4j(topic = "c.Test21")
    2. public class Test21 {
    3. public static void main(String[] args) {
    4. MessageQueue queue = new MessageQueue(2);
    5. for (int i = 0; i < 3; i++) {
    6. int id = i;
    7. new Thread(() -> {
    8. queue.put(new Message(id , "值"+id));
    9. }, "生产者" + i).start();
    10. }
    11. new Thread(() -> {
    12. while(true) {
    13. sleep(1);
    14. Message message = queue.take();
    15. }
    16. }, "消费者").start();
    17. }
    18. }
    19. // 消息队列类 , java 线程之间通信
    20. @Slf4j(topic = "c.MessageQueue")
    21. class MessageQueue {
    22. // 消息的队列集合
    23. private LinkedList<Message> list = new LinkedList<>();
    24. // 队列容量
    25. private int capcity;
    26. public MessageQueue(int capcity) {
    27. this.capcity = capcity;
    28. }
    29. // 获取消息
    30. public Message take() {
    31. // 检查队列是否为空
    32. synchronized (list) {
    33. while(list.isEmpty()) {
    34. try {
    35. log.debug("队列为空, 消费者线程等待");
    36. list.wait();
    37. } catch (InterruptedException e) {
    38. e.printStackTrace();
    39. }
    40. }
    41. // 从队列头部获取消息并返回
    42. Message message = list.removeFirst();
    43. log.debug("已消费消息 {}", message);
    44. list.notifyAll();
    45. return message;
    46. }
    47. }
    48. // 存入消息
    49. public void put(Message message) {
    50. synchronized (list) {
    51. // 检查对象是否已满
    52. while(list.size() == capcity) {
    53. try {
    54. log.debug("队列已满, 生产者线程等待");
    55. list.wait();
    56. } catch (InterruptedException e) {
    57. e.printStackTrace();
    58. }
    59. }
    60. // 将消息加入队列尾部
    61. list.addLast(message);
    62. log.debug("已生产消息 {}", message);
    63. list.notifyAll();
    64. }
    65. }
    66. }
    67. final class Message {
    68. private int id;
    69. private Object value;
    70. public Message(int id, Object value) {
    71. this.id = id;
    72. this.value = value;
    73. }
    74. public int getId() {
    75. return id;
    76. }
    77. public Object getValue() {
    78. return value;
    79. }
    80. @Override
    81. public String toString() {
    82. return "Message{" +
    83. "id=" + id +
    84. ", value=" + value +
    85. '}';
    86. }
    87. }

    结果

    1. 22:58:21.875 c.MessageQueue [生产者1] - 已生产消息 Message{id=1, value=值1}
    2. 22:58:21.878 c.MessageQueue [生产者0] - 已生产消息 Message{id=0, value=值0}
    3. 22:58:21.878 c.MessageQueue [生产者2] - 队列已满, 生产者线程等待
    4. 22:58:22.877 c.MessageQueue [消费者] - 已消费消息 Message{id=1, value=值1}
    5. 22:58:22.877 c.MessageQueue [生产者2] - 已生产消息 Message{id=2, value=值2}
    6. 22:58:23.891 c.MessageQueue [消费者] - 已消费消息 Message{id=0, value=值0}
    7. 22:58:24.906 c.MessageQueue [消费者] - 已消费消息 Message{id=2, value=值2}
    8. 22:58:25.920 c.MessageQueue [消费者] - 队列为空, 消费者线程等待