Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

springboot集成rabbitmq

1.引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.application.yml配置

  1. spring:
  2. rabbitmq:
  3. addresses: localhost:5672
  4. virtual-host: /rabbitmq
  5. username: admin
  6. password: admin

3.创建RabbitmqConfig.java

  1. package com.itmck.springbootmq.config;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 太阳当空照,花儿对我笑
  7. * <p>
  8. * Create by M ChangKe 2021/7/17 16:00
  9. **/
  10. @Configuration
  11. public class RabbitConfig {
  12. @Bean
  13. public Queue Queue() {
  14. /**
  15. * name:队列名
  16. * durable:是否持久化,默认false.持久化队列会被存储在磁盘上.当消息代理重启仍然存在
  17. * exclusive:默认false,只能被当前的连接使用.当前连接关闭后队列被删除,优先级高于durable
  18. * autoDelete:默认false.没有生产者或者消费者,队列会被删除
  19. * @return
  20. */
  21. // return new Queue("hello");//源码可知当前和下面一样功能
  22. return new Queue("hello", true, false, false);
  23. }
  24. }

4.创建生产者

  1. package com.itmck.springbootmq.compnent;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.AmqpTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * 太阳当空照,花儿对我笑
  8. * <p>
  9. * Create by M ChangKe 2021/7/17 16:01
  10. **/
  11. @Slf4j
  12. @Component
  13. public class HelloSender {
  14. @Autowired
  15. private AmqpTemplate rabbitTemplate;
  16. public void send(String message) {
  17. log.info("发送消息:{}",message);
  18. this.rabbitTemplate.convertAndSend("hello",message);
  19. }
  20. }

5.创建消费端

  1. package com.itmck.springbootmq.compnent;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * 太阳当空照,花儿对我笑
  8. * <p>
  9. * Create by M ChangKe 2021/7/17 16:00
  10. **/
  11. @Slf4j
  12. @Component
  13. @RabbitListener(queues = "hello")
  14. public class HelloReceiver {
  15. @RabbitHandler
  16. public void process(String message) {
  17. log.info("消费端接收消息: {}", message);
  18. }
  19. }

测试如下:

  1. package com.itmck.springbootmq;
  2. import com.itmck.springbootmq.compnent.HelloSender;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. public class RabbitMqHelloTest {
  8. @Autowired
  9. private HelloSender helloSender;
  10. @Test
  11. public void hello() {
  12. helloSender.send("hello rabbitmq");
  13. }
  14. }

直接启动测试类,因为客生产者以及消费者都在一个服务里面,所以打印如下:

image.png


通过api进行创建队列

上面是通过配置进行队列的创建,现在使用api进行队列的创建,这种方式可动态创建多个队列.

代码如下:

  1. @Test
  2. public void create() {
  3. String queueName = "mck_queue";
  4. String dlExchangeName = "mckExchange";
  5. ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
  6. try (
  7. Connection connection = connectionFactory.createConnection();
  8. Channel channel = connection.createChannel(true)
  9. ) {
  10. log.debug("创建交换机");
  11. channel.exchangeDeclare(dlExchangeName, "direct", true);//创建交换机
  12. channel.queueDeclare(queueName, true, false, false, null);//创建队列
  13. channel.queueBind(queueName, dlExchangeName, "DL_KEY", null);//将交换机和队列绑定
  14. channel.close();
  15. log.info("队列创建完成");
  16. } catch (Exception e) {
  17. log.error("队列创建失败", e);
  18. }
  19. }

步骤总结:

  • 通过RabbitTemplate获取连接管道Channel实例
  • 通过管道创建交换机 channel.exchangeDeclare(dlExchangeName, “direct”, true);
  • 声明创建队列 channel.queueDeclare(queueName, true, false, false, null);
  • 将交换机和队列绑定 channel.queueBind(queueName, dlExchangeName, “DL_KEY”, null);

控制台如下:

通过rabbitmq控制台可以看到创建完成,交换机如下:

image.png

队列如下

image.png

通过控制台进行创建队列

rabbitmq提供了控制台进行队列的操作登录控制台 ip:15672 注意:默认控制台端口是15672 不同于配置文件的5672

image.png

手动创建队列

image.png

点击队列名

image.png

创建交换机

image.png

同理队列

image.png

上面就是rabbitmq几种创建队列的方式

几种常用队列的使用

下面图示: P:代表publisher生产者 c:代表consumer消费者 红色长条:代表队列 X:代表交换机

1.简单模式

简单模式就是使用简单的队列进行监听.一个生产者一个消费者,一个队列.如下:

image.png

  1. //通过javaconfig方式进行创建队列
  2. @Configuration
  3. public class RabbitConfig {
  4. @Bean
  5. public Queue queue() {
  6. return new Queue("hello", true, false, false);
  7. }
  8. }
  9. //创建生产者
  10. @Slf4j
  11. @Component
  12. public class HelloSender {
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. public void send(String message) {
  16. log.info("发送消息:{}",message);
  17. this.rabbitTemplate.convertAndSend("hello",message);
  18. }
  19. }
  20. //创建消费者(监听者),进行消费
  21. @Slf4j
  22. @Component
  23. @RabbitListener(queues = "hello")
  24. public class HelloReceiver {
  25. @RabbitHandler
  26. public void process(String message) {
  27. log.info("消费端接收消息: {}", message);
  28. }
  29. }

2.work queues模式

工作队列方式:在工人之间分配任务(竞争消费者模式),能者多劳动 默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。

image.png

代码与上述一样,开启多实例消费端

模拟一个生产者,两个消费者.默认情况下,轮询进行消费
image.png
image.png

idea开启多实例,配置如下.操作是先进行一个实例启动.然后修改端口再次运行.

image.png
image.png

3.Publish/Subscribe

发布/订阅模式:同时将向多个消费者传递一条消息。这种模式被称为“发布/订阅”。或者说,发布一条消息同时被多个消费者进行监听. 扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。

image.png

  1. /**
  2. * 太阳当空照,花儿对我笑
  3. * <p>
  4. * Create by M ChangKe 2021/7/17 16:01
  5. **/
  6. @Slf4j
  7. @Component
  8. public class HelloSender {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. public void send(String message) {
  12. String context = "hi, fanout msg ";
  13. System.out.println("Sender : " + context);
  14. this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
  15. }
  16. }
  17. @Slf4j
  18. @Component
  19. @RabbitListener(queues = "fanout.A")
  20. public class FanOutHelloReceiver1 {
  21. @RabbitHandler
  22. public void process(String message) {
  23. log.info("消费端接收消息: {}", message);
  24. }
  25. }
  26. @Slf4j
  27. @Component
  28. @RabbitListener(queues = "fanout.B")
  29. public class FanOutHelloReceiver2 {
  30. @RabbitHandler
  31. public void process(String message) {
  32. log.info("消费端接收消息: {}", message);
  33. }
  34. }

结果:两个消费者收到同样的消息
image.png

4.Routing路由方式

路由方式这里多了一个路由键,根据不同的路由键,可以将消息分别送到不同的队列,进行消费

Directed Exchange

路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

image.png

  1. @Configuration
  2. public class RabbitConfig {
  3. @Bean
  4. public Queue directQueue() {
  5. return new Queue("direct_queue");
  6. }
  7. @Bean
  8. public DirectExchange directExchange() {
  9. return new DirectExchange("direct_Exchange");
  10. }
  11. @Bean
  12. public Binding bindingDirectExchange() { //直连交换机
  13. return BindingBuilder
  14. .bind(directQueue())
  15. .to(directExchange())
  16. .with("direct_key");
  17. }
  18. }

5.Topics主题模式

通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a..c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

image.png

  1. @Configuration
  2. public class TopicRabbitConfig {
  3. final static String message = "topic.message";
  4. final static String messages = "topic.messages";
  5. @Bean
  6. public Queue queueMessage() {
  7. return new Queue(TopicRabbitConfig.message);
  8. }
  9. @Bean
  10. public Queue queueMessages() {
  11. return new Queue(TopicRabbitConfig.messages);
  12. }
  13. @Bean
  14. TopicExchange exchange() {
  15. return new TopicExchange("exchange");
  16. }
  17. @Bean
  18. Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
  19. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
  20. }
  21. @Bean
  22. Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
  23. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
  24. }
  25. }
  26. public void send1() {
  27. String context = "hi, i am message 1";
  28. System.out.println("Sender : " + context);
  29. this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
  30. }
  31. public void send2() {
  32. String context = "hi, i am messages 2";
  33. System.out.println("Sender : " + context);
  34. this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
  35. }
  36. @Slf4j
  37. @Component
  38. @RabbitListener(queues = "topic.message")
  39. public class FanOutHelloReceiver1 {
  40. @RabbitHandler
  41. public void process(String message) {
  42. log.info("topic.message消费端接收消息: {}", message);
  43. }
  44. }
  45. @Slf4j
  46. @Component
  47. @RabbitListener(queues = "topic.messages")
  48. public class FanOutHelloReceiver2 {
  49. @RabbitHandler
  50. public void process(String message) {
  51. log.info("topic.messages 消费端接收消息: {}", message);
  52. }
  53. }

运行send1此时 topic.message与topic.messages都可以收到消息

image.png

运行send2 此时只有topic.messages可以收到消息

image.png

6.RPC模式

不怎么用,省略

7.Publisher Confirms

发布确认模式:包括,是否发送到交换机的确认,以及消息是否路由到队列确认.消费者的nack/ack机制

先看一下知乎帖子:
image.png

其实这个确认机制如果不让回原队列,可以配置死信队列进行使用.nack消息,进入死信队列. 如果是程序性bug导致nack,那程序修复后,可以使死信队列中的消息重新入队列,二次消费即可

application.yml

  1. spring:
  2. rabbitmq:
  3. addresses: localhost:5672
  4. virtual-host: /rabbitmq
  5. username: admin
  6. password: admin
  7. publisher-confirms: true
  8. publisher-returns: true
  9. template:
  10. mandatory: true

RabbitmqConfig.java

使用javaconfig方式进行队列的创建与绑定

  1. @Configuration
  2. @EnableRabbit
  3. public class RabbitmqConfig {
  4. //创建队列
  5. @Bean
  6. public Queue ttlQueue() {
  7. Map<String, Object> args = new HashMap<>();
  8. args.put("x-dead-letter-exchange", "spring.direct.deadExchange");
  9. args.put("x-dead-letter-routing-key", "spring.deadRouting");
  10. //args.put("x-message-ttl",10000); // 设置消息过期时间无效果
  11. //args.put("x-expires",10000); // 设置队列过期时间无效果
  12. return new Queue("spring.direct.ttl.queue", true, false, false, args);
  13. }
  14. //创建死信队列
  15. @Bean
  16. public Queue deadQueue() {
  17. return new Queue("spring.direct.deadQueue");
  18. }
  19. //创建交换机
  20. @Bean
  21. public DirectExchange directExchange() {
  22. return new DirectExchange("spring.direct.exchange");
  23. }
  24. // 这里直接将死信队列以及正常消费队列都绑定到一个交换机上
  25. @Bean
  26. public Binding queueBinding1() {
  27. return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("spring.routing");
  28. }
  29. @Bean
  30. public Binding queueBinding2() {
  31. return BindingBuilder.bind(deadQueue()).to(directExchange()).with("spring.deadRouting");
  32. }
  33. // 这里直接将死信队列以及正常消费队列都绑定到一个交换机上
  34. @Bean
  35. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  36. return new RabbitAdmin(connectionFactory);
  37. }
  38. /**
  39. *
  40. * 这是生产端,使用RabbitTemplate连接
  41. *
  42. * @param connectionFactory 连接工厂
  43. * @return
  44. */
  45. @Bean
  46. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  47. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  48. template.setMessageConverter(new Jackson2JsonMessageConverter());
  49. return template;
  50. }
  51. /**
  52. * 这是配置在消费端,SimpleRabbitListenerContainerFactory进行连接
  53. * @param connectionFactory 连接工厂
  54. * @return
  55. */
  56. @Bean
  57. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  58. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  59. factory.setConnectionFactory(connectionFactory);
  60. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  61. return factory;
  62. }
  63. }

生产者

  1. @Component
  2. @Slf4j
  3. public class RabbitOrderSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. // @Autowired
  7. // private IBrokerMessageLogService brokerMessageLogService;
  8. @Autowired
  9. private BrokerMessageLogServiceImpl brokerMessageLogService2;
  10. /**
  11. * 发送消息, 构建自定义对象消息
  12. *
  13. * @param order
  14. */
  15. public void sendOrder(TOrder order) {
  16. rabbitTemplate.setConfirmCallback(this);
  17. rabbitTemplate.setReturnCallback(this);
  18. /**
  19. * 自定义对象,消息唯一ID,
  20. * 每个发送的消息都需要配备一个 CorrelationData 相关数据对象,
  21. * CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
  22. * 真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。
  23. */
  24. CorrelationData correlationData = new CorrelationData(order.getMessageId());
  25. // 发送消息
  26. rabbitTemplate.convertAndSend(Constant.ORDER_EXCHANGE, Constant.ORDER_ROUTING, order, correlationData);
  27. log.info("消息已发送,messageId={}", order.getMessageId());
  28. }
  29. /**
  30. * 成功接收后回调, 确认消息被rabbitmq成功接收
  31. */
  32. @Override
  33. public void confirm(CorrelationData correlationData, boolean ack, String s) {
  34. String messageId = correlationData.getId();
  35. if (ack) {
  36. // 如果成功接收了,就修改消息记录表的状态为success。
  37. BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
  38. brokerMessageLog.setStatus(Constant.ORDER_SEND_SUCCESS);
  39. brokerMessageLog.setUpdateTime(LocalDateTime.now());
  40. QueryWrapper<BrokerMessageLog> queryWrapper = new QueryWrapper<BrokerMessageLog>();
  41. queryWrapper.eq("message_id", messageId);
  42. brokerMessageLogService2.update(brokerMessageLog, queryWrapper);
  43. log.info("消息发送成功, messageId={}", messageId);
  44. } else {
  45. //失败则进行具体的后续操作:重试 或者补偿等手段
  46. log.error("消息发送失败, messageId={}", messageId);
  47. }
  48. }
  49. /**
  50. * 失败后回调
  51. */
  52. @Override
  53. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  54. byte[] body = message.getBody();
  55. MessageProperties messageProperties = message.getMessageProperties();
  56. log.error("消息发送失败, body={}", new String(body));
  57. }
  58. }

消费端

  1. package com.guoj.rabbitmq.receive;
  2. import com.guoj.rabbitmq.entity.TOrder;
  3. import com.guoj.rabbitmq.utils.Constant;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.rabbit.annotation.*;
  7. import org.springframework.amqp.support.AmqpHeaders;
  8. import org.springframework.messaging.handler.annotation.Headers;
  9. import org.springframework.messaging.handler.annotation.Payload;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Map;
  12. @Component
  13. @Slf4j
  14. public class OrderReceive {
  15. /**
  16. * 使用这个注解,可以自动在rabbitmq中创建出交换机、队列及routingKey的绑定关系。
  17. * 使用时,可以先启动消费方把这些关系都自动创建出来。
  18. * <p>
  19. * exchange 交换机
  20. * ---------------------
  21. * name:交换机名称
  22. * durable: 是否持久化
  23. * type: 消息模式(direct、topic、fanout、header)
  24. * ignoreDeclarationExceptions: 忽略声明异常
  25. * <p>
  26. * value 队列
  27. * --------------------
  28. * value: 哪个队列
  29. * durable:是否持久化
  30. * <p>
  31. * key 路由key
  32. */
  33. @RabbitHandler
  34. @RabbitListener(bindings = @QueueBinding(
  35. exchange = @Exchange(
  36. name = Constant.ORDER_EXCHANGE,
  37. durable = Constant.DURABLE,
  38. type = Constant.MESSAGE_TYPE,
  39. ignoreDeclarationExceptions = Constant.IGNORE_DECLARATION_EXCEPTIONS
  40. ),
  41. value = @Queue(
  42. value = Constant.ORDER_QUEUE,
  43. durable = Constant.DURABLE
  44. ),
  45. key = Constant.ORDER_CONSUMER_KEY
  46. ))
  47. public void onOrderMessage(@Payload TOrder order, Channel channel,
  48. @Headers Map<String, Object> headers)
  49. throws Exception {
  50. // 消费消息
  51. log.info("-----------------------收到消息, 开始消费-----------------------");
  52. log.info("订单id: {}", order.getId());
  53. // 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  54. Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  55. /**
  56. * 手工ACK,实际中一般使用手工签收,自动签收容易丢失消息。
  57. * 这行如果注释掉,也能收到消息,但是rabbitmq的消息还在,没有没签收。
  58. * 第二个参数是multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
  59. */
  60. channel.basicAck(deliveryTag, false);
  61. /**
  62. * 手工nack
  63. *
  64. * requeue 第三个参数:再次入队列,如果配合死信队列,这里设置为false.直接进入死信队列
  65. *
  66. */
  67. channel.basicNack(deliveryTag,false,false);
  68. }
  69. }

当channel.basicNack(deliveryTag,false,false); 时,消息会进入死信队列

常见问题

可能出现的问题:

消息持久化

在生产环境中,我们需要考虑万一生产者挂了,消费者挂了,或者 rabbitmq 挂了怎么样。一般来说,如果生产者挂了或者消费者挂了,其实是没有影响,因为消息就在队列里面。那么万一 rabbitmq 挂了,之前在队列里面的消息怎么办,其实可以做消息持久化,RabbitMQ 会把信息保存在磁盘上。
做法是可以先从 Connection 对象中拿到一个 Channel 信道对象,然后再可以通过该对象设置 消息持久化。

生产者或者消费者断线重连

这里 Spring 有自动重连机制。

ACK 确认机制

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

https://github.com/401Studio/WeekLearn/issues/2