AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议),是进程之间传递异步消息的网络协议。

AMQP工作工程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
image.png

RabbiMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

  • 应用耦合

image.png ====》image.png

  • 异步消息

image.png ===》image.png

  • 流量削峰

image.png ===》image.png

RabbitMQ使用场景:

  • 排队算法 : 使用消息队列特性
  • 秒杀活动 : 使用消息队列特性
  • 消息分发 : 使用消息异步特性
  • 异步处理 : 使用消息异步特性
  • 数据同步 : 使用消息异步特性
  • 处理耗时任务 : 使用消息异步特性
  • 流量销峰

    RabbitMQ架构

    image.png
1.Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/
2.Connection
链接。指rabbitMQ服务器和服务建立的TCP链接。
3.Channel
信道。1,Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。2,TCP一旦打开,就会创建AMQP信道。3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
4.Message
消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
5.Publisher
消息的生产者。也是一个向交换器发布消息的客户端应用程序。
6.Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
7.Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
三种常用的交换器类型1. direct(发布与订阅 完全匹配)2. fanout(广播)3. topic(主题,规则匹配)
8.Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
9.Routing-key
路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的名称,路由键是key,队列是value)队列通过路由键绑定到交换器。消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞。
10.Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
11.Borker
表示消息队列服务器实体。
12.交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。 路由键可以理解为匹配的规则。
13.RabbitMQ为什么需要信道?为什么不是TCP直接通信?
1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。2. 如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

RabbitMQ通信方式

RabbitMQ 客户端操作

导入依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.14.2</version>
  5. </dependency>

创建RabbitMQ工具类

public class RabbitMQUtil {

    /**
    * rabbitMQ 服务器ip地址
    */
    private static final String RABBITMQ_HOST = "192.168.126.128";
    /**
    * rabbitMQ 服务器端口
    */
    private static final int RABBITMQ_PORT = 5672;
    /**
    * rabbitMQ 虚拟服务器uri
    */
    private static final String RABBIT_VIRTUAL_HOST = "/";
    /**
    * rabbitMQ 用户名
    */
    private static final String RABBITMQ_USERNAME = "admin";
    /**
    * rabbitMQ 用户密码
    */
    private static final String RABBITMQ_PASSWORD = "admin";

    /**
    * 获取一个connection
    * @return Connection 对象
    * @throws IOException  IO异常
    * @throws TimeoutException 超时异常
    */

    public static Connection getConnection() throws IOException, TimeoutException {

        // 创建ConnectionFactory 工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置rabbitMQ连接信息
        connectionFactory.setHost(RABBITMQ_HOST);
        connectionFactory.setPort(RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RABBIT_VIRTUAL_HOST);
        connectionFactory.setUsername(RABBITMQ_USERNAME);
        connectionFactory.setPassword(RABBITMQ_PASSWORD);
        // 创建一个rabbitMQ 连接Connection 对象返回
        return connectionFactory.newConnection();
    }
}

HelloWord通信

采用默认的交换机Exchange( direct )
image.png

public class Publisher {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        try {
            // 1. 获取连接 Connection
            Connection connection = RabbitMQUtil.getConnection();

            // 2. 获取信道 Channel
            Channel channel = connection.createChannel();

            //3. 创建队列 Queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //4. 发布消息
            String message = "hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息发送成功");

            //5 关闭资源
            channel.close();
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
public class Consumer {

    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) {
        try {
            // 1 创建连接 Connection
            Connection connection = RabbitMQUtil.getConnection();
            // 2 构建信道 Channel
            Channel channel = connection.createChannel();
            // 3 构建队列(与Publisher构建的队列一样)。 在这里也声明了队列。因为可能会在Publisher之前启动Consumer,确保在消费消息之前队列是存在的
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4 消费消息
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
                }
            });


            //5 关闭资源
            channel.close();
            connection.close();
        } catch (IOException |
                TimeoutException e) {
            e.printStackTrace();
        }
    }
}

WorkQueue

image.png

  • 一个队列中的一个消息,只会被一消费者消费
  • 默认情况下,RabbitMQ队列会将消息以轮询的方式交给不同的消费者消费
  • 消费者拿到消息后,需要给RabbitMQ一个ack,这样RabbitMQ就认为消费者拿到消息了

    public class Publisher {
    
      private static final String QUEUE_NAME = "work";
    
      public static void main(String[] args) {
          try {
              //1 获取连接
              Connection connection = RabbitMQUtil.getConnection();
              //2 构建信道Channel
              Channel channel = connection.createChannel();
              //3 构建队列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              //4 发送消息
              AMQP.BasicProperties prop = new AMQP.BasicProperties();
              for (int i = 1; i <= 10; i++) {
                  String message = "work queue—"+i;
                  channel.basicPublish("",QUEUE_NAME,prop,message.getBytes());
              }
              System.out.println("发送成功");
              //5 关闭资源
              channel.close();
              connection.close();
          } catch (IOException | TimeoutException e) {
              e.printStackTrace();
          }
      }
    }
    

    ```java public class Consumer {

    private static final String QUEUE_NAME = “work”;

    public static void main(String[] args) {

      try {
          //1 获取连接
          Connection connection = RabbitMQUtil.getConnection();
          //2 构建信道Channel
          Channel channel = connection.createChannel();
          //3 构建队列
          channel.queueDeclare(QUEUE_NAME,false,false,false,null);
          //4 消费消息
          //4.1 设置回调函数
          // 设置信道流量。服务器将传递的最大消息数,如果无限制,则为0
          channel.basicQos(1);
          DefaultConsumer callback = new DefaultConsumer(channel){
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  System.out.println("1号—获取到消息:"+new String(body, StandardCharsets.UTF_8));
    
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  // 设置手动Ack
                  channel.basicAck(envelope.getDeliveryTag(),false);
              }
          };
          // 消费消息,关闭自动Ack
          channel.basicConsume(QUEUE_NAME,false,callback);
      } catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
    

    } } //—————————————————————————————————————-

public class Consumer2 {

private static final String QUEUE_NAME = "work";

public static void main(String[] args) {

    try {
        //1 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        //2 构建信道Channel
        Channel channel = connection.createChannel();
        //3 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4 消费消息
        //4.1 设置回调函数
        // 设置信道流量。服务器将传递的最大消息数,如果无限制,则为0
        channel.basicQos(1);
        DefaultConsumer callback1 = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1号—获取到消息:"+new String(body, StandardCharsets.UTF_8));

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 设置手动Ack

                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 消费消息,关闭自动Ack
        channel.basicConsume(QUEUE_NAME,false,callback1);
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

}

<a name="BjOnt"></a>
#### Publish/Subscribe
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1636836/1648879067049-7a130783-f2c1-4ec2-806e-6745daa51020.png#clientId=u9b5a057a-f5ab-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=244&id=u0688f073&margin=%5Bobject%20Object%5D&name=image.png&originHeight=244&originWidth=910&originalType=binary&ratio=1&rotation=0&showTitle=false&size=18195&status=done&style=stroke&taskId=u032f668c-263b-4ca1-b76c-d948e2e8f3c&title=&width=910)

- 生产者(Publisher),自行构建FANOUT类型的Exchange,并与队列进行绑定。
```java
public class Publisher {
    private static final String EXCHANGE_NAME = "pubsub";
    private static final String QUEUE_NAME1 = "publish1";
    private static final String QUEUE_NAME2 = "publish2";

    public static void main(String[] args) {
        try {
            //1 构建连接Connection
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道Channel
            Channel channel = connection.createChannel();
            //3 构建交换机Exchange,制定交换机类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //4 构建队列
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //5 绑定交换机和队列,绑定方式为直接绑定(根据queueName和exchangeName绑定,与routingKey无关
            channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "publish1");
            channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "publish2");
            //6 发送消息
            String message = "Hello publish/subscribe";
            channel.basicPublish(EXCHANGE_NAME, "publish1", null, message.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "publish2", null, message.getBytes());
            //7 关闭资源
            channel.close();
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
public class Consumer1 {

    private static final String QUEUE_NAME1 = "publish1";


    public static void main(String[] args) {
        try {
            //1 构建连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道
            Channel channel = connection.createChannel();
            //3 构建队列,保证队列存在
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            //4 获取消息
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:"+new String(body,StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE_NAME1,false,callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

//----------------------------------------------------------------------------------

public class Consumer2 {
    private static final String QUEUE_NAME2 = "publish2";

    public static void main(String[] args) {
        try {
            //1 构建连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道
            Channel channel = connection.createChannel();
            //3 构建队列,保证队列存在
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //4 获取消息
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:"+new String(body, StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE_NAME2,false,callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Routing

image.png

  • 生产者 Publisher:在绑定Exchange和Queue时,需要指定routingKey,同时在发送消息时, 也需要指定routingKey,只有在routingKey一致时,消息才会路由到指定的Queue。交换机类型为DIRECT。

    public class Publisher {
      private static final String EXCHANGE_NAME = "routingKey";
      private static final String QUEUE_NAME1 = "routingKey_queue1";
      private static final String QUEUE_NAME2 = "routingKey_queue2";
    
      public static void main(String[] args) {
          try {
              //1 获取连接
              Connection connection = RabbitMQUtil.getConnection();
              //2 构建信道 Channel
              Channel channel = connection.createChannel();
              //3 构建交换机,指定类型为DIRECT
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
              //4 构建队列
              channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
              channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
              //5 绑定Exchange和Queue
              channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red");
              channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");
              channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white");
              //6 发送信息
              channel.basicPublish(EXCHANGE_NAME, "red", null, "这是Red".getBytes());
              channel.basicPublish(EXCHANGE_NAME, "blue", null, "这是blue".getBytes());
              channel.basicPublish(EXCHANGE_NAME, "black", null, "这是black".getBytes());
              channel.basicPublish(EXCHANGE_NAME, "white", null, "这是white".getBytes());
    
              //7 关闭资源
              channel.close();
              connection.close();
          } catch (IOException | TimeoutException e) {
              e.printStackTrace();
          }
      }
    }
    

    ```java public class Consumer1 {

    private static final String QUEUE_NAME1 = “routingKey_queue1”;

    public static void main(String[] args) {

      try {
          //1 获取连接
          Connection connection = RabbitMQUtil.getConnection();
          //2 构建信道
          Channel channel = connection.createChannel();
          //3 构建队列
          channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
          //4 获取消息
          DefaultConsumer callback = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  System.out.println("获取到消息" + new String(body, StandardCharsets.UTF_8));
                  channel.basicAck(envelope.getDeliveryTag(), false);
              }
          };
          channel.basicConsume(QUEUE_NAME1, false, callback);
      } catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
    

    } } //—————————————————————————————————————————- public class Consumer2 { private static final String QUEUE_NAME2 = “routingKey_queue2”;

    public static void main(String[] args) {

      try {
          //1 获取连接
          Connection connection = RabbitMQUtil.getConnection();
          //2 构建信道
          Channel channel = connection.createChannel();
          //3 构建队列
          channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
          //4 获取消息
          DefaultConsumer callback = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  System.out.println("获取到消息" + new String(body, StandardCharsets.UTF_8));
                  channel.basicAck(envelope.getDeliveryTag(), false);
              }
          };
          channel.basicConsume(QUEUE_NAME2, false, callback);
      } catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
    

    }

}

<a name="pBZ2Y"></a>
#### Topics
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1636836/1648883004049-79341ded-b422-4eab-8a96-d0df4bf83b32.png#clientId=u9b5a057a-f5ab-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=261&id=u5c96de6a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=240&originWidth=688&originalType=binary&ratio=1&rotation=0&showTitle=false&size=37776&status=done&style=stroke&taskId=u75a6016d-c61d-46a9-acb2-f028635cb66&title=&width=749)<br />Publisher:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式。交换机类型为TOPIC类型。<br />注意:topic类型交换机,在与queue绑定时,routingkey格式为:单词1.单词2.单词3……,其中*表示占位符,#表示通配符
```java
public class Publisher {
    private static final String EXCHANGE_NAME = "topic";
    private static final String QUEUE_NAME1 = "topic-queue1";
    private static final String QUEUE_NAME2 = "topic-queue2";

    public static void main(String[] args) {
        try {
            //1 获取连接Connection
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道Channel
            Channel channel = connection.createChannel();
            //3 构建交换机,指定类型为topic
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //4 构建队列
            channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
            //5 绑定交换机和队列
            channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.red.*");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"big.*.*");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.red.#");
            //6 发送消息
            channel.basicPublish(EXCHANGE_NAME,"big.red.dog",null,"big.red.dog".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"small.red.cat",null,"small.red.cat".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"big.blue.dog",null,"big.blue.dog".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"big.red.dog.pig",null,"big.red.dog".getBytes());
            //7 关闭资源
            channel.close();;
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
public class Consumer1 {
    private static final String QUEUE_NAME1 = "topic-queue1";

    public static void main(String[] args) {
        try {
            //1 获取连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道 Channel
            Channel channel = connection.createChannel();
            //3 构建队列
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            //4 接收消费信息
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME1, false, callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
//------------------------------------------------------------------------------------
public class Consumer2 {
    private static final String QUEUE_NAME2 = "topic-queue2";

    public static void main(String[] args) {
        try {
            //1 获取连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 构建信道 Channel
            Channel channel = connection.createChannel();
            //3 构建队列
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //4 接收消费信息
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME2, false, callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

RPC通信类型(了解)

因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作需要让Client发送消息时,携带两个属性:

  • replyTo:告知Server将相应信息放到哪个队列
  • correlationId:告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息

image.png

public class Client {

    private static final String QUEUE_PUBLISH = "rpc-publish";
    private static final String QUEUE_CONSUMER = "rpc-consumer";

    public static void main(String[] args) {
        try {
            //1 获取连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 获取信道
            Channel channel = connection.createChannel();
            //3 构建队列
            channel.queueDeclare(QUEUE_PUBLISH, false, false, false, null);
            channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null);
            //4 发布消息
            String message = "hello RPC";
            String correlationId = UUID.randomUUID().toString();
            AMQP.BasicProperties prop = new AMQP.BasicProperties()
                    .builder()
                    .replyTo(QUEUE_CONSUMER)
                    .correlationId(correlationId)
                    .build();
            channel.basicPublish("", QUEUE_PUBLISH, prop, message.getBytes());
            System.out.println("消息发送成功");
            channel.basicConsume(QUEUE_CONSUMER, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String correlationId1 = properties.getCorrelationId();
                    if (correlationId1 != null && correlationId1.equalsIgnoreCase(correlationId)) {
                        System.out.println("获取到服务:" + new String(body, StandardCharsets.UTF_8));
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
public class Server {
    private static final String QUEUE_PUBLISH = "rpc-publish";
    private static final String QUEUE_CONSUMER = "rpc-consumer";
    public static void main(String[] args) {
        try {
            //1 获取连接
            Connection connection = RabbitMQUtil.getConnection();
            //2 获取信道
            Channel channel = connection.createChannel();
            //3 构建队列
            channel.queueDeclare(QUEUE_PUBLISH, false, false, false, null);
            channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null);
            //4 监听消息
            DefaultConsumer callback = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("获取到消息:"+new String(body,StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    String response = "这是服务端答复的消息";
                    String correlationId = properties.getCorrelationId();
                    String responseQueue = properties.getReplyTo();
                    AMQP.BasicProperties prop = new AMQP.BasicProperties()
                            .builder().correlationId(correlationId).build();
                    channel.basicPublish("",responseQueue,prop,response.getBytes());
                }
            };

            channel.basicConsume(QUEUE_PUBLISH, false, callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

RabbitMQ交换机类型

direct类型

  • direct交换器是RabbitMQ默认交换器。默认会进行公平调度,所有接受者依次从消息队列中获取值(一个队列多个消费者,采用类似轮询的方式)。
  • Publisher给哪个队列发消息,就一定是给哪个队列发送消息(routingkey,绑定交换机和队列)。对交换器绑定的其他队列没有任何影响。

    fanout类型

    扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换器上的队列。且每个队列消息中第一个Consumer能收到消息。

    topic类型

    允许在路由键(RoutingKey)中出现匹配规则。路由键的写法和包写法相同。com.msb.xxxx.xxx格式。
    在绑定时可以带有下面特殊符号,中间可以出现:
    * : 代表一个单词(两个.之间内容)
    # : 0个或多个字符
    接收方依然是公平调度,同一个队列中内容轮换获取值。

    SpringBoot使用RabbitMQ

    spring:
    rabbitmq:
      host: 192.168.126.128
      port: 5672
      username: admin
      password: admin
      virtual-host: /
    

    ```java @Configuration public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = “springboot-exchange”; private static final String QUEUE_NAME = “spingboot-queue”; private static final String ROUTING_KEY = “*.black.#”;

    //获取交换机

    @Bean public Exchange bootExchange() {

      return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    

    }

    // 获取队列

    @Bean public Queue bootQueue() {

      return QueueBuilder.nonDurable(QUEUE_NAME).build();
    

    }

    //绑定交换机和队列

    @Bean public Binding getBinding(Exchange exchange, Queue queue) {

      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
    

    } }

```java
@Component
public class Publisher {

    private static final String EXCHANGE_NAME = "springboot-exchange";

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public Publisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void publish() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "big.black.dogAndCat", "messageWithProperties", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("发送" + new String(message.getBody(), StandardCharsets.UTF_8));
                message.getMessageProperties().setCorrelationId("test");
                return message;
            }
        });
        System.out.println("消息发送成功");
    }
}
@Component
public class Consumer {
    private static final String QUEUE_NAME = "spingboot-queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("获取到消息:" + msg);
        System.out.println("获取到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

RabbitMQ保证消息的可靠性

保证消息送到Exchange —— Confirm机制

image.png
有三种策略:

  • 单独发布消息:显著降低了消息发布速度,因为对消息的确认会阻碍所有后续消息的发布
  • 批量发布消息:必单独发布消息快,但还是降低了消息的发布速度
  • 异步回调

可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果

public static void main(String[] args) {
    try {
        //1 构建连接Connection
        Connection connection = RabbitMQUtil.getConnection();
        //2 构建信道Channel
        Channel channel = connection.createChannel();
        //3 构建交换机Exchange,制定交换机类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //4 构建队列
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        //5 绑定交换机和队列,绑定方式为直接绑定(根据queueName和exchangeName绑定,与routingKey无关
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "publish1");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "publish2");

        // 开启confirms
        channel.confirmSelect();
        // 设置异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 消息发送到Exchange了,进行处理
                System.out.println("消息发送成功");
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                // 消息发送Exchange失败,进行处理;比如重新发送等
                System.out.println("消息发送失败");
            }
        });


        //6 发送消息
        String message = "Hello publish/subscribe";
        channel.basicPublish(EXCHANGE_NAME, "publish1", null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, "publish2", null, message.getBytes());
        //7 关闭资源
        channel.close();
        connection.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

保证消息可靠路由到Queue —— Return机制

为了保证Exchange上的消息一定可以送达到Queue,设置Return机制

public static void main(String[] args) {
    try {
        //1 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        //2 构建信道 Channel
        Channel channel = connection.createChannel();
        //3 构建交换机,指定类型为DIRECT
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4 构建队列
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        //5 绑定Exchange和Queue
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white");

        // 设置Return回调,没有路由到指定Queue时会执行回调函数
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息没有路由到Queue");
            }
        });
        //6 发送信息,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
        channel.basicPublish(EXCHANGE_NAME, "red", true,null, "这是Red".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "blue", true,null, "这是blue".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "black", true,null, "这是black".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "white", true,null, "这是white".getBytes());

        //7 关闭资源
        channel.close();
        connection.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

保证Queue消息持久化——DeliveryMode设置消息持久化

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。注意:先设置queue持久化,在设置消息持久化

public static void main(String[] args) {
    try {
        //1 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        //2 构建信道 Channel
        Channel channel = connection.createChannel();
        //3 构建交换机,指定类型为DIRECT
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4 构建队列, durable设置true,设置队列持久化
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        //5 绑定Exchange和Queue
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white");
        // 设置Return回调
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息没有路由到Queue");
            }
        });
        // deliveryMode(2),设置消息持久化
        AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().deliveryMode(2).build();

        //6 发送信息,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
        channel.basicPublish(EXCHANGE_NAME, "red", true, prop, "这是Red".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "blue", true, prop, "这是blue".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "black", true, prop, "这是black".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "white", true, prop, "这是white".getBytes());

        //7 关闭资源
        channel.close();
        connection.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

保证消费者可以正常消费消息

设置手动ack

Springboot设置消息可靠性

spring:
  rabbitmq:
    host: 192.168.126.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-confirm-type: CORRELATED  #设置开启confirm回调
    publisher-returns: true # 开启return回调
    listener:
      simple:
        acknowledge-mode: MANUAL #开启手动ack
        prefetch: 1
@Component
public class Publisher {

    private static final String EXCHANGE_NAME = "springboot-exchange";

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public Publisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void publish() {
        // 设置Confirm回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("消息成功发送到交换机");
                }else {
                    System.out.println("消息没有发送到交换机");
                }
            }
        });
        // 设置Return回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                String msg = new String(returned.getMessage().getBody());
                System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
            }
        });
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "big.black.dogAndCat", "messageWithProperties", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("发送" + new String(message.getBody(), StandardCharsets.UTF_8));
                // 设置消息持久化
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setCorrelationId("test");
                return message;
            }
        });
        System.out.println("消息发送成功");
    }
}

RabbitMQ死信队列与延迟交换机

image.png
产生死信的情况:

  1. 消息被消费者拒绝,requeue设置为false,该消息变为死信
  2. 给消息设置生存时间
    1. 发送消息是设置消息的生存时间,如果该消息的生存时间到了还没有被消费,该消息变为死信
    2. 设置某个队列中所有消息的生存时间,如果生存时间到了还没被消费,消息变为死信
  3. 队列已达到消息的最大长度,在路由过来的消息变为死信

死信队列的应用:

  • 基于死信队列在消息已满的情况下,消息也不会丢失;
  • 实现延迟消费的效果。比如下订单时,有15Min的付款时间

    实现死信队列

    基于消费者进行reject或者nack实现死信效果
    @Configuration
    public class DeadLetterConfig {
      private static final String NORMAL_EXCHANGE = "normal-exchange";
      private static final String NORMAL_QUEUE = "normal-queue";
      private static final String NORMAL_ROUTING_KEY = "normal.#";
    
      private static final String DEAD_EXCHANGE = "dead-exchange";
      private static final String DEAD_QUEUE = "dead-queue";
      private static final String DEAD_ROUTING_KEY = "dead.#";
    
      @Bean
      public Exchange normalExchange() {
          return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
      }
    
      @Bean
      public Queue normalQueue() {
          return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.message").build();
      }
    
      @Bean
      public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
          return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
      }
    
      @Bean
      public Exchange deadExchange() {
          return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
      }
    
      @Bean
      public Queue deadQueue() {
          return QueueBuilder.durable(DEAD_QUEUE).build();
      }
    
      @Bean
      public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
          return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
      }
    }
    
    @Component
    public class Publisher2 {
      private static final String NORMAL_EXCHANGE = "normal-exchange";
      private static final String DEAD_EXCHANGE = "dead-exchange";
    
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      public void publish() {
          // 设置Confirm回调
          rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
              @Override
              public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  if (!ack) {
                      System.out.println(cause);
                      ReturnedMessage returned = correlationData.getReturned();
                      // 发送到Exchange失败,重新再发一次
                      rabbitTemplate.convertAndSend(returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), message1 -> {
                          // 开启消息持久化
                          message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                          return message1;
                      });
                  }
              }
          });
          // 设置Return回调
          rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
              @Override
              public void returnedMessage(ReturnedMessage returned) {
                  // 路由到queue失败,重新再发一次
                  rabbitTemplate.convertAndSend(returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), message1 -> {
                      // 开启消息持久化
                      message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                      return message1;
                  });
              }
          });
    
          String message = "这是消息";
          rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, "normal.message", message, message1 -> {
              // 开启消息持久化
              message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
              return message1;
          });
      }
    }
    
    @Component
    public class DeadListener {
    
      private static final String NORMAL_QUEUE = "normal-queue";
    
      @RabbitListener(queues = NORMAL_QUEUE)
      public void consume(Message message, Channel channel) {
          System.out.println("normal队列中的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          try {
              // consumer reject
              channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
              // consumer nack
             // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
    }
    

    给消息设置生存时间
    public void publishWithExpireTime() {
          String message = "这是有生存时间的消息";
          rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, "normal.message", message, message1 -> {
              // 开启消息持久化
              message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
              // 设置setExpiration(毫秒)
              message1.getMessageProperties().setExpiration("5000");
              return message1;
          });
      }
    

    给队列中的消息设置生存时间
    @Bean
    public Queue normalQueue(){
      return QueueBuilder.durable(NORMAL_QUEUE)
              .deadLetterExchange(DEAD_EXCHANGE)
              .deadLetterRoutingKey("dead.abc")
              .ttl(10000)// 设置ttl(毫秒)
              .build();
    }
    

    设置Queue中的消息最大长度
    @Bean
    public Queue normalQueue(){
      return QueueBuilder.durable(NORMAL_QUEUE)
              .deadLetterExchange(DEAD_EXCHANGE)
              .deadLetterRoutingKey("dead.abc")
              .maxLength(1)// 设置maxLength
              .build();
    }
    

    延迟交换机

    死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

    @Configuration
    public class DelayedConfig {
    
      public static final String DELAYED_EXCHANGE = "delayed-exchange";
      public static final String DELAYED_QUEUE = "delayed-queue";
      public static final String DELAYED_ROUTING_KEY = "delayed.#";
    
      @Bean
      public Exchange delayedExchange(){
          Map<String, Object> arguments = new HashMap<>();
          // 设置延迟交换机为Topic路由规则
          arguments.put("x-delayed-type","topic");
          Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
          return exchange;
      }
    
      @Bean
      public Queue delayedQueue(){
          return QueueBuilder.durable(DELAYED_QUEUE).build();
      }
    
      @Bean
      public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
          return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
      }
    }
    
    public void publish(){
    
      rabbitTemplate.convertAndSend(DelayExchangeConfig.DELAYED_EXCHANGE, "delayed.message", "这是消息", new MessagePostProcessor() {
          @Override
          public Message postProcessMessage(Message message) throws AmqpException {
              // 设置延迟时间
              message.getMessageProperties().setDelay(3000);
              return message;
          }
      });
    }
    

    RabbiMQ集群

    image.png