发布/订阅模式 和 一对一消费模式的区别,就是同一个消息会发给多个消费者,机制其实就是发布到fanout类型的交换机,让它接收到生产者发布的消息后,路由推送到多个队列里。

    在控制台先创建一个fanout类型的交换机:
    image.png
    为了演示方便,【Durability】设置为持久化,以后的实际使用,可以根据情况来设定,如果非必须那么可以不设置成持久化的。

    ======== 和之前一对一消费模式区分开,可以考虑将原有代码注释以便混淆。 ========
    在common模块下创建一个新的接口Constant2,用于存储本次演示需要的常量。

    1. package com.zhaoxy.study.common;
    2. /**
    3. * rabbitmq,发布订阅模式
    4. */
    5. public interface Constant2 {
    6. String exchangeName = "study_fanout_exchange";
    7. String queueName1 = "Constant2-Queue_1";
    8. String queueName2 = "Constant2-Queue_2";
    9. }

    exchangeName属性是和刚才控制台创建的交换机名完全相同的。

    生产者这里,使用的方法还是一样的,只不过是声明了一下交换机:

    1. package com.zhaoxy.mqprod;
    2. import com.zhaoxy.study.common.Constant2;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.stereotype.Component;
    5. import javax.annotation.PostConstruct;
    6. import javax.annotation.Resource;
    7. import java.util.UUID;
    8. @Component
    9. public class MqProdService {
    10. @Resource
    11. private RabbitTemplate rabbitTemplate;
    12. /**
    13. * 消息生产者,用途就是每次项目启动后都生成一次消息。
    14. *
    15. * 一遍两遍三四遍,五遍六遍七八遍,
    16. * 九遍十遍十一遍,学习完毕全不见。
    17. */
    18. @PostConstruct
    19. public void prodMessage(){
    20. // 模式2,给指定交换机发送一个uuid,然后推送到多个队列中
    21. for (int i = 0; i < 10; i++) {
    22. rabbitTemplate.convertAndSend(Constant2.exchangeName, "",
    23. UUID.randomUUID());
    24. }
    25. }
    26. }

    然后我们在消费者这里需要binding配置,演示设置了一个交换机推送到两个队列:

    1. package com.zhaoxy.mqconsumer;
    2. import com.zhaoxy.study.common.Constant2;
    3. import org.springframework.amqp.core.ExchangeTypes;
    4. import org.springframework.amqp.rabbit.annotation.Exchange;
    5. import org.springframework.amqp.rabbit.annotation.Queue;
    6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.stereotype.Component;
    9. @Component
    10. public class MqConsumerListener {
    11. @RabbitListener(
    12. // bindings 支持多个配置
    13. bindings = {
    14. // 队列绑定,绑定交换机【Constant2.exchangeName】数据推送到这个队列的【Constant2.queueName1】
    15. @QueueBinding(value = @Queue(value = Constant2.queueName1, durable = "true"),
    16. exchange = @Exchange(value = Constant2.exchangeName,type = ExchangeTypes.FANOUT))
    17. }
    18. )
    19. public void listener1(String message) throws InterruptedException {
    20. System.out.println("接收到了【"+Constant2.queueName1+"】队列的消息:" + message);
    21. }
    22. @RabbitListener(bindings = {
    23. @QueueBinding(value = @Queue(value = Constant2.queueName2, durable = "true"),
    24. exchange = @Exchange(value = Constant2.exchangeName,type = ExchangeTypes.FANOUT))})
    25. public void listener2(String message) throws InterruptedException {
    26. System.out.println("接收到了【"+Constant2.queueName2+"】队列的消息:" + message);
    27. }
    28. }

    然后先启动consumer消费者服务,然后可以在控制台上看到新增的队列:
    image.png
    然后启动生产者服务,之后查看consumer消费者的控制台,对于数据的消费情况,看看是否存在多个队列接收到相同消息的情况。
    image.png

    很明显,已经完成了,那么这种情况适用于我的一个消息,作为不确定数量消费者触发业务逻辑的条件,例如我发布了一个订单消息,那么相应的就可以有物流服务,积分服务,会员服务,后续还可以加上优惠券服务等等。

    而且可以保证的是,当我服务处于集群部署的情况下,那么在一个队列里,也只会有一个节点能消费到这条数据。
    在cloud项目内新建一个consumer2模块,内容和consumer1完全相同,然后启动consumer1和2,再启动生产服务推送些数据,然后观察两个服务的控制台输出情况
    image.png
    image.png