示例项目地址:
https://git.code.tencent.com/xinzhang0618/oa2.git
feature/xinzhang_rabbitmq分支

参考文档:
mq概念(这俩篇必读):
https://www.yuque.com/fvy7xd/xinzhang/toi6on
https://www.yuque.com/fvy7xd/xinzhang/if5xfk
延时队列: https://zhuanlan.zhihu.com/p/130417736
基本使用: https://zhuanlan.zhihu.com/p/265732383

思路

rabbitmq的玩法比较丰富, 包括但不限于: 异步, 微服务通信, 多线程处理提高性能, 延时等.
难点在于一是本身的概念配置丰富, 二是不同的业务场景玩法比较花, 本篇着重于梳理mq的最佳简易配置, 并指出各功能点针对的应用场景.
在看本篇之前, 建议熟悉rabbitmq中概念, 并熟悉基本使用.

解析

部署略, 注意:

  1. 需要添加管理界面用户
  2. 启动管理插件
  3. 安装并启用死信交换机插件
  4. 当exchange/queue为durable时, 若有配置变更, 需要删掉原来的使程序重新生成

常用命令

  1. - rabbitmqctl
  2. rabbitmqctl --help 查看帮助
  3. rabbitmqctl status 查看状态
  4. rabbitmqctl shutdown/stop/stop_app 停止
  5. rabbitmqctl start_app 启动
  6. - rabbitmq-plugins
  7. rabbitmq-plugins --help 查看帮助
  8. rabbitmq-plugins list 查看插件列表
  9. rabbitmq-plugins enable <pluginName> 启用插件
  10. rabbitmq-plugins disable <pluginName> 禁用插件
  11. 部署补充:
  12. 1.需启用管理插件, rabbitmq_plugins enable rabbitmq_management
  13. 2.需安装并启用死信交换机插件
  14. - https://www.rabbitmq.com/community-plugins.html 找 rabbitmq_delayed_message_exchange
  15. - 下载rabbitmq_delayed_message_exchange-3.8.0.ez扔到/usr/lib/rabbitmq/plugins/下
  16. - 启用rabbitmq-plusgins enable rabbitmq_delayed_message_exchange

配置

注: 示例项目为单体多模块, 交换机都使用topic类型, 项目结构如下图

依赖

oa-core, oa-consumer, oa-biz三个模块都需配置

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-test</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.alibaba</groupId>
  15. <artifactId>fastjson</artifactId>
  16. </dependency>

基础配置

以下配置都在oa-core模块下, 这部分主要是对mq的通用配置以及template的封装

Message抽象类, 包含优先级, 延迟分钟, 租户等属性, 具体实现类需要指定交换机以及路由key

  1. package top.xinzhang0618.oa.amqp;
  2. import top.xinzhang0618.oa.BizContext;
  3. /**
  4. * MQ消息抽象类 实现类需要提供消息发送的exchange,避免发送到默认exchange 实现类需要提供消息的routingKey,用于消息路由
  5. */
  6. public abstract class Message {
  7. /**
  8. * 优先级
  9. */
  10. private Integer priority;
  11. /**
  12. * 延迟分钟
  13. */
  14. private Integer delayMinutes;
  15. /**
  16. * 租户ID
  17. */
  18. private Long tenantId;
  19. public Message() {
  20. this.tenantId = BizContext.getTenantId();
  21. }
  22. public Long getTenantId() {
  23. return tenantId;
  24. }
  25. public void setTenantId(Long tenantId) {
  26. this.tenantId = tenantId;
  27. }
  28. public Integer getPriority() {
  29. return priority;
  30. }
  31. public void setPriority(Integer priority) {
  32. this.priority = priority;
  33. }
  34. public Integer getDelayMinutes() {
  35. return delayMinutes;
  36. }
  37. public void setDelayMinutes(Integer delayMinutes) {
  38. this.delayMinutes = delayMinutes;
  39. }
  40. /**
  41. * 返回消息对应的路由器
  42. *
  43. * @return ExchangeName
  44. */
  45. public String exchange() {
  46. return "oa";
  47. }
  48. /**
  49. * 返回消息路由键
  50. *
  51. * @return RoutingKey
  52. */
  53. public abstract String routingKey();
  54. @Override
  55. public String toString() {
  56. return "Message{" +
  57. "priority=" + priority +
  58. ", delayMinutes=" + delayMinutes +
  59. ", tenantId=" + tenantId +
  60. '}';
  61. }
  62. }

MqProducer消息发布接口

  1. package top.xinzhang0618.oa.amqp;
  2. import java.util.Collection;
  3. /**
  4. * 消息生产者.
  5. */
  6. public interface MqProducer {
  7. /**
  8. * 发送消息.
  9. */
  10. void send(Message message);
  11. /**
  12. * 批量发送消息.
  13. */
  14. void send(Collection<? extends Message> messages);
  15. }

RabbitMqProducer, rabbitmq消息发布的实现
注意: 这里包装了一层SpringEvent来处理mq事务(是否性能更佳存疑…)
rabbbitmq的事务: https://blog.csdn.net/u013256816/article/details/55515234
applicationEvent事务: https://blog.csdn.net/crowhyc/article/details/96433001
rabbitmq原生事务性能差, 因此不用; ApplicationEvent的消费端注解@TransactionalEventListener默认绑定的监听阶段是 TransactionPhase.AFTER_COMMIT即事务提交后, 如果事务回滚了, 消费端是接收不到消息的.

同时, 也aplicationEvent也解决了”生产端事务未提交, 消费端从数据库中查不到数据”这一问题

  1. package top.xinzhang0618.oa.amqp;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  5. import org.springframework.context.ApplicationEventPublisher;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.event.TransactionalEventListener;
  8. import top.xinzhang0618.oa.Assert;
  9. import javax.annotation.Resource;
  10. import java.util.Collection;
  11. import java.util.List;
  12. /**
  13. * 消息模板包装类.
  14. * 这里利用applicationEvent的事务来取代rabbitmq的事务
  15. */
  16. @Component
  17. @ConditionalOnBean(RabbitTemplate.class)
  18. public class RabbitMqProducer implements MqProducer {
  19. @Autowired
  20. private ApplicationEventPublisher applicationEventPublisher;
  21. /**
  22. * 发送消息.
  23. *
  24. * @param message 消息
  25. */
  26. @Override
  27. public void send(Message message) {
  28. applicationEventPublisher.publishEvent(new MessageSendingEvent(message.routingKey(), message));
  29. }
  30. @Override
  31. public void send(Collection<? extends Message> messages) {
  32. messages.forEach(this::send);
  33. }
  34. @Component
  35. public static class RabbitMqMessageSender {
  36. @Resource
  37. private RabbitTemplate rabbitTemplate;
  38. @TransactionalEventListener(fallbackExecution = true, classes = MessageSendingEvent.class)
  39. public void sendMessage(MessageSendingEvent messageSendingEvent) {
  40. List<Message> messages = messageSendingEvent.getMessages();
  41. if (Assert.isEmpty(messages)) {
  42. return;
  43. }
  44. send(messages);
  45. }
  46. private void send(Message message) {
  47. if (message.getPriority() == null && message.getDelayMinutes() == null) {
  48. rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message);
  49. return;
  50. }
  51. rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message, msg -> {
  52. if (message.getPriority() != null) {
  53. msg.getMessageProperties().setPriority(message.getPriority());
  54. }
  55. if (message.getDelayMinutes() != null) {
  56. msg.getMessageProperties().setDelay(1000 * 60 * message.getDelayMinutes());
  57. }
  58. return msg;
  59. });
  60. }
  61. private void send(List<Message> messages) {
  62. messages.forEach(this::send);
  63. }
  64. }
  65. }

MessageSendingEvent, ApplicationEvent的实现

  1. package top.xinzhang0618.oa.amqp;
  2. import org.springframework.context.ApplicationEvent;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. /**
  6. * 消息事件.
  7. */
  8. public class MessageSendingEvent extends ApplicationEvent {
  9. private final List<Message> messages;
  10. public MessageSendingEvent(Object source, Message... messages) {
  11. super(source);
  12. this.messages = Arrays.asList(messages);
  13. }
  14. public List<Message> getMessages() {
  15. return messages;
  16. }
  17. @Override
  18. public String toString() {
  19. return "MessageSendingEvent{" +
  20. "messages=" + messages +
  21. '}';
  22. }
  23. }

另外
BaseMqConfig, 整个项目的总交换机配置

  1. package top.xinzhang0618.oa.amqp;
  2. import org.springframework.amqp.core.TopicExchange;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @author xinzhang
  9. * @date 2021/2/8 14:05
  10. */
  11. @Configuration
  12. public class BaseMqConfig {
  13. /**
  14. * 总交换机.
  15. */
  16. private static final String EXCHANGE_OA = "oa";
  17. /**
  18. * OA总交换机.
  19. */
  20. @Bean
  21. public TopicExchange oaExchange() {
  22. return new TopicExchange(EXCHANGE_OA);
  23. }
  24. @Bean
  25. public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
  26. final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  27. rabbitTemplate.setMessageConverter(new FastJson2JsonMessageConverter());
  28. return rabbitTemplate;
  29. }
  30. }

FastJson2JsonMessageConverter, 序列化转换器, 在BaseMqConfig的生产端rabbitTemplate的Bean中有配置, 消费端的factory也需要配置, 后文会提到

  1. package top.xinzhang0618.oa.amqp;
  2. import com.alibaba.fastjson.JSON;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageProperties;
  5. import org.springframework.amqp.support.converter.AbstractMessageConverter;
  6. import org.springframework.amqp.support.converter.ClassMapper;
  7. import org.springframework.amqp.support.converter.DefaultClassMapper;
  8. import org.springframework.amqp.support.converter.MessageConversionException;
  9. /**
  10. * @author xinzhang
  11. * @date 2021/2/8 11:13
  12. */
  13. public class FastJson2JsonMessageConverter extends AbstractMessageConverter {
  14. public static final String DEFAULT_CHARSET = "UTF-8";
  15. public static final String DEFAULT_CONTENT_TYPE = "application/json";
  16. private static final ClassMapper CLASS_MAPPER = new FastJson2JsonMessageConverter.FastJsonClassMapper();
  17. public FastJson2JsonMessageConverter() {
  18. }
  19. public ClassMapper getClassMapper() {
  20. return CLASS_MAPPER;
  21. }
  22. @Override
  23. protected Message createMessage(Object object, MessageProperties messageProperties) {
  24. byte[] bytes = JSON.toJSONBytes(object);
  25. messageProperties.setContentType(DEFAULT_CONTENT_TYPE);
  26. messageProperties.setContentEncoding(DEFAULT_CHARSET);
  27. messageProperties.setContentLength(bytes.length);
  28. this.getClassMapper().fromClass(object.getClass(), messageProperties);
  29. return new Message(bytes, messageProperties);
  30. }
  31. @Override
  32. public Object fromMessage(Message message) throws MessageConversionException {
  33. Object content = null;
  34. MessageProperties properties = message.getMessageProperties();
  35. if (properties != null) {
  36. String contentType = properties.getContentType();
  37. if (DEFAULT_CONTENT_TYPE.equals(contentType)) {
  38. Class<?> targetClass = this.getClassMapper().toClass(message.getMessageProperties());
  39. content = JSON.parseObject(message.getBody(), targetClass);
  40. }
  41. }
  42. if (content == null) {
  43. content = message.getBody();
  44. }
  45. return content;
  46. }
  47. public static class FastJsonClassMapper extends DefaultClassMapper {
  48. public FastJsonClassMapper() {
  49. this.setTrustedPackages("*");
  50. }
  51. }
  52. }

消费端配置

以下配置在oa-consumer模块, 这部分主要是对rabbitmq监听容器的配置

配置文件

  1. spring:
  2. rabbitmq:
  3. host: 47.94.148.180
  4. port: 5672
  5. username: bite
  6. password: bite
  7. virtual-host: xinzhang
  8. oa:
  9. consumer:
  10. demo:
  11. exchange:
  12. concurrent: 5
  13. maxConcurrent: 10

启动类添加注解

  1. @EnableAsync
  2. @EnableRabbit
  3. @SpringBootApplication(scanBasePackages = "top.xinzhang0618.oa")
  4. public class ConsumerApplication {
  5. public static void main(String[] args) {
  6. SpringApplication.run(ConsumerApplication.class, args);
  7. }
  8. }

ConsumerConfig, 消费端factory配置
注意:

  1. 核心参数是prefetchCount, concurrent, maxConcurrent, 后两个从配置文件中读取
  2. factory配置了fastJson序列化
  3. (confirm默认关闭, ack默认开启)这里开启了手动ack, 正常场景直接自动ack就行, ack的应用场景下文会特别提到 ```java package top.xinzhang0618.oa.config;

import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import top.xinzhang0618.oa.amqp.FastJson2JsonMessageConverter;

/**

  • @author xinzhang
  • @date 2021/2/8 15:27 */ @Configuration public class ConsumerConfig { @Value(“${oa.consumer.demo.exchange.concurrent:1}”) private Integer concurrent; @Value(“${oa.consumer.demo.exchange.maxConcurrent:1}”) private Integer maxConcurrent;

    @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

    1. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    2. factory.setConnectionFactory(connectionFactory);
    3. factory.setMessageConverter(new FastJson2JsonMessageConverter());
    4. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    5. // 参考: https://www.cnblogs.com/throwable/p/13834465.html
    6. // prefetchCount表示手动ack情况下队列未ack的预取或说阻塞的消息的最大数, 该参数用来使消费端负载均衡, 默认250, 官方推荐30
    7. factory.setPrefetchCount(30);
    8. // 并发消费者数, 即开多线程消费, 比如2个consumer则默认单线程即2个channel, 将concurrent设置为5后, 变为10个channel
    9. factory.setConcurrentConsumers(concurrent);
    10. factory.setMaxConcurrentConsumers(maxConcurrent);
    11. return factory;

    } }

  1. **AbstractConsumer, consumer基类**<br />作用: 1. 做统一异常以及日志处理; 2. 做手动ack
  2. ```java
  3. package top.xinzhang0618.oa.consumer;
  4. import com.alibaba.fastjson.JSON;
  5. import com.rabbitmq.client.Channel;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import top.xinzhang0618.oa.amqp.Message;
  9. import java.io.IOException;
  10. /**
  11. * @author xinzhang
  12. * @date 2021/2/19 13:59
  13. */
  14. public abstract class AbstractConsumer {
  15. private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumer.class);
  16. protected <T extends Message> void run(T message, final Channel channel, long tag,
  17. Runnable runnable) throws IOException {
  18. try {
  19. if (LOGGER.isDebugEnabled()) {
  20. LOGGER.debug("{}处理消息:{}", this.getClass().getName(), message);
  21. }
  22. runnable.run();
  23. channel.basicAck(tag, false);
  24. } catch (Exception e) {
  25. LOGGER.error("消息处理异常!{},{}", this.getClass().getName(), JSON.toJSONString(message));
  26. LOGGER.error("消息处理异常!", e);
  27. channel.basicReject(tag, false);
  28. }
  29. }
  30. }

消费者示例
Test1Consumer

  1. package top.xinzhang0618.oa.consumer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.support.AmqpHeaders;
  7. import org.springframework.messaging.handler.annotation.Header;
  8. import org.springframework.stereotype.Component;
  9. import top.xinzhang0618.oa.amqp.demo.MqConstants;
  10. import top.xinzhang0618.oa.amqp.demo.TestMessage1;
  11. import top.xinzhang0618.oa.amqp.demo.TestMessage3;
  12. import java.io.IOException;
  13. import java.time.LocalDateTime;
  14. /**
  15. * @author xinzhang
  16. * @date 2021/2/8 15:10
  17. */
  18. @Component
  19. @RabbitListener(queues = MqConstants.QUEUE_TEST1)
  20. public class Test1Consumer extends AbstractConsumer {
  21. @RabbitHandler(isDefault = true)
  22. public void test(TestMessage1 testMessage1, Channel channel,
  23. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  24. run(testMessage1, channel, tag, () -> {
  25. System.out.println("test1队列收到消息了, 消息内容为: " + JSON.toJSON(testMessage1) + ", 消息接收时间为: " + LocalDateTime.now());
  26. });
  27. }
  28. @RabbitHandler
  29. public void test3(TestMessage3 testMessage3, Channel channel,
  30. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  31. run(testMessage3, channel, tag, () -> {
  32. System.out.println("test1队列收到消息了, 消息内容为: " + JSON.toJSON(testMessage3) + ", 消息接收时间为: " + LocalDateTime.now());
  33. });
  34. }
  35. }

Test2Consumer

  1. package top.xinzhang0618.oa.consumer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.support.AmqpHeaders;
  7. import org.springframework.messaging.handler.annotation.Header;
  8. import org.springframework.stereotype.Component;
  9. import top.xinzhang0618.oa.amqp.demo.MqConstants;
  10. import top.xinzhang0618.oa.amqp.demo.TestMessage2;
  11. import java.io.IOException;
  12. import java.time.LocalDateTime;
  13. /**
  14. * @author xinzhang
  15. * @date 2021/2/8 15:10
  16. */
  17. @Component
  18. @RabbitListener(queues = MqConstants.QUEUE_TEST2)
  19. public class Test2Consumer extends AbstractConsumer {
  20. @RabbitHandler
  21. public void test(TestMessage2 testMessage2, Channel channel,
  22. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  23. run(testMessage2, channel, tag, () -> {
  24. System.out.println("test2队列收到消息了, 消息内容为: " + JSON.toJSON(testMessage2) + ", 消息接收时间为: " + LocalDateTime.now());
  25. });
  26. }
  27. }

业务端配置

以下配置在oa-biz模块, 这部分跟业务紧密相关, 主要是业务交换机以及队列的配置

示例配置
DemoMqConfig

  1. package top.xinzhang0618.oa.amqp.demo;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import top.xinzhang0618.oa.amqp.BaseMqConfig;
  8. /**
  9. * 交换机配置
  10. *
  11. * @author xinzhang
  12. * @date 2021/2/8 14:35
  13. */
  14. @ConditionalOnBean(BaseMqConfig.class)
  15. @Configuration
  16. public class DemoMqConfig {
  17. /**
  18. * 示例交换机
  19. */
  20. @Bean
  21. public Exchange demoExchange() {
  22. return ExchangeBuilder.topicExchange(MqConstants.EXCHANGE_DEMO).delayed().build();
  23. }
  24. /**
  25. * 示例交换机绑定总交换机, routingKey: oa.demo.#
  26. */
  27. @Bean
  28. public Binding demoExchangeBindingOaExchange(@Qualifier("oaExchange") TopicExchange oaExchange,
  29. @Qualifier("demoExchange") Exchange demoExchange) {
  30. return BindingBuilder.bind(demoExchange).to(oaExchange).with(MqConstants.EXCHANGE_DEMO_BINDING_KEY);
  31. }
  32. /**
  33. * 测试1队列
  34. */
  35. @Bean
  36. public Queue test1Queue() {
  37. return QueueBuilder.durable(MqConstants.QUEUE_TEST1).build();
  38. }
  39. /**
  40. * 测试1队列 绑定 demo交换机, routingKey: oms.demo.test1
  41. */
  42. @Bean
  43. public Binding test1QueueBindingDemoExchange(@Qualifier("demoExchange") Exchange demoExchange,
  44. @Qualifier("test1Queue") Queue test1Queue) {
  45. return BindingBuilder
  46. .bind(test1Queue)
  47. .to(demoExchange)
  48. .with(MqConstants.QUEUE_TEST1_BINDING_KEY)
  49. .noargs();
  50. }
  51. /**
  52. * 测试2队列
  53. */
  54. @Bean
  55. public Queue test2Queue() {
  56. return QueueBuilder.durable(MqConstants.QUEUE_TEST2).build();
  57. }
  58. /**
  59. * 测试2队列 绑定 demo交换机, routingKey: oms.demo.test2
  60. */
  61. @Bean
  62. public Binding test2QueueBindingDemoExchange(@Qualifier("demoExchange") Exchange demoExchange,
  63. @Qualifier("test2Queue") Queue test1Queue) {
  64. return BindingBuilder
  65. .bind(test1Queue)
  66. .to(demoExchange)
  67. .with(MqConstants.QUEUE_TEST2_BINDING_KEY)
  68. .noargs();
  69. }
  70. /**
  71. * 测试3延时队列, 延迟10min
  72. * 这个队列没有消费者, 消息过期后由死信交换机demo转发至oa.demo.test1
  73. */
  74. @Bean
  75. public Queue test3DelayQueue() {
  76. return QueueBuilder
  77. .durable(MqConstants.QUEUE_TEST3_DELAY)
  78. .withArgument("x-dead-letter-exchange", MqConstants.EXCHANGE_DEMO)
  79. .withArgument("x-dead-letter-routing-key", MqConstants.QUEUE_TEST1_BINDING_KEY)
  80. .withArgument("x-message-ttl", 1000 * 60 * 10)
  81. .build();
  82. }
  83. /**
  84. * 测试3延时队列 绑定 demo交换机, routingKey: oms.demo.test3.delay
  85. */
  86. @Bean
  87. public Binding test3DelayQueueBindingDemoExchange(@Qualifier("demoExchange") Exchange demoExchange,
  88. @Qualifier("test3DelayQueue") Queue test3DelayQueue) {
  89. return BindingBuilder
  90. .bind(test3DelayQueue)
  91. .to(demoExchange)
  92. .with(MqConstants.QUEUE_TEST3_DELAY_BINDING_KEY)
  93. .noargs();
  94. }
  95. }

MqConstants

  1. package top.xinzhang0618.oa.amqp.demo;
  2. /**
  3. * @author xinzhang
  4. * @date 2021/2/8 14:14
  5. */
  6. public class MqConstants {
  7. /**
  8. * 示例交换机.
  9. */
  10. public static final String EXCHANGE_DEMO = "oa.demo";
  11. public static final String EXCHANGE_DEMO_BINDING_KEY = "oa.demo.#";
  12. /**
  13. * 测试队列1
  14. */
  15. public static final String QUEUE_TEST1 = "oa.demo.test1";
  16. public static final String QUEUE_TEST1_BINDING_KEY = "oa.demo.test1";
  17. /**
  18. * 测试队列2
  19. */
  20. public static final String QUEUE_TEST2 = "oa.demo.test2";
  21. public static final String QUEUE_TEST2_BINDING_KEY = "oa.demo.test2";
  22. /**
  23. * 测试队列3, 延时队列
  24. */
  25. public static final String QUEUE_TEST3_DELAY = "oa.demo.test3.delay";
  26. public static final String QUEUE_TEST3_DELAY_BINDING_KEY = "oa.demo.test3.delay";
  27. }

TestMessage1

  1. package top.xinzhang0618.oa.amqp.demo;
  2. import top.xinzhang0618.oa.amqp.Message;
  3. /**
  4. * @author xinzhang
  5. * @date 2021/2/4 18:15
  6. */
  7. public class TestMessage1 extends Message {
  8. private String msg;
  9. @Override
  10. public String exchange() {
  11. return super.exchange();
  12. }
  13. @Override
  14. public String routingKey() {
  15. return "oa.demo.test1";
  16. }
  17. public String getMsg() {
  18. return msg;
  19. }
  20. public void setMsg(String msg) {
  21. this.msg = msg;
  22. }
  23. @Override
  24. public String toString() {
  25. return "TestMessage1{" +
  26. "msg='" + msg + '\'' +
  27. '}';
  28. }
  29. }

TestMessage2

  1. package top.xinzhang0618.oa.amqp.demo;
  2. import top.xinzhang0618.oa.amqp.Message;
  3. /**
  4. * @author xinzhang
  5. * @date 2021/2/4 18:15
  6. */
  7. public class TestMessage2 extends Message {
  8. private String msg;
  9. @Override
  10. public String exchange() {
  11. return super.exchange();
  12. }
  13. @Override
  14. public String routingKey() {
  15. return "oa.demo.test2";
  16. }
  17. public String getMsg() {
  18. return msg;
  19. }
  20. public void setMsg(String msg) {
  21. this.msg = msg;
  22. }
  23. @Override
  24. public String toString() {
  25. return "TestMessage2{" +
  26. "msg='" + msg + '\'' +
  27. '}';
  28. }
  29. }

TestMessage3

  1. package top.xinzhang0618.oa.amqp.demo;
  2. import top.xinzhang0618.oa.amqp.Message;
  3. /**
  4. * @author xinzhang
  5. * @date 2021/2/4 18:15
  6. */
  7. public class TestMessage3 extends Message {
  8. private String msg;
  9. @Override
  10. public String exchange() {
  11. return super.exchange();
  12. }
  13. @Override
  14. public String routingKey() {
  15. return "oa.demo.test3.delay";
  16. }
  17. public String getMsg() {
  18. return msg;
  19. }
  20. public void setMsg(String msg) {
  21. this.msg = msg;
  22. }
  23. @Override
  24. public String toString() {
  25. return "TestMessage2{" +
  26. "msg='" + msg + '\'' +
  27. '}';
  28. }
  29. }

测试

至此, 基础的配置都已完成,后续我们再来针对一个个功能点做分析, 先做简单的消息测试,配置完成后结构如下
image.png
启动消费者服务, 发送TestMessage1和TestMessage2, 两个消费者能够正常接收到消息并消费

  1. @Test
  2. public void test() {
  3. TestMessage1 testMessage1 = new TestMessage1();
  4. testMessage1.setMsg("这里是test1测试消息, 消息发送时间为: " + LocalDateTime.now());
  5. TestMessage2 testMessage2 = new TestMessage2();
  6. testMessage2.setMsg("这里是test2测试消息, 消息发送时间为: " + LocalDateTime.now());
  7. producer.send(testMessage1);
  8. producer.send(testMessage2);
  9. }
  10. -----------------------------
  11. 14:35:37.214 DEBUG [SimpleAsyncTaskExecutor-4] t.x.oa.consumer.AbstractConsumer - top.xinzhang0618.oa.consumer.Test1Consumer处理消息:TestMessage1{msg='这里是test1测试消息, 消息发送时间为: 2021-02-19T14:35:37.089'}
  12. test1队列收到消息了, 消息内容为: {"msg":"这里是test1测试消息, 消息发送时间为: 2021-02-19T14:35:37.089"}, 消息接收时间为: 2021-02-19T14:35:37.214
  13. 14:35:37.214 DEBUG [SimpleAsyncTaskExecutor-5] t.x.oa.consumer.AbstractConsumer - top.xinzhang0618.oa.consumer.Test2Consumer处理消息:TestMessage2{msg='这里是test2测试消息, 消息发送时间为: 2021-02-19T14:35:37.089'}
  14. test2队列收到消息了, 消息内容为: {"msg":"这里是test2测试消息, 消息发送时间为: 2021-02-19T14:35:37.089"}, 消息接收时间为: 2021-02-19T14:35:37.214

消息的准确投递问题

oms参考方案如下:
消息全流程: 生产端—>broker—>消费端

  • 使用了applicationEvent取代mq事务来保证消息从生产端—>broker的准确投递(若事务回滚则mq不会发送消息)
  • 消费端若消费失败
    • 利用手动ack对特定异常, 进行消息回队, 等待再次消费
    • 利用定时器进行业务补偿, 比如因为缺货或并发导致配货失败, 会有”缺货重配”, “定时扫表再次配货”等定时任务

发布者确认以及消费者应答

参考文档: https://www.rabbitmq.com/confirms.html
https://www.zhihu.com/question/41976893

发布者确认一般用于事务控制, 默认关闭, 这里没用到
消费者应答默认开启, 这里采用手动ack的方式
ack机制能灵活的控制消息的消费, 拒绝后可以选择性的删除或者重新入队, 比如oms中使用consumer ack来实现
“如果消费端爆出版本异常(乐观锁), 则消息重新入队(后续再被消费), 其余情况则直接丢弃” 这一功能

使用ack示例代码如下:

  1. 先打开手动ack

    1. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  2. 在AbstractConsumer中针对特殊异常做处理 ```java package com.greatonce.oms.consumer;

import com.greatonce.core.util.JsonUtil; import com.greatonce.oms.domain.OmsException; import com.greatonce.oms.domain.SysExceptions; import com.greatonce.oms.message.Message; import com.greatonce.oms.util.consumer.MessageListenerContainerManager; import com.rabbitmq.client.Channel; import java.io.IOException; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;

/**

  • 消息处理抽象类. *
  • @author ginta
  • @author Shenzhen Greatonce Co Ltd
  • @version 2018/3/23 */ public abstract class AbstractConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumer.class);

    @Autowired private MessageListenerContainerManager messageListenerContainerManager;

    @PostConstruct protected void init() { messageListenerContainerManager.registerContainer(containerId(), containerName()); }

    protected abstract String containerId();

    protected abstract String containerName();

    protected void run(T message, final Channel channel, long tag, Runnable runnable) throws IOException { try { if (LOGGER.isDebugEnabled()) {

    1. LOGGER.debug("{}处理消息:{}", this.getClass().getName(), message);

    } runnable.run(); channel.basicAck(tag, false); } catch (OmsException e) { LOGGER.error(“消息处理异常!{},{},{}”, this.getClass().getName(), JsonUtil.toJson(message), e.getMessage()); LOGGER.error(“消息处理异常!”, e); if (SysExceptions.VERSION_CHANGED.equals(e.getMessage())

    1. || SysExceptions.MALL_SECURITY_API_ERROR.equals(e.getMessage())) {
    2. channel.basicReject(tag, true);

    } else {

    1. channel.basicReject(tag, false);

    } } catch (Exception e) { LOGGER.error(“消息处理异常!{},{}”, this.getClass().getName(), JsonUtil.toJson(message)); LOGGER.error(“消息处理异常!”, e); channel.basicReject(tag, false); } } }

  1. > basicRejectbasicNack方法区别:
  2. > basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack
  3. > basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息.
  4. >
  5. > DeliveryTag作用:
  6. > 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等
  7. <a name="qvOni"></a>
  8. ### 消息的优先级
  9. 只有在消费端消息堆积的时候才有效果, 不然也没卵用, 比如oms中转化队列配置了"下载的销售单""从第三方系统同步的销售单"更优先进行转化
  10. 使用示例:
  11. 1. 先设置队列的优先级, 表示能容纳的消息的最大优先级, 数字越大越优先
  12. ```java
  13. @Bean
  14. public Queue salesTranslateQueue() {
  15. return QueueBuilder
  16. .durable(QUEUE_TRADE_SALES_ORDER_TRANSLATE)
  17. .withArgument("x-max-priority", 2)
  18. .build();
  19. }
  1. 然后设置消息的优先级

    1. /**
    2. * 销售单从第三方系统同步消息.
    3. *
    4. * @author ginta
    5. * @author Shenzhen Greatonce Co Ltd
    6. * @version 2018/3/7
    7. */
    8. public class MallSalesOrderSynchronizedMessage extends MallSalesOrderMessage {
    9. public MallSalesOrderSynchronizedMessage(Long mallSalesOrderId, Long storeId, String tradeId) {
    10. super(mallSalesOrderId, storeId, tradeId,"synchronized");
    11. this.setPriority(1);
    12. }
    13. }
    1. /**
    2. * 销售单已下载消息.
    3. *
    4. * @author ginta
    5. * @author Shenzhen Greatonce Co Ltd
    6. * @version 2018/3/7
    7. */
    8. public class MallSalesOrderDownloadMessage extends MallSalesOrderMessage {
    9. public MallSalesOrderDownloadMessage(Long mallSalesOrderId, Long storeId, String tradeId) {
    10. super(mallSalesOrderId, storeId, tradeId,"download");
    11. this.setPriority(2);
    12. }
    13. }
  2. 在配置上, 上面的RabbitMqProducer示例代码中已经将priority属性设置进MessageProperties了

    1. rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message, msg -> {
    2. if (message.getPriority() != null) {
    3. msg.getMessageProperties().setPriority(message.getPriority());
    4. }
    5. if (message.getDelayMinutes() != null) {
    6. msg.getMessageProperties().setDelay(1000 * 60 * message.getDelayMinutes());
    7. }
    8. return msg;
    9. });

延迟消息

利用队列的ttl以及死信队列实现

前言
这里也可以利用消息的ttl实现, 需要设置消息的expiration属性
补充: 当队列和消息同时有过期时间时, 更短的一个会生效

但有个严重问题是, 消息会在队列进行排队顺序消费, 若前一消息的ttl大于后一消息, 则后一消息到期后依然要等到前面消息消费完
比如按顺序发送”2min过期的消息”, “1min过期的消息”, 消费端会在2min后一起消费这两条消息,
因此在示例项目的配置中(message的封装上, rabbitmqProducer的封装上), 我完全摈弃了消息的ttl这一特性

  1. if (this.delayMinutes != null) {
  2. int delayTimemillis = 1000 * 60 * this.delayMinutes;
  3. if (delayType == DelayType.EXCHANGE) {
  4. message.getMessageProperties().setDelay(delayTimemillis);
  5. } else {
  6. message.getMessageProperties().setExpiration(String.valueOf(delayTimemillis));
  7. }
  8. }

使用流程
这一方案, 可以参考上面test3DelayQueue的配置, 这个队列10分钟过期, 没有消费者
消息过期后, 会由”x-dead-letter-exchange”通过”x-dead-letter-routing-key”将消息转发出去, 队列配置如下
因此使用示例:
发送消息到test3DelayQueue队列, 过10分钟后消息过期转发到路由oa.demo.test1, 然后Test1Consumer消费, 实现了消息延迟10分钟消费

  1. /**
  2. * 测试3延时队列, 延迟10min
  3. * 这个队列没有消费者, 消息过期后由死信交换机demo转发至oa.demo.test1
  4. */
  5. @Bean
  6. public Queue test3DelayQueue() {
  7. return QueueBuilder
  8. .durable(MqConstants.QUEUE_TEST3_DELAY)
  9. .withArgument("x-dead-letter-exchange", MqConstants.EXCHANGE_DEMO)
  10. .withArgument("x-dead-letter-routing-key", MqConstants.QUEUE_TEST1_BINDING_KEY)
  11. .withArgument("x-message-ttl", 1000 * 60 * 10)
  12. .build();
  13. }
  14. /**
  15. * 测试3延时队列 绑定 demo交换机, routingKey: oms.demo.test3.delay
  16. */
  17. @Bean
  18. public Binding test3DelayQueueBindingDemoExchange(@Qualifier("demoExchange") Exchange demoExchange,
  19. @Qualifier("test3DelayQueue") Queue test3DelayQueue) {
  20. return BindingBuilder
  21. .bind(test3DelayQueue)
  22. .to(demoExchange)
  23. .with(MqConstants.QUEUE_TEST3_DELAY_BINDING_KEY)
  24. .noargs();
  25. }

从管理界面上也能看到各个标签, 鼠标挪到标签上会有信息显示
image.png
image.png

利用死信交换机插件实现

这一实现更加简单, 安装并启用插件后, 声明交换机的delay属性

一个消息变成死信的条件有: 1.消费被拒绝(basic.reject 或者 basic.nack),并且参数 requeue = false 时 2.消息TTL(存活时间)过期 3.队列达到最大长度 rabbitMQ对于死信消息的处理是:如果配置了死信队列,成为死信的消息会被丢进死信队列,如果没有则被丢弃。

  1. /**
  2. * 示例交换机
  3. */
  4. @Bean
  5. public Exchange demoExchange() {
  6. return ExchangeBuilder.topicExchange(MqConstants.EXCHANGE_DEMO).delayed().build();
  7. }

管理界面也能看到交换机类型变为”x-delayed-message”
image.png
给消息设置一定的延迟时间, 这里也要注意是消息的”x-delay”属性
交换机会等待delay的时间, 然后再将消息转发到对应的队列中

  1. rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message, msg -> {
  2. if (message.getPriority() != null) {
  3. msg.getMessageProperties().setPriority(message.getPriority());
  4. }
  5. if (message.getDelayMinutes() != null) {
  6. msg.getMessageProperties().setDelay(1000 * 60 * message.getDelayMinutes());
  7. }
  8. return msg;
  9. });

测试如下

  1. @Test
  2. public void testSimpleConsume() {
  3. TestMessage1 testMessage1 = new TestMessage1();
  4. testMessage1.setDelayMinutes(1);
  5. testMessage1.setMsg("这里是test1测试消息, 消息发送时间为: " + LocalDateTime.now());
  6. producer.send(testMessage1);
  7. }
  8. -------------------
  9. 16:02:33.276 DEBUG [SimpleAsyncTaskExecutor-5] t.x.oa.consumer.AbstractConsumer - top.xinzhang0618.oa.consumer.Test1Consumer处理消息:TestMessage1{msg='这里是test1测试消息, 消息发送时间为: 2021-02-19T16:01:33.152'}
  10. test1队列收到消息了, 消息内容为: {"msg":"这里是test1测试消息, 消息发送时间为: 2021-02-19T16:01:33.152","delayMinutes":1}, 消息接收时间为: 2021-02-19T16:02:33.276

灵活控制消费者开启关闭

自定义注解

通过自定义注解能开启或关闭单个消费者, 由配置文件控制, 更改需要重启应用

  1. import java.lang.annotation.Documented;
  2. import java.lang.annotation.ElementType;
  3. import java.lang.annotation.Retention;
  4. import java.lang.annotation.RetentionPolicy;
  5. import java.lang.annotation.Target;
  6. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  7. @Target({ElementType.TYPE, ElementType.METHOD})
  8. @Retention(RetentionPolicy.RUNTIME)
  9. @Documented
  10. @ConditionalOnProperty(name = "oms.consumer.dispatch.enabled", havingValue = "true")
  11. public @interface DispatchOrderCondition {
  12. }

监听容器开关

根据业务区分不同的监听容器, 个业务组下的交换机注册到各监听容器中, 通过http请求方式直接关闭开启监听容器