git地址: https://github.com/leezhang0525/boot
branch:1.1.3-rocketMQ
本文档持续更新,目标为多组件系统,常用组件集成完毕后,后续会添加分布式内容。

一、说明及准备

本文测试需要依赖rocketMQ的下载以及可视化页面,下载安装流程参考https://www.cnblogs.com/sexintercourse/p/15519752.html
前置准备
1、启动rocketMQ服务
依次启动 mqnamesrv.cmd 以及 mqbroker.cmd ,cmd窗口不要关闭
2、启动rocketmq-externals
1650696034(1).png
这里可以直接maven打包,运行jar 是一样的。
访问:http://127.0.0.1:8082/#/
1650696160(1).png

二、添加依赖及修改yaml文件

  <!-- pom.xml:在 <dependencies> 中添加 -->
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
  </dependency>
  <!-- pom.xml:在 <properties> 中声明版本号 -->
  <rocketmq.version>2.0.4</rocketmq.version>
  # application.yml
  server:
    port: 8081
  spring:
    datasource:
      master:
        url: jdbc:mysql://localhost:3306/pay?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
      slave:
        url: jdbc:mysql://localhost:3306/payslave?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
    redis:
      host: localhost
      port: 6379
      database: 0
      password: 123456
      timeout: 3S
    rocket:
      #生产者
      producer:
        namesrvAddr: localhost:9876
        producerGroupName: zhangsan_producer
        # 消息最大长度(字节),RocketMQ 默认 1024 * 1024 * 4 (4M);此处 4096 即 4K
        maxMessageSize: 4096
        # 发送消息超时时间(毫秒),默认 3000
        sendMsgTimeOut: 3000
        retryTimesWhenSendFailed: 2
      #消费者
      consumer:
        namesrvAddr: localhost:9876
        consumerGroupName: zhangsan_consumer
        topicName: pay_success
        tag: tag1
        # 消费者线程数量
        consumeThreadMin: 5
        consumeThreadMax: 32
        # 设置一次消费消息的条数,默认1
        consumeMessageBatchMaxSize: 1
  logging:
    level:
      root: debug

三、添加生产者,消费者

  1. package com.zhangsan.boot.mq.rocketmq;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  6. import org.springframework.boot.context.properties.ConfigurationProperties;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Getter
  10. @Setter
  11. @Slf4j
  12. @Configuration
  13. @ConfigurationProperties(prefix = "spring.rocket.producer")
  14. public class Producer {
  15. private String producerGroupName;
  16. private String namesrvAddr;
  17. // 消息最大值
  18. private Integer maxMessageSize;
  19. // 消息发送超时时间
  20. private Integer sendMsgTimeOut;
  21. // 失败重试次数
  22. private Integer retryTimesWhenSendFailed;
  23. /**
  24. * 生产者
  25. * @return
  26. */
  27. @Bean
  28. public DefaultMQProducer defaultProducer() {
  29. DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
  30. producer.setNamesrvAddr(namesrvAddr);
  31. producer.setVipChannelEnabled(false);
  32. producer.setMaxMessageSize(maxMessageSize);
  33. producer.setSendMsgTimeout(sendMsgTimeOut);
  34. producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
  35. return producer;
  36. }
  37. }

消费者

  1. package com.zhangsan.boot.mq.rocketmq;
  2. import com.zhangsan.boot.listener.OrderlyListener;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  9. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.boot.context.properties.ConfigurationProperties;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. @Getter
  15. @Setter
  16. @Slf4j
  17. @Configuration
  18. @ConfigurationProperties(prefix = "spring.rocket.consumer")
  19. public class Consumer {
  20. private String namesrvAddr;
  21. private String consumerGroupName;
  22. private String topicName;
  23. private String tag;
  24. // 消费者线程数据量
  25. private Integer consumeThreadMin;
  26. private Integer consumeThreadMax;
  27. private Integer consumeMessageBatchMaxSize;
  28. @Autowired
  29. private OrderlyListener orderlyListener;
  30. @Bean(initMethod = "start", destroyMethod = "shutdown")
  31. public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
  32. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
  33. consumer.setNamesrvAddr(namesrvAddr);
  34. consumer.setConsumeThreadMin(consumeThreadMin);
  35. consumer.setConsumeThreadMax(consumeThreadMax);
  36. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
  37. // 设置消息监听
  38. consumer.registerMessageListener(orderlyListener);
  39. // 设置消费起始位置位置
  40. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  41. //设置消费模型,集群还是广播
  42. consumer.setMessageModel(MessageModel.CLUSTERING);
  43. consumer.subscribe(topicName, tag);
  44. return consumer;
  45. }
  46. }

四、生产及消费分区顺序消息

分区顺序消息监听器

  package com.zhangsan.boot.listener;

  import lombok.extern.slf4j.Slf4j;
  import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.springframework.stereotype.Component;
  import org.springframework.util.CollectionUtils;

  import java.nio.charset.StandardCharsets;
  import java.util.List;

  /**
   * Listener for partition-ordered consumption: logs every message of the delivered batch.
   */
  @Component
  @Slf4j
  public class OrderlyListener implements MessageListenerOrderly {

    /**
     * Handles one delivered batch.
     *
     * @param list messages of the batch; may be empty
     * @param consumeOrderlyContext consumption context (not used)
     * @return {@code SUCCESS} when every message was handled, otherwise
     *     {@code SUSPEND_CURRENT_QUEUE_A_MOMENT}
     */
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
      if (CollectionUtils.isEmpty(list)) {
        log.info("MQ接收消息为空,直接返回成功");
        return ConsumeOrderlyStatus.SUCCESS;
      }
      // Walk the whole batch: with a batch size above 1, looking only at the first element
      // would acknowledge the remaining messages without ever processing them.
      for (MessageExt messageExt : list) {
        try {
          String topic = messageExt.getTopic();
          String tags = messageExt.getTags();
          int queueId = messageExt.getQueueId();
          String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
          log.info("MQ消息topic={},queueId={}, tags={}, 消息内容={}", topic,queueId,tags,body);
          // Business logic for the message goes here.
        } catch (Exception e) {
          // The exception is passed as the trailing argument so the stack trace is logged;
          // a "{}" placeholder for it would be printed literally.
          log.error("获取MQ消息内容异常", e);
          return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
      }
      return ConsumeOrderlyStatus.SUCCESS;
    }
  }

五、分区顺序消息生产

这里通过SelectMessageQueueByHash类实现orderId hash到同一queue中,可以看SelectMessageQueueByHash具体实现。

  package com.zhangsan.boot.impl;

  import lombok.extern.slf4j.Slf4j;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.remoting.common.RemotingHelper;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.stereotype.Service;

  import java.util.UUID;

  /**
   * Demo service that publishes partition-ordered messages: every step of one order is routed
   * through {@link SelectMessageQueueByHash} using the order id as the selector argument.
   */
  @Service("mQService")
  @Slf4j
  public class MQServiceImpl {

    /** Number of demo orders generated per call. */
    private static final int ORDER_COUNT = 50;

    /** Number of steps (for example: create, pay, confirm receipt, review) sent per order. */
    private static final int STEPS_PER_ORDER = 4;

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @Value("${spring.rocket.consumer.topicName}")
    private String topic;

    @Value("${spring.rocket.consumer.tag}")
    private String tag;

    /**
     * Sends {@code STEPS_PER_ORDER} messages for each of {@code ORDER_COUNT} random order ids.
     *
     * @return always {@code "success"}; the value is a fixed acknowledgement and does not
     *     reflect whether individual sends succeeded (failures are logged)
     */
    public String sendMQMsg() {
      // One selector instance is reused for all sends instead of allocating one per message.
      final SelectMessageQueueByHash selector = new SelectMessageQueueByHash();
      for (int i = 0; i < ORDER_COUNT; i++) {
        // A UUID stands in for a real order number.
        String orderId = UUID.randomUUID().toString();
        for (int step = 1; step <= STEPS_PER_ORDER; step++) {
          try {
            Message msg = new Message(topic, tag, "KEY" + orderId,
                ("订单Id:" + orderId + " 步骤:" + step).getBytes(RemotingHelper.DEFAULT_CHARSET));
            defaultMQProducer.send(msg, selector, orderId);
          } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop sending altogether.
            Thread.currentThread().interrupt();
            log.error("消息发送失败,orderId:{}", orderId, e);
            return "success";
          } catch (Exception e) {
            log.error("消息发送失败,orderId:{}", orderId, e);
            // Later steps must not be published after a failed one, otherwise the consumer
            // would see this order's steps with a gap.
            break;
          }
        }
      }
      return "success";
    }
  }

六、编写测试方法及验证

  1. @Autowired
  2. private MQServiceImpl mqService;
  3. @GetMapping("/mqOrderLyTest")
  4. public String mqOrderLyTest(){
  5. return mqService.sendMQMsg();
  6. }

1650703291(1).png
可以看到同一订单的不同步骤都在同一个queue中,并且同一订单id的消息都是按步骤有序消费的,这样就实现了分区顺序消息功能
1650703355(1).png
1650703578(1).png

七、生产及消费随机消息

这里通过不同的groupName,topic,tag来消费,所以需要改造原来文件

  # application.yml
  server:
    port: 8081
  spring:
    datasource:
      master:
        url: jdbc:mysql://localhost:3306/pay?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
      slave:
        url: jdbc:mysql://localhost:3306/payslave?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
    redis:
      host: localhost
      port: 6379
      database: 0
      password: 123456
      timeout: 3S
    rocket:
      #生产者
      producer:
        namesrvAddr: localhost:9876
        producerGroupName: zhangsan_producer
        # 消息最大长度(字节),RocketMQ 默认 1024 * 1024 * 4 (4M);此处 4096 即 4K
        maxMessageSize: 4096
        # 发送消息超时时间(毫秒),默认 3000
        sendMsgTimeOut: 3000
        retryTimesWhenSendFailed: 2
      #消费者
      consumer:
        base:
          namesrvAddr: localhost:9876
          # 消费者线程数量
          consumeThreadMin: 5
          consumeThreadMax: 32
          # 设置一次消费消息的条数,默认1
          consumeMessageBatchMaxSize: 1
        orderly:
          consumerGroupName: zhangsan_consumer
          topicName: pay_success
          tag: tag1
        concurrently:
          consumerGroupName: zhangsan_consumer1
          topicName: pay_success1
          tag: tag2
  logging:
    level:
      root: debug
  1. @Getter
  2. @Setter
  3. @Slf4j
  4. @Configuration
  5. @ConfigurationProperties(prefix = "spring.rocket.consumer.base")
  6. public class BaseConsumerConfig {
  7. private String namesrvAddr;
  8. // 消费者线程数据量
  9. private Integer consumeThreadMin;
  10. private Integer consumeThreadMax;
  11. private Integer consumeMessageBatchMaxSize;
  12. }
  1. package com.zhangsan.boot.mq.rocketmq;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.boot.context.properties.ConfigurationProperties;
  6. import org.springframework.context.annotation.Configuration;
  7. @Getter
  8. @Setter
  9. @Slf4j
  10. @Configuration
  11. @ConfigurationProperties(prefix = "spring.rocket.consumer.orderly")
  12. public class OrderlyConsumerConfig {
  13. private String consumerGroupName;
  14. private String topicName;
  15. private String tag;
  16. }
  1. package com.zhangsan.boot.mq.rocketmq;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.boot.context.properties.ConfigurationProperties;
  6. import org.springframework.context.annotation.Configuration;
  7. @Getter
  8. @Setter
  9. @Slf4j
  10. @Configuration
  11. @ConfigurationProperties(prefix = "spring.rocket.consumer.concurrently")
  12. public class ConcurrentlyConsumerConfig {
  13. private String consumerGroupName;
  14. private String topicName;
  15. private String tag;
  16. }
  1. package com.zhangsan.boot.mq.rocketmq;
  2. import com.zhangsan.boot.listener.ConcurrentlyListener;
  3. import com.zhangsan.boot.listener.OrderlyListener;
  4. import lombok.Getter;
  5. import lombok.Setter;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  8. import org.apache.rocketmq.client.exception.MQClientException;
  9. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  10. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. @Getter
  15. @Setter
  16. @Slf4j
  17. @Configuration
  18. public class Consumer {
  19. @Autowired
  20. private BaseConsumerConfig baseConsumerConfig;
  21. @Autowired
  22. private OrderlyConsumerConfig orderlyConsumerConfig;
  23. @Autowired
  24. private ConcurrentlyConsumerConfig concurrentlyConsumerConfig;
  25. @Autowired
  26. private OrderlyListener orderlyListener;
  27. @Autowired
  28. private ConcurrentlyListener concurrentlyListener;
  29. @Bean(initMethod = "start", destroyMethod = "shutdown")
  30. public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
  31. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderlyConsumerConfig.getConsumerGroupName());
  32. consumer.setNamesrvAddr(baseConsumerConfig.getNamesrvAddr());
  33. consumer.setConsumeThreadMin(baseConsumerConfig.getConsumeThreadMin());
  34. consumer.setConsumeThreadMax(baseConsumerConfig.getConsumeThreadMax());
  35. consumer.setConsumeMessageBatchMaxSize(baseConsumerConfig.getConsumeMessageBatchMaxSize());
  36. // 设置消息监听
  37. consumer.registerMessageListener(orderlyListener);
  38. // 设置消费起始位置位置
  39. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  40. //设置消费模型,集群还是广播
  41. consumer.setMessageModel(MessageModel.CLUSTERING);
  42. consumer.subscribe(orderlyConsumerConfig.getTopicName(), orderlyConsumerConfig.getTag());
  43. return consumer;
  44. }
  45. @Bean(initMethod = "start", destroyMethod = "shutdown")
  46. public DefaultMQPushConsumer concurrentConsumer() throws MQClientException {
  47. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(concurrentlyConsumerConfig.getConsumerGroupName());
  48. consumer.setNamesrvAddr(baseConsumerConfig.getNamesrvAddr());
  49. consumer.setConsumeThreadMin(baseConsumerConfig.getConsumeThreadMin());
  50. consumer.setConsumeThreadMax(baseConsumerConfig.getConsumeThreadMax());
  51. consumer.setConsumeMessageBatchMaxSize(baseConsumerConfig.getConsumeMessageBatchMaxSize());
  52. // 设置消息监听
  53. consumer.registerMessageListener(concurrentlyListener);
  54. // 设置消费起始位置位置
  55. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  56. //设置消费模型,集群还是广播
  57. consumer.setMessageModel(MessageModel.CLUSTERING);
  58. consumer.subscribe(concurrentlyConsumerConfig.getTopicName(), concurrentlyConsumerConfig.getTag());
  59. return consumer;
  60. }
  61. }
  package com.zhangsan.boot.listener;

  import lombok.extern.slf4j.Slf4j;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.springframework.stereotype.Component;
  import org.springframework.util.CollectionUtils;

  import java.nio.charset.StandardCharsets;
  import java.util.List;

  /**
   * Listener for concurrent (unordered) consumption: logs every message of the delivered batch.
   */
  @Slf4j
  @Component
  public class ConcurrentlyListener implements MessageListenerConcurrently {

    /**
     * Handles one delivered batch.
     *
     * @param list messages of the batch; may be empty
     * @param consumeConcurrentlyContext consumption context (not used)
     * @return {@code CONSUME_SUCCESS} when every message was handled, otherwise
     *     {@code RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
      if (CollectionUtils.isEmpty(list)) {
        log.info("并发消费MQ接收消息为空,直接返回成功");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
      // Walk the whole batch: with a batch size above 1, looking only at the first element
      // would acknowledge the remaining messages without ever processing them.
      for (MessageExt messageExt : list) {
        try {
          String topic = messageExt.getTopic();
          String tags = messageExt.getTags();
          int queueId = messageExt.getQueueId();
          String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
          log.info("并发消费MQ消息topic={},queueId={}, tags={}, 消息内容={}", topic,queueId,tags,body);
        } catch (Exception e) {
          // The exception is passed as the trailing argument so the stack trace is logged.
          log.error("并发消费获取MQ消息内容异常", e);
          // Do not acknowledge a message whose handling failed; request redelivery instead,
          // in line with how the orderly listener reports a failure.
          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  }
  package com.zhangsan.boot.impl;

  import com.zhangsan.boot.mq.rocketmq.ConcurrentlyConsumerConfig;
  import com.zhangsan.boot.mq.rocketmq.OrderlyConsumerConfig;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.remoting.common.RemotingHelper;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;

  import java.util.UUID;

  /**
   * Demo service that publishes the same set of order-step messages in two ways: routed per
   * order id through {@link SelectMessageQueueByHash}, or sent without a queue selector.
   */
  @Service("mQService")
  @Slf4j
  public class MQServiceImpl {

    /** Number of demo orders generated per call. */
    private static final int ORDER_COUNT = 50;

    /** Number of steps (for example: create, pay, confirm receipt, review) sent per order. */
    private static final int STEPS_PER_ORDER = 4;

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @Autowired
    private OrderlyConsumerConfig orderlyConsumerConfig;

    @Autowired
    private ConcurrentlyConsumerConfig concurrentlyConsumerConfig;

    /**
     * Sends the demo messages to the "orderly" topic/tag, selecting the queue by order id.
     *
     * @return always {@code "success"}; failures are logged, not reported through the result
     */
    public String sendMQMsg() {
      return sendOrders(orderlyConsumerConfig.getTopicName(), orderlyConsumerConfig.getTag(), true);
    }

    /**
     * Sends the demo messages to the "concurrently" topic/tag without a queue selector, so no
     * per-order ordering is requested.
     *
     * @return always {@code "success"}; failures are logged, not reported through the result
     */
    public String sendRandomMQMsg() {
      return sendOrders(concurrentlyConsumerConfig.getTopicName(), concurrentlyConsumerConfig.getTag(), false);
    }

    /**
     * Publishes {@code STEPS_PER_ORDER} messages for each of {@code ORDER_COUNT} random order ids.
     *
     * @param ordered when true, each message is sent with a hash selector keyed by the order id
     *     and the remaining steps of an order are skipped after a failed send
     */
    private String sendOrders(String topic, String tag, boolean ordered) {
      // One selector instance is reused for all sends instead of allocating one per message.
      final SelectMessageQueueByHash selector = new SelectMessageQueueByHash();
      for (int i = 0; i < ORDER_COUNT; i++) {
        // A UUID stands in for a real order number.
        String orderId = UUID.randomUUID().toString();
        for (int step = 1; step <= STEPS_PER_ORDER; step++) {
          try {
            Message msg = new Message(topic, tag, "KEY" + orderId,
                ("订单Id:" + orderId + " 步骤:" + step).getBytes(RemotingHelper.DEFAULT_CHARSET));
            if (ordered) {
              defaultMQProducer.send(msg, selector, orderId);
            } else {
              defaultMQProducer.send(msg);
            }
          } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop sending altogether.
            Thread.currentThread().interrupt();
            log.error("消息发送失败,orderId:{}", orderId, e);
            return "success";
          } catch (Exception e) {
            log.error("消息发送失败,orderId:{}", orderId, e);
            if (ordered) {
              // Later steps must not be published after a failed one, otherwise the consumer
              // would see this order's steps with a gap.
              break;
            }
          }
        }
      }
      return "success";
    }
  }
  1. @Autowired
  2. private MQServiceImpl mqService;
  3. @GetMapping("/mqOrderLyTest")
  4. public String mqOrderLyTest(){
  5. return mqService.sendMQMsg();
  6. }
  7. @GetMapping("/mqConcurrentTest")
  8. public String mqConcurrentTest(){
  9. return mqService.sendRandomMQMsg();
  10. }

可以看到同一订单在不同队列中
1650706999(1).png