发布/订阅模式 和 一对一消费模式的区别,就是同一个消息会发给多个消费者,机制其实就是发布到fanout类型的交换机,让它接收到生产者发布的消息后,路由推送到多个队列里。
在控制台先创建一个fanout类型的交换机:
为了演示方便,【Durability】设置为持久化,以后的实际使用,可以根据情况来设定,如果非必须那么可以不设置成持久化的。
======== 和之前一对一消费模式区分开,可以考虑将原有代码注释以便混淆。 ========
在common模块下创建一个新的接口Constant2,用于存储本次演示需要的常量。
package com.zhaoxy.study.common;/*** rabbitmq,发布订阅模式*/public interface Constant2 {String exchangeName = "study_fanout_exchange";String queueName1 = "Constant2-Queue_1";String queueName2 = "Constant2-Queue_2";}
exchangeName属性是和刚才控制台创建的交换机名完全相同的。
生产者这里,使用的方法还是一样的,只不过是声明了一下交换机:
package com.zhaoxy.mqprod;import com.zhaoxy.study.common.Constant2;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.util.UUID;@Componentpublic class MqProdService {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 消息生产者,用途就是每次项目启动后都生成一次消息。** 一遍两遍三四遍,五遍六遍七八遍,* 九遍十遍十一遍,学习完毕全不见。*/@PostConstructpublic void prodMessage(){// 模式2,给指定交换机发送一个uuid,然后推送到多个队列中for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constant2.exchangeName, "",UUID.randomUUID());}}}
然后我们在消费者这里需要binding配置,演示设置了一个交换机推送到两个队列:
package com.zhaoxy.mqconsumer;import com.zhaoxy.study.common.Constant2;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MqConsumerListener {@RabbitListener(// bindings 支持多个配置bindings = {// 队列绑定,绑定交换机【Constant2.exchangeName】数据推送到这个队列的【Constant2.queueName1】@QueueBinding(value = @Queue(value = Constant2.queueName1, durable = "true"),exchange = @Exchange(value = Constant2.exchangeName,type = ExchangeTypes.FANOUT))})public void listener1(String message) throws InterruptedException {System.out.println("接收到了【"+Constant2.queueName1+"】队列的消息:" + message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = Constant2.queueName2, durable = "true"),exchange = @Exchange(value = Constant2.exchangeName,type = ExchangeTypes.FANOUT))})public void listener2(String message) throws InterruptedException {System.out.println("接收到了【"+Constant2.queueName2+"】队列的消息:" + message);}}
然后先启动consumer消费者服务,然后可以在控制台上看到新增的队列:
然后启动生产者服务,之后查看consumer消费者的控制台,对于数据的消费情况,看看是否存在多个队列接收到相同消息的情况。
很明显,已经完成了,那么这种情况适用于我的一个消息,作为不确定数量消费者触发业务逻辑的条件,例如我发布了一个订单消息,那么相应的就可以有物流服务,积分服务,会员服务,后续还可以加上优惠券服务等等。
而且可以保证的是,当我服务处于集群部署的情况下,那么在一个队列里,也只会有一个节点能消费到这条数据。
在cloud项目内新建一个consumer2模块,内容和consumer1完全相同,然后启动consumer1和2,再启动生产服务推送些数据,然后观察两个服务的控制台输出情况

