使用rabbit步骤

1.引入相关依赖

  1. <dependency>
  2. <groupId>com.xy</groupId>
  3. <artifactId>xy-core-mq-rabbit-boot-starter</artifactId>
  4. <version>1.0.0</version>
  5. </dependency>

2.配置环境参数

  1. spring:
  2. application:
  3. name: test-rabbit
  4. autoconfigure:
  5. exclude:
  6. - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  7. rabbitmq:
  8. enable: true
  9. hard-code-init: true
  10. #addresses: localhost:5672 链接到集群
  11. host: localhost
  12. port: 5672
  13. virtual-host: /
  14. username: guest
  15. password: guest
  16. #requested-heartbeat: 1s
  17. publisher-confirm-type: correlated
  18. publisher-returns: true
  19. cache:
  20. connection:
  21. mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTION
  22. size: 20 #缓存的连接数,只有是CONNECTION模式时生效
  23. listener:
  24. type: simple
  25. simple:
  26. acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
  27. #auto-startup: true #是否启动时自动启动容器
  28. batch-size: 20
  29. concurrency: 2 #最小的消费者数量
  30. default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
  31. #idle-event-interval: 1000ms #多少长时间发布空闲容器时间,单位毫秒
  32. max-concurrency: 8 #最大的消费者数量
  33. missing-queues-fatal: true #队列不存在是否跑异常
  34. prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
  35. retry:
  36. enabled: true #监听重试是否可用
  37. initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
  38. max-attempts: 3 #最大重试次数
  39. max-interval: 10000ms #最大重试时间间隔
  40. multiplier: 2.0D #应用于上一重试间隔的乘数
  41. stateless: true #重试是有状态or无状态
  42. template:
  43. receive-timeout: 10000ms #receive() 操作的超时时间
  44. mandatory: false # 启用强制信息;默认false
  45. reply-timeout: 10000ms #sendAndReceive() 操作的超时时间
  46. retry:
  47. enabled: true #发送重试是否可用
  48. max-attempts: 3 #最大重试次数
  49. initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
  50. multiplier: 2.0D #应用于上一重试间隔的乘数
  51. max-interval: 10000ms #最大重试时间间隔

3.定义交换机,路由,队列信息

继承ExchangeQueueConfig,添加交换机,路由,队列绑定信息,将ExchangeQueueConfig作为Component添加到容器中

  1. @Component
  2. public class TestExchangeQueueConfig extends ExchangeQueueConfig {
  3. @Override
  4. protected List<ExchangeRoutingKeyQueue> configExchangeQueue() {
  5. List<ExchangeRoutingKeyQueue> result = new ArrayList<>();
  6. ExchangeRoutingKeyQueue exchangeQueue1 = new ExchangeRoutingKeyQueue(ExchangeEnum.FANOUT, "aaa", "bbb", "ccc", true);
  7. ExchangeRoutingKeyQueue exchangeQueue2 = new ExchangeRoutingKeyQueue(ExchangeEnum.TOPIC, "TTT", "MMM", "DDD", false);
  8. result.add(exchangeQueue1);
  9. result.add(exchangeQueue2);
  10. return result;
  11. }
  12. }

4.XyConfirmCallback

如果需要发送确认回调,则继承XyConfirmCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作

  1. @Component
  2. public class XyConfirmCallbackComponent extends XyConfirmCallback {
  3. @Override
  4. protected void call(CorrelationData correlationData, boolean ack, String cause) {
  5. if (!ack) {
  6. System.err.println(">>>1111111111111111111" + correlationData.getId() + "," + cause);
  7. }
  8. }
  9. }

5.XyReturnCallback

如果需要对发送的消息没有进入队列确认回调,则继承XyReturnCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作

  1. @Component
  2. public class XyReturnCallbackComponent extends XyReturnCallback {
  3. @Override
  4. protected void call(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  5. System.err.println(">>>222222222222222222222" + message + "," + replyCode + "," + replyText + "," + exchange + "," + routingKey);
  6. }
  7. }

6.手动确认

消费者方法参数中添加Channel channel,通过Channel做手动确认

  1. @RabbitMqConsumerDot(group = "group2", desc = "desc2", author = "author2", name = "name2")
  2. @RabbitListener(queues = {"fanout.queue"})
  3. public void receiveFanout(TaskQueueModel taskQueueModel, Channel channel, Message message) throws IOException {
  4. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  5. // System.err.println("2:" + taskQueueModel + "2:deliveryTag" + deliveryTag);
  6. long id = Thread.currentThread().getId();
  7. System.err.println("thread id is " + id);
  8. try {
  9. TimeUnit.SECONDS.sleep(2);
  10. channel.basicAck(deliveryTag, false);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. //网络异常
  14. channel.basicNack(deliveryTag, false, true);
  15. }
  16. }

@RabbitMqConsumerDot:提供监控预警跟踪

如果无需手动确认,则方法参数保留接收对象,直接消费即可

7.配置项说明

  1. server:
  2. port: 8081
  3. spring:
  4. application:
  5. name: test-rabbit
  6. autoconfigure:
  7. exclude:
  8. - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  9. rabbitmq:
  10. enable: true
  11. hard-code-init: true
  12. #addresses: localhost:5672 链接到集群
  13. host: localhost
  14. port: 5672
  15. virtual-host: /
  16. username: guest
  17. password: guest
  18. requested-heartbeat: 1s
  19. publisher-confirm-type: correlated
  20. publisher-returns: true
  21. cache:
  22. #channel:
  23. # size: 20 #缓存中保持的channel数量
  24. # checkout-timeout: 1000ms #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
  25. connection:
  26. mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTION
  27. size: 20 #缓存的连接数,只有是CONNECTION模式时生效
  28. # routing-key:
  29. # dynamic: true
  30. # connection-timeout: 50000
  31. listener:
  32. type: simple
  33. direct:
  34. acknowledge-mode: manual
  35. #auto-startup: true
  36. #consumers-per-queue: 1
  37. #default-requeue-rejected: true
  38. #idle-event-interval: 0
  39. #missing-queues-fatal: true
  40. prefetch: 1
  41. #retry:
  42. # enabled: true
  43. # initial-interval: 1000ms
  44. # max-attempts: 3
  45. # max-interval: 10000ms
  46. # multiplier: 1.0
  47. # stateless: true
  48. simple:
  49. acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
  50. auto-startup: true #是否启动时自动启动容器
  51. batch-size: 0
  52. concurrency: 0 #最小的消费者数量
  53. default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
  54. idle-event-interval: #多少长时间发布空闲容器时间,单位毫秒
  55. max-concurrency: 0 #最大的消费者数量
  56. missing-queues-fatal: true #队列不存在是否跑异常
  57. prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
  58. retry:
  59. enabled: true #监听重试是否可用
  60. initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
  61. max-attempts: 3 #最大重试次数
  62. max-interval: 10000ms #最大重试时间间隔
  63. multiplier: 2.0D #应用于上一重试间隔的乘数
  64. stateless: true #重试是有状态or无状态
  65. template:
  66. receive-timeout: 10000ms #receive() 操作的超时时间
  67. mandatory: false # 启用强制信息;默认false
  68. reply-timeout: 10000ms #sendAndReceive() 操作的超时时间
  69. retry:
  70. enabled: true #发送重试是否可用
  71. max-attempts: 3 #最大重试次数
  72. initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
  73. multiplier: 2.0D #应用于上一重试间隔的乘数
  74. max-interval: 10000ms #最大重试时间间隔
  75. exchange: #默认的发送路由
  76. routing-key: #默认路由key
  77. default-receive-queue: #默认队列
  78. ssl:
  79. enabled: true #是否支持ssl
  80. key-store: #指定持有SSL certificate的key store的路径
  81. key-store-type: PKCS12 #加密类型
  82. key-store-password: #指定访问key store的密码
  83. trust-store: # 指定持有SSL certificates的Trust store
  84. trust-store-type: JKS #加密类型
  85. trust-store-password: #指定访问trust store的密码
  86. algorithm: #ssl使用的算法,例如,TLSv1.1

spring-cloud-stream

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/#_configuration_options