⼀、消息队列介绍

1.1 同步调用与异步调用

同步调用:
Feign客户端可以实现服务间的通信,但是Feign是同步调用,也就是说A服务调用B服务之后,会进⼊阻塞/等待状态,直到B服务返回调用结果给A服务, A服务才会继续往后执行

在特定的业务场景中:用户注册成功之后,发送短息通知用户(A服务为用户注册, B服务发送短信) A服务在完成用户注册之后(代码1),调用B服务发送短信, A服务完成B服务调用之后无需等待B服务的执行接口,直接执行提示用户注册从公(代码2),在这种需求下A服务调用B服务如果使用同步调用,必然降低A服务的执行效率,因此在这种场景下A服务需要通过异步调用 调用B服务

异步调用:

当A服务调用B服务之后,无需等待B的调用结果,可以继续往下执行;那么服务间的异步通信该如何实现呢?

服务之间可以通过消息队列实现异步调用

  • 同步调用
    • A服务调用B服务,需要等待B服务执行完毕的返回值, A服务才可以继续往下执行
    • 同步调用可以通过REST和RPC完成
      • REST: ribbon、 Feign
      • RPC: Dubbo
  • 异步调用
    • A服务调用B服务,而无需等待B服务的执行结果,也就是说在B服务执行的同时A服务可以继续往下执行
    • 通过消息队列实现异步调用

1.2 消息队列概念

17、RabbitM - 图1

  • MQ全称为Message Queue,消息队列(MQ)是⼀种应⽤程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
  • 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

1.3 常用的消息队列产品

17、RabbitM - 图2

  • RabbitMQ 稳定可靠,数据⼀致,支持多协议,有消息确认,基于erlang语言
  • Kafka 高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高
  • ActiveMQ 不够灵活轻巧,对队列较多情况⽀持不好.
  • RocketMQ 性能好,⾼吞吐,⾼可⽤性,⽀持⼤规模分布式,协议⽀持单⼀

⼆、 RabbitMQ

2.1 RabbitMQ介绍

  • RabbitMQ是⼀个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
  • AMQP,即Advanced Message Queuing Protocol, ⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语⾔等条件的限制。 Erlang中的实现有 RabbitMQ等。
  • 主要特性:
    • **保证可靠性** :使⽤⼀些机制来保证可靠性,如持久化、传输确认、发布确认
    • 灵活的路由功能
    • 可伸缩性:⽀持消息集群,多台RabbitMQ服务器可以组成⼀个集群
    • **⾼可⽤性** : RabbitMQ集群中的某个节点出现问题时队列仍然可⽤
    • ⽀持多种协议
    • ⽀持多语⾔客户端
    • **提供良好的管理界⾯**
    • 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
    • 提供插件机制:可通过插件进⾏多⽅⾯扩展

2.2 RabbitMQ安装和配置

参考安装⽂档: RabbitMQ安装及配置.pdf

链接: http://note.youdao.com/noteshare?id=b01d0f9ed0499610fc8f7b2fc76ef17e&sub=57DF1D9EB49C4529AA4EA38A517901B4

2.3 RabbitMQ逻辑结构

RabbitMQ逻辑结构
17、RabbitM - 图3

三、 RabbitMQ用户管理

RabbitMQ默认提供了⼀个guests账号,但是此账号不能作远程登录,也就是不能在管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录

3.1 逻辑结构

  • 用户
  • 虚拟主机
  • 队列

3.2 用户管理

3.2.1 命令行用户管理

  • 在linux中使用命令行创建⽤户
    ```shell

    进⼊到rabbit_mq的sbin⽬录

    cd /usr/local/rabbitmq_server-3.7.0/sbin

新增⽤户

./rabbitmqctl add_user ytao admin123

  1. - 设置用户级别
  2. ```shell
  3. ## ⽤户级别:
  4. ## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理
  5. ## 2.monitoring 监控者 登录控制台、查看所有信息
  6. ## 3.policymaker 策略制定者 登录控制台、指定策略
  7. ## 4.managment 普通管理员 登录控制台
  8. ./rabbitmqctl set_user_tags ytao administrator

3.2.2 管理系统进行用户管理

管理系统登录:访问 http://47.96.11.185:15672/

1.新增用户
17、RabbitM - 图4
2.创建虚拟主机
17、RabbitM - 图5
3.删除用户
17、RabbitM - 图6
4.用户绑定虚拟主机
17、RabbitM - 图7

四、 RabbitMQ工作方式

RabbitMQ提供了多种消息的通信方式—工作模式

https://www.rabbitmq.com/getstarted.html

消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)

4.1 简单模式

⼀个队列只有⼀个消费者

17、RabbitM - 图8

4.2 工作模式

多个消费者监听同⼀个队列

17、RabbitM - 图9

4.3 订阅模式

⼀个交换机绑定多个消息队列,每个消息队列有⼀个消费者监听

17、RabbitM - 图10

4.4 路由模式

⼀个交换机绑定多个消息队列,每个消息队列都由自己唯⼀的key,每个消息队列有⼀个消费者监听

路由模式
17、RabbitM - 图11

五、 RabbitMQ交换机和队列管理

5.1 创建队列

17、RabbitM - 图12

5.2 创建交换机

17、RabbitM - 图13

5.3 交换机绑定队列

17、RabbitM - 图14

六、在普通的Maven应用中使用MQ

RabbitMQ队列结构
17、RabbitM - 图15

6.1简单模式

6.1.1 消息生产者

  • 创建Maven项目
  • 添加RabbitMQ连接所需要的依赖

    1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>4.10.0</version>
    6. </dependency>
    7. <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    8. <dependency>
    9. <groupId>org.slf4j</groupId>
    10. <artifactId>slf4j-log4j12</artifactId>
    11. <version>1.7.25</version>
    12. <scope>test</scope>
    13. </dependency>
    14. <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    15. <dependency>
    16. <groupId>org.apache.commons</groupId>
    17. <artifactId>commons-lang3</artifactId>
    18. <version>3.9</version>
    19. </dependency>
  • 在resources目录下创建log4j.properties

    1. log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
    2. log4j.logger.org.mybatis = DEBUG
    3. log4j.appender.A1=org.apache.log4j.ConsoleAppender
    4. log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    5. log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
  • 创建MQ连接帮助类
    ```java package com.qfedu.mq.utils;

import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException; import java.util.concurrent.TimeoutException;

public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.创建连接⼯⼚ ConnectionFactory factory = new ConnectionFactory(); //2.在⼯⼚对象中设置MQ的连接信息(ip,port,virtualhost,username,password) factory.setHost(“47.96.11.185”); factory.setPort(5672); factory.setVirtualHost(“host1”); factory.setUsername(“ytao”); factory.setPassword(“admin123”); //3.通过⼯⼚对象获取与MQ的链接 Connection connection = factory.newConnection(); return connection; } }

  1. - 消息生产者发送消息
  2. ```java
  3. package com.qfedu.mq.service;
  4. import com.qfedu.mq.utils.ConnectionUtil;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. public class SendMsg {
  8. public static void main(String[] args) throws Exception{
  9. String msg = "Hello HuangDaoJun!";
  10. Connection connection = ConnectionUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //定义队列(使⽤Java代码在MQ中新建⼀个队列)
  13. //参数1:定义的队列名称
  14. //参数2:队列中的数据是否持久化(如果选择了持久化)
  15. //参数3: 是否排外(当前队列是否为当前连接私有)
  16. //参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数据))
  17. //参数5:设置当前队列的参数
  18. //channel.queueDeclare("queue7",false,false,false,null);
  19. //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
  20. //参数2:⽬标队列名称
  21. //参数3:设置当前这条消息的属性(设置过期时间 10)
  22. //参数4:消息 的内容
  23. channel.basicPublish("","queue1",null,msg.getBytes());
  24. System.out.println("发送: " + msg);
  25. channel.close();
  26. connection.close();
  27. }
  28. }

6.1.2 消息消费者

  • 创建Maven项目
  • 添加依赖
  • log4j.properties
  • ConnetionUtil.java
  • 消费者消费消息
    ```java package com.qfedu.mq.service;

import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.*;

import java.io.IOException; import java.util.concurrent.TimeoutException;

public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(“接收: “+msg); } }; channel.basicConsume(“queue1”,true,consumer); } }

  1. <a name="63df28ed"></a>
  2. ### 6.2 工作模式
  3. > ⼀个发送者多个消费者
  4. <a name="d81a7194"></a>
  5. #### 6.2.1 发送者
  6. ```java
  7. public class SendMsg {
  8. public static void main(String[] args) throws Exception{
  9. System.out.println("请输⼊消息: ");
  10. Scanner scanner = new Scanner(System.in);
  11. String msg = null;
  12. while(!"quit".equals(msg = scanner.nextLine())){
  13. Connection connection = ConnectionUtil.getConnection();
  14. Channel channel = connection.createChannel();
  15. channel.basicPublish("","queue2",null,msg.getBytes());
  16. System.out.println("发送: " + msg);
  17. channel.close();
  18. connection.close();
  19. }
  20. }
  21. }

6.2.2 消费者1

  1. public class ReceiveMsg {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,
  8. AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer1接收: "+msg);
  12. if("wait".equals(msg)){
  13. try {
  14. Thread.sleep(10000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. };
  21. channel.basicConsume("queue2",true,consumer);
  22. }
  23. }

6.2.3 消费者2

  1. public class ReceiveMsg {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer2接收: "+msg);
  11. }
  12. };
  13. channel.basicConsume("queue2",true,consumer);
  14. }
  15. }

6.3 订阅模式

6.3.1 发送者 发送消息到交换机

  1. public class SendMsg {
  2. public static void main(String[] args) throws Exception{
  3. System.out.println("请输⼊消息: ");
  4. Scanner scanner = new Scanner(System.in);
  5. String msg = null;
  6. while(!"quit".equals(msg = scanner.nextLine())){
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. channel.basicPublish("ex1","",null,msg.getBytes());
  10. System.out.println("发送: " + msg);
  11. channel.close();
  12. connection.close();
  13. }
  14. }
  15. }

6.3.2 消费者1

  1. public class ReceiveMsg1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer1接收: "+msg);
  11. if("wait".equals(msg)){
  12. try {
  13. Thread.sleep(10000);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }
  19. };
  20. channel.basicConsume("queue3",true,consumer);
  21. }
  22. }

6.3.3 消费者2

  1. public class ReceiveMsg2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,
  8. AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer2接收: "+msg);
  12. }
  13. };
  14. channel.basicConsume("queue4",true,consumer);
  15. }
  16. }

6.4 路由模式

6.4.1 发送者 发送消息到交换机

  1. public class SendMsg {
  2. public static void main(String[] args) throws Exception{
  3. System.out.println("请输⼊消息: ");
  4. Scanner scanner = new Scanner(System.in);
  5. String msg = null;
  6. while(!"quit".equals(msg = scanner.nextLine())){
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. if(msg.startsWith("a")){
  10. channel.basicPublish("ex2","a",null,msg.getBytes());
  11. }else if(msg.startsWith("b")){
  12. channel.basicPublish("ex2","b",null,msg.getBytes());
  13. }
  14. System.out.println("发送: " + msg);
  15. channel.close();
  16. connection.close();
  17. }
  18. }
  19. }

6.4.2 消费者1

  1. public class ReceiveMsg1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,
  8. AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer1接收: "+msg);
  12. if("wait".equals(msg)){
  13. try {
  14. Thread.sleep(10000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. };
  21. channel.basicConsume("queue5",true,consumer);
  22. }
  23. }

6.4.3 消费者2

  1. public class ReceiveMsg2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope,
  8. AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer2接收: "+msg);
  12. }
  13. };
  14. channel.basicConsume("queue6",true,consumer);
  15. }
  16. }

七、在SpringBoot应用中使用MQ

SpringBoot应用可以完成自动配置及依赖注⼊——可以通过Spring直接提供与MQ的连接对象

7.1 消息生产者

  • 创建SpringBoot应用,添加依赖
    17、RabbitM - 图16
  • 配置application.yml

    1. server:
    2. port: 9001
    3. spring:
    4. application:
    5. name: producer
    6. rabbitmq:
    7. host: 47.96.11.185
    8. port: 5672
    9. virtual-host: host1
    10. username: ytao
    11. password: admin123
  • 发送消息

    1. @Service
    2. public class TestService {
    3. @Resource
    4. private AmqpTemplate amqpTemplate;
    5. public void sendMsg(String msg){
    6. //1. 发送消息到队列
    7. amqpTemplate.convertAndSend("queue1",msg);
    8. //2. 发送消息到交换机(订阅交换机)
    9. amqpTemplate.convertAndSend("ex1","",msg);
    10. //3. 发送消息到交换机(路由交换机)
    11. amqpTemplate.convertAndSend("ex2","a",msg);
    12. }
    13. }

7.2 消息消费者

  • 创建项目添加依赖
  • 配置yml
  • 接收消息
    1. @Service
    2. //@RabbitListener(queues = {"queue1","queue2"})
    3. @RabbitListener(queues = "queue1")
    4. public class ReceiveMsgService {
    5. @RabbitHandler
    6. public void receiveMsg(String msg){
    7. System.out.println("接收MSG: "+msg);
    8. }
    9. }

八、使用RabbitMQ传递对象

RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

8.1 使用序列化对象

要求:

  • 传递的对象实现序列化接⼝
  • 传递的对象的包名、类名、属性名必须⼀致
  • 消息提供者

    1. @Service
    2. public class MQService {
    3. @Resource
    4. private AmqpTemplate amqpTemplate;
    5. public void sendGoodsToMq(Goods goods){
    6. //消息队列可以发送 字符串、字节数组、序列化对象
    7. amqpTemplate.convertAndSend("","queue1",goods);
    8. }
    9. }
  • 消息消费者

    1. @Component
    2. @RabbitListener(queues = "queue1")
    3. public class ReceiveService {
    4. @RabbitHandler
    5. public void receiveMsg(Goods goods){
    6. System.out.println("Goods---"+goods);
    7. }
    8. }

8.2 使用序列化字节数组

要求:

  • 传递的对象实现序列化接口
  • 传递的对象的包名、类名、属性名必须⼀致
  • 消息提供者

    1. @Service
    2. public class MQService {
    3. @Resource
    4. private AmqpTemplate amqpTemplate;
    5. public void sendGoodsToMq(Goods goods){
    6. //消息队列可以发送 字符串、字节数组、序列化对象
    7. byte[] bytes = SerializationUtils.serialize(goods);
    8. amqpTemplate.convertAndSend("","queue1",bytes);
    9. }
    10. }
  • 消息消费者

    1. @Component
    2. @RabbitListener(queues = "queue1")
    3. public class ReceiveService {
    4. @RabbitHandler
    5. public void receiveMsg(byte[] bs){
    6. Goods goods = (Goods) SerializationUtils.deserialize(bs);
    7. System.out.println("byte[]---"+goods);
    8. }
    9. }

8.3 使用JSON字符串传递

要求:对象的属性名⼀直

  • 消息提供者

    1. @Service
    2. public class MQService {
    3. @Resource
    4. private AmqpTemplate amqpTemplate;
    5. public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
    6. //消息队列可以发送 字符串、字节数组、序列化对象
    7. ObjectMapper objectMapper = new ObjectMapper();
    8. String msg = objectMapper.writeValueAsString(goods);
    9. amqpTemplate.convertAndSend("","queue1",msg);
    10. }
    11. }
  • 消息消费者

    1. @Component
    2. @RabbitListener(queues = "queue1")
    3. public class ReceiveService {
    4. @RabbitHandler
    5. public void receiveMsg(String msg) throws JsonProcessingException {
    6. ObjectMapper objectMapper = new ObjectMapper();
    7. Goods goods = objectMapper.readValue(msg,Goods.class);
    8. System.out.println("String---"+msg);
    9. }
    10. }

九、基于Java的交换机与队列创建

我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过Java代码来完成创建

9.1 普通Maven项目交换机及队列创建

  • 使用Java代码新建队列

    1. //1.定义队列 (使⽤Java代码在MQ中新建⼀个队列)
    2. //参数1:定义的队列名称
    3. //参数2:队列中的数据是否持久化(如果选择了持久化)
    4. //参数3: 是否排外(当前队列是否为当前连接私有)
    5. //参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数据))
    6. //参数5:设置当前队列的参数
    7. channel.queueDeclare("queue7",false,false,false,null);
  • 新建交换机

    1. //定义⼀个“订阅交换机”
    2. channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
    3. //定义⼀个“路由交换机”
    4. channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
  • 绑定队列到交换机

    1. //绑定队列
    2. //参数1:队列名称
    3. //参数2:⽬标交换机
    4. //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
    5. channel.queueBind("queue7","ex4","k1");
    6. channel.queueBind("queue8","ex4","k2");

9.2 SpringBoot应用中通过配置完成队列的创建

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. //声明队列
  4. @Bean
  5. public Queue queue9(){
  6. Queue queue9 = new Queue("queue9");
  7. //设置队列属性
  8. return queue9;
  9. }
  10. @Bean
  11. public Queue queue10(){
  12. Queue queue10 = new Queue("queue10");
  13. //设置队列属性
  14. return queue10;
  15. }
  16. //声明订阅模式交换机
  17. @Bean
  18. public FanoutExchange ex5(){
  19. return new FanoutExchange("ex5");
  20. }
  21. //声明路由模式交换机
  22. @Bean
  23. public DirectExchange ex6(){
  24. return new DirectExchange("ex6");
  25. }
  26. //绑定队列
  27. @Bean
  28. public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
  29. return BindingBuilder.bind(queue9).to(ex6).with("k1");
  30. }
  31. @Bean
  32. public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
  33. return BindingBuilder.bind(queue10).to(ex6).with("k2");
  34. }
  35. }

十、消息的可靠性

消息的可靠性:从生产者发送消息 —— 消息队列存储消息 —— 消费者消费消息 的整个过程中消息的安全性及可
控性。

  • 生产者
  • 消息队列
  • 消费者

17、RabbitM - 图17

10.1 RabbitMQ事务

RabbitMQ事务指的是基于客户端实现的事务管理,当在消息发送过程中添加了事务,处理效率降低⼏⼗倍甚
⾄上百倍

  1. Connection connection = RabbitMQUtil.getConnection(); //connection 表示与 host1的连接
  2. Channel channel = connection.createChannel();
  3. channel.txSelect(); //开启事务
  4. try{
  5. channel.basicPublish("ex4", "k1", null, msg.getBytes());
  6. channel.txCommit(); //提交事务
  7. }catch (Exception e){
  8. channel.txRollback(); //事务回滚
  9. }finally{
  10. channel.close();
  11. connection.close();
  12. }

10.2 RabbitMQ消息确认和return机制

17、RabbitM - 图18

消息确认机制:确认消息提供者是否成功发送消息到交换机

return机制:确认消息是否成功的从交换机分发到队列

10.2.1 普通Maven项目的消息确认

  • 普通confirm方式
    ```java //1.发送消息之前开启消息确认 channel.confirmSelect();

channel.basicPublish(“ex1”, “a”, null, msg.getBytes());

//2.接收消息确认 boolean b = channel.waitForConfirms();

System.out.println(“发送: “ +(b?”成功”:”失败”));

  1. - 批量confirm方式
  2. ```java
  3. //1.发送消息之前开启消息确认
  4. channel.confirmSelect();
  5. //2.批量发送消息
  6. for (int i=0 ; i<10 ; i++){
  7. channel.basicPublish("ex1", "a", null, msg.getBytes());
  8. }
  9. //3.接收批量消息确认:发送的所有消息中,如果有⼀条是失败的,则所有消息发送直接失败,抛出IO异常
  10. boolean b = channel.waitForConfirms();
  • 异步confirm方式
    ```java //发送消息之前开启消息确认 channel.confirmSelect();

//批量发送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish(“ex1”, “a”, null, msg.getBytes()); } //假如发送消息需要10s, waitForConfirms会进⼊阻塞状态 //boolean b = channel.waitForConfirms();

//使⽤监听器异步confirm channel.addConfirmListener(new ConfirmListener() { //参数1: long l 返回消息的表示 //参数2: boolean b 是否为批量confirm public void handleAck(long l, boolean b) throws IOException { System.out.println(“~消息成功发送到交换机”); } public void handleNack(long l, boolean b) throws IOException { System.out.println(“~消息发送到交换机失败”); } });

  1. <a name="cd79f722"></a>
  2. #### 10.2.2 普通Maven项目的return机制
  3. - 添加return监听器
  4. - 发送消息是指定第三个参数为true
  5. - 由于监听器监听是异步处理,所以在消息发送之后不能关闭channel
  6. ```java
  7. String msg = "Hello HuangDaoJun!";
  8. Connection connection = ConnectionUtil.getConnection(); //相当于JDBC操作的数据库连接
  9. Channel channel = connection.createChannel(); //相当于JDBC操作的statement
  10. //return机制:监控交换机是否将消息分发到队列
  11. channel.addReturnListener(new ReturnListener() {
  12. public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
  13. //如果交换机分发消息到队列失败,则会执⾏此⽅法(⽤来处理交换机分发消息到队列失败的情况)
  14. System.out.println("*****"+i); //标识
  15. System.out.println("*****"+s); //
  16. System.out.println("*****"+s1); //交换机名
  17. System.out.println("*****"+s2); //交换机对应的队列的key
  18. System.out.println("*****"+new String(bytes)); //发送的消息
  19. }
  20. });
  21. //发送消息
  22. //channel.basicPublish("ex2", "c", null, msg.getBytes());
  23. channel.basicPublish("ex2", "c", true, null, msg.getBytes());

10.3 在SpringBoot应用实现消息确认与return监听

10.3.1 配置application.yml,开启消息确认和return监听

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: simple ## 开启消息确认模式
  4. publisher-returns: true ##使⽤return监听机制

10.3.2 创建confirm和return监听

  • 消息确认

    1. @Component
    2. public class MyConfirmListener implements RabbitTemplate.ConfirmCallback {
    3. @Autowired
    4. private AmqpTemplate amqpTemplate;
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @PostConstruct
    8. public void init(){
    9. rabbitTemplate.setConfirmCallback(this);
    10. }
    11. @Override
    12. public void confirm(CorrelationData correlationData, boolean b, String s) {
    13. //参数b 表示消息确认结果
    14. //参数s 表示发送的消息
    15. if(b){
    16. System.out.println("消息发送到交换机成功! ");
    17. }else{
    18. System.out.println("消息发送到交换机失败! ");
    19. amqpTemplate.convertAndSend("ex4","",s);
    20. }
    21. }
    22. }
  • return机制

    1. @Component
    2. public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
    3. @Autowired
    4. private AmqpTemplate amqpTemplate;
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @PostConstruct
    8. public void init(){
    9. rabbitTemplate.setReturnsCallback(this);
    10. }
    11. @Override
    12. public void returnedMessage(ReturnedMessage returnedMessage) {
    13. System.out.println("消息从交换机分发到队列失败");
    14. String exchange = returnedMessage.getExchange();
    15. String routingKey = returnedMessage.getRoutingKey();
    16. String msg = returnedMessage.getMessage().toString();
    17. amqpTemplate.convertAndSend(exchange,routingKey,msg);
    18. }
    19. }

10.4 RabbitMQ消费者手动应答

  1. @Component
  2. @RabbitListener(queues="queue01")
  3. public class Consumer1 {
  4. @RabbitHandler
  5. public void process(String msg,Channel channel, Message message) throws IOException {
  6. try {
  7. System.out.println("get msg1 success msg = "+msg);
  8. /**
  9. * 确认⼀条消息: <br>
  10. * channel.basicAck(deliveryTag, false); <br>
  11. * deliveryTag:该消息的index <br>
  12. * multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br>
  13. */
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15. } catch (Exception e) {
  16. //消费者处理出了问题,需要告诉队列信息消费失败
  17. /**
  18. * 拒绝确认消息:<br>
  19. * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
  20. * deliveryTag:该消息的index<br>
  21. * multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。 <br>
  22. * requeue:被拒绝的是否重新⼊队列 <br>
  23. */
  24. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
  25. System.err.println("get msg1 failed msg = "+msg);
  26. }
  27. }
  28. }

10.5 消息消费的幂等性问题

消息消费的幂等性——多次消费的执⾏结果时相同的 (避免重复消费)

解决⽅案:处理成功的消息setnx到redis

十一、延迟机制

11.1 延迟队列

  • 延迟队列——消息进⼊到队列之后,延迟指定的时间才能被消费者消费
  • AMQP协议和RabbitMQ队列本身是不⽀持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能
  • TTL就是消息的存活时间。 RabbitMQ可以分别对队列和消息设置存活时间
    17、RabbitM - 图19
    • 在创建队列的时候可以设置队列的存活时间,当消息进⼊到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
    • 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;
    • 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

11.2 使用延迟队列实现订单支付监控

11.2.1 实现流程图

image.png

11.2.2 创建交换机和队列

1.创建路由交换机
image.png
2.创建消息队列
image.png
3.创建死信队列
image.png
4.队列绑定
image.png

⼗⼆、消息队列作用/使用场景总结

12.1 解耦

场景说明:用户下单之后,订单系统要通知库存系统

传统方式:订单系统直接调⽤库存系统提供的接口,如果库存系统出现故障会导致订单系统失败
image.png
使用消息队列:
image.png

12.2 异步

场景说明:用户注册成功之后,需要发送注册邮件及注册短信提醒

传统方式:
image.png
使用消息队列:
image.png

12.3 消息通信

场景说明:应用系统之间的通信,例如聊天室

聊天室
image.png

12.4 流量削峰

场景说明:秒杀业务

大量的请求不会主动请求秒杀业务,而是存放在消息队列(缓存)
image.png

12.5 日志处理

场景说明:系统中大量的日志处理

日志搜集处理
image.png