简介

1.什么是消息中间件

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进 程间的通信。对于消息中间件,常见的角色大致也就有 Producer(生产者)、Consumer(消 费者)

2.常见的消息中间件

(1)**ActiveMQ**
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完 全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。

(2)RabbitMQ
AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯, OpenStack 开源云平台的通信组件,最先在金融行业得到运用。

(3)ZeroMQ
史上最快的消息队列系统

(4)Kafka
Apache 下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统。适合处理海量数据。

JMS

1 简介

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便 于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的 接口简化企业应用的开发。

JMS 本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系 统。它类似于 JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关 系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。

许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA 的 Weblogic JMS service 和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或 路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对 象,由两部分组成:报头和消息主体
报头 由路由信息以及有关该消息的元数据组成。
消息 主体则携带着应用程序的数据或有效负载

2 消息正文格式:


·
TextMessage—一个字符串对象
·
MapMessage—一套名称-值对
·
ObjectMessage—一个序列化的 Java 对象
·
BytesMessage—一个字节的数据流
·
StreamMessage **— Java 原始值的数据流

3 消息传递类型

点对点消息类型下,如果没有消费者,队列会将消息存储,在消费者连接的时候将消息发送给消费者.如果由两个或以上的消费者监听同一个队列,使用轮询的模式进行消息处理

发布模式类型下,不管有没有消费者监听,容器都会将消息发布出去,有多少个消费者就会由多少个消费者接收到消息.
**一种是点对点的,即一个消息只能由一个消费者消费;
image.png
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
image.png

ActiveMQ

1.下载
官方网站下载:http://activemq.apache.org/
2.安装(Linux) 和启动
**(1)将 下载好的apache-activemq-5.12.0-bin.tar.gz 上传至服务器

(2)解压此文件

  1. tar zxvf apache-activemq-5.12.0-bin.tar.gz

(3)为 apache-activemq-5.12.0 目录赋权

  1. chmod 777 apache-activemq-5.12.0

(4)进入 apache-activemq-5.12.0\bin 目录

(5)赋与执行权限

  1. chmod 755 activemq

(6) 启动

  1. ./activemq start

启动成功:
image.png

访问管理页面
http://ip:8161/ 进入中管理页面
image.png
登陆:
默认用户名和密码
admin,admin
image.png

各列的含义:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量

入门Demo

准备:
引入依赖
image.png

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-client</artifactId>
  4. <version>5.13.4</version>
  5. </dependency>

Queue

1.创建queue消息生产对象
image.png

  1. public class QueueProducer {
  2. public static void main(String[] args) throws JMSException {
  3. //1.创建连接工厂
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
  5. //2.获取连接
  6. Connection connection = connectionFactory.createConnection();
  7. //3.启动连接
  8. connection.start();
  9. //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
  10. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  11. //5.创建队列对象
  12. Queue queue = session.createQueue("test-queue");
  13. //6.创建消息生产者
  14. MessageProducer producer = session.createProducer(queue);
  15. //7.创建消息
  16. TextMessage textMessage = session.createTextMessage("欢迎使用消息中间件");
  17. //8.发送消息
  18. producer.send(textMessage);
  19. //9.关闭资源
  20. producer.close();
  21. session.close();
  22. connection.close();
  23. }
  24. }

第四步中参数的类型
image.png

2.创建queue消息消费者
image.png

  1. //1.创建连接工厂
  2. ConnectionFactory connectionFactory=new
  3. ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
  4. //2.获取连接
  5. Connection connection = connectionFactory.createConnection();
  6. //3.启动连接
  7. connection.start();
  8. //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5.创建队列对象
  11. Queue queue = session.createQueue("test-queue");
  12. //6.创建消息消费
  13. MessageConsumer consumer = session.createConsumer(queue);
  14. //7.监听消息
  15. consumer.setMessageListener(new MessageListener() {
  16. public void onMessage(Message message) {
  17. TextMessage textMessage=(TextMessage)message;
  18. try {
  19. System.out.println("接收到消息:"+textMessage.getText());
  20. } catch (JMSException e) {
  21. // TODO Auto-generated catch block
  22. e.printStackTrace();
  23. }
  24. }
  25. });
  26. //8.等待键盘输入
  27. System.in.read();
  28. //9.关闭资源
  29. consumer.close();
  30. session.close();
  31. connection.close();

3.运行结果
image.png
image.png

Topic

1.创建topic生产者
image.png

  1. public class TopicProducer {
  2. public static void main(String[] args) throws JMSException {
  3. //1.创建连接工厂
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
  5. //2.获取连接
  6. Connection connection = connectionFactory.createConnection();
  7. //3.启动连接
  8. connection.start();
  9. //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
  10. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  11. //5.创建主题对象
  12. Topic topic = session.createTopic("test-topic");
  13. //6.创建消息生产者
  14. MessageProducer producer = session.createProducer(topic);
  15. //7.创建消息
  16. TextMessage textMessage = session.createTextMessage("欢迎使用消息中间件服务");
  17. //8.发送消息
  18. producer.send(textMessage);
  19. //9.关闭资源
  20. producer.close();
  21. session.close();
  22. connection.close();
  23. }
  24. }

2创建topic消费者
image.png

  1. public class TopicConsumer {
  2. public static void main(String[] args) throws JMSException, IOException {
  3. //1.创建连接工厂
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
  5. //2.获取连接
  6. Connection connection = connectionFactory.createConnection();
  7. //3.启动连接
  8. connection.start();
  9. //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
  10. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  11. //5.创建主题对象
  12. //Queue queue = session.createQueue("test-queue");
  13. Topic topic = session.createTopic("test-topic");
  14. //6.创建消息消费
  15. MessageConsumer consumer = session.createConsumer(topic);
  16. //7.监听消息
  17. consumer.setMessageListener(new MessageListener() {
  18. public void onMessage(Message message) {
  19. TextMessage textMessage = (TextMessage) message;
  20. try {
  21. System.out.println("接收到消息:" + textMessage.getText());
  22. } catch (JMSException e) {
  23. // TODO Auto-generated catch block
  24. e.printStackTrace();
  25. }
  26. }
  27. });
  28. //8.等待键盘输入
  29. System.in.read();
  30. //9.关闭资源
  31. consumer.close();
  32. session.close();
  33. connection.close();
  34. }
  35. }

运行结果:
image.png
image.png
image.png
注意:
需要先启动消费者才会收到消息

spring整合消息中间件

1.准备

创建两个maven项目,同时导入依赖坐标
image.png
image.png

  1. <properties>
  2. <spring.version>4.2.4.RELEASE</spring.version>
  3. </properties>
  4. <!-- jms -->
  5. <dependencies>
  6. <dependency>
  7. <groupId>org.springframework</groupId>
  8. <artifactId>spring-jms</artifactId>
  9. <version>${spring.version}</version>
  10. </dependency>
  11. <!-- 测试 -->
  12. <dependency>
  13. <groupId>org.springframework</groupId>
  14. <artifactId>spring-test</artifactId>
  15. <version>${spring.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>junit</groupId>
  19. <artifactId>junit</artifactId>
  20. <version>4.9</version>
  21. </dependency>
  22. <!-- activate mq -->
  23. <dependency>
  24. <groupId>org.apache.activemq</groupId>
  25. <artifactId>activemq-client</artifactId>
  26. <version>5.13.4</version>
  27. </dependency>
  28. </dependencies>

2.Queue生产者项目

2.1resources/spring/applicationContext-jms-producer.xml

image.png

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans
  5. http://www.springframework.org/schema/beans/spring-beans.xsd
  6. http://www.springframework.org/schema/context
  7. http://www.springframework.org/schema/context/spring-context.xsd">
  8. <!-- 注解扫描 -->
  9. <context:component-scan base-package="com.moonhu.demo"></context:component-scan>
  10. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  11. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  12. <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
  13. </bean>
  14. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  15. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  16. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  17. <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  18. </bean>
  19. <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
  20. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  21. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  22. <property name="connectionFactory" ref="connectionFactory"/>
  23. </bean>
  24. <!--1.这个是队列目的地,点对点的 文本信息-->
  25. <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
  26. <!-- 队列名称 -->
  27. <constructor-arg value="queue_text"/>
  28. </bean>
  29. <!--2.这个是订阅模式 文本信息-->
  30. <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
  31. <constructor-arg value="topic_text"/>
  32. </bean>
  33. </beans>

2.2QueueDemo

image.png

  1. @Component
  2. public class QueueDemo {
  3. /**
  4. * 模板
  5. */
  6. @Autowired
  7. private JmsTemplate jmsTemplate;
  8. /**
  9. * 容器
  10. */
  11. @Autowired
  12. private Destination queueTextDestination;
  13. /**
  14. * 发送文本消息
  15. *
  16. * @param text
  17. */
  18. public void sendTextMessage(final String text) {
  19. // 发送消息
  20. jmsTemplate.send(queueTextDestination, new MessageCreator() {
  21. public Message createMessage(Session session) throws JMSException {
  22. // 创建文本信息
  23. return session.createTextMessage(text);
  24. }
  25. });
  26. }
  27. }

2.3测试方法

image.png

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer.xml")
  3. public class QueueTest {
  4. @Autowired
  5. private QueueDemo queueDemo;
  6. @Test
  7. public void testSend() {
  8. queueDemo.sendTextMessage("SpringJms-点对点");
  9. }
  10. }

3.Queue消费者项目

3.1resources/spring/applicationContext-jms-consumer.xml

image.png

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://www.springframework.org/schema/beans
  4. http://www.springframework.org/schema/beans/spring-beans.xsd">
  5. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  6. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  7. <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
  8. </bean>
  9. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  10. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  11. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  12. <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  13. </bean>
  14. <!--这个是队列目的地,点对点的 文本信息-->
  15. <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
  16. <constructor-arg value="queue_text"/>
  17. </bean>
  18. <!-- 我的监听类 -->
  19. <bean id="queueListener" class="com.moonhu.demo.QueueListener"/>
  20. <!-- 消息监听容器 -->
  21. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  22. <property name="connectionFactory" ref="connectionFactory"/>
  23. <property name="destination" ref="queueTextDestination"/>
  24. <property name="messageListener" ref="queueListener"/>
  25. </bean>
  26. </beans>

3.2QueueListener

image.png

  1. public class QueueListener implements MessageListener {
  2. /**
  3. * MessageListener接口的抽象方法
  4. *
  5. * @param message 接收的数据
  6. */
  7. public void onMessage(Message message) {
  8. // 数据强转
  9. TextMessage textMessage = (TextMessage) message;
  10. try {
  11. // 获取数据并输出
  12. System.out.println("接收到消息:" + textMessage.getText());
  13. } catch (JMSException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }

3.3测试类

image.png

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:/spring/applicationContext-jms-consumer.xml")
  3. public class QueueTest {
  4. /**
  5. * Queue消费者测试方法
  6. */
  7. @Test
  8. public void testReceive() {
  9. try {
  10. // 只需要使程序不结束监听数据即可
  11. System.in.read();
  12. } catch (IOException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }

3.4测试结果

image.png

4.Topic生产者

4.1resources/spring/applicationContext-jms-producer.xml

需要有Topic的ActiveMQTopicbean对象
image.png

  1. <!--2.这个是订阅模式 文本信息-->
  2. <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
  3. <constructor-arg value="topic_text"/>
  4. </bean>

4.2TopicDemo

image.png

  1. @Component
  2. public class TopicDemo {
  3. @Autowired
  4. private JmsTemplate jmsTemplate;
  5. @Autowired
  6. private Destination topicTextDestination;
  7. /**
  8. * 发送文本消息
  9. *
  10. * @param text
  11. */
  12. public void sendTextMessage(final String text) {
  13. jmsTemplate.send(topicTextDestination, new MessageCreator() {
  14. public Message createMessage(Session session) throws JMSException {
  15. return session.createTextMessage(text);
  16. }
  17. });
  18. }
  19. }

4.3测试方法

image.png

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer.xml")
  3. public class TopicTest {
  4. @Autowired
  5. private TopicDemo topicDemo;
  6. /**
  7. * Topic发送消息测试
  8. */
  9. @Test
  10. public void testSend(){
  11. // 发送消息
  12. topicDemo.sendTextMessage("Topic-send-test");
  13. }
  14. }

5.Topic消费者

5.1resources/spring/applicationContext-jms-consumer.xml
image.png

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://www.springframework.org/schema/beans
  4. http://www.springframework.org/schema/beans/spring-beans.xsd">
  5. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  6. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  7. <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
  8. </bean>
  9. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  10. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  11. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  12. <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  13. </bean>
  14. <!-- 队列方法开始 -->
  15. <!--这个是队列目的地,点对点的 文本信息-->
  16. <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
  17. <constructor-arg value="queue_text"/>
  18. </bean>
  19. <!-- 我的监听类 -->
  20. <bean id="queueListener" class="com.moonhu.demo.QueueListener"/>
  21. <!-- 消息监听容器 -->
  22. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  23. <property name="connectionFactory" ref="connectionFactory"/>
  24. <property name="destination" ref="queueTextDestination"/>
  25. <property name="messageListener" ref="queueListener"/>
  26. </bean>
  27. <!-- 队列方法结束 -->
  28. <!-- 发布的方法开始 -->
  29. <!--这个是队列目的地,点对点的 文本信息-->
  30. <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
  31. <constructor-arg value="queue_text"/>
  32. </bean>
  33. <!-- 我的监听类 -->
  34. <bean id="topicListener" class="com.moonhu.demo.TopicListener"/>
  35. <!-- 消息监听容器 -->
  36. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  37. <property name="connectionFactory" ref="connectionFactory"/>
  38. <property name="destination" ref="topicTextDestination"/>
  39. <property name="messageListener" ref="topicListener"/>
  40. </bean>
  41. <!-- 发布的方法结束 -->
  42. </beans>

5.2TopicListener

image.png

  1. /**
  2. * Topic消费者的监听类
  3. */
  4. public class TopicListener implements MessageListener {
  5. public void onMessage(Message message) {
  6. TextMessage textMessage = (TextMessage) message;
  7. try {
  8. System.out.println("接收到的消息: " + textMessage.getText());
  9. } catch (JMSException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }

5.3测试方法

image.png

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring/applicationContext-jms-consumer.xml")
  3. public class TopicTest {
  4. @Test
  5. public void testReceive() {
  6. try {
  7. System.in.read();
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }

5.4测试结果

image.png

6.SpringBoot整合ActivateMQ

创建一个springboot的MAVEN项目

6.1. 引入相关依赖:

image.png

  1. <properties>
  2. <!-- 指定java版本 -->
  3. <java.version>1.7</java.version>
  4. </properties>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>1.4.0.RELEASE</version>
  9. </parent>
  10. <dependencies>
  11. <!-- springBoot -->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-web</artifactId>
  15. </dependency>
  16. <!-- 热部署 -->
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-devtools</artifactId>
  20. </dependency>
  21. <!-- 整合activateMQ -->
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-activemq</artifactId>
  25. </dependency>
  26. </dependencies>

6.2. 创建引导类

image.png

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. /***
  4. * 引导类
  5. */
  6. @SpringBootApplication
  7. public class SpringBootDemo {
  8. public static void main(String[] args) {
  9. SpringApplication.run(SpringBootDemo.class, args);
  10. }
  11. }

6.3创建控制类
image.png

  1. @RestController
  2. public class HelloWordTest {
  3. @Autowired
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. /**
  6. * activate发消息测试
  7. */
  8. @RequestMapping("/sendMsg")
  9. public void sendMessage() {
  10. jmsMessagingTemplate.convertAndSend("boot", "hello,你好");
  11. }
  12. }

6.4创建消费者类

创建消费者1
image.png

  1. /**
  2. * 监听器
  3. */
  4. @Component
  5. public class ConsumeDemo1 {
  6. /**
  7. * 接收消息
  8. *
  9. * @param msg
  10. */
  11. @JmsListener(destination = "boot")
  12. public void getMessage(String msg) {
  13. System.out.println(msg+1);
  14. }
  15. }

创建消费者2
image.png

  1. /**
  2. * 监听器
  3. */
  4. @Component
  5. public class ConsumeDemo2 {
  6. /**
  7. * 接收消息
  8. *
  9. * @param msg
  10. */
  11. @JmsListener(destination = "boot")
  12. public void getMessage(String msg) {
  13. System.out.println(msg+2);
  14. }
  15. }

6.5启动引导类

image.png

6.6 访问地址查看结果

  1. http://localhost:8080/sendMsg

多次访问后输出结果
image.png

解析:
在spring的spring-boot-starter-activemq的依赖中,传递依赖了ActivateMQ的包不需要连接远端的ActivateMq服务器即可实现消息的发送和接收,使用的是内置的服务.发送的是Queue消息
如果要使用自己部署的服务器需要在配置文件中指定ActivateMQ的地址
image.png

  1. spring.activemq.broker-url=tcp://192.168.25.128:61616

注意:
key值不能改变

思考

消费者接收不到消息
1.生产者和消费者桶名称不同
image.png