1. MQ 介绍

1. 什么是MQ

MQ(message quene) 消息队列, 通过电信的生产者和消费者模型, 生产者不断向消息队列中生产消息, 消费者不断从队列总获取消息. 因为消息的生产和消费都是异步的. 而且只关心消息的发送和接受. 没有业务逻辑的侵入, 轻松的实现系统间解耦. 别名为消息中间件, 通过利用高效可靠的消息传递机制进行平台无关的数据交流, 并给予数据通信来进行分布式系统的集成

2. MQ 有哪些

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitNO,炙手可热的Kafka,阿里巴巴自主开发 RocketNQ等。

3. 不同MQ特点

  1. # 1.ActiveMQ
  2. ActiveN、是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JNS规范的的消息中间件。丰富的4PI,多种集群架构模式让ActivelK在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
  3. # 2.Kafka
  4. KafkaLinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
  5. # 3.RocketMQ
  6. RocketNQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketNQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
  7. # 4.RabbitMQ
  8. RabbitNQ是使用Erlang语言开发的开源消息队列系统,基于ANQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMOP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

Rabbit MQ 比Kafka可靠,kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2. RabbitMQ 介绍

基于AMOP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

RabbitMQ 安装

  1. # 1. 先安装 erlang 语言依赖包
  2. rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
  3. # 2. 安装 RabbitMQ 内存管理以来
  4. rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
  5. # 3. Rabbit MQ (本地安装)
  6. rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
  7. # 3.1 Rabbit MQ (yum 安装)
  8. yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
  9. > 默认安装完成后配置文件模板在 /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example 目录中, 需要将配置文件复制到 /etc/rabbitmq 目录中, 并球盖名称为 rabbitmq.config
  10. # 4. 复制配置文件
  11. cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
  12. # 5. 查看配置文件位置
  13. ls /etc/rabbitmq/rabbitmq.config
  14. # 6. 修改配置文件 (打开来宾账户) -- 具体看下图和解释
  15. vim /etc/rabbitmq/rabbitmq.config
  16. # 7 执行如下命令,启动 rabbitmq中的插件管理
  17. rabbitmq-plugins enable rabbitmq_management
  18. # 8. 启动 Rabbit MQ
  19. systemctl start rabbitmq-server
  20. # 9. 查询启动状态
  21. systemctl status rabbitmq-server
  22. # 10. 重启rabbitmq
  23. systemctl restart rabbitmq-server
  24. # 11. 关闭rabbitmq
  25. systemctl stop rabbitmq-server

image.png

开启 Rabbit MQ 来宾用户就是将配置文件中 {loopback_users, []} 前面的%% 注释去掉和最后面的 逗号去掉.

Rabbit MQ 默认端口是15672 可以通过 IP 地址:端口号进行访问 来宾账户密码都是 guest

  1. # 关闭防火墙
  2. systemctl disable firewalld
  3. systemctl stop firewalld

3. Java 中使用Rabbit MQ

引入依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.7.2</version>
  5. </dependency>

1. 直连模型

image.png

  • P 生成者, 也就是要发送的消息的程序
  • C 消费者, 消息的接受者, 会一直等待消息的到来
  • queue 消息队列, 图中红色部分, 类似一个邮箱, 可以缓存消息, 生成者向其中投递消息, 消费者从其中取出消息

    生成消息

  1. // 创建链接工厂对象
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. // 设置链接rabbitmq 主机
  4. connectionFactory.setHost("192.168.57.130");
  5. // 设置链接 rabbitmq 端口
  6. connectionFactory.setPort(5672);
  7. // 设置链接那个虚拟主机
  8. connectionFactory.setVirtualHost("/ems");
  9. // 设置访问虚拟主机的用户及密码
  10. connectionFactory.setUsername("ems");
  11. connectionFactory.setPassword("ems@123");
  12. // 获得链接对象
  13. Connection connection = connectionFactory.newConnection();
  14. // 获取链接中的通道
  15. Channel channel = connection.createChannel();
  16. // 通道绑定对象消息队列
  17. // 参数1: 队列名称, 如果队列不存在自动创建
  18. // 参数2: 用来定义队列特性是否要持久化(true: 持久化队列)
  19. // 参数3: 是否独占队列(true: 独占队列)
  20. // 参数4: 是否在消费完成后自动删除队列(true 自动删除)
  21. // 参数5: 额外附加参数
  22. channel.queueDeclare("Hello", false, false, false, null);
  23. // 发布消息
  24. // 参数1: 交换机名称; 参数2: 队列名称; 参数3: 传递消息额外设置; 参数4: 消息的具体内容
  25. channel.basicPublish("", "Hello", null, "Hello Rabbit MQ".getBytes());
  26. // 释放资源
  27. channel.close();
  28. connection.close();

如果想发送的消息也持久化, 就需要在发布消息的时候将第三个参数修改为MessageProperties.PERSISTENT_TEXT_PLAIN

消费消息

  1. // 创建链接工厂
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.57.130");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/ems");
  6. connectionFactory.setUsername("ems");
  7. connectionFactory.setPassword("ems@123");
  8. // 创建链接对象
  9. Connection connection = connectionFactory.newConnection();
  10. // 创建通道
  11. Channel channel = connection.createChannel();
  12. // 通道绑定对象
  13. channel.queueDeclare("Hello", false, false, false, null);
  14. // 消费消息
  15. // 参数1: 消费那个队列的消息(队列名称)
  16. // 参数2: 开始消息的自动确认机制
  17. // 参数3: 消费时的回调接口
  18. channel.basicConsume("Hello", true, new DefaultConsumer(channel) {
  19. // 最后一个参数: 消息队列中取出的消息
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  22. System.out.println(new String(body));
  23. }
  24. });
  25. // 释放资源
  26. // channel.close();
  27. // connection.close();

2. Work Quene

image.png
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
Work queues 也被称为 Task queues 任务模型, 当消息处理比较耗时的时候, 可能生成消息的速度会远远大于消息的消费速速 长次以往, 消息就会堆积越来越多, 无法即使处理, 此时就可以使用 work 模型,让多个消费者绑定到一个队列,共同消费队列中的消息. 队列中的消息一旦消费, 就会消失.因此任务是不会被重复执行的

生成者开发

  1. Connection connection = RabbitMQUtils.getConnection();
  2. // 获取通道
  3. Channel channel = connection.createChannel();
  4. // 通过通道声明队列
  5. channel.queueDeclare("work", true, false, false, null);
  6. for (int i = 0; i < 20; i++) {
  7. // 生成消息
  8. channel.basicPublish("", "work", null, ("秋风瑟瑟,寒风萧萧~~~" + i).getBytes());
  9. }
  10. RabbitMQUtils.closeConnectionAndChanel(channel, connection);

消费者1

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. channel.queueDeclare("wordk", true, false, false, null);
  4. channel.basicConsume("work", true, new DefaultConsumer(channel) {
  5. @Override
  6. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  7. System.out.println("消费者 1 :" + new String(body));
  8. }
  9. });

消费者2

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. channel.queueDeclare("wordk", true, false, false, null);
  4. channel.basicConsume("work", true, new DefaultConsumer(channel) {
  5. @Override
  6. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  7. System.out.println("消费者 2 :" + new String(body));
  8. }
  9. });

默认情况下, Rabbit MQ 按照熟悉怒将每个消息发给下一个使用者, 平均而言, 每个消费者都会收到相同数量的消息, 这中分发消息的方式称为循环.

3. Fanout (广播) 模型

image.png
广播模式下,消息发送流程如下:

  • 可以有多个消费者
  • 每个消费者有自己的 queue(队列)
  • 每个队列都要绑定到 Exchange(交换机)
  • 生产者发送的消息, 只能发送到交换机, 交换机来决定要发给那个队列, 生产者无法决定
  • 交换机把消息发个绑定过的所有队列
  • 队列的消费者都能够拿到消息. 实现一条效益被多个消费者消费.

    生产者

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 将通道声明指定交换机 参数1: 交换机名称, 参数2: 交换机类型(fanout 广播)
  4. channel.exchangeDeclare("order", "fanout");
  5. // 发送消息
  6. channel.basicPublish("order", "", null, "fanout type message".getBytes());
  7. RabbitMQUtils.closeConnectionAndChanel(channel, connection);

消费者

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通道绑定交换机
  4. channel.exchangeDeclare("order", "fanout");
  5. // 临时队列
  6. String queueName = channel.queueDeclare().getQueue();
  7. // 绑定交换机
  8. channel.queueBind(queueName, "order", "");
  9. // 消费消息
  10. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  11. @Override
  12. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  13. System.out.println("------------->" + new String(body));
  14. }
  15. });

4. Routing (路由) 模型

image.png

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

在Direct模型下

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

  • P: 生产者,向Exchange发送消息,发送消息时,会指定一个routing key

  • X: Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
  • c1: 消费者,其所在队列指定了需要routing key 为error的消息
  • C2∶ 消费者,其所在队列指定了需要routing key为info、error、warning的消息

    生成者

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通过通道声明交换机: 参数1: 交换机名称; 参数2: direct 路由模式
  4. channel.exchangeDeclare("logs_direct", "direct");
  5. // 发送消息
  6. String routingkey = "info";
  7. channel.basicPublish("logs_direct", routingkey, null, ("这是direct模型发布的给予 route key:[" + routingkey + "]发布的消息").getBytes());
  8. RabbitMQUtils.closeConnectionAndChanel(channel, connection);

消费者 1

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通道声明交换机以及交换机类型
  4. channel.exchangeDeclare("logs_direct", "direct");
  5. // 创建一个临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. // 基于 route key 绑定队列和交换机
  8. channel.queueBind(queue, "logs_direct", "error");
  9. // 获取消费的消息
  10. channel.basicConsume(queue, true, new DefaultConsumer(channel) {
  11. @Override
  12. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  13. System.out.println("-----1" + new String(body));
  14. }
  15. });

消费者 2

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通道声明交换机以及交换机类型
  4. channel.exchangeDeclare("logs_direct", "direct");
  5. // 创建一个临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. // 基于 route key 绑定队列和交换机
  8. channel.queueBind(queue, "logs_direct", "info");
  9. channel.queueBind(queue, "logs_direct", "error");
  10. // 获取消费的消息
  11. channel.basicConsume(queue, true, new DefaultConsumer(channel) {
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  14. System.out.println("-----2" + new String(body));
  15. }
  16. });

5. Topic 订阅(动态路由) 模型

Topic类型的Exchange 与Direct相比,都是可以根据RoutingKiey把消息路由到不同的队列。只不过Topico类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型 Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item. insert
image.png

    • 匹配不多不少恰好1个词
  • 号匹配一个或多个单词

    生产者

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通过通道声明交换机: 参数1: 交换机名称; 参数2: topic 路由模式
  4. channel.exchangeDeclare("topics", "topic");
  5. // 发送消息
  6. String routingkey = "order.save";
  7. channel.basicPublish("topics", routingkey, null, ("动态路由模型 route key:[" + routingkey + "]发布的消息").getBytes());
  8. RabbitMQUtils.closeConnectionAndChanel(channel, connection);

消费者1

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通道声明交换机以及交换机类型
  4. channel.exchangeDeclare("topics", "topic");
  5. // 创建一个临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. // 绑定队列和叫花鸡,动态通配符形式 route key
  8. channel.queueBind(queue, "topics", "order.*");
  9. // 获取消费的消息
  10. channel.basicConsume(queue, true, new DefaultConsumer(channel) {
  11. @Override
  12. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  13. System.out.println("-----1---" + new String(body));
  14. }
  15. });

消费者2

  1. Connection connection = RabbitMQUtils.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 通道声明交换机以及交换机类型
  4. channel.exchangeDeclare("topics", "topic");
  5. // 创建一个临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. // 绑定队列和叫花鸡,动态通配符形式 route key
  8. channel.queueBind(queue, "topics", "order.#");
  9. // 获取消费的消息
  10. channel.basicConsume(queue, true, new DefaultConsumer(channel) {
  11. @Override
  12. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  13. System.out.println("-----1---" + new String(body));
  14. }
  15. });

4. Spring Boot 整合 Rabbit MQ

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

配置 RabbitMQ

  1. spring:
  2. rabbitmq:
  3. host: 192.168.57.130
  4. port: 5672
  5. username: ems
  6. password: ems@123
  7. virtual-host: /ems

在配置文件中配置好RabbitMQ 后, 当应用程序启动后, spring boot 就会创建一个 RabbitTemplate 对象来简化操作, 使用的时候只需要注入即可.

1. hello

生成者

  1. @SpringBootTest(classes = App.class)
  2. @RunWith(SpringRunner.class)
  3. public class TestRabbitMQ {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void testHello() {
  8. rabbitTemplate.convertAndSend("hello", "hello word");
  9. }
  10. }

消费者

  1. @Component
  2. @RabbitListener(queuesToDeclare = @Queue(value = "hello"))
  3. public class HelloCustomer {
  4. @RabbitHandler
  5. public void receivel(String message) {
  6. System.out.println("message = " + message);
  7. }
  8. }

2. work

生产者

  1. @SpringBootTest(classes = App.class)
  2. @RunWith(SpringRunner.class)
  3. public class TestRabbitMQ {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void testWork() {
  8. rabbitTemplate.convertAndSend("work", "work 模型");
  9. }
  10. }

消费者

  1. @Component
  2. public class WorkCustomer {
  3. @RabbitListener(queuesToDeclare = @Queue(value = "work"))
  4. public void receive1(String message) {
  5. System.out.println("消费者1:" + message);
  6. }
  7. @RabbitListener(queuesToDeclare = @Queue(value = "work"))
  8. public void receive2(String message) {
  9. System.out.println("消费者2:" + message);
  10. }
  11. }

3. Fanout 广播模型

生产者

  1. @SpringBootTest(classes = App.class)
  2. @RunWith(SpringRunner.class)
  3. public class TestRabbitMQ {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void testWork() {
  8. rabbitTemplate.convertAndSend("logs","", "日志广播");
  9. }
  10. }

消费者

  1. @Component
  2. public class FanoutCustomer {
  3. @RabbitListener(bindings = { @QueueBinding(value = @Queue, // 创建临时队列
  4. exchange = @Exchange(value = "logs", type = "fanout") // 绑定交换机
  5. ) })
  6. public void receivel(String message) {
  7. System.out.println("消费者1:" + message);
  8. }
  9. @RabbitListener(bindings = { @QueueBinding(value = @Queue, // 创建临时队列
  10. exchange = @Exchange(value = "logs", type = "fanout") // 绑定交换机
  11. ) })
  12. public void receivel2(String message) {
  13. System.out.println("消费者2:" + message);
  14. }
  15. }

4. Routing (路由)

生产者

  1. @Test
  2. public void testRoute() {
  3. rabbitTemplate.convertAndSend("directs", "info", "路由发送info信息...");
  4. }

消费者

  1. @Component
  2. public class RouteCustomer {
  3. @RabbitListener(bindings = { @QueueBinding(value = @Queue, // 创建临时队列
  4. exchange = @Exchange(value = "directs", type = "direct"), // 自定义交换机名称和类型
  5. key = { "info", "error" } // 路由信息
  6. ) })
  7. public void receive1(String messge) {
  8. System.out.println("消费者1:" + messge);
  9. }
  10. @RabbitListener(bindings = { @QueueBinding(value = @Queue, // 创建临时队列
  11. exchange = @Exchange(value = "directs", type = "direct"), // 自定义交换机名称和类型
  12. key = { "info" } // 路由信息
  13. ) })
  14. public void receive2(String messge) {
  15. System.out.println("消费者1:" + messge);
  16. }
  17. }

5. Topic 订阅/动态路由

生产者

  1. @Test
  2. public void testTopic() {
  3. rabbitTemplate.convertAndSend("topics", "order.save", "order.save 路由消息");
  4. }

消费者

  1. @Component
  2. public class TopicCustomer {
  3. @RabbitListener(bindings = {
  4. @QueueBinding(
  5. value = @Queue, // 创建临时队列
  6. exchange = @Exchange(type = "topic",name="topics"), // 自定义交换机名称和类型
  7. key = { "user.save", "user.*" } // 路由信息
  8. ) })
  9. public void recevice1(String message) {
  10. System.out.println("消费者1:" + message);
  11. }
  12. @RabbitListener(bindings = {
  13. @QueueBinding(
  14. value = @Queue, // 创建临时队列
  15. exchange = @Exchange(type = "topic",name="topics"), // 自定义交换机名称和类型
  16. key = { "order.#", "user.#" } // 路由信息
  17. ) })
  18. public void recevice2(String message) {
  19. System.out.println("消费者2:" + message);
  20. }
  21. }

5. MQ 使用场景

1. 异步处理

场景说明: 用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式

  • 串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信以上三个任务全部完成后才返回给客户端。这有一个问题是邮件短信并不是必须的它只是一个通知而这种做法让客户端等待没有必要等待的东西.
  • 并行方式: 将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间
  • 消息队列: 假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间;但是,前面说过邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功应该是写入数据库后就返回,消息队列:引入消息队列后,把发送邮件短信不是必须的业务逻辑异步处理

    2. 应用解耦

3. 流量削峰

6. RabbitMQ 集群

1. 普通集群(副本集群)

2. 镜像集群/队列