springboot 整合 RabbitMQ

背景

多线程读取大文件后需要分时段进行第三方接口的调用【由于外部接口限制API30s才可以调用一次】,所以我们将使用RabbitMQ的延时消费的方式进行外部接口的调用,以保证项目程序的正常运行

RabbitMQ介绍

RabbitMQ介绍
RabbitMQ 快速下载安装

详情看上 ,本篇文章只解决背景问题,整合springboot+RabbitMQ

springboot 整合 RabbitMQ[延时队列]

延时队列

延时队列是存储延时消息的队列,延时消息就是生产者发送了一条消息,但是不希望该消息不要被立即消费,而是设置一个延时时间,等过了这个时间再消费消息

延时队列安装插件

RabbitMQ3.6版本以前一般采用死信队列 + TTL过期时间的方式来实现延迟队列。
从3.6以后的版本,RabbitMQ官方提供了延迟对列的插件官方下载地址rabbitmq_delayed_message_exchange-3.8.0.ez 这个插件放到 RabbitMQ 安装目录的 plugins文件中

RabbitMQ 安装 sbin 文件中用 cmd 执行命令:

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后登陆web页面,可以查看到交换机tab下新增了x-delayed-message

插件原理

springboot 整合 RabbitMQ - 图1

  • 原始的 死信队列+ TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用)
  • 这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了

代码具体实现

pom.xml

  1. <!--springboot 2.3.1.RELEASE-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>

application.properties

  1. # 由于MQ只在本项目中,未跨项目 所有配置文件一份,跨项目则需要分开配置
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. #开启消息的return机制
  7. spring.rabbitmq.publisher-confirms=true
  8. #确认消息已发送到队列
  9. spring.rabbitmq.publisher-returns=true
  10. #开启消息的confirm机制 correlated:开启;NONE:关闭
  11. spring.rabbitmq.publisher-confirms-type=correlated
  12. #在需要使用消息的return机制时候,此参数必须设置为true
  13. spring.rabbitmq.template.mandatory=true
  14. spring.rabbitmq.listener.type=simple
  15. #消费方消息确认:手动确认
  16. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  17. #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
  18. spring.rabbitmq.listener.simple.default-requeue-rejected=false

静态常量抽取

  1. /**
  2. * MQ相关常量
  3. **/
  4. public class MqConstant {
  5. // 延时交换机
  6. public static final String MQ_WEBSITE_FILM_DELAY_EXCHANGE = "website_film_delay_exchange";
  7. // 延时队列名称
  8. public static final String MQ_WEBSITE_FILM_QUEUE = "website_film_delay_queue";
  9. // 普通交换机
  10. public static final String MQ_WEBSITE_NORMAL_EXCHANGE = "website_normal_exchange";
  11. // 普通队列名称
  12. public static final String MQ_WEBSITE_NORMAL_QUEUE = "website_normal_queue";
  13. // 普通交换机路由键
  14. public static final String MQ_WEBSITE_NORMAL_ROUTING_KEY = "website_normal_routing_key";
  15. }

配置类

RabbitConfig

主要是队列,交换机的配置绑定

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * RabbitMQ的配置 主要是队列,交换机的配置绑定
  12. **/
  13. @Configuration
  14. public class RabbitConfig {
  15. //https://blog.csdn.net/weixin_38192427/article/details/120509586
  16. /**
  17. * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
  18. * exclusive 表示该消息队列是否只在当前connection生效,默认是false
  19. * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
  20. */
  21. // 延时队列
  22. @Bean
  23. public Queue delayQueue() {
  24. return new Queue(MqConstant.MQ_WEBSITE_FILM_QUEUE, true,false,false);
  25. }
  26. /**
  27. * 交换机说明:
  28. * durable="true" rabbitmq重启的时候不需要创建新的交换机
  29. * auto-delete 表示交换机没有在使用时将被自动删除 默认是false
  30. * direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
  31. * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
  32. * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
  33. */
  34. // 延时交换机 设置
  35. public FanoutExchange delayExchange() {
  36. //使用自定义交换器
  37. Map<String, Object> args = new HashMap<>();
  38. args.put("x-delayed-type", "direct");
  39. FanoutExchange fanoutExchange = new FanoutExchange(MqConstant.MQ_WEBSITE_FILM_DELAY_EXCHANGE, true, false, args);
  40. fanoutExchange.setDelayed(true);
  41. return fanoutExchange;
  42. }
  43. // 绑定延时队列与延时交换机
  44. @Bean
  45. public Binding delayBind() {
  46. return BindingBuilder.bind(delayQueue()).to(delayExchange());
  47. }
  48. // ------------------------普通队列------------------------
  49. // 普通队列
  50. @Bean
  51. public Queue normalQueue() {
  52. return new Queue(MqConstant.MQ_WEBSITE_NORMAL_QUEUE, true);
  53. }
  54. // 普通交换机
  55. @Bean
  56. public DirectExchange normalExchange() {
  57. return new DirectExchange(MqConstant.MQ_WEBSITE_NORMAL_EXCHANGE, true, false);
  58. }
  59. // 绑定普通消息队列
  60. @Bean
  61. public Binding normalBind() {
  62. return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(MqConstant.MQ_WEBSITE_NORMAL_ROUTING_KEY);
  63. }
  64. // 定义消息转换器
  65. @Bean
  66. public Jackson2JsonMessageConverter jsonMessageConverter() {
  67. return new Jackson2JsonMessageConverter();
  68. }
  69. /** ======================== 定制一些处理策略 =============================*/
  70. /**
  71. * 定制化amqp模版
  72. * <p>
  73. * Rabbit MQ的消息确认有两种。
  74. * <p>
  75. * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
  76. * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
  77. * <p>
  78. * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
  79. */
  80. // 定义消息模板用于发布消息,并且设置其消息转换器
  81. @Bean
  82. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  83. final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  84. // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  85. rabbitTemplate.setMandatory(true);
  86. /**
  87. * 使用该功能需要开启消息确认,yml需要配置 publisher-confirms: true
  88. * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
  89. * correlationData 消息唯一标志
  90. * ack 确认结果
  91. * cause 失败原因
  92. */
  93. rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallback());
  94. /**
  95. * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
  96. * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
  97. * message 消息主体 message
  98. * replyCode 消息主体 message
  99. * replyText 描述
  100. * exchange 消息使用的交换机
  101. * routingKey 消息使用的路由键
  102. */
  103. rabbitTemplate.setReturnCallback(new MsgSendReturnCallback());
  104. rabbitTemplate.setMessageConverter(jsonMessageConverter());
  105. return rabbitTemplate;
  106. }
  107. // --------------------------使用RabbitAdmin启动服务便创建交换机和队列--------------------------
  108. @Bean
  109. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  110. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  111. // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  112. rabbitAdmin.setAutoStartup(true);
  113. // 创建延时交换机和对列
  114. rabbitAdmin.declareExchange(delayExchange());
  115. rabbitAdmin.declareQueue(delayQueue());
  116. // 创建普通交换机和对列
  117. rabbitAdmin.declareExchange(normalExchange());
  118. rabbitAdmin.declareQueue(normalQueue());
  119. return new RabbitAdmin(connectionFactory);
  120. }
  121. }

RabbitConfirmConfig

RabbitMQ的配置 消息发送到 exchange,queue 的回调函数

  1. import org.apache.logging.log4j.LogManager;
  2. import org.apache.logging.log4j.Logger;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * RabbitMQ的配置 消息发送到 exchange,queue 的回调函数
  9. **/
  10. @Configuration
  11. public class RabbitConfirmConfig {
  12. private static Logger logger = LogManager.getLogger(RabbitConfirmConfig.class);
  13. /** ======================== 定制一些处理策略 =============================*/
  14. /**
  15. * 定制化amqp模版
  16. * <p>
  17. * Rabbit MQ的消息确认有两种。
  18. * <p>
  19. * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
  20. * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
  21. * <p>
  22. * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
  23. * Springboot中使用ConfirmCallback和ReturnCallback
  24. * 注意:
  25. * 在需要使用消息的return机制时候,mandatory参数必须设置为true
  26. * 新版本开启消息的confirm配置publisher-confirms已经过时,改为使用publisher-confirm-type参数设置(correlated:开启;NONE:关闭)
  27. */
  28. @Bean
  29. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  30. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  31. rabbitTemplate.setConnectionFactory(connectionFactory);
  32. // 设置开启Mandatory才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  33. rabbitTemplate.setMandatory(true);
  34. /**
  35. * 使用该功能需要开启消息确认 无论成功与否,需要配置 publisher-confirms: true
  36. * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
  37. * correlationData 消息唯一标志
  38. * ack 确认结果
  39. * cause 失败原因
  40. */
  41. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  42. logger.info("ConfirmCallback:" + "相关数据:" + correlationData);
  43. logger.info("ConfirmCallback:" + "确认情况:" + ack);
  44. logger.info("ConfirmCallback:" + "原因:" + cause);
  45. });
  46. /**
  47. * 消息从Exchange路由到Queue失败的回调
  48. * 使用该功能需要开启消息返回确认,需要配置 publisher-returns: true
  49. * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
  50. * message 消息主体 message
  51. * replyCode 消息主体 message
  52. * replyText 描述
  53. * exchange 消息使用的交换机
  54. * routingKey 消息使用的路由键
  55. */
  56. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  57. logger.info("ReturnCallback:" + "消息:" + message);
  58. logger.info("ReturnCallback:" + "回应码:" + replyCode);
  59. logger.info("ReturnCallback:" + "回应信息:" + replyText);
  60. logger.info("ReturnCallback:" + "交换机:" + exchange);
  61. logger.info("ReturnCallback:" + "路由键:" + routingKey);
  62. });
  63. return rabbitTemplate;
  64. }
  65. }

MsgSendReturnCallback

消息从交换机发送到对应队列失败时触发

  1. import org.apache.logging.log4j.LogManager;
  2. import org.apache.logging.log4j.Logger;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
  8. private static Logger logger = LogManager.getLogger(MsgSendReturnCallback.class);
  9. /**
  10. * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
  11. * message 消息主体 message
  12. * replyCode 消息主体 message
  13. * replyText 描述
  14. * exchange 消息使用的交换机
  15. * routingKey 消息使用的路由键
  16. * <p>
  17. * PS:通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
  18. * </p>
  19. */
  20. @Override
  21. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  22. if (exchange.equals(MqConstant.MQ_WEBSITE_FILM_DELAY_EXCHANGE)) {
  23. // 如果配置了发送回调ReturnCallback,rabbitmq_delayed_message_exchange插件会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期/时间到了才会发往队列。
  24. // 所以如果是延迟队列的交换器,则直接放过,并不是bug
  25. return;
  26. }
  27. String correlationId = message.getMessageProperties().getCorrelationId();
  28. logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
  29. }
  30. }

AckListener

Consumer ACK机制

  1. import com.rabbitmq.client.Channel;
  2. import org.apache.logging.log4j.LogManager;
  3. import org.apache.logging.log4j.Logger;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Map;
  8. /**
  9. * Consumer ACK机制:
  10. * 1. 设置手动签收。acknowledge="manual"
  11. * 2. 让监听器类实现ChannelAwareMessageListener接口
  12. * 3. 如果消息成功处理,则调用channel的 basicAck()签收
  13. * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
  14. *
  15. */
  16. @Component
  17. public class AckListener implements ChannelAwareMessageListener {
  18. private static Logger logger = LogManager.getLogger(AckListener.class);
  19. Map<Long,Integer> map;
  20. @Override
  21. public void onMessage(Message message, Channel channel) throws Exception {
  22. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  23. try {
  24. //1.接收转换消息
  25. logger.info("MQ接收转换消息[{}]",new String(message.getBody()));
  26. //2. 处理业务逻辑
  27. logger.info("处理业务逻辑...");
  28. int i = 3/0;//出现错误
  29. //3. 手动签收
  30. channel.basicAck(deliveryTag,false);
  31. } catch (Exception e) {
  32. //e.printStackTrace();
  33. logger.error("MQ拒绝签收逻辑...");
  34. /*
  35. 4.拒绝签收
  36. 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
  37. */
  38. /*if(!map.containsKey(deliveryTag)){
  39. map.put(deliveryTag,1);
  40. channel.basicNack(deliveryTag,true,true);
  41. }else{
  42. //判断是否大于3次
  43. if(>3){
  44. channel.basicNack(deliveryTag,false,true);
  45. //记录
  46. }else{
  47. channel.basicNack(deliveryTag,true,true);
  48. }
  49. }*/
  50. channel.basicNack(deliveryTag,false,false);
  51. //channel.basicReject(deliveryTag,true);
  52. }
  53. }
  54. }

MsgProductionService

发送消息到MQ 业务类 消费生产

  1. import org.springframework.amqp.core.MessageDeliveryMode;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.stereotype.Service;
  4. import javax.annotation.Resource;
  5. /**
  6. * 发送消息到MQ
  7. **/
  8. @Service("msgProductionService")
  9. public class MsgProductionService {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. // 发送延时信息
  13. public void sendTimeoutMsg(String exchange,String content, String routingKey, int delay) {
  14. // 通过广播模式发布延时消息,会广播至每个绑定此交换机的队列,这里的路由键没有实质性作用
  15. rabbitTemplate.convertAndSend(exchange, routingKey, content, message -> {
  16. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  17. // 毫秒为单位,指定此消息的延时时长
  18. message.getMessageProperties().setDelay(delay * 1000);
  19. return message;
  20. });
  21. }
  22. // 发送普通消息
  23. public void sendMsg(String exchange,String routingKey, String content) {
  24. // DirectExchange类型的交换机,必须指定对应的路由键
  25. rabbitTemplate.convertAndSend(exchange, routingKey, content);
  26. }
  27. }

MsgComsumerService

MQ消费 消费者【小编这里是在同一个项目中实现的,生产和消费】

  1. import com.rabbitmq.client.Channel;
  2. import com.zy.website.service.FilmService;
  3. import org.apache.logging.log4j.LogManager;
  4. import org.apache.logging.log4j.Logger;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. import javax.annotation.Resource;
  10. import java.io.IOException;
  11. /**
  12. * MQ 消费者
  13. **/
  14. @Service("msgComsumerService")
  15. public class MsgComsumerService {
  16. private static Logger logger = LogManager.getLogger(MsgComsumerService.class);
  17. @Resource
  18. FilmService filmService;//自己业务具体实现类注入
  19. // 监听消费延时消息
  20. @RabbitListener(queues = {"website_film_delay_queue"})
  21. @RabbitHandler
  22. public void process(String content, Message message, Channel channel) throws IOException {
  23. try {
  24. // 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
  25. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  26. //调用方法消费消息 自己业务具体实现类
  27. filmService.getFilmInfoByExternalApi(content);
  28. logger.info("延迟队列消息[{}]被消费!!",content);
  29. } catch (Exception e) {
  30. logger.error("延迟队列消息 处理失败:{}", e.getMessage());
  31. // 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
  32. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  33. /*// ack返回false,requeue-true并重新回到队列
  34. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);*/
  35. }
  36. }
  37. // 消费普通消息
  38. @RabbitListener(queues = {"website_normal_queue"})
  39. @RabbitHandler
  40. public void process1(String content, Message message, Channel channel) throws IOException {
  41. try {
  42. logger.info("普通队列的内容[{}]", content);
  43. // 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
  44. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  45. logger.info("普通信息处理完毕");
  46. } catch (Exception e) {
  47. logger.error("处理失败:{}", e.getMessage());
  48. // 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
  49. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  50. }
  51. }
  52. }
  1. **代码实现完成,这只是简单实现,复杂还需要研究。。。。。**

RabbitMQ的具体详细配置项

基础配置项

  1. spring.rabbitmq.host: 默认localhost
  2. spring.rabbitmq.port: 默认5672
  3. spring.rabbitmq.username: 用户名
  4. spring.rabbitmq.password: 密码
  5. spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机
  6. spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresseshost
  7. spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙
  8. spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false
  9. spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false
  10. spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时

SSL

  1. spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false
  2. spring.rabbitmq.ssl.key-store: 持有SSL certificatekey store的路径
  3. spring.rabbitmq.ssl.key-store-password: 访问key store的密码
  4. spring.rabbitmq.ssl.trust-store: 持有SSL certificatesTrust store
  5. spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码
  6. spring.rabbitmq.ssl.trust-store-type=JKSTrust store 类型.
  7. spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置
  8. spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证
  9. spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证

缓存cache

  1. spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
  2. spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
  3. spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效
  4. spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel connection

Listener

  1. spring.rabbitmq.listener.type=simple: 容器类型.simpledirect
  2. spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器
  3. spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是nonemanualauto;默认auto
  4. spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
  5. spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
  6. spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量.
  7. spring.rabbitmq.listener.simple.transaction-size: ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch prefetch将增加到这个值
  8. spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
  9. spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器
  10. spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒
  11. spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用
  12. spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数
  13. spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔
  14. spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔
  15. spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数
  16. spring.rabbitmq.listener.simple.retry.stateless=true: 重试时有状态or无状态
  17. spring.rabbitmq.listener.direct.acknowledge-mode= ack模式
  18. spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器
  19. spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量.
  20. spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队.
  21. spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔.
  22. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败.
  23. spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量.
  24. spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制.
  25. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message.
  26. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
  27. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts.
  28. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval.
  29. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful.

Template

  1. spring.rabbitmq.template.mandatory: 启用强制信息;默认false
  2. spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
  3. spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
  4. spring.rabbitmq.template.retry.enabled=false: 发送重试是否可用
  5. spring.rabbitmq.template.retry.max-attempts=3: 最大重试次数
  6. spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次尝试发布或传递消息之间的间隔
  7. spring.rabbitmq.template.retry.multiplier=1: 应用于上一重试间隔的乘数
  8. spring.rabbitmq.template.retry.max-interval=10000: 最大重试时间间隔