一. RabbitMQ 简介

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。

RabbitMQ要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。

二.RabbitMQ 基本概况:

1.Channel(信道):数据通道,进行交换数据。(网卡-内存/内存-网卡)
2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。
7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
7.Broker:RabbitMQ Server,服务器实体。

三.Exchange类型

fanout类型

如下图,发布订阅模型,添加两个队列,分别各用一个消费者监听,设置一个交换机,类型为广播(fanout),交换机会将收到的消息广播给所有相连的队列:
RabbitMQ 使用 - 图1
RabbitMQ 使用 - 图2
具体思路,将多个队列绑定到同一个FanoutExchange上
交换机配置

  1. //发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
  2. @Configuration
  3. public class PublishSubscribeConfig {
  4. @Bean
  5. public Queue myQueue1() {
  6. Queue queue = new Queue("publish_queue1");
  7. return queue;
  8. }
  9. @Bean
  10. public Queue myQueue2() {
  11. Queue queue = new Queue("publish_queue2");
  12. return queue;
  13. }
  14. @Bean
  15. public FanoutExchange fanoutExchange() {
  16. FanoutExchange fanoutExchange = new FanoutExchange("fanout");
  17. return fanoutExchange;
  18. }
  19. @Bean
  20. public Binding binding1() {
  21. Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
  22. return binding;
  23. }
  24. @Bean
  25. public Binding binding2() {
  26. Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
  27. return binding;
  28. }
  29. }

发送数据

  1. @RestController
  2. public class SendMessageController {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
  5. // 发布订阅者
  6. @GetMapping("/publishSubscribe")
  7. public String publishSubscribe() {
  8. String messageId = String.valueOf(UUID.randomUUID());
  9. String messageData = "message: 发布订阅 ,扇形交换机";
  10. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  11. Map<String, Object> map = new HashMap<>();
  12. map.put("messageId", messageId);
  13. map.put("messageData", messageData);
  14. map.put("createTime", createTime);
  15. rabbitTemplate.convertAndSend("fanout", "", map);
  16. return "ok";
  17. }
  18. }

接收数据

@RabbitListener(queues = "publish_queue2")
public void subscribe2(Map map,Channel channel, Message message) throws IOException {
System.out.println(map.get("name"))
}

direct(路由模式)

这种模式是比较常用的模式

生产者消费者模型

生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会交替收到消息。这一过程虽然不用创建交换机,但会使用默认的交换机,并用默认的直连(default-direct)策略连接队列;
RabbitMQ 使用 - 图3
配置

@Configuration
public class ProducerConsumerConfig {
    @Bean
    public Queue myQueue() {
        Queue queue=new Queue("myqueue");
        return queue;
    }
}

发布数据
rabbitTemplate.convertAndSend(“myqueue”, manMap);
接收数据
@RabbitListener(queues = “myqueue”)
public void displayMail(Map mail, Channel channel, Message message)

direct直连交换机通信模型

包括一个direct交换机
Direct交换器需要消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。
RabbitMQ 使用 - 图4
配置

@Configuration
public class DirectRabbitConfig {

    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue", true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange", true, false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        //将队列 绑定to exchange with (绑定的key)
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }

}
``
发送数据
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
接收数据
只需要监听队列就行
@RabbitListener(queues = "TestDirectQueue")
# Topic(通配符模式)
Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(也可以是零个或一个)。

![](https://img2022.cnblogs.com/blog/2671923/202204/2671923-20220412160003160-681395598.png)
配置

@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String man = “topic.man”;
public final static String total = “topic.total”;

@Bean
public Queue firstQueue() {
    return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
    return new Queue(TopicRabbitConfig.total);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("topicExchange");
}


//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
    return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
    return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

发布数据
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
接收数据
@RabbitListener(queues = "topic.total")
# headers(头部类型)
headers exchange主要通过发送的request message中的header进行匹配,其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。
```java
@Configuration
public class HeaderRabbitConfig {

    @Bean
    public Queue queueN1() {

        return new Queue("queueN1");
    }

    @Bean
    public Queue queueN2() {

        return new Queue("queueN2");

    }

    @Bean
    public HeadersExchange headersExchange(){

        return new HeadersExchange("headersExchange");
    }

    @Bean
    public Binding queueN1Binding(){

        Map<String,Object> map = new HashMap<>();
        map.put("queueName","queueN1");
        map.put("bindType","whereAll");
        return BindingBuilder.bind(queueN1()).to(headersExchange()).whereAll(map).match();
    }

    @Bean
    public Binding queueN2Binding(){

        Map<String,Object> map = new HashMap<>();
        map.put("queueName","queueN2");
        map.put("bindType","whereAny");
        return BindingBuilder.bind(queueN2()).to(headersExchange()).whereAny(map).match();
    }

}

发送信息

// 发布header模式
    @GetMapping("/headerAll")
    public String headersAll() {
        String messageData = "message: header 模式 All";
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("queueName","queueN1");
        messageProperties.setHeader("bindType","whereAll");
        Message message = new Message(messageData.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("headersExchange",null,message);
        return "ok";
    }
    @GetMapping("/headerAny")
    public String headersAny() {
        String messageData = "message: header 模式 any";
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("queueName","queueN2");
        messageProperties.setHeader("bindType","whereAll");
        Message message = new Message(messageData.getBytes(), messageProperties);
        rabbitTemplate.send("headersExchange",null,message);
        return "ok";
    }

接收消息
@RabbitListener(queues = “queueN1”)
@RabbitListener(queues = “queueN2”)

rpc

RabbitMQ 使用 - 图5

RPC的处理流程:

当客户端启动时,创建一个匿名的回调队列。
客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
请求被发送到rpc_queue队列中。
RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

实现方式

我们主要通过更改RabbitTemplate来实现,具体来说取消使用模板自动的返回队列,设置返回队列,设置使用UserCorrelationId.

 @Autowired
    ConnectionFactory connectionFactory;
    @Autowired
    // rpc 需要使用的template
    public RabbitTemplate directRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //这一步非常关键
        template.setUseTemporaryReplyQueues(false);
        // 设置消息返回地址
        template.setReplyAddress("amq.rabbitmq.reply-to");
        // template.expectedQueueNames();
        // 设置唯一对应id,发送一条信息必须携带身份id,然后根据身份在返回地址接收数据
        template.setUserCorrelationId(true);
        //设置请求超时时间为10s
        template.setReplyTimeout(10000);
        return template;
    }

发送信息
主要是使用sendAndReceive接收返回值

// 测试rpc
    @GetMapping("/rpcSend")
    public String sendRpcMessage(){
        //设置消息唯一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        String start = "10";
        System.out.println(" [x] Requesting fib(" + start + ")");
        //直接发送message对象
        MessageProperties messageProperties = new MessageProperties();
        //过期时间10秒,也是为了减少消息挤压的可能
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(start.getBytes(), messageProperties);
        Message response =  directRabbitTemplate()
                .sendAndReceive("tut.rpc", "rpc",message,correlationId);
        if (response != null) {
            System.out.println(" [.] Got '" + new String(response.getBody()) + "'");
        }else{
            System.out.println("请求超时");
        }
        return "调用rpc:"+response;
    }

接收信息

设置return值。
@RabbitListener(queues = “tut.rpc.requests”)
public String process(String in, Channel channel, Message message) throws IOException {
}

rabbitmq 优化

发布确认

我们可以设置回调函数,可以自定义失败的处理函数,例如找不到exchange,找不到queque

//重写RabbitTemplate 重新设置callback
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("ReturnCallback:     "+"消息:"+returnedMessage.getMessage());
                System.out.println("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                System.out.println("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                System.out.println("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                System.out.println("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());
            }
        });
        return rabbitTemplate;
    }
}

接收确认和限流

将acknowledge-mode设定未manual
并且设定消费数量。
在yml添加设定

    listener:
      direct:
        acknowledge-mode: manual
        consumers-per-queue: 1

接收方需要手动确认
hannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
具体内容可以看gitee
https://gitee.com/jefferyeven/rabbitmp_demo