1.1 idea导入RocketMQ源码
官网下载地址:https://github.com/apache/rocketmq/tree/master
1.2 启动namesrv模块
1):配置NamesrvStartup启动应用,增加Environment variables(环境变量)
ROCKETMQ_HOME=D:\rocketmq(可自定义)

2): 复制distribution/conf下的broker.conf,logback_broker.xml,logback_broker.xml到D:\rocketmq(可自定义)目录下
3): 修改broker.conf参数
brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0#nameServer 地址,分号分割namesrvAddr=127.0.0.1:9876deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSH#存储路径storePathRootDir=D:\\rocketmq\\store#commitLog存储路径storePathCommitLog=D:\\rocketmq\\store\\commitlog#消费队列存储地址storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue#消息索引存储路径storePathIndex=D:\\rocketmq\\store\\index#checkpoint文件存储路径storeCheckpoint=D:\\rocketmq\\store\\checkpoint#abort文件存储路径abortFile=D:\\rocketmq\\store\\abort
4): 替换logback_broker.xml,logback_broker.xml中user.home为ROCKETMQ_HOME
5): 执行NamesrvStartup.java,并成功启动
The Name Server boot success. serializeType=JSON
1.2 启动broker模块
启动方式同namesrv模块基本一致<br />1) 配置NamesrvStartup启动应用<br /><br />2): 增加Program arguments参数配置<br />-c D:\rocketmq\conf\broker.conf<br /> <br />3):启动成功<br />The broker[broker-a, 192.168.1.16:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876<br />
1.3 producer测试发送消息
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("producer_group_1");//设置nameServer服务地址producer.setNamesrvAddr("127.0.0.1:9876");//生产者服务启动producer.start();Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,"你好,RocketMQ,hello world!".getBytes() /* Message body */);SendResult sendResult = producer.send(msg);//打印发送结果System.out.printf("%s%n", sendResult);//生产者停止producer.shutdown();}}
1.4 producer测试消费消息
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//初始化消费组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_1");//设置nameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");//设置从哪开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//设置订阅的topicconsumer.subscribe("TopicTest", "*");//消费者消息消费监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("线程名:%s \t收到消息: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//消费者启动consumer.start();System.out.printf("消费者启动.%n");}}
消息消费成功,其中body(消息体)是字节数组,需要做转换。

