本小节来和小伙伴们分享一下 RabbitMQ 的七种消息传递形式。一起来看看。
大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。
1. RabbitMQ 架构简介
一图胜千言,如下:
这张图中涉及到如下一些概念:
- 生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
- 交换机(Exchange):和生产者建立连接并接收生产者的消息。
- 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
- 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
- 路由(Routes):交换机转发消息到队列的规则。
2. 准备工作
首先创建一个空工程
工程名叫做mq_demo01
在这个mq_demo01工程中用Spring Initializr的方式创建一个SpringBoot的module
创建一个消费者的模块,模块名叫做consumer
选择依赖
项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:
# 配置RabbitMQ的基本信息
# RabbitMQ的地址
spring.rabbitmq.host=localhost
# RabbitMQ的端口号
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 这个可以不用给,默认就是/
spring.rabbitmq.virtual-hosts=/
然后在org.javaboy.consumer.config包下面创建一个消息队列
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "javaboy_queue";
@Bean
Queue javaboyQueue() {
//1. 第一个参数是队列的名字
//2。第二个参数是持久化,是什么意思呢?将来的消息会从交换机进入到消息队列里面去,如果设置成true在消息队列里面会把消息存下来,这样子如果RabbitMQ异常重启了,那你之前那些没有被消费的消息依然还在,相当于它把这些消息持久化到硬盘中去。如果设置成false,就是代表不给它持久化,那么如果RabbitMQ异常重启了,那么那些消息就会丢失。
//3. 该队列是否具有排他性,这个队列是由哪个connection创建的,将来就只能由这一个connection去操作它,去消费它。如果设置为true的话,那么这个消息队列就只能由创建它的connection才能访问,其他的connection都不能访问这个消息队列。但是如果试图在不同的连接中重新声明或者访问排他性队列的话,那么系统会报一个资源被锁定的错误。同时对于排他性队列而言,当连接断开的时候,这个队列就会被自动地删除。
//4。如果该队列没有消费者,那么是否自动删除该队列
return new Queue(QUEUE_NAME, true, false, false);
}
}
然后在org.javaboy.consumer.receiver包下面创建一个消息的消费者
/**
* 这是一个消息消费者
*/
@Component
public class MsgReceiver {
/**
* 通过 @RabbitListener 注解指定该方法监听的消息队列,该注解的参数就是消息队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg) {
System.out.println("msg = " + msg);
}
}
启动应用,在RabbitMQ的web管理页面的Connections的选项卡中就可以看到已经有一个连接了
在RabbitMQ的web管理页面的Channels的选项卡中就可以看到已经有一个通道了
在RabbitMQ的web管理页面的Queues的选项卡中就可以看到我们自己定义的队列
我们可以在RabbitMQ的web管理页面的Queues选项卡中的指定的消息队列中发布消息
消息发布后,我们可以在Idea的控制台看到这条我们在RabbitMQ的web管理页面中发布的消息了
3. 消息收发
在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 官网介绍了如下几种消息分发的形式:
这里给出了七种,其中第七种是消息确认,消息确认这块松哥之前发过相关的文章,传送门:
所以这里我主要和大家介绍前六种消息收发方式。
3.1. Hello World
咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者,不需要我们自己提供交换机,会有一个默认的交换机提供给我们。消息传播图如下:
这个时候使用的其实是默认的直连交换机(DirectExchange)
来看看代码实现:
再创建一个新的Empty Project,工程名叫做mq_demo02
在这个mq_demo02工程中用Spring Initializr的方式创建两个SpringBoot的module
创建一个消息生产者的模块,模块名叫做publisher
选择依赖
然后以同样的方法在mq_demo02工程中创建一个消息的消费者的模块,模块名叫做consumer,勾选的依赖同上
在mq_demo02工程的publisher模块的application.properties配置文件中作如下配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
在mq_demo02工程的publisher模块的org.javaboy.publisher.config包下面创建消息队列
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "hello_world_queue_name";
@Bean
Queue helloWorldQueue() {
return new Queue(QUEUE_NAME, true, false, false);
}
}
在mq_demo02工程的publisher模块的src/test/java目录的org.javaboy.publisher包的PublisherApplicationTests类中发布消息到消息队列
@SpringBootTest
class PublisherApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, "hello 江南一点雨");
}
}
在mq_demo02工程的consumer模块的application.properties配置文件中作如下配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
在mq_demo02工程的consumer模块的org.javaboy.publisher.config包下面创建消息队列
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "hello_world_queue_name";
@Bean
Queue helloWorldQueue() {
return new Queue(QUEUE_NAME, true, false, false);
}
}
在mq_demo02工程的consumer模块的org.javaboy.publisher.receiver包下面创建消息消费者
@Component
public class MsgReceiver {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg) {
System.out.println("msg = " + msg);
}
}
先运行consumer模块的应用,然后再运行publisher模块的单元测试方法contextLoads(),最后可以看到consumer模块的控制台收到了publisher模块发布的消息。
我们在RabbitMQ的web管理页面的Queues选项卡中也可以看到我们创建的消息队列
我们发消息的时候,消费者可以不用在线,这其实就是一个很好的解耦。比如:我们这里可以停止consumer的应用,然后在publisher应用再次发送消息仍然可以成功。我们可以在RabbitMQ的web管理页面的Queues选项卡中看到我们的消息队列hello_world_queue_name中的Ready的值是1表示我们的消息队列hello_world_queue_name中有一条消息待消费。当消费者重新上线后又可以继续消费消息队列中待消费(Ready)的消息。
上图中的Unacked表示消息队列中未确认的消息数量,因为把消息发送给消费者让消费者去消费,消费者消费完之后需要告诉RabbitMQ这条消息我已经消费了。如果消息已经发送给消费者,但是消费者没有告诉RabbitMQ这条消息已经消费了,那么这就属于Unacked的状态。Ready + Unacked = Total
3.2. Work queues
这种情况是这样的:
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack(ack就是确认消息),来决定是否要消费某一条消息。
来看看代码实现:
创建mq_demo03工程,原封不动的复制mq_demo02工程的代码到mq_demo03工程中
在mq_demo03工程的consumer应用的MsgReceiver类中中再添加一个消费者
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg2(String msg) {
System.out.println("msg2 = " + msg);
}
启动consumer应用,然后再在mq_demo03工程的publisher应用的单元测试类中的测试方法中循环发送20条消息
@SpringBootTest
class PublisherApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, "hello 江南一点雨:" + i);
}
}
}
运行上面的单元测试方法。我们发现consumer应用的控制台中无规律的打印了这20条消息
通过我们的RabbitMQ的web管理页面的Channels选项卡可以看到此时有两个通道,其实就是对应mq_demo03工程的consumer应用的两个消费者
mq_demo03工程的consumer应用中给消费者作并发能力的配置
/**
* concurrency 指的是并发数量,即这个消费者将开启 20 个子线程去消费消息
*
* @param msg
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME, concurrency = "20")
public void handleMsg2(Message message,Channel channel) throws IOException {
System.out.println("msg2 = " + msg+"--->"+Thread.currentThread().getName());
}
启动consumer应用,然后在RabbitMQ的web管理页面的Channels选项卡可以看到此时一共有21个通道
此时我们运行mq_demo03工程的publisher应用中的测试方法中发送消息,可以看到mq_demo03工程的consumer应用的控制台,几乎全部都是msg2这个消费者消费的,因为msg2这个消费者的并发能力强,它开了20个子线程去消费消息
我们在SpringBoot应用中会自动ack(ack就是确认消息),我们也可以在consumer应用的配置文件application.properties中设置成手动确认
# 手动 ack,注意这是要在消费者工程中配置,而不是在生产者工程中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
重启mq_demo03工程的consumer应用,然后在mq_demo03工程的publisher应用的测试方法中发送消息,可以看到consumer应用的控制台打印了这20条消息,然后再次重启mq_demo03工程的consumer应用,我们会发现consumer应用的控制台又打印了这20条消息,但是此时我们并没有在publisher应用中发送消息啊,我们可以通过RabbitMQ的web管理页面的Queues选项卡查看hello_world_queue_name的消息队列的Unacked的值是20,说明此时我们有20条消息未确认。原因是因为未确认的消息RabbitMQ就会认为没有成功消费,等消费者下次连接上RabbitMQ又会去发给消费者,让消费者重新消费这些未确认的消息。
我们可以在代码中手动ack确认消息,例如在mq_demo03工程的consumer应用中修改消费者代码
@Component
public class MsgReceiver {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(Message message, Channel channel) throws IOException {
System.out.println("receive = " + message.getPayload());
//确认消息,就是告诉 RabbitMQ,这条消息我已经消费成功了
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), false);
}
/**
* concurrency 指的是并发数量,即这个消费者将开启 20 个子线程去消费消息
*
* @param msg
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME, concurrency = "20")
public void handleMsg2(Message message,Channel channel) throws IOException {
//拒绝消费该消息,第二个参数requeue 表示被拒绝的消息是否重新进入队列中,如果设置成true,那么RabbitMQ又会把它发给另外的消费者去消费
channel.basicReject((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG),true);
}
}
重启mq_demo03工程的consumer应用,然后在consumer应用的控制台可以看到全部的消息都被handleMsg消费了。此时第二个消费者拒绝了所有消息,第一个消费者handleMsg消费了所有消息。
3.3. Publish/Subscribe
再来看发布订阅模式,这种情况是这样:
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
这种情况下,我们有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
我分别来给大家举一个简单例子看下。
3.3.1. Direct
Direct交换机的工作方式:消息发送到Direct交换机上面,然后交换机和队列之间会有一个routing key把它们关联起来,然后我们发送消息的时候,会让消息带一个routing key,消息到Direct交换机上,交换机会根据消息携带的routing key来找消息对应的队列然后再发送到对应的队列上去,消费者直接消费队列就行。
演示代码:
创建一个新的Empty Project,工程名叫做mq_demo04
在这个mq_demo04工程中用Spring Initializr的方式创建两个SpringBoot的module
先创建一个消息生产者的模块,模块名叫publisher
选择依赖
然后以同样的方法在mq_demo04工程中创建一个消息的消费者的模块,模块名叫做consumer,勾选的依赖同上
在mq_demo04工程的publisher模块和consumer模块的application.properties配置文件中都作如下配置:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
在mq_demo04工程的publisher模块的org.javaboy.publisher.config包下创建消息队列
/**
* Direct:这种路由策略,将消息队列绑定到 DirectExchange 上,当消息到达交换机的时候,消息会携带一个 routing_key,然后交换机会找到名为 routing_key 的队列,将消息路由过去
*/
@Configuration
public class RabbitConfig {
public static final String DIRECT_QUEUE_NAME = "direct_queue_name";
public static final String DIRECT_QUEUE_NAME2 = "direct_queue_name2";
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
@Bean
Queue directQueue1() {
return new Queue(DIRECT_QUEUE_NAME, true, false, false);
}
@Bean
Queue directQueue2() {
return new Queue(DIRECT_QUEUE_NAME2, true, false, false);
}
/**
* 定义一个直连交换机
* @return
*/
@Bean
DirectExchange directExchange() {
//1. 交换机的名称
//2。交换机是否持久化:这里的持久化指的是交换机本身能否持久化,如果你将它设为false,将来RabbitMQ重启之后,这个交换机就没有了,如果设置为true,将来RabbitMQ重启之后,这个交换机就还在。
//3. 如果没有与之绑定的队列,是否删除交换机
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 这个定义是将交换机和队列绑定起来
* @return
*/
@Bean
Binding directBinding1() {
return BindingBuilder
//设置绑定的队列
.bind(directQueue1())
//设置绑定的交换机
.to(directExchange())
//设置 routing_key
.with(DIRECT_QUEUE_NAME);
}
@Bean
Binding directBinding2() {
return BindingBuilder
.bind(directQueue2())
.to(directExchange())
.with(DIRECT_QUEUE_NAME2);
}
}
在mq_demo04工程的consumer模块的org.javaboy.consumer.config包下创建消息队列
/**
* Direct:这种路由策略,将消息队列绑定到 DirectExchange 上,当消息到达交换机的时候,消息会携带一个 routing_key,然后交换机会找到名为 routing_key 的队列,将消息路由过去
*/
@Configuration
public class RabbitConfig {
public static final String DIRECT_QUEUE_NAME = "direct_queue_name";
public static final String DIRECT_QUEUE_NAME2 = "direct_queue_name2";
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
@Bean
Queue directQueue1() {
return new Queue(DIRECT_QUEUE_NAME, true, false, false);
}
@Bean
Queue directQueue2() {
return new Queue(DIRECT_QUEUE_NAME2, true, false, false);
}
/**
* 定义一个直连交换机
* @return
*/
@Bean
DirectExchange directExchange() {
//1. 交换机的名称
//2。交换机是否持久化:这里的持久化指的是交换机本身能否持久化,如果你将它设为false,将来RabbitMQ重启之后,这个交换机就没有了,如果设置为true,将来RabbitMQ重启之后,这个交换机就还在。
//3. 如果没有与之绑定的队列,是否删除交换机
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 这个定义是将交换机和队列绑定起来
* @return
*/
@Bean
Binding directBinding1() {
return BindingBuilder
//设置绑定的队列
.bind(directQueue1())
//设置绑定的交换机
.to(directExchange())
//设置 routing_key
.with(DIRECT_QUEUE_NAME);
}
@Bean
Binding directBinding2() {
return BindingBuilder
.bind(directQueue2())
.to(directExchange())
.with(DIRECT_QUEUE_NAME2);
}
}
在mq_demo04工程的consumer模块的org.javaboy.consumer.receiver包下创建消费者
@Component
public class MsgReceiver {
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_NAME)
public void msgHandler(String msg) {
System.out.println("msg = " + msg);
}
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_NAME2)
public void msgHandler2(String msg) {
System.out.println("msg2 = " + msg);
}
}
先运行在mq_demo04工程的consumer应用
然后在mq_demo04工程的publisher模块的src/test/java目录的org.javaboy.publisher包的PublisherApplicationTests类中发布消息
@SpringBootTest
class PublisherApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, RabbitConfig.DIRECT_QUEUE_NAME, "这条消息发给队列1");
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, RabbitConfig.DIRECT_QUEUE_NAME2, "这条消息发给队列2");
}
}
运行上面的测试方法,然后查看mq_demo04工程的consumer模块的控制台,发现两个消费者分别收到了一条消息
3.3.2. Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
演示代码:
在mq_demo04工程的consumer模块的org.javaboy.consumer.config包下创建FanoutConfig.java
/**
* fanout 交换机会将到达交换机的所有消息路由到与他绑定的所有队列上面来
*/
@Configuration
public class FanoutConfig {
public static final String FANOUT_QUEUE_NAME = "fanout_queue_name";
public static final String FANOUT_QUEUE_NAME2 = "fanout_queue_name2";
public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name";
@Bean
Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE_NAME, true, false, false);
}
@Bean
Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE_NAME2, true, false, false);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1())
.to(fanoutExchange());
}
@Bean
Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2())
.to(fanoutExchange());
}
}
在mq_demo04工程的consumer模块的org.javaboy.consumer.receiver包下创建FanoutMsgReceiver.java
@Component
public class FanoutMsgReceiver {
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_NAME)
public void msgHandler(String msg) {
System.out.println("FanoutMsgReceiver = " + msg);
}
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_NAME2)
public void msgHandler2(String msg) {
System.out.println("FanoutMsgReceiver2 = " + msg);
}
}
重启mq_demo04工程的consumer应用,把FanoutConfig拷贝到mq_demo04工程的publisher模块的org.javaboy.publisher.config包下面。
在mq_demo04工程的publisher模块的src/test/java目录下的org.javaboy.publisher包的PublisherApplicationTests类中发布消息
@Test
void test01() {
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, null, "hello fanout!");
}
运行上面的测试方法,然后查看mq_demo04工程的consumer模块的控制台,发现两个消费者都收到了消息
注意:我们前面讲的Work queues是队列到消费者的方案,而我们现在讲的Fanout是交换机到队列的方案,所以它们并不冲突。
3.3.3. Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。
演示代码:
在mq_demo04工程的consumer模块的org.javaboy.consumer.config包下创建TopicConfig.java
@Configuration
public class TopicConfig {
public static final String XIAOMI_QUEUE_NAME = "xiaomi_queue_name";
public static final String HUAWEI_QUEUE_NAME = "huawei_queue_name";
public static final String PHONE_QUEUE_NAME = "phone_queue_name";
public static final String TOPIC_EXCHANGE_NAME = "topic_queue_name";
@Bean
Queue xiaomiQueue() {
return new Queue(XIAOMI_QUEUE_NAME, true, false, false);
}
@Bean
Queue huaweiQueue() {
return new Queue(HUAWEI_QUEUE_NAME, true, false, false);
}
@Bean
Queue phoneQueue() {
return new Queue(PHONE_QUEUE_NAME, true, false, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomiQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是以 xiaomi 开头,都将被路由到 xiaomiQueue
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huaweiQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是以 huawei 开头,都将被路由到 huaweiQueue
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phoneQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是包含phone,都将被路由到 phoneQueue
.with("#.phone.#");
}
}
在mq_demo04工程的consumer模块的org.javaboy.consumer.receiver包下创建TopicMsgReceiver.java
@Component
public class TopicMsgReceiver {
@RabbitListener(queues = TopicConfig.HUAWEI_QUEUE_NAME)
public void huawei(String msg) {
System.out.println("huawei = " + msg);
}
@RabbitListener(queues = TopicConfig.XIAOMI_QUEUE_NAME)
public void xiaomi(String msg) {
System.out.println("xiaomi = " + msg);
}
@RabbitListener(queues = TopicConfig.PHONE_QUEUE_NAME)
public void phone(String msg) {
System.out.println("phone = " + msg);
}
}
重启mq_demo04工程的consumer应用,把TopicConfig拷贝到mq_demo04工程的publisher模块的org.javaboy.publisher.config包下面。
在mq_demo04工程的publisher模块的src/test/java目录下的org.javaboy.publisher包的PublisherApplicationTests类中发布消息
@Test
void test02() {
//rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, "xiaomi.news", "小米新闻");
//rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, "huawei.news", "华为新闻");
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, "huawei.phone.news", "华为手机新闻");
}
运行上面的测试方法,然后查看mq_demo04工程的consumer模块的控制台,发现xiaomi消费者和phone消费者都收到了消息
3.3.4. Header
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
演示代码:
在mq_demo04工程的consumer模块的org.javaboy.consumer.config包下创建HeaderConfig.java
/**
* 交换机根据消息的头信息来决定消息去哪一个队列
*/
@Configuration
public class HeaderConfig {
public static final String HEADER_QUEUE_NAME_NAME = "header_queue_name_name";
public static final String HEADER_QUEUE_AGE_NAME = "header_queue_age_name";
public static final String HEADER_EXCHANGE_NAME = "header_exchange_name";
@Bean
Queue headerNameQueue() {
return new Queue(HEADER_QUEUE_NAME_NAME, true, false,false);
}
@Bean
Queue headerAgeQueue() {
return new Queue(HEADER_QUEUE_AGE_NAME, true, false,false);
}
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE_NAME, true, false);
}
@Bean
Binding nameBinding() {
return BindingBuilder.bind(headerNameQueue())
.to(headersExchange())
//如果将来消息头部中包含 name 属性,就算匹配成功
.where("name").exists();
}
@Bean
Binding ageBinding() {
return BindingBuilder.bind(headerAgeQueue())
.to(headersExchange())
//将来头信息中必须要有 age 属性,并且 age 属性值为 99
.where("age")
.matches(99);
}
}
在mq_demo04工程的consumer模块的org.javaboy.consumer.receiver包下创建HeaderMsgReceiver.java
@Component
public class HeaderMsgReceiver {
@RabbitListener(queues = HeaderConfig.HEADER_QUEUE_NAME_NAME)
public void nameMsgHandler(byte[] msg) {
System.out.println("nameMsgHandler >>> " + new String(msg, 0, msg.length));
}
@RabbitListener(queues = HeaderConfig.HEADER_QUEUE_AGE_NAME)
public void ageMsgHandler(byte[] msg) {
System.out.println("ageMsgHandler >>> " + new String(msg, 0, msg.length));
}
}
重启mq_demo04工程的consumer应用,把HeaderConfig拷贝到mq_demo04工程的publisher模块的org.javaboy.publisher.config包下面。
在mq_demo04工程的publisher模块的src/test/java目录下的org.javaboy.publisher包的PublisherApplicationTests类中发布消息
@Test
void test03() {
Message nameMsg = MessageBuilder.withBody("hello zhangsan".getBytes()).setHeader("name", "aaa").build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, nameMsg);
Message ageMsg = MessageBuilder.withBody("hello lisi 99".getBytes()).setHeader("age", 99).build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, ageMsg);
}
运行上面的测试方法,然后查看mq_demo04工程的consumer模块的控制台
3.3.5. 四种交换机总结
交换机路由策略主要是指消息到达交换机之后,如何从交换机到达消息队列。其它的(包括消息怎么到达交换机,消息怎么从消息队列到消费者)它不管
3.4. Routing
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
如下图:
这个就是按照 routing key 去路由消息,我这里就不再举例子了
3.5. Topics
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 *
和 #
关键字,对指定 RoutingKey
内容,编写时注意格式 xxx.xxx.xxx
去编写。
如下图:
这个我也就不举例啦,前面 3.3.3 小节已经举过例子了,不再赘述。
3.6. RPC
RPC 这种消息收发形式,松哥前两天刚刚写了文章和大家介绍,这里就不多说了,传送门:
3.7. Publisher Confirms
这种发送确认松哥之前有写过相关文章,传送门: