git地址: https://github.com/leezhang0525/boot
branch:1.1.3-rocketMQ
本文档持续更新,目标为多组件系统,常用组件集成完毕后,后续会添加分布式内容。
一、说明及准备
本文测试需要依赖rocketMQ的下载以及可视化页面,下载安装流程参考https://www.cnblogs.com/sexintercourse/p/15519752.html
前置准备
1、启动rocketMQ服务
依次启动 mqnamesrv.cmd 以及 mqbroker.cmd ,cmd窗口不要关闭
2、启动rocketmq-externals 
这里可以直接maven打包,运行jar 是一样的。
访问:http://127.0.0.1:8082/#/
二、添加依赖及修改yaml文件
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>
<!-- 版本号需声明在 pom.xml 的 <properties> 节点中 -->
<rocketmq.version>2.0.4</rocketmq.version>
server:
  port: 8081
spring:
  datasource:
    master:
      url: jdbc:mysql://localhost:3306/pay?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
    slave:
      url: jdbc:mysql://localhost:3306/payslave?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
  redis:
    host: localhost
    port: 6379
    database: 0
    password: 123456
    timeout: 3S
  rocket:
    #生产者
    producer:
      namesrvAddr: localhost:9876
      producerGroupName: zhangsan_producer
      # 消息最大长度(字节),RocketMQ 默认 1024 * 1024 * 4 (4M),此处示例设置为 4096
      maxMessageSize: 4096
      # 发送消息超时时间,默认 3000
      sendMsgTimeOut: 3000
      retryTimesWhenSendFailed: 2
    #消费者
    consumer:
      namesrvAddr: localhost:9876
      consumerGroupName: zhangsan_consumer
      topicName: pay_success
      tag: tag1
      # 消费者线程数量
      consumeThreadMin: 5
      consumeThreadMax: 32
      # 设置一次消费消息的条数,默认1
      consumeMessageBatchMaxSize: 1
logging:
  level:
    root: debug
三、添加生产者,消费者
package com.zhangsan.boot.mq.rocketmq;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Getter@Setter@Slf4j@Configuration@ConfigurationProperties(prefix = "spring.rocket.producer")public class Producer {private String producerGroupName;private String namesrvAddr;// 消息最大值private Integer maxMessageSize;// 消息发送超时时间private Integer sendMsgTimeOut;// 失败重试次数private Integer retryTimesWhenSendFailed;/*** 生产者* @return*/@Beanpublic DefaultMQProducer defaultProducer() {DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);producer.setNamesrvAddr(namesrvAddr);producer.setVipChannelEnabled(false);producer.setMaxMessageSize(maxMessageSize);producer.setSendMsgTimeout(sendMsgTimeOut);producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);return producer;}}
消费者
package com.zhangsan.boot.mq.rocketmq;import com.zhangsan.boot.listener.OrderlyListener;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Getter@Setter@Slf4j@Configuration@ConfigurationProperties(prefix = "spring.rocket.consumer")public class Consumer {private String namesrvAddr;private String consumerGroupName;private String topicName;private String tag;// 消费者线程数据量private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;@Autowiredprivate OrderlyListener orderlyListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public DefaultMQPushConsumer defaultConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);// 设置消息监听consumer.registerMessageListener(orderlyListener);// 设置消费起始位置位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置消费模型,集群还是广播consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe(topicName, tag);return consumer;}}
四、生产及消费分区顺序消息
分区顺序消息监听器
package com.zhangsan.boot.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.common.message.MessageExt;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.List;/*** 分区顺序消费*/@Component@Slf4jpublic class OrderlyListener implements MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {if (CollectionUtils.isEmpty(list)) {log.info("MQ接收消息为空,直接返回成功");return ConsumeOrderlyStatus.SUCCESS;}MessageExt messageExt = list.get(0);try {String topic = messageExt.getTopic();String tags = messageExt.getTags();int queueId = messageExt.getQueueId();String body = new String(messageExt.getBody(), "utf-8");log.info("MQ消息topic={},queueId={}, tags={}, 消息内容={}", topic,queueId,tags,body);} catch (Exception e) {log.error("获取MQ消息内容异常{}",e);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}// 处理业务逻辑return ConsumeOrderlyStatus.SUCCESS;}}
五、分区顺序消息生产
这里通过SelectMessageQueueByHash类实现orderId hash到同一queue中,可以看SelectMessageQueueByHash具体实现。
package com.zhangsan.boot.impl;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import java.util.UUID;@Service("mQService")@Slf4jpublic class MQServiceImpl {@Autowiredprivate DefaultMQProducer defaultMQProducer;@Value("${spring.rocket.consumer.topicName}")private String topic;@Value("${spring.rocket.consumer.tag}")private String tag;public String sendMQMsg (){for (int i = 0; i < 50; i++) {//生成50个订单,这里订单号就用uuid来代替了String orderId = UUID.randomUUID().toString();// 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来for (int j = 1; j <= 4; j++) {try {Message msg =new Message(topic, tag, "KEY" + orderId,("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = defaultMQProducer.send(msg, new SelectMessageQueueByHash(), orderId);} catch (Exception e) {e.printStackTrace();log.info("消息发送失败,orderId:{}",orderId);}}}return "success";}}
六、编写测试方法及验证
/** Service that publishes the demo messages. */
@Autowired
private MQServiceImpl mqService;

/**
 * Test endpoint for the ordered-message demo.
 *
 * @return whatever {@code MQServiceImpl.sendMQMsg()} returns
 */
@GetMapping("/mqOrderLyTest")
public String mqOrderLyTest() {
    final String outcome = mqService.sendMQMsg();
    return outcome;
}

可以看到同一订单的不同步骤都在同一个queue中,并且同一订单id的消息都是按步骤有序消费的,这样就实现了分区顺序消息功能
七、生产及消费随机消息
这里通过不同的groupName,topic,tag来消费,所以需要改造原来文件
server:
  port: 8081
spring:
  datasource:
    master:
      url: jdbc:mysql://localhost:3306/pay?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
    slave:
      url: jdbc:mysql://localhost:3306/payslave?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useLocalSessionState=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
  redis:
    host: localhost
    port: 6379
    database: 0
    password: 123456
    timeout: 3S
  rocket:
    #生产者
    producer:
      namesrvAddr: localhost:9876
      producerGroupName: zhangsan_producer
      # 消息最大长度(字节),RocketMQ 默认 1024 * 1024 * 4 (4M),此处示例设置为 4096
      maxMessageSize: 4096
      # 发送消息超时时间,默认 3000
      sendMsgTimeOut: 3000
      retryTimesWhenSendFailed: 2
    #消费者
    consumer:
      base:
        namesrvAddr: localhost:9876
        # 消费者线程数量
        consumeThreadMin: 5
        consumeThreadMax: 32
        # 设置一次消费消息的条数,默认1
        consumeMessageBatchMaxSize: 1
      orderly:
        consumerGroupName: zhangsan_consumer
        topicName: pay_success
        tag: tag1
      concurrently:
        consumerGroupName: zhangsan_consumer1
        topicName: pay_success1
        tag: tag2
logging:
  level:
    root: debug
@Getter@Setter@Slf4j@Configuration@ConfigurationProperties(prefix = "spring.rocket.consumer.base")public class BaseConsumerConfig {private String namesrvAddr;// 消费者线程数据量private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;}
package com.zhangsan.boot.mq.rocketmq;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Group, topic and tag of the ordered consumer, bound from
 * {@code spring.rocket.consumer.orderly.*}.
 */
@Getter
@Setter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "spring.rocket.consumer.orderly")
public class OrderlyConsumerConfig {

    /** Consumer group name. */
    private String consumerGroupName;

    /** Topic name. */
    private String topicName;

    /** Message tag. */
    private String tag;
}
package com.zhangsan.boot.mq.rocketmq;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Group, topic and tag of the concurrent consumer, bound from
 * {@code spring.rocket.consumer.concurrently.*}.
 */
@Getter
@Setter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "spring.rocket.consumer.concurrently")
public class ConcurrentlyConsumerConfig {

    /** Consumer group name. */
    private String consumerGroupName;

    /** Topic name. */
    private String topicName;

    /** Message tag. */
    private String tag;
}
package com.zhangsan.boot.mq.rocketmq;import com.zhangsan.boot.listener.ConcurrentlyListener;import com.zhangsan.boot.listener.OrderlyListener;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Getter@Setter@Slf4j@Configurationpublic class Consumer {@Autowiredprivate BaseConsumerConfig baseConsumerConfig;@Autowiredprivate OrderlyConsumerConfig orderlyConsumerConfig;@Autowiredprivate ConcurrentlyConsumerConfig concurrentlyConsumerConfig;@Autowiredprivate OrderlyListener orderlyListener;@Autowiredprivate ConcurrentlyListener concurrentlyListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public DefaultMQPushConsumer defaultConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderlyConsumerConfig.getConsumerGroupName());consumer.setNamesrvAddr(baseConsumerConfig.getNamesrvAddr());consumer.setConsumeThreadMin(baseConsumerConfig.getConsumeThreadMin());consumer.setConsumeThreadMax(baseConsumerConfig.getConsumeThreadMax());consumer.setConsumeMessageBatchMaxSize(baseConsumerConfig.getConsumeMessageBatchMaxSize());// 设置消息监听consumer.registerMessageListener(orderlyListener);// 设置消费起始位置位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置消费模型,集群还是广播consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe(orderlyConsumerConfig.getTopicName(), orderlyConsumerConfig.getTag());return consumer;}@Bean(initMethod = "start", destroyMethod = "shutdown")public DefaultMQPushConsumer concurrentConsumer() throws MQClientException {DefaultMQPushConsumer 
consumer = new DefaultMQPushConsumer(concurrentlyConsumerConfig.getConsumerGroupName());consumer.setNamesrvAddr(baseConsumerConfig.getNamesrvAddr());consumer.setConsumeThreadMin(baseConsumerConfig.getConsumeThreadMin());consumer.setConsumeThreadMax(baseConsumerConfig.getConsumeThreadMax());consumer.setConsumeMessageBatchMaxSize(baseConsumerConfig.getConsumeMessageBatchMaxSize());// 设置消息监听consumer.registerMessageListener(concurrentlyListener);// 设置消费起始位置位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置消费模型,集群还是广播consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe(concurrentlyConsumerConfig.getTopicName(), concurrentlyConsumerConfig.getTag());return consumer;}}
package com.zhangsan.boot.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.List;@Slf4j@Componentpublic class ConcurrentlyListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isEmpty(list)) {log.info("并发消费MQ接收消息为空,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);try {String topic = messageExt.getTopic();String tags = messageExt.getTags();int queueId = messageExt.getQueueId();String body = new String(messageExt.getBody(), "utf-8");log.info("并发消费MQ消息topic={},queueId={}, tags={}, 消息内容={}", topic,queueId,tags,body);} catch (Exception e) {log.error("并发消费获取MQ消息内容异常{}",e);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}
package com.zhangsan.boot.impl;import com.zhangsan.boot.mq.rocketmq.ConcurrentlyConsumerConfig;import com.zhangsan.boot.mq.rocketmq.OrderlyConsumerConfig;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.UUID;@Service("mQService")@Slf4jpublic class MQServiceImpl {@Autowiredprivate DefaultMQProducer defaultMQProducer;@Autowiredprivate OrderlyConsumerConfig orderlyConsumerConfig;@Autowiredprivate ConcurrentlyConsumerConfig concurrentlyConsumerConfig;public String sendMQMsg (){for (int i = 0; i < 50; i++) {//生成50个订单,这里订单号就用uuid来代替了String orderId = UUID.randomUUID().toString();// 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来for (int j = 1; j <= 4; j++) {try {Message msg =new Message(orderlyConsumerConfig.getTopicName(), orderlyConsumerConfig.getTag(), "KEY" + orderId,("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = defaultMQProducer.send(msg, new SelectMessageQueueByHash(), orderId);} catch (Exception e) {e.printStackTrace();log.info("消息发送失败,orderId:{}",orderId);}}}return "success";}public String sendRandomMQMsg (){for (int i = 0; i < 50; i++) {//生成50个订单,这里订单号就用uuid来代替了String orderId = UUID.randomUUID().toString();// 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来for (int j = 1; j <= 4; j++) {try {Message msg =new Message(concurrentlyConsumerConfig.getTopicName(), concurrentlyConsumerConfig.getTag(), "KEY" + orderId,("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = defaultMQProducer.send(msg);} catch (Exception e) 
{e.printStackTrace();log.info("消息发送失败,orderId:{}",orderId);}}}return "success";}}
/** Service that publishes the demo messages. */
@Autowired
private MQServiceImpl mqService;

/**
 * Test endpoint for the ordered-message demo.
 *
 * @return whatever {@code MQServiceImpl.sendMQMsg()} returns
 */
@GetMapping("/mqOrderLyTest")
public String mqOrderLyTest() {
    final String outcome = mqService.sendMQMsg();
    return outcome;
}

/**
 * Test endpoint for the unordered-message demo.
 *
 * @return whatever {@code MQServiceImpl.sendRandomMQMsg()} returns
 */
@GetMapping("/mqConcurrentTest")
public String mqConcurrentTest() {
    final String outcome = mqService.sendRandomMQMsg();
    return outcome;
}
可以看到同一订单的消息被分散到了不同队列中,因此各步骤的消费顺序不再有保证
