一、引言

模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。
RabbitMQ - 图1

二、RabbitMQ介绍

市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka分布式消息队列,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
视频 sip remp
https://www.rabbitmq.com/getstarted.html

三、RabbitMQ安装

  1. version: "3.1"
  2. services:
  3. rabbitmq:
  4. image: daocloud.io/library/rabbitmq:management
  5. restart: always
  6. container_name: rabbitmq
  7. ports:
  8. - 5672:5672
  9. - 15672:15672
  10. volumes:
  11. - ./data:/var/lib/rabbitmq

四、RabbitMQ架构【重点】

4.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange
  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息
  • Exchange - 交换机:和生产者建立连接并接收生产者的消息 (丢失处理config)
  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互(丢失处理return)
  • Routes - 路由:交换机以什么样的策略将消息发布到QueueRabbitMQ - 图2

    4.2 RabbitMQ的完整架构图

    RabbitMQ - 图3

    4.3 查看图形化界面并创建一个Virtual Host

    默认账户:guest guest
    创建一个全新的用户和全新的Virtual Host,并且将test用户设置上可以操作/test的权限
    ip+port
    http://162.14.64.72:15672/
    RabbitMQ - 图4

    五、RabbitMQ的使用【重点】

    5.1 RabbitMQ的通讯方式

    RabbitMQ - 图5
    RabbitMQ - 图6

    5.2 Java连接RabbitMQ

    5.2.2 导入依赖
    1. <dependencies>
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.6.0</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>junit</groupId>
    9. <artifactId>junit</artifactId>
    10. <version>4.12</version>
    11. </dependency>
    12. </dependencies>

    5.2.3 创建工具类连接RabbitMQ
    1. public static Connection getConnection(){
    2. // 创建Connection工厂
    3. ConnectionFactory factory = new ConnectionFactory();
    4. factory.setHost("192.168.199.109");
    5. factory.setPort(5672);
    6. factory.setUsername("test");
    7. factory.setPassword("test");
    8. factory.setVirtualHost("/test");
    9. // 创建Connection
    10. Connection conn = null;
    11. try {
    12. conn = factory.newConnection();
    13. } catch (Exception e) {
    14. e.printStackTrace();
    15. }
    16. // 返回
    17. return conn;
    18. }

    RabbitMQ - 图7

    5.3 Hello-World

    一个生产者,一个默认的交换机,一个队列,一个消费者
    RabbitMQ - 图8
    创建生产者,创建一个channel,发布消息到exchange,指定路由规则。

    1. @Test
    2. public void publish() throws Exception {
    3. //1. 获取Connection
    4. Connection connection = RabbitMQClient.getConnection();
    5. //2. 创建Channel
    6. Channel channel = connection.createChannel();
    7. //3. 发布消息到exchange,同时指定路由的规则
    8. String msg = "Hello-World!";
    9. // 参数1:指定exchange,使用""。
    10. // 参数2:指定路由的规则,使用具体的队列名称。
    11. // 参数3:指定传递的消息所携带的properties,使用null。
    12. // 参数4:指定发布的具体消息,byte[]类型
    13. channel.basicPublish("","HelloWorld",null,msg.getBytes());
    14. // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
    15. System.out.println("生产者发布消息成功!");
    16. //4. 释放资源
    17. channel.close();
    18. connection.close();
    19. }

    创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

    1. @Test
    2. public void consume() throws Exception {
    3. //1. 获取连接对象
    4. Connection connection = RabbitMQClient.getConnection();
    5. //2. 创建channel
    6. Channel channel = connection.createChannel();
    7. //3. 声明队列-HelloWorld
    8. //参数1:queue - 指定队列的名称
    9. //参数2:durable - 当前队列是否需要持久化(true)
    10. //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
    11. //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
    12. //参数5:arguments - 指定当前队列的其他信息
    13. channel.queueDeclare("HelloWorld",true,false,false,null);
    14. //4. 开启监听Queue
    15. DefaultConsumer consume = new DefaultConsumer(channel){
    16. @Override
    17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    18. System.out.println("接收到消息:" + new String(body,"UTF-8"));
    19. }
    20. };
    21. //参数1:queue - 指定消费哪个队列
    22. //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
    23. //参数3:consumer - 指定消费回调
    24. channel.basicConsume("HelloWorld",true,consume);
    25. System.out.println("消费者开始监听队列!");
    26. // System.in.read();
    27. System.in.read();
    28. //5. 释放资源
    29. channel.close();
    30. connection.close();
    31. }

    5.4 Work(工作)

    一个生产者,一个默认的交换机,一个队列,两个消费者
    RabbitMQ - 图9
    只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
    消费者指定Qos和手动ack

    1. //1 指定当前消费者,一次消费多少个消息
    2. channel.basicQos(1);
    3. DefaultConsumer consumer = new DefaultConsumer(channel){
    4. @Override
    5. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    6. try {
    7. Thread.sleep(100);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
    12. //2. 手动ack
    13. channel.basicAck(envelope.getDeliveryTag(),false);
    14. }
    15. };
    16. //3. 指定手动ack
    17. channel.basicConsume("Work",false,consumer);

    5.5 Publish/Subscribe(发布/订阅)

    一个生产者,一个交换机,两个队列,两个消费者
    RabbitMQ - 图10
    声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
    让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

    1. //3. 创建exchange - 绑定某一个队列
    2. //参数1: exchange的名称
    3. //参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
    4. channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
    5. channel.queueBind("pubsub-queue1","pubsub-exchange","");
    6. channel.queueBind("pubsub-queue2","pubsub-exchange","");

    消费者还是正常的监听某一个队列即可。

5.6 Routing(路由)

一个生产者,一个交换机,两个队列,两个消费者
RabbitMQ - 图11
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

  1. //3. 创建exchange, routing-queue-error,routing-queue-info,
  2. channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
  3. channel.queueBind("routing-queue-error","routing-exchange","ERROR");
  4. channel.queueBind("routing-queue-info","routing-exchange","INFO");
  5. //4. 发布消息到exchange,同时指定路由的规则
  6. channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
  7. channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
  8. channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
  9. channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

消费者没有变化

5.7 Topic

一个生产者,一个交换机,两个队列,两个消费者
RabbitMQ - 图12
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
route:xxx.xxx.xxx
实例:yellow.zgq.qj
.zgq.
bule.#(zgq.qj)0

  1. //2. 创建exchange并指定绑定方式
  2. channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
  3. channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
  4. channel.queueBind("topic-queue-2","topic-exchange","fast.#");
  5. channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
  6. //3. 发布消息到exchange,同时指定路由的规则
  7. channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
  8. channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
  9. channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());

消费者只是监听队列,没变化。

六、RabbitMQ整合SpringBoot【重点】

6.1 SpringBoot整合RabbitMQ

6.1.1 创建SpringBoot工程

6.1.2 导入依赖
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. </dependency>
  14. </dependencies>

6.1.3 编写配置文件
  1. spring:
  2. rabbitmq:
  3. host: 192.168.199.109
  4. port: 5672
  5. username: test
  6. password: test
  7. virtual-host: /test

6.1.4 声明exchange、queue
  1. @Configuration
  2. public class RabbitMQConfig {
  3. //1. 创建exchange - topic
  4. @Bean
  5. public TopicExchange getTopicExchange(){
  6. return new TopicExchange("boot-topic-exchange",true,false);
  7. }
  8. //2. 创建queue
  9. @Bean
  10. public Queue getQueue(){
  11. return new Queue("boot-queue",true,false,false,null);
  12. }
  13. //3. 绑定在一起
  14. @Bean
  15. public Binding getBinding(TopicExchange topicExchange,Queue queue){
  16. return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
  17. }
  18. }

6.1.5 发布消息到RabbitMQ
  1. import org.junit.jupiter.api.Test;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import javax.annotation.Resource;
  6. @SpringBootTest
  7. public class TestBootRabbitMQ {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. public void pulisher(){
  12. rabbitTemplate.convertAndSend("boot-exchange","yellow.zgq.xj","周高强被洗脚仙人跳!!!");
  13. }
  14. }@Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. @Test
  17. void contextLoads() {
  18. rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
  19. }

6.1.6 创建消费者监听消息
  1. @Component
  2. public class Consumer {
  3. @RabbitListener(queues = "boot-queue")
  4. public void getMessage(Object message){
  5. System.out.println("接收到消息:" + message);
  6. }
  7. }

6.2 手动Ack

6.2.1 添加配置文件
  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. acknowledge-mode: manual

6.2.2 手动ack
  1. @RabbitListener(queues = "boot-queue")
  2. public void getMessage(String msg, Channel channel, Message message) throws IOException {
  3. System.out.println("接收到消息:" + msg);
  4. int i = 1 / 0;
  5. // 手动ack
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  7. }

七、RabbitMQ的其他操作

7.1 消息的可靠性

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

7.1.1 普通Confirm方式
  1. //3.1 开启confirm
  2. channel.confirmSelect();
  3. //3.2 发送消息
  4. String msg = "Hello-World!";
  5. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  6. //3.3 判断消息发送是否成功
  7. if(channel.waitForConfirms()){
  8. System.out.println("消息发送成功");
  9. }else{
  10. System.out.println("发送消息失败");
  11. }

7.1.2 批量Confirm方式。
  1. //3.1 开启confirm
  2. channel.confirmSelect();
  3. //3.2 批量发送消息
  4. for (int i = 0; i < 1000; i++) {
  5. String msg = "Hello-World!" + i;
  6. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  7. }
  8. //3.3 确定批量操作是否成功
  9. channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException

7.1.3 异步Confirm方式。
  1. //3.1 开启confirm
  2. channel.confirmSelect();
  3. //3.2 批量发送消息
  4. for (int i = 0; i < 1000; i++) {
  5. String msg = "Hello-World!" + i;
  6. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  7. }
  8. //3.3 开启异步回调
  9. channel.addConfirmListener(new ConfirmListener() {
  10. @Override
  11. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  12. System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
  13. }
  14. @Override
  15. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  16. System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
  17. }
  18. });

RabbitMQ - 图13

7.1.4 Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
RabbitMQ - 图14
开启Return机制,并在发送消息时,指定mandatory为true

  1. // 开启return机制
  2. channel.addReturnListener(new ReturnListener() {
  3. @Override
  4. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  5. // 当消息没有送达到queue时,才会执行。
  6. System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
  7. }
  8. });
  9. // 在发送消息时,指定mandatory参数为true
  10. channel.basicPublish("","HelloWorld",true,null,msg.getBytes());

7.2 SpringBoot实现

7.2.1 编写配置文件
  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: simple
  4. publisher-returns: true

7.2.2 开启Confirm和Return
  1. @Component
  2. public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PostConstruct // init-method
  6. public void initMethod(){
  7. rabbitTemplate.setConfirmCallback(this);
  8. rabbitTemplate.setReturnCallback(this);
  9. }
  10. @Override
  11. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  12. if(ack){
  13. System.out.println("消息已经送达到Exchange");
  14. }else{
  15. System.out.println("消息没有送达到Exchange");
  16. }
  17. }
  18. @Override
  19. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  20. System.out.println("消息没有送达到Queue");
  21. }
  22. }

7.3 避免消息重复消费 (作业)

重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
RabbitMQ - 图15
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
  2. .deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
  3. .messageId(UUID.randomUUID().toString())
  4. .build();
  5. String msg = "Hello-World!";
  6. channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

消费者,在消费消息时,根据具体业务逻辑去操作redis

  1. DefaultConsumer consume = new DefaultConsumer(channel){
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. Jedis jedis = new Jedis("192.168.199.109",6379);
  5. String messageId = properties.getMessageId();
  6. //1. setnx到Redis中,默认指定value-0
  7. String result = jedis.set(messageId, "0", "NX", "EX", 10);
  8. if(result != null && result.equalsIgnoreCase("OK")) {
  9. System.out.println("接收到消息:" + new String(body, "UTF-8"));
  10. //2. 消费成功,set messageId 1
  11. jedis.set(messageId,"1");
  12. channel.basicAck(envelope.getDeliveryTag(),false);
  13. }else {
  14. //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
  15. String s = jedis.get(messageId);
  16. if("1".equalsIgnoreCase(s)){
  17. channel.basicAck(envelope.getDeliveryTag(),false);
  18. }
  19. }
  20. }
  21. };

7.4 SpringBoot如何实现

7.4.1 导入依赖
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>

7.4.2 编写配置文件
  1. spring:
  2. redis:
  3. host: 192.168.199.109
  4. port: 6379

7.4.3 修改生产者
  1. @Test
  2. void contextLoads() throws IOException {
  3. CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
  4. rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
  5. System.in.read();
  6. }

7.4.4 修改消费者
  1. @Autowired
  2. private StringRedisTemplate redisTemplate;
  3. @RabbitListener(queues = "boot-queue")
  4. public void getMessage(String msg, Channel channel, Message message) throws IOException {
  5. //0. 获取MessageId
  6. String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
  7. //1. 设置key到Redis
  8. if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
  9. //2. 消费消息
  10. System.out.println("接收到消息:" + msg);
  11. //3. 设置key的value为1
  12. redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
  13. //4. 手动ack
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  15. }else {
  16. //5. 获取Redis中的value即可 如果是1,手动ack
  17. if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
  18. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  19. }
  20. }
  21. }

* 完结