一、AMQP简介

1、AMQP是什么?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是进程之间传递异步消息的网络协议。

2、AMQP工作过程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
Publisher和Consumer就是两个Java项目,AMQP实体后面就是RabbitMQ

3、队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进后出。其中一侧负责进数据,另一次负责出数据。
MQ(消息队列)很多功能都是基于此队列结构实现的,队列也是解决高并发下排队问题的解决方案,即使恰巧出现同一时刻向队列添加的两个数据,也会有CPU帮助判断,放入到队列后,一定会有先后顺序。

二、RabbitMQ简介

1、RabbitMQ介绍

RabbitMQ是由Erlang语言编写的基于AMQP的消息中间件。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

1.1 解决应用耦合

1.1.1 不使用MQ时

应用程序A向应用程序B发送消息时,A必须知道B的相关消息,当B的位置等信息改变时,A中数据也需要跟随修改。这个时候成为A和B是耦合的。

1.1.2 使用MQ解决耦合
  • 应用程序A发送消息只需要向MQ发送,不需要管应用程序B。
  • 应用程序B只需监听MQ中消息,有消息取出即可,不需要管应用程序A。
  • 此时认为应用程序A和应用程序B是解耦,它们两个没有直接联系了。

    2、RabbitMQ适用场景

    2.1 两大核心特性

    RabbitMQ两大核心特性:异步消息、队列。
    异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间。所有需要这种效果场景都可以使用MQ。
    队列:进入队列的数据一定有先后之分。只要应用程序要对内容分先后的场景都可以使用MQ。

    2.2 具体场景

    2.2.1 排队算法

    使用队列特性。把数据发送给MQ,进入到队列就有了排队的效果

    2.2.2 秒杀活动

    使用队列特性。例如:抢红包、限时秒杀、直播卖货时抢商品。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束。

    2.2.3 消息分发

    在程序中同时向多个其他程序发送消息。应用了AMQP中交换机,实现消息分发。

    2.2.4 异步处理

    利用MQ异步消息特性。大大提升主线程效率。

    2.2.5 数据同步

    利用异步特性。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步。

    2.2.6 处理耗时任务

    利用异步特性。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间。

    2.2.7 流量削峰

    在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(双11、品牌促销,10点抢购),如果使用监控工具,会发现这段时间访问出现顶峰。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋。
    利用RabbitMQ中交换机实现的。

    3、 目前市场上主流或常见的MQ

  • RabbitMQ:功能强大、适合分布式项目。Spring提供了对RabbitMQ的支持。Spring AMQP框架。在Spring Cloud中尤其Spring Cloud Netflix中无缝整合。

  • ActiveMQ:小巧。以前的项目中使用比较多。由Apache推出的。
  • RocketMQ:阿里的MQ。如果使用Spring Cloud Alibaba,推荐使用这个MQ。
  • Kafka:主要应用在大数据流处理上。也具备MQ相关功能。

    三、RabbitMQ原理(面试题)

    总体说明
    Client A 和 Client B都是Publisher。CLient1、Client2、Client3都是Consumer。都对应一个Java项目。
    RabbitMQ执行原理文字解释
    客户端应用程序向RabbitMQ发送消息Message,在Message会包含路由键Routing Key、交换器名称、消息内容。交换器Exchange接收到消息Message后会根据交换器类型Exchange Type判断把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中。放入到队列成功后会返回给发送者Publisher一个ACK确认消息,表示消息发送成功了。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理,处理完成后会返回给RabbitMQ一个ACK,表示消息处理完成,RabbitMQ会删除这个消息。以上这些就是RabbitMQ的运行原理。
    image.png
1.Message
消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由
一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
2.Publisher
消息的生产者。也是一个向交换器发布消息的客户端应用程序。
通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher
3.Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出。
使用MQ做耗时任务时,耗时任务就交给Consumer进行完成。
4.Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
一共支持四种的交换器类型
1. direct(发布与订阅完全匹配)
2. fanout(广播)
3. topic(主题,规则匹配)
4. header(使用较少,相比direct就多了一些头信息)
5.Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
一个交换器里面可以绑定多个队列。一个队列一般都是只绑定到一个交换器上。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列。
6.Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
7.Routing-key
路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的名称,路由键是key,队列是value)
队列通过路由键绑定到交换器。
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。
如果相匹配,消息将会投递到该队列。
如果不匹配,消息将会进入黑洞。
通俗理解:队列绑定到交换器时有路由键,这个路邮件就相当于key-value中的key,value是队列。当Publisher发送消息时一定会携带路由键,有了路由键就让交换器知道了这个消息要发送给哪个队列。
8.Connection
链接。指rabbit服务器和服务建立的TCP链接。
9.Channel
信道。
1,Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。
2,TCP一旦打开,就会创建AMQP信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10.Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/
通俗理解:一个RabbitMQ可以包含多个虚拟主机,每个虚拟主机都是一个RabbitMQ。平时我们没有去创建虚拟主机,都是使用RabbitMQ里面为/虚拟主机,单实际上一个RabbitMQ可以包含多个虚拟主机主机的,也就是说一个RabbitMQ可以包含多个实例。就像MySQL可以创建多个数据库一样。
11.Borker
表示消息队列服务器实体。
交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟交换器中绑定的路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。
路由键可以理解为匹配的规则。
RabbitMQ为什么需要信道?为什么不是TCP直接通信?
1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。
2. 如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
这也是为什么使用RabbitMQ去处理秒杀、流量削锋、海量请求时依然对RabbitMQ比较有信心的原因。

四、基于Docker安装RabbitMQ

1、拉取镜像

docker pull rabbitmq:management

2、创建并启动容器

创建容器时指定用户名和密码为bjsxt

老版本
docker run -d —name rabbitmq -p 15672:15672 -p 5672:5672 —restart=always -e RABBITMQ_DEFAULT_USER=bjsxt -e RABBITMQ_DEFAULT_PASS=bjsxt rabbitmq:management
新版本
docker run -d —name rabbitmq -p 15672:15672 -p 5672:5672 —restart=always -e DEFAULT_USER=bjsxt -e DEFAULT_PASS=bjsxt rabbitmq:management

3、访问管理界面

在浏览器输入:http://192.168.80.128:15672
如果可以正常登录说明安装成功。

五、Spring AMQP 简介

Spring AMQP 是Spring的一个顶级项目。目前只支持RabbitMQ的实现。
使用Spring Boot整合Spring AMQP时只需要在项目中导入spring-boot-starter-amqp启动器即可。之后在配置文件中配置RabbitMQ相关信息后就可以直接在代码中注入AmqpTemplate对象。AmqpTemplate接口具体实现类只有一个。
image.png
所以在代码中导入AmqpTemplate和导入RabbitTemplate都是导入RabbitTemplate对象实例。在不考虑实现类有更多方法的问题导入AmqpTemplate和导入RabbitTemplate是一样。

1、 使用Spring AMQP操作四种交换器(面试题)

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在RabbitMQ中支持四种交换器
1. Direct Exchange:直连交换器(默认)。通过路由键明确指定存储消息的一个队列。
2. Fanout Exchange:扇形交换器。把消息发送给所有绑定的队列。
3. Topic Exchange:主题交换器。按照路由规则,把消息发送给多个队列。
4. Headers Exchange:首部交换器。比Direct多了一些头部消息,平时使用较少。
在RabbitMq的Web管理界面中Exchanges选项卡就可以看见这四个交换器。
image.png
只有direct交换器有(AMQP default),当使用direct交换器时,如果没有明确指定名称,走的是第一个AMQP default交换器,也是明确指定名称。但是其他交换器没有默认的,都需要指定名称。
只要类型相同,执行哪个交换器是没有什么区别的。
说明:下面四种重点关注交换器是如何传递消息,消息都用字符串类型消息进行演示。根据RabbitMQ原理,一定有一个发送消息的项目(Publisher)和一个监听消息的项目(Consumer)

2、队列创建方式(三种)

2.1 可以通过代码实现创建对象绑定过程(了解)

  1. public static void main(String[] args) throws IOException, TimeoutException {
  2. //创建连接工厂
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setUsername("bjsxt");
  5. factory.setPassword("bjsxt");
  6. //设置 RabbitMQ地址
  7. factory.setHost("192.168.8.129");
  8. //建立到代理服务器到连接
  9. Connection conn = factory.newConnection();
  10. //获得信道
  11. Channel channel = conn.createChannel();
  12. //声明交换器
  13. String exchangeName = "hello";
  14. //exchangeName:交换机名称
  15. // type:交换机类型,常见的如fanout、direct、topic
  16. //durable:设置是否持久化。durable设置true表示持久化,反之是持久化。持久化可以将将换机存盘,在服务器重启时不会丢失相关信息
  17. //autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换机与这个交换器绑定的队列或者交换器都与之解绑
  18. //internal:设置是否内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  19. //argument:其他一些结构化参数,比如alternate-exchange
  20. channel.exchangeDeclare(exchangeName,"direct",true);
  21. // queue: 队列的名称
  22. durable: 设置是否持久化
  23. exclusive: 设置是否排他,
  24. autoDelete: 设置是否自动删除
  25. arguments: 设置队列的其他一些参数,
  26. channel.queueDeclare("hello-exchange",false,false,false,null);
  27. String routingKey = "hola";
  28. //发布消息
  29. byte[] messageBodyByte = "liyong0889".getBytes();
  30. channel.basicPublish(exchangeName,routingKey,null,messageBodyByte);
  31. System.out.print("send:"+messageBodyByte);
  32. channel.close();
  33. conn.close();
  34. }

2.2 SpringBoot方式进行创建和绑定(了解)

在项目中新建一个配置类com.bjsxt.config.RabbitMQConfig。
重点强调:发送消息时创建队列、创建交换器、绑定交换器。

  1. @Configuration
  2. public class RabbitMQConfig {
  3. // 发送消息时如果不存在这个队列,会自动创建这个队列。
  4. // 注意:是发送消息时,而不是启动项目时。
  5. // 相当于:可视化操作时创建一个队列
  6. // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
  7. @Bean
  8. public Queue queue(){
  9. return new Queue("queue1");
  10. }
  11. // 如果没有这个交换器,在发送消息创建这个交换器
  12. // 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
  13. @Bean
  14. public DirectExchange directExchange(){
  15. return new DirectExchange("amq.direct");
  16. }
  17. @Bean
  18. // 配置类中方法参数,会由Spring 容器自动注入
  19. public Binding directBingding(DirectExchange directExchange,Queue queue){
  20. // with(“自定义路由键名称”)
  21. // return BindingBuilder.bind(queue).to(directExchange).with("thisroutingkey");
  22. // withQueueName() 表示队列名就是路由键名称
  23. return BindingBuilder.bind(queue).to(directExchange).withQueueName();
  24. }
  25. }

2.3 通过可视化创建队列及绑定队列

  1. 创建队列

在Web管理界面中点击Queues选项卡后,点击Add a new quque,在Name输入框中随意输入一个队列名称,最后点击Add queue按钮
image.png

  1. 绑定队列

在Exchanges选项卡中点击要绑定的交换器。点击amq.direct
image.png
填写绑定信息。第一行是要把哪个队列绑定,Routing Key内容随意的,一般都按照队列名起,这样对于开发者更好记忆。
image.png

  1. 实现发送消息

已经通过可视化创建了队列和绑定队列。

3、 direct、fanout、topic、headers交换器

3.1 direct交换器

direct交换器是RabbitMQ默认交换器。客户端向direct类型交换器发送消息时,direct会根据路由键把消息放入到指定的队列中。
如果队列有多个Consumer,会进行公平调度,每个消费者依次取出消息。
如果没有可视化创建队列,也没有在Publisher通过代码创建队列,且Consumer中监听的队列不存在,这时启动Consumer会报异常。通过以下方式进行绑定。

  1. /*
  2. bingdings: 设置绑定规则
  3. value: 创建队列,如果有就不创建
  4. exchange:创建交换器,如果有就不创建
  5. key:路由键名称
  6. 由于bindings和key都只有一个值,所以可以省略大括号
  7. */
  8. @RabbitListener(bindings = {@QueueBinding(value=@Queue(name="queue2"),
  9. exchange = @Exchange(name="amq.direct"),
  10. key={"queue2"})})
  11. public void receive4(String msg){
  12. System.out.println(msg);
  13. }

3.2 fanout交换器

扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换器上的队列。对应Consumer依然采用公平调度方式。
使用场景:商品下订单时,同时需要发送邮件、发送短信、向库房发送订单等多个操作同步执行时,可以使用fanout交换器。
image.png
在RabbitMQ的Web管理界面中Exchanges选项卡中。点击amq.fanout交换器,在里面绑定上三个队列,绑定时不需要写路由键,因为fanout广播,路由键无意义。
image.png
通过结果发现:每次消息都同时发送给了三个队列,每个队列中监听方法公平调度,依次获取消息。

3.3 topic交换器

topic比direct交换器功能更强,设置路由键时允许出现特殊字符。
使用topic时路由键的写法和包写法相同。例如:com.bjsxt.xxxx.xxx格式。
在绑定时可以带有下面特殊符号,中间可以出现:
* : 代表一个单词(两个.之间内容)
# : 0个或多个字符

3.4 headers交换器

headers交换器和direct交换器的主要区别是在传递消息时可以传递header部分消息。

3.5 传递对象类型参数

在RabbitMQ中,如果Pushlisher发送的消息是基本数据类型或String类型,可以在Consumer中直接使用对应类型或可以转换的类型进行接收。
如果发送的消息是对象或集合这种复杂类型时,RabbitMQ会把这些数据进行序列化后放入到Message的body中。Consumer接收时方法参数应该为Message,并对Message中body进行反序列获取到对象或集合数据。
老版本(3.1.x/3.2.x)消息接收方需要对消息进行反序列化
新版本中直接使用对应的实体类类型进行接收即可。

六、生产者和消费者实例

1、 新建项目Publisher

步骤一:添加依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.3.9.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-test</artifactId>
  10. <scope>test</scope>
  11. <exclusions>
  12. <exclusion>
  13. <groupId>org.junit.vintage</groupId>
  14. <artifactId>junit-vintage-engine</artifactId>
  15. </exclusion>
  16. </exclusions>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-amqp</artifactId>
  21. </dependency>
  22. </dependencies>

步骤二:编写配置文件

新建application.yml.
host:默认值localhost,配置RabbitMQ所安装主机ip
username默认值:guest,登录RabbitMQ的Web管理插件的用户名
password默认值:guest,登录RabbitMQ的Web管理插件的密码
port 默认值 5672,表示内部访问端口。省略配置。要和web管理插件访问端口15672区分开。

  1. spring:
  2. rabbitmq:
  3. host: 192.168.80.128
  4. username: bjsxt
  5. password: bjsxt
  6. template:
  7. reply-timeout: 1000 #设置当前等待请求方法的时间

步骤三:创建User类并在测试类中编写测试方法

@NoArgsConstructor
@AllArgsConstructor
@Data
public class User implements Serializable {
    // 在实际开发中是不需要写的。
    // 在测试环境中必须写,因为Publisher项目和Consumer项目都在一套JVM中,必须制定序列码,否则无法序列化。
    public static final long serialVersionUID=1L;
    private String name;
    private String pwd;
}
@SpringBootTest
class RabbitmqPublisherApplicationTests {

    @Autowired
    private AmqpTemplate template;
    /*
    * convertAndSend参数:
    * 第一个参数:把消息发送给哪一个交换器
    * 第二个参数:路由键。在Direct(直连)交换器中,路由键就是队列名
    * 第三个参数:发送的消息
    * 额外说明:在AmqpTemplate中convertXXXX的方法都具备把Object类型参数,转换为Message类型。
    * 虽然第三个参数是Object,底层实际发送的参数类型为Message类型
    * 该方法无返回值,由于异步消息,消息发送出去就完成,是不需要接收返回值的。
    * */
    /*---------------------------direct直连交换器---------------------------------*/
    @Test
    void contextLoads() {
        template.convertAndSend("sxt.direct","abc","这是发送的消息");
    }
    /*多个监听队列方法时获取消息的顺序(验证消息同步为异步,sleep()方法不释放锁)*/
    @Test
    void contextLoads2() {
        for (int i=1;i<=10;i++){
            template.convertAndSend("sxt.direct","abc","这是发送的消息");
        }
    }
    /*---------------------------fanout扇形(广播)交换器---------------------------------*/
    //每次消息都同时发送给了三个队列,每个队列中监听方法公平调度,依次获取消息。路由键可省略,若不省略也还是广播到所有的队列上
    @Test
    void contextLoads3() {
        template.convertAndSend("sxt.fanout","","这是发送的消息");
    }
    /*---------------------------topic主题交换器---------------------------------*/
    @Test
    void contextLoads4(){
        template.convertAndSend("sxt.topic","com.bjsxt.a","这是发送的消息");
        //template.convertAndSend("sxt.topic","com.bjsxt.a.b","这是发送的消息");
        //template.convertAndSend("sxt.topic","com.bjsxt.abc","这是发送的消息");
    }
    /*---------------------------Headers首部交换器---------------------------------*/
    @Test
    void contextLoads5(){
        MessageProperties mp = new MessageProperties();
        //设置头部信息,可以在RabbitMQ控制台中查看设置的信息
        mp.setHeader("name","zs");
        //设置消息
        Message message = new Message("这是发送的消息".getBytes(), mp);
        //发送消息
        template.send("sxt.headers","header",message);
    }
    /*---------------------------传递对象类型参数---------------------------------*/
    //[1]必须实现序列化接口
    //[2]实体类的全路径必须和 接收时候实体类的全路径保持一致
    //如果使用当前方法进行返回值接收 这个操作现在就变成了同步的操作
    //如果当前消费者没有给与返回值  生产者不会一致等待 过一定时间后直接返回null
    @Test
    void contextLoads6(){
        template.convertAndSend("sxt.direct","queue3",new User("zmy","123456"));
    }
    /*---------------------------RabbitMQ发送消息并接收反馈结果(同步并接收返回值)---------------------------------*/
    @Test
    void contextLoads7(){
        User user =(User) template.convertSendAndReceive("sxt.direct", "queue3", new User("wx", "456789"));
        System.out.println(user);
    }
    /*---------------------------消息幂等性---------------------------------*/
    @Test
    void contextLoads8(){
        //消息幂等性
        MessageProperties mp = new MessageProperties();
        //设置UUID
        mp.setMessageId(UUID.randomUUID().toString());
        Message message=new Message("这是消息幂等性返回信息".getBytes(),mp);
        template.send("sxt.direct","queue4",message);
    }

}

2、 新建项目consume

步骤一:配置pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
</parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

步骤二:新建配置文件

spring:
  rabbitmq:
    host: 192.168.80.128
    username: bjsxt
    password: bjsxt

#ACK机制
    listener:
      simple:
        retry:
          enabled: true #默认为false,开启重试
          max-attempts: 2 #默认重试三次

步骤三:新建一个监听类,负责监听指定队列。com.bjsxt.mqlistener.ReceiveListener

@Component
public class ReceiveListener {
/*-------------------------监听direct交换器--------------------------------*/
    @RabbitListener(queues = "queue1")
    public void receive1(String msg) throws InterruptedException {
        System.out.println("receive1执行开始:"+msg);
        Thread.sleep(1000);
        System.out.println("receive1执行结束");
    }

    @RabbitListener(queues = "queue1")
    public void receive2(String msg) throws InterruptedException {
        System.out.println("receive2执行开始:"+msg);
        Thread.sleep(1000);
        System.out.println("receive2执行结束");
    }

    /*
    * bingdings:设置绑定规则
    * value:创建队列,如果有就不创建
    * exchange:创建交换器,如果有就不创建
    * key:路由键名称
    * 由于bingdings和key都只有一个值,所以可以省略大括号
    * */
    @RabbitListener(bindings =
            {@QueueBinding(value =@Queue(name = "queue2"),
                    exchange =@Exchange(name = "sxt.direct"),key = {"queue2"})})
    public void receive3(String msg){
        System.out.println("创建队列以及绑定队列:"+msg);
    }


/*-------------------------监听fanout交换器--------------------------------*/
    @RabbitListener(queues = "fanout1")
    public void receive4(String msg){
        System.out.println("fanout1:"+msg);
    }
    @RabbitListener(queues = "fanout2")
    public void receive5(String msg){
        System.out.println("fanout2:"+msg);
    }
    @RabbitListener(queues = "fanout3")
    public void receive6(String msg){
        System.out.println("fanout3:"+msg);
    }
    /*-------------------------监听topic交换器--------------------------------*/
    @RabbitListener(queues = "topic1")
    public void receive7(String msg){
        System.out.println("topic1带有a:"+msg);
    }
    @RabbitListener(queues = "topic2")
    public void receive8(String msg){
        System.out.println("topic2带有#:"+msg);
    }
    @RabbitListener(queues = "topic3")
    public void receive9(String msg){
        System.out.println("topic3带有*:"+msg);
    }
    /*-------------------------监听headers首部交换器--------------------------------*/
    @RabbitListener(queues = "header")
    public void receive10(Message message){
        System.out.println(new String(message.getBody()));//接收发送过来的数据
        //header中key和对应的value
        String v = message.getMessageProperties().getHeader("name");
        System.out.println(v);
    }
    /*-------------------------传递对象类型参数--------------------------------*/
    //新版本RabbitMQ
    @RabbitListener(queues = "queue3")
    public void receive11(User user){
        System.out.println(user);
    }
    //老版本(3.1.x/3.2.x)RabbitMQ,使用Message接收
    @RabbitListener(queues = "queue3")
    public void receive12(Message msg) throws IOException, ClassNotFoundException {
        byte[] body = msg.getBody();
        InputStream inputStream = new ByteArrayInputStream(body);
        ObjectInputStream ois = new ObjectInputStream(inputStream);
        User user = (User) ois.readObject();
        System.out.println(user);
    }
    /*---------------------------RabbitMQ接收返回值)---------------------------------*/
    @RabbitListener(queues = "queue3")
    public User receive13(User user){
        System.out.println("同步请求接收成功:"+user);
/*        int a=10/0;*/
        return user;
    }
    /*---------------------------消息幂等性---------------------------------*/
    @RabbitListener(queues = "queue4")
    public void receive14(Message message){
        //唯一的值
        System.out.println(message.getMessageProperties().getMessageId());
        //使用Redis进行判断  当前key 是否存在
        //if(key不存在){
        //进行数据库操作 。。。。。。

        //以uuid作为redis中key 保存到redis中  ---设置有效时间  比如5分钟
        //}
    }

}

七、 RabbitMQ发送消息并接收反馈结果

   使用RabbitMQ很多情况都需要使用RabbitMQ的队列功能对数据进行排序。如果使用异步类型消息,Publisher发送完成消息后是没有任何反馈结果的,如果需要反馈结果就需要使用AmqpTemplate中convertSendAndReceive,并Consumer项目监听方法必须有返回值。<br />         场景:秒杀、抢红包等功能时都适用。<br />当使用convertSendAndReceive消息由异步变成同步,阻塞主线程。发送给队列消息后需要Consumer返回ACK值(监听方法返回值),所以在使用这个功能时,都是先启动Consumer后发送消息。

1、发送消息方

convertSendAndReceive
参数:和convertAndSend参数列表完成相同。
返回值:是consumer监听方法返回值。
如果超过时间没有收到consumer返回值,不在继续阻塞主线程,向下执行,返回值为null。

Object obj = amqpTemplate.convertSendAndReceive(“amq.direct”, “imgs”, new People(2, “李四”));

2、 接收消息方法

    方法必须有返回值,返回值就是Publisher项目中convertSendAndReceive方法的返回值。
@RabbitListener(queues = “imgs”)
public String imgs(People pe){
return “处理完成jqk”;
}

自己可以设置当前等待实现

#单位是毫秒 spring.rabbitmq.template.reply-timeout=10000

八、 ACK机制

RabbitMQ中内置ACK(消息确认机制)机制,本质就是消息确认机制。在队列中的消息,当Consumer往出取得时候,如果代码出现问题,自动切换到另一个Consumer去处理。直到消息被成功取出。
当Consumer执行成功后,会通知RabbitMQ可以从队列中删除消息。
如果希望关闭ACK机制,可以开启重试,通过设置重试次数,到达指定次数后删除消息。

spring:
rabbitmq:
host: 192.168.8.128
port: 5672
username: bjsxt password: bjsxt listener:
simple:
retry:
enabled: true # 默认false,开启重试。 max-attempts: 2 # 默认3次

九、 常见面试题

1、幂等性

幂等性常见两种情况:接口幂等性和消息幂等性。

1.1 接口幂等性

接口幂等性是指用户的同一个操作发起的一次请求或多次请求的结果是一样的,不会因为多次点击而产生不同的结果。
举例说明:用户在提交表单时(注册、电商中下单等都属于提交表单)第一次点击请求已经发送给服务器,但是由于网络问题,服务器的响应没有正常返回。用户以为没有执行成功,会再次点击提交按钮,但是实际上服务器已经处理了消息。第二次点击就会在服务器产生第二条数据。这就是没有保证接口的幂等性。
在对数据库操作中,查询和删除是不需要考虑接口幂等性问题的。只要数据库数据不变每次查询的结果都是一样的,而多次删除同一条数据影响也不大(但是需要考虑代码删除后返回值),新增时多次执行就新增多条数据,尤其是在主键自增的情况下。而修改时如果是修改成固定值是没有接口幂等性问题的,但是如果对某个列修改的时候是增量修改就需要考虑了。
解决方案:
1. 逻辑判断。此方案通过代码逻辑进行判断是否为重复提交,所以局限性比较大。只有少量需求满足这种情况。
2. 添加Token(令牌)。在显示需要提交表单的页面之前生成Token(UUID),把Token的值放入到表单内部,服务器中也存储一份token。用户在第一次提交表单的时候,服务器会判断是否有同值Token,如果有执行操作后吧Token在服务器删除。用户在同一个页面再次点击表单后,服务返回Token值已经不存在了,则不会执行对应操作。

1.2 消息幂等性(重复发送和重复消费问题)

在MQ中可能出现消息幂等性的情况:
1. (重复发送)Publisher给MQ发送消息的时候,MQ在给Publisher返回ACK时由于网络中断等问题,没有成功返回。Publisher会认为消息没有发送成功,在网络恢复后会重新发送消息。
2. (重复消费)Consumer接收到消息后,在给MQ返回ACK时由于网络问题,MQ没有成功接收ACK,MQ会认为此消息没有正确消费。在网络重连后会把消息重新发送给此消费者,或重新广播给其他所有消费者。
解决办法
1. 解决重复发送问题。MQ内部会给每个消息生成一个唯一ID。当消息接收到后会判断此ID。
2. 解决重复消费问题。在Consumer中可以通过消息的唯一ID进行判断是否已经消费过(借助Redis等工具每次消费都要记录已经消费过),也可以在每条消息中自定义唯一标识,判断是否已经消费过。
生产者:

MessageProperties mp = new MessageProperties();
mp.setMessageId(UUID.randomUUID().toString());
Message msg = new Message(“消息内容”.getBytes(),mp);
amqpTemplate.send(“sxt.dit”,“xxx”,msg);

消费者:接收判断

System.out.println(message.getMessageProperties().getMessageId());

2、如何保证消息的可靠性传输

2.1 生产者弄丢了数据

  生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。此时可以选择用RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务(channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,因为太耗性能。
  所以一般来说,如果你要确保说写RabbitMQ的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
  事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你一个接口通知你这个消息接收到了。
  所以一般在生产者这块避免数据丢失,都是用confirm机制的。
Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;(掌握)
RabbitMQ消息代理中间件 - 图9
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:channel.txSelect()声明启动事务模式;channel.txComment()提交事务;channel.txRollback()回滚事务;
RabbitMQ消息代理中间件 - 图10

2.2 .RabbitMQ弄丢了数据

  就是RabbitMQ自己弄丢了数据,这个你必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
  设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
  而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
哪怕是你给RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ挂了,就会导致内存里的一点点数据会丢失。
RabbitMQ中队列保存的消息,默认是保存在内存中的。如果重启服务应该丢失消息。RabbitMQ有消息持久能力。当RabbitMQ空闲的时候,关闭服务的时候,都会把内存中未消费的消息保存到硬盘。重启后,恢复到内存。
想要实现消息持久化需要具备以下特性:

  • 交换器是持久的
  • 在项目中建立的交换器的Durability=Durable,表示持久交换器。

RabbitMQ消息代理中间件 - 图11
队列是持久的
创建队列,默认也是持久的。
RabbitMQ消息代理中间件 - 图12
消息是持久的。
在使用Spring AMQP时,调用的convertAndSend方法时MessageProperties中的deliveryMode的默认取值为MessageDeliveryMode.PERSISTENT,表示消息的delivery_mode=2,持久消息。

2.3 .消费端弄丢了数据

  RabbitMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。
  这个时候得用RabbitMQ提供的ack机制,简单来说,就是你关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

3、 如何保证消息的顺序性

  因为在某些情况下我们扔进MQ中的消息是要严格保证顺序的,尤其涉及到订单什么的业务需求,消费的时候也是要严格保证顺序,不然会出大问题的。
1.先看看顺序会错乱的俩场景
rabbitmq:一个queue,多个consumer,这不明显乱了
kafka:一个topic,一个partition,一个consumer,内部多线程,这不也明显乱了
2.如何来保证消息的顺序性呢?
rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可。

4、 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时怎么解决?

大量消息在mq里积压了几个小时了还没解决几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。<br />  一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条,所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。<br />  一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:<br />1. 先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。<br />2. 新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量。<br />3. 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。<br />4. 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。<br />5. 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。

5、 消息队列过期失效问题

  假设你用的是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
  这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。
  这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。
  假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。

6、 消息队列满了怎么搞?

  如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

十、 网页访问:http://192.168.80.128:15672/

image.png
image.png
image.png
image.png