1.引入依赖

  1. <dependency>
  2. <groupId>com.xy</groupId>
  3. <artifactId>xy-core-mq-rocket-boot-starter-monitor</artifactId>
  4. <version>${xy-core-mq-rocket-boot-starter-version}</version>
  5. </dependency>

2.添加配置项

  1. rocketmq.enable=true
  2. rocketmq.name-server=${server-addr}:9876
  3. rocketmq.access-channel=LOCAL
  4. rocketmq.producer.group=company-demo-producer
  5. #rocketmq.producer.access-key=
  6. #rocketmq.producer.secret-key=
  7. #rocketmq.producer.enable-msg-trace=true
  8. #rocketmq.producer.customized-trace-topic=
  9. #rocketmq.producer.max-message-size=4194394
  10. rocketmq.producer.compress-message-body-threshold=4096
  11. rocketmq.producer.retry-next-server=false
  12. rocketmq.producer.retry-times-when-send-failed=2
  13. rocketmq.producer.retry-times-when-send-async-failed=2
  14. rocketmq.producer.send-message-timeout=3000
  15. #是否开启mq消息监听,如果开启,则正常消费消息,如果关系,服务当前节点则不在消费任何消息
  16. #只在启动时生效,服务运行期间修改无效
  17. rocketmq.consumer.enable=true
  18. #rocketmq.consumer.group=${spring.application.name}-consumer

3.定义消息消费者

消费组命名格式:consumerGroup = “topic-applicationName”

即:消费主题-${spring.application.name}�

3.1对象方式接收

  1. @Slf4j
  2. @Component
  3. @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)
  4. @RocketMQMessageListener(consumerGroup = "company-demo-consumer", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
  5. public class RocketMessageListener implements RocketMQListener<DemoInfoMQ> {
  6. /**
  7. * 请不要捕获异常信息,否则无法进行消息重新推送
  8. *
  9. * @param message
  10. */
  11. @Override
  12. public void onMessage(DemoInfoMQ message) {
  13. String jsonString = JSON.toJSONString(message);
  14. System.out.println(jsonString);
  15. log.info("-------------33333333----DemoInfoMQ:{}-------------------", JSON.toJSONString(message));
  16. }
  17. }

泛型直接使用对象接收。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。

3.2MessageExt对象接收

  1. @Slf4j
  2. @Component
  3. @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)
  4. @RocketMQMessageListener(consumerGroup = "company-demo-consumer11", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
  5. public class RocketMessageExtListener implements RocketMQListener<MessageExt> {
  6. /**
  7. * 请不要捕获异常信息,否则无法进行消息重新推送
  8. *
  9. * @param message
  10. */
  11. @Override
  12. public void onMessage(MessageExt message) {
  13. byte[] body = message.getBody();
  14. String s = new String(body);
  15. DemoInfoMQ demoInfoMQ = JSON.parseObject(s, DemoInfoMQ.class);
  16. String traceId = message.getProperty(CommonConstants.TRACE_ID);
  17. MDC.put(CommonConstants.TRACE_ID, traceId);
  18. log.info("traceId---------22222-----------{}", JSON.toJSONString(demoInfoMQ));
  19. }
  20. }

此方法可以灵活获取请求头信息。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。

4.消息发送

topic命名格式:应用名-事件名

即:${spring.application.name}�-xxx

4.1普通消息

  1. public void sentMessage() {
  2. DemoInfoMQ demoInfoMQ = new DemoInfoMQ();
  3. demoInfoMQ.setInfoId("10010101");
  4. demoInfoMQ.setType("order_cc");
  5. SendResult sendMessage = rocketMQTemplate.syncSend("demoInfoTopic", demoInfoMQ);
  6. }

普通消息直接发送消息对象到指定topic即可。

为保证日志链路的完整性,框架会获取当前线程的日志上下文中的traceId,并设置到header中。若当前没有获取到traceId,会默认自动生成一个uuid的traceId并设置到head中。

4.2设置请求头head的消息

  1. public void send() {
  2. DemoInfoMQ demoInfoMQ = new DemoInfoMQ();
  3. demoInfoMQ.setInfoId("22222");
  4. demoInfoMQ.setType("order_dd");
  5. Message<DemoInfoMQ> mqMessage = MessageBuilder.withPayload(demoInfoMQ)
  6. .setHeader(MessageConst.PROPERTY_KEYS, "20191018")
  7. .setHeader("TAGS", "191018")
  8. .setHeader(MessageConst.PROPERTY_BUYER_ID, "20191018a")
  9. .setHeader(HeadConstants.USER_ID, UserUtils.getUserInfo().getUserId())
  10. .setHeader(HeadConstants.MERCHANT_CODE, UserUtils.getUserInfo().getMerchantCode())
  11. .setHeader("MQ", "user_mq")
  12. .build();
  13. log.info("send----------11111----------{}", JSON.toJSONString(demoInfoMQ));
  14. SendResult sendResult = rocketMQTemplate.syncSend("demoInfoTopic", mqMessage);
  15. }

只是将消息对象用Message做了一层包装,这样可以通过设置请求头信息来做一些业务上需要的判断。同样,框架也自动添加了日志上下文中traceId到header中。

4.3事务消息

  1. @Slf4j
  2. @Component
  3. @RocketMQTransactionListener
  4. public class RocketTransactionListener implements RocketMQLocalTransactionListener {
  5. @Override
  6. public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
  7. initTraceId(message);
  8. log.info("【执行本地事务】start execute local tx start");
  9. try {
  10. //如果有多个topic的事物消息,可以在header中设置指定参数来区别不同的业务逻辑
  11. String type = (String) message.getHeaders().get("type");
  12. //do your biz logic
  13. Object payload = message.getPayload();
  14. //1.强转为发送对象
  15. //1.2检查对象中的id是否已经提交
  16. //1.2.1提交 commit
  17. //1.2.2未提交 rollback
  18. return RocketMQLocalTransactionState.COMMIT;
  19. } catch (Exception e) {
  20. return RocketMQLocalTransactionState.ROLLBACK;
  21. }
  22. }
  23. @Override
  24. public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
  25. initTraceId(message);
  26. log.info("【回查本地执行结果】 start check local tx start");
  27. // 回查本地事务执行结果,保证数据最终落盘
  28. return RocketMQLocalTransactionState.COMMIT;
  29. }
  30. private void initTraceId(Message message) {
  31. String traceId = (String) message.getHeaders().get(CommonConstants.TRACE_ID);
  32. if (StringUtils.isEmpty(traceId)) {
  33. traceId = UUID.fastUUID().toString();
  34. }
  35. MDC.put(CommonConstants.TRACE_ID, traceId);
  36. }
  37. }

事务消息,框架暂时未能实现从请求header自动获取并设置日志上下文中的traceId,需要代码显示添加initTraceId(Message message)方法。